Improve const-correctness in the LocklessQueue code

This will allow using a read-only mapping for reading data from queues
in ShmEventLoop, making it much harder for code reading from a queue to
accidentally modify it.

This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index dad44bb..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -424,6 +424,13 @@
   return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
 }
 
+QueueIndex ZeroOrValid(QueueIndex index) {
+  if (!index.valid()) {
+    return index.Clear();
+  }
+  return index;
+}
+
 }  // namespace
 
 size_t LocklessQueueConfiguration::message_size() const {
@@ -571,30 +578,57 @@
   return memory;
 }
 
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
-                             LocklessQueueConfiguration config)
-    : memory_(InitializeLocklessQueueMemory(memory, config)),
-      watcher_copy_(memory_->num_watchers()),
-      pid_(getpid()),
-      uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+  InitializeLocklessQueueMemory(memory_, config_);
+}
 
-LocklessQueue::~LocklessQueue() {
-  CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+  if (watcher_index_ == -1) {
+    return;
+  }
 
+  // Since everything is self consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-  const int num_watchers = memory_->num_watchers();
+
+  // Make sure we are registered.
+  CHECK_NE(watcher_index_, -1);
+
+  // Make sure we still own the slot we are supposed to.
+  CHECK(
+      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+  // The act of unlocking invalidates the entry.  Invalidate it.
+  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+  // And internally forget the slot.
+  watcher_index_ = -1;
+
   // Cleanup is cheap. The next user will do it anyways, so no need for us to do
   // anything right now.
 
   // And confirm that nothing is owned by us.
+  const int num_watchers = memory_->num_watchers();
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+        << ": " << i;
   }
 }
 
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+    LocklessQueue queue, int priority) {
+  queue.Initialize();
+  LocklessQueueWatcher result(queue.memory(), priority);
+  if (result.watcher_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
 
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+                                           int priority)
+    : memory_(memory) {
   // TODO(austin): Make sure signal coalescing is turned on.  We don't need
   // duplicates.  That will improve performance under high load.
 
@@ -623,10 +657,10 @@
 
   // Bail if we failed to find an open slot.
   if (watcher_index_ == -1) {
-    return false;
+    return;
   }
 
-  Watcher *w = memory_->GetWatcher(watcher_index_);
+  Watcher *const w = memory_->GetWatcher(watcher_index_);
 
   w->pid = getpid();
   w->priority = priority;
@@ -634,29 +668,15 @@
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
   // get rearranged afterwords.
   death_notification_init(&(w->tid));
-  return true;
 }
 
-void LocklessQueue::UnregisterWakeup() {
-  // Since everything is self consistent, all we need to do is make sure nobody
-  // else is running.  Someone dying will get caught in the generic consistency
-  // check.
-  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
-  // Make sure we are registered.
-  CHECK_NE(watcher_index_, -1);
-
-  // Make sure we still own the slot we are supposed to.
-  CHECK(
-      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
-  // The act of unlocking invalidates the entry.  Invalidate it.
-  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
-  // And internally forget the slot.
-  watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+    : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+  queue.Initialize();
+  watcher_copy_.resize(memory_->num_watchers());
 }
 
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
   const size_t num_watchers = memory_->num_watchers();
 
   CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -668,7 +688,7 @@
   // question.  There is no way without pidfd's to close this window, and
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
-    Watcher *w = memory_->GetWatcher(i);
+    const Watcher *w = memory_->GetWatcher(i);
     watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
     // Force the load of the TID to come first.
     aos_compiler_memory_barrier();
@@ -741,7 +761,8 @@
   return count;
 }
 
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+    : memory_(memory) {
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
@@ -778,15 +799,29 @@
       << ": " << std::hex << scratch_index.get();
 }
 
-LocklessQueue::Sender::~Sender() {
-  if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+  if (sender_index_ != -1) {
+    CHECK(memory_ != nullptr);
     death_notification_release(&(memory_->GetSender(sender_index_)->tid));
   }
 }
 
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueueSender result(queue.memory());
+  if (result.sender_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
 
-void *LocklessQueue::Sender::Data() {
+size_t LocklessQueueSender::size() const {
+  return memory_->message_data_size();
+}
+
+void *LocklessQueueSender::Data() {
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
@@ -799,7 +834,7 @@
   return message->data(memory_->message_data_size());
 }
 
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     const char *data, size_t length,
     aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
@@ -816,127 +851,7 @@
        monotonic_sent_time, realtime_sent_time, queue_index);
 }
 
-LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
-  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
-  // Since we already have the lock, go ahead and try cleaning up.
-  Cleanup(memory_, grab_queue_setup_lock);
-
-  const int num_pinners = memory_->num_pinners();
-
-  for (int i = 0; i < num_pinners; ++i) {
-    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
-    // This doesn't need synchronization because we're the only process doing
-    // initialization right now, and nobody else will be touching pinners which
-    // we're interested in.
-    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0) {
-      pinner_index_ = i;
-      break;
-    }
-  }
-
-  if (pinner_index_ == -1) {
-    VLOG(1) << "Too many pinners, starting to bail.";
-    return;
-  }
-
-  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
-  p->pinned.Invalidate();
-
-  // Indicate that we are now alive by taking over the slot. If the previous
-  // owner died, we still want to do this.
-  death_notification_init(&(p->tid));
-}
-
-LocklessQueue::Pinner::~Pinner() {
-  if (valid()) {
-    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
-    aos_compiler_memory_barrier();
-    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
-  }
-}
-
-// This method doesn't mess with any scratch_index, so it doesn't have to worry
-// about message ownership.
-int LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
-  const size_t queue_size = memory_->queue_size();
-  const QueueIndex queue_index =
-      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
-  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
-
-  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
-
-  // Indicate that we want to pin this message.
-  pinner->pinned.Store(queue_index);
-  aos_compiler_memory_barrier();
-
-  {
-    const Index message_index = queue_slot->Load();
-    Message *const message = memory_->GetMessage(message_index);
-
-    const QueueIndex message_queue_index =
-        message->header.queue_index.Load(queue_size);
-    if (message_queue_index == queue_index) {
-      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
-      aos_compiler_memory_barrier();
-      return message_index.message_index();
-    }
-    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
-            << ", " << queue_index.index();
-  }
-
-  // Being down here means we asked to pin a message before realizing it's no
-  // longer in the queue, so back that out now.
-  pinner->pinned.Invalidate();
-  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
-  return -1;
-}
-
-size_t LocklessQueue::Pinner::size() const {
-  return memory_->message_data_size();
-}
-
-const void *LocklessQueue::Pinner::Data() const {
-  const size_t queue_size = memory_->queue_size();
-  ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
-  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
-  CHECK(pinned.valid());
-  const Message *message = memory_->GetMessage(pinned);
-
-  return message->data(memory_->message_data_size());
-}
-
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
-  LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
-  if (result.valid()) {
-    return std::move(result);
-  } else {
-    return std::nullopt;
-  }
-}
-
-std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
-  LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
-  if (result.valid()) {
-    return std::move(result);
-  } else {
-    return std::nullopt;
-  }
-}
-
-namespace {
-
-QueueIndex ZeroOrValid(QueueIndex index) {
-  if (!index.valid()) {
-    return index.Clear();
-  }
-  return index;
-}
-
-}  // namespace
-
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index,
@@ -1094,7 +1009,7 @@
   memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
 }
 
-int LocklessQueue::Sender::buffer_index() const {
+int LocklessQueueSender::buffer_index() const {
   ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
   // We can do a relaxed load on our sender because we're the only person
   // modifying it right now.
@@ -1102,13 +1017,119 @@
   return scratch_index.message_index();
 }
 
-LocklessQueue::ReadResult LocklessQueue::Read(
+LocklessQueuePinner::LocklessQueuePinner(
+    LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+    : memory_(memory), const_memory_(const_memory) {
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_, grab_queue_setup_lock);
+
+  const int num_pinners = memory_->num_pinners();
+
+  for (int i = 0; i < num_pinners; ++i) {
+    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching pinners which
+    // we're interested in.
+    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      pinner_index_ = i;
+      break;
+    }
+  }
+
+  if (pinner_index_ == -1) {
+    VLOG(1) << "Too many pinners, starting to bail.";
+    return;
+  }
+
+  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+  p->pinned.Invalidate();
+
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+  if (pinner_index_ != -1) {
+    CHECK(memory_ != nullptr);
+    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+    aos_compiler_memory_barrier();
+    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+  }
+}
+
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueuePinner result(queue.memory(), queue.const_memory());
+  if (result.pinner_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+  const size_t queue_size = memory_->queue_size();
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+  // Indicate that we want to pin this message.
+  pinner->pinned.Store(queue_index);
+  aos_compiler_memory_barrier();
+
+  {
+    const Index message_index = queue_slot->Load();
+    Message *const message = memory_->GetMessage(message_index);
+
+    const QueueIndex message_queue_index =
+        message->header.queue_index.Load(queue_size);
+    if (message_queue_index == queue_index) {
+      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+      aos_compiler_memory_barrier();
+      return message_index.message_index();
+    }
+    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+            << ", " << queue_index.index();
+  }
+
+  // Being down here means we asked to pin a message before realizing it's no
+  // longer in the queue, so back that out now.
+  pinner->pinned.Invalidate();
+  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+  return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+  return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+  const size_t queue_size = const_memory_->queue_size();
+  const ::aos::ipc_lib::Pinner *const pinner =
+      const_memory_->GetPinner(pinner_index_);
+  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+  CHECK(pinned.valid());
+  const Message *message = const_memory_->GetMessage(pinned);
+
+  return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
     uint32_t uint32_queue_index,
     ::aos::monotonic_clock::time_point *monotonic_sent_time,
     ::aos::realtime_clock::time_point *realtime_sent_time,
     ::aos::monotonic_clock::time_point *monotonic_remote_time,
     ::aos::realtime_clock::time_point *realtime_remote_time,
-    uint32_t *remote_queue_index, size_t *length, char *data) {
+    uint32_t *remote_queue_index, size_t *length, char *data) const {
   const size_t queue_size = memory_->queue_size();
 
   // Build up the QueueIndex.
@@ -1117,7 +1138,7 @@
 
   // Read the message stored at the requested location.
   Index mi = memory_->LoadIndex(queue_index);
-  Message *m = memory_->GetMessage(mi);
+  const Message *m = memory_->GetMessage(mi);
 
   while (true) {
     // We need to confirm that the data doesn't change while we are reading it.
@@ -1130,13 +1151,13 @@
       if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
         VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                 << ", " << queue_index.DecrementBy(queue_size).index();
-        return ReadResult::NOTHING_NEW;
+        return Result::NOTHING_NEW;
       }
 
       // Someone has re-used this message between when we pulled it out of the
       // queue and when we grabbed its index.  It is pretty hard to deduce
       // what happened. Just try again.
-      Message *const new_m = memory_->GetMessage(queue_index);
+      const Message *const new_m = memory_->GetMessage(queue_index);
       if (m != new_m) {
         m = new_m;
         VLOG(3) << "Retrying, m doesn't match";
@@ -1154,7 +1175,7 @@
                 << ", got " << starting_queue_index.index() << ", behind by "
                 << std::dec
                 << (starting_queue_index.index() - queue_index.index());
-        return ReadResult::TOO_OLD;
+        return Result::TOO_OLD;
       }
 
       VLOG(3) << "Initial";
@@ -1167,10 +1188,10 @@
       // the queue.  Tell them that they are behind.
       if (uint32_queue_index < memory_->queue_size()) {
         VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
-        return ReadResult::NOTHING_NEW;
+        return Result::NOTHING_NEW;
       } else {
         VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
-        return ReadResult::TOO_OLD;
+        return Result::TOO_OLD;
       }
     }
     VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -1190,7 +1211,8 @@
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
   if (data) {
-    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+    memcpy(data, m->data(memory_->message_data_size()),
+           memory_->message_data_size());
   }
   *length = m->header.length;
 
@@ -1205,18 +1227,13 @@
             << queue_index.index() << ", finished with "
             << final_queue_index.index() << ", delta: " << std::dec
             << (final_queue_index.index() - queue_index.index());
-    return ReadResult::OVERWROTE;
+    return Result::OVERWROTE;
   }
 
-  return ReadResult::GOOD;
+  return Result::GOOD;
 }
 
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
-  return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
   const size_t queue_size = memory_->queue_size();
 
   // There is only one interesting case.  We need to know if the queue is empty.
@@ -1226,9 +1243,16 @@
   if (next_queue_index.valid()) {
     const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
     return current_queue_index;
-  } else {
-    return empty_queue_index();
   }
+  return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+  return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+  return memory->message_data_size();
 }
 
 namespace {