blob: 976f758624982dd3c744a73676ad3d031ab9a552 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
Austin Schuh20b2b082019-09-11 20:42:56 -07007#include <vector>
8
9#include "aos/ipc_lib/aos_sync.h"
10#include "aos/ipc_lib/index.h"
11#include "aos/time/time.h"
12
13namespace aos {
14namespace ipc_lib {
15
16// Structure to hold the state required to wake a watcher.
17struct Watcher {
18 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
19 // then this watcher is invalid. The futex variable will then hold the tid of
20 // the watcher, or FUTEX_OWNER_DIED if the task died.
21 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080022 // Note: this is only modified with the queue_setup_lock lock held, but may
23 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070024 // Any state modification should happen before the lock is acquired.
25 aos_mutex tid;
26
27 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080028 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070029
30 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080031 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070032};
33
34// Structure to hold the state required to send messages.
35struct Sender {
36 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
37 // this sender is invalid. The futex variable will then hold the tid of the
38 // sender, or FUTEX_OWNER_DIED if the task died.
39 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080040 // Note: this is only modified with the queue_setup_lock lock held, but may
41 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070042 aos_mutex tid;
43
44 // Index of the message we will be filling out.
45 AtomicIndex scratch_index;
46
47 // Index of the element being swapped with scratch_index, or Invalid if there
48 // is nothing to do.
49 AtomicIndex to_replace;
50};
51
52// Structure representing a message.
53struct Message {
54 struct Header {
55 // Index of this message in the queue. Needs to match the index this
56 // message is written into the queue at. The data in this message is only
57 // valid if it matches the index in the queue both before and after all the
58 // data is read.
59 //
60 // Note: a value of 0xffffffff always means that the contents aren't valid.
61 AtomicQueueIndex queue_index;
62
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080063 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070064 // queue, which means that time needs to be re-sampled every time a write
65 // fails.
66 ::aos::monotonic_clock::time_point monotonic_sent_time;
67 ::aos::realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080068 // Timestamps of the message from the remote node. These are transparently
69 // passed through.
70 ::aos::monotonic_clock::time_point monotonic_remote_time;
71 ::aos::realtime_clock::time_point realtime_remote_time;
72
73 // Queue index from the remote node.
74 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070075
76 size_t length;
77 } header;
78
79 char data[];
80};
81
82struct LocklessQueueConfiguration {
83 // Size of the watchers list.
84 size_t num_watchers;
85 // Size of the sender list.
86 size_t num_senders;
87
88 // Size of the list of pointers into the messages list.
89 size_t queue_size;
90 // Size in bytes of the data stored in each Message.
91 size_t message_data_size;
92
Austin Schuh4bc4f902019-12-23 18:04:51 -080093 size_t message_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -070094
95 size_t num_messages() const { return num_senders + queue_size; }
96};
97
98// Structure to hold the state of the queue.
99//
100// Reads and writes are lockless and constant time.
101//
102// Adding a new watcher doesn't need to be constant time for the watcher (this
103// is done before the watcher goes RT), but needs to be RT for the sender.
104struct LocklessQueueMemory;
105
106// Initializes the queue memory. memory must be either a valid pointer to the
107// queue datastructure, or must be zero initialized.
108LocklessQueueMemory *InitializeLocklessQueueMemory(
109 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
110
111// Returns the size of the LocklessQueueMemory.
112size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
113
114// Prints to stdout the data inside the queue for debugging.
115void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
116
Alex Perrycb7da4b2019-08-28 19:35:56 -0700117const static unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700118
119// Class to manage sending and receiving data in the lockless queue. This is
120// separate from the actual memory backing the queue so that memory can be
121// managed with mmap to share across the process boundary.
122class LocklessQueue {
123 public:
124 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
125 LocklessQueue(const LocklessQueue &) = delete;
126 LocklessQueue &operator=(const LocklessQueue &) = delete;
127
128 ~LocklessQueue();
129
130 // Returns the number of messages in the queue.
131 size_t QueueSize() const;
132
Alex Perrycb7da4b2019-08-28 19:35:56 -0700133 size_t message_data_size() const;
134
Austin Schuh20b2b082019-09-11 20:42:56 -0700135 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
136 // called. Returns false if there was an error in registration.
137 bool RegisterWakeup(int priority);
138 // Unregisters the wakeup.
139 void UnregisterWakeup();
140
141 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
142 //
143 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
144 // if nonrt.
145 int Wakeup(int current_priority);
146
147 // If you ask for a queue index 2 past the newest, you will still get
148 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
149 // element newer than QueueSize() from the current message, we consider it
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150 // behind by a large amount and return TOO_OLD. If the message is modified
151 // out from underneath us as we read it, return OVERWROTE.
152 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
Austin Schuh20b2b082019-09-11 20:42:56 -0700153 ReadResult Read(uint32_t queue_index,
154 ::aos::monotonic_clock::time_point *monotonic_sent_time,
155 ::aos::realtime_clock::time_point *realtime_sent_time,
Austin Schuhad154822019-12-27 15:45:13 -0800156 ::aos::monotonic_clock::time_point *monotonic_remote_time,
157 ::aos::realtime_clock::time_point *realtime_remote_time,
158 uint32_t *remote_queue_index, size_t *length, char *data);
Austin Schuh20b2b082019-09-11 20:42:56 -0700159
160 // Returns the index to the latest queue message. Returns empty_queue_index()
161 // if there are no messages in the queue. Do note that this index wraps if
162 // more than 2^32 messages are sent.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700163 QueueIndex LatestQueueIndex();
164 static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
165
166 // Returns the size of the queue. This is mostly useful for manipulating
167 // QueueIndex.
168 size_t queue_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700169
170 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
171 // if we got behind.
172 // The easiest way to implement this is likely going to be to reserve the
173 // first modulo of values for the initial time around, and never reuse them.
174 // That lets us do a simple atomic read of the next index and deduce what has
175 // happened. It will involve the simplest atomic operations.
176
177 // TODO(austin): Make it so we can find the indices which were sent just
178 // before and after a time with a binary search.
179
180 // Sender for blocks of data. The resources associated with a sender are
181 // scoped to this object's lifetime.
182 class Sender {
183 public:
184 Sender(const Sender &) = delete;
185 Sender &operator=(const Sender &) = delete;
186 Sender(Sender &&other)
187 : memory_(other.memory_), sender_index_(other.sender_index_) {
188 other.memory_ = nullptr;
189 other.sender_index_ = -1;
190 }
191 Sender &operator=(Sender &&other) {
192 memory_ = other.memory_;
193 sender_index_ = other.sender_index_;
194 other.memory_ = nullptr;
195 other.sender_index_ = -1;
196 return *this;
197 }
198
199 ~Sender();
200
Alex Perrycb7da4b2019-08-28 19:35:56 -0700201 // Sends a message without copying the data.
202 // Copy at most size() bytes of data into the memory pointed to by Data(),
203 // and then call Send().
204 // Note: calls to Data() are expensive enough that you should cache it.
205 size_t size();
206 void *Data();
Austin Schuhad154822019-12-27 15:45:13 -0800207 void Send(size_t length,
208 aos::monotonic_clock::time_point monotonic_remote_time =
209 aos::monotonic_clock::min_time,
210 aos::realtime_clock::time_point realtime_remote_time =
211 aos::realtime_clock::min_time,
212 uint32_t remote_queue_index = 0xffffffff,
213 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
214 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
215 uint32_t *queue_index = nullptr);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700216
Austin Schuh20b2b082019-09-11 20:42:56 -0700217 // Sends up to length data. Does not wakeup the target.
Austin Schuhad154822019-12-27 15:45:13 -0800218 void Send(const char *data, size_t length,
219 aos::monotonic_clock::time_point monotonic_remote_time =
220 aos::monotonic_clock::min_time,
221 aos::realtime_clock::time_point realtime_remote_time =
222 aos::realtime_clock::min_time,
223 uint32_t remote_queue_index = 0xffffffff,
224 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
225 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
226 uint32_t *queue_index = nullptr);
Austin Schuh20b2b082019-09-11 20:42:56 -0700227
228 private:
229 friend class LocklessQueue;
230
231 Sender(LocklessQueueMemory *memory);
232
233 // Pointer to the backing memory.
234 LocklessQueueMemory *memory_ = nullptr;
235
236 // Index into the sender list.
237 int sender_index_ = -1;
238 };
239
240 // Creates a sender.
241 Sender MakeSender();
242
243 private:
244 LocklessQueueMemory *memory_ = nullptr;
245
246 // Memory and datastructure used to sort a list of watchers to wake
247 // up. This isn't a copy of Watcher since tid is simpler to work with here
248 // than the futex above.
249 struct WatcherCopy {
250 pid_t tid;
251 pid_t pid;
252 int priority;
253 };
254 // TODO(austin): Don't allocate this memory if we aren't going to send.
255 ::std::vector<WatcherCopy> watcher_copy_;
256
257 // Index in the watcher list that our entry is, or -1 if no watcher is
258 // registered.
259 int watcher_index_ = -1;
260
261 const int pid_;
262 const uid_t uid_;
263};
264
265} // namespace ipc_lib
266} // namespace aos
267
268#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_