Implement a lockless queue

This uses atomics to swap pointers from an array.  It is safe against
process death and concurrent writes.  It also uses signals to wake up any
processes waiting for new data.
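
As a rough picture of the "swap pointers from an array" idea (a
deliberately simplified sketch, not the implementation in this change):
each sender owns a scratch message out of a shared pool, and publishing is
a single atomic exchange of indices, so a process dying at any point cannot
leave the queue half-updated.

  #include <atomic>
  #include <cstdint>

  // 16 queue slots, each holding the index of a message in a shared pool.
  ::std::atomic<uint32_t> queue_slots[16];
  ::std::atomic<uint32_t> next_queue_index{0};

  // scratch_index is the pool entry this sender owns and has already
  // filled in.  One atomic exchange publishes it and hands the sender back
  // the message which used to live in that slot, to reuse as its next
  // scratch message.
  void Publish(uint32_t *scratch_index) {
    const uint32_t slot = next_queue_index.fetch_add(1) % 16;
    *scratch_index = queue_slots[slot].exchange(*scratch_index);
  }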

https://docs.google.com/document/d/10xulameLtEqjBFkm54UcN-5N-w5Q_XFNILvNf1Jl1Y4/edit#

Change-Id: Ie7b2aea6f869c1d84e0705aadb21d33fa8241b60
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
new file mode 100644
index 0000000..53490a0
--- /dev/null
+++ b/aos/ipc_lib/queue_racer.cc
@@ -0,0 +1,309 @@
+#include "aos/ipc_lib/queue_racer.h"
+
+#include <inttypes.h>
+#include <string.h>
+#include <limits>
+
+#include "aos/event.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace {
+
+struct ThreadPlusCount {
+  int thread;
+  uint64_t count;
+};
+
+}  // namespace
+
+struct ThreadState {
+  ::std::thread thread;
+  Event ready;
+  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
+};
+
+QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
+                       uint64_t num_messages, LocklessQueueConfiguration config)
+    : memory_(memory),
+      num_threads_(num_threads),
+      num_messages_(num_messages),
+      config_(config) {
+  Reset();
+}
+
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+  const bool will_wrap = num_messages_ * num_threads_ *
+                             static_cast<uint64_t>(1 + write_wrap_count) >
+                         config_.queue_size;
+
+  // Clear out shmem.
+  Reset();
+  started_writes_ = 0;
+  finished_writes_ = 0;
+
+  // Event used to start all the threads processing at once.
+  Event run;
+
+  ::std::atomic<bool> poll_index;
+  poll_index = true;
+
+  // List of threads.
+  ::std::vector<ThreadState> threads(num_threads_);
+
+  ::std::thread queue_index_racer([this, &poll_index]() {
+    LocklessQueue queue(memory_, config_);
+
+    // Track the number of times we wrap, and cache the modulo.
+    uint64_t wrap_count = 0;
+    uint32_t last_queue_index = 0;
+    const uint32_t max_queue_index =
+        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+    while (poll_index) {
+      // We want to sample the counters in the reverse order from how they
+      // are incremented.  This will give us conservative bounds.  And with
+      // enough time and randomness, we will see all the cases we care to see.
+
+      // These 3 numbers look at the same thing, but at different points in
+      // time in the process.  The process (essentially) looks like:
+      //
+      // ++started_writes;
+      // ++latest_queue_index;
+      // ++finished_writes;
+      //
+      // We want to check that latest_queue_index is bounded by the number of
+      // writes started and finished.  Basically, we can say that
+      // finished_writes < latest_queue_index always.  And
+      // latest_queue_index < started_writes.  And everything always increases.
+      // So, if we let more time elapse between sampling finished_writes and
+      // latest_queue_index, we will only be relaxing our bounds, not
+      // invalidating the check.  The same goes for started_writes.
+      //
+      // So, grab them in order.
+      const uint64_t finished_writes = finished_writes_.load();
+      const uint32_t latest_queue_index_uint32_t = queue.LatestQueueIndex();
+      const uint64_t started_writes = started_writes_.load();
+
+      uint64_t latest_queue_index = latest_queue_index_uint32_t;
+
+      if (latest_queue_index_uint32_t != LocklessQueue::empty_queue_index()) {
+        // If we got smaller, we wrapped.
+        if (latest_queue_index_uint32_t < last_queue_index) {
+          ++wrap_count;
+        }
+        // And apply it.
+        latest_queue_index +=
+            static_cast<uint64_t>(max_queue_index) * wrap_count;
+        last_queue_index = latest_queue_index_uint32_t;
+      }
+
+      // For grins, check that we have always started at least as many writes
+      // as we have finished.  This should never fail.
+      EXPECT_GE(started_writes, finished_writes);
+
+      // If we are at the beginning, the queue needs to always return empty.
+      if (started_writes == 0) {
+        EXPECT_EQ(latest_queue_index_uint32_t,
+                  LocklessQueue::empty_queue_index());
+        EXPECT_EQ(finished_writes, 0);
+      } else {
+        if (finished_writes == 0) {
+          // Plausible to be at the beginning.
+          if (latest_queue_index_uint32_t !=
+              LocklessQueue::empty_queue_index()) {
+            // Otherwise, we have started.  The queue is always allowed to
+            // lag behind the number of writes which have been started.
+            EXPECT_GE(started_writes, latest_queue_index + 1);
+          }
+        } else {
+          EXPECT_NE(latest_queue_index_uint32_t,
+                    LocklessQueue::empty_queue_index());
+          // latest_queue_index is an index, not a count.  So it always reads 1
+          // low.
+          EXPECT_GE(latest_queue_index + 1, finished_writes);
+        }
+      }
+    }
+  });
+
+  // Build up each thread and kick it off.
+  int thread_index = 0;
+  for (ThreadState &t : threads) {
+    if (will_wrap) {
+      t.event_count = ::std::numeric_limits<uint64_t>::max();
+    } else {
+      t.event_count = 0;
+    }
+    t.thread =
+        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
+          // Build up a sender.
+          LocklessQueue queue(memory_, config_);
+          LocklessQueue::Sender sender = queue.MakeSender();
+
+          // Signal that we are ready to start sending.
+          t.ready.Set();
+
+          // Wait until signaled to start running.
+          run.Wait();
+
+          // Gogogo!
+          for (uint64_t i = 0; i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count); ++i) {
+            char data[sizeof(ThreadPlusCount)];
+            ThreadPlusCount tpc;
+            tpc.thread = thread_index;
+            tpc.count = i;
+
+            memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+            if (i % 0x800000 == 0x100000) {
+              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
+                      static_cast<double>(i) /
+                          static_cast<double>(num_messages_ *
+                                              (1 + write_wrap_count)) *
+                          100.0);
+            }
+
+            ++started_writes_;
+            sender.Send(data, sizeof(ThreadPlusCount));
+            ++finished_writes_;
+          }
+        });
+    ++thread_index;
+  }
+
+  // Wait until all the threads are ready.
+  for (ThreadState &t : threads) {
+    t.ready.Wait();
+  }
+
+  // And start them racing.
+  run.Set();
+
+  // If we aren't supposed to race reads, let all the threads finish before
+  // reading anything back.
+  if (!race_reads) {
+    for (ThreadState &t : threads) {
+      t.thread.join();
+    }
+    poll_index = false;
+    queue_index_racer.join();
+  }
+
+  CheckReads(race_reads, write_wrap_count, &threads);
+
+  // Reap all the threads.
+  if (race_reads) {
+    for (ThreadState &t : threads) {
+      t.thread.join();
+    }
+    poll_index = false;
+    queue_index_racer.join();
+  }
+
+  // Confirm that the number of writes matches the expected number of writes.
+  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+            started_writes_);
+  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+            finished_writes_);
+
+  // And that every thread sent the right number of messages.
+  for (ThreadState &t : threads) {
+    if (will_wrap) {
+      if (!race_reads) {
+        // If we are wrapping, there is a possibility that a thread writes
+        // everything *before* we can read any of it, and it all gets
+        // overwritten.
+        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+                    t.event_count == (1 + write_wrap_count) * num_messages_)
+            << ": Got " << t.event_count << " events, expected "
+            << (1 + write_wrap_count) * num_messages_;
+      }
+    } else {
+      ASSERT_EQ(t.event_count, num_messages_);
+    }
+  }
+}
+
+void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
+                            ::std::vector<ThreadState> *threads) {
+  // Now read back the results to double check.
+  LocklessQueue queue(memory_, config_);
+
+  const bool will_wrap =
+      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+
+  monotonic_clock::time_point last_monotonic_sent_time =
+      monotonic_clock::epoch();
+  uint64_t initial_i = 0;
+  if (will_wrap) {
+    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
+                queue.QueueSize();
+  }
+
+  for (uint64_t i = initial_i;
+       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
+    ::aos::monotonic_clock::time_point monotonic_sent_time;
+    ::aos::realtime_clock::time_point realtime_sent_time;
+    size_t length;
+    char read_data[1024];
+
+    // Handle overflowing the message count for the wrap test.
+    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
+                                       0xffffffffu, queue.QueueSize()));
+    LocklessQueue::ReadResult read_result =
+        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+                   &length, &(read_data[0]));
+
+    if (race_reads) {
+      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+        --i;
+        continue;
+      }
+    }
+
+    if (race_reads && will_wrap) {
+      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+        continue;
+      }
+    }
+    // Every message should be good.
+    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+
+    // And, confirm that time never went backwards.
+    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
+    last_monotonic_sent_time = monotonic_sent_time;
+
+    ThreadPlusCount tpc;
+    ASSERT_EQ(length, sizeof(ThreadPlusCount));
+    memcpy(&tpc, read_data, sizeof(ThreadPlusCount));
+
+    if (will_wrap) {
+      // The queue won't change out from under us, so we should see some
+      // amount of the tail end of the messages from a thread.
+      // Confirm that once we get our first message, they all show up.
+      if ((*threads)[tpc.thread].event_count ==
+          ::std::numeric_limits<uint64_t>::max()) {
+        (*threads)[tpc.thread].event_count = tpc.count;
+      }
+
+      if (race_reads) {
+        // Make sure nothing goes backwards.  Really not much we can do here.
+        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                                 << tpc.thread;
+        (*threads)[tpc.thread].event_count = tpc.count;
+      } else {
+        // Make sure nothing goes backwards.  Really not much we can do here.
+        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                                 << tpc.thread;
+      }
+    } else {
+      // Confirm that we see every message counter from every thread.
+      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                               << tpc.thread;
+    }
+    ++(*threads)[tpc.thread].event_count;
+  }
+}
+
+}  // namespace ipc_lib
+}  // namespace aos
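
For reference, a test might drive the racer added above roughly like this
(the test name, LocklessQueueMemorySize(), and the configuration fields
other than queue_size are assumptions, not part of this diff):

  #include <vector>

  #include "aos/ipc_lib/queue_racer.h"
  #include "gtest/gtest.h"

  namespace aos {
  namespace ipc_lib {

  // Hypothetical driver for QueueRacer.
  TEST(QueueRacerTest, RacesWrites) {
    LocklessQueueConfiguration config;
    config.num_watchers = 10;       // assumed field
    config.num_senders = 100;       // assumed field
    config.queue_size = 10000;
    config.message_data_size = 32;  // assumed field

    // Size and zero the shared memory region (alignment is glossed over).
    ::std::vector<char> memory(LocklessQueueMemorySize(config), 0);

    QueueRacer racer(reinterpret_cast<LocklessQueueMemory *>(memory.data()),
                     /*num_threads=*/10, /*num_messages=*/100, config);

    // 10 threads * 100 messages fit in a 10000 entry queue, so nothing
    // wraps; race the reads against the writes anyway.
    racer.RunIteration(/*race_reads=*/true, /*write_wrap_count=*/0);
  }

  }  // namespace ipc_lib
  }  // namespace aos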