blob: 69207f43a12c54c05e43f414f8f3a5243a8557e5 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_

#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

#include "absl/types/span.h"

#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"

17namespace aos {
18namespace ipc_lib {
19
20// Structure to hold the state required to wake a watcher.
21struct Watcher {
22 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
23 // then this watcher is invalid. The futex variable will then hold the tid of
24 // the watcher, or FUTEX_OWNER_DIED if the task died.
25 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080026 // Note: this is only modified with the queue_setup_lock lock held, but may
27 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070028 // Any state modification should happen before the lock is acquired.
29 aos_mutex tid;
30
31 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080032 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070033
34 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080035 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070036};
37
38// Structure to hold the state required to send messages.
39struct Sender {
40 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
41 // this sender is invalid. The futex variable will then hold the tid of the
42 // sender, or FUTEX_OWNER_DIED if the task died.
43 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080044 // Note: this is only modified with the queue_setup_lock lock held, but may
45 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070046 aos_mutex tid;
47
48 // Index of the message we will be filling out.
49 AtomicIndex scratch_index;
50
51 // Index of the element being swapped with scratch_index, or Invalid if there
52 // is nothing to do.
53 AtomicIndex to_replace;
54};
55
Brian Silverman177567e2020-08-12 19:51:33 -070056// Structure to hold the state required to pin messages.
57struct Pinner {
58 // The same as Sender::tid. See there for docs.
59 aos_mutex tid;
60
61 // Queue index of the message we have pinned, or Invalid if there isn't one.
62 AtomicQueueIndex pinned;
63
64 // This should always be valid.
65 //
66 // Note that this is fully independent from pinned. It's just a place to stash
67 // a message, to ensure there's always an unpinned one for a writer to grab.
68 AtomicIndex scratch_index;
69};
70
Austin Schuh20b2b082019-09-11 20:42:56 -070071// Structure representing a message.
72struct Message {
73 struct Header {
74 // Index of this message in the queue. Needs to match the index this
75 // message is written into the queue at. The data in this message is only
76 // valid if it matches the index in the queue both before and after all the
77 // data is read.
78 //
79 // Note: a value of 0xffffffff always means that the contents aren't valid.
80 AtomicQueueIndex queue_index;
81
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080082 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070083 // queue, which means that time needs to be re-sampled every time a write
84 // fails.
Austin Schuhb5c6f972021-03-14 21:53:07 -070085 monotonic_clock::time_point monotonic_sent_time;
86 realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080087 // Timestamps of the message from the remote node. These are transparently
88 // passed through.
Austin Schuhb5c6f972021-03-14 21:53:07 -070089 monotonic_clock::time_point monotonic_remote_time;
90 realtime_clock::time_point realtime_remote_time;
Austin Schuhad154822019-12-27 15:45:13 -080091
92 // Queue index from the remote node.
93 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070094
95 size_t length;
96 } header;
97
Brian Silverman0eaa1da2020-08-12 20:03:52 -070098 // Returns the start of the data buffer, given that message_data_size is
99 // the same one used to allocate this message's memory.
100 char *data(size_t message_data_size) {
101 return RoundedData(message_data_size);
102 }
103 const char *data(size_t message_data_size) const {
104 return RoundedData(message_data_size);
105 }
106
107 // Returns the pre-buffer redzone, given that message_data_size is the same
108 // one used to allocate this message's memory.
109 absl::Span<char> PreRedzone(size_t message_data_size) {
110 char *const end = data(message_data_size);
111 const auto result =
112 absl::Span<char>(&data_pointer[0], end - &data_pointer[0]);
113 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
114 return result;
115 }
116 absl::Span<const char> PreRedzone(size_t message_data_size) const {
117 const char *const end = data(message_data_size);
118 const auto result =
119 absl::Span<const char>(&data_pointer[0], end - &data_pointer[0]);
120 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment);
121 return result;
122 }
123
124 // Returns the post-buffer redzone, given that message_data_size is the same
125 // one used to allocate this message's memory.
126 absl::Span<char> PostRedzone(size_t message_data_size, size_t message_size) {
127 DCHECK_LT(message_data_size, message_size);
128 char *const redzone_end = reinterpret_cast<char *>(this) + message_size;
129 char *const data_end = data(message_data_size) + message_data_size;
130 DCHECK_GT(static_cast<void *>(redzone_end), static_cast<void *>(data_end));
131 const auto result = absl::Span<char>(data_end, redzone_end - data_end);
132 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
133 return result;
134 }
135 absl::Span<const char> PostRedzone(size_t message_data_size,
136 size_t message_size) const {
137 DCHECK_LT(message_data_size, message_size);
138 const char *const redzone_end =
139 reinterpret_cast<const char *>(this) + message_size;
140 const char *const data_end = data(message_data_size) + message_data_size;
141 DCHECK_GT(static_cast<const void *>(redzone_end),
142 static_cast<const void *>(data_end));
143 const auto result =
144 absl::Span<const char>(data_end, redzone_end - data_end);
145 DCHECK_LT(result.size(), kChannelDataRedzone + kChannelDataAlignment * 2);
146 return result;
Brian Silvermana1652f32020-01-29 20:41:44 -0800147 }
148
149 private:
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700150 // This returns a non-const pointer into a const object. Be very careful
151 // about const correctness in publicly accessible APIs using it.
152 char *RoundedData(size_t message_data_size) const {
153 return RoundChannelData(
154 const_cast<char *>(&data_pointer[0] + kChannelDataRedzone),
155 message_data_size);
Brian Silvermana1652f32020-01-29 20:41:44 -0800156 }
157
158 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -0700159};
160
// Sizing parameters for a lockless queue.
struct LocklessQueueConfiguration {
  // Size of the watchers list.
  size_t num_watchers;
  // Size of the sender list.
  size_t num_senders;
  // Size of the pinner list.
  size_t num_pinners;

  // Size of the list of pointers into the messages list.
  size_t queue_size;
  // Size in bytes of the data stored in each Message.
  size_t message_data_size;

  // Size of each Message allocation (defined out of line).
  size_t message_size() const;

  // Total number of Message buffers: one per queue slot plus one per sender
  // and per pinner (presumably their scratch_index messages).
  size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};

// Structure to hold the state of the queue.
//
// Reads and writes are lockless and constant time.
//
// Adding a new watcher doesn't need to be constant time for the watcher (this
// is done before the watcher goes RT), but needs to be RT for the sender.
//
// The definition is private to the implementation; users only handle pointers.
struct LocklessQueueMemory;

Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700187// Returns the size of the LocklessQueueMemory.
188size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
189
Austin Schuh20b2b082019-09-11 20:42:56 -0700190// Initializes the queue memory. memory must be either a valid pointer to the
191// queue datastructure, or must be zero initialized.
192LocklessQueueMemory *InitializeLocklessQueueMemory(
193 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
194
// Signal sent to registered watchers when LocklessQueueWakeUpper::Wakeup is
// called.
// NOTE(review): on glibc SIGRTMIN is evaluated at runtime, so this constant is
// dynamically initialized (once per translation unit) rather than at compile
// time -- confirm before using it from other static initializers.
const static unsigned int kWakeupSignal = SIGRTMIN + 2;

Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700197// A convenient wrapper for accessing a lockless queue.
Austin Schuh20b2b082019-09-11 20:42:56 -0700198class LocklessQueue {
199 public:
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700200 LocklessQueue(const LocklessQueueMemory *const_memory,
201 LocklessQueueMemory *memory, LocklessQueueConfiguration config)
202 : const_memory_(const_memory), memory_(memory), config_(config) {}
Austin Schuh20b2b082019-09-11 20:42:56 -0700203
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700204 void Initialize();
Austin Schuh20b2b082019-09-11 20:42:56 -0700205
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700206 LocklessQueueConfiguration config() const { return config_; }
Austin Schuh20b2b082019-09-11 20:42:56 -0700207
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700208 const LocklessQueueMemory *const_memory() { return const_memory_; }
209 LocklessQueueMemory *memory() { return memory_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700210
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700211 private:
212 const LocklessQueueMemory *const_memory_;
213 LocklessQueueMemory *memory_;
214 LocklessQueueConfiguration config_;
215};
216
217class LocklessQueueWatcher {
218 public:
219 LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
220 LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
221 LocklessQueueWatcher(LocklessQueueWatcher &&other)
222 : memory_(other.memory_), watcher_index_(other.watcher_index_) {
223 other.watcher_index_ = -1;
224 }
225 LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
226 std::swap(memory_, other.memory_);
227 std::swap(watcher_index_, other.watcher_index_);
228 return *this;
229 }
230
231 ~LocklessQueueWatcher();
232
233 // Registers this thread to receive the kWakeupSignal signal when
234 // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
235 // error in registration.
236 // TODO(austin): Change the API if we find ourselves with more errors.
237 static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
238 int priority);
239
240 private:
241 LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
242
243 LocklessQueueMemory *memory_ = nullptr;
244
245 // Index in the watcher list that our entry is, or -1 if no watcher is
246 // registered.
247 int watcher_index_ = -1;
248};
249
250class LocklessQueueWakeUpper {
251 public:
252 LocklessQueueWakeUpper(LocklessQueue queue);
Austin Schuh20b2b082019-09-11 20:42:56 -0700253
254 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
255 //
256 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
257 // if nonrt.
258 int Wakeup(int current_priority);
259
Austin Schuh20b2b082019-09-11 20:42:56 -0700260 private:
Austin Schuh20b2b082019-09-11 20:42:56 -0700261 // Memory and datastructure used to sort a list of watchers to wake
262 // up. This isn't a copy of Watcher since tid is simpler to work with here
263 // than the futex above.
264 struct WatcherCopy {
265 pid_t tid;
266 pid_t pid;
267 int priority;
268 };
Austin Schuh20b2b082019-09-11 20:42:56 -0700269
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700270 const LocklessQueueMemory *const memory_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700271 const int pid_;
272 const uid_t uid_;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700273
274 ::std::vector<WatcherCopy> watcher_copy_;
Austin Schuh20b2b082019-09-11 20:42:56 -0700275};
276
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700277// Sender for blocks of data. The resources associated with a sender are
278// scoped to this object's lifetime.
279class LocklessQueueSender {
280 public:
281 LocklessQueueSender(const LocklessQueueSender &) = delete;
282 LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
283 LocklessQueueSender(LocklessQueueSender &&other)
284 : memory_(other.memory_), sender_index_(other.sender_index_) {
285 other.memory_ = nullptr;
286 other.sender_index_ = -1;
287 }
288 LocklessQueueSender &operator=(LocklessQueueSender &&other) {
289 std::swap(memory_, other.memory_);
290 std::swap(sender_index_, other.sender_index_);
291 return *this;
292 }
293
294 ~LocklessQueueSender();
295
296 // Creates a sender. If we couldn't allocate a sender, returns nullopt.
297 // TODO(austin): Change the API if we find ourselves with more errors.
298 static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
299
300 // Sends a message without copying the data.
301 // Copy at most size() bytes of data into the memory pointed to by Data(),
302 // and then call Send().
303 // Note: calls to Data() are expensive enough that you should cache it.
304 size_t size() const;
305 void *Data();
Austin Schuhb5c6f972021-03-14 21:53:07 -0700306 bool Send(size_t length, monotonic_clock::time_point monotonic_remote_time,
307 realtime_clock::time_point realtime_remote_time,
308 uint32_t remote_queue_index,
309 monotonic_clock::time_point *monotonic_sent_time = nullptr,
310 realtime_clock::time_point *realtime_sent_time = nullptr,
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700311 uint32_t *queue_index = nullptr);
312
313 // Sends up to length data. Does not wakeup the target.
Austin Schuh91ba6392020-10-03 13:27:47 -0700314 bool Send(const char *data, size_t length,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700315 monotonic_clock::time_point monotonic_remote_time,
316 realtime_clock::time_point realtime_remote_time,
317 uint32_t remote_queue_index,
318 monotonic_clock::time_point *monotonic_sent_time,
319 realtime_clock::time_point *realtime_sent_time,
320 uint32_t *queue_index);
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700321
322 int buffer_index() const;
323
324 private:
325 LocklessQueueSender(LocklessQueueMemory *memory);
326
327 // Pointer to the backing memory.
328 LocklessQueueMemory *memory_ = nullptr;
329
330 // Index into the sender list.
331 int sender_index_ = -1;
332};
333
334// Pinner for blocks of data. The resources associated with a pinner are
335// scoped to this object's lifetime.
336class LocklessQueuePinner {
337 public:
338 LocklessQueuePinner(const LocklessQueuePinner &) = delete;
339 LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
340 LocklessQueuePinner(LocklessQueuePinner &&other)
341 : memory_(other.memory_),
342 const_memory_(other.const_memory_),
343 pinner_index_(other.pinner_index_) {
344 other.pinner_index_ = -1;
345 }
346 LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
347 std::swap(memory_, other.memory_);
348 std::swap(const_memory_, other.const_memory_);
349 std::swap(pinner_index_, other.pinner_index_);
350 return *this;
351 }
352
353 ~LocklessQueuePinner();
354
355 // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
356 // TODO(austin): Change the API if we find ourselves with more errors.
357 static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
358
359 // Attempts to pin the message at queue_index.
360 // Un-pins the previous message.
361 // Returns the buffer index (non-negative) if it succeeds.
362 // Returns -1 if that message is no longer in the queue.
363 int PinIndex(uint32_t queue_index);
364
365 // Read at most size() bytes of data into the memory pointed to by Data().
366 // Note: calls to Data() are expensive enough that you should cache it.
367 // Don't call Data() before a successful PinIndex call.
368 size_t size() const;
369 const void *Data() const;
370
371 private:
372 LocklessQueuePinner(LocklessQueueMemory *memory,
373 const LocklessQueueMemory *const_memory);
374
375 // Pointer to the backing memory.
376 LocklessQueueMemory *memory_ = nullptr;
377 const LocklessQueueMemory *const_memory_ = nullptr;
378
379 // Index into the pinner list.
380 int pinner_index_ = -1;
381};
382
383class LocklessQueueReader {
384 public:
385 enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
386
387 LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
388 queue.Initialize();
389 }
390
391 // If you ask for a queue index 2 past the newest, you will still get
392 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
393 // element newer than QueueSize() from the current message, we consider it
394 // behind by a large amount and return TOO_OLD. If the message is modified
395 // out from underneath us as we read it, return OVERWROTE.
396 //
397 // data may be nullptr to indicate the data should not be copied.
398 Result Read(uint32_t queue_index,
Austin Schuhb5c6f972021-03-14 21:53:07 -0700399 monotonic_clock::time_point *monotonic_sent_time,
400 realtime_clock::time_point *realtime_sent_time,
401 monotonic_clock::time_point *monotonic_remote_time,
402 realtime_clock::time_point *realtime_remote_time,
403 uint32_t *remote_queue_index,
404 size_t *length, char *data) const;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700405
406 // Returns the index to the latest queue message. Returns empty_queue_index()
407 // if there are no messages in the queue. Do note that this index wraps if
408 // more than 2^32 messages are sent.
409 QueueIndex LatestIndex() const;
410
411 private:
412 const LocklessQueueMemory *const memory_;
413};
414
415// Returns the number of messages which are logically in the queue at a time.
416size_t LocklessQueueSize(const LocklessQueueMemory *memory);
417
418// Returns the number of bytes queue users are allowed to read/write within each
419// message.
420size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
421
422// TODO(austin): Return the oldest queue index. This lets us catch up nicely
423// if we got behind.
424// The easiest way to implement this is likely going to be to reserve the
425// first modulo of values for the initial time around, and never reuse them.
426// That lets us do a simple atomic read of the next index and deduce what has
427// happened. It will involve the simplest atomic operations.
428
429// TODO(austin): Make it so we can find the indices which were sent just
430// before and after a time with a binary search.
431
432// Prints to stdout the data inside the queue for debugging.
433void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
434
Austin Schuh20b2b082019-09-11 20:42:56 -0700435} // namespace ipc_lib
436} // namespace aos
437
438#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_