Improve const-correctness in the LocklessQueue code

This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.

This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index e45c065..1ac8656 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,6 +164,10 @@
 
   const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
 
+  ipc_lib::LocklessQueue queue() const {
+    return ipc_lib::LocklessQueue(memory(), memory(), config());
+  }
+
   absl::Span<char> GetSharedMemory() const {
     return absl::Span<char>(static_cast<char *>(data_), size_);
   }
@@ -207,8 +211,7 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()) {
+        reader_(lockless_queue_memory_.queue()) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -238,7 +241,8 @@
   // Sets this object to pin data in shared memory when fetching.
   void PinDataOnFetch() {
     CHECK(!copy_data());
-    auto maybe_pinner = lockless_queue_.MakePinner();
+    auto maybe_pinner =
+        ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
     if (!maybe_pinner) {
       LOG(FATAL) << "Failed to create reader on "
                  << configuration::CleanedChannelToString(channel_)
@@ -250,26 +254,26 @@
   // Points the next message to fetch at the queue index which will be
   // populated next.
   void PointAtNextQueueIndex() {
-    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+    actual_queue_index_ = reader_.LatestIndex();
     if (!actual_queue_index_.valid()) {
       // Nothing in the queue.  The next element will show up at the 0th
       // index in the queue.
-      actual_queue_index_ =
-          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+      actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+          LocklessQueueSize(lockless_queue_memory_.memory()));
     } else {
       actual_queue_index_ = actual_queue_index_.Increment();
     }
   }
 
   bool FetchNext() {
-    const ipc_lib::LocklessQueue::ReadResult read_result =
+    const ipc_lib::LocklessQueueReader::Result read_result =
         DoFetch(actual_queue_index_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   bool Fetch() {
-    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+    const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
     // actual_queue_index_ is only meaningful if it was set by Fetch or
     // FetchNext.  This happens when valid_data_ has been set.  So, only
     // skip checking if valid_data_ is true.
@@ -282,22 +286,29 @@
       return false;
     }
 
-    const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+    const ipc_lib::LocklessQueueReader::Result read_result =
+        DoFetch(queue_index);
 
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
         << ": Queue index went backwards.  This should never happen.  "
         << configuration::CleanedChannelToString(channel_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   Context context() const { return context_; }
 
   bool RegisterWakeup(int priority) {
-    return lockless_queue_.RegisterWakeup(priority);
+    CHECK(!watcher_);
+    watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+        lockless_queue_memory_.queue(), priority);
+    return static_cast<bool>(watcher_);
   }
 
-  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+  void UnregisterWakeup() {
+    CHECK(watcher_);
+    watcher_ = std::nullopt;
+  }
 
   absl::Span<char> GetSharedMemory() const {
     return lockless_queue_memory_.GetSharedMemory();
@@ -309,23 +320,24 @@
     // grab the whole shared memory buffer instead.
     return absl::Span<char>(
         const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
-        lockless_queue_.message_data_size());
+        LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
   }
 
  private:
-  ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+  ipc_lib::LocklessQueueReader::Result DoFetch(
+      ipc_lib::QueueIndex queue_index) {
     // TODO(austin): Get behind and make sure it dies.
     char *copy_buffer = nullptr;
     if (copy_data()) {
       copy_buffer = data_storage_start();
     }
-    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+    ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
         &context_.size, copy_buffer);
 
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
         const int pin_result = pinner_->PinIndex(queue_index.index());
         CHECK(pin_result >= 0)
@@ -351,7 +363,9 @@
       const char *const data = DataBuffer();
       if (data) {
         context_.data =
-            data + lockless_queue_.message_data_size() - context_.size;
+            data +
+            LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+            context_.size;
       } else {
         context_.data = nullptr;
       }
@@ -361,7 +375,7 @@
     // Make sure the data wasn't modified while we were reading it.  This
     // can only happen if you are reading the last message *while* it is
     // being written to, which means you are pretty far behind.
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
         << ": Got behind while reading and the last message was modified "
            "out from under us while we were reading it.  Don't get so far "
            "behind on: "
@@ -370,7 +384,7 @@
     // We fell behind between when we read the index and read the value.
     // This isn't worth recovering from since this means we went to sleep
     // for a long time in the middle of this function.
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
       event_loop_->SendTimingReport();
       LOG(FATAL) << "The next message is no longer available.  "
                  << configuration::CleanedChannelToString(channel_);
@@ -402,16 +416,17 @@
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
+  ipc_lib::LocklessQueueReader reader_;
+  // This being nullopt indicates we're not looking for wakeups right now.
+  std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
 
-  ipc_lib::QueueIndex actual_queue_index_ =
-      ipc_lib::LocklessQueue::empty_queue_index();
+  ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
 
   // This being empty indicates we're not going to copy data.
   std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
   // This being nullopt indicates we're not going to pin messages.
-  std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+  std::optional<ipc_lib::LocklessQueuePinner> pinner_;
 
   Context context_;
 };
@@ -458,15 +473,15 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()),
-        lockless_queue_sender_(
-            VerifySender(lockless_queue_.MakeSender(), channel)) {}
+        lockless_queue_sender_(VerifySender(
+            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            channel)),
+        wake_upper_(lockless_queue_memory_.queue()) {}
 
   ~ShmSender() override {}
 
-  static ipc_lib::LocklessQueue::Sender VerifySender(
-      std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+  static ipc_lib::LocklessQueueSender VerifySender(
+      std::optional<ipc_lib::LocklessQueueSender> sender,
       const Channel *channel) {
     if (sender) {
       return std::move(sender.value());
@@ -488,7 +503,7 @@
     lockless_queue_sender_.Send(
         length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
         &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     return true;
   }
 
@@ -503,7 +518,7 @@
                                 monotonic_remote_time, realtime_remote_time,
                                 remote_queue_index, &monotonic_sent_time_,
                                 &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     // TODO(austin): Return an error if we send too fast.
     return true;
   }
@@ -516,8 +531,8 @@
 
  private:
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
-  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+  ipc_lib::LocklessQueueSender lockless_queue_sender_;
+  ipc_lib::LocklessQueueWakeUpper wake_upper_;
 };
 
 // Class to manage the state for a Watcher.