Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.
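This involves splitting up LocklessQueue into individual components
(LocklessQueueReader, LocklessQueueSender, LocklessQueuePinner,
LocklessQueueWatcher, and LocklessQueueWakeUpper). This makes it much
more obvious what state is used where, and allows adding some const
qualifiers.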
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index e45c065..1ac8656 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,6 +164,10 @@
const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
+ ipc_lib::LocklessQueue queue() const {
+ return ipc_lib::LocklessQueue(memory(), memory(), config());
+ }
+
absl::Span<char> GetSharedMemory() const {
return absl::Span<char>(static_cast<char *>(data_), size_);
}
@@ -207,8 +211,7 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()) {
+ reader_(lockless_queue_memory_.queue()) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -238,7 +241,8 @@
// Sets this object to pin data in shared memory when fetching.
void PinDataOnFetch() {
CHECK(!copy_data());
- auto maybe_pinner = lockless_queue_.MakePinner();
+ auto maybe_pinner =
+ ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
if (!maybe_pinner) {
LOG(FATAL) << "Failed to create reader on "
<< configuration::CleanedChannelToString(channel_)
@@ -250,26 +254,26 @@
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
- actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+ actual_queue_index_ = reader_.LatestIndex();
if (!actual_queue_index_.valid()) {
// Nothing in the queue. The next element will show up at the 0th
// index in the queue.
- actual_queue_index_ =
- ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+ actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+ LocklessQueueSize(lockless_queue_memory_.memory()));
} else {
actual_queue_index_ = actual_queue_index_.Increment();
}
}
bool FetchNext() {
- const ipc_lib::LocklessQueue::ReadResult read_result =
+ const ipc_lib::LocklessQueueReader::Result read_result =
DoFetch(actual_queue_index_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
bool Fetch() {
- const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+ const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
// skip checking if valid_data_ is true.
@@ -282,22 +286,29 @@
return false;
}
- const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+ const ipc_lib::LocklessQueueReader::Result read_result =
+ DoFetch(queue_index);
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
<< configuration::CleanedChannelToString(channel_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
Context context() const { return context_; }
bool RegisterWakeup(int priority) {
- return lockless_queue_.RegisterWakeup(priority);
+ CHECK(!watcher_);
+ watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+ lockless_queue_memory_.queue(), priority);
+ return static_cast<bool>(watcher_);
}
- void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+ void UnregisterWakeup() {
+ CHECK(watcher_);
+ watcher_ = std::nullopt;
+ }
absl::Span<char> GetSharedMemory() const {
return lockless_queue_memory_.GetSharedMemory();
@@ -309,23 +320,24 @@
// grab the whole shared memory buffer instead.
return absl::Span<char>(
const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
- lockless_queue_.message_data_size());
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
}
private:
- ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+ ipc_lib::LocklessQueueReader::Result DoFetch(
+ ipc_lib::QueueIndex queue_index) {
// TODO(austin): Get behind and make sure it dies.
char *copy_buffer = nullptr;
if (copy_data()) {
copy_buffer = data_storage_start();
}
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
&context_.size, copy_buffer);
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
const int pin_result = pinner_->PinIndex(queue_index.index());
CHECK(pin_result >= 0)
@@ -351,7 +363,9 @@
const char *const data = DataBuffer();
if (data) {
context_.data =
- data + lockless_queue_.message_data_size() - context_.size;
+ data +
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+ context_.size;
} else {
context_.data = nullptr;
}
@@ -361,7 +375,7 @@
// Make sure the data wasn't modified while we were reading it. This
// can only happen if you are reading the last message *while* it is
// being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
<< ": Got behind while reading and the last message was modified "
"out from under us while we were reading it. Don't get so far "
"behind on: "
@@ -370,7 +384,7 @@
// We fell behind between when we read the index and read the value.
// This isn't worth recovering from since this means we went to sleep
// for a long time in the middle of this function.
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
event_loop_->SendTimingReport();
LOG(FATAL) << "The next message is no longer available. "
<< configuration::CleanedChannelToString(channel_);
@@ -402,16 +416,17 @@
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
+ ipc_lib::LocklessQueueReader reader_;
+ // This being nullopt indicates we're not looking for wakeups right now.
+ std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
- ipc_lib::QueueIndex actual_queue_index_ =
- ipc_lib::LocklessQueue::empty_queue_index();
+ ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
// This being empty indicates we're not going to copy data.
std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
// This being nullopt indicates we're not going to pin messages.
- std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+ std::optional<ipc_lib::LocklessQueuePinner> pinner_;
Context context_;
};
@@ -458,15 +473,15 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- lockless_queue_sender_(
- VerifySender(lockless_queue_.MakeSender(), channel)) {}
+ lockless_queue_sender_(VerifySender(
+ ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+ channel)),
+ wake_upper_(lockless_queue_memory_.queue()) {}
~ShmSender() override {}
- static ipc_lib::LocklessQueue::Sender VerifySender(
- std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+ static ipc_lib::LocklessQueueSender VerifySender(
+ std::optional<ipc_lib::LocklessQueueSender> sender,
const Channel *channel) {
if (sender) {
return std::move(sender.value());
@@ -488,7 +503,7 @@
lockless_queue_sender_.Send(
length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
&monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
return true;
}
@@ -503,7 +518,7 @@
monotonic_remote_time, realtime_remote_time,
remote_queue_index, &monotonic_sent_time_,
&realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
// TODO(austin): Return an error if we send too fast.
return true;
}
@@ -516,8 +531,8 @@
private:
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
- ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+ ipc_lib::LocklessQueueSender lockless_queue_sender_;
+ ipc_lib::LocklessQueueWakeUpper wake_upper_;
};
// Class to manage the state for a Watcher.