Improve const-correctness in the LocklessQueue code

This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.

This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 4e84b9f..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -56,15 +56,16 @@
     Reset();
   }
 
-  LocklessQueueMemory *get_memory() {
-    return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+  LocklessQueue queue() {
+    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         config_);
   }
 
-  void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
 
   // Runs until the signal is received.
   void RunUntilWakeup(Event *ready, int priority) {
-    LocklessQueue queue(get_memory(), config_);
     internal::EPoll epoll;
     SignalFd signalfd({kWakeupSignal});
 
@@ -75,16 +76,18 @@
       epoll.Quit();
     });
 
-    // Register to be woken up *after* the signalfd is catching the signals.
-    queue.RegisterWakeup(priority);
+    {
+      // Register to be woken up *after* the signalfd is catching the signals.
+      LocklessQueueWatcher watcher =
+          LocklessQueueWatcher::Make(queue(), priority).value();
 
-    // And signal we are now ready.
-    ready->Set();
+      // And signal we are now ready.
+      ready->Set();
 
-    epoll.Run();
+      epoll.Run();
 
-    // Cleanup.
-    queue.UnregisterWakeup();
+      // Cleanup, ensuring the watcher is destroyed before the signalfd.
+    }
     epoll.DeleteFd(signalfd.fd());
   }
 
@@ -99,36 +102,35 @@
 
 // Tests that wakeup doesn't do anything if nothing was registered.
 TEST_F(LocklessQueueTest, NoWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if a wakeup was registered and then
 // unregistered.
 TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  queue.RegisterWakeup(5);
-  queue.UnregisterWakeup();
+  { LocklessQueueWatcher::Make(queue(), 5).value(); }
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if the thread dies.
 TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   ::std::thread([this]() {
     // Use placement new so the destructor doesn't get run.
-    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
-        data;
-    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
-    // Register a wakeup.
-    q->RegisterWakeup(5);
-  }).join();
+    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+                           alignof(LocklessQueueWatcher)>::type data;
+    new (&data)
+        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+  })
+      .join();
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 struct WatcherState {
@@ -155,16 +157,13 @@
 
     WatcherState *s = &queues.back();
     queues.back().t = ::std::thread([this, &cleanup, s]() {
-      LocklessQueue q(get_memory(), config_);
-      EXPECT_TRUE(q.RegisterWakeup(0));
+      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
 
       // Signal that this thread is ready.
       s->ready.Set();
 
       // And wait until we are asked to shut down.
       cleanup.Wait();
-
-      q.UnregisterWakeup();
     });
   }
 
@@ -174,12 +173,9 @@
   }
 
   // Now try to allocate another one.  This will fail.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_FALSE(queue.RegisterWakeup(0));
-  }
+  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
 
-  // Trigger the threads to cleanup their resources, and wait unti they are
+  // Trigger the threads to cleanup their resources, and wait until they are
   // done.
   cleanup.Set();
   for (WatcherState &w : queues) {
@@ -187,23 +183,16 @@
   }
 
   // We should now be able to allocate a wakeup.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_TRUE(queue.RegisterWakeup(0));
-    queue.UnregisterWakeup();
-  }
+  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
 }
 
-// Tests that too many watchers dies like expected.
+// Tests that creating more senders than the queue allows fails as expected.
 TEST_F(LocklessQueueTest, TooManySenders) {
-  ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
-  ::std::vector<LocklessQueue::Sender> senders;
+  ::std::vector<LocklessQueueSender> senders;
   for (size_t i = 0; i < config_.num_senders; ++i) {
-    queues.emplace_back(new LocklessQueue(get_memory(), config_));
-    senders.emplace_back(queues.back()->MakeSender().value());
+    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
   }
-  queues.emplace_back(new LocklessQueue(get_memory(), config_));
-  EXPECT_FALSE(queues.back()->MakeSender());
+  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
 }
 
 // Now, start 2 threads and have them receive the signals.
@@ -212,7 +201,7 @@
   EXPECT_LE(kWakeupSignal, SIGRTMAX);
   EXPECT_GE(kWakeupSignal, SIGRTMIN);
 
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   // Event used to make sure the thread is ready before the test starts.
   Event ready1;
@@ -225,7 +214,7 @@
   ready1.Wait();
   ready2.Wait();
 
-  EXPECT_EQ(queue.Wakeup(3), 2);
+  EXPECT_EQ(wake_upper.Wakeup(3), 2);
 
   t1.join();
   t2.join();
@@ -237,15 +226,14 @@
 
 // Do a simple send test.
 TEST_F(LocklessQueueTest, Send) {
-  LocklessQueue queue(get_memory(), config_);
-
-  LocklessQueue::Sender sender = queue.MakeSender().value();
+  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+  LocklessQueueReader reader(queue());
 
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
-    EXPECT_EQ(queue.LatestQueueIndex().index(),
-              i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+    EXPECT_EQ(reader.LatestIndex().index(),
+              i == 0 ? QueueIndex::Invalid().index() : i - 1);
 
     // Send a trivial piece of data.
     char data[100];
@@ -254,7 +242,7 @@
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
-    EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+    EXPECT_EQ(reader.LatestIndex().index(), i);
 
     // Read a result from 5 in the past.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -271,15 +259,15 @@
     } else {
       index = index.IncrementBy(i - 5);
     }
-    LocklessQueue::ReadResult read_result =
-        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    LocklessQueueReader::Result read_result =
+        reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
-    if (read_result != LocklessQueue::ReadResult::GOOD) {
-      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+    if (read_result != LocklessQueueReader::Result::GOOD) {
+      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
   }
 }
@@ -295,9 +283,8 @@
 
   const chrono::seconds print_frequency(FLAGS_print_rate);
 
-  QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
-  const monotonic_clock::time_point start_time =
-      monotonic_clock::now();
+  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+  const monotonic_clock::time_point start_time = monotonic_clock::now();
   const monotonic_clock::time_point end_time =
       start_time + chrono::seconds(FLAGS_duration);
 
@@ -333,7 +320,7 @@
 // Send enough messages to wrap the 32 bit send counter.
 TEST_F(LocklessQueueTest, WrappedSend) {
   uint64_t kNumMessages = 0x100010000ul;
-  QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+  QueueRacer racer(queue(), 1, kNumMessages);
 
   const monotonic_clock::time_point start_time = monotonic_clock::now();
   EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));