Improve const-correctness in the LocklessQueue code

This allows ShmEventLoop to use a read-only mapping when reading data from
queues, which makes it much harder for code reading from a queue to
accidentally modify it.

This involves splitting LocklessQueue up into individual components
(reader, sender, pinner, watcher, and wake-upper), as sketched below.
That makes it much more obvious what state is used where, and allows
adding const in more places.
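
For reference, a rough sketch of what the split-up API looks like after
this change.  The Example function, the "hello" payload, and the priority
value are illustrative only; the class names and signatures are the ones
introduced here, and the caller is assumed to own the shared memory
mapping and configuration:

    #include "aos/ipc_lib/lockless_queue.h"

    void Example(aos::ipc_lib::LocklessQueueMemory *memory,
                 aos::ipc_lib::LocklessQueueConfiguration config) {
      // The same pointer is used for the const and mutable views here;
      // ShmEventLoop can pass a separate read-only mapping instead.
      aos::ipc_lib::LocklessQueue queue(memory, memory, config);

      // Sending is now a standalone sender plus a separate wake-upper.
      aos::ipc_lib::LocklessQueueSender sender =
          aos::ipc_lib::LocklessQueueSender::Make(queue).value();
      aos::ipc_lib::LocklessQueueWakeUpper wake_upper(queue);
      sender.Send("hello", 6);
      wake_upper.Wakeup(/*current_priority=*/0);

      // Reading only needs the const view of the memory internally.
      aos::ipc_lib::LocklessQueueReader reader(queue);
      const aos::ipc_lib::QueueIndex latest = reader.LatestIndex();
      (void)latest;
    }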

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index e45c065..1ac8656 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,6 +164,10 @@
 
   const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
 
+  ipc_lib::LocklessQueue queue() const {
+    return ipc_lib::LocklessQueue(memory(), memory(), config());
+  }
+
   absl::Span<char> GetSharedMemory() const {
     return absl::Span<char>(static_cast<char *>(data_), size_);
   }
@@ -207,8 +211,7 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()) {
+        reader_(lockless_queue_memory_.queue()) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -238,7 +241,8 @@
   // Sets this object to pin data in shared memory when fetching.
   void PinDataOnFetch() {
     CHECK(!copy_data());
-    auto maybe_pinner = lockless_queue_.MakePinner();
+    auto maybe_pinner =
+        ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
     if (!maybe_pinner) {
       LOG(FATAL) << "Failed to create reader on "
                  << configuration::CleanedChannelToString(channel_)
@@ -250,26 +254,26 @@
   // Points the next message to fetch at the queue index which will be
   // populated next.
   void PointAtNextQueueIndex() {
-    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+    actual_queue_index_ = reader_.LatestIndex();
     if (!actual_queue_index_.valid()) {
       // Nothing in the queue.  The next element will show up at the 0th
       // index in the queue.
-      actual_queue_index_ =
-          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+      actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+          LocklessQueueSize(lockless_queue_memory_.memory()));
     } else {
       actual_queue_index_ = actual_queue_index_.Increment();
     }
   }
 
   bool FetchNext() {
-    const ipc_lib::LocklessQueue::ReadResult read_result =
+    const ipc_lib::LocklessQueueReader::Result read_result =
         DoFetch(actual_queue_index_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   bool Fetch() {
-    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+    const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
     // actual_queue_index_ is only meaningful if it was set by Fetch or
     // FetchNext.  This happens when valid_data_ has been set.  So, only
     // skip checking if valid_data_ is true.
@@ -282,22 +286,29 @@
       return false;
     }
 
-    const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+    const ipc_lib::LocklessQueueReader::Result read_result =
+        DoFetch(queue_index);
 
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
         << ": Queue index went backwards.  This should never happen.  "
         << configuration::CleanedChannelToString(channel_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   Context context() const { return context_; }
 
   bool RegisterWakeup(int priority) {
-    return lockless_queue_.RegisterWakeup(priority);
+    CHECK(!watcher_);
+    watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+        lockless_queue_memory_.queue(), priority);
+    return static_cast<bool>(watcher_);
   }
 
-  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+  void UnregisterWakeup() {
+    CHECK(watcher_);
+    watcher_ = std::nullopt;
+  }
 
   absl::Span<char> GetSharedMemory() const {
     return lockless_queue_memory_.GetSharedMemory();
@@ -309,23 +320,24 @@
     // grab the whole shared memory buffer instead.
     return absl::Span<char>(
         const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
-        lockless_queue_.message_data_size());
+        LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
   }
 
  private:
-  ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+  ipc_lib::LocklessQueueReader::Result DoFetch(
+      ipc_lib::QueueIndex queue_index) {
     // TODO(austin): Get behind and make sure it dies.
     char *copy_buffer = nullptr;
     if (copy_data()) {
       copy_buffer = data_storage_start();
     }
-    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+    ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
         &context_.size, copy_buffer);
 
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
         const int pin_result = pinner_->PinIndex(queue_index.index());
         CHECK(pin_result >= 0)
@@ -351,7 +363,9 @@
       const char *const data = DataBuffer();
       if (data) {
         context_.data =
-            data + lockless_queue_.message_data_size() - context_.size;
+            data +
+            LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+            context_.size;
       } else {
         context_.data = nullptr;
       }
@@ -361,7 +375,7 @@
     // Make sure the data wasn't modified while we were reading it.  This
     // can only happen if you are reading the last message *while* it is
     // being written to, which means you are pretty far behind.
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
         << ": Got behind while reading and the last message was modified "
            "out from under us while we were reading it.  Don't get so far "
            "behind on: "
@@ -370,7 +384,7 @@
     // We fell behind between when we read the index and read the value.
     // This isn't worth recovering from since this means we went to sleep
     // for a long time in the middle of this function.
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
       event_loop_->SendTimingReport();
       LOG(FATAL) << "The next message is no longer available.  "
                  << configuration::CleanedChannelToString(channel_);
@@ -402,16 +416,17 @@
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
+  ipc_lib::LocklessQueueReader reader_;
+  // This being nullopt indicates we're not looking for wakeups right now.
+  std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
 
-  ipc_lib::QueueIndex actual_queue_index_ =
-      ipc_lib::LocklessQueue::empty_queue_index();
+  ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
 
   // This being empty indicates we're not going to copy data.
   std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
   // This being nullopt indicates we're not going to pin messages.
-  std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+  std::optional<ipc_lib::LocklessQueuePinner> pinner_;
 
   Context context_;
 };
@@ -458,15 +473,15 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()),
-        lockless_queue_sender_(
-            VerifySender(lockless_queue_.MakeSender(), channel)) {}
+        lockless_queue_sender_(VerifySender(
+            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            channel)),
+        wake_upper_(lockless_queue_memory_.queue()) {}
 
   ~ShmSender() override {}
 
-  static ipc_lib::LocklessQueue::Sender VerifySender(
-      std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+  static ipc_lib::LocklessQueueSender VerifySender(
+      std::optional<ipc_lib::LocklessQueueSender> sender,
       const Channel *channel) {
     if (sender) {
       return std::move(sender.value());
@@ -488,7 +503,7 @@
     lockless_queue_sender_.Send(
         length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
         &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     return true;
   }
 
@@ -503,7 +518,7 @@
                                 monotonic_remote_time, realtime_remote_time,
                                 remote_queue_index, &monotonic_sent_time_,
                                 &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     // TODO(austin): Return an error if we send too fast.
     return true;
   }
@@ -516,8 +531,8 @@
 
  private:
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
-  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+  ipc_lib::LocklessQueueSender lockless_queue_sender_;
+  ipc_lib::LocklessQueueWakeUpper wake_upper_;
 };
 
 // Class to manage the state for a Watcher.
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 890e85f..a47121e 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -133,12 +133,12 @@
 struct AtomicQueueIndex {
  public:
   // Atomically reads the index without any ordering constraints.
-  QueueIndex RelaxedLoad(uint32_t count) {
+  QueueIndex RelaxedLoad(uint32_t count) const {
     return QueueIndex(index_.load(::std::memory_order_relaxed), count);
   }
 
   // Full bidirectional barriers here.
-  QueueIndex Load(uint32_t count) {
+  QueueIndex Load(uint32_t count) const {
     return QueueIndex(index_.load(::std::memory_order_acquire), count);
   }
   inline void Store(QueueIndex value) {
@@ -222,7 +222,7 @@
 class AtomicIndex {
  public:
   // Stores and loads atomically without ordering constraints.
-  Index RelaxedLoad() {
+  Index RelaxedLoad() const {
     return Index(index_.load(::std::memory_order_relaxed));
   }
   void RelaxedStore(Index index) {
@@ -237,7 +237,7 @@
   void Store(Index index) {
     index_.store(index.index_, ::std::memory_order_release);
   }
-  Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
+  Index Load() const { return Index(index_.load(::std::memory_order_acquire)); }
 
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index dad44bb..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -424,6 +424,13 @@
   return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
 }
 
+QueueIndex ZeroOrValid(QueueIndex index) {
+  if (!index.valid()) {
+    return index.Clear();
+  }
+  return index;
+}
+
 }  // namespace
 
 size_t LocklessQueueConfiguration::message_size() const {
@@ -571,30 +578,57 @@
   return memory;
 }
 
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
-                             LocklessQueueConfiguration config)
-    : memory_(InitializeLocklessQueueMemory(memory, config)),
-      watcher_copy_(memory_->num_watchers()),
-      pid_(getpid()),
-      uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+  InitializeLocklessQueueMemory(memory_, config_);
+}
 
-LocklessQueue::~LocklessQueue() {
-  CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+  if (watcher_index_ == -1) {
+    return;
+  }
 
+  // Since everything is self-consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-  const int num_watchers = memory_->num_watchers();
+
+  // Make sure we are registered.
+  CHECK_NE(watcher_index_, -1);
+
+  // Make sure we still own the slot we are supposed to.
+  CHECK(
+      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+  // The act of unlocking invalidates the entry.  Invalidate it.
+  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+  // And internally forget the slot.
+  watcher_index_ = -1;
+
   // Cleanup is cheap. The next user will do it anyways, so no need for us to do
   // anything right now.
 
   // And confirm that nothing is owned by us.
+  const int num_watchers = memory_->num_watchers();
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+        << ": " << i;
   }
 }
 
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+    LocklessQueue queue, int priority) {
+  queue.Initialize();
+  LocklessQueueWatcher result(queue.memory(), priority);
+  if (result.watcher_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
 
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+                                           int priority)
+    : memory_(memory) {
   // TODO(austin): Make sure signal coalescing is turned on.  We don't need
   // duplicates.  That will improve performance under high load.
 
@@ -623,10 +657,10 @@
 
   // Bail if we failed to find an open slot.
   if (watcher_index_ == -1) {
-    return false;
+    return;
   }
 
-  Watcher *w = memory_->GetWatcher(watcher_index_);
+  Watcher *const w = memory_->GetWatcher(watcher_index_);
 
   w->pid = getpid();
   w->priority = priority;
@@ -634,29 +668,15 @@
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
   // get rearranged afterwords.
   death_notification_init(&(w->tid));
-  return true;
 }
 
-void LocklessQueue::UnregisterWakeup() {
-  // Since everything is self consistent, all we need to do is make sure nobody
-  // else is running.  Someone dying will get caught in the generic consistency
-  // check.
-  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
-  // Make sure we are registered.
-  CHECK_NE(watcher_index_, -1);
-
-  // Make sure we still own the slot we are supposed to.
-  CHECK(
-      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
-  // The act of unlocking invalidates the entry.  Invalidate it.
-  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
-  // And internally forget the slot.
-  watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+    : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+  queue.Initialize();
+  watcher_copy_.resize(memory_->num_watchers());
 }
 
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
   const size_t num_watchers = memory_->num_watchers();
 
   CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -668,7 +688,7 @@
   // question.  There is no way without pidfd's to close this window, and
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
-    Watcher *w = memory_->GetWatcher(i);
+    const Watcher *w = memory_->GetWatcher(i);
     watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
     // Force the load of the TID to come first.
     aos_compiler_memory_barrier();
@@ -741,7 +761,8 @@
   return count;
 }
 
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+    : memory_(memory) {
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
@@ -778,15 +799,29 @@
       << ": " << std::hex << scratch_index.get();
 }
 
-LocklessQueue::Sender::~Sender() {
-  if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+  if (sender_index_ != -1) {
+    CHECK(memory_ != nullptr);
     death_notification_release(&(memory_->GetSender(sender_index_)->tid));
   }
 }
 
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueueSender result(queue.memory());
+  if (result.sender_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
 
-void *LocklessQueue::Sender::Data() {
+size_t LocklessQueueSender::size() const {
+  return memory_->message_data_size();
+}
+
+void *LocklessQueueSender::Data() {
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
@@ -799,7 +834,7 @@
   return message->data(memory_->message_data_size());
 }
 
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     const char *data, size_t length,
     aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
@@ -816,127 +851,7 @@
        monotonic_sent_time, realtime_sent_time, queue_index);
 }
 
-LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
-  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
-  // Since we already have the lock, go ahead and try cleaning up.
-  Cleanup(memory_, grab_queue_setup_lock);
-
-  const int num_pinners = memory_->num_pinners();
-
-  for (int i = 0; i < num_pinners; ++i) {
-    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
-    // This doesn't need synchronization because we're the only process doing
-    // initialization right now, and nobody else will be touching pinners which
-    // we're interested in.
-    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0) {
-      pinner_index_ = i;
-      break;
-    }
-  }
-
-  if (pinner_index_ == -1) {
-    VLOG(1) << "Too many pinners, starting to bail.";
-    return;
-  }
-
-  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
-  p->pinned.Invalidate();
-
-  // Indicate that we are now alive by taking over the slot. If the previous
-  // owner died, we still want to do this.
-  death_notification_init(&(p->tid));
-}
-
-LocklessQueue::Pinner::~Pinner() {
-  if (valid()) {
-    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
-    aos_compiler_memory_barrier();
-    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
-  }
-}
-
-// This method doesn't mess with any scratch_index, so it doesn't have to worry
-// about message ownership.
-int LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
-  const size_t queue_size = memory_->queue_size();
-  const QueueIndex queue_index =
-      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
-  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
-
-  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
-
-  // Indicate that we want to pin this message.
-  pinner->pinned.Store(queue_index);
-  aos_compiler_memory_barrier();
-
-  {
-    const Index message_index = queue_slot->Load();
-    Message *const message = memory_->GetMessage(message_index);
-
-    const QueueIndex message_queue_index =
-        message->header.queue_index.Load(queue_size);
-    if (message_queue_index == queue_index) {
-      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
-      aos_compiler_memory_barrier();
-      return message_index.message_index();
-    }
-    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
-            << ", " << queue_index.index();
-  }
-
-  // Being down here means we asked to pin a message before realizing it's no
-  // longer in the queue, so back that out now.
-  pinner->pinned.Invalidate();
-  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
-  return -1;
-}
-
-size_t LocklessQueue::Pinner::size() const {
-  return memory_->message_data_size();
-}
-
-const void *LocklessQueue::Pinner::Data() const {
-  const size_t queue_size = memory_->queue_size();
-  ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
-  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
-  CHECK(pinned.valid());
-  const Message *message = memory_->GetMessage(pinned);
-
-  return message->data(memory_->message_data_size());
-}
-
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
-  LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
-  if (result.valid()) {
-    return std::move(result);
-  } else {
-    return std::nullopt;
-  }
-}
-
-std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
-  LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
-  if (result.valid()) {
-    return std::move(result);
-  } else {
-    return std::nullopt;
-  }
-}
-
-namespace {
-
-QueueIndex ZeroOrValid(QueueIndex index) {
-  if (!index.valid()) {
-    return index.Clear();
-  }
-  return index;
-}
-
-}  // namespace
-
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index,
@@ -1094,7 +1009,7 @@
   memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
 }
 
-int LocklessQueue::Sender::buffer_index() const {
+int LocklessQueueSender::buffer_index() const {
   ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
   // We can do a relaxed load on our sender because we're the only person
   // modifying it right now.
@@ -1102,13 +1017,119 @@
   return scratch_index.message_index();
 }
 
-LocklessQueue::ReadResult LocklessQueue::Read(
+LocklessQueuePinner::LocklessQueuePinner(
+    LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+    : memory_(memory), const_memory_(const_memory) {
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_, grab_queue_setup_lock);
+
+  const int num_pinners = memory_->num_pinners();
+
+  for (int i = 0; i < num_pinners; ++i) {
+    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching pinners which
+    // we're interested in.
+    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      pinner_index_ = i;
+      break;
+    }
+  }
+
+  if (pinner_index_ == -1) {
+    VLOG(1) << "Too many pinners, starting to bail.";
+    return;
+  }
+
+  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+  p->pinned.Invalidate();
+
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+  if (pinner_index_ != -1) {
+    CHECK(memory_ != nullptr);
+    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+    aos_compiler_memory_barrier();
+    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+  }
+}
+
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueuePinner result(queue.memory(), queue.const_memory());
+  if (result.pinner_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+  const size_t queue_size = memory_->queue_size();
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+  // Indicate that we want to pin this message.
+  pinner->pinned.Store(queue_index);
+  aos_compiler_memory_barrier();
+
+  {
+    const Index message_index = queue_slot->Load();
+    Message *const message = memory_->GetMessage(message_index);
+
+    const QueueIndex message_queue_index =
+        message->header.queue_index.Load(queue_size);
+    if (message_queue_index == queue_index) {
+      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+      aos_compiler_memory_barrier();
+      return message_index.message_index();
+    }
+    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+            << ", " << queue_index.index();
+  }
+
+  // Being down here means we asked to pin a message before realizing it's no
+  // longer in the queue, so back that out now.
+  pinner->pinned.Invalidate();
+  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+  return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+  return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+  const size_t queue_size = const_memory_->queue_size();
+  const ::aos::ipc_lib::Pinner *const pinner =
+      const_memory_->GetPinner(pinner_index_);
+  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+  CHECK(pinned.valid());
+  const Message *message = const_memory_->GetMessage(pinned);
+
+  return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
     uint32_t uint32_queue_index,
     ::aos::monotonic_clock::time_point *monotonic_sent_time,
     ::aos::realtime_clock::time_point *realtime_sent_time,
     ::aos::monotonic_clock::time_point *monotonic_remote_time,
     ::aos::realtime_clock::time_point *realtime_remote_time,
-    uint32_t *remote_queue_index, size_t *length, char *data) {
+    uint32_t *remote_queue_index, size_t *length, char *data) const {
   const size_t queue_size = memory_->queue_size();
 
   // Build up the QueueIndex.
@@ -1117,7 +1138,7 @@
 
   // Read the message stored at the requested location.
   Index mi = memory_->LoadIndex(queue_index);
-  Message *m = memory_->GetMessage(mi);
+  const Message *m = memory_->GetMessage(mi);
 
   while (true) {
     // We need to confirm that the data doesn't change while we are reading it.
@@ -1130,13 +1151,13 @@
       if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
         VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                 << ", " << queue_index.DecrementBy(queue_size).index();
-        return ReadResult::NOTHING_NEW;
+        return Result::NOTHING_NEW;
       }
 
       // Someone has re-used this message between when we pulled it out of the
       // queue and when we grabbed its index.  It is pretty hard to deduce
       // what happened. Just try again.
-      Message *const new_m = memory_->GetMessage(queue_index);
+      const Message *const new_m = memory_->GetMessage(queue_index);
       if (m != new_m) {
         m = new_m;
         VLOG(3) << "Retrying, m doesn't match";
@@ -1154,7 +1175,7 @@
                 << ", got " << starting_queue_index.index() << ", behind by "
                 << std::dec
                 << (starting_queue_index.index() - queue_index.index());
-        return ReadResult::TOO_OLD;
+        return Result::TOO_OLD;
       }
 
       VLOG(3) << "Initial";
@@ -1167,10 +1188,10 @@
       // the queue.  Tell them that they are behind.
       if (uint32_queue_index < memory_->queue_size()) {
         VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
-        return ReadResult::NOTHING_NEW;
+        return Result::NOTHING_NEW;
       } else {
         VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
-        return ReadResult::TOO_OLD;
+        return Result::TOO_OLD;
       }
     }
     VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -1190,7 +1211,8 @@
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
   if (data) {
-    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+    memcpy(data, m->data(memory_->message_data_size()),
+           memory_->message_data_size());
   }
   *length = m->header.length;
 
@@ -1205,18 +1227,13 @@
             << queue_index.index() << ", finished with "
             << final_queue_index.index() << ", delta: " << std::dec
             << (final_queue_index.index() - queue_index.index());
-    return ReadResult::OVERWROTE;
+    return Result::OVERWROTE;
   }
 
-  return ReadResult::GOOD;
+  return Result::GOOD;
 }
 
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
-  return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
   const size_t queue_size = memory_->queue_size();
 
   // There is only one interesting case.  We need to know if the queue is empty.
@@ -1226,9 +1243,16 @@
   if (next_queue_index.valid()) {
     const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
     return current_queue_index;
-  } else {
-    return empty_queue_index();
   }
+  return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+  return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+  return memory->message_data_size();
 }
 
 namespace {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index afa7ced..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -134,40 +134,72 @@
 // is done before the watcher goes RT), but needs to be RT for the sender.
 struct LocklessQueueMemory;
 
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
 // Initializes the queue memory.  memory must be either a valid pointer to the
 // queue datastructure, or must be zero initialized.
 LocklessQueueMemory *InitializeLocklessQueueMemory(
     LocklessQueueMemory *memory, LocklessQueueConfiguration config);
 
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
 const static unsigned int kWakeupSignal = SIGRTMIN + 2;
 
-// Class to manage sending and receiving data in the lockless queue.  This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
 class LocklessQueue {
  public:
-  LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-  LocklessQueue(const LocklessQueue &) = delete;
-  LocklessQueue &operator=(const LocklessQueue &) = delete;
+  LocklessQueue(const LocklessQueueMemory *const_memory,
+                LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+      : const_memory_(const_memory), memory_(memory), config_(config) {}
 
-  ~LocklessQueue();
+  void Initialize();
 
-  // Returns the number of messages in the queue.
-  size_t QueueSize() const;
+  LocklessQueueConfiguration config() const { return config_; }
 
-  size_t message_data_size() const;
+  const LocklessQueueMemory *const_memory() { return const_memory_; }
+  LocklessQueueMemory *memory() { return memory_; }
 
-  // Registers this thread to receive the kWakeupSignal signal when Wakeup is
-  // called. Returns false if there was an error in registration.
-  bool RegisterWakeup(int priority);
-  // Unregisters the wakeup.
-  void UnregisterWakeup();
+ private:
+  const LocklessQueueMemory *const_memory_;
+  LocklessQueueMemory *memory_;
+  LocklessQueueConfiguration config_;
+};
+
+class LocklessQueueWatcher {
+ public:
+  LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher(LocklessQueueWatcher &&other)
+      : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+    other.watcher_index_ = -1;
+  }
+  LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(watcher_index_, other.watcher_index_);
+    return *this;
+  }
+
+  ~LocklessQueueWatcher();
+
+  // Registers this thread to receive the kWakeupSignal signal when
+  // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+  // error in registration.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+                                                  int priority);
+
+ private:
+  LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index in the watcher list that our entry is, or -1 if no watcher is
+  // registered.
+  int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+  LocklessQueueWakeUpper(LocklessQueue queue);
 
   // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
   //
@@ -175,169 +207,7 @@
   // if nonrt.
   int Wakeup(int current_priority);
 
-  // If you ask for a queue index 2 past the newest, you will still get
-  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
-  // element newer than QueueSize() from the current message, we consider it
-  // behind by a large amount and return TOO_OLD.  If the message is modified
-  // out from underneath us as we read it, return OVERWROTE.
-  //
-  // data may be nullptr to indicate the data should not be copied.
-  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
-  ReadResult Read(uint32_t queue_index,
-                  ::aos::monotonic_clock::time_point *monotonic_sent_time,
-                  ::aos::realtime_clock::time_point *realtime_sent_time,
-                  ::aos::monotonic_clock::time_point *monotonic_remote_time,
-                  ::aos::realtime_clock::time_point *realtime_remote_time,
-                  uint32_t *remote_queue_index, size_t *length, char *data);
-
-  // Returns the index to the latest queue message.  Returns empty_queue_index()
-  // if there are no messages in the queue.  Do note that this index wraps if
-  // more than 2^32 messages are sent.
-  QueueIndex LatestQueueIndex();
-  static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
-  // Returns the size of the queue.  This is mostly useful for manipulating
-  // QueueIndex.
-  size_t queue_size() const;
-
-  // TODO(austin): Return the oldest queue index.  This lets us catch up nicely
-  // if we got behind.
-  // The easiest way to implement this is likely going to be to reserve the
-  // first modulo of values for the initial time around, and never reuse them.
-  // That lets us do a simple atomic read of the next index and deduce what has
-  // happened.  It will involve the simplest atomic operations.
-
-  // TODO(austin): Make it so we can find the indices which were sent just
-  // before and after a time with a binary search.
-
-  // Sender for blocks of data.  The resources associated with a sender are
-  // scoped to this object's lifetime.
-  class Sender {
-   public:
-    Sender(const Sender &) = delete;
-    Sender &operator=(const Sender &) = delete;
-    Sender(Sender &&other)
-        : memory_(other.memory_), sender_index_(other.sender_index_) {
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-    }
-    Sender &operator=(Sender &&other) {
-      memory_ = other.memory_;
-      sender_index_ = other.sender_index_;
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-      return *this;
-    }
-
-    ~Sender();
-
-    // Sends a message without copying the data.
-    // Copy at most size() bytes of data into the memory pointed to by Data(),
-    // and then call Send().
-    // Note: calls to Data() are expensive enough that you should cache it.
-    size_t size();
-    void *Data();
-    void Send(size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-    // Sends up to length data.  Does not wakeup the target.
-    void Send(const char *data, size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-    int buffer_index() const;
-
-   private:
-    friend class LocklessQueue;
-
-    Sender(LocklessQueueMemory *memory);
-
-    // Returns true if this sender is valid.  If it isn't valid, any of the
-    // other methods won't work.  This is here to allow the lockless queue to
-    // only build a sender if there was one available.
-    bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
-    // Pointer to the backing memory.
-    LocklessQueueMemory *memory_ = nullptr;
-
-    // Index into the sender list.
-    int sender_index_ = -1;
-  };
-
-  // Pinner for blocks of data.  The resources associated with a pinner are
-  // scoped to this object's lifetime.
-  class Pinner {
-   public:
-    Pinner(const Pinner &) = delete;
-    Pinner &operator=(const Pinner &) = delete;
-    Pinner(Pinner &&other)
-        : memory_(other.memory_), pinner_index_(other.pinner_index_) {
-      other.memory_ = nullptr;
-      other.pinner_index_ = -1;
-    }
-    Pinner &operator=(Pinner &&other) {
-      memory_ = other.memory_;
-      pinner_index_ = other.pinner_index_;
-      other.memory_ = nullptr;
-      other.pinner_index_ = -1;
-      return *this;
-    }
-
-    ~Pinner();
-
-    // Attempts to pin the message at queue_index.
-    // Un-pins the previous message.
-    // Returns the buffer index (non-negative) if it succeeds.
-    // Returns -1 if that message is no longer in the queue.
-    int PinIndex(uint32_t queue_index);
-
-    // Read at most size() bytes of data into the memory pointed to by Data().
-    // Note: calls to Data() are expensive enough that you should cache it.
-    // Don't call Data() before a successful PinIndex call.
-    size_t size() const;
-    const void *Data() const;
-
-   private:
-    friend class LocklessQueue;
-
-    Pinner(LocklessQueueMemory *memory);
-
-    // Returns true if this pinner is valid.  If it isn't valid, any of the
-    // other methods won't work.  This is here to allow the lockless queue to
-    // only build a pinner if there was one available.
-    bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
-
-    // Pointer to the backing memory.
-    LocklessQueueMemory *memory_ = nullptr;
-
-    // Index into the pinner list.
-    int pinner_index_ = -1;
-  };
-
-  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
-  // TODO(austin): Change the API if we find ourselves with more errors.
-  std::optional<Sender> MakeSender();
-
-  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
-  // TODO(austin): Change the API if we find ourselves with more errors.
-  std::optional<Pinner> MakePinner();
-
  private:
-  LocklessQueueMemory *memory_ = nullptr;
-
   // Memory and datastructure used to sort a list of watchers to wake
   // up.  This isn't a copy of Watcher since tid is simpler to work with here
   // than the futex above.
@@ -346,17 +216,176 @@
     pid_t pid;
     int priority;
   };
-  // TODO(austin): Don't allocate this memory if we aren't going to send.
-  ::std::vector<WatcherCopy> watcher_copy_;
 
-  // Index in the watcher list that our entry is, or -1 if no watcher is
-  // registered.
-  int watcher_index_ = -1;
-
+  const LocklessQueueMemory *const memory_;
   const int pid_;
   const uid_t uid_;
+
+  ::std::vector<WatcherCopy> watcher_copy_;
 };
 
+// Sender for blocks of data.  The resources associated with a sender are
+// scoped to this object's lifetime.
+class LocklessQueueSender {
+ public:
+  LocklessQueueSender(const LocklessQueueSender &) = delete;
+  LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+  LocklessQueueSender(LocklessQueueSender &&other)
+      : memory_(other.memory_), sender_index_(other.sender_index_) {
+    other.memory_ = nullptr;
+    other.sender_index_ = -1;
+  }
+  LocklessQueueSender &operator=(LocklessQueueSender &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(sender_index_, other.sender_index_);
+    return *this;
+  }
+
+  ~LocklessQueueSender();
+
+  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+  // Sends a message without copying the data.
+  // Copy at most size() bytes of data into the memory pointed to by Data(),
+  // and then call Send().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  size_t size() const;
+  void *Data();
+  void Send(size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  // Sends up to length data.  Does not wakeup the target.
+  void Send(const char *data, size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  int buffer_index() const;
+
+ private:
+  LocklessQueueSender(LocklessQueueMemory *memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index into the sender list.
+  int sender_index_ = -1;
+};
+
+// Pinner for blocks of data.  The resources associated with a pinner are
+// scoped to this object's lifetime.
+class LocklessQueuePinner {
+ public:
+  LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner(LocklessQueuePinner &&other)
+      : memory_(other.memory_),
+        const_memory_(other.const_memory_),
+        pinner_index_(other.pinner_index_) {
+    other.pinner_index_ = -1;
+  }
+  LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(const_memory_, other.const_memory_);
+    std::swap(pinner_index_, other.pinner_index_);
+    return *this;
+  }
+
+  ~LocklessQueuePinner();
+
+  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+  // Attempts to pin the message at queue_index.
+  // Un-pins the previous message.
+  // Returns the buffer index (non-negative) if it succeeds.
+  // Returns -1 if that message is no longer in the queue.
+  int PinIndex(uint32_t queue_index);
+
+  // Read at most size() bytes of data into the memory pointed to by Data().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  // Don't call Data() before a successful PinIndex call.
+  size_t size() const;
+  const void *Data() const;
+
+ private:
+  LocklessQueuePinner(LocklessQueueMemory *memory,
+                      const LocklessQueueMemory *const_memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+  const LocklessQueueMemory *const_memory_ = nullptr;
+
+  // Index into the pinner list.
+  int pinner_index_ = -1;
+};
+
+class LocklessQueueReader {
+ public:
+  enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+
+  LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+    queue.Initialize();
+  }
+
+  // If you ask for a queue index 2 past the newest, you will still get
+  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
+  // element newer than QueueSize() from the current message, we consider it
+  // behind by a large amount and return TOO_OLD.  If the message is modified
+  // out from underneath us as we read it, return OVERWROTE.
+  //
+  // data may be nullptr to indicate the data should not be copied.
+  Result Read(uint32_t queue_index,
+              ::aos::monotonic_clock::time_point *monotonic_sent_time,
+              ::aos::realtime_clock::time_point *realtime_sent_time,
+              ::aos::monotonic_clock::time_point *monotonic_remote_time,
+              ::aos::realtime_clock::time_point *realtime_remote_time,
+              uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+  // Returns the index to the latest queue message.  Returns
+  // QueueIndex::Invalid() if there are no messages in the queue.  Do note that
+  // this index wraps if more than 2^32 messages are sent.
+  QueueIndex LatestIndex() const;
+
+ private:
+  const LocklessQueueMemory *const memory_;
+};
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index.  This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened.  It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
 }  // namespace ipc_lib
 }  // namespace aos
 
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index a8a2429..3d8812d 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -511,18 +511,21 @@
       config,
       [config, tid](void *memory) {
         // Initialize the queue and grab the tid.
-        LocklessQueue queue(
+        LocklessQueue(
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
-            config);
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            config)
+            .Initialize();
         *tid = gettid();
       },
       [config](void *memory) {
-        // Now try to write 2 messages.  We will get killed a bunch as this
-        // tries to happen.
         LocklessQueue queue(
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
             config);
-        LocklessQueue::Sender sender = queue.MakeSender().value();
+        // Now try to write 2 messages.  We will get killed a bunch as this
+        // tries to happen.
+        LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
         for (int i = 0; i < 2; ++i) {
           char data[100];
           size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
@@ -530,10 +533,11 @@
         }
       },
       [config, tid](void *raw_memory) {
+        ::aos::ipc_lib::LocklessQueueMemory *const memory =
+            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
         // Confirm that we can create 2 senders (the number in the queue), and
         // send a message.  And that all the messages in the queue are valid.
-        ::aos::ipc_lib::LocklessQueueMemory *memory =
-            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+        LocklessQueue queue(memory, memory, config);
 
         bool print = false;
 
@@ -555,9 +559,10 @@
           PrintLocklessQueueMemory(memory);
         }
 
-        LocklessQueue queue(memory, config);
         // Building and destroying a sender will clean up the queue.
-        { LocklessQueue::Sender sender = queue.MakeSender().value(); }
+        {
+          LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+        }
 
         if (print) {
           printf("Cleaned up version:\n");
@@ -565,12 +570,11 @@
         }
 
         {
-          LocklessQueue::Sender sender = queue.MakeSender().value();
+          LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
           {
             // Make a second sender to confirm that the slot was freed.
             // If the sender doesn't get cleaned up, this will fail.
-            LocklessQueue queue2(memory, config);
-            queue2.MakeSender().value();
+            LocklessQueueSender::Make(queue).value();
           }
 
           // Send a message to make sure that the queue still works.
@@ -579,6 +583,8 @@
           sender.Send(data, s + 1);
         }
 
+        LocklessQueueReader reader(queue);
+
         // Now loop through the queue and make sure the number in the snprintf
         // increments.
         char last_data = '0';
@@ -592,19 +598,21 @@
           char read_data[1024];
           size_t length;
 
-          LocklessQueue::ReadResult read_result =
-              queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
-                         &monotonic_remote_time, &realtime_remote_time,
-                         &remote_queue_index, &length, &(read_data[0]));
+          LocklessQueueReader::Result read_result =
+              reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+                          &monotonic_remote_time, &realtime_remote_time,
+                          &remote_queue_index, &length, &(read_data[0]));
 
-          if (read_result != LocklessQueue::ReadResult::GOOD) {
+          if (read_result != LocklessQueueReader::Result::GOOD) {
             break;
           }
 
-          EXPECT_GT(read_data[queue.message_data_size() - length + 6],
-                    last_data)
+          EXPECT_GT(
+              read_data[LocklessQueueMessageDataSize(memory) - length + 6],
+              last_data)
               << ": Got " << read_data;
-          last_data = read_data[queue.message_data_size() - length + 6];
+          last_data =
+              read_data[LocklessQueueMessageDataSize(memory) - length + 6];
 
           ++i;
         }
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index aa9b2e1..bb995b9 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -75,27 +75,27 @@
   alignas(kDataAlignment) char data[];
 
   // Memory size functions for all 4 lists.
-  size_t SizeOfQueue() { return SizeOfQueue(config); }
+  size_t SizeOfQueue() const { return SizeOfQueue(config); }
   static size_t SizeOfQueue(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
   }
 
-  size_t SizeOfMessages() { return SizeOfMessages(config); }
+  size_t SizeOfMessages() const { return SizeOfMessages(config); }
   static size_t SizeOfMessages(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(config.message_size() * config.num_messages());
   }
 
-  size_t SizeOfWatchers() { return SizeOfWatchers(config); }
+  size_t SizeOfWatchers() const { return SizeOfWatchers(config); }
   static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
   }
 
-  size_t SizeOfSenders() { return SizeOfSenders(config); }
+  size_t SizeOfSenders() const { return SizeOfSenders(config); }
   static size_t SizeOfSenders(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
   }
 
-  size_t SizeOfPinners() { return SizeOfPinners(config); }
+  size_t SizeOfPinners() const { return SizeOfPinners(config); }
   static size_t SizeOfPinners(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(Pinner) * config.num_pinners);
   }
@@ -110,6 +110,14 @@
         SizeOfSenders() + pinner_index * sizeof(Pinner));
   }
 
+  const Pinner *GetPinner(size_t pinner_index) const {
+    static_assert(alignof(const Pinner) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Pinner *>(
+        &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+        SizeOfSenders() + pinner_index * sizeof(Pinner));
+  }
+
   Sender *GetSender(size_t sender_index) {
     static_assert(alignof(Sender) <= kDataAlignment,
                   "kDataAlignment is too small");
@@ -126,12 +134,26 @@
                                        watcher_index * sizeof(Watcher));
   }
 
+  const Watcher *GetWatcher(size_t watcher_index) const {
+    static_assert(alignof(const Watcher) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Watcher *>(&data[0] + SizeOfQueue() +
+                                             SizeOfMessages() +
+                                             watcher_index * sizeof(Watcher));
+  }
+
   AtomicIndex *GetQueue(uint32_t index) {
     static_assert(alignof(AtomicIndex) <= kDataAlignment,
                   "kDataAlignment is too small");
     return reinterpret_cast<AtomicIndex *>(&data[0] +
                                            sizeof(AtomicIndex) * index);
   }
+  const AtomicIndex *GetQueue(uint32_t index) const {
+    static_assert(alignof(const AtomicIndex) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const AtomicIndex *>(&data[0] +
+                                                 sizeof(AtomicIndex) * index);
+  }
 
   // There are num_messages() messages.  The free list is really the
   // sender+pinner list, since those are messages available to be filled in.
@@ -142,12 +164,19 @@
     return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }
+  const Message *GetMessage(Index index) const {
+    static_assert(alignof(const Message) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Message *>(
+        &data[0] + SizeOfQueue() + index.message_index() * message_size());
+  }
 
   // Helpers to fetch messages from the queue.
-  Index LoadIndex(QueueIndex index) {
+  Index LoadIndex(QueueIndex index) const {
     return GetQueue(index.Wrapped())->Load();
   }
-  Message *GetMessage(QueueIndex index) {
+  Message *GetMessage(QueueIndex index) { return GetMessage(LoadIndex(index)); }
+  const Message *GetMessage(QueueIndex index) const {
     return GetMessage(LoadIndex(index));
   }
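The const overloads above are what make it possible to inspect queue state through a pointer to const memory (e.g. a read-only mapping). A minimal sketch of the kind of accessor they enable, not part of the patch; the function name is hypothetical:

#include <cstdint>

#include "aos/ipc_lib/lockless_queue_memory.h"

namespace aos {
namespace ipc_lib {

// Loads the Index stored in queue slot `slot` without needing write access to
// the shared memory.  This only compiles because GetQueue() and
// AtomicIndex::Load() have const versions.
inline Index PeekQueueSlot(const LocklessQueueMemory *memory, uint32_t slot) {
  return memory->GetQueue(slot)->Load();
}

}  // namespace ipc_lib
}  // namespace aos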
 
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 4e84b9f..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -56,15 +56,16 @@
     Reset();
   }
 
-  LocklessQueueMemory *get_memory() {
-    return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+  LocklessQueue queue() {
+    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         config_);
   }
 
-  void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
 
   // Runs until the signal is received.
   void RunUntilWakeup(Event *ready, int priority) {
-    LocklessQueue queue(get_memory(), config_);
     internal::EPoll epoll;
     SignalFd signalfd({kWakeupSignal});
 
@@ -75,16 +76,18 @@
       epoll.Quit();
     });
 
-    // Register to be woken up *after* the signalfd is catching the signals.
-    queue.RegisterWakeup(priority);
+    {
+      // Register to be woken up *after* the signalfd is catching the signals.
+      LocklessQueueWatcher watcher =
+          LocklessQueueWatcher::Make(queue(), priority).value();
 
-    // And signal we are now ready.
-    ready->Set();
+      // And signal we are now ready.
+      ready->Set();
 
-    epoll.Run();
+      epoll.Run();
 
-    // Cleanup.
-    queue.UnregisterWakeup();
+      // Cleanup, ensuring the watcher is destroyed before the signalfd.
+    }
     epoll.DeleteFd(signalfd.fd());
   }
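For reference, a minimal sketch (not part of the patch) of the RAII pattern RunUntilWakeup() now relies on; the function name is hypothetical:

#include "aos/ipc_lib/lockless_queue.h"
#include "glog/logging.h"

namespace aos {
namespace ipc_lib {

// Holds a watcher slot for the duration of the call.
void WaitWithWakeupRegistered(LocklessQueue queue, int priority) {
  // Make() claims a watcher slot and registers the wakeup; it returns an
  // unset value when every slot is already in use.
  auto watcher = LocklessQueueWatcher::Make(queue, priority);
  CHECK(watcher) << "All watcher slots are already in use";

  // ... block here (e.g. on a signalfd) until a sender wakes us up ...

  // When `watcher` goes out of scope, its destructor unregisters the wakeup,
  // which is what the old explicit UnregisterWakeup() call did.
}

}  // namespace ipc_lib
}  // namespace aos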
 
@@ -99,36 +102,35 @@
 
 // Tests that wakeup doesn't do anything if nothing was registered.
 TEST_F(LocklessQueueTest, NoWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if a wakeup was registered and then
 // unregistered.
 TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  queue.RegisterWakeup(5);
-  queue.UnregisterWakeup();
+  { LocklessQueueWatcher::Make(queue(), 5).value(); }
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if the thread dies.
 TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   ::std::thread([this]() {
     // Use placement new so the destructor doesn't get run.
-    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
-        data;
-    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
-    // Register a wakeup.
-    q->RegisterWakeup(5);
-  }).join();
+    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+                           alignof(LocklessQueueWatcher)>::type data;
+    new (&data)
+        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+  })
+      .join();
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 struct WatcherState {
@@ -155,16 +157,13 @@
 
     WatcherState *s = &queues.back();
     queues.back().t = ::std::thread([this, &cleanup, s]() {
-      LocklessQueue q(get_memory(), config_);
-      EXPECT_TRUE(q.RegisterWakeup(0));
+      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
 
       // Signal that this thread is ready.
       s->ready.Set();
 
       // And wait until we are asked to shut down.
       cleanup.Wait();
-
-      q.UnregisterWakeup();
     });
   }
 
@@ -174,12 +173,9 @@
   }
 
   // Now try to allocate another one.  This will fail.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_FALSE(queue.RegisterWakeup(0));
-  }
+  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
 
-  // Trigger the threads to cleanup their resources, and wait unti they are
+  // Trigger the threads to clean up their resources, and wait until they are
   // done.
   cleanup.Set();
   for (WatcherState &w : queues) {
@@ -187,23 +183,16 @@
   }
 
   // We should now be able to allocate a wakeup.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_TRUE(queue.RegisterWakeup(0));
-    queue.UnregisterWakeup();
-  }
+  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
 }
 
 // Tests that creating too many senders fails as expected.
 TEST_F(LocklessQueueTest, TooManySenders) {
-  ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
-  ::std::vector<LocklessQueue::Sender> senders;
+  ::std::vector<LocklessQueueSender> senders;
   for (size_t i = 0; i < config_.num_senders; ++i) {
-    queues.emplace_back(new LocklessQueue(get_memory(), config_));
-    senders.emplace_back(queues.back()->MakeSender().value());
+    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
   }
-  queues.emplace_back(new LocklessQueue(get_memory(), config_));
-  EXPECT_FALSE(queues.back()->MakeSender());
+  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
 }
 
 // Now, start 2 threads and have them receive the signals.
@@ -212,7 +201,7 @@
   EXPECT_LE(kWakeupSignal, SIGRTMAX);
   EXPECT_GE(kWakeupSignal, SIGRTMIN);
 
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   // Event used to make sure the thread is ready before the test starts.
   Event ready1;
@@ -225,7 +214,7 @@
   ready1.Wait();
   ready2.Wait();
 
-  EXPECT_EQ(queue.Wakeup(3), 2);
+  EXPECT_EQ(wake_upper.Wakeup(3), 2);
 
   t1.join();
   t2.join();
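The sending side of the split is LocklessQueueWakeUpper: Wakeup() signals every registered watcher and returns how many it woke (2 in the test above, 0 when nothing is registered). A minimal sketch with a hypothetical function name:

#include "aos/ipc_lib/lockless_queue.h"

namespace aos {
namespace ipc_lib {

// Signals every registered watcher at the given priority and reports how many
// were actually woken up.
int NotifyReaders(LocklessQueue queue, int priority) {
  LocklessQueueWakeUpper wake_upper(queue);
  return wake_upper.Wakeup(priority);
}

}  // namespace ipc_lib
}  // namespace aos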
@@ -237,15 +226,14 @@
 
 // Do a simple send test.
 TEST_F(LocklessQueueTest, Send) {
-  LocklessQueue queue(get_memory(), config_);
-
-  LocklessQueue::Sender sender = queue.MakeSender().value();
+  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+  LocklessQueueReader reader(queue());
 
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
-    EXPECT_EQ(queue.LatestQueueIndex().index(),
-              i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+    EXPECT_EQ(reader.LatestIndex().index(),
+              i == 0 ? QueueIndex::Invalid().index() : i - 1);
 
     // Send a trivial piece of data.
     char data[100];
@@ -254,7 +242,7 @@
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
-    EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+    EXPECT_EQ(reader.LatestIndex().index(), i);
 
     // Read a result from 5 messages in the past.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -271,15 +259,15 @@
     } else {
       index = index.IncrementBy(i - 5);
     }
-    LocklessQueue::ReadResult read_result =
-        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    LocklessQueueReader::Result read_result =
+        reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
-    if (read_result != LocklessQueue::ReadResult::GOOD) {
-      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+    if (read_result != LocklessQueueReader::Result::GOOD) {
+      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
   }
 }
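A minimal sketch (not part of the patch) of reading a single entry through the split-out reader; the out-parameter types mirror the ones declared in the test and, like the header path, are assumptions:

#include <cstddef>
#include <cstdint>

#include "aos/ipc_lib/lockless_queue.h"

namespace aos {
namespace ipc_lib {

// Returns true if the entry at `index` was read successfully.  `data` is
// assumed to point at a buffer large enough for a full message.
bool ReadOne(LocklessQueue queue, uint32_t index, char *data) {
  LocklessQueueReader reader(queue);
  monotonic_clock::time_point monotonic_sent_time;
  realtime_clock::time_point realtime_sent_time;
  monotonic_clock::time_point monotonic_remote_time;
  realtime_clock::time_point realtime_remote_time;
  uint32_t remote_queue_index;
  size_t length;
  return reader.Read(index, &monotonic_sent_time, &realtime_sent_time,
                     &monotonic_remote_time, &realtime_remote_time,
                     &remote_queue_index, &length, data) ==
         LocklessQueueReader::Result::GOOD;
}

}  // namespace ipc_lib
}  // namespace aos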
@@ -295,9 +283,8 @@
 
   const chrono::seconds print_frequency(FLAGS_print_rate);
 
-  QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
-  const monotonic_clock::time_point start_time =
-      monotonic_clock::now();
+  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+  const monotonic_clock::time_point start_time = monotonic_clock::now();
   const monotonic_clock::time_point end_time =
       start_time + chrono::seconds(FLAGS_duration);
 
@@ -333,7 +320,7 @@
 // Send enough messages to wrap the 32 bit send counter.
 TEST_F(LocklessQueueTest, WrappedSend) {
   uint64_t kNumMessages = 0x100010000ul;
-  QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+  QueueRacer racer(queue(), 1, kNumMessages);
 
   const monotonic_clock::time_point start_time = monotonic_clock::now();
   EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 69f5f21..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
   uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
 };
 
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
-                       uint64_t num_messages, LocklessQueueConfiguration config)
-    : memory_(memory),
-      num_threads_(num_threads),
-      num_messages_(num_messages),
-      config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+                       uint64_t num_messages)
+    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
   Reset();
 }
 
 void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
   const bool will_wrap = num_messages_ * num_threads_ *
                              static_cast<uint64_t>(1 + write_wrap_count) >
-                         config_.queue_size;
+                         queue_.config().queue_size;
 
   // Clear out shmem.
   Reset();
@@ -52,13 +49,13 @@
   ::std::vector<ThreadState> threads(num_threads_);
 
   ::std::thread queue_index_racer([this, &poll_index]() {
-    LocklessQueue queue(memory_, config_);
+    LocklessQueueReader reader(queue_);
 
     // Track the number of times we wrap, and cache the modulo.
     uint64_t wrap_count = 0;
     uint32_t last_queue_index = 0;
     const uint32_t max_queue_index =
-        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
     while (poll_index) {
       // We want to read everything backwards.  This will give us conservative
       // bounds.  And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
       //
       // So, grab them in order.
       const uint64_t finished_writes = finished_writes_.load();
-      const QueueIndex latest_queue_index_queue_index =
-          queue.LatestQueueIndex();
+      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
       const uint64_t started_writes = started_writes_.load();
 
       const uint32_t latest_queue_index_uint32_t =
           latest_queue_index_queue_index.index();
       uint64_t latest_queue_index = latest_queue_index_uint32_t;
 
-      if (latest_queue_index_queue_index !=
-          LocklessQueue::empty_queue_index()) {
+      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
         // If we got smaller, we wrapped.
         if (latest_queue_index_uint32_t < last_queue_index) {
           ++wrap_count;
@@ -107,22 +102,19 @@
 
       // If we are at the beginning, the queue needs to always return empty.
       if (started_writes == 0) {
-        EXPECT_EQ(latest_queue_index_queue_index,
-                  LocklessQueue::empty_queue_index());
+        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
         EXPECT_EQ(finished_writes, 0);
       } else {
         if (finished_writes == 0) {
           // Plausible to be at the beginning, in which case we don't have
           // anything to check.
-          if (latest_queue_index_queue_index !=
-              LocklessQueue::empty_queue_index()) {
+          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
             // Otherwise, we have started.  The queue can't have any more
             // entries than this.
             EXPECT_GE(started_writes, latest_queue_index + 1);
           }
         } else {
-          EXPECT_NE(latest_queue_index_queue_index,
-                    LocklessQueue::empty_queue_index());
+          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
           EXPECT_GE(latest_queue_index + 1, finished_writes);
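The loads above are deliberately ordered (finished writes, then the latest index, then started writes) so that the count implied by the index is bracketed by the two counters. A minimal sketch of the resulting bound, with hypothetical names:

#include <cstdint>

#include "glog/logging.h"

// `latest_index` is the 64-bit extension of the queue index read between the
// two counter loads; an index is a count minus one, hence the +1.
void CheckIndexBracket(uint64_t finished_writes, uint64_t latest_index,
                       uint64_t started_writes) {
  CHECK_LE(finished_writes, latest_index + 1);
  CHECK_LE(latest_index + 1, started_writes);
}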
@@ -142,8 +134,7 @@
     t.thread = ::std::thread([this, &t, thread_index, &run,
                               write_wrap_count]() {
       // Build up a sender.
-      LocklessQueue queue(memory_, config_);
-      LocklessQueue::Sender sender = queue.MakeSender().value();
+      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
       CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
       // Signal that we are ready to start sending.
@@ -255,17 +246,16 @@
 void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                             ::std::vector<ThreadState> *threads) {
   // Now read back the results to double check.
-  LocklessQueue queue(memory_, config_);
-
-  const bool will_wrap =
-      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+  LocklessQueueReader reader(queue_);
+  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+                         LocklessQueueSize(queue_.memory());
 
   monotonic_clock::time_point last_monotonic_sent_time =
       monotonic_clock::epoch();
   uint64_t initial_i = 0;
   if (will_wrap) {
     initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
-                queue.QueueSize();
+                LocklessQueueSize(queue_.memory());
   }
 
   for (uint64_t i = initial_i;
@@ -279,27 +269,28 @@
     char read_data[1024];
 
     // Handle overflowing the message count for the wrap test.
-    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
-                                       0xffffffffu, queue.QueueSize()));
-    LocklessQueue::ReadResult read_result =
-        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    const uint32_t wrapped_i =
+        i % static_cast<size_t>(QueueIndex::MaxIndex(
+                0xffffffffu, LocklessQueueSize(queue_.memory())));
+    LocklessQueueReader::Result read_result =
+        reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     if (race_reads) {
-      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
         --i;
         continue;
       }
     }
 
     if (race_reads && will_wrap) {
-      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
         continue;
       }
     }
     // Every message should be good.
-    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
 
     // And, confirm that time never went backwards.
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -310,7 +301,8 @@
 
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc, read_data + queue.message_data_size() - length,
+    memcpy(&tpc,
+           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
            sizeof(ThreadPlusCount));
 
     if (will_wrap) {
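The memcpy above recovers the ThreadPlusCount from the tail of the message: the payload of `length` bytes is written at the end of the fixed-size data region, and LocklessQueueMessageDataSize() is the free function that replaces the old queue.message_data_size() member. A minimal sketch of the same offset math, with a hypothetical helper; callers are assumed to have checked that length == sizeof(T):

#include <cstddef>
#include <cstring>

#include "aos/ipc_lib/lockless_queue.h"

namespace aos {
namespace ipc_lib {

// Copies a trivially-copyable T out of the last `length` bytes of a message's
// data region.
template <typename T>
T ReadFromTail(LocklessQueue queue, const char *read_data, size_t length) {
  T result;
  memcpy(&result,
         read_data + LocklessQueueMessageDataSize(queue.memory()) - length,
         sizeof(T));
  return result;
}

}  // namespace ipc_lib
}  // namespace aos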
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index eaeedd4..7e92693 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -14,8 +14,7 @@
 // them together to all write at once.
 class QueueRacer {
  public:
-  QueueRacer(LocklessQueueMemory *memory, int num_threads,
-             uint64_t num_messages, LocklessQueueConfiguration config);
+  QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
 
   // Runs an iteration of the race.
   //
@@ -35,13 +34,14 @@
   void RunIteration(bool race_reads, int write_wrap_count);
 
   size_t CurrentIndex() {
-    LocklessQueue queue(memory_, config_);
-    return queue.LatestQueueIndex().index();
+    return LocklessQueueReader(queue_).LatestIndex().index();
   }
 
  private:
   // Wipes the queue memory out so we get a clean start.
-  void Reset() { memset(memory_, 0, LocklessQueueMemorySize(config_)); }
+  void Reset() {
+    memset(queue_.memory(), 0, LocklessQueueMemorySize(queue_.config()));
+  }
 
   // This is a separate method so that when the ASSERT_* methods fail, we still
   // clean up all the threads.  Otherwise we get an assert on the way out of
@@ -49,7 +49,7 @@
   void CheckReads(bool race_reads, int write_wrap_count,
                   ::std::vector<ThreadState> *threads);
 
-  LocklessQueueMemory *memory_;
+  LocklessQueue queue_;
   const uint64_t num_threads_;
   const uint64_t num_messages_;
 
@@ -60,8 +60,6 @@
   ::std::atomic<uint64_t> started_writes_;
   // Number of writes completed.
   ::std::atomic<uint64_t> finished_writes_;
-
-  const LocklessQueueConfiguration config_;
 };
 
 }  // namespace ipc_lib
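Pulling the pieces together, a minimal sketch (not part of the patch) of driving QueueRacer through the new handle-based constructor. The memory is assumed to be sized by LocklessQueueMemorySize(), suitably aligned, and zeroed by the caller, and the call is assumed to happen inside a gtest test since the racer uses EXPECT_*/ASSERT_* internally:

#include "aos/ipc_lib/lockless_queue.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
#include "aos/ipc_lib/queue_racer.h"

namespace aos {
namespace ipc_lib {

void RaceOnce(LocklessQueueMemory *memory, LocklessQueueConfiguration config) {
  // LocklessQueue is a small value handle bundling the mapped memory and the
  // configuration; the test fixture passes the same mapping for both memory
  // arguments.
  LocklessQueue queue(memory, memory, config);
  QueueRacer racer(queue, /*num_threads=*/1, /*num_messages=*/100);
  racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0);
}

}  // namespace ipc_lib
}  // namespace aos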