#include "aos/ipc_lib/lockless_queue.h"

#include <linux/futex.h>
#include <pwd.h>
#include <sched.h>
#include <string.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <chrono>
#include <compare>
#include <iomanip>
#include <iostream>
#include <string>
#include <string_view>

#include "absl/flags/flag.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/escaping.h"

#include "aos/ipc_lib/lockless_queue_memory.h"
#include "aos/util/compiler_memory_barrier.h"

ABSL_FLAG(bool, dump_lockless_queue_data, false,
          "If true, print the data out when dumping the queue.");
ABSL_DECLARE_FLAG(bool, skip_realtime_scheduler);

namespace aos::ipc_lib {
namespace {

class GrabQueueSetupLockOrDie {
 public:
  GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) : memory_(memory) {
    const int result = mutex_grab(&(memory->queue_setup_lock));
    CHECK(result == 0 || result == 1) << ": " << result;
  }

  ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }

  GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
  GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;

 private:
  LocklessQueueMemory *const memory_;
};

bool IsPinned(LocklessQueueMemory *memory, Index index) {
  DCHECK(index.valid());
  const size_t queue_size = memory->queue_size();
  const QueueIndex message_index =
      memory->GetMessage(index)->header.queue_index.Load(queue_size);
  if (!message_index.valid()) {
    return false;
  }
  DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
      << ": Message is in the queue";
  for (int pinner_index = 0;
       pinner_index < static_cast<int>(memory->config.num_pinners);
       ++pinner_index) {
    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);

    if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
      return true;
    }
  }
  return false;
}

// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
//
// Returns the new scratch_index value.
Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
                              ipc_lib::Sender *const sender,
                              const Index to_replace) {
  // If anybody's trying to pin this message, then grab a message from a pinner
  // to write into instead, and leave the message we pulled out of the queue
  // (currently in our scratch_index) with a pinner.
  //
  // This loop will terminate in at most one iteration through the pinners in
  // any steady-state configuration of the memory. There are only as many
  // Pinner::pinned values to worry about as there are Pinner::scratch_index
  // values to check against, plus to_replace, which means there will always be
  // a free one. We might have to make multiple passes if things are being
  // changed concurrently, but nobody dying can make this loop fail to
  // terminate (the number of processes that can die is bounded, because no new
  // ones can start while we've got the lock).
  for (int pinner_index = 0; true;
       pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
    if (!IsPinned(memory, to_replace)) {
      // No pinners on our current scratch_index, so we're fine now.
      VLOG(3) << "No pinners: " << to_replace.DebugString();
      return to_replace;
    }

    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);

    const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
    CHECK(pinner_scratch.valid())
        << ": Pinner scratch_index should always be valid";
    if (IsPinned(memory, pinner_scratch)) {
      // Wouldn't do us any good to swap with this one, so don't bother, and
      // move onto the next one.
      VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
      continue;
    }

    sender->to_replace.RelaxedStore(pinner_scratch);
    aos_compiler_memory_barrier();
    // Give the pinner the message (which is currently in
    // sender->scratch_index).
    if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
                                                        to_replace)) {
      // Somebody swapped into this pinner before us. The new value is probably
      // pinned, so we don't want to look at it again immediately.
      VLOG(3) << "Pinner " << pinner_index
              << " scratch_index changed: " << pinner_scratch.DebugString()
              << ", " << to_replace.DebugString();
      sender->to_replace.RelaxedInvalidate();
      continue;
    }
    aos_compiler_memory_barrier();
    // Now update the sender's scratch space.
    sender->scratch_index.Store(pinner_scratch);
    aos_compiler_memory_barrier();
    // And then record that we succeeded, but definitely after the above
    // store.
    sender->to_replace.RelaxedInvalidate();
    VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();

    // If it's in a pinner's scratch_index, it should not be in the queue,
    // which means nobody new can pin it for real. However, they can still
    // attempt to pin it, which means we can't verify !IsPinned down here.

    return pinner_scratch;
  }
}

// Returns true if it succeeded. Returns false if another sender died in the
// middle.
bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
  // Make sure we start looking at shared memory fresh right now. We'll handle
  // people dying partway through by either cleaning up after them or not, but
  // we want to ensure we clean up after anybody who has already died when we
  // start.
  aos_compiler_memory_barrier();

  const size_t num_senders = memory->num_senders();
  const size_t num_pinners = memory->num_pinners();
  const size_t queue_size = memory->queue_size();
  const size_t num_messages = memory->num_messages();

  // There are a large number of crazy cases here for how things can go wrong
  // and how we have to recover. They either require us to track extra state,
  // slowing down the send path, or require handling a large number of cases.
  //
  // The solution here is to not over-think it. This runs during construction,
  // while not real-time. It is allowed to be slow. It will also very rarely
  // trigger. There is a small microsecond-scale window where process death is
  // ambiguous.
  //
  // So, build up a list N long, where N is the number of messages. Search
  // through the entire queue and the sender list (ignoring any dead senders),
  // and mark down which ones we have seen. Once we have seen all the messages
  // except those belonging to the dead senders, we know which messages are
  // dead. Because the queue is active while we do this, it may take a couple
  // of passes to see everything.

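  // Tracks which dead senders were found in the ambiguous state 2) described
  // below; those need the hard-case recovery pass further down.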
  ::std::vector<bool> need_recovery(num_senders, false);

  // Do the easy case. Find all senders who have died. See if they are either
  // consistent already, or if they have copied over to_replace to the scratch
  // index, but haven't cleared to_replace. Count them.
  size_t valid_senders = 0;
  for (size_t i = 0; i < num_senders; ++i) {
    Sender *sender = memory->GetSender(i);
    if (!sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
      // Not dead.
      ++valid_senders;
      continue;
    }
    VLOG(3) << "Found an easy death for sender " << i;
    // We can do a relaxed load here because we're the only person touching
    // this sender at this point.
    const Index to_replace = sender->to_replace.RelaxedLoad();
    const Index scratch_index = sender->scratch_index.Load();

    // I find it easiest to think about this in terms of the set of observable
    // states. The main code progresses through the following states:

    // 1) scratch_index = xxx
    //    to_replace = invalid
    //    This is unambiguous. Already good.

    // 2) scratch_index = xxx
    //    to_replace = yyy
    //    Very ambiguous. Is xxx or yyy the correct one? Need to either roll
    //    this forwards or backwards.

    // 3) scratch_index = yyy
    //    to_replace = yyy
    //    We are in the act of moving to_replace to scratch_index, but didn't
    //    finish. Easy.
    //
    //    If doing a pinner swap, we've definitely done it.

    // 4) scratch_index = yyy
    //    to_replace = invalid
    //    Finished, but died. Looks like 1)

    // Swapping with a pinner's scratch_index passes through the same states.
    // We just need to ensure the message that ends up in the sender's
    // scratch_index isn't pinned, using the same code as sending does.

    // Any cleanup code needs to follow the same set of states to be robust to
    // death, so cleanup can be restarted if it dies partway through.

    if (!to_replace.valid()) {
      // 1) or 4). Make sure we aren't corrupted and declare victory.
      CHECK(scratch_index.valid());

      // If it's in 1) with a pinner, the sender might have a pinned message,
      // so fix that.
      SwapPinnedSenderScratch(memory, sender, scratch_index);

      // If it's in 4), it may not have completed this step yet. This will
      // always be a NOP if it's in 1), verified by a DCHECK.
      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();

      sender->ownership_tracker.ForceClear();
      ++valid_senders;
      continue;
    }

    // Could be 2) or 3) at this point.

    if (to_replace == scratch_index) {
      // 3) for sure.
      // Just need to invalidate to_replace to finish.
      sender->to_replace.Invalidate();

      // Make sure to indicate it's an unused message before a sender gets its
      // hands on it.
      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
      aos_compiler_memory_barrier();

      // And mark that we succeeded.
      sender->ownership_tracker.ForceClear();
      ++valid_senders;
      continue;
    }

    // Must be 2). Mark it for later.
    need_recovery[i] = true;
  }

  // Cleaning up pinners is easy. We don't actually have to do anything, but
  // invalidating their pinned fields might help catch bugs elsewhere trying
  // to read them before they're set.
  for (size_t i = 0; i < num_pinners; ++i) {
    Pinner *const pinner = memory->GetPinner(i);
    if (!pinner->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
      continue;
    }
    pinner->pinned.Invalidate();
    pinner->ownership_tracker.ForceClear();
  }

  // If all the senders are (or were made) good, there is no need to do the
  // hard case.
  if (valid_senders == num_senders) {
    return true;
  }

  VLOG(3) << "Starting hard cleanup";

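  // Figure out which messages are actually dead by walking every place a
  // message index can live (each sender's scratch_index, the queue itself,
  // and each pinner's scratch_index) and marking the ones we see.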
  size_t num_accounted_for = 0;
  size_t num_missing = 0;
  ::std::vector<bool> accounted_for(num_messages, false);

  while ((num_accounted_for + num_missing) != num_messages) {
    num_missing = 0;
    for (size_t i = 0; i < num_senders; ++i) {
      Sender *const sender = memory->GetSender(i);
      if (sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
        if (!need_recovery[i]) {
          return false;
        }
        ++num_missing;
        continue;
      }
      CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
      // We can do a relaxed load here because we're the only person touching
      // this sender at this point, if it matters. If it's not a dead sender,
      // then any message it ever has will eventually be accounted for if we
      // make enough tries through the outer loop.
      const Index scratch_index = sender->scratch_index.RelaxedLoad();
      if (!accounted_for[scratch_index.message_index()]) {
        ++num_accounted_for;
      }
      accounted_for[scratch_index.message_index()] = true;
    }

    for (size_t i = 0; i < queue_size; ++i) {
      // Same logic as above for scratch_index applies here too.
      const Index index = memory->GetQueue(i)->RelaxedLoad();
      if (!accounted_for[index.message_index()]) {
        ++num_accounted_for;
      }
      accounted_for[index.message_index()] = true;
    }

    for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
      // Same logic as above for scratch_index applies here too.
      const Index index =
          memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
      if (!accounted_for[index.message_index()]) {
        ++num_accounted_for;
      }
      accounted_for[index.message_index()] = true;
    }

    CHECK_LE(num_accounted_for + num_missing, num_messages);
  }

  while (num_missing != 0) {
    const size_t starting_num_missing = num_missing;
    for (size_t i = 0; i < num_senders; ++i) {
      Sender *sender = memory->GetSender(i);
      if (!sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
        continue;
      }
      if (!need_recovery[i]) {
        return false;
      }
      // We can do relaxed loads here because we're the only person touching
      // this sender at this point.
      const Index scratch_index = sender->scratch_index.RelaxedLoad();
      const Index to_replace = sender->to_replace.RelaxedLoad();

      // Candidate.
      if (to_replace.valid()) {
        CHECK_LE(to_replace.message_index(), accounted_for.size());
      }
      if (scratch_index.valid()) {
        CHECK_LE(scratch_index.message_index(), accounted_for.size());
      }
      if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
        CHECK(scratch_index.valid());
        VLOG(3) << "Sender " << i
                << " died, to_replace is already accounted for";
        // If both are accounted for, we are corrupt...
        CHECK(!accounted_for[scratch_index.message_index()]);

        // to_replace is already accounted for. This means that we didn't
        // atomically insert scratch_index into the queue yet. So
        // invalidate to_replace.
        sender->to_replace.Invalidate();
        // Sender definitely will not have gotten here, so finish for it.
        memory->GetMessage(scratch_index)
            ->header.queue_index.RelaxedInvalidate();

        // And then mark this sender clean.
        sender->ownership_tracker.ForceClear();
        need_recovery[i] = false;

        // And account for scratch_index.
        accounted_for[scratch_index.message_index()] = true;
        --num_missing;
        ++num_accounted_for;
      } else if (!scratch_index.valid() ||
                 accounted_for[scratch_index.message_index()]) {
        VLOG(3) << "Sender " << i
                << " died, scratch_index is already accounted for";
        // scratch_index is accounted for. That means we did the insert,
        // but didn't record it.
        CHECK(to_replace.valid());

        // Make sure to indicate it's an unused message before a sender gets
        // its hands on it.
        memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
        aos_compiler_memory_barrier();

        // Finish the transaction. Copy to_replace, then clear it.

        sender->scratch_index.Store(to_replace);
        sender->to_replace.Invalidate();

        // And then mark this sender clean.
        sender->ownership_tracker.ForceClear();
        need_recovery[i] = false;

        // And account for to_replace.
        accounted_for[to_replace.message_index()] = true;
        --num_missing;
        ++num_accounted_for;
      } else {
        VLOG(3) << "Sender " << i << " died, neither is accounted for";
        // Ambiguous. There will be an unambiguous one somewhere that we
        // can do first.
      }
    }
    // CHECK that we are making progress.
    CHECK_NE(num_missing, starting_num_missing);
  }
  return true;
}

void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &lock) {
  // The number of iterations is bounded here because there are only a finite
  // number of senders in existence which could die, and no new ones can be
  // created while we're in here holding the lock.
  while (!DoCleanup(memory, lock)) {
  }
}

// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
// thread.
// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
  return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}

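// Treats an invalid index as a cleared (zero) one so an uninitialized
// next_queue_index reads as the start of the queue; valid indices pass
// through unchanged.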
QueueIndex ZeroOrValid(QueueIndex index) {
  if (!index.valid()) {
    return index.Clear();
  }
  return index;
}

}  // namespace

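// Test-only helper: if the mutex is currently held by the given thread, forge
// the robust-futex "owner died" state so the recovery paths above can be
// exercised without actually killing a process.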
bool PretendThatOwnerIsDeadForTesting(aos_mutex *mutex, pid_t tid) {
  if (static_cast<pid_t>(mutex->futex & FUTEX_TID_MASK) == tid) {
    mutex->futex = FUTEX_OWNER_DIED;
    return true;
  }
  return false;
}

size_t LocklessQueueConfiguration::message_size() const {
  // Round up the message size so following data is aligned appropriately.
  // Make sure to leave space to align the message data. It will be aligned
  // relative to the start of the shared memory region, but that might not be
  // aligned for some use cases.
  return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
                                               kChannelDataRedzone * 2 +
                                               (kChannelDataAlignment - 1)) +
         sizeof(Message);
}

size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
  // Round up the message size so following data is aligned appropriately.
  config.message_data_size =
      LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);

  // As we build up the size, confirm that everything is aligned to the
  // alignment requirements of the type.
  size_t size = sizeof(LocklessQueueMemory);
  CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);

  CHECK_EQ(size % alignof(AtomicIndex), 0u);
  size += LocklessQueueMemory::SizeOfQueue(config);

  CHECK_EQ(size % alignof(Message), 0u);
  size += LocklessQueueMemory::SizeOfMessages(config);

  CHECK_EQ(size % alignof(Watcher), 0u);
  size += LocklessQueueMemory::SizeOfWatchers(config);

  CHECK_EQ(size % alignof(Sender), 0u);
  size += LocklessQueueMemory::SizeOfSenders(config);

  CHECK_EQ(size % alignof(Pinner), 0u);
  size += LocklessQueueMemory::SizeOfPinners(config);

  return size;
}

// Calculates the starting byte for a redzone in shared memory. This starting
// value is simply incremented for subsequent bytes.
//
// The result is based on the offset of the region in shared memory, to ensure
// it is the same for each region when we generate and verify, but different
// for each region to help catch forms of corruption like copying
// out-of-bounds data from one place to another.
//
// memory is the base pointer to the shared memory. It is used to calculate
// offsets. starting_data is the start of the redzone's data. Each one will
// get a unique pattern.
uint8_t RedzoneStart(const LocklessQueueMemory *memory,
                     const char *starting_data) {
  const auto memory_int = reinterpret_cast<uintptr_t>(memory);
  const auto starting_int = reinterpret_cast<uintptr_t>(starting_data);
  DCHECK(starting_int >= memory_int);
  DCHECK(starting_int < memory_int + LocklessQueueMemorySize(memory->config));
  const uintptr_t starting_offset = starting_int - memory_int;
  // Just XOR the lower 2 bytes. The higher-order bytes are probably 0
  // anyways.
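  // For example, a redzone starting at offset 0x1234 gets a first byte of
  // 0x34 ^ 0x12 = 0x26, and FillRedzone below continues the pattern with
  // 0x27, 0x28, ...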
  return (starting_offset & 0xFF) ^ ((starting_offset >> 8) & 0xFF);
}

// Returns true if the given redzone has invalid data.
bool CheckRedzone(const LocklessQueueMemory *memory,
                  absl::Span<const char> redzone) {
  uint8_t redzone_value = RedzoneStart(memory, redzone.data());

  bool bad = false;

  for (size_t i = 0; i < redzone.size() && !bad; ++i) {
    if (memcmp(&redzone[i], &redzone_value, 1)) {
      bad = true;
    }
    ++redzone_value;
  }

  return bad;
}

// Returns true if either of message's redzones has invalid data.
bool CheckBothRedzones(const LocklessQueueMemory *memory,
                       const Message *message) {
  return CheckRedzone(memory,
                      message->PreRedzone(memory->message_data_size())) ||
         CheckRedzone(memory, message->PostRedzone(memory->message_data_size(),
                                                   memory->message_size()));
}

// Fills the given redzone with the expected data.
void FillRedzone(LocklessQueueMemory *memory, absl::Span<char> redzone) {
  uint8_t redzone_value = RedzoneStart(memory, redzone.data());
  for (size_t i = 0; i < redzone.size(); ++i) {
    memcpy(&redzone[i], &redzone_value, 1);
    ++redzone_value;
  }

  // Just double check that the implementations match.
  CHECK(!CheckRedzone(memory, redzone));
}

LocklessQueueMemory *InitializeLocklessQueueMemory(
    LocklessQueueMemory *memory, LocklessQueueConfiguration config) {
  // Everything should be zero initialized already. So we just need to fill
  // everything out properly.

  // This is the UID we will use for checking signal-sending permission
  // compatibility.
  //
  // The manpage says:
  //   For a process to have permission to send a signal, it must either be
  //   privileged [...], or the real or effective user ID of the sending
  //   process must equal the real or saved set-user-ID of the target process.
  //
  // Processes typically initialize a queue in random order as they start up.
  // This means we need an algorithm for verifying all processes have
  // permissions to send each other signals which gives the same answer no
  // matter what order they attach in. We would also like to avoid maintaining
  // a shared list of the UIDs of all processes.
  //
  // To do this while still giving sufficient flexibility for all current use
  // cases, we track a single UID for the queue. All processes with a matching
  // euid+suid must have this UID. Any processes with distinct euid/suid must
  // instead have a matching ruid. This guarantees signals can be sent between
  // all processes attached to the queue.
  //
  // In particular, this allows a process to change only its euid (to interact
  // with a queue) while still maintaining privileges via its ruid. However,
  // it can only use privileges in ways that do not require changing the euid
  // back, because while the euid is different it will not be able to receive
  // signals. We can't actually verify that, but we can sanity check that
  // things are valid when the queue is initialized.

  uid_t uid;
  {
    uid_t ruid, euid, suid;
    PCHECK(getresuid(&ruid, &euid, &suid) == 0);
    // If these are equal, then use them, even if that's different from the
    // real UID. This allows processes to keep a real UID of 0 (to have
    // permissions to perform system-level changes) while still being able to
    // communicate with processes running unprivileged as a distinct user.
    if (euid == suid) {
      uid = euid;
      VLOG(1) << "Using euid==suid " << uid;
    } else {
      uid = ruid;
      VLOG(1) << "Using ruid " << ruid;
    }
  }

  // Grab the mutex. We don't care if the previous reader died. We are going
  // to check everything anyways.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);

  if (!memory->initialized) {
    // TODO(austin): Check these for out of bounds.
    memory->config.num_watchers = config.num_watchers;
    memory->config.num_senders = config.num_senders;
    memory->config.num_pinners = config.num_pinners;
    memory->config.queue_size = config.queue_size;
    memory->config.message_data_size = config.message_data_size;

    const size_t num_messages = memory->num_messages();
    // There need to be at most MaxMessages() messages allocated.
    CHECK_LE(num_messages, Index::MaxMessages());

    for (size_t i = 0; i < num_messages; ++i) {
      Message *const message =
          memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i));
      message->header.queue_index.Invalidate();
      message->header.monotonic_sent_time = monotonic_clock::min_time;
      FillRedzone(memory, message->PreRedzone(memory->message_data_size()));
      FillRedzone(memory, message->PostRedzone(memory->message_data_size(),
                                               memory->message_size()));
    }

    for (size_t i = 0; i < memory->queue_size(); ++i) {
      // Make the initial counter be the furthest away number. That means that
      // index 0 should be 0xffff, 1 should be 0, etc.
      memory->GetQueue(i)->Store(Index(QueueIndex::Zero(memory->queue_size())
                                           .IncrementBy(i)
                                           .DecrementBy(memory->queue_size()),
                                       i));
    }

    memory->next_queue_index.Invalidate();
    memory->uid = uid;

    for (size_t i = 0; i < memory->num_senders(); ++i) {
      ::aos::ipc_lib::Sender *s = memory->GetSender(i);
      // Nobody else can possibly be touching these because we haven't set
      // initialized to true yet.
      s->scratch_index.RelaxedStore(
          Index(QueueIndex::Invalid(), i + memory->queue_size()));
      s->to_replace.RelaxedInvalidate();
    }

    for (size_t i = 0; i < memory->num_pinners(); ++i) {
      ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
      // Nobody else can possibly be touching these because we haven't set
      // initialized to true yet.
      pinner->scratch_index.RelaxedStore(
          Index(QueueIndex::Invalid(),
                i + memory->num_senders() + memory->queue_size()));
      pinner->pinned.Invalidate();
    }

    aos_compiler_memory_barrier();
    // Signal everything is done. This needs to be done last, so if we die, we
    // redo initialization.
    memory->initialized = true;
  } else {
    if (memory->uid != uid) {
      // Subsequent calls to getpwuid() overwrite this pointer, so pull the
      // thing we care about into a string.
      struct passwd const *user_pw = getpwuid(uid);
      std::string user_username = user_pw->pw_name;
      struct passwd const *memory_pw = getpwuid(memory->uid);
      std::string memory_username = memory_pw->pw_name;
      LOG(FATAL) << "Current user " << user_username << " (uid:" << uid << ") "
                 << "doesn't match shared memory user " << memory_username
                 << " (uid:" << memory->uid << "). "
                 << "Log in as " << memory_username
                 << " user to access this channel.";
    }
  }

  return memory;
}

void LocklessQueue::Initialize() {
  InitializeLocklessQueueMemory(memory_, config_);
}

LocklessQueueWatcher::~LocklessQueueWatcher() {
  if (watcher_index_ == -1) {
    return;
  }

  // Since everything is self consistent, all we need to do is make sure nobody
  // else is running. Someone dying will get caught in the generic consistency
  // check.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);

  // Make sure we are registered.
  CHECK_NE(watcher_index_, -1);

  // Make sure we still own the slot we are supposed to.
  CHECK(memory_->GetWatcher(watcher_index_)->ownership_tracker.IsHeldBySelf());

  // The act of unlocking invalidates the entry. Invalidate it.
  memory_->GetWatcher(watcher_index_)->ownership_tracker.Release();
  // And internally forget the slot.
  watcher_index_ = -1;

  // Cleanup is cheap. The next user will do it anyways, so no need for us to
  // do anything right now.

  // And confirm that nothing is owned by us.
  const int num_watchers = memory_->num_watchers();
  for (int i = 0; i < num_watchers; ++i) {
    CHECK(!memory_->GetWatcher(i)->ownership_tracker.IsHeldBySelf())
        << ": " << i;
  }
}

std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
    LocklessQueue queue, int priority) {
  queue.Initialize();
  LocklessQueueWatcher result(queue.memory(), priority);
  if (result.watcher_index_ != -1) {
    return result;
  } else {
    return std::nullopt;
  }
}

LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
                                           int priority)
    : memory_(memory) {
  // TODO(austin): Make sure signal coalescing is turned on. We don't need
  // duplicates. That will improve performance under high load.

  // Since everything is self consistent, all we need to do is make sure nobody
  // else is running. Someone dying will get caught in the generic consistency
  // check.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
  const int num_watchers = memory_->num_watchers();

  // Now, find the first empty watcher and grab it.
  CHECK_EQ(watcher_index_, -1);
  for (int i = 0; i < num_watchers; ++i) {
    // If we see a slot the kernel has marked as dead, everything we do reusing
    // it needs to happen-after whatever that process did before dying.
    auto *const ownership_tracker =
        &(memory_->GetWatcher(i)->ownership_tracker);
    if (ownership_tracker->LoadAcquire().IsUnclaimed() ||
        ownership_tracker->OwnerIsDefinitelyAbsolutelyDead()) {
      watcher_index_ = i;
      // Relaxed is OK here because we're the only task going to touch it
      // between here and the ownership_tracker.Acquire() below (other
      // recovery is blocked by us holding the setup lock).
      ownership_tracker->ForceClear();
      break;
    }
  }

  // Bail if we failed to find an open slot.
  if (watcher_index_ == -1) {
    return;
  }

  Watcher *const w = memory_->GetWatcher(watcher_index_);

  w->pid = getpid();
  w->priority = priority;

  // Grabbing a mutex is a compiler and memory barrier, so nothing before will
  // get rearranged afterwards.
  w->ownership_tracker.Acquire();
}

LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
    : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
  queue.Initialize();
  watcher_copy_.resize(memory_->num_watchers());
}

int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
  const size_t num_watchers = memory_->num_watchers();

  CHECK_EQ(watcher_copy_.size(), num_watchers);

  // Grab a copy so it won't change out from underneath us, and we can sort it
  // nicely in C++.
  // Do note that there is still a window where the process can die *after* we
  // read everything. We will still PI boost and send a signal to the thread in
  // question. There is no way without pidfd's to close this window, and
  // creating a pidfd is likely not RT.
  for (size_t i = 0; i < num_watchers; ++i) {
    const Watcher *w = memory_->GetWatcher(i);
    watcher_copy_[i].ownership_snapshot = w->ownership_tracker.LoadRelaxed();
    // Force the load of the TID to come first.
    aos_compiler_memory_barrier();
    watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
    watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);

    // Use a priority of -1 to mean an invalid entry to make sorting easier.
    if (watcher_copy_[i].ownership_snapshot.OwnerIsDead() ||
        watcher_copy_[i].ownership_snapshot.IsUnclaimed()) {
      watcher_copy_[i].priority = -1;
    } else {
      // Ensure all of this happens after we're done looking at the
      // pid+priority in shared memory.
      aos_compiler_memory_barrier();
      if (watcher_copy_[i].ownership_snapshot !=
          w->ownership_tracker.LoadRelaxed()) {
        // Confirm that the watcher hasn't been re-used and modified while we
        // read it. If it has, mark it invalid again.
        watcher_copy_[i].priority = -1;
      }
    }
  }

  // Now sort.
  ::std::sort(watcher_copy_.begin(), watcher_copy_.end(),
              [](const WatcherCopy &a, const WatcherCopy &b) {
                return a.priority > b.priority;
              });

  int count = 0;
  if (watcher_copy_[0].priority != -1) {
    const int max_priority =
        ::std::max(current_priority, watcher_copy_[0].priority);
    // Boost if we are RT and there is a higher priority sender out there.
    // Otherwise we might run into priority inversions.
    if (max_priority > current_priority && current_priority > 0) {
      // Inline the setscheduler call rather than using aos/realtime.h. This is
      // quite performance sensitive, and halves the time needed to send a
      // message when pi boosting is in effect.
      if (!absl::GetFlag(FLAGS_skip_realtime_scheduler)) {
        // TODO(austin): Do we need to boost the soft limit here too like we
        // were before?
        struct sched_param param;
        param.sched_priority = max_priority;
        PCHECK(sched_setscheduler(0, SCHED_FIFO, &param) == 0)
            << ": changing to SCHED_FIFO with " << max_priority
            << ", if you want to bypass this check for testing, use "
               "--skip_realtime_scheduler";
      }
    }

    // Build up the siginfo to send.
    siginfo_t uinfo;
    memset(&uinfo, 0, sizeof(uinfo));

    uinfo.si_code = SI_QUEUE;
    uinfo.si_pid = pid_;
    uinfo.si_uid = uid_;
    uinfo.si_value.sival_int = 0;

    for (const WatcherCopy &watcher_copy : watcher_copy_) {
      // The first -1 priority means we are at the end of the valid list.
      if (watcher_copy.priority == -1) {
        break;
      }

      // Send the signal. Target just the thread that sent it so that we can
      // support multiple watchers in a process (when someone creates multiple
      // event loops in different threads).
      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.ownership_snapshot.tid(),
                        kWakeupSignal, &uinfo);

      ++count;
    }

    // Drop back down if we were boosted.
    if (max_priority > current_priority && current_priority > 0) {
      if (!absl::GetFlag(FLAGS_skip_realtime_scheduler)) {
        struct sched_param param;
        param.sched_priority = current_priority;
        PCHECK(sched_setscheduler(0, SCHED_FIFO, &param) == 0)
867 << ": changing to SCHED_FIFO with " << max_priority
868 << ", if you want to bypass this check for testing, use "
869 "--skip_realtime_scheduler";
870 }
Austin Schuh20b2b082019-09-11 20:42:56 -0700871 }
872 }
873
874 return count;
875}
876
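// Streams a Result as its underlying integer value to keep log messages
// compact.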
std::ostream &operator<<(std::ostream &os,
                         const LocklessQueueSender::Result r) {
  os << static_cast<int>(r);
  return os;
}

LocklessQueueSender::LocklessQueueSender(
    LocklessQueueMemory *memory,
    monotonic_clock::duration channel_storage_duration)
    : memory_(memory), channel_storage_duration_(channel_storage_duration) {
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);

  // Since we already have the lock, go ahead and try cleaning up.
  Cleanup(memory_, grab_queue_setup_lock);

  const int num_senders = memory_->num_senders();

  for (int i = 0; i < num_senders; ++i) {
    ::aos::ipc_lib::Sender *s = memory->GetSender(i);
    // This doesn't need synchronization because we're the only process doing
    // initialization right now, and nobody else will be touching senders which
    // we're interested in.
    if (s->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
      sender_index_ = i;
      break;
    }
  }

  if (sender_index_ == -1) {
    VLOG(1) << "Too many senders, starting to bail.";
    return;
  }

  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);

  // Indicate that we are now alive by taking over the slot. If the previous
  // owner died, we still want to do this.
  sender->ownership_tracker.Acquire();

  const Index scratch_index = sender->scratch_index.RelaxedLoad();
  Message *const message = memory_->GetMessage(scratch_index);
  CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
      << ": " << std::hex << scratch_index.get();
}

LocklessQueueSender::~LocklessQueueSender() {
  if (sender_index_ != -1) {
    CHECK(memory_ != nullptr);
    memory_->GetSender(sender_index_)->ownership_tracker.Release();
  }
}

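// A rough usage sketch (constructing the LocklessQueue and plumbing the
// remote-time arguments is the caller's responsibility):
//
//   std::optional<LocklessQueueSender> sender =
//       LocklessQueueSender::Make(queue, channel_storage_duration);
//   if (sender) {
//     sender->Send(data, length, ...);
//   }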
std::optional<LocklessQueueSender> LocklessQueueSender::Make(
    LocklessQueue queue, monotonic_clock::duration channel_storage_duration) {
  queue.Initialize();
  LocklessQueueSender result(queue.memory(), channel_storage_duration);
  if (result.sender_index_ != -1) {
    return result;
  } else {
    return std::nullopt;
  }
}

size_t LocklessQueueSender::size() const {
  return memory_->message_data_size();
}

void *LocklessQueueSender::Data() {
  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
  const Index scratch_index = sender->scratch_index.RelaxedLoad();
  Message *const message = memory_->GetMessage(scratch_index);
  // We should have invalidated this when we first got the buffer. Verify that
  // in debug mode.
  DCHECK(
      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
      << ": " << std::hex << scratch_index.get();

  return message->data(memory_->message_data_size());
}

LocklessQueueSender::Result LocklessQueueSender::Send(
    const char *data, size_t length,
    monotonic_clock::time_point monotonic_remote_time,
    realtime_clock::time_point realtime_remote_time,
    monotonic_clock::time_point monotonic_remote_transmit_time,
    uint32_t remote_queue_index, const UUID &source_boot_uuid,
    monotonic_clock::time_point *monotonic_sent_time,
    realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
  CHECK_LE(length, size());
  // Flatbuffers write from the back of the buffer to the front. If we are
  // going to write an explicit chunk of memory into the buffer, we need to
  // adhere to this convention and place it at the end.
  memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
  return Send(length, monotonic_remote_time, realtime_remote_time,
              monotonic_remote_transmit_time, remote_queue_index,
              source_boot_uuid, monotonic_sent_time, realtime_sent_time,
              queue_index);
}

LocklessQueueSender::Result LocklessQueueSender::Send(
    size_t length, monotonic_clock::time_point monotonic_remote_time,
    realtime_clock::time_point realtime_remote_time,
    monotonic_clock::time_point monotonic_remote_transmit_time,
    uint32_t remote_queue_index, const UUID &source_boot_uuid,
    monotonic_clock::time_point *monotonic_sent_time,
    realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
  const size_t queue_size = memory_->queue_size();
  CHECK_LE(length, size());

  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
  // We can do a relaxed load on our sender because we're the only person
  // modifying it right now.
  const Index scratch_index = sender->scratch_index.RelaxedLoad();
  Message *const message = memory_->GetMessage(scratch_index);
  if (CheckBothRedzones(memory_, message)) {
    return Result::INVALID_REDZONE;
  }

  // We should have invalidated this when we first got the buffer. Verify that
  // in debug mode.
  DCHECK(
      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
      << ": " << std::hex << scratch_index.get();

  message->header.length = length;
  // Pass these through. Any alternative behavior can be implemented in a
  // layer above.
  message->header.remote_queue_index = remote_queue_index;
  message->header.source_boot_uuid = source_boot_uuid;
  message->header.monotonic_remote_time = monotonic_remote_time;
  message->header.realtime_remote_time = realtime_remote_time;
  message->header.monotonic_remote_transmit_time =
      monotonic_remote_transmit_time;

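  // Claim the next slot in the queue: find the index to write, swap our
  // scratch message in with a compare-and-exchange, and retry from the top
  // whenever another sender beats us to it.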
  Index to_replace = Index::Invalid();
  while (true) {
    const QueueIndex actual_next_queue_index =
        memory_->next_queue_index.Load(queue_size);
    const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);

    const QueueIndex incremented_queue_index = next_queue_index.Increment();

    // This needs to synchronize with whoever the previous writer at this
    // location was.
    to_replace = memory_->LoadIndex(next_queue_index);

    const QueueIndex decremented_queue_index =
        next_queue_index.DecrementBy(queue_size);

    // See if we got beat. If we did, try to atomically update
    // next_queue_index in case the previous writer failed and retry.
    if (!to_replace.IsPlausible(decremented_queue_index)) {
      // We don't care about the result. It will either succeed, or we got
      // beat in fixing it and just need to give up and try again. If we got
      // beat multiple times, the only way progress can be made is if the queue
      // is updated as well. This means that if we retry reading
      // next_queue_index, we will be at most off by one and can retry.
      //
      // Both require no further action from us.
      //
      // TODO(austin): If we are having fairness issues under contention, we
      // could have a mode bit in next_queue_index, and could use a lock or
      // some other form of PI boosting to let the higher priority task win.
      memory_->next_queue_index.CompareAndExchangeStrong(
          actual_next_queue_index, incremented_queue_index);

      VLOG(3) << "We were beat. Try again. Was " << std::hex
              << to_replace.get() << ", is " << decremented_queue_index.index();
      continue;
    }

    // Confirm that the message is what it should be.
    //
    // This is just a best-effort check to skip reading the clocks if possible.
    // If this fails, then the compare-exchange below definitely would, so we
    // can bail out now.
    const Message *message_to_replace = memory_->GetMessage(to_replace);
    bool is_previous_index_valid = false;
    {
      const QueueIndex previous_index =
          message_to_replace->header.queue_index.RelaxedLoad(queue_size);
      is_previous_index_valid = previous_index.valid();
      if (previous_index != decremented_queue_index &&
          is_previous_index_valid) {
        // Retry.
        VLOG(3) << "Something fishy happened, queue index doesn't match. "
                   "Retrying. Previous index was "
                << std::hex << previous_index.index() << ", should be "
                << decremented_queue_index.index();
        continue;
      }
    }

    message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
    message->header.realtime_sent_time = ::aos::realtime_clock::now();

    if (monotonic_sent_time != nullptr) {
      *monotonic_sent_time = message->header.monotonic_sent_time;
    }
    if (realtime_sent_time != nullptr) {
      *realtime_sent_time = message->header.realtime_sent_time;
    }
    if (queue_index != nullptr) {
      *queue_index = next_queue_index.index();
    }

    const auto to_replace_monotonic_sent_time =
        message_to_replace->header.monotonic_sent_time;

    // If we are overwriting a message sent in the last
    // channel_storage_duration_, that means that we would be sending more than
    // queue_size messages and would therefore be sending too fast. If the
    // previous index is not valid then the message hasn't been filled out yet
    // so we aren't sending too fast. And, if it is not less than the sent time
    // of the message that we are going to write, someone else beat us and the
    // compare and exchange below will fail.
    if (is_previous_index_valid &&
        (to_replace_monotonic_sent_time <
         message->header.monotonic_sent_time) &&
        (message->header.monotonic_sent_time - to_replace_monotonic_sent_time <
         channel_storage_duration_)) {
      // There is a possibility that another context beat us to writing out the
      // message in the queue, but we beat that context to acquiring the sent
      // time. In this case our sent time is *greater than* the other context's
      // sent time. Therefore, we can check if we got beat filling out this
      // message *after* doing the above check to determine if we hit this edge
      // case. Otherwise, messages are being sent too fast.
      const QueueIndex previous_index =
          message_to_replace->header.queue_index.Load(queue_size);
      if (previous_index != decremented_queue_index && previous_index.valid()) {
1107 VLOG(3) << "Got beat during check for messages being sent too fast"
1108 "Retrying.";
1109 continue;
1110 } else {
Austin Schuh71e72142023-05-03 13:10:07 -07001111 VLOG(1) << "Messages sent too fast. Returning. Attempted index: "
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001112 << decremented_queue_index.index()
1113 << " message sent time: " << message->header.monotonic_sent_time
1114 << " message to replace sent time: "
1115 << to_replace_monotonic_sent_time;
Austin Schuh71e72142023-05-03 13:10:07 -07001116
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001117 // Since we are not using the message obtained from scratch_index
1118 // and we are not retrying, we need to invalidate its queue_index.
1119 message->header.queue_index.Invalidate();
1120 return Result::MESSAGES_SENT_TOO_FAST;
1121 }
1122 }
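
    // To make the bound concrete (numbers are illustrative, not from this
    // codebase): with queue_size == 100 and channel_storage_duration_ == 1s,
    // the slot we overwrite holds the message from 100 sends ago. If that
    // message is younger than 1s, we have sent 100 messages in under a
    // second, i.e. faster than
    //
    //   queue_size / channel_storage_duration_ = 100 messages per second,
    //
    // which is exactly the MESSAGES_SENT_TOO_FAST condition above.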

    // Before we are fully done filling out the message, update the Sender state
    // with the new index to write. This re-uses the barrier for the
    // queue_index store.
    const Index index_to_write(next_queue_index, scratch_index.message_index());

    aos_compiler_memory_barrier();
    // We're the only person who cares about our scratch index, besides somebody
    // cleaning up after us.
    sender->scratch_index.RelaxedStore(index_to_write);
    aos_compiler_memory_barrier();

    message->header.queue_index.Store(next_queue_index);

    aos_compiler_memory_barrier();
    // The message is now filled out, and we have a confirmed slot to store
    // into.
    //
    // Start by writing down what we are going to pull out of the queue. This
    // was Invalid before now. The only person who will read this is whoever
    // cleans up after us, so no synchronization is necessary.
    sender->to_replace.RelaxedStore(to_replace);
    aos_compiler_memory_barrier();

    // Then exchange the next index into the queue.
    if (!memory_->GetQueue(next_queue_index.Wrapped())
             ->CompareAndExchangeStrong(to_replace, index_to_write)) {
      // Aw, didn't succeed. Retry.
      sender->to_replace.RelaxedInvalidate();
      aos_compiler_memory_barrier();
      VLOG(3) << "Failed to wrap into queue";
      continue;
    }

    // Then update next_queue_index to save the next user some computation time.
    memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
                                                       incremented_queue_index);

    aos_compiler_memory_barrier();
    // Now update the scratch space and record that we succeeded.
    sender->scratch_index.Store(to_replace);
    aos_compiler_memory_barrier();
    // And then record that we succeeded, but definitely after the above store.
    sender->to_replace.RelaxedInvalidate();

    break;
  }

  DCHECK(!CheckBothRedzones(memory_, memory_->GetMessage(to_replace)))
      << ": Invalid message found in shared memory";
  // to_replace is our current scratch_index. It isn't in the queue, which means
  // nobody new can pin it. They can set their `pinned` to it, but they will
  // back it out, so they don't count. This means that we just need to find a
  // message for which no pinner had it in `pinned`, and then we know this
  // message will never be pinned. We'll start with to_replace, and if that is
  // pinned then we'll look for a new one to use instead.
  const Index new_scratch =
      SwapPinnedSenderScratch(memory_, sender, to_replace);
  DCHECK(!CheckBothRedzones(
      memory_, memory_->GetMessage(sender->scratch_index.RelaxedLoad())))
      << ": Invalid message found in shared memory";

  // If anybody is looking at this message (they shouldn't be), then try telling
  // them about it (best-effort).
  memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
  return Result::GOOD;
}
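
// The commit sequence in the loop above is the heart of the lockless send.
// As a compressed sketch of the ordering (pseudocode mirroring the stores
// above, not additional behavior):
//
//   sender->scratch_index = new_index;  // 1. breadcrumb for cleanup
//   message->queue_index = next_index;  // 2. stamp the message
//   sender->to_replace = old_index;     // 3. breadcrumb for cleanup
//   if (!queue_slot.compare_exchange_strong(old_index, new_index)) {
//     sender->to_replace = Invalid;     // 4. lost the race; undo and retry
//   }
//
// The compiler barriers between those steps are what let whoever cleans up
// after a crashed sender reason about exactly how far the send got.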

int LocklessQueueSender::buffer_index() const {
  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
  // We can do a relaxed load on our sender because we're the only person
  // modifying it right now.
  const Index scratch_index = sender->scratch_index.RelaxedLoad();
  return scratch_index.message_index();
}

LocklessQueuePinner::LocklessQueuePinner(
    LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
    : memory_(memory), const_memory_(const_memory) {
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);

  // Since we already have the lock, go ahead and try cleaning up.
  Cleanup(memory_, grab_queue_setup_lock);

  const int num_pinners = memory_->num_pinners();

  for (int i = 0; i < num_pinners; ++i) {
    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
    // This doesn't need synchronization because we're the only process doing
    // initialization right now, and nobody else will be touching pinners which
    // we're interested in.
    if (p->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
      pinner_index_ = i;
      break;
    }
  }

  if (pinner_index_ == -1) {
    VLOG(1) << "Too many pinners, starting to bail.";
    return;
  }

  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
  p->pinned.Invalidate();

  // Indicate that we are now alive by taking over the slot. If the previous
  // owner died, we still want to do this.
  p->ownership_tracker.Acquire();
}

LocklessQueuePinner::~LocklessQueuePinner() {
  if (pinner_index_ != -1) {
    CHECK(memory_ != nullptr);
    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
    aos_compiler_memory_barrier();
    memory_->GetPinner(pinner_index_)->ownership_tracker.Release();
  }
}

std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
    LocklessQueue queue) {
  queue.Initialize();
  LocklessQueuePinner result(queue.memory(), queue.const_memory());
  if (result.pinner_index_ != -1) {
    return result;
  } else {
    return std::nullopt;
  }
}
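
// A minimal usage sketch for the pinner API (illustrative only; assumes a
// LocklessQueue `queue` wrapping the shared-memory region, and
// `some_queue_index` is a hypothetical index the caller got elsewhere):
//
//   std::optional<LocklessQueuePinner> pinner =
//       LocklessQueuePinner::Make(queue);
//   if (!pinner) {
//     // All pinner slots are claimed; bail.
//   } else if (pinner->PinIndex(some_queue_index) >= 0) {
//     // The message is pinned; pinner->Data() and pinner->size() describe
//     // it, and senders will not reuse it while it stays pinned.
//   }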

// This method doesn't mess with any scratch_index, so it doesn't have to worry
// about message ownership.
int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
  const size_t queue_size = memory_->queue_size();
  const QueueIndex queue_index =
      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);

  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());

  // Indicate that we want to pin this message.
  pinner->pinned.Store(queue_index);
  aos_compiler_memory_barrier();

  {
    const Index message_index = queue_slot->Load();
    Message *const message = memory_->GetMessage(message_index);
    DCHECK(!CheckBothRedzones(memory_, message))
        << ": Invalid message found in shared memory";

    const QueueIndex message_queue_index =
        message->header.queue_index.Load(queue_size);
    if (message_queue_index == queue_index) {
      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
      aos_compiler_memory_barrier();
      return message_index.message_index();
    }
    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
            << ", " << queue_index.index();
  }

  // Being down here means we asked to pin a message before realizing it's no
  // longer in the queue, so back that out now.
  pinner->pinned.Invalidate();
  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
  return -1;
}
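
// The store-then-verify dance above is one half of a handshake with the
// sender side: the pinner advertises the index in `pinned` first, and only
// then checks that the queue slot still holds a matching message. A sender
// pulling a message out of the queue checks the `pinned` slots afterwards
// (SwapPinnedSenderScratch) and swaps to a different scratch message if it
// finds a match. Sketched as the two racing sequences (pseudocode):
//
//   pinner: pinned = index;   barrier;  verify queue[index] still matches;
//   sender: pull from queue;  barrier;  scan pinners, dodge any match;
//
// Whichever way the race resolves, a successfully pinned message is never
// handed back to a sender for reuse.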

size_t LocklessQueuePinner::size() const {
  return const_memory_->message_data_size();
}

const void *LocklessQueuePinner::Data() const {
  const size_t queue_size = const_memory_->queue_size();
  const ::aos::ipc_lib::Pinner *const pinner =
      const_memory_->GetPinner(pinner_index_);
  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
  CHECK(pinned.valid());
  const Message *message = const_memory_->GetMessage(pinned);

  return message->data(const_memory_->message_data_size());
}

LocklessQueueReader::Result LocklessQueueReader::Read(
    uint32_t uint32_queue_index,
    monotonic_clock::time_point *monotonic_sent_time,
    realtime_clock::time_point *realtime_sent_time,
    monotonic_clock::time_point *monotonic_remote_time,
    monotonic_clock::time_point *monotonic_remote_transmit_time,
    realtime_clock::time_point *realtime_remote_time,
    uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
    char *data,
    std::function<bool(const Context &)> should_read_callback) const {
  const size_t queue_size = const_memory_->queue_size();

  // Build up the QueueIndex.
  const QueueIndex queue_index =
      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);

  // Read the message stored at the requested location.
  Index mi = const_memory_->LoadIndex(queue_index);
  const Message *m = const_memory_->GetMessage(mi);

  while (true) {
    DCHECK(!CheckBothRedzones(const_memory_, m))
        << ": Invalid message found in shared memory";
    // We need to confirm that the data doesn't change while we are reading it.
    // Do that by first confirming that the message points to the queue index
    // we want.
    const QueueIndex starting_queue_index =
        m->header.queue_index.Load(queue_size);
    if (starting_queue_index != queue_index) {
      // If we found a message that is exactly 1 loop old, we just wrapped.
      if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
        VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                << ", " << queue_index.DecrementBy(queue_size).index();
        return Result::NOTHING_NEW;
      }

      // Someone has re-used this message between when we pulled it out of the
      // queue and when we grabbed its index. It is pretty hard to deduce
      // what happened. Just try again.
      const Message *const new_m = const_memory_->GetMessage(queue_index);
      if (m != new_m) {
        m = new_m;
        VLOG(3) << "Retrying, m doesn't match";
        continue;
      }

      // We have confirmed that the queue slot still points to the same
      // message. This means that the message didn't get swapped out from
      // under us, so starting_queue_index is correct.
      //
      // Either we got too far behind (signaled by this being a valid
      // message), or this is one of the initial messages which are invalid.
      if (starting_queue_index.valid()) {
        VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
                << ", got " << starting_queue_index.index() << ", behind by "
                << std::dec
                << (starting_queue_index.index() - queue_index.index());
        return Result::TOO_OLD;
      }

      VLOG(3) << "Initial";

      // There isn't a valid message at this location.
      //
      // If someone asks for one of the messages within the first go around,
      // then they need to wait. They got ahead. Otherwise, they are
      // asking for something crazy, like something before the beginning of
      // the queue. Tell them that they are behind.
      if (uint32_queue_index < const_memory_->queue_size()) {
        VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
        return Result::NOTHING_NEW;
      } else {
        VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
        return Result::TOO_OLD;
      }
    }
    VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
            << queue_index.index();
    break;
  }

  // Then read the data out. Copy it all out to be deterministic and so we can
  // make length be from either end.
  Context context;
  context.monotonic_event_time = m->header.monotonic_sent_time;
  context.realtime_event_time = m->header.realtime_sent_time;
  context.monotonic_remote_time = m->header.monotonic_remote_time;
  context.monotonic_remote_transmit_time =
      m->header.monotonic_remote_transmit_time;
  context.realtime_remote_time = m->header.realtime_remote_time;
  context.queue_index = queue_index.index();
  if (m->header.remote_queue_index == 0xffffffffu) {
    context.remote_queue_index = context.queue_index;
  } else {
    context.remote_queue_index = m->header.remote_queue_index;
  }
  context.source_boot_uuid = m->header.source_boot_uuid;
  context.size = m->header.length;
  context.data = nullptr;
  context.buffer_index = -1;

  // If the callback is provided, use it.
  if (should_read_callback) {
    // And finally, confirm that the message *still* points to the queue index
    // we want. This means it didn't change out from under us. If something
    // changed out from under us, we were reading it much too late in its
    // lifetime.
    aos_compiler_memory_barrier();
    const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
    if (final_queue_index != queue_index) {
      VLOG(3) << "Changed out from under us. Reading " << std::hex
              << queue_index.index() << ", finished with "
              << final_queue_index.index() << ", delta: " << std::dec
              << (final_queue_index.index() - queue_index.index());
      return Result::OVERWROTE;
    }

    // We now know that the context is safe to use. See if we are supposed to
    // take the message or not.
    if (!should_read_callback(context)) {
      return Result::FILTERED;
    }
  }

  // Read the data if requested.
  if (data) {
    memcpy(data, m->data(const_memory_->message_data_size()),
           const_memory_->message_data_size());
  }

  // Now, we need to confirm that nothing has changed by re-reading the queue
  // index from the header since we've read all the body. We only need to do it
  // if we have read anything new after the previous check up above, which
  // happens if we read the data, or if we didn't check for the filtered case.
  if (data || !should_read_callback) {
    aos_compiler_memory_barrier();
    const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
    if (final_queue_index != queue_index) {
      VLOG(3) << "Changed out from under us. Reading " << std::hex
              << queue_index.index() << ", finished with "
              << final_queue_index.index() << ", delta: " << std::dec
              << (final_queue_index.index() - queue_index.index());
      return Result::OVERWROTE;
    }
  }

  // And now take it and make it visible to the user. By doing it here, we will
  // never present partial or corrupted state to the user in the output
  // pointers.
  *monotonic_sent_time = context.monotonic_event_time;
  *realtime_sent_time = context.realtime_event_time;
  *remote_queue_index = context.remote_queue_index;
  *monotonic_remote_time = context.monotonic_remote_time;
  *monotonic_remote_transmit_time = context.monotonic_remote_transmit_time;
  *realtime_remote_time = context.realtime_remote_time;
  *source_boot_uuid = context.source_boot_uuid;
  *length = context.size;

  return Result::GOOD;
}
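
// A sketch of a caller of Read() (illustrative only; `reader` and `index`
// are assumed to exist, and `buffer` must hold message_data_size() bytes):
//
//   monotonic_clock::time_point monotonic_sent, remote, remote_transmit;
//   realtime_clock::time_point realtime_sent, realtime_remote;
//   uint32_t remote_queue_index;
//   UUID source_boot_uuid;
//   size_t length;
//   const LocklessQueueReader::Result result = reader.Read(
//       index, &monotonic_sent, &realtime_sent, &remote, &remote_transmit,
//       &realtime_remote, &remote_queue_index, &source_boot_uuid, &length,
//       buffer.data(), /*should_read_callback=*/{});
//
// GOOD means the outputs and buffer are consistent; NOTHING_NEW means the
// caller is caught up; TOO_OLD means it fell behind the queue; OVERWROTE
// means a sender raced the read and the caller should retry; FILTERED only
// occurs when a callback was passed and declined the message.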

QueueIndex LocklessQueueReader::LatestIndex() const {
  const size_t queue_size = const_memory_->queue_size();

  // There are 2 main cases. Either the next queue index is right, or it is
  // behind by 1 and wrong. If nothing has been published, the next queue
  // index will be the reserved "Invalid" value; otherwise it will point to
  // the next place to write. We need to figure out if it is right or wrong,
  // and if it is wrong, fix it. If we don't, Read() can find the next message
  // before LatestIndex() sees it if someone is hammering on Read() until it
  // returns that nothing new is left, which means watchers and fetchers may
  // disagree on when a message is published.
  QueueIndex actual_next_queue_index =
      const_memory_->next_queue_index.Load(queue_size);

  // Handle the "nothing has been published" case by making next_queue_index
  // point to the 0th index.
  const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);

  // This needs to synchronize with whoever the previous writer at this
  // location was. Read what is there to see if the message has been published
  // and next_queue_index is just behind.
  Index to_replace = const_memory_->LoadIndex(next_queue_index);

  // See if next_queue_index is consistent with the state of the queue. If it
  // is not, try to atomically update next_queue_index in case the previous
  // writer failed and retry.
  if (to_replace.IsPlausible(next_queue_index)) {
    // If next_queue_index ends up pointing to a message with a matching index,
    // this is what next_queue_index needs to be updated to.
    const QueueIndex incremented_queue_index = next_queue_index.Increment();

    // We don't care about the result. It will either succeed, or we got
    // beat in fixing it. The way the Send logic works, the pointer can never
    // get more than 1 behind or the next send will repair it. So, if we fail,
    // that means that someone else got there first and fixed it up (and
    // potentially someone further continued to send).
    //
    // Both require no further action from us. Worst case, our Next pointer
    // will not be the latest message, but there will always be a point after
    // which the index can change. We just need a consistent snapshot where
    // there is nothing in the queue that isn't accounted for by
    // next_queue_index.
    memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
                                                       incremented_queue_index);

    VLOG(3) << "next_queue_index is lagging, fixed it. Found " << std::hex
            << to_replace.get() << ", expected "
            << next_queue_index.DecrementBy(queue_size).index();

    actual_next_queue_index = incremented_queue_index;
  }

  if (actual_next_queue_index.valid()) {
    const QueueIndex current_queue_index =
        actual_next_queue_index.DecrementBy(1u);
    return current_queue_index;
  }
  return QueueIndex::Invalid();
}
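
// LatestIndex() and Read() compose into a "fetch the newest message"
// operation roughly like this (illustrative sketch, error handling trimmed):
//
//   const QueueIndex latest = reader.LatestIndex();
//   if (!latest.valid()) {
//     // Nothing has ever been published on this queue.
//   } else {
//     // Read() takes the raw uint32 index. OVERWROTE here means a sender
//     // lapped us mid-read; re-polling LatestIndex() and retrying is the
//     // natural recovery.
//     reader.Read(latest.index(), ...);
//   }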

size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
  return memory->queue_size();
}

size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
  return memory->message_data_size();
}

namespace {

// Prints out the mutex state. Not safe to use while the mutex is being
// changed.
::std::string PrintMutex(const aos_mutex *mutex) {
  ::std::stringstream s;
  s << "aos_mutex(" << ::std::hex << mutex->futex;

  if (mutex->futex != 0) {
    s << ":";
    if (mutex->futex & FUTEX_OWNER_DIED) {
      s << "FUTEX_OWNER_DIED|";
    }
    s << "tid=" << (mutex->futex & FUTEX_TID_MASK);
  }

  s << ")";
  return s.str();
}

}  // namespace

void PrintLocklessQueueMemory(const LocklessQueueMemory *memory) {
  const size_t queue_size = memory->queue_size();
  ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
  ::std::cout << "  aos_mutex queue_setup_lock = "
              << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
  ::std::cout << "  bool initialized = " << memory->initialized << ::std::endl;
  ::std::cout << "  config {" << ::std::endl;
  ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
              << ::std::endl;
  ::std::cout << "    size_t num_senders = " << memory->config.num_senders
              << ::std::endl;
  ::std::cout << "    size_t num_pinners = " << memory->config.num_pinners
              << ::std::endl;
  ::std::cout << "    size_t queue_size = " << memory->config.queue_size
              << ::std::endl;
  ::std::cout << "    size_t message_data_size = "
              << memory->config.message_data_size << ::std::endl;

  ::std::cout << "    AtomicQueueIndex next_queue_index = "
              << memory->next_queue_index.Load(queue_size).DebugString()
              << ::std::endl;

  ::std::cout << "    uid_t uid = " << memory->uid << ::std::endl;

  ::std::cout << "  }" << ::std::endl;
  ::std::cout << "  AtomicIndex queue[" << queue_size << "] {" << ::std::endl;
  for (size_t i = 0; i < queue_size; ++i) {
    ::std::cout << "    [" << i << "] -> "
                << memory->GetQueue(i)->Load().DebugString() << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;
  ::std::cout << "  Message messages[" << memory->num_messages() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_messages(); ++i) {
    const Message *m = memory->GetMessage(Index(i, i));
    ::std::cout << "    [" << i << "] -> Message 0x" << std::hex
                << (reinterpret_cast<uintptr_t>(
                        memory->GetMessage(Index(i, i))) -
                    reinterpret_cast<uintptr_t>(memory))
                << std::dec << " {" << ::std::endl;
    ::std::cout << "      Header {" << ::std::endl;
    ::std::cout << "        AtomicQueueIndex queue_index = "
                << m->header.queue_index.Load(queue_size).DebugString()
                << ::std::endl;
    ::std::cout << "        monotonic_clock::time_point monotonic_sent_time = "
                << m->header.monotonic_sent_time << " 0x" << std::hex
                << m->header.monotonic_sent_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout << "        realtime_clock::time_point realtime_sent_time = "
                << m->header.realtime_sent_time << " 0x" << std::hex
                << m->header.realtime_sent_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout
        << "        monotonic_clock::time_point monotonic_remote_time = "
        << m->header.monotonic_remote_time << " 0x" << std::hex
        << m->header.monotonic_remote_time.time_since_epoch().count()
        << std::dec << ::std::endl;
    ::std::cout
        << "        monotonic_clock::time_point "
           "monotonic_remote_transmit_time = "
        << m->header.monotonic_remote_transmit_time << " 0x" << std::hex
        << m->header.monotonic_remote_transmit_time.time_since_epoch().count()
        << std::dec << ::std::endl;
    ::std::cout << "        realtime_clock::time_point realtime_remote_time = "
                << m->header.realtime_remote_time << " 0x" << std::hex
                << m->header.realtime_remote_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout << "        size_t length = " << m->header.length
                << ::std::endl;
    ::std::cout << "      }" << ::std::endl;
    const bool corrupt = CheckBothRedzones(memory, m);
    if (corrupt) {
      absl::Span<const char> pre_redzone =
          m->PreRedzone(memory->message_data_size());
      absl::Span<const char> post_redzone =
          m->PostRedzone(memory->message_data_size(), memory->message_size());

      ::std::cout << "      pre-redzone: \""
                  << absl::BytesToHexString(std::string_view(
                         pre_redzone.data(), pre_redzone.size()))
                  << "\"" << std::endl;
      ::std::cout << "      // *** DATA REDZONES ARE CORRUPTED ***"
                  << ::std::endl;
      ::std::cout << "      post-redzone: \""
                  << absl::BytesToHexString(std::string_view(
                         post_redzone.data(), post_redzone.size()))
                  << "\"" << std::endl;
    }
    ::std::cout << "      data: {";

    if (absl::GetFlag(FLAGS_dump_lockless_queue_data)) {
      const char *const m_data = m->data(memory->message_data_size());
      std::cout << absl::BytesToHexString(std::string_view(
          m_data, corrupt ? memory->message_data_size() : m->header.length));
    }
    ::std::cout << ::std::setfill(' ') << ::std::dec << "}" << ::std::endl;
    ::std::cout << "    }," << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_senders(); ++i) {
    const Sender *s = memory->GetSender(i);
    ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
                << s->ownership_tracker.DebugString() << ::std::endl;
    ::std::cout << "      AtomicIndex scratch_index = "
                << s->scratch_index.Load().DebugString() << ::std::endl;
    ::std::cout << "      AtomicIndex to_replace = "
                << s->to_replace.Load().DebugString() << ::std::endl;
    ::std::cout << "    }" << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "  Pinner pinners[" << memory->num_pinners() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_pinners(); ++i) {
    const Pinner *p = memory->GetPinner(i);
    ::std::cout << "    [" << i << "] -> Pinner {" << ::std::endl;
    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
                << p->ownership_tracker.DebugString() << ::std::endl;
    ::std::cout << "      AtomicIndex scratch_index = "
                << p->scratch_index.Load().DebugString() << ::std::endl;
    ::std::cout << "      AtomicIndex pinned = "
                << p->pinned.Load(memory->queue_size()).DebugString()
                << ::std::endl;
    ::std::cout << "    }" << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_watchers(); ++i) {
    const Watcher *w = memory->GetWatcher(i);
    ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
                << w->ownership_tracker.DebugString() << ::std::endl;
    ::std::cout << "      pid_t pid = " << w->pid << ::std::endl;
    ::std::cout << "      int priority = " << w->priority << ::std::endl;
    ::std::cout << "    }" << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "}" << ::std::endl;
}
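
// Illustrative use of the dumper: given a mapped LocklessQueueMemory pointer
// (for example from LocklessQueue::memory()),
//
//   PrintLocklessQueueMemory(memory);
//
// prints the config, queue slots, messages, senders, pinners, and watchers.
// Pass --dump_lockless_queue_data to also hex-dump each message body.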

}  // namespace aos::ipc_lib