/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved.                             */
/* Open Source Software - may be modified and shared by FRC teams. The code   */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project.                                                               */
/*----------------------------------------------------------------------------*/
7
8#ifndef NT_STORAGE_H_
9#define NT_STORAGE_H_
10
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <fstream>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "llvm/DenseMap.h"
#include "llvm/StringMap.h"
#include "atomic_static.h"
#include "Message.h"
#include "Notifier.h"
#include "ntcore_cpp.h"
#include "RpcServer.h"
#include "SequenceNumber.h"
28
29namespace nt {
30
31class NetworkConnection;
32class StorageTest;
33
34class Storage {
35 friend class StorageTest;
36 public:
37 static Storage& GetInstance() {
38 ATOMIC_STATIC(Storage, instance);
39 return instance;
40 }
41 ~Storage();
42
43 // Accessors required by Dispatcher. A function pointer is used for
44 // generation of outgoing messages to break a dependency loop between
45 // Storage and Dispatcher; in operation this is always set to
46 // Dispatcher::QueueOutgoing.
47 typedef std::function<void(std::shared_ptr<Message> msg,
48 NetworkConnection* only,
49 NetworkConnection* except)> QueueOutgoingFunc;
50 void SetOutgoing(QueueOutgoingFunc queue_outgoing, bool server);
51 void ClearOutgoing();
52
53 // Required for wire protocol 2.0 to get the entry type of an entry when
54 // receiving entry updates (because the length/type is not provided in the
55 // message itself). Not used in wire protocol 3.0.
56 NT_Type GetEntryType(unsigned int id) const;
57
58 void ProcessIncoming(std::shared_ptr<Message> msg, NetworkConnection* conn,
59 std::weak_ptr<NetworkConnection> conn_weak);
60 void GetInitialAssignments(NetworkConnection& conn,
61 std::vector<std::shared_ptr<Message>>* msgs);
62 void ApplyInitialAssignments(NetworkConnection& conn,
63 llvm::ArrayRef<std::shared_ptr<Message>> msgs,
64 bool new_server,
65 std::vector<std::shared_ptr<Message>>* out_msgs);
66
67 // User functions. These are the actual implementations of the corresponding
68 // user API functions in ntcore_cpp.
69 std::shared_ptr<Value> GetEntryValue(StringRef name) const;
70 bool SetEntryValue(StringRef name, std::shared_ptr<Value> value);
71 void SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value);
72 void SetEntryFlags(StringRef name, unsigned int flags);
73 unsigned int GetEntryFlags(StringRef name) const;
74 void DeleteEntry(StringRef name);
75 void DeleteAllEntries();
76 std::vector<EntryInfo> GetEntryInfo(StringRef prefix, unsigned int types);
77 void NotifyEntries(StringRef prefix,
78 EntryListenerCallback only = nullptr) const;
79
80 // Filename-based save/load functions. Used both by periodic saves and
81 // accessible directly via the user API.
82 const char* SavePersistent(StringRef filename, bool periodic) const;
83 const char* LoadPersistent(
84 StringRef filename,
85 std::function<void(std::size_t line, const char* msg)> warn);
86
87 // Stream-based save/load functions (exposed for testing purposes). These
88 // implement the guts of the filename-based functions.
89 void SavePersistent(std::ostream& os, bool periodic) const;
90 bool LoadPersistent(
91 std::istream& is,
92 std::function<void(std::size_t line, const char* msg)> warn);
93
94 // RPC configuration needs to come through here as RPC definitions are
95 // actually special Storage value types.
96 void CreateRpc(StringRef name, StringRef def, RpcCallback callback);
97 void CreatePolledRpc(StringRef name, StringRef def);
98
99 unsigned int CallRpc(StringRef name, StringRef params);
100 bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result);
101
102 private:
103 Storage();
104 Storage(Notifier& notifier, RpcServer& rpcserver);
105 Storage(const Storage&) = delete;
106 Storage& operator=(const Storage&) = delete;
107
108 // Data for each table entry.
109 struct Entry {
110 Entry(llvm::StringRef name_)
111 : name(name_), flags(0), id(0xffff), rpc_call_uid(0) {}
112 bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
113
114 // We redundantly store the name so that it's available when accessing the
115 // raw Entry* via the ID map.
116 std::string name;
117
118 // The current value and flags.
119 std::shared_ptr<Value> value;
120 unsigned int flags;
121
122 // Unique ID for this entry as used in network messages. The value is
123 // assigned by the server, so on the client this is 0xffff until an
124 // entry assignment is received back from the server.
125 unsigned int id;
126
127 // Sequence number for update resolution.
128 SequenceNumber seq_num;
129
130 // RPC callback function. Null if either not an RPC or if the RPC is
131 // polled.
132 RpcCallback rpc_callback;
133
134 // Last UID used when calling this RPC (primarily for client use). This
135 // is incremented for each call.
136 unsigned int rpc_call_uid;
137 };
138
139 typedef llvm::StringMap<std::unique_ptr<Entry>> EntriesMap;
140 typedef std::vector<Entry*> IdMap;
141 typedef llvm::DenseMap<std::pair<unsigned int, unsigned int>, std::string>
142 RpcResultMap;
143
144 mutable std::mutex m_mutex;
145 EntriesMap m_entries;
146 IdMap m_idmap;
147 RpcResultMap m_rpc_results;
148 // If any persistent values have changed
149 mutable bool m_persistent_dirty = false;
150
151 // condition variable and termination flag for blocking on a RPC result
152 std::atomic_bool m_terminating;
153 std::condition_variable m_rpc_results_cond;
154
155 // configured by dispatcher at startup
156 QueueOutgoingFunc m_queue_outgoing;
157 bool m_server = true;
158
159 // references to singletons (we don't grab them directly for testing purposes)
160 Notifier& m_notifier;
161 RpcServer& m_rpc_server;
162
163 bool GetPersistentEntries(
164 bool periodic,
165 std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
166 const;
167
168 ATOMIC_STATIC_DECL(Storage)
169};
170
171} // namespace nt
172
173#endif // NT_STORAGE_H_