Implement a lockless queue

This uses atomics to swap pointers from an array.  It is safe against
process death, and concurrent writes.  It also uses signals to wake up
any processes waiting for new data.

https://docs.google.com/document/d/10xulameLtEqjBFkm54UcN-5N-w5Q_XFNILvNf1Jl1Y4/edit#

Change-Id: Ie7b2aea6f869c1d84e0705aadb21d33fa8241b60
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
new file mode 100644
index 0000000..f3b49b6
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -0,0 +1,350 @@
+#include "aos/ipc_lib/lockless_queue.h"
+
+#include <inttypes.h>
+#include <signal.h>
+#include <unistd.h>
+#include <wait.h>
+#include <chrono>
+#include <memory>
+#include <random>
+#include <thread>
+
+#include "aos/event.h"
+#include "aos/events/epoll.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/queue_racer.h"
+#include "aos/ipc_lib/signalfd.h"
+#include "aos/testing/test_logging.h"
+#include "gflags/gflags.h"
+#include "gtest/gtest.h"
+
+DEFINE_int32(min_iterations, 100,
+             "Minimum number of stress test iterations to run");
+DEFINE_int32(duration, 5, "Number of seconds to test for");
+DEFINE_int32(print_rate, 60, "Number of seconds between status prints");
+
+// The roboRIO can only handle 10 threads before exploding.  Set the default for
+// ARM to 10.
+DEFINE_int32(thread_count,
+#if defined(__ARM_EABI__)
+             10,
+#else
+             100,
+#endif
+             "Number of threads to race");
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+
+class LocklessQueueTest : public ::testing::Test {
+ public:
+  LocklessQueueTest() {
+    ::aos::testing::EnableTestLogging();
+    config_.num_watchers = 10;
+    config_.num_senders = 100;
+    config_.queue_size = 10000;
+    // Pick an odd message size to exercise the alignment code.
+    config_.message_data_size = 101;
+
+    // Since our backing store is an array of uint64_t for alignment purposes,
+    // normalize by the size.
+    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
+
+    Reset();
+  }
+
+  LocklessQueueMemory *get_memory() {
+    return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+  }
+
+  void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+
+  // Runs an epoll loop at the given priority until kWakeupSignal is received.
+  void RunUntilWakeup(Event *ready, int priority) {
+    LocklessQueue queue(get_memory(), config_);
+    internal::EPoll epoll;
+    SignalFd signalfd({kWakeupSignal});
+
+    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
+      signalfd_siginfo result = signalfd.Read();
+
+      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
+      epoll.Quit();
+    });
+
+    // Register to be woken up *after* the signalfd is catching the signals.
+    queue.RegisterWakeup(priority);
+
+    // And signal we are now ready.
+    ready->Set();
+
+    epoll.Run();
+
+    // Cleanup.
+    queue.UnregisterWakeup();
+    epoll.DeleteFd(signalfd.fd());
+  }
+
+  // Use a type with enough alignment that we are guaranteed that everything
+  // will be aligned properly on the target platform.
+  ::std::vector<uint64_t> memory_;
+
+  LocklessQueueConfiguration config_;
+};
+
+typedef LocklessQueueTest LocklessQueueDeathTest;  // gtest requires *DeathTest naming.
+
+// Tests that Wakeup() reports 0 woken watchers when none were registered.
+TEST_F(LocklessQueueTest, NoWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+// Tests that Wakeup() reports 0 woken watchers after a wakeup was registered
+// and then unregistered.
+TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  queue.RegisterWakeup(5);
+  queue.UnregisterWakeup();
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+// Tests that wakeup doesn't do anything if the registering thread dies
+// without unregistering.
+TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  ::std::thread([this]() {
+    // Use placement new so the destructor (and therefore UnregisterWakeup)
+    // doesn't get run when the thread exits.
+    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
+        data;
+    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
+    // Register a wakeup.
+    q->RegisterWakeup(5);
+  }).join();
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+struct WatcherState {
+  ::std::thread t;  // Thread which registers (and later unregisters) a wakeup.
+  Event ready;      // Set by the thread once its wakeup is registered.
+};
+
+// Tests that too many watchers fails like expected.
+TEST_F(LocklessQueueTest, TooManyWatchers) {
+  // This is going to be a barrel of monkeys.
+  // We need to spin up a bunch of watchers.  But, they all need to be in
+  // different threads so they have different tids.
+  ::std::vector<WatcherState> queues;
+  // Reserve num_watchers WatcherState objects so the pointer value doesn't
+  // change out from under us below.
+  queues.reserve(config_.num_watchers);
+
+  // Event used to trigger all the threads to unregister.
+  Event cleanup;
+
+  // Start all the threads.
+  for (size_t i = 0; i < config_.num_watchers; ++i) {
+    queues.emplace_back();
+
+    WatcherState *s = &queues.back();
+    queues.back().t = ::std::thread([this, &cleanup, s]() {
+      LocklessQueue q(get_memory(), config_);
+      EXPECT_TRUE(q.RegisterWakeup(0));
+
+      // Signal that this thread is ready.
+      s->ready.Set();
+
+      // And wait until we are asked to shut down.
+      cleanup.Wait();
+
+      q.UnregisterWakeup();
+    });
+  }
+
+  // Wait until all the threads are actually going.
+  for (WatcherState &w : queues) {
+    w.ready.Wait();
+  }
+
+  // Now try to allocate another one.  This will fail.
+  {
+    LocklessQueue queue(get_memory(), config_);
+    EXPECT_FALSE(queue.RegisterWakeup(0));
+  }
+
+  // Trigger the threads to cleanup their resources, and wait until they are
+  // done.
+  cleanup.Set();
+  for (WatcherState &w : queues) {
+    w.t.join();
+  }
+
+  // We should now be able to allocate a wakeup.
+  {
+    LocklessQueue queue(get_memory(), config_);
+    EXPECT_TRUE(queue.RegisterWakeup(0));
+    queue.UnregisterWakeup();
+  }
+}
+
+// Tests that allocating too many senders dies like expected.
+TEST_F(LocklessQueueDeathTest, TooManySenders) {
+  EXPECT_DEATH(
+      {
+        ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
+        ::std::vector<LocklessQueue::Sender> senders;
+        for (size_t i = 0; i < config_.num_senders + 1; ++i) {
+          queues.emplace_back(new LocklessQueue(get_memory(), config_));
+          senders.emplace_back(queues.back()->MakeSender());
+        }
+      },
+      "Too many senders");
+}
+
+// Now, start 2 threads and have them receive the signals.
+TEST_F(LocklessQueueTest, WakeUpThreads) {
+  // Confirm that the wakeup signal is in range.
+  EXPECT_LE(kWakeupSignal, SIGRTMAX);
+  EXPECT_GE(kWakeupSignal, SIGRTMIN);
+
+  LocklessQueue queue(get_memory(), config_);
+
+  // Event used to make sure the thread is ready before the test starts.
+  Event ready1;
+  Event ready2;
+
+  // Start the thread.
+  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
+  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });
+
+  ready1.Wait();
+  ready2.Wait();
+
+  EXPECT_EQ(queue.Wakeup(3), 2);
+
+  t1.join();
+  t2.join();
+
+  // Clean up afterwards.  We are pretending to be RT when we are really not.
+  // So we will be PI boosted up.
+  UnsetCurrentThreadRealtimePriority();
+}
+
+// Does a simple send test: 20000 sends into a 10000-entry queue, so the
+// queue wraps, while spot-checking LatestQueueIndex and reads from the past.
+TEST_F(LocklessQueueTest, Send) {
+  LocklessQueue queue(get_memory(), config_);
+
+  LocklessQueue::Sender sender = queue.MakeSender();
+
+  // Send enough messages to wrap.
+  for (int i = 0; i < 20000; ++i) {
+    // Confirm that the queue index makes sense given the number of sends.
+    EXPECT_EQ(queue.LatestQueueIndex(),
+              i == 0 ? LocklessQueue::empty_queue_index() : i - 1);
+
+    // Send a trivial piece of data.
+    char data[100];
+    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
+    sender.Send(data, s);
+
+    // Confirm that the queue index still makes sense.  This is easier since the
+    // empty case has been handled.
+    EXPECT_EQ(queue.LatestQueueIndex(), i);
+
+    // Read a result from 5 in the past.
+    ::aos::monotonic_clock::time_point monotonic_sent_time;
+    ::aos::realtime_clock::time_point realtime_sent_time;
+    char read_data[1024];
+    size_t length;
+
+    QueueIndex index = QueueIndex::Zero(config_.queue_size);
+    if (i - 5 < 0) {
+      index = index.DecrementBy(5 - i);
+    } else {
+      index = index.IncrementBy(i - 5);
+    }
+    LocklessQueue::ReadResult read_result =
+        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+                   &length, &(read_data[0]));
+
+    // This should either return GOOD, or TOO_OLD if it is before the start of
+    // the queue.
+    if (read_result != LocklessQueue::ReadResult::GOOD) {
+      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+    }
+  }
+}
+
+// Races a bunch of sending threads against each other (and optionally against
+// racing reads and wrapping writes) to stress the queue implementation.
+TEST_F(LocklessQueueTest, SendRace) {
+  const size_t kNumMessages = 10000 / FLAGS_thread_count;
+
+  ::std::mt19937 generator(0);
+  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
+  ::std::bernoulli_distribution race_reads_distribution;
+  ::std::bernoulli_distribution wrap_writes_distribution;
+
+  const chrono::seconds print_frequency(FLAGS_print_rate);
+
+  QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
+  const monotonic_clock::time_point start_time =
+      monotonic_clock::now();
+  const monotonic_clock::time_point end_time =
+      start_time + chrono::seconds(FLAGS_duration);
+
+  monotonic_clock::time_point monotonic_now = start_time;
+  monotonic_clock::time_point next_print_time = start_time + print_frequency;
+  uint64_t messages = 0;
+  // Run at least --min_iterations iterations, and keep going until --duration
+  // seconds have elapsed.
+  for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
+    bool race_reads = race_reads_distribution(generator);
+    int write_wrap_count = write_wrap_count_distribution(generator);
+    if (!wrap_writes_distribution(generator)) {
+      write_wrap_count = 0;
+    }
+    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+        << ": Running with race_reads: " << race_reads
+        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
+        << i;
+
+    messages += racer.CurrentIndex();
+
+    monotonic_now = monotonic_clock::now();
+    if (monotonic_now > next_print_time) {
+      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
+                                   monotonic_now - start_time)
+                                   .count();
+      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
+             i, i / elapsed_seconds,
+             static_cast<double>(messages) / elapsed_seconds);
+      next_print_time = monotonic_now + print_frequency;
+    }
+  }
+}
+
+// Sends enough messages (0x100010000) to wrap the 32 bit send counter.
+TEST_F(LocklessQueueTest, WrappedSend) {
+  uint64_t kNumMessages = 0x100010000ul;
+  QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+
+  const monotonic_clock::time_point start_time = monotonic_clock::now();
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+  double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
+                               monotonic_now - start_time)
+                               .count();
+  printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
+         elapsed_seconds, kNumMessages,
+         static_cast<double>(kNumMessages) / elapsed_seconds);
+}
+
+}  // namespace testing
+}  // namespace ipc_lib
+}  // namespace aos