Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop, which will make it much harder for code reading from a
queue to accidentally modify it.
This involves splitting up LocklessQueue into individual components
(watcher, wake-upper, sender, pinner, and reader). The split makes it
much more obvious what state is used where, and allows adding const
qualifiers.
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index dad44bb..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -424,6 +424,13 @@
return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
+QueueIndex ZeroOrValid(QueueIndex index) {
+ if (!index.valid()) {
+ return index.Clear();
+ }
+ return index;
+}
+
} // namespace
size_t LocklessQueueConfiguration::message_size() const {
@@ -571,30 +578,57 @@
return memory;
}
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
- LocklessQueueConfiguration config)
- : memory_(InitializeLocklessQueueMemory(memory, config)),
- watcher_copy_(memory_->num_watchers()),
- pid_(getpid()),
- uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+ InitializeLocklessQueueMemory(memory_, config_);
+}
-LocklessQueue::~LocklessQueue() {
- CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+ if (watcher_index_ == -1) {
+ return;
+ }
+ // Since everything is self-consistent, all we need to do is make sure nobody
+ // else is running. Someone dying will get caught in the generic consistency
+ // check.
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
- const int num_watchers = memory_->num_watchers();
+
+ // Make sure we are registered.
+ CHECK_NE(watcher_index_, -1);
+
+ // Make sure we still own the slot we are supposed to.
+ CHECK(
+ death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+ // The act of unlocking invalidates the entry. Invalidate it.
+ death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+ // And internally forget the slot.
+ watcher_index_ = -1;
+
// Cleanup is cheap. The next user will do it anyways, so no need for us to do
// anything right now.
// And confirm that nothing is owned by us.
+ const int num_watchers = memory_->num_watchers();
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+ CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+ << ": " << i;
}
}
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+ LocklessQueue queue, int priority) {
+ queue.Initialize();
+ LocklessQueueWatcher result(queue.memory(), priority);
+ if (result.watcher_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+ int priority)
+ : memory_(memory) {
// TODO(austin): Make sure signal coalescing is turned on. We don't need
// duplicates. That will improve performance under high load.
@@ -623,10 +657,10 @@
// Bail if we failed to find an open slot.
if (watcher_index_ == -1) {
- return false;
+ return;
}
- Watcher *w = memory_->GetWatcher(watcher_index_);
+ Watcher *const w = memory_->GetWatcher(watcher_index_);
w->pid = getpid();
w->priority = priority;
@@ -634,29 +668,15 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
death_notification_init(&(w->tid));
- return true;
}
-void LocklessQueue::UnregisterWakeup() {
- // Since everything is self consistent, all we need to do is make sure nobody
- // else is running. Someone dying will get caught in the generic consistency
- // check.
- GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
- // Make sure we are registered.
- CHECK_NE(watcher_index_, -1);
-
- // Make sure we still own the slot we are supposed to.
- CHECK(
- death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
- // The act of unlocking invalidates the entry. Invalidate it.
- death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
- // And internally forget the slot.
- watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+ : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+ queue.Initialize();
+ watcher_copy_.resize(memory_->num_watchers());
}
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
const size_t num_watchers = memory_->num_watchers();
CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -668,7 +688,7 @@
// question. There is no way without pidfd's to close this window, and
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
- Watcher *w = memory_->GetWatcher(i);
+ const Watcher *w = memory_->GetWatcher(i);
watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
// Force the load of the TID to come first.
aos_compiler_memory_barrier();
@@ -741,7 +761,8 @@
return count;
}
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+ : memory_(memory) {
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
@@ -778,15 +799,29 @@
<< ": " << std::hex << scratch_index.get();
}
-LocklessQueue::Sender::~Sender() {
- if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+ if (sender_index_ != -1) {
+ CHECK(memory_ != nullptr);
death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueueSender result(queue.memory());
+ if (result.sender_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
-void *LocklessQueue::Sender::Data() {
+size_t LocklessQueueSender::size() const {
+ return memory_->message_data_size();
+}
+
+void *LocklessQueueSender::Data() {
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
@@ -799,7 +834,7 @@
return message->data(memory_->message_data_size());
}
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
const char *data, size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
@@ -816,127 +851,7 @@
monotonic_sent_time, realtime_sent_time, queue_index);
}
-LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
- GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
- // Since we already have the lock, go ahead and try cleaning up.
- Cleanup(memory_, grab_queue_setup_lock);
-
- const int num_pinners = memory_->num_pinners();
-
- for (int i = 0; i < num_pinners; ++i) {
- ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
- // This doesn't need synchronization because we're the only process doing
- // initialization right now, and nobody else will be touching pinners which
- // we're interested in.
- const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0) {
- pinner_index_ = i;
- break;
- }
- }
-
- if (pinner_index_ == -1) {
- VLOG(1) << "Too many pinners, starting to bail.";
- return;
- }
-
- ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
- p->pinned.Invalidate();
-
- // Indicate that we are now alive by taking over the slot. If the previous
- // owner died, we still want to do this.
- death_notification_init(&(p->tid));
-}
-
-LocklessQueue::Pinner::~Pinner() {
- if (valid()) {
- memory_->GetPinner(pinner_index_)->pinned.Invalidate();
- aos_compiler_memory_barrier();
- death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
- }
-}
-
-// This method doesn't mess with any scratch_index, so it doesn't have to worry
-// about message ownership.
-int LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
- const size_t queue_size = memory_->queue_size();
- const QueueIndex queue_index =
- QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
- ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
-
- AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
-
- // Indicate that we want to pin this message.
- pinner->pinned.Store(queue_index);
- aos_compiler_memory_barrier();
-
- {
- const Index message_index = queue_slot->Load();
- Message *const message = memory_->GetMessage(message_index);
-
- const QueueIndex message_queue_index =
- message->header.queue_index.Load(queue_size);
- if (message_queue_index == queue_index) {
- VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
- aos_compiler_memory_barrier();
- return message_index.message_index();
- }
- VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
- << ", " << queue_index.index();
- }
-
- // Being down here means we asked to pin a message before realizing it's no
- // longer in the queue, so back that out now.
- pinner->pinned.Invalidate();
- VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
- return -1;
-}
-
-size_t LocklessQueue::Pinner::size() const {
- return memory_->message_data_size();
-}
-
-const void *LocklessQueue::Pinner::Data() const {
- const size_t queue_size = memory_->queue_size();
- ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
- QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
- CHECK(pinned.valid());
- const Message *message = memory_->GetMessage(pinned);
-
- return message->data(memory_->message_data_size());
-}
-
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
- LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
- LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-namespace {
-
-QueueIndex ZeroOrValid(QueueIndex index) {
- if (!index.valid()) {
- return index.Clear();
- }
- return index;
-}
-
-} // namespace
-
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
@@ -1094,7 +1009,7 @@
memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
}
-int LocklessQueue::Sender::buffer_index() const {
+int LocklessQueueSender::buffer_index() const {
::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
// We can do a relaxed load on our sender because we're the only person
// modifying it right now.
@@ -1102,13 +1017,119 @@
return scratch_index.message_index();
}
-LocklessQueue::ReadResult LocklessQueue::Read(
+LocklessQueuePinner::LocklessQueuePinner(
+ LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+ : memory_(memory), const_memory_(const_memory) {
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+ // Since we already have the lock, go ahead and try cleaning up.
+ Cleanup(memory_, grab_queue_setup_lock);
+
+ const int num_pinners = memory_->num_pinners();
+
+ for (int i = 0; i < num_pinners; ++i) {
+ ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching pinners which
+ // we're interested in.
+ const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+ if (tid == 0) {
+ pinner_index_ = i;
+ break;
+ }
+ }
+
+ if (pinner_index_ == -1) {
+ VLOG(1) << "Too many pinners, starting to bail.";
+ return;
+ }
+
+ ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+ p->pinned.Invalidate();
+
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+ if (pinner_index_ != -1) {
+ CHECK(memory_ != nullptr);
+ memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+ aos_compiler_memory_barrier();
+ death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ }
+}
+
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueuePinner result(queue.memory(), queue.const_memory());
+ if (result.pinner_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+ const size_t queue_size = memory_->queue_size();
+ const QueueIndex queue_index =
+ QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+ ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+ AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+ // Indicate that we want to pin this message.
+ pinner->pinned.Store(queue_index);
+ aos_compiler_memory_barrier();
+
+ {
+ const Index message_index = queue_slot->Load();
+ Message *const message = memory_->GetMessage(message_index);
+
+ const QueueIndex message_queue_index =
+ message->header.queue_index.Load(queue_size);
+ if (message_queue_index == queue_index) {
+ VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+ aos_compiler_memory_barrier();
+ return message_index.message_index();
+ }
+ VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+ << ", " << queue_index.index();
+ }
+
+ // Being down here means we asked to pin a message before realizing it's no
+ // longer in the queue, so back that out now.
+ pinner->pinned.Invalidate();
+ VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+ return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+ return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+ const size_t queue_size = const_memory_->queue_size();
+ const ::aos::ipc_lib::Pinner *const pinner =
+ const_memory_->GetPinner(pinner_index_);
+ QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+ CHECK(pinned.valid());
+ const Message *message = const_memory_->GetMessage(pinned);
+
+ return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
uint32_t uint32_queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
::aos::realtime_clock::time_point *realtime_sent_time,
::aos::monotonic_clock::time_point *monotonic_remote_time,
::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data) {
+ uint32_t *remote_queue_index, size_t *length, char *data) const {
const size_t queue_size = memory_->queue_size();
// Build up the QueueIndex.
@@ -1117,7 +1138,7 @@
// Read the message stored at the requested location.
Index mi = memory_->LoadIndex(queue_index);
- Message *m = memory_->GetMessage(mi);
+ const Message *m = memory_->GetMessage(mi);
while (true) {
// We need to confirm that the data doesn't change while we are reading it.
@@ -1130,13 +1151,13 @@
if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
<< ", " << queue_index.DecrementBy(queue_size).index();
- return ReadResult::NOTHING_NEW;
+ return Result::NOTHING_NEW;
}
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- Message *const new_m = memory_->GetMessage(queue_index);
+ const Message *const new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -1154,7 +1175,7 @@
<< ", got " << starting_queue_index.index() << ", behind by "
<< std::dec
<< (starting_queue_index.index() - queue_index.index());
- return ReadResult::TOO_OLD;
+ return Result::TOO_OLD;
}
VLOG(3) << "Initial";
@@ -1167,10 +1188,10 @@
// the queue. Tell them that they are behind.
if (uint32_queue_index < memory_->queue_size()) {
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
- return ReadResult::NOTHING_NEW;
+ return Result::NOTHING_NEW;
} else {
VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
- return ReadResult::TOO_OLD;
+ return Result::TOO_OLD;
}
}
VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -1190,7 +1211,8 @@
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
if (data) {
- memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ memcpy(data, m->data(memory_->message_data_size()),
+ memory_->message_data_size());
}
*length = m->header.length;
@@ -1205,18 +1227,13 @@
<< queue_index.index() << ", finished with "
<< final_queue_index.index() << ", delta: " << std::dec
<< (final_queue_index.index() - queue_index.index());
- return ReadResult::OVERWROTE;
+ return Result::OVERWROTE;
}
- return ReadResult::GOOD;
+ return Result::GOOD;
}
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
- return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
const size_t queue_size = memory_->queue_size();
// There is only one interesting case. We need to know if the queue is empty.
@@ -1226,9 +1243,16 @@
if (next_queue_index.valid()) {
const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
return current_queue_index;
- } else {
- return empty_queue_index();
}
+ return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+ return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+ return memory->message_data_size();
}
namespace {