blob: f83b5580ff3ca0ac6cfa7ff72a55e958c395474a [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
#include <sys/signalfd.h>
#include <sys/types.h>

#include <atomic>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <optional>
#include <utility>
#include <vector>

#include "absl/types/span.h"

#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"
#include "aos/uuid.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070018
19namespace aos {
20namespace ipc_lib {
21
22// Structure to hold the state required to wake a watcher.
23struct Watcher {
24 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
25 // then this watcher is invalid. The futex variable will then hold the tid of
26 // the watcher, or FUTEX_OWNER_DIED if the task died.
27 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080028 // Note: this is only modified with the queue_setup_lock lock held, but may
29 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070030 // Any state modification should happen before the lock is acquired.
31 aos_mutex tid;
32
33 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080034 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070035
36 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080037 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070038};
39
40// Structure to hold the state required to send messages.
41struct Sender {
42 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
43 // this sender is invalid. The futex variable will then hold the tid of the
44 // sender, or FUTEX_OWNER_DIED if the task died.
45 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080046 // Note: this is only modified with the queue_setup_lock lock held, but may
47 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070048 aos_mutex tid;
49
50 // Index of the message we will be filling out.
51 AtomicIndex scratch_index;
52
53 // Index of the element being swapped with scratch_index, or Invalid if there
54 // is nothing to do.
55 AtomicIndex to_replace;
56};
57
Brian Silverman177567e2020-08-12 19:51:33 -070058// Structure to hold the state required to pin messages.
59struct Pinner {
60 // The same as Sender::tid. See there for docs.
61 aos_mutex tid;
62
63 // Queue index of the message we have pinned, or Invalid if there isn't one.
64 AtomicQueueIndex pinned;
65
66 // This should always be valid.
67 //
68 // Note that this is fully independent from pinned. It's just a place to stash
69 // a message, to ensure there's always an unpinned one for a writer to grab.
70 AtomicIndex scratch_index;
71};
72
Austin Schuh20b2b082019-09-11 20:42:56 -070073// Structure representing a message.
74struct Message {
75 struct Header {
76 // Index of this message in the queue. Needs to match the index this
77 // message is written into the queue at. The data in this message is only
78 // valid if it matches the index in the queue both before and after all the
79 // data is read.
80 //
81 // Note: a value of 0xffffffff always means that the contents aren't valid.
82 AtomicQueueIndex queue_index;
83
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080084 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070085 // queue, which means that time needs to be re-sampled every time a write
86 // fails.
Austin Schuhb5c6f972021-03-14 21:53:07 -070087 monotonic_clock::time_point monotonic_sent_time;
88 realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080089 // Timestamps of the message from the remote node. These are transparently
90 // passed through.
Austin Schuhb5c6f972021-03-14 21:53:07 -070091 monotonic_clock::time_point monotonic_remote_time;
92 realtime_clock::time_point realtime_remote_time;
Austin Schuhad154822019-12-27 15:45:13 -080093
94 // Queue index from the remote node.
95 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070096
Austin Schuh8902fa52021-03-14 22:39:24 -070097 // Remote boot UUID for this message.
Austin Schuha9012be2021-07-21 15:19:11 -070098 UUID source_boot_uuid;
Austin Schuh8902fa52021-03-14 22:39:24 -070099
Austin Schuh20b2b082019-09-11 20:42:56 -0700100 size_t length;
101 } header;
102
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700103 // Returns the start of the data buffer, given that message_data_size is
104 // the same one used to allocate this message's memory.
105 char *data(size_t message_data_size) {
106 return RoundedData(message_data_size);
107 }
108 const char *data(size_t message_data_size) const {
109 return RoundedData(message_data_size);
110 }
111
112 // Returns the pre-buffer redzone, given that message_data_size is the same
113 // one used to allocate this message's memory.
114 absl::Span<char> PreRedzone(size_t message_data_size) {
115 char *const end = data(message_data_size);
116 const auto result =
117 absl::Span<char>(&data_pointer[0], end - &data_pointer[0]);
118 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
119 return result;
120 }
121 absl::Span<const char> PreRedzone(size_t message_data_size) const {
122 const char *const end = data(message_data_size);
123 const auto result =
124 absl::Span<const char>(&data_pointer[0], end - &data_pointer[0]);
125 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
126 return result;
127 }
128
129 // Returns the post-buffer redzone, given that message_data_size is the same
130 // one used to allocate this message's memory.
131 absl::Span<char> PostRedzone(size_t message_data_size, size_t message_size) {
132 DCHECK_LT(message_data_size, message_size);
133 char *const redzone_end = reinterpret_cast<char *>(this) + message_size;
134 char *const data_end = data(message_data_size) + message_data_size;
135 DCHECK_GT(static_cast<void *>(redzone_end), static_cast<void *>(data_end));
136 const auto result = absl::Span<char>(data_end, redzone_end - data_end);
137 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
138 return result;
139 }
140 absl::Span<const char> PostRedzone(size_t message_data_size,
141 size_t message_size) const {
142 DCHECK_LT(message_data_size, message_size);
143 const char *const redzone_end =
144 reinterpret_cast<const char *>(this) + message_size;
145 const char *const data_end = data(message_data_size) + message_data_size;
146 DCHECK_GT(static_cast<const void *>(redzone_end),
147 static_cast<const void *>(data_end));
148 const auto result =
149 absl::Span<const char>(data_end, redzone_end - data_end);
150 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
151 return result;
Brian Silvermana1652f32020-01-29 20:41:44 -0800152 }
153
154 private:
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700155 // This returns a non-const pointer into a const object. Be very careful
156 // about const correctness in publicly accessible APIs using it.
157 char *RoundedData(size_t message_data_size) const {
158 return RoundChannelData(
159 const_cast<char *>(&data_pointer[0] + kChannelDataRedzone),
160 message_data_size);
Brian Silvermana1652f32020-01-29 20:41:44 -0800161 }
162
163 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -0700164};
165
// Sizing parameters for a lockless queue.
struct LocklessQueueConfiguration {
  // Size of the watchers list.
  size_t num_watchers;
  // Size of the sender list.
  size_t num_senders;
  // Size of the pinner list.
  size_t num_pinners;

  // Size of the list of pointers into the messages list.
  size_t queue_size;
  // Size in bytes of the data stored in each Message.
  size_t message_data_size;

  // Defined out of line.
  size_t message_size() const;

  // Total number of messages: the sum of the sender count, the pinner count
  // and the queue size.
  size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};
183
// Structure to hold the state of the queue.
//
// Reads and writes are lockless and constant time.
//
// Adding a new watcher doesn't need to be constant time for the watcher (this
// is done before the watcher goes RT), but needs to be RT for the sender.
//
// Only forward-declared here; this header uses it solely through pointers.
struct LocklessQueueMemory;
191
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700192// Returns the size of the LocklessQueueMemory.
193size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
194
Austin Schuh20b2b082019-09-11 20:42:56 -0700195// Initializes the queue memory. memory must be either a valid pointer to the
196// queue datastructure, or must be zero initialized.
197LocklessQueueMemory *InitializeLocklessQueueMemory(
198 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
199
// Signal number delivered to registered watchers to wake them up.
//
// This is const rather than constexpr because SIGRTMIN is not guaranteed to be
// a compile-time constant.
static const unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700201
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700202// A convenient wrapper for accessing a lockless queue.
Austin Schuh20b2b082019-09-11 20:42:56 -0700203class LocklessQueue {
204 public:
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700205 LocklessQueue(const LocklessQueueMemory *const_memory,
206 LocklessQueueMemory *memory, LocklessQueueConfiguration config)
207 : const_memory_(const_memory), memory_(memory), config_(config) {}
Austin Schuh20b2b082019-09-11 20:42:56 -0700208
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700209 void Initialize();
Austin Schuh20b2b082019-09-11 20:42:56 -0700210
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700211 LocklessQueueConfiguration config() const { return config_; }
Austin Schuh20b2b082019-09-11 20:42:56 -0700212
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700213 const LocklessQueueMemory *const_memory() { return const_memory_; }
214 LocklessQueueMemory *memory() { return memory_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700215
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700216 private:
217 const LocklessQueueMemory *const_memory_;
218 LocklessQueueMemory *memory_;
219 LocklessQueueConfiguration config_;
220};
221
222class LocklessQueueWatcher {
223 public:
224 LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
225 LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
226 LocklessQueueWatcher(LocklessQueueWatcher &&other)
227 : memory_(other.memory_), watcher_index_(other.watcher_index_) {
228 other.watcher_index_ = -1;
229 }
230 LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
231 std::swap(memory_, other.memory_);
232 std::swap(watcher_index_, other.watcher_index_);
233 return *this;
234 }
235
236 ~LocklessQueueWatcher();
237
238 // Registers this thread to receive the kWakeupSignal signal when
239 // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
240 // error in registration.
241 // TODO(austin): Change the API if we find ourselves with more errors.
242 static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
243 int priority);
244
245 private:
246 LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
247
248 LocklessQueueMemory *memory_ = nullptr;
249
250 // Index in the watcher list that our entry is, or -1 if no watcher is
251 // registered.
252 int watcher_index_ = -1;
253};
254
255class LocklessQueueWakeUpper {
256 public:
257 LocklessQueueWakeUpper(LocklessQueue queue);
Austin Schuh20b2b082019-09-11 20:42:56 -0700258
259 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
260 //
261 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
262 // if nonrt.
263 int Wakeup(int current_priority);
264
Austin Schuh20b2b082019-09-11 20:42:56 -0700265 private:
Austin Schuh20b2b082019-09-11 20:42:56 -0700266 // Memory and datastructure used to sort a list of watchers to wake
267 // up. This isn't a copy of Watcher since tid is simpler to work with here
268 // than the futex above.
269 struct WatcherCopy {
270 pid_t tid;
271 pid_t pid;
272 int priority;
273 };
Austin Schuh20b2b082019-09-11 20:42:56 -0700274
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700275 const LocklessQueueMemory *const memory_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700276 const int pid_;
277 const uid_t uid_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700278
279 ::std::vector<WatcherCopy> watcher_copy_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700280};
281
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700282// Sender for blocks of data. The resources associated with a sender are
283// scoped to this object's lifetime.
284class LocklessQueueSender {
285 public:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700286 // Enum of possible sending errors
287 // Send returns GOOD if the messages was sent successfully, INVALID_REDZONE if
288 // one of a message's redzones has invalid data, or MESSAGES_SENT_TOO_FAST if
289 // more than queue_size messages were going to be sent in a
290 // channel_storage_duration_.
291 enum class Result { GOOD, INVALID_REDZONE, MESSAGES_SENT_TOO_FAST };
292
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700293 LocklessQueueSender(const LocklessQueueSender &) = delete;
294 LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
295 LocklessQueueSender(LocklessQueueSender &&other)
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700296 : memory_(other.memory_),
297 sender_index_(other.sender_index_),
298 channel_storage_duration_(other.channel_storage_duration_) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700299 other.memory_ = nullptr;
300 other.sender_index_ = -1;
301 }
302 LocklessQueueSender &operator=(LocklessQueueSender &&other) {
303 std::swap(memory_, other.memory_);
304 std::swap(sender_index_, other.sender_index_);
305 return *this;
306 }
307
308 ~LocklessQueueSender();
309
310 // Creates a sender. If we couldn't allocate a sender, returns nullopt.
311 // TODO(austin): Change the API if we find ourselves with more errors.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700312 static std::optional<LocklessQueueSender> Make(
313 LocklessQueue queue, monotonic_clock::duration channel_storage_duration);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700314
315 // Sends a message without copying the data.
316 // Copy at most size() bytes of data into the memory pointed to by Data(),
317 // and then call Send().
318 // Note: calls to Data() are expensive enough that you should cache it.
319 size_t size() const;
320 void *Data();
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700321 LocklessQueueSender::Result Send(
322 size_t length, monotonic_clock::time_point monotonic_remote_time,
323 realtime_clock::time_point realtime_remote_time,
324 uint32_t remote_queue_index, const UUID &source_boot_uuid,
325 monotonic_clock::time_point *monotonic_sent_time = nullptr,
326 realtime_clock::time_point *realtime_sent_time = nullptr,
327 uint32_t *queue_index = nullptr);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700328
329 // Sends up to length data. Does not wakeup the target.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700330 LocklessQueueSender::Result Send(
331 const char *data, size_t length,
332 monotonic_clock::time_point monotonic_remote_time,
333 realtime_clock::time_point realtime_remote_time,
334 uint32_t remote_queue_index, const UUID &source_boot_uuid,
335 monotonic_clock::time_point *monotonic_sent_time = nullptr,
336 realtime_clock::time_point *realtime_sent_time = nullptr,
337 uint32_t *queue_index = nullptr);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700338
339 int buffer_index() const;
340
341 private:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700342 LocklessQueueSender(LocklessQueueMemory *memory,
343 monotonic_clock::duration channel_storage_duration);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700344
345 // Pointer to the backing memory.
346 LocklessQueueMemory *memory_ = nullptr;
347
348 // Index into the sender list.
349 int sender_index_ = -1;
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700350
351 // Storage duration of the channel used to check if messages were sent too
352 // fast
353 const monotonic_clock::duration channel_storage_duration_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700354};
355
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700356std::ostream &operator<<(std::ostream &os, const LocklessQueueSender::Result r);
357
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700358// Pinner for blocks of data. The resources associated with a pinner are
359// scoped to this object's lifetime.
360class LocklessQueuePinner {
361 public:
362 LocklessQueuePinner(const LocklessQueuePinner &) = delete;
363 LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
364 LocklessQueuePinner(LocklessQueuePinner &&other)
365 : memory_(other.memory_),
366 const_memory_(other.const_memory_),
367 pinner_index_(other.pinner_index_) {
368 other.pinner_index_ = -1;
369 }
370 LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
371 std::swap(memory_, other.memory_);
372 std::swap(const_memory_, other.const_memory_);
373 std::swap(pinner_index_, other.pinner_index_);
374 return *this;
375 }
376
377 ~LocklessQueuePinner();
378
379 // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
380 // TODO(austin): Change the API if we find ourselves with more errors.
381 static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
382
383 // Attempts to pin the message at queue_index.
384 // Un-pins the previous message.
385 // Returns the buffer index (non-negative) if it succeeds.
386 // Returns -1 if that message is no longer in the queue.
387 int PinIndex(uint32_t queue_index);
388
389 // Read at most size() bytes of data into the memory pointed to by Data().
390 // Note: calls to Data() are expensive enough that you should cache it.
391 // Don't call Data() before a successful PinIndex call.
392 size_t size() const;
393 const void *Data() const;
394
395 private:
396 LocklessQueuePinner(LocklessQueueMemory *memory,
397 const LocklessQueueMemory *const_memory);
398
399 // Pointer to the backing memory.
400 LocklessQueueMemory *memory_ = nullptr;
401 const LocklessQueueMemory *const_memory_ = nullptr;
402
403 // Index into the pinner list.
404 int pinner_index_ = -1;
405};
406
407class LocklessQueueReader {
408 public:
409 enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
410
411 LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
412 queue.Initialize();
413 }
414
415 // If you ask for a queue index 2 past the newest, you will still get
416 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
417 // element newer than QueueSize() from the current message, we consider it
418 // behind by a large amount and return TOO_OLD. If the message is modified
419 // out from underneath us as we read it, return OVERWROTE.
420 //
421 // data may be nullptr to indicate the data should not be copied.
422 Result Read(uint32_t queue_index,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700423 monotonic_clock::time_point *monotonic_sent_time,
424 realtime_clock::time_point *realtime_sent_time,
425 monotonic_clock::time_point *monotonic_remote_time,
426 realtime_clock::time_point *realtime_remote_time,
Austin Schuha9012be2021-07-21 15:19:11 -0700427 uint32_t *remote_queue_index, UUID *source_boot_uuid,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700428 size_t *length, char *data) const;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700429
430 // Returns the index to the latest queue message. Returns empty_queue_index()
431 // if there are no messages in the queue. Do note that this index wraps if
432 // more than 2^32 messages are sent.
433 QueueIndex LatestIndex() const;
434
435 private:
436 const LocklessQueueMemory *const memory_;
437};
438
439// Returns the number of messages which are logically in the queue at a time.
440size_t LocklessQueueSize(const LocklessQueueMemory *memory);
441
442// Returns the number of bytes queue users are allowed to read/write within each
443// message.
444size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
445
// TODO(austin): Return the oldest queue index.  This lets us catch up nicely
// if we got behind.
// The easiest way to implement this is likely going to be to reserve the
// first modulo of values for the initial time around, and never reuse them.
// That lets us do a simple atomic read of the next index and deduce what has
// happened.  It will involve the simplest atomic operations.

// TODO(austin): Make it so we can find the indices which were sent just
// before and after a time with a binary search.
455
456// Prints to stdout the data inside the queue for debugging.
Austin Schuh83cbb1e2023-06-23 12:59:02 -0700457void PrintLocklessQueueMemory(const LocklessQueueMemory *memory);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700458
Austin Schuh20b2b082019-09-11 20:42:56 -0700459} // namespace ipc_lib
460} // namespace aos
461
462#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_