blob: f7a85c3ea19c3f25bcb1666decfa13d4a4ff1263 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
#include <sys/signalfd.h>
#include <sys/types.h>

#include <atomic>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <ostream>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"
#include "aos/uuid.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070017
18namespace aos {
19namespace ipc_lib {
20
21// Structure to hold the state required to wake a watcher.
22struct Watcher {
23 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
24 // then this watcher is invalid. The futex variable will then hold the tid of
25 // the watcher, or FUTEX_OWNER_DIED if the task died.
26 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080027 // Note: this is only modified with the queue_setup_lock lock held, but may
28 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070029 // Any state modification should happen before the lock is acquired.
30 aos_mutex tid;
31
32 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080033 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070034
35 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080036 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070037};
38
39// Structure to hold the state required to send messages.
40struct Sender {
41 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
42 // this sender is invalid. The futex variable will then hold the tid of the
43 // sender, or FUTEX_OWNER_DIED if the task died.
44 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080045 // Note: this is only modified with the queue_setup_lock lock held, but may
46 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070047 aos_mutex tid;
48
49 // Index of the message we will be filling out.
50 AtomicIndex scratch_index;
51
52 // Index of the element being swapped with scratch_index, or Invalid if there
53 // is nothing to do.
54 AtomicIndex to_replace;
55};
56
Brian Silverman177567e2020-08-12 19:51:33 -070057// Structure to hold the state required to pin messages.
58struct Pinner {
59 // The same as Sender::tid. See there for docs.
60 aos_mutex tid;
61
62 // Queue index of the message we have pinned, or Invalid if there isn't one.
63 AtomicQueueIndex pinned;
64
65 // This should always be valid.
66 //
67 // Note that this is fully independent from pinned. It's just a place to stash
68 // a message, to ensure there's always an unpinned one for a writer to grab.
69 AtomicIndex scratch_index;
70};
71
Austin Schuh20b2b082019-09-11 20:42:56 -070072// Structure representing a message.
73struct Message {
74 struct Header {
75 // Index of this message in the queue. Needs to match the index this
76 // message is written into the queue at. The data in this message is only
77 // valid if it matches the index in the queue both before and after all the
78 // data is read.
79 //
80 // Note: a value of 0xffffffff always means that the contents aren't valid.
81 AtomicQueueIndex queue_index;
82
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080083 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070084 // queue, which means that time needs to be re-sampled every time a write
85 // fails.
Austin Schuhb5c6f972021-03-14 21:53:07 -070086 monotonic_clock::time_point monotonic_sent_time;
87 realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080088 // Timestamps of the message from the remote node. These are transparently
89 // passed through.
Austin Schuhb5c6f972021-03-14 21:53:07 -070090 monotonic_clock::time_point monotonic_remote_time;
91 realtime_clock::time_point realtime_remote_time;
Austin Schuhad154822019-12-27 15:45:13 -080092
93 // Queue index from the remote node.
94 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070095
Austin Schuh8902fa52021-03-14 22:39:24 -070096 // Remote boot UUID for this message.
Austin Schuha9012be2021-07-21 15:19:11 -070097 UUID source_boot_uuid;
Austin Schuh8902fa52021-03-14 22:39:24 -070098
Austin Schuh20b2b082019-09-11 20:42:56 -070099 size_t length;
100 } header;
101
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700102 // Returns the start of the data buffer, given that message_data_size is
103 // the same one used to allocate this message's memory.
104 char *data(size_t message_data_size) {
105 return RoundedData(message_data_size);
106 }
107 const char *data(size_t message_data_size) const {
108 return RoundedData(message_data_size);
109 }
110
111 // Returns the pre-buffer redzone, given that message_data_size is the same
112 // one used to allocate this message's memory.
113 absl::Span<char> PreRedzone(size_t message_data_size) {
114 char *const end = data(message_data_size);
115 const auto result =
116 absl::Span<char>(&data_pointer[0], end - &data_pointer[0]);
117 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
118 return result;
119 }
120 absl::Span<const char> PreRedzone(size_t message_data_size) const {
121 const char *const end = data(message_data_size);
122 const auto result =
123 absl::Span<const char>(&data_pointer[0], end - &data_pointer[0]);
124 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
125 return result;
126 }
127
128 // Returns the post-buffer redzone, given that message_data_size is the same
129 // one used to allocate this message's memory.
130 absl::Span<char> PostRedzone(size_t message_data_size, size_t message_size) {
131 DCHECK_LT(message_data_size, message_size);
132 char *const redzone_end = reinterpret_cast<char *>(this) + message_size;
133 char *const data_end = data(message_data_size) + message_data_size;
134 DCHECK_GT(static_cast<void *>(redzone_end), static_cast<void *>(data_end));
135 const auto result = absl::Span<char>(data_end, redzone_end - data_end);
136 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
137 return result;
138 }
139 absl::Span<const char> PostRedzone(size_t message_data_size,
140 size_t message_size) const {
141 DCHECK_LT(message_data_size, message_size);
142 const char *const redzone_end =
143 reinterpret_cast<const char *>(this) + message_size;
144 const char *const data_end = data(message_data_size) + message_data_size;
145 DCHECK_GT(static_cast<const void *>(redzone_end),
146 static_cast<const void *>(data_end));
147 const auto result =
148 absl::Span<const char>(data_end, redzone_end - data_end);
149 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
150 return result;
Brian Silvermana1652f32020-01-29 20:41:44 -0800151 }
152
153 private:
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700154 // This returns a non-const pointer into a const object. Be very careful
155 // about const correctness in publicly accessible APIs using it.
156 char *RoundedData(size_t message_data_size) const {
157 return RoundChannelData(
158 const_cast<char *>(&data_pointer[0] + kChannelDataRedzone),
159 message_data_size);
Brian Silvermana1652f32020-01-29 20:41:44 -0800160 }
161
162 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -0700163};
164
// Sizing parameters for a lockless queue.
struct LocklessQueueConfiguration {
  // Number of entries in the watchers list.
  size_t num_watchers;
  // Number of entries in the sender list.
  size_t num_senders;
  // Number of entries in the pinner list.
  size_t num_pinners;

  // Number of entries in the list of pointers into the messages list.
  size_t queue_size;
  // Number of bytes of data stored in each Message.
  size_t message_data_size;

  size_t message_size() const;

  // Sum of the sender, pinner and queue entry counts.
  size_t num_messages() const {
    const size_t senders_and_pinners = num_senders + num_pinners;
    return senders_and_pinners + queue_size;
  }
};
182
// Holds the state of the queue.
//
// Reading and writing are both lockless and constant time.
//
// Adding a new watcher is allowed to not be constant time for the watcher
// (that happens before the watcher goes RT), but it has to be RT for the
// sender.
struct LocklessQueueMemory;
190
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700191// Returns the size of the LocklessQueueMemory.
192size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
193
Austin Schuh20b2b082019-09-11 20:42:56 -0700194// Initializes the queue memory. memory must be either a valid pointer to the
195// queue datastructure, or must be zero initialized.
196LocklessQueueMemory *InitializeLocklessQueueMemory(
197 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
198
// Signal number used to wake up watchers.
static const unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700200
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700201// A convenient wrapper for accessing a lockless queue.
Austin Schuh20b2b082019-09-11 20:42:56 -0700202class LocklessQueue {
203 public:
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700204 LocklessQueue(const LocklessQueueMemory *const_memory,
205 LocklessQueueMemory *memory, LocklessQueueConfiguration config)
206 : const_memory_(const_memory), memory_(memory), config_(config) {}
Austin Schuh20b2b082019-09-11 20:42:56 -0700207
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700208 void Initialize();
Austin Schuh20b2b082019-09-11 20:42:56 -0700209
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700210 LocklessQueueConfiguration config() const { return config_; }
Austin Schuh20b2b082019-09-11 20:42:56 -0700211
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700212 const LocklessQueueMemory *const_memory() { return const_memory_; }
213 LocklessQueueMemory *memory() { return memory_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700214
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700215 private:
216 const LocklessQueueMemory *const_memory_;
217 LocklessQueueMemory *memory_;
218 LocklessQueueConfiguration config_;
219};
220
221class LocklessQueueWatcher {
222 public:
223 LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
224 LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
225 LocklessQueueWatcher(LocklessQueueWatcher &&other)
226 : memory_(other.memory_), watcher_index_(other.watcher_index_) {
227 other.watcher_index_ = -1;
228 }
229 LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
230 std::swap(memory_, other.memory_);
231 std::swap(watcher_index_, other.watcher_index_);
232 return *this;
233 }
234
235 ~LocklessQueueWatcher();
236
237 // Registers this thread to receive the kWakeupSignal signal when
238 // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
239 // error in registration.
240 // TODO(austin): Change the API if we find ourselves with more errors.
241 static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
242 int priority);
243
244 private:
245 LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
246
247 LocklessQueueMemory *memory_ = nullptr;
248
249 // Index in the watcher list that our entry is, or -1 if no watcher is
250 // registered.
251 int watcher_index_ = -1;
252};
253
254class LocklessQueueWakeUpper {
255 public:
256 LocklessQueueWakeUpper(LocklessQueue queue);
Austin Schuh20b2b082019-09-11 20:42:56 -0700257
258 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
259 //
260 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
261 // if nonrt.
262 int Wakeup(int current_priority);
263
Austin Schuh20b2b082019-09-11 20:42:56 -0700264 private:
Austin Schuh20b2b082019-09-11 20:42:56 -0700265 // Memory and datastructure used to sort a list of watchers to wake
266 // up. This isn't a copy of Watcher since tid is simpler to work with here
267 // than the futex above.
268 struct WatcherCopy {
269 pid_t tid;
270 pid_t pid;
271 int priority;
272 };
Austin Schuh20b2b082019-09-11 20:42:56 -0700273
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700274 const LocklessQueueMemory *const memory_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700275 const int pid_;
276 const uid_t uid_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700277
278 ::std::vector<WatcherCopy> watcher_copy_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700279};
280
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700281// Sender for blocks of data. The resources associated with a sender are
282// scoped to this object's lifetime.
283class LocklessQueueSender {
284 public:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700285 // Enum of possible sending errors
286 // Send returns GOOD if the messages was sent successfully, INVALID_REDZONE if
287 // one of a message's redzones has invalid data, or MESSAGES_SENT_TOO_FAST if
288 // more than queue_size messages were going to be sent in a
289 // channel_storage_duration_.
290 enum class Result { GOOD, INVALID_REDZONE, MESSAGES_SENT_TOO_FAST };
291
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700292 LocklessQueueSender(const LocklessQueueSender &) = delete;
293 LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
294 LocklessQueueSender(LocklessQueueSender &&other)
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700295 : memory_(other.memory_),
296 sender_index_(other.sender_index_),
297 channel_storage_duration_(other.channel_storage_duration_) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700298 other.memory_ = nullptr;
299 other.sender_index_ = -1;
300 }
301 LocklessQueueSender &operator=(LocklessQueueSender &&other) {
302 std::swap(memory_, other.memory_);
303 std::swap(sender_index_, other.sender_index_);
304 return *this;
305 }
306
307 ~LocklessQueueSender();
308
309 // Creates a sender. If we couldn't allocate a sender, returns nullopt.
310 // TODO(austin): Change the API if we find ourselves with more errors.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700311 static std::optional<LocklessQueueSender> Make(
312 LocklessQueue queue, monotonic_clock::duration channel_storage_duration);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700313
314 // Sends a message without copying the data.
315 // Copy at most size() bytes of data into the memory pointed to by Data(),
316 // and then call Send().
317 // Note: calls to Data() are expensive enough that you should cache it.
318 size_t size() const;
319 void *Data();
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700320 LocklessQueueSender::Result Send(
321 size_t length, monotonic_clock::time_point monotonic_remote_time,
322 realtime_clock::time_point realtime_remote_time,
323 uint32_t remote_queue_index, const UUID &source_boot_uuid,
324 monotonic_clock::time_point *monotonic_sent_time = nullptr,
325 realtime_clock::time_point *realtime_sent_time = nullptr,
326 uint32_t *queue_index = nullptr);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700327
328 // Sends up to length data. Does not wakeup the target.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700329 LocklessQueueSender::Result Send(
330 const char *data, size_t length,
331 monotonic_clock::time_point monotonic_remote_time,
332 realtime_clock::time_point realtime_remote_time,
333 uint32_t remote_queue_index, const UUID &source_boot_uuid,
334 monotonic_clock::time_point *monotonic_sent_time = nullptr,
335 realtime_clock::time_point *realtime_sent_time = nullptr,
336 uint32_t *queue_index = nullptr);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700337
338 int buffer_index() const;
339
340 private:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700341 LocklessQueueSender(LocklessQueueMemory *memory,
342 monotonic_clock::duration channel_storage_duration);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700343
344 // Pointer to the backing memory.
345 LocklessQueueMemory *memory_ = nullptr;
346
347 // Index into the sender list.
348 int sender_index_ = -1;
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700349
350 // Storage duration of the channel used to check if messages were sent too
351 // fast
352 const monotonic_clock::duration channel_storage_duration_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700353};
354
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700355std::ostream &operator<<(std::ostream &os, const LocklessQueueSender::Result r);
356
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700357// Pinner for blocks of data. The resources associated with a pinner are
358// scoped to this object's lifetime.
359class LocklessQueuePinner {
360 public:
361 LocklessQueuePinner(const LocklessQueuePinner &) = delete;
362 LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
363 LocklessQueuePinner(LocklessQueuePinner &&other)
364 : memory_(other.memory_),
365 const_memory_(other.const_memory_),
366 pinner_index_(other.pinner_index_) {
367 other.pinner_index_ = -1;
368 }
369 LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
370 std::swap(memory_, other.memory_);
371 std::swap(const_memory_, other.const_memory_);
372 std::swap(pinner_index_, other.pinner_index_);
373 return *this;
374 }
375
376 ~LocklessQueuePinner();
377
378 // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
379 // TODO(austin): Change the API if we find ourselves with more errors.
380 static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
381
382 // Attempts to pin the message at queue_index.
383 // Un-pins the previous message.
384 // Returns the buffer index (non-negative) if it succeeds.
385 // Returns -1 if that message is no longer in the queue.
386 int PinIndex(uint32_t queue_index);
387
388 // Read at most size() bytes of data into the memory pointed to by Data().
389 // Note: calls to Data() are expensive enough that you should cache it.
390 // Don't call Data() before a successful PinIndex call.
391 size_t size() const;
392 const void *Data() const;
393
394 private:
395 LocklessQueuePinner(LocklessQueueMemory *memory,
396 const LocklessQueueMemory *const_memory);
397
398 // Pointer to the backing memory.
399 LocklessQueueMemory *memory_ = nullptr;
400 const LocklessQueueMemory *const_memory_ = nullptr;
401
402 // Index into the pinner list.
403 int pinner_index_ = -1;
404};
405
406class LocklessQueueReader {
407 public:
408 enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
409
410 LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
411 queue.Initialize();
412 }
413
414 // If you ask for a queue index 2 past the newest, you will still get
415 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
416 // element newer than QueueSize() from the current message, we consider it
417 // behind by a large amount and return TOO_OLD. If the message is modified
418 // out from underneath us as we read it, return OVERWROTE.
419 //
420 // data may be nullptr to indicate the data should not be copied.
421 Result Read(uint32_t queue_index,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700422 monotonic_clock::time_point *monotonic_sent_time,
423 realtime_clock::time_point *realtime_sent_time,
424 monotonic_clock::time_point *monotonic_remote_time,
425 realtime_clock::time_point *realtime_remote_time,
Austin Schuha9012be2021-07-21 15:19:11 -0700426 uint32_t *remote_queue_index, UUID *source_boot_uuid,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700427 size_t *length, char *data) const;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700428
429 // Returns the index to the latest queue message. Returns empty_queue_index()
430 // if there are no messages in the queue. Do note that this index wraps if
431 // more than 2^32 messages are sent.
432 QueueIndex LatestIndex() const;
433
434 private:
435 const LocklessQueueMemory *const memory_;
436};
437
438// Returns the number of messages which are logically in the queue at a time.
439size_t LocklessQueueSize(const LocklessQueueMemory *memory);
440
441// Returns the number of bytes queue users are allowed to read/write within each
442// message.
443size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
444
445// TODO(austin): Return the oldest queue index. This lets us catch up nicely
446// if we got behind.
447// The easiest way to implement this is likely going to be to reserve the
448// first modulo of values for the initial time around, and never reuse them.
449// That lets us do a simple atomic read of the next index and deduce what has
450// happened. It will involve the simplest atomic operations.
451
452// TODO(austin): Make it so we can find the indices which were sent just
453// before and after a time with a binary search.
454
455// Prints to stdout the data inside the queue for debugging.
456void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
457
Austin Schuh20b2b082019-09-11 20:42:56 -0700458} // namespace ipc_lib
459} // namespace aos
460
461#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_