Improve const-correctness in the LocklessQueue code

This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.

This involves splitting up LocklessQueue into individual components
(LocklessQueueReader, LocklessQueueSender, etc).  This makes it much
more obvious what state is used where, and allows adding const
qualifiers in more places.

Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 69f5f21..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
   uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
 };
 
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
-                       uint64_t num_messages, LocklessQueueConfiguration config)
-    : memory_(memory),
-      num_threads_(num_threads),
-      num_messages_(num_messages),
-      config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+                       uint64_t num_messages)
+    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
   Reset();
 }
 
 void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
   const bool will_wrap = num_messages_ * num_threads_ *
                              static_cast<uint64_t>(1 + write_wrap_count) >
-                         config_.queue_size;
+                         queue_.config().queue_size;
 
   // Clear out shmem.
   Reset();
@@ -52,13 +49,13 @@
   ::std::vector<ThreadState> threads(num_threads_);
 
   ::std::thread queue_index_racer([this, &poll_index]() {
-    LocklessQueue queue(memory_, config_);
+    LocklessQueueReader reader(queue_);
 
     // Track the number of times we wrap, and cache the modulo.
     uint64_t wrap_count = 0;
     uint32_t last_queue_index = 0;
     const uint32_t max_queue_index =
-        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
     while (poll_index) {
       // We want to read everything backwards.  This will give us conservative
       // bounds.  And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
       //
       // So, grab them in order.
       const uint64_t finished_writes = finished_writes_.load();
-      const QueueIndex latest_queue_index_queue_index =
-          queue.LatestQueueIndex();
+      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
       const uint64_t started_writes = started_writes_.load();
 
       const uint32_t latest_queue_index_uint32_t =
           latest_queue_index_queue_index.index();
       uint64_t latest_queue_index = latest_queue_index_uint32_t;
 
-      if (latest_queue_index_queue_index !=
-          LocklessQueue::empty_queue_index()) {
+      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
         // If we got smaller, we wrapped.
         if (latest_queue_index_uint32_t < last_queue_index) {
           ++wrap_count;
@@ -107,22 +102,19 @@
 
       // If we are at the beginning, the queue needs to always return empty.
       if (started_writes == 0) {
-        EXPECT_EQ(latest_queue_index_queue_index,
-                  LocklessQueue::empty_queue_index());
+        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
         EXPECT_EQ(finished_writes, 0);
       } else {
         if (finished_writes == 0) {
           // Plausible to be at the beginning, in which case we don't have
           // anything to check.
-          if (latest_queue_index_queue_index !=
-              LocklessQueue::empty_queue_index()) {
+          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
             // Otherwise, we have started.  The queue can't have any more
             // entries than this.
             EXPECT_GE(started_writes, latest_queue_index + 1);
           }
         } else {
-          EXPECT_NE(latest_queue_index_queue_index,
-                    LocklessQueue::empty_queue_index());
+          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
           EXPECT_GE(latest_queue_index + 1, finished_writes);
@@ -142,8 +134,7 @@
     t.thread = ::std::thread([this, &t, thread_index, &run,
                               write_wrap_count]() {
       // Build up a sender.
-      LocklessQueue queue(memory_, config_);
-      LocklessQueue::Sender sender = queue.MakeSender().value();
+      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
       CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
       // Signal that we are ready to start sending.
@@ -255,17 +246,16 @@
 void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                             ::std::vector<ThreadState> *threads) {
   // Now read back the results to double check.
-  LocklessQueue queue(memory_, config_);
-
-  const bool will_wrap =
-      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+  LocklessQueueReader reader(queue_);
+  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+                         LocklessQueueSize(queue_.memory());
 
   monotonic_clock::time_point last_monotonic_sent_time =
       monotonic_clock::epoch();
   uint64_t initial_i = 0;
   if (will_wrap) {
     initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
-                queue.QueueSize();
+                LocklessQueueSize(queue_.memory());
   }
 
   for (uint64_t i = initial_i;
@@ -279,27 +269,28 @@
     char read_data[1024];
 
     // Handle overflowing the message count for the wrap test.
-    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
-                                       0xffffffffu, queue.QueueSize()));
-    LocklessQueue::ReadResult read_result =
-        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    const uint32_t wrapped_i =
+        i % static_cast<size_t>(QueueIndex::MaxIndex(
+                0xffffffffu, LocklessQueueSize(queue_.memory())));
+    LocklessQueueReader::Result read_result =
+        reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     if (race_reads) {
-      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
         --i;
         continue;
       }
     }
 
     if (race_reads && will_wrap) {
-      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
         continue;
       }
     }
     // Every message should be good.
-    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
 
     // And, confirm that time never went backwards.
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -310,7 +301,8 @@
 
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc, read_data + queue.message_data_size() - length,
+    memcpy(&tpc,
+           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
            sizeof(ThreadPlusCount));
 
     if (will_wrap) {