Clean up memory barriers and documentation in lockless_queue
Various fixups to make it safe under my understanding of the C++ memory
model, and also easier to verify that that's true.
Change-Id: I94afdc3908c9b77a9e72a33abaae5c4a354de350
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index c2b0254..78f990d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -16,16 +16,31 @@
namespace aos {
namespace ipc_lib {
-
namespace {
-void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
- const int result = mutex_grab(&(memory->queue_setup_lock));
- CHECK(result == 0 || result == 1);
-}
+class GrabQueueSetupLockOrDie {
+ public:
+ GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) : memory_(memory) {
+ const int result = mutex_grab(&(memory->queue_setup_lock));
+ CHECK(result == 0 || result == 1) << ": " << result;
+ }
-// This must be called under the queue_setup_lock.
-void Cleanup(LocklessQueueMemory *memory) {
+ ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }
+
+ GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
+ GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;
+
+ private:
+ LocklessQueueMemory *const memory_;
+};
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+ // Make sure we start looking at shared memory fresh right now. We'll handle
+ // people dying partway through by either cleaning up after them or not, but
+ // we want to ensure we clean up after anybody who has already died when we
+ // start.
+ aos_compiler_memory_barrier();
+
const size_t num_senders = memory->num_senders();
const size_t queue_size = memory->queue_size();
const size_t num_messages = memory->num_messages();
@@ -54,14 +69,16 @@
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
VLOG(3) << "Found an easy death for sender " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point.
const Index to_replace = sender->to_replace.RelaxedLoad();
const Index scratch_index = sender->scratch_index.Load();
// I find it easiest to think about this in terms of the set of observable
- // states. The main code follows the following states:
+ // states. The main code progresses through the following states:
// 1) scratch_index = xxx
// to_replace = invalid
@@ -92,14 +109,14 @@
sender->to_replace.Invalidate();
// And mark that we succeeded.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
}
} else {
// 1) or 4). Make sure we aren't corrupted and declare victory.
CHECK(scratch_index.valid());
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
}
} else {
@@ -123,12 +140,16 @@
while ((num_accounted_for + num_missing) != num_messages) {
num_missing = 0;
for (size_t i = 0; i < num_senders; ++i) {
- Sender *sender = memory->GetSender(i);
+ Sender *const sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
++num_missing;
} else {
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point, if it matters. If it's not a dead sender,
+ // then any message it every has will already be accounted for, so this
+ // will always be a NOP.
const Index scratch_index = sender->scratch_index.RelaxedLoad();
if (!accounted_for[scratch_index.message_index()]) {
++num_accounted_for;
@@ -138,6 +159,7 @@
}
for (size_t i = 0; i < queue_size; ++i) {
+ // Same logic as above for scratch_index applies here too.
const Index index = memory->GetQueue(i)->RelaxedLoad();
if (!accounted_for[index.message_index()]) {
++num_accounted_for;
@@ -151,8 +173,10 @@
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
+ // We can do relaxed loads here because we're the only person touching
+ // this sender at this point.
const Index scratch_index = sender->scratch_index.RelaxedLoad();
const Index to_replace = sender->to_replace.RelaxedLoad();
@@ -170,7 +194,7 @@
sender->to_replace.Invalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
// And account for scratch_index.
accounted_for[scratch_index.message_index()] = true;
@@ -188,7 +212,7 @@
sender->to_replace.Invalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
// And account for to_replace.
accounted_for[to_replace.message_index()] = true;
@@ -208,6 +232,7 @@
// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
// thread.
+// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
@@ -215,25 +240,25 @@
} // namespace
size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
- // Round up the message size so following data is double aligned. That should
- // be overkill for most platforms. And the checks below confirms it.
- config.message_data_size = (config.message_data_size + 7) & ~0x7;
+ // Round up the message size so following data is aligned appropriately.
+ config.message_data_size =
+ LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
// As we build up the size, confirm that everything is aligned to the
// alignment requirements of the type.
size_t size = sizeof(LocklessQueueMemory);
- CHECK_EQ(size & (alignof(LocklessQueueMemory) - 1), 0u);
+ CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);
- CHECK_EQ(size & (alignof(AtomicIndex) - 1), 0u);
+ CHECK_EQ(size % alignof(AtomicIndex), 0u);
size += LocklessQueueMemory::SizeOfQueue(config);
- CHECK_EQ(size & (alignof(Message) - 1), 0u);
+ CHECK_EQ(size % alignof(Message), 0u);
size += LocklessQueueMemory::SizeOfMessages(config);
- CHECK_EQ(size & (alignof(Watcher) - 1), 0u);
+ CHECK_EQ(size % alignof(Watcher), 0u);
size += LocklessQueueMemory::SizeOfWatchers(config);
- CHECK_EQ(size & (alignof(Sender) - 1), 0u);
+ CHECK_EQ(size % alignof(Sender), 0u);
size += LocklessQueueMemory::SizeOfSenders(config);
return size;
@@ -246,15 +271,15 @@
// Grab the mutex. We don't care if the previous reader died. We are going
// to check everything anyways.
- GrabQueueSetupLockOrDie(memory);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);
if (!memory->initialized) {
// TODO(austin): Check these for out of bounds.
memory->config.num_watchers = config.num_watchers;
memory->config.num_senders = config.num_senders;
memory->config.queue_size = config.queue_size;
- // Round up to the nearest double word bytes.
- memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+ memory->config.message_data_size =
+ LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
const size_t num_messages = memory->num_messages();
// There need to be at most MaxMessages() messages allocated.
@@ -278,18 +303,18 @@
for (size_t i = 0; i < memory->num_senders(); ++i) {
::aos::ipc_lib::Sender *s = memory->GetSender(i);
- s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+ // Nobody else can possibly be touching these because we haven't set
+ // initialized to true yet.
+ s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
s->to_replace.RelaxedInvalidate();
}
+ aos_compiler_memory_barrier();
// Signal everything is done. This needs to be done last, so if we die, we
// redo initialization.
- // This is a full atomic (probably overkill), but this is at initialization
- // time, so it is cheap.
- memory->initialized.store(true);
+ memory->initialized = true;
}
- mutex_unlock(&(memory->queue_setup_lock));
return memory;
}
@@ -303,15 +328,15 @@
LocklessQueue::~LocklessQueue() {
CHECK_EQ(watcher_index_, -1);
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
const int num_watchers = memory_->num_watchers();
- // Cleanup is cheap. Go for it anyways.
+ // Cleanup is cheap. The next user will do it anyways, so no need for us to do
+ // anything right now.
// And confirm that nothing is owned by us.
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!mutex_islocked(&(memory_->GetWatcher(i)->tid)));
+ CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
}
- mutex_unlock(&(memory_->queue_setup_lock));
}
size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
@@ -323,15 +348,17 @@
// Since everything is self consistent, all we need to do is make sure nobody
// else is running. Someone dying will get caught in the generic consistency
// check.
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
const int num_watchers = memory_->num_watchers();
// Now, find the first empty watcher and grab it.
CHECK_EQ(watcher_index_, -1);
for (int i = 0; i < num_watchers; ++i) {
+ // If we see a slot the kernel has marked as dead, everything we do reusing
+ // it needs to happen-after whatever that process did before dying.
const uint32_t tid =
- __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+ __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_ACQUIRE);
+ if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
watcher_index_ = i;
break;
}
@@ -339,7 +366,6 @@
// Bail if we failed to find an open slot.
if (watcher_index_ == -1) {
- mutex_unlock(&(memory_->queue_setup_lock));
return false;
}
@@ -350,36 +376,27 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
- //
- // Since everything is done under the queue_setup_lock, this should always
- // return immediately.
- const int result = mutex_grab(&(w->tid));
-
- mutex_unlock(&(memory_->queue_setup_lock));
-
- // We should either get the lock, or the previous owner should have died.
- // Anything else is a pretty serious error.
- return result == 0 || result == 1;
+ death_notification_init(&(w->tid));
+ return true;
}
void LocklessQueue::UnregisterWakeup() {
// Since everything is self consistent, all we need to do is make sure nobody
// else is running. Someone dying will get caught in the generic consistency
// check.
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Make sure we are registered.
CHECK_NE(watcher_index_, -1);
// Make sure we still own the slot we are supposed to.
- CHECK(mutex_islocked(&(memory_->GetWatcher(watcher_index_)->tid)));
+ CHECK(
+ death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
// The act of unlocking invalidates the entry. Invalidate it.
- mutex_unlock(&(memory_->GetWatcher(watcher_index_)->tid));
+ death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
// And internally forget the slot.
watcher_index_ = -1;
-
- mutex_unlock(&(memory_->queue_setup_lock));
}
int LocklessQueue::Wakeup(const int current_priority) {
@@ -395,22 +412,26 @@
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
Watcher *w = memory_->GetWatcher(i);
- // Start by reading the tid. This needs to be atomic to force it to come
- // first.
- watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
- watcher_copy_[i].pid = w->pid;
- watcher_copy_[i].priority = w->priority;
+ watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+ // Force the load of the TID to come first.
+ aos_compiler_memory_barrier();
+ watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
+ watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
// Use a priority of -1 to mean an invalid entry to make sorting easier.
if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
watcher_copy_[i].priority = -1;
- } else if (watcher_copy_[i].tid !=
- static_cast<pid_t>(
- __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
- // Confirm that the watcher hasn't been re-used and modified while we read
- // it. If it has, mark it invalid again.
- watcher_copy_[i].priority = -1;
- watcher_copy_[i].tid = 0;
+ } else {
+ // Ensure all of this happens after we're done looking at the pid+priority
+ // in shared memory.
+ aos_compiler_memory_barrier();
+ if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
+ &(w->tid.futex), __ATOMIC_RELAXED))) {
+ // Confirm that the watcher hasn't been re-used and modified while we
+ // read it. If it has, mark it invalid again.
+ watcher_copy_[i].priority = -1;
+ watcher_copy_[i].tid = 0;
+ }
}
}
@@ -464,15 +485,18 @@
}
LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
- Cleanup(memory_);
+ Cleanup(memory_, grab_queue_setup_lock);
const int num_senders = memory_->num_senders();
for (int i = 0; i < num_senders; ++i) {
::aos::ipc_lib::Sender *s = memory->GetSender(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching senders which
+ // we're interested in.
const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
if (tid == 0) {
sender_index_ = i;
@@ -486,17 +510,14 @@
::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
- // Atomically grab the mutex. This signals that we are alive. If the
- // previous owner died, we don't care, and want to grab the mutex anyways.
- const int result = mutex_grab(&(s->tid));
- CHECK(result == 0 || result == 1);
-
- mutex_unlock(&(memory->queue_setup_lock));
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(s->tid));
}
LocklessQueue::Sender::~Sender() {
if (memory_ != nullptr) {
- mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+ death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
@@ -532,9 +553,11 @@
const size_t queue_size = memory_->queue_size();
CHECK_LE(length, size());
- ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
- Index scratch_index = sender->scratch_index.RelaxedLoad();
- Message *message = memory_->GetMessage(scratch_index);
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+ // We can do a relaxed load on our sender because we're the only person
+ // modifying it right now.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
message->header.length = length;
@@ -545,7 +568,8 @@
const QueueIndex incremented_queue_index = next_queue_index.Increment();
- // TODO(austin): I think we can drop the barrier off this.
+ // This needs to synchronize with whoever the previous writer at this
+ // location was.
const Index to_replace = memory_->LoadIndex(next_queue_index);
const QueueIndex decremented_queue_index =
@@ -575,12 +599,8 @@
// Confirm that the message is what it should be.
{
- // We just need this to be atomic and after the index has been calculated
- // and before we exchange the index back in. Both of those will be strong
- // barriers, so this is fine.
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)
- ->header.queue_index.RelaxedLoad(queue_size);
+ memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
@@ -599,22 +619,30 @@
// queue_index store.
const Index index_to_write(next_queue_index, scratch_index.message_index());
+ aos_compiler_memory_barrier();
+ // We're the only person who cares about our scratch index, besides somebody
+ // cleaning up after us.
sender->scratch_index.RelaxedStore(index_to_write);
+ aos_compiler_memory_barrier();
message->header.queue_index.Store(next_queue_index);
+ aos_compiler_memory_barrier();
// The message is now filled out, and we have a confirmed slot to store
// into.
//
// Start by writing down what we are going to pull out of the queue. This
- // was Invalid before now.
+ // was Invalid before now. Only person who will read this is whoever cleans
+ // up after us, so no synchronization necessary.
sender->to_replace.RelaxedStore(to_replace);
+ aos_compiler_memory_barrier();
// Then exchange the next index into the queue.
if (!memory_->GetQueue(next_queue_index.Wrapped())
->CompareAndExchangeStrong(to_replace, index_to_write)) {
// Aw, didn't succeed. Retry.
sender->to_replace.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
VLOG(3) << "Failed to wrap into queue";
continue;
}
@@ -623,12 +651,11 @@
memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
incremented_queue_index);
+ aos_compiler_memory_barrier();
// Now update the scratch space and record that we succeeded.
sender->scratch_index.Store(to_replace);
- // And then clear out the entry used to replace. This just needs to be
- // atomic. It can't be moved above the store because that is a full
- // barrier, but delaying it until later will only affect things if something
- // died.
+ aos_compiler_memory_barrier();
+ // And then record that we succeeded, but definitely after the above store.
sender->to_replace.RelaxedInvalidate();
break;
}
@@ -665,7 +692,7 @@
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- Message *new_m = memory_->GetMessage(queue_index);
+ Message *const new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -698,7 +725,7 @@
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return ReadResult::NOTHING_NEW;
} else {
- VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
+ VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
return ReadResult::TOO_OLD;
}
}
@@ -719,6 +746,7 @@
// want. This means it didn't change out from under us.
// If something changed out from under us, we were reading it much too late in
// it's lifetime.
+ aos_compiler_memory_barrier();
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
VLOG(3) << "Changed out from under us. Reading " << std::hex
@@ -778,8 +806,7 @@
::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
::std::cout << " aos_mutex queue_setup_lock = "
<< PrintMutex(&memory->queue_setup_lock) << ::std::endl;
- ::std::cout << " ::std::atomic<bool> initialized = " << memory->initialized
- << ::std::endl;
+ ::std::cout << " bool initialized = " << memory->initialized << ::std::endl;
::std::cout << " config {" << ::std::endl;
::std::cout << " size_t num_watchers = " << memory->config.num_watchers
<< ::std::endl;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index fcc5d79..4a8523b 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,7 +4,6 @@
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>
-#include <atomic>
#include <vector>
#include "aos/ipc_lib/aos_sync.h"
@@ -20,15 +19,16 @@
// then this watcher is invalid. The futex variable will then hold the tid of
// the watcher, or FUTEX_OWNER_DIED if the task died.
//
- // Note: this is modified with a lock held, but is always able to be read.
+ // Note: this is only modified with the queue_setup_lock lock held, but may
+ // always be read.
// Any state modification should happen before the lock is acquired.
aos_mutex tid;
// PID of the watcher.
- pid_t pid;
+ std::atomic<pid_t> pid;
// RT priority of the watcher.
- int priority;
+ std::atomic<int> priority;
};
// Structure to hold the state required to send messages.
@@ -37,7 +37,8 @@
// this sender is invalid. The futex variable will then hold the tid of the
// sender, or FUTEX_OWNER_DIED if the task died.
//
- // Note: this is modified with a lock held, but is always able to be read.
+ // Note: this is only modified with the queue_setup_lock lock held, but may
+ // always be read.
aos_mutex tid;
// Index of the message we will be filling out.
@@ -59,7 +60,7 @@
// Note: a value of 0xffffffff always means that the contents aren't valid.
AtomicQueueIndex queue_index;
- // Timestamp of the message. Needs to be atomically incrementing in the
+ // Timestamp of the message. Needs to be monotonically incrementing in the
// queue, which means that time needs to be re-sampled every time a write
// fails.
::aos::monotonic_clock::time_point monotonic_sent_time;
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 7cb90e5..0c0973c 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -2,33 +2,37 @@
#define AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
#include <sys/types.h>
-#include <atomic>
+#include <cstddef>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/index.h"
+#include "aos/ipc_lib/lockless_queue.h"
#include "aos/time/time.h"
namespace aos {
namespace ipc_lib {
struct LocklessQueueMemory {
+ // This is held during initialization. Cleanup after dead processes happens
+ // during this initialization process.
+ //
// A lot of things get easier if the only lockless writes are when messages
// are published. Do note, the datastructures protected by this lock need to
// be consistent at all times because a reader (or writer) may read them
// regardless of if the lock is held or not.
- //
- // Any non-constant time operations need to be done at queue startup time,
- // including cleanup. Cleanup is done when the next queue is opened.
aos_mutex queue_setup_lock;
- ::std::atomic<bool> initialized;
+ // Tracks that one process has completed initialization once.
+ // Only accessed under queue_setup_lock.
+ bool initialized;
LocklessQueueConfiguration config;
- // Size of the watchers list.
+ // Size of the watcher list.
size_t num_watchers() const { return config.num_watchers; }
// Size of the sender list.
size_t num_senders() const { return config.num_senders; }
+ // Number of messages logically in the queue at a time.
// List of pointers into the messages list.
size_t queue_size() const { return config.queue_size; }