#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_

#include <signal.h>
#include <stdint.h>

#include <atomic>
#include <functional>
#include <iosfwd>
#include <optional>
#include <utility>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/types/span.h"

#include "aos/events/context.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/ipc_lib/robust_ownership_tracker.h"
#include "aos/time/time.h"
#include "aos/uuid.h"

namespace aos::ipc_lib {

// Structure to hold the state required to wake a watcher.
struct Watcher {
  // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
  // then this watcher is invalid. The futex variable will then hold the tid of
  // the watcher, or FUTEX_OWNER_DIED if the task died.
  //
  // Note: this is only modified with the queue_setup_lock lock held, but may
  // always be read.
  // Any state modification should happen before the lock is acquired.
  RobustOwnershipTracker ownership_tracker;

  // PID of the watcher.
  std::atomic<pid_t> pid;

  // RT priority of the watcher.
  std::atomic<int> priority;
};

// Structure to hold the state required to send messages.
struct Sender {
  // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
  // this sender is invalid. The futex variable will then hold the tid of the
  // sender, or FUTEX_OWNER_DIED if the task died.
  //
  // Note: this is only modified with the queue_setup_lock lock held, but may
  // always be read.
  RobustOwnershipTracker ownership_tracker;

  // Index of the message we will be filling out.
  AtomicIndex scratch_index;

  // Index of the element being swapped with scratch_index, or Invalid if there
  // is nothing to do.
  AtomicIndex to_replace;
};

// Structure to hold the state required to pin messages.
struct Pinner {
66 // The same as Sender::tid. See there for docs.
  RobustOwnershipTracker ownership_tracker;

  // Queue index of the message we have pinned, or Invalid if there isn't one.
  AtomicQueueIndex pinned;

  // This should always be valid.
  //
  // Note that this is fully independent from pinned. It's just a place to stash
  // a message, to ensure there's always an unpinned one for a writer to grab.
  AtomicIndex scratch_index;
};

// Structure representing a message.
struct Message {
  struct Header {
    // Index of this message in the queue. Needs to match the index this
    // message is written into the queue at. The data in this message is only
    // valid if it matches the index in the queue both before and after all the
    // data is read.
    //
    // Note: a value of 0xffffffff always means that the contents aren't valid.
    AtomicQueueIndex queue_index;

    // Timestamp of the message. Needs to be monotonically incrementing in the
    // queue, which means that time needs to be re-sampled every time a write
    // fails.
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    // Timestamps of the message from the remote node. These are transparently
    // passed through.
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    monotonic_clock::time_point monotonic_remote_transmit_time;

    // Queue index from the remote node.
    uint32_t remote_queue_index;

    // Remote boot UUID for this message.
    UUID source_boot_uuid;

    size_t length;
  } header;

  // Returns the start of the data buffer, given that message_data_size is
  // the same one used to allocate this message's memory.
  char *data(size_t message_data_size) {
    return RoundedData(message_data_size);
  }
  const char *data(size_t message_data_size) const {
    return RoundedData(message_data_size);
  }

  // Returns the pre-buffer redzone, given that message_data_size is the same
  // one used to allocate this message's memory.
  absl::Span<char> PreRedzone(size_t message_data_size) {
    char *const end = data(message_data_size);
    const auto result =
        absl::Span<char>(&data_pointer[0], end - &data_pointer[0]);
    DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
    return result;
  }
  absl::Span<const char> PreRedzone(size_t message_data_size) const {
    const char *const end = data(message_data_size);
    const auto result =
        absl::Span<const char>(&data_pointer[0], end - &data_pointer[0]);
    DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
    return result;
  }

  // Returns the post-buffer redzone, given that message_data_size is the same
  // one used to allocate this message's memory.
  absl::Span<char> PostRedzone(size_t message_data_size, size_t message_size) {
    DCHECK_LT(message_data_size, message_size);
    char *const redzone_end = reinterpret_cast<char *>(this) + message_size;
    char *const data_end = data(message_data_size) + message_data_size;
    DCHECK_GT(static_cast<void *>(redzone_end), static_cast<void *>(data_end));
    const auto result = absl::Span<char>(data_end, redzone_end - data_end);
    DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
    return result;
  }
  absl::Span<const char> PostRedzone(size_t message_data_size,
                                     size_t message_size) const {
    DCHECK_LT(message_data_size, message_size);
    const char *const redzone_end =
        reinterpret_cast<const char *>(this) + message_size;
    const char *const data_end = data(message_data_size) + message_data_size;
    DCHECK_GT(static_cast<const void *>(redzone_end),
              static_cast<const void *>(data_end));
    const auto result =
        absl::Span<const char>(data_end, redzone_end - data_end);
    DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
    return result;
  }

 private:
  // This returns a non-const pointer into a const object. Be very careful
  // about const correctness in publicly accessible APIs using it.
  char *RoundedData(size_t message_data_size) const {
    return RoundChannelData(
        const_cast<char *>(&data_pointer[0] + kChannelDataRedzone),
        message_data_size);
  }

  char data_pointer[];
};

struct LocklessQueueConfiguration {
  // Size of the watchers list.
  size_t num_watchers;
  // Size of the sender list.
  size_t num_senders;
  // Size of the pinner list.
  size_t num_pinners;

  // Size of the list of pointers into the messages list.
  size_t queue_size;
  // Size in bytes of the data stored in each Message.
  size_t message_data_size;

  size_t message_size() const;

  size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};

// Structure to hold the state of the queue.
//
// Reads and writes are lockless and constant time.
//
// Adding a new watcher doesn't need to be constant time for the watcher (this
// is done before the watcher goes RT), but needs to be RT for the sender.
struct LocklessQueueMemory;

// Returns the size of the LocklessQueueMemory.
size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);

// Initializes the queue memory. memory must be either a valid pointer to the
// queue datastructure, or must be zero initialized.
LocklessQueueMemory *InitializeLocklessQueueMemory(
    LocklessQueueMemory *memory, LocklessQueueConfiguration config);
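
// A minimal setup sketch (illustrative only; the sizes, the mmap-backed
// allocation, and the variable names below are assumptions, not part of this
// API):
//
//   LocklessQueueConfiguration config;
//   config.num_watchers = 10;
//   config.num_senders = 10;
//   config.num_pinners = 5;
//   config.queue_size = 256;
//   config.message_data_size = 1024;
//
//   const size_t size = LocklessQueueMemorySize(config);
//   // MAP_ANONYMOUS memory is zero initialized, which satisfies the
//   // requirement above.  A shared-memory file works the same way.
//   void *raw = mmap(nullptr, size, PROT_READ | PROT_WRITE,
//                    MAP_SHARED | MAP_ANONYMOUS, -1, 0);
//   PCHECK(raw != MAP_FAILED);
//   LocklessQueueMemory *memory = InitializeLocklessQueueMemory(
//       reinterpret_cast<LocklessQueueMemory *>(raw), config);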

const static unsigned int kWakeupSignal = SIGRTMIN + 2;

// Sets FUTEX_OWNER_DIED if the owner was tid. This fakes what the kernel does
// with a robust mutex.
bool PretendThatOwnerIsDeadForTesting(aos_mutex *mutex, pid_t tid);

// A convenient wrapper for accessing a lockless queue.
class LocklessQueue {
 public:
  LocklessQueue(const LocklessQueueMemory *const_memory,
                LocklessQueueMemory *memory, LocklessQueueConfiguration config)
      : const_memory_(const_memory), memory_(memory), config_(config) {}

  void Initialize();

  LocklessQueueConfiguration config() const { return config_; }

  const LocklessQueueMemory *const_memory() { return const_memory_; }
  LocklessQueueMemory *memory() { return memory_; }

 private:
  const LocklessQueueMemory *const_memory_;
  LocklessQueueMemory *memory_;
  LocklessQueueConfiguration config_;
};
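
// A construction sketch for the wrapper (assuming `memory` and `config` from
// the sketch above; passing the same mapping for both the const and mutable
// views is purely illustrative):
//
//   LocklessQueue queue(memory, memory, config);
//   queue.Initialize();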

class LocklessQueueWatcher {
 public:
  LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
  LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
  LocklessQueueWatcher(LocklessQueueWatcher &&other)
      : memory_(other.memory_), watcher_index_(other.watcher_index_) {
    other.watcher_index_ = -1;
  }
  LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
    std::swap(memory_, other.memory_);
    std::swap(watcher_index_, other.watcher_index_);
    return *this;
  }

  ~LocklessQueueWatcher();

  // Registers this thread to receive the kWakeupSignal signal when
  // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
  // error in registration.
  // TODO(austin): Change the API if we find ourselves with more errors.
  static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
                                                  int priority);

 private:
  LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);

  LocklessQueueMemory *memory_ = nullptr;

  // Index in the watcher list that our entry is, or -1 if no watcher is
  // registered.
  int watcher_index_ = -1;
};
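
// A registration sketch (assuming the `queue` wrapper from above; the RT
// priority value is arbitrary):
//
//   std::optional<LocklessQueueWatcher> watcher =
//       LocklessQueueWatcher::Make(queue, /*priority=*/20);
//   CHECK(watcher) << ": Too many watchers registered on this queue";
//   // This thread now receives kWakeupSignal whenever
//   // LocklessQueueWakeUpper::Wakeup is called, until `watcher` is destroyed.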

class LocklessQueueWakeUpper {
 public:
  LocklessQueueWakeUpper(LocklessQueue queue);

  // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
  //
  // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
  // if nonrt.
  int Wakeup(int current_priority);

 private:
  // Memory and datastructure used to sort a list of watchers to wake
  // up. This isn't a copy of Watcher since tid is simpler to work with here
  // than the futex above.
  struct WatcherCopy {
    ThreadOwnerStatusSnapshot ownership_snapshot;
    pid_t pid;
    int priority;
  };

  const LocklessQueueMemory *const memory_;
  const int pid_;
  const uid_t uid_;

  ::std::vector<WatcherCopy> watcher_copy_;
};
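
// A wakeup sketch (assuming the `queue` wrapper from above and a caller
// running at RT priority 20; both are illustrative):
//
//   LocklessQueueWakeUpper wake_upper(queue);
//   // ... write a message with a LocklessQueueSender ...
//   wake_upper.Wakeup(/*current_priority=*/20);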

// Sender for blocks of data. The resources associated with a sender are
// scoped to this object's lifetime.
class LocklessQueueSender {
 public:
  // Enum of possible sending errors.
  // Send returns GOOD if the message was sent successfully, INVALID_REDZONE if
  // one of a message's redzones has invalid data, or MESSAGES_SENT_TOO_FAST if
  // more than queue_size messages would have been sent within a single
  // channel_storage_duration_.
  enum class Result { GOOD, INVALID_REDZONE, MESSAGES_SENT_TOO_FAST };

  LocklessQueueSender(const LocklessQueueSender &) = delete;
  LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
  LocklessQueueSender(LocklessQueueSender &&other)
      : memory_(other.memory_),
        sender_index_(other.sender_index_),
        channel_storage_duration_(other.channel_storage_duration_) {
    other.memory_ = nullptr;
    other.sender_index_ = -1;
  }
  LocklessQueueSender &operator=(LocklessQueueSender &&other) {
    std::swap(memory_, other.memory_);
    std::swap(sender_index_, other.sender_index_);
    return *this;
  }

  ~LocklessQueueSender();

  // Creates a sender. If we couldn't allocate a sender, returns nullopt.
  // TODO(austin): Change the API if we find ourselves with more errors.
  static std::optional<LocklessQueueSender> Make(
      LocklessQueue queue, monotonic_clock::duration channel_storage_duration);

  // Sends a message without copying the data.
  // Copy at most size() bytes of data into the memory pointed to by Data(),
  // and then call Send().
  // Note: calls to Data() are expensive enough that you should cache it.
  size_t size() const;
  void *Data();
  LocklessQueueSender::Result Send(
      size_t length, monotonic_clock::time_point monotonic_remote_time,
      realtime_clock::time_point realtime_remote_time,
      monotonic_clock::time_point monotonic_remote_transmit_time,
      uint32_t remote_queue_index, const UUID &source_boot_uuid,
      monotonic_clock::time_point *monotonic_sent_time = nullptr,
      realtime_clock::time_point *realtime_sent_time = nullptr,
      uint32_t *queue_index = nullptr);

  // Sends up to length data. Does not wakeup the target.
  LocklessQueueSender::Result Send(
      const char *data, size_t length,
      monotonic_clock::time_point monotonic_remote_time,
      realtime_clock::time_point realtime_remote_time,
      monotonic_clock::time_point monotonic_remote_transmit_time,
      uint32_t remote_queue_index, const UUID &source_boot_uuid,
      monotonic_clock::time_point *monotonic_sent_time = nullptr,
      realtime_clock::time_point *realtime_sent_time = nullptr,
      uint32_t *queue_index = nullptr);

  int buffer_index() const;

 private:
  LocklessQueueSender(LocklessQueueMemory *memory,
                      monotonic_clock::duration channel_storage_duration);

  // Pointer to the backing memory.
  LocklessQueueMemory *memory_ = nullptr;

  // Index into the sender list.
  int sender_index_ = -1;

  // Storage duration of the channel, used to check if messages were sent too
  // fast.
  const monotonic_clock::duration channel_storage_duration_;
};
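
// A zero-copy send sketch (assuming the `queue` wrapper from above; the
// storage duration, the payload, and the sentinel values passed for the
// remote fields are assumptions for a locally-originated message, not values
// mandated by this header):
//
//   std::optional<LocklessQueueSender> sender = LocklessQueueSender::Make(
//       queue, /*channel_storage_duration=*/std::chrono::seconds(2));
//   CHECK(sender) << ": Out of senders";
//
//   void *buffer = sender->Data();  // Cache this; Data() is not free.
//   const char payload[] = "hello";
//   CHECK_LE(sizeof(payload), sender->size());
//   memcpy(buffer, payload, sizeof(payload));
//
//   const LocklessQueueSender::Result result = sender->Send(
//       sizeof(payload), monotonic_clock::min_time, realtime_clock::min_time,
//       monotonic_clock::min_time, 0xffffffff, UUID::Zero());
//   CHECK(result == LocklessQueueSender::Result::GOOD);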

std::ostream &operator<<(std::ostream &os, const LocklessQueueSender::Result r);

// Pinner for blocks of data. The resources associated with a pinner are
// scoped to this object's lifetime.
class LocklessQueuePinner {
 public:
  LocklessQueuePinner(const LocklessQueuePinner &) = delete;
  LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
  LocklessQueuePinner(LocklessQueuePinner &&other)
      : memory_(other.memory_),
        const_memory_(other.const_memory_),
        pinner_index_(other.pinner_index_) {
    other.pinner_index_ = -1;
  }
  LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
    std::swap(memory_, other.memory_);
    std::swap(const_memory_, other.const_memory_);
    std::swap(pinner_index_, other.pinner_index_);
    return *this;
  }

  ~LocklessQueuePinner();

  // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
  // TODO(austin): Change the API if we find ourselves with more errors.
  static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);

  // Attempts to pin the message at queue_index.
  // Un-pins the previous message.
  // Returns the buffer index (non-negative) if it succeeds.
  // Returns -1 if that message is no longer in the queue.
  int PinIndex(uint32_t queue_index);

  // Read at most size() bytes of data into the memory pointed to by Data().
  // Note: calls to Data() are expensive enough that you should cache it.
  // Don't call Data() before a successful PinIndex call.
  size_t size() const;
  const void *Data() const;

 private:
  LocklessQueuePinner(LocklessQueueMemory *memory,
                      const LocklessQueueMemory *const_memory);

  // Pointer to the backing memory.
  LocklessQueueMemory *memory_ = nullptr;
  const LocklessQueueMemory *const_memory_ = nullptr;

  // Index into the pinner list.
  int pinner_index_ = -1;
};
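
// A pinning sketch (assuming the `queue` wrapper from above and a uint32_t
// `queue_index` obtained elsewhere; both are illustrative):
//
//   std::optional<LocklessQueuePinner> pinner =
//       LocklessQueuePinner::Make(queue);
//   CHECK(pinner) << ": Out of pinners";
//   const int buffer_index = pinner->PinIndex(queue_index);
//   if (buffer_index >= 0) {
//     // The message is pinned until the next PinIndex call or destruction,
//     // so it can be read in place.
//     const void *data = pinner->Data();
//     const size_t max_size = pinner->size();
//   } else {
//     // The message was overwritten before it could be pinned.
//   }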

class LocklessQueueReader {
 public:
  enum class Result {
    // Message we read was too old and no longer is in the queue.
    TOO_OLD,
    // Success!
    GOOD,
    // The message is in the future and we haven't written it yet.
    NOTHING_NEW,
    // There is a message, but should_read_callback() returned false so we
    // didn't fetch it.
    FILTERED,
    // The message got overwritten while we were reading it.
    OVERWROTE,
  };

  LocklessQueueReader(LocklessQueue queue)
      : memory_(queue.memory()), const_memory_(queue.const_memory()) {
    queue.Initialize();
  }

  // If you ask for a queue index 2 past the newest, you will still get
  // NOTHING_NEW until that gets overwritten with new data. If you ask for an
  // element newer than QueueSize() from the current message, we consider it
  // behind by a large amount and return TOO_OLD. If the message is modified
  // out from underneath us as we read it, return OVERWROTE. If we found a new
  // message, but the filter function returned false, return FILTERED.
  //
  // data may be nullptr to indicate the data should not be copied.
  Result Read(
      uint32_t queue_index, monotonic_clock::time_point *monotonic_sent_time,
      realtime_clock::time_point *realtime_sent_time,
      monotonic_clock::time_point *monotonic_remote_time,
      monotonic_clock::time_point *monotonic_remote_transmit_time,
      realtime_clock::time_point *realtime_remote_time,
      uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
      char *data,
      std::function<bool(const Context &context)> should_read_callback) const;

  // Returns the index to the latest queue message. Returns empty_queue_index()
  // if there are no messages in the queue. Do note that this index wraps if
  // more than 2^32 messages are sent.
  QueueIndex LatestIndex() const;

 private:
  LocklessQueueMemory *const memory_;
  const LocklessQueueMemory *const_memory_;
};
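
// A read sketch (assuming the `queue` wrapper from above; the uint32_t index
// to read, the buffer size, and the always-true filter callback are
// assumptions):
//
//   LocklessQueueReader reader(queue);
//   // LatestIndex() identifies the newest message; converting that
//   // QueueIndex to the uint32_t passed to Read() is omitted here.
//   const QueueIndex latest = reader.LatestIndex();
//   uint32_t index_to_read = ...;
//
//   monotonic_clock::time_point monotonic_sent_time, monotonic_remote_time,
//       monotonic_remote_transmit_time;
//   realtime_clock::time_point realtime_sent_time, realtime_remote_time;
//   uint32_t remote_queue_index;
//   UUID source_boot_uuid;
//   size_t length;
//   char data[1024];  // Sized to message_data_size from the earlier sketch.
//   const LocklessQueueReader::Result result = reader.Read(
//       index_to_read, &monotonic_sent_time, &realtime_sent_time,
//       &monotonic_remote_time, &monotonic_remote_transmit_time,
//       &realtime_remote_time, &remote_queue_index, &source_boot_uuid,
//       &length, data, [](const Context &) { return true; });
//   if (result == LocklessQueueReader::Result::GOOD) {
//     // data[0..length) now holds a copy of the message.
//   }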

// Returns the number of messages which are logically in the queue at a time.
size_t LocklessQueueSize(const LocklessQueueMemory *memory);

// Returns the number of bytes queue users are allowed to read/write within each
// message.
size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);

// TODO(austin): Return the oldest queue index. This lets us catch up nicely
// if we got behind.
// The easiest way to implement this is likely going to be to reserve the
// first modulo of values for the initial time around, and never reuse them.
// That lets us do a simple atomic read of the next index and deduce what has
// happened. It will involve the simplest atomic operations.

// TODO(austin): Make it so we can find the indices which were sent just
// before and after a time with a binary search.

// Prints to stdout the data inside the queue for debugging.
void PrintLocklessQueueMemory(const LocklessQueueMemory *memory);

}  // namespace aos::ipc_lib

#endif  // AOS_IPC_LIB_LOCKLESS_QUEUE_H_