blob: d9a1a712c0ca15565b76e0dd7dfd74d38d60e403 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
3#include <linux/futex.h>
4#include <sys/types.h>
5#include <syscall.h>
6#include <unistd.h>
7#include <algorithm>
8#include <iomanip>
9#include <iostream>
10#include <sstream>
11
Austin Schuh20b2b082019-09-11 20:42:56 -070012#include "aos/ipc_lib/lockless_queue_memory.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070013#include "aos/realtime.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070014#include "aos/util/compiler_memory_barrier.h"
Brian Silverman001f24d2020-08-12 19:33:20 -070015#include "gflags/gflags.h"
Austin Schuhf257f3c2019-10-27 21:00:43 -070016#include "glog/logging.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070017
Brian Silverman001f24d2020-08-12 19:33:20 -070018DEFINE_bool(dump_lockless_queue_data, false,
19 "If true, print the data out when dumping the queue.");
20
Austin Schuh20b2b082019-09-11 20:42:56 -070021namespace aos {
22namespace ipc_lib {
Austin Schuh20b2b082019-09-11 20:42:56 -070023namespace {
24
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080025class GrabQueueSetupLockOrDie {
26 public:
27 GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) : memory_(memory) {
28 const int result = mutex_grab(&(memory->queue_setup_lock));
29 CHECK(result == 0 || result == 1) << ": " << result;
30 }
Austin Schuh20b2b082019-09-11 20:42:56 -070031
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080032 ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }
33
34 GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
35 GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;
36
37 private:
38 LocklessQueueMemory *const memory_;
39};
40
// Recovers senders which died mid-send, restoring the queue to a consistent
// state.  Must be called with queue_setup_lock held (enforced by requiring a
// GrabQueueSetupLockOrDie reference).
//
// Returns true if it succeeded.  Returns false if another sender died in the
// middle (the caller should simply retry; see Cleanup below).
bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
  // Make sure we start looking at shared memory fresh right now. We'll handle
  // people dying partway through by either cleaning up after them or not, but
  // we want to ensure we clean up after anybody who has already died when we
  // start.
  aos_compiler_memory_barrier();

  const size_t num_senders = memory->num_senders();
  const size_t queue_size = memory->queue_size();
  const size_t num_messages = memory->num_messages();

  // There are a large number of crazy cases here for how things can go wrong
  // and how we have to recover.  They either require us to keep extra track of
  // what is going on, slowing down the send path, or require a large number of
  // cases.
  //
  // The solution here is to not over-think it.  This is running while not real
  // time during construction.  It is allowed to be slow.  It will also very
  // rarely trigger.  There is a small uS window where process death is
  // ambiguous.
  //
  // So, build up a list N long, where N is the number of messages.  Search
  // through the entire queue and the sender list (ignoring any dead senders),
  // and mark down which ones we have seen.  Once we have seen all the messages
  // except the N dead senders, we know which messages are dead.  Because the
  // queue is active while we do this, it may take a couple of go arounds to see
  // everything.

  // need_recovery[i] is set when sender i died in the ambiguous state 2)
  // described below and must be resolved by the hard-case pass.
  ::std::vector<bool> need_recovery(num_senders, false);

  // Do the easy case.  Find all senders who have died.  See if they are either
  // consistent already, or if they have copied over to_replace to the scratch
  // index, but haven't cleared to_replace.  Count them.
  size_t valid_senders = 0;
  for (size_t i = 0; i < num_senders; ++i) {
    Sender *sender = memory->GetSender(i);
    // The kernel sets FUTEX_OWNER_DIED in the futex word when a robust-futex
    // holder dies; acquire so everything the dead sender wrote before dying is
    // visible to us.
    const uint32_t tid =
        __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
    if (!(tid & FUTEX_OWNER_DIED)) {
      // Not dead.
      ++valid_senders;
      continue;
    }
    VLOG(3) << "Found an easy death for sender " << i;
    // We can do a relaxed load here because we're the only person touching
    // this sender at this point.
    const Index to_replace = sender->to_replace.RelaxedLoad();
    const Index scratch_index = sender->scratch_index.Load();

    // I find it easiest to think about this in terms of the set of observable
    // states.  The main code progresses through the following states:

    // 1) scratch_index = xxx
    //    to_replace = invalid
    // This is unambiguous.  Already good.

    // 2) scratch_index = xxx
    //    to_replace = yyy
    // Very ambiguous.  Is xxx or yyy the correct one?  Need to either roll
    // this forwards or backwards.

    // 3) scratch_index = yyy
    //    to_replace = yyy
    // We are in the act of moving to_replace to scratch_index, but didn't
    // finish.  Easy.

    // 4) scratch_index = yyy
    //    to_replace = invalid
    // Finished, but died.  Looks like 1)

    // Any cleanup code needs to follow the same set of states to be robust to
    // death, so death can be restarted.

    if (!to_replace.valid()) {
      // 1) or 4).  Make sure we aren't corrupted and declare victory.
      CHECK(scratch_index.valid());

      // Release the slot (clearing FUTEX_OWNER_DIED) so a new sender can
      // claim it.
      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
      ++valid_senders;
      continue;
    }

    // Could be 2) or 3) at this point.

    if (to_replace == scratch_index) {
      // 3) for sure.
      // Just need to invalidate to_replace to finish.
      sender->to_replace.Invalidate();

      // And mark that we succeeded.
      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
      ++valid_senders;
      continue;
    }

    // Must be 2).  Mark it for later.
    need_recovery[i] = true;
  }

  // If all the senders are (or were made) good, there is no need to do the hard
  // case.
  if (valid_senders == num_senders) {
    return true;
  }

  VLOG(3) << "Starting hard cleanup";

  // Mark-and-account pass: keep scanning live senders and the queue until
  // every message is either accounted for or owned by a dead sender pending
  // recovery.
  size_t num_accounted_for = 0;
  size_t num_missing = 0;
  ::std::vector<bool> accounted_for(num_messages, false);

  while ((num_accounted_for + num_missing) != num_messages) {
    num_missing = 0;
    for (size_t i = 0; i < num_senders; ++i) {
      Sender *const sender = memory->GetSender(i);
      const uint32_t tid =
          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
      if (tid & FUTEX_OWNER_DIED) {
        if (!need_recovery[i]) {
          // A sender we had not previously marked died mid-scan; bail out so
          // the caller restarts the whole cleanup from scratch.
          return false;
        }
        ++num_missing;
      } else {
        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
        // We can do a relaxed load here because we're the only person touching
        // this sender at this point, if it matters.  If it's not a dead sender,
        // then any message it every has will already be accounted for, so this
        // will always be a NOP.
        const Index scratch_index = sender->scratch_index.RelaxedLoad();
        if (!accounted_for[scratch_index.message_index()]) {
          ++num_accounted_for;
        }
        accounted_for[scratch_index.message_index()] = true;
      }
    }

    for (size_t i = 0; i < queue_size; ++i) {
      // Same logic as above for scratch_index applies here too.
      const Index index = memory->GetQueue(i)->RelaxedLoad();
      if (!accounted_for[index.message_index()]) {
        ++num_accounted_for;
      }
      accounted_for[index.message_index()] = true;
    }

    CHECK_LE(num_accounted_for + num_missing, num_messages);
  }

  // Resolution pass: each iteration resolves every dead sender whose state is
  // unambiguous; resolving one can disambiguate another, so loop until done.
  while (num_missing != 0) {
    const size_t starting_num_missing = num_missing;
    for (size_t i = 0; i < num_senders; ++i) {
      Sender *sender = memory->GetSender(i);
      const uint32_t tid =
          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
      if (!(tid & FUTEX_OWNER_DIED)) {
        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
        continue;
      }
      if (!need_recovery[i]) {
        // Newly-dead sender discovered mid-pass; restart the whole cleanup.
        return false;
      }
      // We can do relaxed loads here because we're the only person touching
      // this sender at this point.
      const Index scratch_index = sender->scratch_index.RelaxedLoad();
      const Index to_replace = sender->to_replace.RelaxedLoad();

      // Candidate.
      if (to_replace.valid()) {
        CHECK_LE(to_replace.message_index(), accounted_for.size());
      }
      if (scratch_index.valid()) {
        CHECK_LE(scratch_index.message_index(), accounted_for.size());
      }
      if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
        CHECK(scratch_index.valid());
        VLOG(3) << "Sender " << i
                << " died, to_replace is already accounted for";
        // If both are accounted for, we are corrupt...
        CHECK(!accounted_for[scratch_index.message_index()]);

        // to_replace is already accounted for.  This means that we didn't
        // atomically insert scratch_index into the queue yet.  So
        // invalidate to_replace.
        sender->to_replace.Invalidate();

        // And then mark this sender clean.
        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
        need_recovery[i] = false;

        // And account for scratch_index.
        accounted_for[scratch_index.message_index()] = true;
        --num_missing;
        ++num_accounted_for;
      } else if (!scratch_index.valid() ||
                 accounted_for[scratch_index.message_index()]) {
        VLOG(3) << "Sender " << i
                << " died, scratch_index is already accounted for";
        // scratch_index is accounted for.  That means we did the insert,
        // but didn't record it.
        CHECK(to_replace.valid());
        // Finish the transaction.  Copy to_replace, then clear it.

        sender->scratch_index.Store(to_replace);
        sender->to_replace.Invalidate();

        // And then mark this sender clean.
        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
        need_recovery[i] = false;

        // And account for to_replace.
        accounted_for[to_replace.message_index()] = true;
        --num_missing;
        ++num_accounted_for;
      } else {
        VLOG(3) << "Sender " << i << " died, neither is accounted for";
        // Ambiguous.  There will be an unambiguous one somewhere that we
        // can do first.
      }
    }
    // CHECK that we are making progress.
    CHECK_NE(num_missing, starting_num_missing);
  }
  return true;
}
267
268void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &lock) {
269 // The number of iterations is bounded here because there are only a finite
270 // number of senders in existence which could die, and no new ones can be
271 // created while we're in here holding the lock.
272 while (!DoCleanup(memory, lock)) {
273 }
Austin Schuh20b2b082019-09-11 20:42:56 -0700274}
275
// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
// thread (tid) within the target thread group (tgid), rather than to the
// whole process.  glibc does not wrap this syscall.  Returns 0 on success or
// -1 with errno set, per syscall(2).
// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
  return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
282
283} // namespace
284
Austin Schuh4bc4f902019-12-23 18:04:51 -0800285size_t LocklessQueueConfiguration::message_size() const {
286 // Round up the message size so following data is aligned appropriately.
Brian Silvermana1652f32020-01-29 20:41:44 -0800287 return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
288 (kChannelDataAlignment - 1)) +
Austin Schuh4bc4f902019-12-23 18:04:51 -0800289 sizeof(Message);
290}
291
// Computes the total number of bytes of shared memory needed to hold a queue
// with the provided configuration: the header, the queue of indices, the
// message slots, the watcher table, and the sender table, in that order.
size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
  // Round up the message size so following data is aligned appropriately.
  config.message_data_size =
      LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);

  // As we build up the size, confirm that everything is aligned to the
  // alignment requirements of the type.
  size_t size = sizeof(LocklessQueueMemory);
  CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);

  CHECK_EQ(size % alignof(AtomicIndex), 0u);
  size += LocklessQueueMemory::SizeOfQueue(config);

  CHECK_EQ(size % alignof(Message), 0u);
  size += LocklessQueueMemory::SizeOfMessages(config);

  CHECK_EQ(size % alignof(Watcher), 0u);
  size += LocklessQueueMemory::SizeOfWatchers(config);

  CHECK_EQ(size % alignof(Sender), 0u);
  size += LocklessQueueMemory::SizeOfSenders(config);

  return size;
}
316
// Initializes the shared memory region for a queue if it has not been
// initialized yet (memory->initialized is set last, under the setup lock, so
// a partial initialization by a process that died is simply redone).  If the
// region is already initialized, only verifies UID compatibility.  Returns
// the same pointer that was passed in.
LocklessQueueMemory *InitializeLocklessQueueMemory(
    LocklessQueueMemory *memory, LocklessQueueConfiguration config) {
  // Everything should be zero initialized already.  So we just need to fill
  // everything out properly.

  // This is the UID we will use for checking signal-sending permission
  // compatibility.
  //
  // The manpage says:
  //   For a process to have permission to send a signal, it must either be
  //   privileged [...], or the real or effective user ID of the sending
  //   process must equal the real or saved set-user-ID of the target process.
  //
  // Processes typically initialize a queue in random order as they start up.
  // This means we need an algorithm for verifying all processes have
  // permissions to send each other signals which gives the same answer no
  // matter what order they attach in. We would also like to avoid maintaining a
  // shared list of the UIDs of all processes.
  //
  // To do this while still giving sufficient flexibility for all current use
  // cases, we track a single UID for the queue. All processes with a matching
  // euid+suid must have this UID. Any processes with distinct euid/suid must
  // instead have a matching ruid. This guarantees signals can be sent between
  // all processes attached to the queue.
  //
  // In particular, this allows a process to change only its euid (to interact
  // with a queue) while still maintaining privileges via its ruid. However, it
  // can only use privileges in ways that do not require changing the euid back,
  // because while the euid is different it will not be able to receive signals.
  // We can't actually verify that, but we can sanity check that things are
  // valid when the queue is initialized.

  uid_t uid;
  {
    uid_t ruid, euid, suid;
    PCHECK(getresuid(&ruid, &euid, &suid) == 0);
    // If these are equal, then use them, even if that's different from the real
    // UID. This allows processes to keep a real UID of 0 (to have permissions
    // to perform system-level changes) while still being able to communicate
    // with processes running unprivileged as a distinct user.
    if (euid == suid) {
      uid = euid;
      VLOG(1) << "Using euid==suid " << uid;
    } else {
      uid = ruid;
      VLOG(1) << "Using ruid " << ruid;
    }
  }

  // Grab the mutex.  We don't care if the previous reader died.  We are going
  // to check everything anyways.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);

  if (!memory->initialized) {
    // TODO(austin): Check these for out of bounds.
    memory->config.num_watchers = config.num_watchers;
    memory->config.num_senders = config.num_senders;
    memory->config.queue_size = config.queue_size;
    memory->config.message_data_size = config.message_data_size;

    const size_t num_messages = memory->num_messages();
    // There need to be at most MaxMessages() messages allocated.
    CHECK_LE(num_messages, Index::MaxMessages());

    // Start every message with an invalid queue index so readers can tell
    // nothing has been published into it yet.
    for (size_t i = 0; i < num_messages; ++i) {
      memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i))
          ->header.queue_index.Invalidate();
    }

    for (size_t i = 0; i < memory->queue_size(); ++i) {
      // Make the initial counter be the furthest away number.  That means that
      // index 0 should be 0xffff, 1 should be 0, etc.
      memory->GetQueue(i)->Store(Index(QueueIndex::Zero(memory->queue_size())
                                           .IncrementBy(i)
                                           .DecrementBy(memory->queue_size()),
                                       i));
    }

    memory->next_queue_index.Invalidate();
    memory->uid = uid;

    for (size_t i = 0; i < memory->num_senders(); ++i) {
      ::aos::ipc_lib::Sender *s = memory->GetSender(i);
      // Nobody else can possibly be touching these because we haven't set
      // initialized to true yet.
      s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
      s->to_replace.RelaxedInvalidate();
    }

    aos_compiler_memory_barrier();
    // Signal everything is done.  This needs to be done last, so if we die, we
    // redo initialization.
    memory->initialized = true;
  } else {
    CHECK_EQ(uid, memory->uid) << ": UIDs must match for all processes";
  }

  return memory;
}
416
// Attaches to (initializing if necessary) the queue in the provided shared
// memory region.  Caches our pid/uid for filling in siginfo when waking
// watchers, and sizes the local watcher snapshot buffer up front so Wakeup()
// does not allocate.
LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
                             LocklessQueueConfiguration config)
    : memory_(InitializeLocklessQueueMemory(memory, config)),
      watcher_copy_(memory_->num_watchers()),
      pid_(getpid()),
      uid_(getuid()) {}
423
LocklessQueue::~LocklessQueue() {
  // RegisterWakeup must have been balanced by UnregisterWakeup before
  // destruction.
  CHECK_EQ(watcher_index_, -1);

  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
  const int num_watchers = memory_->num_watchers();
  // Cleanup is cheap. The next user will do it anyways, so no need for us to do
  // anything right now.

  // And confirm that nothing is owned by us.
  for (int i = 0; i < num_watchers; ++i) {
    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
  }
}
437
438size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
439
// Claims a watcher slot so this thread receives kWakeupSignal when messages
// are sent.  priority is the scheduling priority senders should boost to when
// signaling us.  Returns false if every watcher slot is taken.
bool LocklessQueue::RegisterWakeup(int priority) {
  // TODO(austin): Make sure signal coalescing is turned on.  We don't need
  // duplicates.  That will improve performance under high load.

  // Since everything is self consistent, all we need to do is make sure nobody
  // else is running.  Someone dying will get caught in the generic consistency
  // check.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
  const int num_watchers = memory_->num_watchers();

  // Now, find the first empty watcher and grab it.
  CHECK_EQ(watcher_index_, -1);
  for (int i = 0; i < num_watchers; ++i) {
    // If we see a slot the kernel has marked as dead, everything we do reusing
    // it needs to happen-after whatever that process did before dying.
    auto *const futex = &(memory_->GetWatcher(i)->tid.futex);
    const uint32_t tid = __atomic_load_n(futex, __ATOMIC_ACQUIRE);
    // A slot is free if it was never claimed (0) or its owner died
    // (FUTEX_OWNER_DIED set by the kernel's robust-futex handling).
    if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
      watcher_index_ = i;
      // Relaxed is OK here because we're the only task going to touch it
      // between here and the write in death_notification_init below (other
      // recovery is blocked by us holding the setup lock).
      __atomic_store_n(futex, 0, __ATOMIC_RELAXED);
      break;
    }
  }

  // Bail if we failed to find an open slot.
  if (watcher_index_ == -1) {
    return false;
  }

  Watcher *w = memory_->GetWatcher(watcher_index_);

  w->pid = getpid();
  w->priority = priority;

  // Grabbing a mutex is a compiler and memory barrier, so nothing before will
  // get rearranged afterwords.
  death_notification_init(&(w->tid));
  return true;
}
482
// Releases the watcher slot claimed by RegisterWakeup.  Must be called while
// registered (watcher_index_ != -1); after return this thread no longer
// receives wakeup signals.
void LocklessQueue::UnregisterWakeup() {
  // Since everything is self consistent, all we need to do is make sure nobody
  // else is running.  Someone dying will get caught in the generic consistency
  // check.
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);

  // Make sure we are registered.
  CHECK_NE(watcher_index_, -1);

  // Make sure we still own the slot we are supposed to.
  CHECK(
      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));

  // The act of unlocking invalidates the entry.  Invalidate it.
  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
  // And internally forget the slot.
  watcher_index_ = -1;
}
501
// Signals every registered watcher that a message is available, temporarily
// boosting our own RT priority to the highest watcher priority to avoid
// priority inversion while signaling.  current_priority is this thread's
// scheduling priority (<= 0 means not RT, so no boosting is done).  Returns
// the number of signals sent.
int LocklessQueue::Wakeup(const int current_priority) {
  const size_t num_watchers = memory_->num_watchers();

  CHECK_EQ(watcher_copy_.size(), num_watchers);

  // Grab a copy so it won't change out from underneath us, and we can sort it
  // nicely in C++.
  // Do note that there is still a window where the process can die *after* we
  // read everything.  We will still PI boost and send a signal to the thread in
  // question.  There is no way without pidfd's to close this window, and
  // creating a pidfd is likely not RT.
  for (size_t i = 0; i < num_watchers; ++i) {
    Watcher *w = memory_->GetWatcher(i);
    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
    // Force the load of the TID to come first.
    aos_compiler_memory_barrier();
    watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
    watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);

    // Use a priority of -1 to mean an invalid entry to make sorting easier.
    if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
      watcher_copy_[i].priority = -1;
    } else {
      // Ensure all of this happens after we're done looking at the pid+priority
      // in shared memory.
      aos_compiler_memory_barrier();
      if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
                                      &(w->tid.futex), __ATOMIC_RELAXED))) {
        // Confirm that the watcher hasn't been re-used and modified while we
        // read it.  If it has, mark it invalid again.
        watcher_copy_[i].priority = -1;
        watcher_copy_[i].tid = 0;
      }
    }
  }

  // Now sort, highest priority first; all the -1 (invalid) entries sink to
  // the back.
  ::std::sort(watcher_copy_.begin(), watcher_copy_.end(),
              [](const WatcherCopy &a, const WatcherCopy &b) {
                return a.priority > b.priority;
              });

  int count = 0;
  if (watcher_copy_[0].priority != -1) {
    const int max_priority =
        ::std::max(current_priority, watcher_copy_[0].priority);
    // Boost if we are RT and there is a higher priority sender out there.
    // Otherwise we might run into priority inversions.
    if (max_priority > current_priority && current_priority > 0) {
      SetCurrentThreadRealtimePriority(max_priority);
    }

    // Build up the siginfo to send.
    siginfo_t uinfo;
    memset(&uinfo, 0, sizeof(uinfo));

    uinfo.si_code = SI_QUEUE;
    uinfo.si_pid = pid_;
    uinfo.si_uid = uid_;
    uinfo.si_value.sival_int = 0;

    for (const WatcherCopy &watcher_copy : watcher_copy_) {
      // The first -1 priority means we are at the end of the valid list.
      if (watcher_copy.priority == -1) {
        break;
      }

      // Send the signal.  Target just the thread that sent it so that we can
      // support multiple watchers in a process (when someone creates multiple
      // event loops in different threads).
      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.tid, kWakeupSignal,
                        &uinfo);

      ++count;
    }

    // Drop back down if we were boosted.
    if (max_priority > current_priority && current_priority > 0) {
      SetCurrentThreadRealtimePriority(current_priority);
    }
  }

  return count;
}
586
// Claims a sender slot in shared memory.  Runs dead-sender cleanup first
// (since we already hold the setup lock).  If every slot is taken,
// sender_index_ stays -1 and the resulting Sender is !valid(); callers go
// through LocklessQueue::MakeSender, which converts that into std::nullopt.
LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);

  // Since we already have the lock, go ahead and try cleaning up.
  Cleanup(memory_, grab_queue_setup_lock);

  const int num_senders = memory_->num_senders();

  // Find the first unclaimed sender slot (tid futex == 0).
  for (int i = 0; i < num_senders; ++i) {
    ::aos::ipc_lib::Sender *s = memory->GetSender(i);
    // This doesn't need synchronization because we're the only process doing
    // initialization right now, and nobody else will be touching senders which
    // we're interested in.
    const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
    if (tid == 0) {
      sender_index_ = i;
      break;
    }
  }

  if (sender_index_ == -1) {
    VLOG(1) << "Too many senders, starting to bail.";
    return;
  }

  ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);

  // Indicate that we are now alive by taking over the slot.  If the previous
  // owner died, we still want to do this.
  death_notification_init(&(s->tid));
}
618
619LocklessQueue::Sender::~Sender() {
Austin Schuhe516ab02020-05-06 21:37:04 -0700620 if (valid()) {
Brian Silvermanfafe1fa2019-12-18 21:42:18 -0800621 death_notification_release(&(memory_->GetSender(sender_index_)->tid));
Austin Schuh20b2b082019-09-11 20:42:56 -0700622 }
623}
624
Austin Schuhe516ab02020-05-06 21:37:04 -0700625std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
626 LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
627 if (result.valid()) {
628 return std::move(result);
629 } else {
630 return std::nullopt;
631 }
Austin Schuh20b2b082019-09-11 20:42:56 -0700632}
633
634QueueIndex ZeroOrValid(QueueIndex index) {
635 if (!index.valid()) {
636 return index.Clear();
637 }
638 return index;
639}
640
// Capacity in bytes of each message's data region (the most a Send can hold).
size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
642
// Returns a pointer to this sender's scratch message data region, for filling
// in before calling Send.
void *LocklessQueue::Sender::Data() {
  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
  Index scratch_index = sender->scratch_index.RelaxedLoad();
  Message *message = memory_->GetMessage(scratch_index);
  // Invalidate the header's queue index: this scratch message is not (yet)
  // published at any queue position.
  message->header.queue_index.Invalidate();

  return message->data(memory_->message_data_size());
}
651
Austin Schuhad154822019-12-27 15:45:13 -0800652void LocklessQueue::Sender::Send(
653 const char *data, size_t length,
654 aos::monotonic_clock::time_point monotonic_remote_time,
655 aos::realtime_clock::time_point realtime_remote_time,
656 uint32_t remote_queue_index,
657 aos::monotonic_clock::time_point *monotonic_sent_time,
658 aos::realtime_clock::time_point *realtime_sent_time,
659 uint32_t *queue_index) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700660 CHECK_LE(length, size());
Austin Schuh67420a42019-12-21 21:55:04 -0800661 // Flatbuffers write from the back of the buffer to the front. If we are
662 // going to write an explicit chunk of memory into the buffer, we need to
663 // adhere to this convention and place it at the end.
664 memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
Austin Schuhad154822019-12-27 15:45:13 -0800665 Send(length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
666 monotonic_sent_time, realtime_sent_time, queue_index);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700667}
668
// Publishes this sender's (already filled-in) scratch message into the next
// queue slot.  Runs a lock-free claim/validate/CAS retry loop: pick the next
// queue index, check that the slot's current occupant is plausible, then swap
// our message in and take the displaced message as our new scratch message.
// The scratch_index/to_replace stores (bracketed by compiler barriers) record
// enough state for whoever cleans up after us if we die mid-send.
//
// monotonic_sent_time, realtime_sent_time, and queue_index are optional out
// parameters; each is only written if non-null.
void LocklessQueue::Sender::Send(
    size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
    aos::realtime_clock::time_point realtime_remote_time,
    uint32_t remote_queue_index,
    aos::monotonic_clock::time_point *monotonic_sent_time,
    aos::realtime_clock::time_point *realtime_sent_time,
    uint32_t *queue_index) {
  const size_t queue_size = memory_->queue_size();
  CHECK_LE(length, size());

  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
  // We can do a relaxed load on our sender because we're the only person
  // modifying it right now.
  const Index scratch_index = sender->scratch_index.RelaxedLoad();
  Message *const message = memory_->GetMessage(scratch_index);

  message->header.length = length;
  // Pass these through.  Any alternative behavior can be implemented out a
  // layer.
  message->header.remote_queue_index = remote_queue_index;
  message->header.monotonic_remote_time = monotonic_remote_time;
  message->header.realtime_remote_time = realtime_remote_time;

  // Retry loop: each iteration attempts to claim one queue slot.  Every
  // `continue` below restarts with a fresh read of next_queue_index.
  while (true) {
    const QueueIndex actual_next_queue_index =
        memory_->next_queue_index.Load(queue_size);
    const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);

    const QueueIndex incremented_queue_index = next_queue_index.Increment();

    // This needs to synchronize with whoever the previous writer at this
    // location was.
    const Index to_replace = memory_->LoadIndex(next_queue_index);

    // The message currently in the slot should have been written one full
    // wrap of the queue ago.
    const QueueIndex decremented_queue_index =
        next_queue_index.DecrementBy(queue_size);

    // See if we got beat.  If we did, try to atomically update
    // next_queue_index in case the previous writer failed and retry.
    if (!to_replace.IsPlausible(decremented_queue_index)) {
      // We don't care about the result.  It will either succeed, or we got
      // beat in fixing it and just need to give up and try again.  If we got
      // beat multiple times, the only way progress can be made is if the queue
      // is updated as well.  This means that if we retry reading
      // next_queue_index, we will be at most off by one and can retry.
      //
      // Both require no further action from us.
      //
      // TODO(austin): If we are having fairness issues under contention, we
      // could have a mode bit in next_queue_index, and could use a lock or some
      // other form of PI boosting to let the higher priority task win.
      memory_->next_queue_index.CompareAndExchangeStrong(
          actual_next_queue_index, incremented_queue_index);

      VLOG(3) << "We were beat.  Try again.  Was " << std::hex
              << to_replace.get() << ", is " << decremented_queue_index.index();
      continue;
    }

    // Confirm that the message is what it should be.
    {
      const QueueIndex previous_index =
          memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
      if (previous_index != decremented_queue_index && previous_index.valid()) {
        // Retry.
        VLOG(3) << "Something fishy happened, queue index doesn't match.  "
                   "Retrying.  Previous index was "
                << std::hex << previous_index.index() << ", should be "
                << decremented_queue_index.index();
        continue;
      }
    }

    // Stamp the send times and report them (and the claimed slot) back to the
    // caller through the optional out parameters.
    message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
    message->header.realtime_sent_time = ::aos::realtime_clock::now();
    if (monotonic_sent_time != nullptr) {
      *monotonic_sent_time = message->header.monotonic_sent_time;
    }
    if (realtime_sent_time != nullptr) {
      *realtime_sent_time = message->header.realtime_sent_time;
    }
    if (queue_index != nullptr) {
      *queue_index = next_queue_index.index();
    }

    // Before we are fully done filling out the message, update the Sender state
    // with the new index to write.  This re-uses the barrier for the
    // queue_index store.
    const Index index_to_write(next_queue_index, scratch_index.message_index());

    aos_compiler_memory_barrier();
    // We're the only person who cares about our scratch index, besides somebody
    // cleaning up after us.
    sender->scratch_index.RelaxedStore(index_to_write);
    aos_compiler_memory_barrier();

    message->header.queue_index.Store(next_queue_index);

    aos_compiler_memory_barrier();
    // The message is now filled out, and we have a confirmed slot to store
    // into.
    //
    // Start by writing down what we are going to pull out of the queue.  This
    // was Invalid before now.  Only person who will read this is whoever cleans
    // up after us, so no synchronization necessary.
    sender->to_replace.RelaxedStore(to_replace);
    aos_compiler_memory_barrier();

    // Then exchange the next index into the queue.
    if (!memory_->GetQueue(next_queue_index.Wrapped())
             ->CompareAndExchangeStrong(to_replace, index_to_write)) {
      // Aw, didn't succeed.  Retry.
      sender->to_replace.RelaxedInvalidate();
      aos_compiler_memory_barrier();
      VLOG(3) << "Failed to wrap into queue";
      continue;
    }

    // Then update next_queue_index to save the next user some computation time.
    memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
                                                       incremented_queue_index);

    aos_compiler_memory_barrier();
    // Now update the scratch space and record that we succeeded.
    sender->scratch_index.Store(to_replace);
    aos_compiler_memory_barrier();
    // And then record that we succeeded, but definitely after the above store.
    sender->to_replace.RelaxedInvalidate();
    break;
  }
}
800
// Reads the message at `uint32_queue_index` without locking.  Returns:
//   GOOD        - the header/data were copied out and the message still held
//                 the requested queue index afterwards,
//   NOTHING_NEW - the requested index has not been written yet,
//   TOO_OLD     - the requested index has already been overwritten,
//   OVERWROTE   - the message was overwritten while we were copying it.
// `data` may be nullptr to skip copying the payload.  All other pointer
// arguments are required out parameters.
LocklessQueue::ReadResult LocklessQueue::Read(
    uint32_t uint32_queue_index,
    ::aos::monotonic_clock::time_point *monotonic_sent_time,
    ::aos::realtime_clock::time_point *realtime_sent_time,
    ::aos::monotonic_clock::time_point *monotonic_remote_time,
    ::aos::realtime_clock::time_point *realtime_remote_time,
    uint32_t *remote_queue_index, size_t *length, char *data) {
  const size_t queue_size = memory_->queue_size();

  // Build up the QueueIndex.
  const QueueIndex queue_index =
      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);

  // Read the message stored at the requested location.
  Index mi = memory_->LoadIndex(queue_index);
  Message *m = memory_->GetMessage(mi);

  while (true) {
    // We need to confirm that the data doesn't change while we are reading it.
    // Do that by first confirming that the message points to the queue index we
    // want.
    const QueueIndex starting_queue_index =
        m->header.queue_index.Load(queue_size);
    if (starting_queue_index != queue_index) {
      // If we found a message that is exactly 1 loop old, we just wrapped.
      if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
        VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                << ", " << queue_index.DecrementBy(queue_size).index();
        return ReadResult::NOTHING_NEW;
      } else {
        // Someone has re-used this message between when we pulled it out of the
        // queue and when we grabbed its index.  It is pretty hard to deduce
        // what happened.  Just try again.
        Message *const new_m = memory_->GetMessage(queue_index);
        if (m != new_m) {
          m = new_m;
          VLOG(3) << "Retrying, m doesn't match";
          continue;
        }

        // We have confirmed that message still points to the same message. This
        // means that the message didn't get swapped out from under us, so
        // starting_queue_index is correct.
        //
        // Either we got too far behind (signaled by this being a valid
        // message), or this is one of the initial messages which are invalid.
        if (starting_queue_index.valid()) {
          VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
                  << ", got " << starting_queue_index.index() << ", behind by "
                  << std::dec
                  << (starting_queue_index.index() - queue_index.index());
          return ReadResult::TOO_OLD;
        }

        VLOG(3) << "Initial";

        // There isn't a valid message at this location.
        //
        // If someone asks for one of the messages within the first go around,
        // then they need to wait.  They got ahead.  Otherwise, they are
        // asking for something crazy, like something before the beginning of
        // the queue.  Tell them that they are behind.
        if (uint32_queue_index < memory_->queue_size()) {
          VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
          return ReadResult::NOTHING_NEW;
        } else {
          VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
          return ReadResult::TOO_OLD;
        }
      }
    }
    VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
            << queue_index.index();
    break;
  }

  // Then read the data out.  Copy it all out to be deterministic and so we can
  // make length be from either end.
  *monotonic_sent_time = m->header.monotonic_sent_time;
  *realtime_sent_time = m->header.realtime_sent_time;
  // 0xffffffff is the sentinel for "no remote queue index"; in that case
  // report this message's own queue index instead.
  if (m->header.remote_queue_index == 0xffffffffu) {
    *remote_queue_index = queue_index.index();
  } else {
    *remote_queue_index = m->header.remote_queue_index;
  }
  *monotonic_remote_time = m->header.monotonic_remote_time;
  *realtime_remote_time = m->header.realtime_remote_time;
  if (data) {
    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
  }
  *length = m->header.length;

  // And finally, confirm that the message *still* points to the queue index we
  // want.  This means it didn't change out from under us.
  // If something changed out from under us, we were reading it much too late in
  // it's lifetime.
  aos_compiler_memory_barrier();
  const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
  if (final_queue_index != queue_index) {
    VLOG(3) << "Changed out from under us.  Reading " << std::hex
            << queue_index.index() << ", finished with "
            << final_queue_index.index() << ", delta: " << std::dec
            << (final_queue_index.index() - queue_index.index());
    return ReadResult::OVERWROTE;
  }

  return ReadResult::GOOD;
}
909
Alex Perrycb7da4b2019-08-28 19:35:56 -0700910size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
911size_t LocklessQueue::message_data_size() const {
912 return memory_->message_data_size();
913}
914
915QueueIndex LocklessQueue::LatestQueueIndex() {
Austin Schuh20b2b082019-09-11 20:42:56 -0700916 const size_t queue_size = memory_->queue_size();
917
918 // There is only one interesting case. We need to know if the queue is empty.
919 // That is done with a sentinel value. At worst, this will be off by one.
920 const QueueIndex next_queue_index =
921 memory_->next_queue_index.Load(queue_size);
922 if (next_queue_index.valid()) {
923 const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700924 return current_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -0700925 } else {
926 return empty_queue_index();
927 }
928}
929
930namespace {
931
932// Prints out the mutex state. Not safe to use while the mutex is being
933// changed.
934::std::string PrintMutex(aos_mutex *mutex) {
935 ::std::stringstream s;
936 s << "aos_mutex(" << ::std::hex << mutex->futex;
937
938 if (mutex->futex != 0) {
939 s << ":";
940 if (mutex->futex & FUTEX_OWNER_DIED) {
941 s << "FUTEX_OWNER_DIED|";
942 }
943 s << "tid=" << (mutex->futex & FUTEX_TID_MASK);
944 }
945
946 s << ")";
947 return s.str();
948}
949
950} // namespace
951
// Dumps the entire shared-memory queue state to stdout for debugging: the
// setup lock, config, queue slots, every message header (and optionally its
// payload, gated by --dump_lockless_queue_data), plus all sender and watcher
// records.  Debugging aid only; nothing here synchronizes with concurrent
// senders or readers.
void PrintLocklessQueueMemory(LocklessQueueMemory *memory) {
  const size_t queue_size = memory->queue_size();
  ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
  ::std::cout << "  aos_mutex queue_setup_lock = "
              << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
  ::std::cout << "  bool initialized = " << memory->initialized << ::std::endl;
  ::std::cout << "  config {" << ::std::endl;
  ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
              << ::std::endl;
  ::std::cout << "    size_t num_senders = " << memory->config.num_senders
              << ::std::endl;
  ::std::cout << "    size_t queue_size = " << memory->config.queue_size
              << ::std::endl;
  ::std::cout << "    size_t message_data_size = "
              << memory->config.message_data_size << ::std::endl;

  ::std::cout << "    AtomicQueueIndex next_queue_index = "
              << memory->next_queue_index.Load(queue_size).DebugString()
              << ::std::endl;

  ::std::cout << "    uid_t uid = " << memory->uid << ::std::endl;

  ::std::cout << "  }" << ::std::endl;
  ::std::cout << "  AtomicIndex queue[" << queue_size << "] {" << ::std::endl;
  for (size_t i = 0; i < queue_size; ++i) {
    ::std::cout << "    [" << i << "] -> "
                << memory->GetQueue(i)->Load().DebugString() << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;
  ::std::cout << "  Message messages[" << memory->num_messages() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_messages(); ++i) {
    Message *m = memory->GetMessage(Index(i, i));
    // Print each message's byte offset from the start of shared memory so it
    // can be correlated across processes with different mappings.
    ::std::cout << "    [" << i << "] -> Message 0x" << std::hex
                << (reinterpret_cast<uintptr_t>(
                        memory->GetMessage(Index(i, i))) -
                    reinterpret_cast<uintptr_t>(memory))
                << std::dec << " {" << ::std::endl;
    ::std::cout << "      Header {" << ::std::endl;
    ::std::cout << "        AtomicQueueIndex queue_index = "
                << m->header.queue_index.Load(queue_size).DebugString()
                << ::std::endl;
    // Timestamps are shown both formatted and as raw hex tick counts.
    ::std::cout << "        monotonic_clock::time_point monotonic_sent_time = "
                << m->header.monotonic_sent_time << " 0x" << std::hex
                << m->header.monotonic_sent_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout << "        realtime_clock::time_point realtime_sent_time = "
                << m->header.realtime_sent_time << " 0x" << std::hex
                << m->header.realtime_sent_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout
        << "        monotonic_clock::time_point monotonic_remote_time = "
        << m->header.monotonic_remote_time << " 0x" << std::hex
        << m->header.monotonic_remote_time.time_since_epoch().count()
        << std::dec << ::std::endl;
    ::std::cout << "        realtime_clock::time_point realtime_remote_time = "
                << m->header.realtime_remote_time << " 0x" << std::hex
                << m->header.realtime_remote_time.time_since_epoch().count()
                << std::dec << ::std::endl;
    ::std::cout << "        size_t length = " << m->header.length
                << ::std::endl;
    ::std::cout << "      }" << ::std::endl;
    ::std::cout << "      data: {";

    if (FLAGS_dump_lockless_queue_data) {
      const char *const m_data = m->data(memory->message_data_size());
      // Printable bytes are shown as characters, everything else as 0xNN.
      for (size_t j = 0; j < m->header.length; ++j) {
        char data = m_data[j];
        if (j != 0) {
          ::std::cout << " ";
        }
        if (::std::isprint(data)) {
          ::std::cout << ::std::setfill(' ') << ::std::setw(2) << ::std::hex
                      << data;
        } else {
          ::std::cout << "0x" << ::std::setfill('0') << ::std::setw(2)
                      << ::std::hex << (static_cast<unsigned>(data) & 0xff);
        }
      }
    }
    ::std::cout << ::std::setfill(' ') << ::std::dec << "}" << ::std::endl;
    ::std::cout << "    }," << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_senders(); ++i) {
    Sender *s = memory->GetSender(i);
    ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
    ::std::cout << "      aos_mutex tid = " << PrintMutex(&s->tid)
                << ::std::endl;
    ::std::cout << "      AtomicIndex scratch_index = "
                << s->scratch_index.Load().DebugString() << ::std::endl;
    ::std::cout << "      AtomicIndex to_replace = "
                << s->to_replace.Load().DebugString() << ::std::endl;
    ::std::cout << "    }" << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
              << ::std::endl;
  for (size_t i = 0; i < memory->num_watchers(); ++i) {
    Watcher *w = memory->GetWatcher(i);
    ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
    ::std::cout << "      aos_mutex tid = " << PrintMutex(&w->tid)
                << ::std::endl;
    ::std::cout << "      pid_t pid = " << w->pid << ::std::endl;
    ::std::cout << "      int priority = " << w->priority << ::std::endl;
    ::std::cout << "    }" << ::std::endl;
  }
  ::std::cout << "  }" << ::std::endl;

  ::std::cout << "}" << ::std::endl;
}
1067
1068} // namespace ipc_lib
1069} // namespace aos