blob: 3cd372681c0d2b7ad292f4fc7932e4d54280d092 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
Austin Schuhe516ab02020-05-06 21:37:04 -07007#include <optional>
Brian Silverman177567e2020-08-12 19:51:33 -07008#include <vector>
Austin Schuh20b2b082019-09-11 20:42:56 -07009
10#include "aos/ipc_lib/aos_sync.h"
Brian Silvermana1652f32020-01-29 20:41:44 -080011#include "aos/ipc_lib/data_alignment.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070012#include "aos/ipc_lib/index.h"
13#include "aos/time/time.h"
14
15namespace aos {
16namespace ipc_lib {
17
18// Structure to hold the state required to wake a watcher.
19struct Watcher {
20 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
21 // then this watcher is invalid. The futex variable will then hold the tid of
22 // the watcher, or FUTEX_OWNER_DIED if the task died.
23 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080024 // Note: this is only modified with the queue_setup_lock lock held, but may
25 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070026 // Any state modification should happen before the lock is acquired.
27 aos_mutex tid;
28
29 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080030 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070031
32 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080033 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070034};
35
36// Structure to hold the state required to send messages.
37struct Sender {
38 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
39 // this sender is invalid. The futex variable will then hold the tid of the
40 // sender, or FUTEX_OWNER_DIED if the task died.
41 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080042 // Note: this is only modified with the queue_setup_lock lock held, but may
43 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070044 aos_mutex tid;
45
46 // Index of the message we will be filling out.
47 AtomicIndex scratch_index;
48
49 // Index of the element being swapped with scratch_index, or Invalid if there
50 // is nothing to do.
51 AtomicIndex to_replace;
52};
53
Brian Silverman177567e2020-08-12 19:51:33 -070054// Structure to hold the state required to pin messages.
55struct Pinner {
56 // The same as Sender::tid. See there for docs.
57 aos_mutex tid;
58
59 // Queue index of the message we have pinned, or Invalid if there isn't one.
60 AtomicQueueIndex pinned;
61
62 // This should always be valid.
63 //
64 // Note that this is fully independent from pinned. It's just a place to stash
65 // a message, to ensure there's always an unpinned one for a writer to grab.
66 AtomicIndex scratch_index;
67};
68
Austin Schuh20b2b082019-09-11 20:42:56 -070069// Structure representing a message.
70struct Message {
71 struct Header {
72 // Index of this message in the queue. Needs to match the index this
73 // message is written into the queue at. The data in this message is only
74 // valid if it matches the index in the queue both before and after all the
75 // data is read.
76 //
77 // Note: a value of 0xffffffff always means that the contents aren't valid.
78 AtomicQueueIndex queue_index;
79
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080080 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070081 // queue, which means that time needs to be re-sampled every time a write
82 // fails.
83 ::aos::monotonic_clock::time_point monotonic_sent_time;
84 ::aos::realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080085 // Timestamps of the message from the remote node. These are transparently
86 // passed through.
87 ::aos::monotonic_clock::time_point monotonic_remote_time;
88 ::aos::realtime_clock::time_point realtime_remote_time;
89
90 // Queue index from the remote node.
91 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070092
93 size_t length;
94 } header;
95
Brian Silvermana1652f32020-01-29 20:41:44 -080096 char *data(size_t message_size) { return RoundedData(message_size); }
97 const char *data(size_t message_size) const {
98 return RoundedData(message_size);
99 }
100
101 private:
102 // This returns a non-const pointer into a const object. Be very careful about
103 // const correctness in publicly accessible APIs using it.
104 char *RoundedData(size_t message_size) const {
105 return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
106 }
107
108 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -0700109};
110
// Sizing parameters for a lockless queue.
struct LocklessQueueConfiguration {
  // Number of entries in the watchers list.
  size_t num_watchers;
  // Number of entries in the sender list.
  size_t num_senders;
  // Number of entries in the pinner list.
  size_t num_pinners;

  // Number of entries in the list of pointers into the messages list.
  size_t queue_size;
  // Number of bytes of data stored in each Message.
  size_t message_data_size;

  // Defined out of line.
  // NOTE(review): presumably the total per-message footprint in bytes --
  // confirm against the definition.
  size_t message_size() const;

  // Total number of messages: one per sender and per pinner in addition to
  // the queue_size entries of the queue itself.
  size_t num_messages() const {
    const size_t per_client_messages = num_senders + num_pinners;
    return per_client_messages + queue_size;
  }
};
128
// Opaque structure holding the state of the queue.
//
// Reads and writes are lockless and constant time.
//
// Adding a new watcher does not have to be constant time for the watcher
// (that happens before the watcher goes RT), but it does have to be RT for the
// sender.
struct LocklessQueueMemory;
136
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700137// Returns the size of the LocklessQueueMemory.
138size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
139
Austin Schuh20b2b082019-09-11 20:42:56 -0700140// Initializes the queue memory. memory must be either a valid pointer to the
141// queue datastructure, or must be zero initialized.
142LocklessQueueMemory *InitializeLocklessQueueMemory(
143 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
144
// Signal number used to wake up watchers.
//
// SIGRTMIN is not required to be a compile-time constant (glibc evaluates it
// at run time), so this cannot be constexpr. It is an inline variable so that
// every translation unit including this header shares a single definition
// instead of each getting its own internal-linkage copy.
inline const unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700146
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700147// A convenient wrapper for accessing a lockless queue.
Austin Schuh20b2b082019-09-11 20:42:56 -0700148class LocklessQueue {
149 public:
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700150 LocklessQueue(const LocklessQueueMemory *const_memory,
151 LocklessQueueMemory *memory, LocklessQueueConfiguration config)
152 : const_memory_(const_memory), memory_(memory), config_(config) {}
Austin Schuh20b2b082019-09-11 20:42:56 -0700153
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700154 void Initialize();
Austin Schuh20b2b082019-09-11 20:42:56 -0700155
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700156 LocklessQueueConfiguration config() const { return config_; }
Austin Schuh20b2b082019-09-11 20:42:56 -0700157
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700158 const LocklessQueueMemory *const_memory() { return const_memory_; }
159 LocklessQueueMemory *memory() { return memory_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700160
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700161 private:
162 const LocklessQueueMemory *const_memory_;
163 LocklessQueueMemory *memory_;
164 LocklessQueueConfiguration config_;
165};
166
167class LocklessQueueWatcher {
168 public:
169 LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
170 LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
171 LocklessQueueWatcher(LocklessQueueWatcher &&other)
172 : memory_(other.memory_), watcher_index_(other.watcher_index_) {
173 other.watcher_index_ = -1;
174 }
175 LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
176 std::swap(memory_, other.memory_);
177 std::swap(watcher_index_, other.watcher_index_);
178 return *this;
179 }
180
181 ~LocklessQueueWatcher();
182
183 // Registers this thread to receive the kWakeupSignal signal when
184 // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
185 // error in registration.
186 // TODO(austin): Change the API if we find ourselves with more errors.
187 static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
188 int priority);
189
190 private:
191 LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
192
193 LocklessQueueMemory *memory_ = nullptr;
194
195 // Index in the watcher list that our entry is, or -1 if no watcher is
196 // registered.
197 int watcher_index_ = -1;
198};
199
200class LocklessQueueWakeUpper {
201 public:
202 LocklessQueueWakeUpper(LocklessQueue queue);
Austin Schuh20b2b082019-09-11 20:42:56 -0700203
204 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
205 //
206 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
207 // if nonrt.
208 int Wakeup(int current_priority);
209
Austin Schuh20b2b082019-09-11 20:42:56 -0700210 private:
Austin Schuh20b2b082019-09-11 20:42:56 -0700211 // Memory and datastructure used to sort a list of watchers to wake
212 // up. This isn't a copy of Watcher since tid is simpler to work with here
213 // than the futex above.
214 struct WatcherCopy {
215 pid_t tid;
216 pid_t pid;
217 int priority;
218 };
Austin Schuh20b2b082019-09-11 20:42:56 -0700219
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700220 const LocklessQueueMemory *const memory_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700221 const int pid_;
222 const uid_t uid_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700223
224 ::std::vector<WatcherCopy> watcher_copy_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700225};
226
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700227// Sender for blocks of data. The resources associated with a sender are
228// scoped to this object's lifetime.
229class LocklessQueueSender {
230 public:
231 LocklessQueueSender(const LocklessQueueSender &) = delete;
232 LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
233 LocklessQueueSender(LocklessQueueSender &&other)
234 : memory_(other.memory_), sender_index_(other.sender_index_) {
235 other.memory_ = nullptr;
236 other.sender_index_ = -1;
237 }
238 LocklessQueueSender &operator=(LocklessQueueSender &&other) {
239 std::swap(memory_, other.memory_);
240 std::swap(sender_index_, other.sender_index_);
241 return *this;
242 }
243
244 ~LocklessQueueSender();
245
246 // Creates a sender. If we couldn't allocate a sender, returns nullopt.
247 // TODO(austin): Change the API if we find ourselves with more errors.
248 static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
249
250 // Sends a message without copying the data.
251 // Copy at most size() bytes of data into the memory pointed to by Data(),
252 // and then call Send().
253 // Note: calls to Data() are expensive enough that you should cache it.
254 size_t size() const;
255 void *Data();
256 void Send(size_t length,
257 aos::monotonic_clock::time_point monotonic_remote_time =
258 aos::monotonic_clock::min_time,
259 aos::realtime_clock::time_point realtime_remote_time =
260 aos::realtime_clock::min_time,
261 uint32_t remote_queue_index = 0xffffffff,
262 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
263 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
264 uint32_t *queue_index = nullptr);
265
266 // Sends up to length data. Does not wakeup the target.
267 void Send(const char *data, size_t length,
268 aos::monotonic_clock::time_point monotonic_remote_time =
269 aos::monotonic_clock::min_time,
270 aos::realtime_clock::time_point realtime_remote_time =
271 aos::realtime_clock::min_time,
272 uint32_t remote_queue_index = 0xffffffff,
273 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
274 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
275 uint32_t *queue_index = nullptr);
276
277 int buffer_index() const;
278
279 private:
280 LocklessQueueSender(LocklessQueueMemory *memory);
281
282 // Pointer to the backing memory.
283 LocklessQueueMemory *memory_ = nullptr;
284
285 // Index into the sender list.
286 int sender_index_ = -1;
287};
288
289// Pinner for blocks of data. The resources associated with a pinner are
290// scoped to this object's lifetime.
291class LocklessQueuePinner {
292 public:
293 LocklessQueuePinner(const LocklessQueuePinner &) = delete;
294 LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
295 LocklessQueuePinner(LocklessQueuePinner &&other)
296 : memory_(other.memory_),
297 const_memory_(other.const_memory_),
298 pinner_index_(other.pinner_index_) {
299 other.pinner_index_ = -1;
300 }
301 LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
302 std::swap(memory_, other.memory_);
303 std::swap(const_memory_, other.const_memory_);
304 std::swap(pinner_index_, other.pinner_index_);
305 return *this;
306 }
307
308 ~LocklessQueuePinner();
309
310 // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
311 // TODO(austin): Change the API if we find ourselves with more errors.
312 static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
313
314 // Attempts to pin the message at queue_index.
315 // Un-pins the previous message.
316 // Returns the buffer index (non-negative) if it succeeds.
317 // Returns -1 if that message is no longer in the queue.
318 int PinIndex(uint32_t queue_index);
319
320 // Read at most size() bytes of data into the memory pointed to by Data().
321 // Note: calls to Data() are expensive enough that you should cache it.
322 // Don't call Data() before a successful PinIndex call.
323 size_t size() const;
324 const void *Data() const;
325
326 private:
327 LocklessQueuePinner(LocklessQueueMemory *memory,
328 const LocklessQueueMemory *const_memory);
329
330 // Pointer to the backing memory.
331 LocklessQueueMemory *memory_ = nullptr;
332 const LocklessQueueMemory *const_memory_ = nullptr;
333
334 // Index into the pinner list.
335 int pinner_index_ = -1;
336};
337
338class LocklessQueueReader {
339 public:
340 enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
341
342 LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
343 queue.Initialize();
344 }
345
346 // If you ask for a queue index 2 past the newest, you will still get
347 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
348 // element newer than QueueSize() from the current message, we consider it
349 // behind by a large amount and return TOO_OLD. If the message is modified
350 // out from underneath us as we read it, return OVERWROTE.
351 //
352 // data may be nullptr to indicate the data should not be copied.
353 Result Read(uint32_t queue_index,
354 ::aos::monotonic_clock::time_point *monotonic_sent_time,
355 ::aos::realtime_clock::time_point *realtime_sent_time,
356 ::aos::monotonic_clock::time_point *monotonic_remote_time,
357 ::aos::realtime_clock::time_point *realtime_remote_time,
358 uint32_t *remote_queue_index, size_t *length, char *data) const;
359
360 // Returns the index to the latest queue message. Returns empty_queue_index()
361 // if there are no messages in the queue. Do note that this index wraps if
362 // more than 2^32 messages are sent.
363 QueueIndex LatestIndex() const;
364
365 private:
366 const LocklessQueueMemory *const memory_;
367};
368
369// Returns the number of messages which are logically in the queue at a time.
370size_t LocklessQueueSize(const LocklessQueueMemory *memory);
371
372// Returns the number of bytes queue users are allowed to read/write within each
373// message.
374size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
375
// TODO(austin): Return the oldest queue index. This lets us catch up nicely
// if we got behind.
// The easiest way to implement this is likely going to be to reserve the
// first modulo of values for the initial time around, and never reuse them.
// That lets us do a simple atomic read of the next index and deduce what has
// happened. It will involve the simplest atomic operations.
382
// TODO(austin): Make it so we can find the indices which were sent just
// before and after a time with a binary search.
385
386// Prints to stdout the data inside the queue for debugging.
387void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
388
Austin Schuh20b2b082019-09-11 20:42:56 -0700389} // namespace ipc_lib
390} // namespace aos
391
392#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_