Improve const-correctness in the LocklessQueue code

This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.

This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index afa7ced..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -134,40 +134,72 @@
 // is done before the watcher goes RT), but needs to be RT for the sender.
 struct LocklessQueueMemory;
 
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
 // Initializes the queue memory.  memory must be either a valid pointer to the
 // queue datastructure, or must be zero initialized.
 LocklessQueueMemory *InitializeLocklessQueueMemory(
     LocklessQueueMemory *memory, LocklessQueueConfiguration config);
 
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
 const static unsigned int kWakeupSignal = SIGRTMIN + 2;
 
-// Class to manage sending and receiving data in the lockless queue.  This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
 class LocklessQueue {
  public:
-  LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-  LocklessQueue(const LocklessQueue &) = delete;
-  LocklessQueue &operator=(const LocklessQueue &) = delete;
+  LocklessQueue(const LocklessQueueMemory *const_memory,
+                LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+      : const_memory_(const_memory), memory_(memory), config_(config) {}
 
-  ~LocklessQueue();
+  void Initialize();
 
-  // Returns the number of messages in the queue.
-  size_t QueueSize() const;
+  LocklessQueueConfiguration config() const { return config_; }
 
-  size_t message_data_size() const;
+  const LocklessQueueMemory *const_memory() { return const_memory_; }
+  LocklessQueueMemory *memory() { return memory_; }
 
-  // Registers this thread to receive the kWakeupSignal signal when Wakeup is
-  // called. Returns false if there was an error in registration.
-  bool RegisterWakeup(int priority);
-  // Unregisters the wakeup.
-  void UnregisterWakeup();
+ private:
+  const LocklessQueueMemory *const_memory_;
+  LocklessQueueMemory *memory_;
+  LocklessQueueConfiguration config_;
+};
+
+class LocklessQueueWatcher {
+ public:
+  LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher(LocklessQueueWatcher &&other)
+      : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+    other.watcher_index_ = -1;
+  }
+  LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(watcher_index_, other.watcher_index_);
+    return *this;
+  }
+
+  ~LocklessQueueWatcher();
+
+  // Registers this thread to receive the kWakeupSignal signal when
+  // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+  // error in registration.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+                                                  int priority);
+
+ private:
+  LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index in the watcher list that our entry is, or -1 if no watcher is
+  // registered.
+  int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+  LocklessQueueWakeUpper(LocklessQueue queue);
 
   // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
   //
@@ -175,169 +207,7 @@
   // if nonrt.
   int Wakeup(int current_priority);
 
-  // If you ask for a queue index 2 past the newest, you will still get
-  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
-  // element newer than QueueSize() from the current message, we consider it
-  // behind by a large amount and return TOO_OLD.  If the message is modified
-  // out from underneath us as we read it, return OVERWROTE.
-  //
-  // data may be nullptr to indicate the data should not be copied.
-  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
-  ReadResult Read(uint32_t queue_index,
-                  ::aos::monotonic_clock::time_point *monotonic_sent_time,
-                  ::aos::realtime_clock::time_point *realtime_sent_time,
-                  ::aos::monotonic_clock::time_point *monotonic_remote_time,
-                  ::aos::realtime_clock::time_point *realtime_remote_time,
-                  uint32_t *remote_queue_index, size_t *length, char *data);
-
-  // Returns the index to the latest queue message.  Returns empty_queue_index()
-  // if there are no messages in the queue.  Do note that this index wraps if
-  // more than 2^32 messages are sent.
-  QueueIndex LatestQueueIndex();
-  static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
-  // Returns the size of the queue.  This is mostly useful for manipulating
-  // QueueIndex.
-  size_t queue_size() const;
-
-  // TODO(austin): Return the oldest queue index.  This lets us catch up nicely
-  // if we got behind.
-  // The easiest way to implement this is likely going to be to reserve the
-  // first modulo of values for the initial time around, and never reuse them.
-  // That lets us do a simple atomic read of the next index and deduce what has
-  // happened.  It will involve the simplest atomic operations.
-
-  // TODO(austin): Make it so we can find the indices which were sent just
-  // before and after a time with a binary search.
-
-  // Sender for blocks of data.  The resources associated with a sender are
-  // scoped to this object's lifetime.
-  class Sender {
-   public:
-    Sender(const Sender &) = delete;
-    Sender &operator=(const Sender &) = delete;
-    Sender(Sender &&other)
-        : memory_(other.memory_), sender_index_(other.sender_index_) {
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-    }
-    Sender &operator=(Sender &&other) {
-      memory_ = other.memory_;
-      sender_index_ = other.sender_index_;
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-      return *this;
-    }
-
-    ~Sender();
-
-    // Sends a message without copying the data.
-    // Copy at most size() bytes of data into the memory pointed to by Data(),
-    // and then call Send().
-    // Note: calls to Data() are expensive enough that you should cache it.
-    size_t size();
-    void *Data();
-    void Send(size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-    // Sends up to length data.  Does not wakeup the target.
-    void Send(const char *data, size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-    int buffer_index() const;
-
-   private:
-    friend class LocklessQueue;
-
-    Sender(LocklessQueueMemory *memory);
-
-    // Returns true if this sender is valid.  If it isn't valid, any of the
-    // other methods won't work.  This is here to allow the lockless queue to
-    // only build a sender if there was one available.
-    bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
-    // Pointer to the backing memory.
-    LocklessQueueMemory *memory_ = nullptr;
-
-    // Index into the sender list.
-    int sender_index_ = -1;
-  };
-
-  // Pinner for blocks of data.  The resources associated with a pinner are
-  // scoped to this object's lifetime.
-  class Pinner {
-   public:
-    Pinner(const Pinner &) = delete;
-    Pinner &operator=(const Pinner &) = delete;
-    Pinner(Pinner &&other)
-        : memory_(other.memory_), pinner_index_(other.pinner_index_) {
-      other.memory_ = nullptr;
-      other.pinner_index_ = -1;
-    }
-    Pinner &operator=(Pinner &&other) {
-      memory_ = other.memory_;
-      pinner_index_ = other.pinner_index_;
-      other.memory_ = nullptr;
-      other.pinner_index_ = -1;
-      return *this;
-    }
-
-    ~Pinner();
-
-    // Attempts to pin the message at queue_index.
-    // Un-pins the previous message.
-    // Returns the buffer index (non-negative) if it succeeds.
-    // Returns -1 if that message is no longer in the queue.
-    int PinIndex(uint32_t queue_index);
-
-    // Read at most size() bytes of data into the memory pointed to by Data().
-    // Note: calls to Data() are expensive enough that you should cache it.
-    // Don't call Data() before a successful PinIndex call.
-    size_t size() const;
-    const void *Data() const;
-
-   private:
-    friend class LocklessQueue;
-
-    Pinner(LocklessQueueMemory *memory);
-
-    // Returns true if this pinner is valid.  If it isn't valid, any of the
-    // other methods won't work.  This is here to allow the lockless queue to
-    // only build a pinner if there was one available.
-    bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
-
-    // Pointer to the backing memory.
-    LocklessQueueMemory *memory_ = nullptr;
-
-    // Index into the pinner list.
-    int pinner_index_ = -1;
-  };
-
-  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
-  // TODO(austin): Change the API if we find ourselves with more errors.
-  std::optional<Sender> MakeSender();
-
-  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
-  // TODO(austin): Change the API if we find ourselves with more errors.
-  std::optional<Pinner> MakePinner();
-
  private:
-  LocklessQueueMemory *memory_ = nullptr;
-
   // Memory and datastructure used to sort a list of watchers to wake
   // up.  This isn't a copy of Watcher since tid is simpler to work with here
   // than the futex above.
@@ -346,17 +216,176 @@
     pid_t pid;
     int priority;
   };
-  // TODO(austin): Don't allocate this memory if we aren't going to send.
-  ::std::vector<WatcherCopy> watcher_copy_;
 
-  // Index in the watcher list that our entry is, or -1 if no watcher is
-  // registered.
-  int watcher_index_ = -1;
-
+  const LocklessQueueMemory *const memory_;
   const int pid_;
   const uid_t uid_;
+
+  ::std::vector<WatcherCopy> watcher_copy_;
 };
 
+// Sender for blocks of data.  The resources associated with a sender are
+// scoped to this object's lifetime.
+class LocklessQueueSender {
+ public:
+  LocklessQueueSender(const LocklessQueueSender &) = delete;
+  LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+  LocklessQueueSender(LocklessQueueSender &&other)
+      : memory_(other.memory_), sender_index_(other.sender_index_) {
+    other.memory_ = nullptr;
+    other.sender_index_ = -1;
+  }
+  LocklessQueueSender &operator=(LocklessQueueSender &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(sender_index_, other.sender_index_);
+    return *this;
+  }
+
+  ~LocklessQueueSender();
+
+  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+  // Sends a message without copying the data.
+  // Copy at most size() bytes of data into the memory pointed to by Data(),
+  // and then call Send().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  size_t size() const;
+  void *Data();
+  void Send(size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  // Sends up to length data.  Does not wakeup the target.
+  void Send(const char *data, size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  int buffer_index() const;
+
+ private:
+  LocklessQueueSender(LocklessQueueMemory *memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index into the sender list.
+  int sender_index_ = -1;
+};
+
+// Pinner for blocks of data.  The resources associated with a pinner are
+// scoped to this object's lifetime.
+class LocklessQueuePinner {
+ public:
+  LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner(LocklessQueuePinner &&other)
+      : memory_(other.memory_),
+        const_memory_(other.const_memory_),
+        pinner_index_(other.pinner_index_) {
+    other.pinner_index_ = -1;
+  }
+  LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(const_memory_, other.const_memory_);
+    std::swap(pinner_index_, other.pinner_index_);
+    return *this;
+  }
+
+  ~LocklessQueuePinner();
+
+  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+  // Attempts to pin the message at queue_index.
+  // Un-pins the previous message.
+  // Returns the buffer index (non-negative) if it succeeds.
+  // Returns -1 if that message is no longer in the queue.
+  int PinIndex(uint32_t queue_index);
+
+  // Read at most size() bytes of data into the memory pointed to by Data().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  // Don't call Data() before a successful PinIndex call.
+  size_t size() const;
+  const void *Data() const;
+
+ private:
+  LocklessQueuePinner(LocklessQueueMemory *memory,
+                      const LocklessQueueMemory *const_memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+  const LocklessQueueMemory *const_memory_ = nullptr;
+
+  // Index into the pinner list.
+  int pinner_index_ = -1;
+};
+
+class LocklessQueueReader {
+ public:
+  enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+
+  LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+    queue.Initialize();
+  }
+
+  // If you ask for a queue index 2 past the newest, you will still get
+  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
+  // element newer than QueueSize() from the current message, we consider it
+  // behind by a large amount and return TOO_OLD.  If the message is modified
+  // out from underneath us as we read it, return OVERWROTE.
+  //
+  // data may be nullptr to indicate the data should not be copied.
+  Result Read(uint32_t queue_index,
+              ::aos::monotonic_clock::time_point *monotonic_sent_time,
+              ::aos::realtime_clock::time_point *realtime_sent_time,
+              ::aos::monotonic_clock::time_point *monotonic_remote_time,
+              ::aos::realtime_clock::time_point *realtime_remote_time,
+              uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+  // Returns the index to the latest queue message.  Returns empty_queue_index()
+  // if there are no messages in the queue.  Do note that this index wraps if
+  // more than 2^32 messages are sent.
+  QueueIndex LatestIndex() const;
+
+ private:
+  const LocklessQueueMemory *const memory_;
+};
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index.  This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened.  It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
 }  // namespace ipc_lib
 }  // namespace aos