Implement a lockless queue

This uses atomics to swap pointers from an array.  It is safe against
process death and concurrent writes.  It also uses signals to wake up any
processes waiting for new data.

https://docs.google.com/document/d/10xulameLtEqjBFkm54UcN-5N-w5Q_XFNILvNf1Jl1Y4/edit#
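
A rough usage sketch (illustrative only; the names come from the new
lockless_queue.h in this change, and shm is assumed to point at a
zero-initialized shared mapping of LocklessQueueMemorySize(config) bytes):

    aos::ipc_lib::LocklessQueueConfiguration config;
    config.num_watchers = 2;
    config.num_senders = 4;
    config.queue_size = 1024;
    config.message_data_size = 1024;

    aos::ipc_lib::LocklessQueue queue(
        reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(shm), config);

    // Writer: copy the data in, then wake any registered watchers.
    aos::ipc_lib::LocklessQueue::Sender sender = queue.MakeSender();
    sender.Send("hello", 5);
    queue.Wakeup(/*current_priority=*/0);

    // Reader: fetch the newest message, if the queue isn't empty.
    const uint32_t latest = queue.LatestQueueIndex();
    if (latest != aos::ipc_lib::LocklessQueue::empty_queue_index()) {
      char data[1024];
      size_t length;
      aos::monotonic_clock::time_point monotonic_sent_time;
      aos::realtime_clock::time_point realtime_sent_time;
      queue.Read(latest, &monotonic_sent_time, &realtime_sent_time, &length,
                 data);
    }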

Change-Id: Ie7b2aea6f869c1d84e0705aadb21d33fa8241b60
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 330ad40..ef033f8 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -2,6 +2,7 @@
     name = "epoll",
     srcs = ["epoll.cc"],
     hdrs = ["epoll.h"],
+    visibility = ["//visibility:public"],
     deps = [
         "//aos/logging",
         "//aos/time",
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 6947df1..76c5324 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -164,6 +164,7 @@
 
 cc_library(
     name = "index",
+    srcs = ["index.cc"],
     hdrs = ["index.h"],
 )
 
@@ -176,3 +177,69 @@
         "//aos/testing:test_logging",
     ],
 )
+
+cc_library(
+    name = "lockless_queue",
+    srcs = [
+        "lockless_queue.cc",
+        "lockless_queue_memory.h",
+    ],
+    hdrs = ["lockless_queue.h"],
+    deps = [
+        ":aos_sync",
+        ":index",
+        "//aos:init",
+        "//aos/logging",
+        "//aos/time",
+        "//aos/util:compiler_memory_barrier",
+    ],
+)
+
+cc_library(
+    name = "queue_racer",
+    testonly = True,
+    srcs = [
+        "queue_racer.cc",
+    ],
+    hdrs = [
+        "queue_racer.h",
+    ],
+    deps = [
+        ":lockless_queue",
+        "//aos:event",
+        "//third_party/googletest:gtest",
+    ],
+)
+
+cc_test(
+    name = "lockless_queue_test",
+    timeout = "eternal",
+    srcs = ["lockless_queue_test.cc"],
+    deps = [
+        ":lockless_queue",
+        ":queue_racer",
+        ":signalfd",
+        "//aos:event",
+        "//aos/events:epoll",
+        "//aos/libc:aos_strsignal",
+        "//aos/testing:googletest",
+        "//aos/testing:prevent_exit",
+        "//aos/testing:test_logging",
+    ],
+)
+
+cc_test(
+    name = "lockless_queue_death_test",
+    srcs = ["lockless_queue_death_test.cc"],
+    deps = [
+        ":lockless_queue",
+        ":queue_racer",
+        ":signalfd",
+        "//aos:event",
+        "//aos/events:epoll",
+        "//aos/libc:aos_strsignal",
+        "//aos/testing:googletest",
+        "//aos/testing:prevent_exit",
+        "//aos/testing:test_logging",
+    ],
+)
diff --git a/aos/ipc_lib/index.cc b/aos/ipc_lib/index.cc
new file mode 100644
index 0000000..ed58806
--- /dev/null
+++ b/aos/ipc_lib/index.cc
@@ -0,0 +1,33 @@
+#include "aos/ipc_lib/index.h"
+
+#include <string>
+#include <sstream>
+
+namespace aos {
+namespace ipc_lib {
+
+::std::string QueueIndex::DebugString() const {
+  if (valid()) {
+    ::std::stringstream s;
+    s << "QueueIndex(" << index_ << "/0x" << ::std::hex << index_ << ::std::dec
+      << ", count=" << count_ << ")";
+    return s.str();
+  } else {
+    return "QueueIndex::Invalid()";
+  }
+}
+
+::std::string Index::DebugString() const {
+  if (valid()) {
+    ::std::stringstream s;
+    s << "Index(queue_index=" << queue_index() << "/0x" << ::std::hex
+      << queue_index() << ::std::dec << ", message_index=" << message_index()
+      << ")";
+    return s.str();
+  } else {
+    return "QueueIndex::Invalid()";
+  }
+}
+
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 69f5856..b1afdc8 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -3,6 +3,7 @@
 
 #include <sys/types.h>
 #include <atomic>
+#include <string>
 
 namespace aos {
 namespace ipc_lib {
@@ -104,6 +105,11 @@
   // Returns the raw index.  This should be used very sparingly.
   uint32_t index() const { return index_; }
 
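+  // Returns a QueueIndex with the same count, but with the index set to 0.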
+  QueueIndex Clear() const { return QueueIndex(0, count_); }
+
+  // Returns a string representing the index.
+  ::std::string DebugString() const;
+
  private:
   QueueIndex(uint32_t index, uint32_t count) : index_(index), count_(count) {}
 
@@ -183,6 +189,9 @@
 
   bool operator==(const Index other) const { return other.index_ == index_; }
 
+  // Returns a string representing the index.
+  ::std::string DebugString() const;
+
  private:
   Index(uint32_t index)
       : index_(index) {}
@@ -213,6 +222,7 @@
   void RelaxedInvalidate() { RelaxedStore(Index::Invalid()); }
 
   // Full bidirectional barriers here.
+  void Invalidate() { Store(Index::Invalid()); }
   void Store(Index index) { index_.store(index.index_); }
   Index Load() { return Index(index_.load()); }
 
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
new file mode 100644
index 0000000..fb9c0b6
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -0,0 +1,878 @@
+#include "aos/ipc_lib/lockless_queue.h"
+
+#include <linux/futex.h>
+#include <sys/types.h>
+#include <syscall.h>
+#include <unistd.h>
+#include <algorithm>
+#include <iomanip>
+#include <iostream>
+#include <sstream>
+
+#include "aos/init.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/logging/logging.h"
+#include "aos/util/compiler_memory_barrier.h"
+
+namespace aos {
+namespace ipc_lib {
+
+namespace {
+
+constexpr bool kDebug = false;
+
+void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
+  const int result = mutex_grab(&(memory->queue_setup_lock));
+  CHECK(result == 0 || result == 1);
+}
+
+// This must be called under the queue_setup_lock.
+void Cleanup(LocklessQueueMemory *memory) {
+  const size_t num_senders = memory->num_senders();
+  const size_t queue_size = memory->queue_size();
+  const size_t num_messages = memory->num_messages();
+
+  // There are a large number of crazy cases here for how things can go wrong
+  // and how we have to recover.  They either require us to keep extra track of
+  // what is going on, slowing down the send path, or require a large number of
+  // cases.
+  //
+  // The solution here is to not over-think it.  This runs during construction
+  // while we are not real-time, so it is allowed to be slow.  It will also
+  // trigger very rarely.  There is a small microsecond-long window where
+  // process death is ambiguous.
+  //
+  // So, build up a list N long, where N is the number of messages.  Search
+  // through the entire queue and the sender list (ignoring any dead senders),
+  // and mark down which ones we have seen.  Once we have seen all the messages
+  // except the N dead senders, we know which messages are dead.  Because the
+  // queue is active while we do this, it may take a couple of passes to see
+  // everything.
+
+  // Do the easy case.  Find all senders who have died.  See if they are either
+  // consistent already, or if they have copied over to_replace to the scratch
+  // index, but haven't cleared to_replace.  Count them.
+  size_t valid_senders = 0;
+  for (size_t i = 0; i < num_senders; ++i) {
+    Sender *sender = memory->GetSender(i);
+    const uint32_t tid =
+        __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+    if (tid & FUTEX_OWNER_DIED) {
+      if (kDebug) {
+        printf("Found an easy death for sender %zu\n", i);
+      }
+      const Index to_replace = sender->to_replace.RelaxedLoad();
+      const Index scratch_index = sender->scratch_index.Load();
+
+      // I find it easiest to think about this in terms of the set of observable
+      // states.  The main code follows the following states:
+
+      // 1) scratch_index = xxx
+      //    to_replace = invalid
+      // This is unambiguous.  Already good.
+
+      // 2) scratch_index = xxx
+      //    to_replace = yyy
+      // Very ambiguous.  Is xxx or yyy the correct one?  Need to either roll
+      // this forwards or backwards.
+
+      // 3) scratch_index = yyy
+      //    to_replace = yyy
+      // We are in the act of moving to_replace to scratch_index, but didn't
+      // finish.  Easy.
+
+      // 4) scratch_index = yyy
+      //    to_replace = invalid
+      // Finished, but died.  Looks like 1)
+
+      // Any cleanup code needs to follow the same set of states to be robust to
+      // death, so death can be restarted.
+
+      // Could be 2) or 3).
+      if (to_replace.valid()) {
+        // 3)
+        if (to_replace == scratch_index) {
+          // Just need to invalidate to_replace to finish.
+          sender->to_replace.Invalidate();
+
+          // And mark that we succeeded.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          ++valid_senders;
+        }
+      } else {
+        // 1) or 4).  Make sure we aren't corrupted and declare victory.
+        CHECK(scratch_index.valid());
+
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+        ++valid_senders;
+      }
+    } else {
+      // Not dead.
+      ++valid_senders;
+    }
+  }
+
+  // If all the senders are (or were made) good, there is no need to do the hard
+  // case.
+  if (valid_senders == num_senders) {
+    return;
+  }
+
+  if (kDebug) {
+    printf("Starting hard cleanup\n");
+  }
+
+  size_t num_accounted_for = 0;
+  size_t num_missing = 0;
+  ::std::vector<bool> accounted_for(num_messages, false);
+
+  while ((num_accounted_for + num_missing) != num_messages) {
+    num_missing = 0;
+    for (size_t i = 0; i < num_senders; ++i) {
+      Sender *sender = memory->GetSender(i);
+      const uint32_t tid =
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+      if (tid & FUTEX_OWNER_DIED) {
+        ++num_missing;
+      } else {
+        const Index scratch_index = sender->scratch_index.RelaxedLoad();
+        if (!accounted_for[scratch_index.message_index()]) {
+          ++num_accounted_for;
+        }
+        accounted_for[scratch_index.message_index()] = true;
+      }
+    }
+
+    for (size_t i = 0; i < queue_size; ++i) {
+      const Index index = memory->GetQueue(i)->RelaxedLoad();
+      if (!accounted_for[index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[index.message_index()] = true;
+    }
+  }
+
+  while (num_missing != 0) {
+    const size_t starting_num_missing = num_missing;
+    for (size_t i = 0; i < num_senders; ++i) {
+      Sender *sender = memory->GetSender(i);
+      const uint32_t tid =
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+      if (tid & FUTEX_OWNER_DIED) {
+        const Index scratch_index = sender->scratch_index.RelaxedLoad();
+        const Index to_replace = sender->to_replace.RelaxedLoad();
+
+        // Candidate.
+        CHECK_LE(to_replace.message_index(), accounted_for.size());
+        if (accounted_for[to_replace.message_index()]) {
+          if (kDebug) {
+            printf("Sender %zu died, to_replace is already accounted for\n", i);
+          }
+          // If both are accounted for, we are corrupt...
+          CHECK(!accounted_for[scratch_index.message_index()]);
+
+          // to_replace is already accounted for.  This means that we didn't
+          // atomically insert scratch_index into the queue yet.  So
+          // invalidate to_replace.
+          sender->to_replace.Invalidate();
+
+          // And then mark this sender clean.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+
+          // And account for scratch_index.
+          accounted_for[scratch_index.message_index()] = true;
+          --num_missing;
+          ++num_accounted_for;
+        } else if (accounted_for[scratch_index.message_index()]) {
+          if (kDebug) {
+            printf("Sender %zu died, scratch_index is already accounted for\n", i);
+          }
+          // scratch_index is accounted for.  That means we did the insert,
+          // but didn't record it.
+          CHECK(to_replace.valid());
+          // Finish the transaction.  Copy to_replace, then clear it.
+
+          sender->scratch_index.Store(to_replace);
+          sender->to_replace.Invalidate();
+
+          // And then mark this sender clean.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+
+          // And account for to_replace.
+          accounted_for[to_replace.message_index()] = true;
+          --num_missing;
+          ++num_accounted_for;
+        } else {
+          if (kDebug) {
+            printf("Sender %zu died, neither is accounted for\n", i);
+          }
+          // Ambiguous.  There will be an unambiguous one somewhere that we
+          // can do first.
+        }
+      }
+    }
+    // CHECK that we are making progress.
+    CHECK_NE(num_missing, starting_num_missing);
+  }
+}
+
+// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
+// thread.
+int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
+  return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
+}
+
+}  // namespace
+
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
+  // Round up the message size so the following data is double aligned.  That
+  // should be overkill for most platforms, and the checks below confirm it.
+  config.message_data_size = (config.message_data_size + 7) & ~0x7;
+
+  // As we build up the size, confirm that everything is aligned to the
+  // alignment requirements of the type.
+  size_t size = sizeof(LocklessQueueMemory);
+  CHECK_EQ(size & (alignof(LocklessQueueMemory) - 1), 0u);
+
+  CHECK_EQ(size & (alignof(AtomicIndex) - 1), 0u);
+  size += LocklessQueueMemory::SizeOfQueue(config);
+
+  CHECK_EQ(size & (alignof(Message) - 1), 0u);
+  size += LocklessQueueMemory::SizeOfMessages(config);
+
+  CHECK_EQ(size & (alignof(Watcher) - 1), 0u);
+  size += LocklessQueueMemory::SizeOfWatchers(config);
+
+  CHECK_EQ(size & (alignof(Sender) - 1), 0u);
+  size += LocklessQueueMemory::SizeOfSenders(config);
+
+  return size;
+}
+
+LocklessQueueMemory *InitializeLocklessQueueMemory(
+    LocklessQueueMemory *memory, LocklessQueueConfiguration config) {
+  // Everything should be zero initialized already.  So we just need to fill
+  // everything out properly.
+
+  // Grab the mutex.  We don't care if the previous reader died.  We are going
+  // to check everything anyway.
+  GrabQueueSetupLockOrDie(memory);
+
+  if (!memory->initialized) {
+    // TODO(austin): Check these for out of bounds.
+    memory->config.num_watchers = config.num_watchers;
+    memory->config.num_senders = config.num_senders;
+    memory->config.queue_size = config.queue_size;
+    // Round up to the nearest double word boundary.
+    memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+
+    const size_t num_messages = memory->num_messages();
+    // At most MaxMessages() messages may be allocated.
+    CHECK_LE(num_messages, Index::MaxMessages());
+
+    for (size_t i = 0; i < num_messages; ++i) {
+      memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i))
+          ->header.queue_index.Invalidate();
+    }
+
+    for (size_t i = 0; i < memory->queue_size(); ++i) {
+      // Make the initial counter be the furthest away number.  That means that
+      // index 0 should be 0xffff, 1 should be 0, etc.
+      memory->GetQueue(i)->Store(Index(QueueIndex::Zero(memory->queue_size())
+                                           .IncrementBy(i)
+                                           .DecrementBy(memory->queue_size()),
+                                       i));
+    }
+
+    memory->next_queue_index.Invalidate();
+
+    for (size_t i = 0; i < memory->num_senders(); ++i) {
+      ::aos::ipc_lib::Sender *s = memory->GetSender(i);
+      s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+      s->to_replace.RelaxedInvalidate();
+    }
+
+    // Signal everything is done.  This needs to be done last, so if we die, we
+    // redo initialization.
+    // This is a full atomic (probably overkill), but this is at initialization
+    // time, so it is cheap.
+    memory->initialized.store(true);
+  }
+
+  mutex_unlock(&(memory->queue_setup_lock));
+  return memory;
+}
+
+LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
+                             LocklessQueueConfiguration config)
+    : memory_(InitializeLocklessQueueMemory(memory, config)),
+      watcher_copy_(memory_->num_watchers()),
+      pid_(getpid()),
+      uid_(getuid()) {}
+
+LocklessQueue::~LocklessQueue() {
+  CHECK_EQ(watcher_index_, -1);
+
+  GrabQueueSetupLockOrDie(memory_);
+  const int num_watchers = memory_->num_watchers();
+  // Cleanup is cheap.  Go for it anyway.
+
+  // And confirm that nothing is owned by us.
+  for (int i = 0; i < num_watchers; ++i) {
+    CHECK(!mutex_islocked(&(memory_->GetWatcher(i)->tid)));
+  }
+  mutex_unlock(&(memory_->queue_setup_lock));
+}
+
+size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+
+bool LocklessQueue::RegisterWakeup(int priority) {
+  // TODO(austin): Make sure signal coalescing is turned on.  We don't need
+  // duplicates.  That will improve performance under high load.
+
+  // Since everything is self consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
+  GrabQueueSetupLockOrDie(memory_);
+  const int num_watchers = memory_->num_watchers();
+
+  // Now, find the first empty watcher and grab it.
+  CHECK_EQ(watcher_index_, -1);
+  for (int i = 0; i < num_watchers; ++i) {
+    const uint32_t tid =
+        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+      watcher_index_ = i;
+      break;
+    }
+  }
+
+  // Bail if we failed to find an open slot.
+  if (watcher_index_ == -1) {
+    mutex_unlock(&(memory_->queue_setup_lock));
+    return false;
+  }
+
+  Watcher *w = memory_->GetWatcher(watcher_index_);
+
+  w->pid = getpid();
+  w->priority = priority;
+
+  // Grabbing a mutex is a compiler and memory barrier, so nothing before will
+  // get rearranged afterwards.
+  //
+  // Since everything is done under the queue_setup_lock, this should always
+  // return immediately.
+  const int result = mutex_grab(&(w->tid));
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+
+  // We should either get the lock, or the previous owner should have died.
+  // Anything else is a pretty serious error.
+  return result == 0 || result == 1;
+}
+
+void LocklessQueue::UnregisterWakeup() {
+  // Since everything is self consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
+  GrabQueueSetupLockOrDie(memory_);
+
+  // Make sure we are registered.
+  CHECK_NE(watcher_index_, -1);
+
+  // Make sure we still own the slot we are supposed to.
+  CHECK(mutex_islocked(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+  // The act of unlocking invalidates the entry.  Invalidate it.
+  mutex_unlock(&(memory_->GetWatcher(watcher_index_)->tid));
+  // And internally forget the slot.
+  watcher_index_ = -1;
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+}
+
+int LocklessQueue::Wakeup(const int current_priority) {
+  const size_t num_watchers = memory_->num_watchers();
+
+  CHECK_EQ(watcher_copy_.size(), num_watchers);
+
+  // Grab a copy so it won't change out from underneath us, and we can sort it
+  // nicely in C++.
+  // Do note that there is still a window where the process can die *after* we
+  // read everything.  We will still PI boost and send a signal to the thread in
+  // question.  There is no way without pidfd's to close this window, and
+  // creating a pidfd is likely not RT.
+  for (size_t i = 0; i < num_watchers; ++i) {
+    Watcher *w = memory_->GetWatcher(i);
+    // Start by reading the tid.  This needs to be atomic to force it to come first.
+    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
+    watcher_copy_[i].pid = w->pid;
+    watcher_copy_[i].priority = w->priority;
+
+    // Use a priority of -1 to mean an invalid entry to make sorting easier.
+    if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
+      watcher_copy_[i].priority = -1;
+    } else if (watcher_copy_[i].tid !=
+               static_cast<pid_t>(
+                   __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
+      // Confirm that the watcher hasn't been re-used and modified while we read
+      // it.  If it has, mark it invalid again.
+      watcher_copy_[i].priority = -1;
+      watcher_copy_[i].tid = 0;
+    }
+  }
+
+  // Now sort.
+  ::std::sort(watcher_copy_.begin(), watcher_copy_.end(),
+              [](const WatcherCopy &a, const WatcherCopy &b) {
+                return a.priority > b.priority;
+              });
+
+  int count = 0;
+  if (watcher_copy_[0].priority != -1) {
+    const int max_priority =
+        ::std::max(current_priority, watcher_copy_[0].priority);
+    // Boost if we are RT and there is a higher priority watcher out there.
+    // Otherwise we might run into priority inversions.
+    if (max_priority > current_priority && current_priority > 0) {
+      SetCurrentThreadRealtimePriority(max_priority);
+    }
+
+    // Build up the siginfo to send.
+    siginfo_t uinfo;
+    memset(&uinfo, 0, sizeof(uinfo));
+
+    uinfo.si_code = SI_QUEUE;
+    uinfo.si_pid = pid_;
+    uinfo.si_uid = uid_;
+    uinfo.si_value.sival_int = 0;
+
+    for (const WatcherCopy &watcher_copy : watcher_copy_) {
+      // The first -1 priority means we are at the end of the valid list.
+      if (watcher_copy.priority == -1) {
+        break;
+      }
+
+      // Send the signal.  Target just the thread that sent it so that we can
+      // support multiple watchers in a process (when someone creates multiple
+      // event loops in different threads).
+      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.tid, kWakeupSignal,
+                        &uinfo);
+
+      ++count;
+    }
+
+    // Drop back down if we were boosted.
+    if (max_priority > current_priority && current_priority > 0) {
+      SetCurrentThreadRealtimePriority(current_priority);
+    }
+  }
+
+  return count;
+}
+
+LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+  GrabQueueSetupLockOrDie(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_);
+
+  const int num_senders = memory_->num_senders();
+
+  for (int i = 0; i < num_senders; ++i) {
+    ::aos::ipc_lib::Sender *s = memory->GetSender(i);
+    const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      sender_index_ = i;
+      break;
+    }
+  }
+
+  if (sender_index_ == -1) {
+    LOG(FATAL, "Too many senders\n");
+  }
+
+  ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+
+  // Atomically grab the mutex.  This signals that we are alive.  If the
+  // previous owner died, we don't care, and want to grab the mutex anyway.
+  const int result = mutex_grab(&(s->tid));
+  CHECK(result == 0 || result == 1);
+
+  mutex_unlock(&(memory->queue_setup_lock));
+}
+
+LocklessQueue::Sender::~Sender() {
+  if (memory_ != nullptr) {
+    mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+  }
+}
+
+LocklessQueue::Sender LocklessQueue::MakeSender() {
+  return LocklessQueue::Sender(memory_);
+}
+
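+// Returns the index unchanged if it is valid; otherwise returns the zeroed-out
+// version of it (same count, index 0).  This lets Send() treat an invalidated
+// next_queue_index as queue index 0.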
+QueueIndex ZeroOrValid(QueueIndex index) {
+  if (!index.valid()) {
+    return index.Clear();
+  }
+  return index;
+}
+
+void LocklessQueue::Sender::Send(const char *data, size_t length) {
+  const size_t queue_size = memory_->queue_size();
+  CHECK_LE(length, memory_->message_data_size());
+
+  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
+  Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *message = memory_->GetMessage(scratch_index);
+
+  message->header.queue_index.Invalidate();
+
+  message->header.length = length;
+  memcpy(&message->data[0], data, length);
+
+  while (true) {
+    const QueueIndex actual_next_queue_index =
+        memory_->next_queue_index.Load(queue_size);
+    const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);
+
+    const QueueIndex incremented_queue_index = next_queue_index.Increment();
+
+    // TODO(austin): I think we can drop the barrier off this.
+    const Index to_replace = memory_->LoadIndex(next_queue_index);
+
+    const QueueIndex decremented_queue_index =
+        next_queue_index.DecrementBy(queue_size);
+
+    // See if we got beat.  If we did, try to atomically update
+    // next_queue_index in case the previous writer failed and retry.
+    if (!to_replace.IsPlausible(decremented_queue_index)) {
+      // We don't care about the result.  It will either succeed, or we got
+      // beat in fixing it and just need to give up and try again.  If we got
+      // beat multiple times, the only way progress can be made is if the queue
+      // is updated as well.  This means that if we retry reading
+      // next_queue_index, we will be at most off by one and can retry.
+      //
+      // Both require no further action from us.
+      //
+      // TODO(austin): If we are having fairness issues under contention, we
+      // could have a mode bit in next_queue_index, and could use a lock or some
+      // other form of PI boosting to let the higher priority task win.
+      memory_->next_queue_index.CompareAndExchangeStrong(
+          actual_next_queue_index, incremented_queue_index);
+
+      if (kDebug) {
+        printf("We were beat.  Try again.  Was %x, is %x\n", to_replace.get(),
+               decremented_queue_index.index());
+      }
+      continue;
+    }
+
+    // Confirm that the message is what it should be.
+    {
+      // We just need this to be atomic and after the index has been calculated
+      // and before we exchange the index back in.  Both of those will be strong
+      // barriers, so this is fine.
+      const QueueIndex previous_index =
+          memory_->GetMessage(to_replace)
+              ->header.queue_index.RelaxedLoad(queue_size);
+      if (previous_index != decremented_queue_index && previous_index.valid()) {
+        // Retry.
+        if (kDebug) {
+          printf(
+              "Something fishy happened, queue index doesn't match.  Retrying. "
+              " Previous index was %x, should be %x\n",
+              previous_index.index(), decremented_queue_index.index());
+        }
+        continue;
+      }
+    }
+
+    message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
+    message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
+    // Before we are fully done filling out the message, update the Sender state
+    // with the new index to write.  This re-uses the barrier for the
+    // queue_index store.
+    const Index index_to_write(next_queue_index,
+                               scratch_index.message_index());
+
+    sender->scratch_index.RelaxedStore(index_to_write);
+
+    message->header.queue_index.Store(next_queue_index);
+
+    // The message is now filled out, and we have a confirmed slot to store
+    // into.
+    //
+    // Start by writing down what we are going to pull out of the queue.  This
+    // was Invalid before now.
+    sender->to_replace.RelaxedStore(to_replace);
+
+    // Then exchange the next index into the queue.
+    if (!memory_->GetQueue(next_queue_index.Wrapped())
+             ->CompareAndExchangeStrong(to_replace, index_to_write)) {
+      // Aw, didn't succeed.  Retry.
+      sender->to_replace.RelaxedInvalidate();
+      if (kDebug) {
+        printf("Failed to wrap into queue\n");
+      }
+      continue;
+    }
+
+    // Then update next_queue_index to save the next user some computation time.
+    memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
+                                                       incremented_queue_index);
+
+    // Now update the scratch space and record that we succeeded.
+    sender->scratch_index.Store(to_replace);
+    // And then clear out the entry used to replace.  This just needs to be
+    // atomic.  It can't be moved above the store because that is a full
+    // barrier, but delaying it until later will only affect things if something
+    // died.
+    sender->to_replace.RelaxedInvalidate();
+    break;
+  }
+}
+
+LocklessQueue::ReadResult LocklessQueue::Read(
+    uint32_t uint32_queue_index,
+    ::aos::monotonic_clock::time_point *monotonic_sent_time,
+    ::aos::realtime_clock::time_point *realtime_sent_time, size_t *length,
+    char *data) {
+  const size_t queue_size = memory_->queue_size();
+
+  // Build up the QueueIndex.
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+
+  // Read the message stored at the requested location.
+  Index mi = memory_->LoadIndex(queue_index);
+  Message *m = memory_->GetMessage(mi);
+
+  while (true) {
+    // We need to confirm that the data doesn't change while we are reading it.
+    // Do that by first confirming that the message points to the queue index we
+    // want.
+    const QueueIndex starting_queue_index =
+        m->header.queue_index.Load(queue_size);
+    if (starting_queue_index != queue_index) {
+      // If we found a message that is exactly 1 loop old, we just wrapped.
+      if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
+        if (kDebug) {
+          printf("Matches: %x, %x\n", starting_queue_index.index(),
+                 queue_index.DecrementBy(queue_size).index());
+        }
+        return ReadResult::NOTHING_NEW;
+      } else {
+        // Someone has re-used this message between when we pulled it out of the
+        // queue and when we grabbed its index.  It is pretty hard to deduce
+        // what happened. Just try again.
+        Message *new_m = memory_->GetMessage(queue_index);
+        if (m != new_m) {
+          m = new_m;
+          if (kDebug) {
+            printf("Retrying, m doesn't match\n");
+          }
+          continue;
+        }
+
+        // We have confirmed that the queue slot still points to the same
+        // message.  This means that the message didn't get swapped out from
+        // under us, so starting_queue_index is correct.
+        //
+        // Either we got too far behind (signaled by this being a valid
+        // message), or this is one of the initial messages which are invalid.
+        if (starting_queue_index.valid()) {
+          if (kDebug) {
+            printf("Too old.  Tried for %x, got %x, behind by %d\n",
+                   queue_index.index(), starting_queue_index.index(),
+                   starting_queue_index.index() - queue_index.index());
+          }
+          return ReadResult::TOO_OLD;
+        }
+
+        if (kDebug) {
+          printf("Initial\n");
+        }
+
+        // There isn't a valid message at this location.
+        //
+        // If someone asks for one of the messages within the first go around,
+        // then they need to wait.  They got ahead.  Otherwise, they are
+        // asking for something crazy, like something before the beginning of
+        // the queue.  Tell them that they are behind.
+        if (uint32_queue_index < memory_->queue_size()) {
+          if (kDebug) {
+            printf("Near zero, %x\n", uint32_queue_index);
+          }
+          return ReadResult::NOTHING_NEW;
+        } else {
+          if (kDebug) {
+            printf("not near zero, %x\n", uint32_queue_index);
+          }
+          return ReadResult::TOO_OLD;
+        }
+      }
+    }
+    if (kDebug) {
+      printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
+    }
+    break;
+  }
+
+  // Then read the data out.
+  *monotonic_sent_time = m->header.monotonic_sent_time;
+  *realtime_sent_time = m->header.realtime_sent_time;
+  memcpy(data, &m->data[0], m->header.length);
+  *length = m->header.length;
+
+  // And finally, confirm that the message *still* points to the queue index we
+  // want.  This means it didn't change out from under us.
+  // If something changed out from under us, we were reading it much too late in
+  // its lifetime.
+  const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+  if (final_queue_index != queue_index) {
+    if (kDebug) {
+      printf(
+          "Changed out from under us.  Reading %x, finished with %x, delta: "
+          "%d\n",
+          queue_index.index(), final_queue_index.index(),
+          final_queue_index.index() - queue_index.index());
+    }
+    return ReadResult::TOO_OLD;
+  }
+
+  return ReadResult::GOOD;
+}
+
+uint32_t LocklessQueue::LatestQueueIndex() {
+  const size_t queue_size = memory_->queue_size();
+
+  // There is only one interesting case.  We need to know if the queue is empty.
+  // That is done with a sentinel value.  At worst, this will be off by one.
+  const QueueIndex next_queue_index =
+      memory_->next_queue_index.Load(queue_size);
+  if (next_queue_index.valid()) {
+    const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
+    return current_queue_index.index();
+  } else {
+    return empty_queue_index();
+  }
+}
+
+namespace {
+
+// Prints out the mutex state.  Not safe to use while the mutex is being
+// changed.
+::std::string PrintMutex(aos_mutex *mutex) {
+  ::std::stringstream s;
+  s << "aos_mutex(" << ::std::hex << mutex->futex;
+
+  if (mutex->futex != 0) {
+    s << ":";
+    if (mutex->futex & FUTEX_OWNER_DIED) {
+      s << "FUTEX_OWNER_DIED|";
+    }
+    s << "tid=" << (mutex->futex & FUTEX_TID_MASK);
+  }
+
+  s << ")";
+  return s.str();
+}
+
+}  // namespace
+
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory) {
+  const size_t queue_size = memory->queue_size();
+  ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
+  ::std::cout << "  aos_mutex queue_setup_lock = "
+              << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
+  ::std::cout << "  ::std::atomic<bool> initialized = " << memory->initialized
+              << ::std::endl;
+  ::std::cout << "  config {" << ::std::endl;
+  ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
+              << ::std::endl;
+  ::std::cout << "    size_t num_senders = " << memory->config.num_senders
+              << ::std::endl;
+  ::std::cout << "    size_t queue_size = " << memory->config.queue_size
+              << ::std::endl;
+  ::std::cout << "    size_t message_data_size = "
+              << memory->config.message_data_size << ::std::endl;
+
+  ::std::cout << "    AtomicQueueIndex next_queue_index = "
+              << memory->next_queue_index.Load(queue_size).DebugString()
+              << ::std::endl;
+
+  ::std::cout << "  }" << ::std::endl;
+  ::std::cout << "  AtomicIndex queue[" << queue_size << "] {" << ::std::endl;
+  for (size_t i = 0; i < queue_size; ++i) {
+    ::std::cout << "    [" << i << "] -> "
+                << memory->GetQueue(i)->Load().DebugString() << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+  ::std::cout << "  Message messages[" << memory->num_messages() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_messages(); ++i) {
+    Message *m = memory->GetMessage(Index(i, i));
+    ::std::cout << "    [" << i << "] -> Message {" << ::std::endl;
+    ::std::cout << "      Header {" << ::std::endl;
+    ::std::cout << "        AtomicQueueIndex queue_index = "
+                << m->header.queue_index.Load(queue_size).DebugString()
+                << ::std::endl;
+    ::std::cout << "        size_t length = " << m->header.length
+                << ::std::endl;
+    ::std::cout << "      }" << ::std::endl;
+    ::std::cout << "      data: {";
+
+    for (size_t j = 0; j < m->header.length; ++j) {
+      char data = m->data[j];
+      if (j != 0) {
+        ::std::cout << " ";
+      }
+      if (::std::isprint(data)) {
+        ::std::cout << ::std::setfill(' ') << ::std::setw(2) << ::std::hex
+                    << data;
+      } else {
+        ::std::cout << "0x" << ::std::setfill('0') << ::std::setw(2)
+                    << ::std::hex << (static_cast<unsigned>(data) & 0xff);
+      }
+    }
+    ::std::cout << ::std::setfill(' ') << ::std::dec << "}" << ::std::endl;
+    ::std::cout << "    }," << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+  for (size_t i = 0; i < memory->num_senders(); ++i) {
+    Sender *s = memory->GetSender(i);
+    ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&s->tid)
+                << ::std::endl;
+    ::std::cout << "      AtomicIndex scratch_index = "
+                << s->scratch_index.Load().DebugString() << ::std::endl;
+    ::std::cout << "      AtomicIndex to_replace = "
+                << s->to_replace.Load().DebugString() << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_watchers(); ++i) {
+    Watcher *w = memory->GetWatcher(i);
+    ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&w->tid)
+                << ::std::endl;
+    ::std::cout << "      pid_t pid = " << w->pid << ::std::endl;
+    ::std::cout << "      int priority = " << w->priority << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "}" << ::std::endl;
+}
+
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
new file mode 100644
index 0000000..dafc157
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue.h
@@ -0,0 +1,227 @@
+#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
+#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
+
+#include <signal.h>
+#include <sys/signalfd.h>
+#include <sys/types.h>
+#include <atomic>
+#include <vector>
+
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/index.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace ipc_lib {
+
+// Structure to hold the state required to wake a watcher.
+struct Watcher {
+  // Mutex that the watcher locks.  The futex holds the tid of the watcher, or
+  // has FUTEX_OWNER_DIED set if the task died.  If the futex is 0 (or has
+  // FUTEX_OWNER_DIED set), this watcher slot is invalid.
+  //
+  // Note: this is modified with a lock held, but is always able to be read.
+  // Any state modification should happen before the lock is acquired.
+  aos_mutex tid;
+
+  // PID of the watcher.
+  pid_t pid;
+
+  // RT priority of the watcher.
+  int priority;
+};
+
+// Structure to hold the state required to send messages.
+struct Sender {
+  // Mutex that the sender locks.  The futex holds the tid of the sender, or
+  // has FUTEX_OWNER_DIED set if the task died.  If the futex is 0 (or has
+  // FUTEX_OWNER_DIED set), this sender slot is invalid.
+  //
+  // Note: this is modified with a lock held, but is always able to be read.
+  aos_mutex tid;
+
+  // Index of the message we will be filling out.
+  AtomicIndex scratch_index;
+
+  // Index of the element being swapped with scratch_index, or Invalid if there
+  // is nothing to do.
+  AtomicIndex to_replace;
+};
+
+// Structure representing a message.
+struct Message {
+  struct Header {
+    // Index of this message in the queue.  Needs to match the index this
+    // message is written into the queue at.  The data in this message is only
+    // valid if it matches the index in the queue both before and after all the
+    // data is read.
+    //
+    // Note: a value of 0xffffffff always means that the contents aren't valid.
+    AtomicQueueIndex queue_index;
+
+    // Timestamp of the message.  Needs to be monotonically incrementing in the
+    // queue, which means that time needs to be re-sampled every time a write
+    // fails.
+    ::aos::monotonic_clock::time_point monotonic_sent_time;
+    ::aos::realtime_clock::time_point realtime_sent_time;
+
+    size_t length;
+  } header;
+
+  char data[];
+};
+
+struct LocklessQueueConfiguration {
+  // Size of the watchers list.
+  size_t num_watchers;
+  // Size of the sender list.
+  size_t num_senders;
+
+  // Size of the list of pointers into the messages list.
+  size_t queue_size;
+  // Size in bytes of the data stored in each Message.
+  size_t message_data_size;
+
+  size_t message_size() const { return message_data_size + sizeof(Message); }
+
+  size_t num_messages() const { return num_senders + queue_size; }
+};
+
+// Structure to hold the state of the queue.
+//
+// Reads and writes are lockless and constant time.
+//
+// Adding a new watcher doesn't need to be constant time for the watcher (this
+// is done before the watcher goes RT), but needs to be RT for the sender.
+struct LocklessQueueMemory;
+
+// Initializes the queue memory.  memory must either point to an already-valid
+// queue data structure or be zero initialized.
+LocklessQueueMemory *InitializeLocklessQueueMemory(
+    LocklessQueueMemory *memory, LocklessQueueConfiguration config);
+
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
+const static int kWakeupSignal = SIGRTMIN + 2;
+
+// Class to manage sending and receiving data in the lockless queue.  This is
+// separate from the actual memory backing the queue so that memory can be
+// managed with mmap to share across the process boundary.
+class LocklessQueue {
+ public:
+  LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
+  LocklessQueue(const LocklessQueue &) = delete;
+  LocklessQueue &operator=(const LocklessQueue &) = delete;
+
+  ~LocklessQueue();
+
+  // Returns the size of the queue (the number of message slots).
+  size_t QueueSize() const;
+
+  // Registers this thread to receive the kWakeupSignal signal when Wakeup is
+  // called. Returns false if there was an error in registration.
+  bool RegisterWakeup(int priority);
+  // Unregisters the wakeup.
+  void UnregisterWakeup();
+
+  // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
+  //
+  // A priority of 0 means non-RT.  Non-RT callers could have issues, so we
+  // don't PI boost if non-RT.
+  int Wakeup(int current_priority);
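+  //
+  // Illustrative watcher-side pattern (a sketch only; error handling and the
+  // queue/priority values here are assumptions):
+  //   sigset_t set;
+  //   sigemptyset(&set);
+  //   sigaddset(&set, kWakeupSignal);
+  //   pthread_sigmask(SIG_BLOCK, &set, nullptr);
+  //   queue.RegisterWakeup(/*priority=*/20);
+  //   siginfo_t info;
+  //   sigwaitinfo(&set, &info);  // Returns once Wakeup() signals this thread.
+  //   queue.UnregisterWakeup();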
+
+  // If you ask for a queue index 2 past the newest, you will still get
+  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
+  // element newer than QueueSize() from the current message, we consider it
+  // behind by a large amount and return TOO_OLD.
+  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW };
+  ReadResult Read(uint32_t queue_index,
+                  ::aos::monotonic_clock::time_point *monotonic_sent_time,
+                  ::aos::realtime_clock::time_point *realtime_sent_time,
+                  size_t *length, char *data);
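+  //
+  // Illustrative reader loop (a sketch only; queue, data, and the time
+  // pointers are assumed to be set up by the caller):
+  //   uint32_t next_index = 0;
+  //   while (true) {
+  //     switch (queue.Read(next_index, &monotonic_sent_time,
+  //                        &realtime_sent_time, &length, data)) {
+  //       case ReadResult::GOOD:        ++next_index; break;
+  //       case ReadResult::NOTHING_NEW: /* wait for a wakeup */ break;
+  //       case ReadResult::TOO_OLD:     /* fell behind; jump forward */ break;
+  //     }
+  //   }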
+
+  // Returns the index to the latest queue message.  Returns empty_queue_index()
+  // if there are no messages in the queue.  Do note that this index wraps if
+  // more than 2^32 messages are sent.
+  uint32_t LatestQueueIndex();
+  static constexpr uint32_t empty_queue_index() { return 0xffffffff; }
+
+  // TODO(austin): Return the oldest queue index.  This lets us catch up nicely
+  // if we got behind.
+  // The easiest way to implement this is likely going to be to reserve the
+  // first modulo of values for the initial time around, and never reuse them.
+  // That lets us do a simple atomic read of the next index and deduce what has
+  // happened.  It will involve the simplest atomic operations.
+
+  // TODO(austin): Make it so we can find the indices which were sent just
+  // before and after a time with a binary search.
+
+  // Sender for blocks of data.  The resources associated with a sender are
+  // scoped to this object's lifetime.
+  class Sender {
+   public:
+    Sender(const Sender &) = delete;
+    Sender &operator=(const Sender &) = delete;
+    Sender(Sender &&other)
+        : memory_(other.memory_), sender_index_(other.sender_index_) {
+      other.memory_ = nullptr;
+      other.sender_index_ = -1;
+    }
+    Sender &operator=(Sender &&other) {
+      memory_ = other.memory_;
+      sender_index_ = other.sender_index_;
+      other.memory_ = nullptr;
+      other.sender_index_ = -1;
+      return *this;
+    }
+
+    ~Sender();
+
+    // Sends length bytes of data.  Does not wake up the target.
+    void Send(const char *data, size_t length);
+
+   private:
+    friend class LocklessQueue;
+
+    Sender(LocklessQueueMemory *memory);
+
+    // Pointer to the backing memory.
+    LocklessQueueMemory *memory_ = nullptr;
+
+    // Index into the sender list.
+    int sender_index_ = -1;
+  };
+
+  // Creates a sender.
+  Sender MakeSender();
+
+ private:
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Memory and datastructure used to sort a list of watchers to wake
+  // up.  This isn't a copy of Watcher since tid is simpler to work with here
+  // than the futex above.
+  struct WatcherCopy {
+    pid_t tid;
+    pid_t pid;
+    int priority;
+  };
+  // TODO(austin): Don't allocate this memory if we aren't going to send.
+  ::std::vector<WatcherCopy> watcher_copy_;
+
+  // Index of our entry in the watcher list, or -1 if no watcher is
+  // registered.
+  int watcher_index_ = -1;
+
+  const int pid_;
+  const uid_t uid_;
+};
+
+}  // namespace ipc_lib
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_LOCKLESS_QUEUE_H_
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
new file mode 100644
index 0000000..100821c
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -0,0 +1,615 @@
+#include "aos/ipc_lib/lockless_queue.h"
+
+#include <dlfcn.h>
+#include <inttypes.h>
+#include <linux/futex.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <wait.h>
+#include <chrono>
+#include <memory>
+#include <thread>
+
+#include "aos/init.h"
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/libc/aos_strsignal.h"
+#include "aos/logging/logging.h"
+#include "aos/testing/prevent_exit.h"
+#include "aos/testing/test_logging.h"
+#include "gflags/gflags.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+
+#if ((defined(AOS_SANITIZER_address) || defined(AOS_SANITIZER_thread)) && \
+     !defined(__clang__) && __GNUC__ <= 4 && __GNUC_MINOR__ <= 8) ||      \
+    defined(__ARM_EABI__)
+// There are various reasons why we might not actually be able to do this
+// testing, but we still want to run the functions to test anything they test
+// other than shm robustness.
+//
+// GCC 4.8 has too old of a version of asan to allow SIGSEGV handler
+// chaining.
+//
+// GCC 4.8's tsan doesn't work with the code for calling the real sigaction to
+// work around its weirdness with the SIGTRAP handler.
+//
+// ARM doesn't have a nice way to do single-stepping, and I don't feel like
+// dealing with editing the next instruction to be a breakpoint and then
+// changing it back.
+#else
+
+// This code currently supports amd64 only.  It shouldn't be hard to port to
+// i386 (it should just need to use the name for the smaller part of the flags
+// register), but that's not tested, and porting it to any other architecture
+// is more work.
+// Currently, we skip doing anything exciting on arm (just run the code without
+// any robustness testing) and refuse to compile anywhere else.
+
+#define SIMPLE_ASSERT(condition, message)                                 \
+  do {                                                                    \
+    if (!(condition)) {                                                   \
+      static const char kMessage[] = message "\n";                        \
+      if (write(STDERR_FILENO, kMessage, sizeof(kMessage) - 1) !=         \
+          (sizeof(kMessage) - 1)) {                                       \
+        static const char kFailureMessage[] = "writing failed\n";         \
+        __attribute__((unused)) int ignore = write(                       \
+            STDERR_FILENO, kFailureMessage, sizeof(kFailureMessage) - 1); \
+      }                                                                   \
+      abort();                                                            \
+    }                                                                     \
+  } while (false)
+
+// Array to track writes to memory, and make sure they happen in the right
+// order.
+class WritesArray {
+ public:
+  uintptr_t At(size_t location) const {
+    SIMPLE_ASSERT(location < size_, "too far into writes array");
+    return writes_[location];
+  }
+  void Add(uintptr_t pointer) {
+    SIMPLE_ASSERT(size_ < kSize, "too many writes");
+    writes_[size_++] = pointer;
+  }
+
+  size_t size() const { return size_; }
+
+ private:
+  static const size_t kSize = 20000;
+
+  uintptr_t writes_[kSize];
+  size_t size_ = 0;
+};
+
+enum class DieAtState {
+  // No SEGVs should be happening.
+  kDisabled,
+  // SEGVs are fine.  Normal operation.
+  kRunning,
+  // We are manipulating a mutex.  No SEGVs should be happening.
+  kWriting,
+};
+
+// What we exit with when we're exiting in the middle.
+const int kExitEarlyValue = 123;
+
+// We have to keep track of everything in a global variable because there's no
+// other way for the signal handlers to find it.
+struct GlobalState {
+  // Pointer to the queue memory, and its size.
+  void *lockless_queue_memory;
+  size_t lockless_queue_memory_size;
+
+  // Pointer to a second block of memory the same size.  This (on purpose) has
+  // the same size as lockless_queue_memory so we can point the robust mutexes
+  // here.
+  void *lockless_queue_memory_lock_backup;
+
+  // Expected writes.
+  const WritesArray *writes_in;
+  // Actual writes.
+  WritesArray *writes_out;
+  // Location to die at, and how far we have gotten.
+  size_t die_at, current_location;
+  // State.
+  DieAtState state;
+};
+::std::atomic<GlobalState *> global_state;
+
+#ifndef __ARM_EABI__
+#ifndef __x86_64__
+#error This code only works on amd64.
+#endif
+
+// The "trap bit" which enables single-stepping for x86.
+const greg_t kTrapFlag = 1 << 8;
+
+// Returns true if the address is in the queue memory chunk.
+bool IsInLocklessQueueMemory(void *address) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  void *lockless_queue_memory = my_global_state->lockless_queue_memory;
+  if (address < lockless_queue_memory) {
+    return false;
+  }
+  if (reinterpret_cast<uintptr_t>(address) >
+      reinterpret_cast<uintptr_t>(lockless_queue_memory) +
+          my_global_state->lockless_queue_memory_size) {
+    return false;
+  }
+  return true;
+}
+
+// Calls mprotect(2) for the entire shared memory region with the given prot.
+void ShmProtectOrDie(int prot) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  if (mprotect(my_global_state->lockless_queue_memory,
+               my_global_state->lockless_queue_memory_size, prot) == -1) {
+    PLOG(FATAL, "mprotect(%p, %zu, %x) failed",
+         my_global_state->lockless_queue_memory,
+         my_global_state->lockless_queue_memory_size, prot);
+  }
+}
+
+// Checks a write into the queue and conditionally dies.  Tracks the write.
+void HandleWrite(void *address) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  uintptr_t address_offset =
+      reinterpret_cast<uintptr_t>(address) -
+      reinterpret_cast<uintptr_t>(my_global_state->lockless_queue_memory);
+  if (my_global_state->writes_in != nullptr) {
+    SIMPLE_ASSERT(my_global_state->writes_in->At(
+                      my_global_state->current_location) == address_offset,
+                  "wrong write order");
+  }
+  if (my_global_state->writes_out != nullptr) {
+    my_global_state->writes_out->Add(address_offset);
+  }
+  if (my_global_state->die_at != 0) {
+    if (my_global_state->die_at == my_global_state->current_location) {
+      _exit(kExitEarlyValue);
+    }
+  }
+  ++my_global_state->current_location;
+}
+
+struct sigaction old_segv_handler, old_trap_handler;
+
+// Calls the original signal handler.
+bool CallChainedAction(const struct sigaction &action, int signal,
+                       siginfo_t *siginfo, void *context) {
+  if (action.sa_handler == SIG_IGN || action.sa_handler == SIG_DFL) {
+    return false;
+  }
+  if (action.sa_flags & SA_SIGINFO) {
+    action.sa_sigaction(signal, siginfo, context);
+  } else {
+    action.sa_handler(signal);
+  }
+  return true;
+}
+
+void segv_handler(int signal, siginfo_t *siginfo, void *context_void) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  const int saved_errno = errno;
+  SIMPLE_ASSERT(signal == SIGSEGV, "wrong signal for SIGSEGV handler");
+
+  ucontext_t *const context = static_cast<ucontext_t *>(context_void);
+  // Only process memory addresses in our shared memory block.
+  if (!IsInLocklessQueueMemory(siginfo->si_addr)) {
+    if (CallChainedAction(old_segv_handler, signal, siginfo, context_void)) {
+      errno = saved_errno;
+      return;
+    } else {
+      SIMPLE_ASSERT(false, "actual SIGSEGV");
+    }
+  }
+  SIMPLE_ASSERT(my_global_state->state == DieAtState::kRunning,
+                "bad state for SIGSEGV");
+
+  HandleWrite(siginfo->si_addr);
+
+  ShmProtectOrDie(PROT_READ | PROT_WRITE);
+  context->uc_mcontext.gregs[REG_EFL] |= kTrapFlag;
+  my_global_state->state = DieAtState::kWriting;
+  errno = saved_errno;
+}
+
+// A mutex lock is about to happen.  Mark the memory rw, and check to see if we
+// should die.
+void futex_before(void *address, bool) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  if (IsInLocklessQueueMemory(address)) {
+    assert(my_global_state->state == DieAtState::kRunning);
+    HandleWrite(address);
+    ShmProtectOrDie(PROT_READ | PROT_WRITE);
+    my_global_state->state = DieAtState::kWriting;
+  }
+}
+
+// The SEGV handler has set a breakpoint 1 instruction in the future.  This
+// clears it, marks memory readonly, and continues.
+void trap_handler(int signal, siginfo_t *, void *context_void) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  const int saved_errno = errno;
+  SIMPLE_ASSERT(signal == SIGTRAP, "wrong signal for SIGTRAP handler");
+
+  ucontext_t *const context = static_cast<ucontext_t *>(context_void);
+
+  context->uc_mcontext.gregs[REG_EFL] &= ~kTrapFlag;
+  SIMPLE_ASSERT(my_global_state->state == DieAtState::kWriting,
+                "bad state for SIGTRAP");
+  ShmProtectOrDie(PROT_READ);
+  my_global_state->state = DieAtState::kRunning;
+  errno = saved_errno;
+}
+
+// We have a manual trap for mutexes.  Check to see if we were supposed to die
+// on this write (the compare/exchange for the mutex), and mark the memory ro
+// again.
+void futex_after(void *address, bool) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  if (IsInLocklessQueueMemory(address)) {
+    assert(my_global_state->state == DieAtState::kWriting);
+    ShmProtectOrDie(PROT_READ);
+    my_global_state->state = DieAtState::kRunning;
+  }
+}
+
+// Installs the signal handler.
+void InstallHandler(int signal, void (*handler)(int, siginfo_t *, void *),
+                    struct sigaction *old_action) {
+  struct sigaction action;
+  memset(&action, 0, sizeof(action));
+  action.sa_sigaction = handler;
+  action.sa_flags = SA_RESTART | SA_SIGINFO;
+#ifdef AOS_SANITIZER_thread
+  // Tsan messes with signal handlers to check for race conditions, and it
+  // causes problems, so we have to work around it for SIGTRAP.
+  if (signal == SIGTRAP) {
+    typedef int (*SigactionType)(int, const struct sigaction *,
+                                 struct sigaction *);
+    SigactionType real_sigaction =
+        reinterpret_cast<SigactionType>(dlsym(RTLD_NEXT, "sigaction"));
+    if (sigaction == real_sigaction) {
+      LOG(WARNING, "failed to work around tsan signal handling weirdness\n");
+    }
+    PCHECK(real_sigaction(signal, &action, old_action));
+    return;
+  }
+#endif
+  PCHECK(sigaction(signal, &action, old_action));
+}
+
+#endif  // ifndef __ARM_EABI__
+
+// gtest only allows creating fatal failures in functions returning void...
+// status is from wait(2).
+void DetectFatalFailures(int status) {
+  if (WIFEXITED(status)) {
+    FAIL() << " child returned status "
+           << ::std::to_string(WEXITSTATUS(status));
+  } else if (WIFSIGNALED(status)) {
+    FAIL() << " child exited because of signal "
+           << aos_strsignal(WTERMSIG(status));
+  } else {
+    FAIL() << "child exited with status " << ::std::hex << status;
+  }
+}
+
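+// Forks a child that runs function against the shared queue memory while the
+// parent waits on it.  The child reports back through its exit status: 0 means
+// function ran to completion, kExitEarlyValue means it stopped at the
+// requested write, and anything else becomes a gtest failure via
+// DetectFatalFailures.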
+// Returns true if it runs all the way through.
+bool RunFunctionDieAt(::std::function<void(void *)> prepare,
+                      ::std::function<void(void *)> function,
+                      bool *test_failure, size_t die_at, bool prepare_in_child,
+                      uintptr_t writable_offset, const WritesArray *writes_in,
+                      WritesArray *writes_out) {
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  my_global_state->writes_in = writes_in;
+  my_global_state->writes_out = writes_out;
+  my_global_state->die_at = die_at;
+  my_global_state->current_location = 0;
+  my_global_state->state = DieAtState::kDisabled;
+
+  if (!prepare_in_child) prepare(my_global_state->lockless_queue_memory);
+
+  const pid_t pid = fork();
+  if (pid == -1) {
+    PLOG(FATAL, "fork() failed\n");
+  }
+  if (pid == 0) {
+    // Run the test.
+    ::aos::testing::PreventExit();
+
+    if (prepare_in_child) prepare(my_global_state->lockless_queue_memory);
+
+    // Update the robust list offset.
+    linux_code::ipc_lib::SetRobustListOffset(writable_offset);
+    // Install a segv handler (to detect writes to the memory block), and a trap
+    // handler so we can single step.
+    InstallHandler(SIGSEGV, segv_handler, &old_segv_handler);
+    InstallHandler(SIGTRAP, trap_handler, &old_trap_handler);
+    CHECK_EQ(old_trap_handler.sa_handler, SIG_DFL);
+    linux_code::ipc_lib::SetFutexAccessorObservers(futex_before, futex_after);
+
+    ShmProtectOrDie(PROT_READ);
+    my_global_state->state = DieAtState::kRunning;
+
+    function(my_global_state->lockless_queue_memory);
+    my_global_state->state = DieAtState::kDisabled;
+    ShmProtectOrDie(PROT_READ | PROT_WRITE);
+    _exit(0);
+  } else {
+    // Wait until the child process dies.
+    while (true) {
+      int status;
+      pid_t waited_on = waitpid(pid, &status, 0);
+      if (waited_on == -1) {
+        if (errno == EINTR) continue;
+        PLOG(FATAL, "waitpid(%jd, %p, 0) failed\n", static_cast<intmax_t>(pid),
+             &status);
+      }
+      if (waited_on != pid) {
+        LOG(FATAL, "waitpid got child %jd instead of %jd\n",
+            static_cast<intmax_t>(waited_on), static_cast<intmax_t>(pid));
+      }
+      if (WIFEXITED(status)) {
+        if (WEXITSTATUS(status) == 0) return true;
+        if (WEXITSTATUS(status) == kExitEarlyValue) return false;
+      }
+      DetectFatalFailures(status);
+      if (test_failure) *test_failure = true;
+      return false;
+    }
+  }
+}
+
+bool RunFunctionDieAtAndCheck(const LocklessQueueConfiguration &config,
+                              ::std::function<void(void *)> prepare,
+                              ::std::function<void(void *)> function,
+                              ::std::function<void(void *)> check,
+                              bool *test_failure, size_t die_at,
+                              bool prepare_in_child,
+                              const WritesArray *writes_in,
+                              WritesArray *writes_out) {
+  // Allocate shared memory.
+  GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
+  my_global_state->lockless_queue_memory_size = LocklessQueueMemorySize(config);
+  my_global_state->lockless_queue_memory = static_cast<void *>(
+      mmap(nullptr, my_global_state->lockless_queue_memory_size,
+           PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+  CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory);
+
+  // And the backup mapping that the robust list is pointed at.
+  my_global_state->lockless_queue_memory_lock_backup = static_cast<void *>(
+      mmap(nullptr, my_global_state->lockless_queue_memory_size,
+           PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+  CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory_lock_backup);
+
+  // The writable offset tells us how to convert from a pointer in the queue to
+  // a pointer that is safe to write to.  This is so robust futexes don't make
+  // the kernel spin when the user maps a page PROT_READ and the kernel tries
+  // to clear the futex there.
+  const uintptr_t writable_offset =
+      reinterpret_cast<uintptr_t>(
+          my_global_state->lockless_queue_memory_lock_backup) -
+      reinterpret_cast<uintptr_t>(my_global_state->lockless_queue_memory);
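+  // With that offset, an address inside the read-only queue mapping can be
+  // translated to its writable twin in the backup, e.g.
+  //   void *writable = reinterpret_cast<uint8_t *>(address) + writable_offset;
+  // which is roughly what the robust list plumbing does with the offset after
+  // it is handed to SetRobustListOffset in RunFunctionDieAt.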
+
+  bool r;
+  // Do the actual test in a new thread so any locked mutexes will be cleaned up
+  // nicely with owner-died at the end.
+  ::std::thread test_thread([&prepare, &function, &check, test_failure, die_at,
+                             prepare_in_child, writes_in, writes_out,
+                             writable_offset, &r]() {
+    r = RunFunctionDieAt(prepare, function, test_failure, die_at,
+                         prepare_in_child, writable_offset, writes_in,
+                         writes_out);
+    if (::testing::Test::HasFailure()) {
+      r = false;
+      if (test_failure) *test_failure = true;
+      return;
+    }
+
+    check(
+        global_state.load(::std::memory_order_relaxed)->lockless_queue_memory);
+  });
+  test_thread.join();
+  return r;
+}
+
+// Tests function to make sure it handles dying after each store it makes to
+// shared memory.  check should make sure function behaved correctly.
+// This will repeatedly set up fresh shared memory, run prepare, run function,
+// and then run check, killing the process that function is running in at
+// various points.  It will stop if anything reports a fatal gtest failure.
+//
+// prepare_in_child being true means the prepare function will be run in the
+// child instead of the parent which doesn't die. This means that reference
+// counts on any objects it allocates won't be duplicated, but it also means
+// that any variables it sets will not be visible in check etc.
+void TestShmRobustness(const LocklessQueueConfiguration &config,
+                       ::std::function<void(void *)> prepare,
+                       ::std::function<void(void *)> function,
+                       ::std::function<void(void *)> check,
+                       bool prepare_in_child) {
+  // Map the global state and memory for the Writes array so it exists across
+  // the process boundary.
+  void *shared_allocations = static_cast<GlobalState *>(
+      mmap(nullptr, sizeof(GlobalState) + sizeof(WritesArray),
+           PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+  CHECK_NE(MAP_FAILED, shared_allocations);
+
+  global_state.store(static_cast<GlobalState *>(shared_allocations));
+  shared_allocations = static_cast<void *>(
+      static_cast<uint8_t *>(shared_allocations) + sizeof(GlobalState));
+  WritesArray *expected_writes = static_cast<WritesArray *>(shared_allocations);
+  new (expected_writes) WritesArray();
+
+  bool test_failed = false;
+  ASSERT_TRUE(RunFunctionDieAtAndCheck(config, prepare, function, check,
+                                       &test_failed, 0, prepare_in_child,
+                                       nullptr, expected_writes));
+  if (test_failed) {
+    ADD_FAILURE();
+    return;
+  }
+
+  size_t die_at = 1;
+  while (true) {
+    SCOPED_TRACE("dying at " + ::std::to_string(die_at) + "/" +
+                 ::std::to_string(expected_writes->size()));
+    if (RunFunctionDieAtAndCheck(config, prepare, function, check, &test_failed,
+                                 die_at, prepare_in_child, expected_writes,
+                                 nullptr)) {
+      return;
+    }
+    if (test_failed) {
+      ADD_FAILURE();
+    }
+    if (::testing::Test::HasFailure()) return;
+    ++die_at;
+  }
+}
+
+namespace {
+pid_t gettid() { return syscall(SYS_gettid); }
+
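+// When a thread holding a robust futex dies, the kernel walks its robust list
+// and sets FUTEX_OWNER_DIED in any futex word whose FUTEX_TID_MASK bits still
+// hold that thread's tid, so the next locker can tell the owner died.  The
+// child here registers its robust list against the writable backup mapping, so
+// that cleanup never lands in the real queue memory; this helper redoes it by
+// hand on the real copy.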
+// Sets FUTEX_OWNER_DIED if the owner was tid.  This fakes what the kernel does
+// with a robust mutex.
+bool PretendOwnerDied(aos_mutex *mutex, pid_t tid) {
+  if ((mutex->futex & FUTEX_TID_MASK) == tid) {
+    mutex->futex = FUTEX_OWNER_DIED;
+    return true;
+  }
+  return false;
+}
+}  // namespace
+
+// Tests that death during sends is recovered from correctly.
+TEST(LocklessQueueTest, Death) {
+  ::aos::testing::EnableTestLogging();
+  // Capture the tid in the child so we can tell if it died.  Use mmap so it
+  // works across the process boundary.
+  pid_t *tid =
+      static_cast<pid_t *>(mmap(nullptr, sizeof(pid_t), PROT_READ | PROT_WRITE,
+                                MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+  CHECK_NE(MAP_FAILED, tid);
+
+  // Make a small queue so it is easier to debug.
+  LocklessQueueConfiguration config;
+  config.num_watchers = 2;
+  config.num_senders = 2;
+  config.queue_size = 4;
+  config.message_data_size = 32;
+
+  TestShmRobustness(
+      config,
+      [this, config, tid](void *memory) {
+        // Initialize the queue and grab the tid.
+        LocklessQueue queue(
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            config);
+        *tid = gettid();
+      },
+      [this, config](void *memory) {
+        // Now try to write 2 messages.  We will get killed a bunch as this
+        // tries to happen.
+        LocklessQueue queue(
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            config);
+        LocklessQueue::Sender sender = queue.MakeSender();
+        for (int i = 0; i < 2; ++i) {
+          char data[100];
+          size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
+          sender.Send(data, s + 1);
+        }
+      },
+      [this, config, tid](void *raw_memory) {
+        // Confirm that we can create 2 senders (the number in the queue), and
+        // send a message.  And that all the messages in the queue are valid.
+        ::aos::ipc_lib::LocklessQueueMemory *memory =
+            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+
+        bool print = false;
+
+        // TestShmRobustness doesn't handle robust futexes.  It is happy to just
+        // not crash with them.  We know what they are, and what the tid of the
+        // holder is.  So go pretend to be the kernel and fix it for it.
+        PretendOwnerDied(&memory->queue_setup_lock, *tid);
+
+        for (size_t i = 0; i < config.num_senders; ++i) {
+          if (PretendOwnerDied(&memory->GetSender(i)->tid, *tid)) {
+            // Print out before and after results if a sender died.  That is the
+            // more fun case.
+            print = true;
+          }
+        }
+
+        if (print) {
+          PrintLocklessQueueMemory(memory);
+        }
+
+        LocklessQueue queue(memory, config);
+        // Building and destroying a sender will clean up the queue.
+        { LocklessQueue::Sender sender = queue.MakeSender(); }
+
+        if (print) {
+          printf("Cleaned up version:\n");
+          PrintLocklessQueueMemory(memory);
+        }
+
+        {
+          LocklessQueue::Sender sender = queue.MakeSender();
+          {
+            // Make a second sender to confirm that the slot was freed.
+            // If the sender doesn't get cleaned up, this will fail.
+            LocklessQueue queue2(memory, config);
+            queue2.MakeSender();
+          }
+
+          // Send a message to make sure that the queue still works.
+          char data[100];
+          size_t s = snprintf(data, sizeof(data), "foobar%d", 971);
+          sender.Send(data, s + 1);
+        }
+
+        // Now loop through the queue and make sure the number in the snprintf
+        // increments.
+        char last_data = '0';
+        int i = 0;
+        while (true) {
+          ::aos::monotonic_clock::time_point monotonic_sent_time;
+          ::aos::realtime_clock::time_point realtime_sent_time;
+          char read_data[1024];
+          size_t length;
+
+          LocklessQueue::ReadResult read_result =
+              queue.Read(i, &monotonic_sent_time, &realtime_sent_time, &length,
+                         &(read_data[0]));
+
+          if (read_result != LocklessQueue::ReadResult::GOOD) {
+            break;
+          }
+
+          EXPECT_GT(read_data[6], last_data) << ": Got " << read_data;
+          last_data = read_data[6];
+
+          ++i;
+        }
+
+        // Confirm our message got through.
+        EXPECT_EQ(last_data, '9');
+      },
+      /* prepare_in_child = true */ true);
+}
+
+#endif
+
+}  // namespace testing
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
new file mode 100644
index 0000000..162403a
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -0,0 +1,123 @@
+#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
+#define AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
+
+#include <sys/types.h>
+#include <atomic>
+
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/index.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace ipc_lib {
+
+struct LocklessQueueMemory {
+  // A lot of things get easier if the only lockless writes are when messages
+  // are published.  Do note, the data structures protected by this lock need
+  // to be consistent at all times because a reader (or writer) may read them
+  // regardless of whether the lock is held.
+  //
+  // Any non-constant time operations need to be done at queue startup time,
+  // including cleanup.  Cleanup is done when the next queue is opened.
+  aos_mutex queue_setup_lock;
+  ::std::atomic<bool> initialized;
+
+  LocklessQueueConfiguration config;
+
+  // Size of the watchers list.
+  size_t num_watchers() const { return config.num_watchers; }
+  // Size of the sender list.
+  size_t num_senders() const { return config.num_senders; }
+
+  // List of pointers into the messages list.
+  size_t queue_size() const { return config.queue_size; }
+
+  // Size in bytes of each Message.
+  size_t message_size() const { return config.message_size(); }
+  // Size in bytes of the data in each Message.
+  size_t message_data_size() const { return config.message_data_size; }
+
+  size_t num_messages() const { return config.num_messages(); }
+
+  // Index in the queue of the latest message.  This is wrapped modulo the
+  // queue_size when used to look inside the queue for the message pointer.  It
+  // starts out invalid when the queue is empty.
+  //
+  // A message is valid iff its internal index matches the index in the queue.
+  AtomicQueueIndex next_queue_index;
+
+  // There is then memory allocated after this structure.  That memory is used
+  // to store the messages, queue, watchers, and senders.  This is equivalent to
+  // writing:
+  //
+  // AtomicIndex queue[config.queue_size];
+  // Message messages[config.queue_size + config.num_senders];
+  // Watcher watchers[config.num_watchers];
+  // Sender senders[config.num_senders];
+
+  // Aligned pointer to where the data starts.
+  // Use a 64 bit type to require 64 bit alignment of the data inside.
+  uint64_t data[];
+
+  // Memory size functions for all 4 lists.
+  size_t SizeOfQueue() { return SizeOfQueue(config); }
+  static size_t SizeOfQueue(LocklessQueueConfiguration config) {
+    return sizeof(AtomicIndex) * config.queue_size;
+  }
+
+  size_t SizeOfMessages() { return SizeOfMessages(config); }
+  static size_t SizeOfMessages(LocklessQueueConfiguration config) {
+    return config.message_size() * config.num_messages();
+  }
+
+  size_t SizeOfWatchers() { return SizeOfWatchers(config); }
+  static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
+    return sizeof(Watcher) * config.num_watchers;
+  }
+
+  size_t SizeOfSenders() { return SizeOfSenders(config); }
+  static size_t SizeOfSenders(LocklessQueueConfiguration config) {
+    return sizeof(Sender) * config.num_senders;
+  }
+
+  // Getters for each of the 4 lists.
+  Sender *GetSender(size_t sender_index) {
+    return reinterpret_cast<Sender *>(
+        reinterpret_cast<uintptr_t>(&data[0]) + SizeOfQueue() +
+        SizeOfMessages() + SizeOfWatchers() + sender_index * sizeof(Sender));
+  }
+
+  Watcher *GetWatcher(size_t watcher_index) {
+    return reinterpret_cast<Watcher *>(reinterpret_cast<uintptr_t>(&data[0]) +
+                                       SizeOfQueue() + SizeOfMessages() +
+                                       watcher_index * sizeof(Watcher));
+  }
+
+  AtomicIndex *GetQueue(uint32_t index) {
+    return reinterpret_cast<AtomicIndex *>(
+        reinterpret_cast<uintptr_t>(&data[0]) + sizeof(AtomicIndex) * index);
+  }
+
+  // There are num_senders + queue_size messages.  The free list is really the
+  // sender list, since those are messages available to be filled in and sent.
+  // This removes the need to find lost messages when a sender dies.
+  Message *GetMessage(Index index) {
+    return reinterpret_cast<Message *>(reinterpret_cast<uintptr_t>(&data[0]) +
+                                       SizeOfQueue() +
+                                       index.message_index() * message_size());
+  }
+
+  // Helpers to fetch messages from the queue.
+  Index LoadIndex(QueueIndex index) {
+    return GetQueue(index.Wrapped())->Load();
+  }
+  Message *GetMessage(QueueIndex index) {
+    return GetMessage(LoadIndex(index));
+  }
+};
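+
+// For intuition, the total shared memory footprint is roughly the struct plus
+// the four trailing regions sized above.  A sketch (the real
+// LocklessQueueMemorySize() declared in lockless_queue.h may add alignment
+// padding on top of this):
+//
+//   size_t RoughMemorySize(LocklessQueueConfiguration config) {
+//     return sizeof(LocklessQueueMemory) +
+//            LocklessQueueMemory::SizeOfQueue(config) +
+//            LocklessQueueMemory::SizeOfMessages(config) +
+//            LocklessQueueMemory::SizeOfWatchers(config) +
+//            LocklessQueueMemory::SizeOfSenders(config);
+//   }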
+
+}  // namespace ipc_lib
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
new file mode 100644
index 0000000..f3b49b6
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -0,0 +1,350 @@
+#include "aos/ipc_lib/lockless_queue.h"
+
+#include <inttypes.h>
+#include <signal.h>
+#include <unistd.h>
+#include <wait.h>
+#include <chrono>
+#include <memory>
+#include <random>
+#include <thread>
+
+#include "aos/event.h"
+#include "aos/events/epoll.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/queue_racer.h"
+#include "aos/ipc_lib/signalfd.h"
+#include "aos/testing/test_logging.h"
+#include "gflags/gflags.h"
+#include "gtest/gtest.h"
+
+DEFINE_int32(min_iterations, 100,
+             "Minimum number of stress test iterations to run");
+DEFINE_int32(duration, 5, "Number of seconds to test for");
+DEFINE_int32(print_rate, 60, "Number of seconds between status prints");
+
+// The roboRIO can only handle 10 threads before exploding.  Set the default for
+// ARM to 10.
+DEFINE_int32(thread_count,
+#if defined(__ARM_EABI__)
+             10,
+#else
+             100,
+#endif
+             "Number of threads to race");
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+
+class LocklessQueueTest : public ::testing::Test {
+ public:
+  LocklessQueueTest() {
+    ::aos::testing::EnableTestLogging();
+    config_.num_watchers = 10;
+    config_.num_senders = 100;
+    config_.queue_size = 10000;
+    // Pick a message size that would throw off alignment, to exercise the
+    // alignment code.
+    config_.message_data_size = 101;
+
+    // Since our backing store is an array of uint64_t for alignment purposes,
+    // normalize by the size.
+    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
+
+    Reset();
+  }
+
+  LocklessQueueMemory *get_memory() {
+    return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+  }
+
+  void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+
+  // Runs until the signal is received.
+  void RunUntilWakeup(Event *ready, int priority) {
+    LocklessQueue queue(get_memory(), config_);
+    internal::EPoll epoll;
+    SignalFd signalfd({kWakeupSignal});
+
+    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
+      signalfd_siginfo result = signalfd.Read();
+
+      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
+      epoll.Quit();
+    });
+
+    // Register to be woken up *after* the signalfd is catching the signals.
+    queue.RegisterWakeup(priority);
+
+    // And signal we are now ready.
+    ready->Set();
+
+    epoll.Run();
+
+    // Cleanup.
+    queue.UnregisterWakeup();
+    epoll.DeleteFd(signalfd.fd());
+  }
+
+  // Use a type with enough alignment that we are guaranteed that everything
+  // will be aligned properly on the target platform.
+  ::std::vector<uint64_t> memory_;
+
+  LocklessQueueConfiguration config_;
+};
+
+typedef LocklessQueueTest LocklessQueueDeathTest;
+
+// Tests that wakeup doesn't do anything if nothing was registered.
+TEST_F(LocklessQueueTest, NoWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+// Tests that wakeup doesn't do anything if a wakeup was registered and then
+// unregistered.
+TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  queue.RegisterWakeup(5);
+  queue.UnregisterWakeup();
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+// Tests that wakeup doesn't do anything if the thread dies.
+TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
+  LocklessQueue queue(get_memory(), config_);
+
+  ::std::thread([this]() {
+    // Use placement new so the destructor doesn't get run.
+    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
+        data;
+    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
+    // Register a wakeup.
+    q->RegisterWakeup(5);
+  }).join();
+
+  EXPECT_EQ(queue.Wakeup(7), 0);
+}
+
+struct WatcherState {
+  ::std::thread t;
+  Event ready;
+};
+
+// Tests that too many watchers fails like expected.
+TEST_F(LocklessQueueTest, TooManyWatchers) {
+  // This is going to be a barrel of monkeys.
+  // We need to spin up a bunch of watchers.  But, they all need to be in
+  // different threads so they have different tids.
+  ::std::vector<WatcherState> queues;
+  // Reserve num_watchers WatcherState objects so the pointer value doesn't
+  // change out from under us below.
+  queues.reserve(config_.num_watchers);
+
+  // Event used to trigger all the threads to unregister.
+  Event cleanup;
+
+  // Start all the threads.
+  for (size_t i = 0; i < config_.num_watchers; ++i) {
+    queues.emplace_back();
+
+    WatcherState *s = &queues.back();
+    queues.back().t = ::std::thread([this, &cleanup, s]() {
+      LocklessQueue q(get_memory(), config_);
+      EXPECT_TRUE(q.RegisterWakeup(0));
+
+      // Signal that this thread is ready.
+      s->ready.Set();
+
+      // And wait until we are asked to shut down.
+      cleanup.Wait();
+
+      q.UnregisterWakeup();
+    });
+  }
+
+  // Wait until all the threads are actually going.
+  for (WatcherState &w : queues) {
+    w.ready.Wait();
+  }
+
+  // Now try to allocate another one.  This will fail.
+  {
+    LocklessQueue queue(get_memory(), config_);
+    EXPECT_FALSE(queue.RegisterWakeup(0));
+  }
+
+  // Trigger the threads to clean up their resources, and wait until they are
+  // done.
+  cleanup.Set();
+  for (WatcherState &w : queues) {
+    w.t.join();
+  }
+
+  // We should now be able to allocate a wakeup.
+  {
+    LocklessQueue queue(get_memory(), config_);
+    EXPECT_TRUE(queue.RegisterWakeup(0));
+    queue.UnregisterWakeup();
+  }
+}
+
+// Tests that too many watchers dies like expected.
+TEST_F(LocklessQueueDeathTest, TooManySenders) {
+  EXPECT_DEATH(
+      {
+        ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
+        ::std::vector<LocklessQueue::Sender> senders;
+        for (size_t i = 0; i < config_.num_senders + 1; ++i) {
+          queues.emplace_back(new LocklessQueue(get_memory(), config_));
+          senders.emplace_back(queues.back()->MakeSender());
+        }
+      },
+      "Too many senders");
+}
+
+// Now, start 2 threads and have them receive the signals.
+TEST_F(LocklessQueueTest, WakeUpThreads) {
+  // Confirm that the wakeup signal is in range.
+  EXPECT_LE(kWakeupSignal, SIGRTMAX);
+  EXPECT_GE(kWakeupSignal, SIGRTMIN);
+
+  LocklessQueue queue(get_memory(), config_);
+
+  // Event used to make sure the thread is ready before the test starts.
+  Event ready1;
+  Event ready2;
+
+  // Start the thread.
+  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
+  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });
+
+  ready1.Wait();
+  ready2.Wait();
+
+  EXPECT_EQ(queue.Wakeup(3), 2);
+
+  t1.join();
+  t2.join();
+
+  // Clean up afterwards.  We are pretending to be RT when we are really not.
+  // So we will be PI boosted up.
+  UnsetCurrentThreadRealtimePriority();
+}
+
+// Do a simple send test.
+TEST_F(LocklessQueueTest, Send) {
+  LocklessQueue queue(get_memory(), config_);
+
+  LocklessQueue::Sender sender = queue.MakeSender();
+
+  // Send enough messages to wrap.
+  for (int i = 0; i < 20000; ++i) {
+    // Confirm that the queue index makes sense given the number of sends.
+    EXPECT_EQ(queue.LatestQueueIndex(),
+              i == 0 ? LocklessQueue::empty_queue_index() : i - 1);
+
+    // Send a trivial piece of data.
+    char data[100];
+    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
+    sender.Send(data, s);
+
+    // Confirm that the queue index still makes sense.  This is easier since the
+    // empty case has been handled.
+    EXPECT_EQ(queue.LatestQueueIndex(), i);
+
+    // Read a result from 5 in the past.
+    ::aos::monotonic_clock::time_point monotonic_sent_time;
+    ::aos::realtime_clock::time_point realtime_sent_time;
+    char read_data[1024];
+    size_t length;
+
+    QueueIndex index = QueueIndex::Zero(config_.queue_size);
+    if (i - 5 < 0) {
+      index = index.DecrementBy(5 - i);
+    } else {
+      index = index.IncrementBy(i - 5);
+    }
+    LocklessQueue::ReadResult read_result =
+        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+                   &length, &(read_data[0]));
+
+    // This should either return GOOD, or TOO_OLD if it is before the start of
+    // the queue.
+    if (read_result != LocklessQueue::ReadResult::GOOD) {
+      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+    }
+  }
+}
+
+// Races a bunch of sending threads to see if it all works.
+TEST_F(LocklessQueueTest, SendRace) {
+  const size_t kNumMessages = 10000 / FLAGS_thread_count;
+
+  ::std::mt19937 generator(0);
+  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
+  ::std::bernoulli_distribution race_reads_distribution;
+  ::std::bernoulli_distribution wrap_writes_distribution;
+
+  const chrono::seconds print_frequency(FLAGS_print_rate);
+
+  QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
+  const monotonic_clock::time_point start_time =
+      monotonic_clock::now();
+  const monotonic_clock::time_point end_time =
+      start_time + chrono::seconds(FLAGS_duration);
+
+  monotonic_clock::time_point monotonic_now = start_time;
+  monotonic_clock::time_point next_print_time = start_time + print_frequency;
+  uint64_t messages = 0;
+  for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
+    bool race_reads = race_reads_distribution(generator);
+    int write_wrap_count = write_wrap_count_distribution(generator);
+    if (!wrap_writes_distribution(generator)) {
+      write_wrap_count = 0;
+    }
+    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+        << ": Running with race_reads: " << race_reads
+        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
+        << i;
+
+    messages += racer.CurrentIndex();
+
+    monotonic_now = monotonic_clock::now();
+    if (monotonic_now > next_print_time) {
+      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
+                                   monotonic_now - start_time)
+                                   .count();
+      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
+             i, i / elapsed_seconds,
+             static_cast<double>(messages) / elapsed_seconds);
+      next_print_time = monotonic_now + print_frequency;
+    }
+  }
+}
+
+// Send enough messages to wrap the 32 bit send counter.
+TEST_F(LocklessQueueTest, WrappedSend) {
+  uint64_t kNumMessages = 0x100010000ul;
+  QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+
+  const monotonic_clock::time_point start_time = monotonic_clock::now();
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+  double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
+                               monotonic_now - start_time)
+                               .count();
+  printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
+         elapsed_seconds, kNumMessages,
+         static_cast<double>(kNumMessages) / elapsed_seconds);
+}
+
+}  // namespace testing
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
new file mode 100644
index 0000000..53490a0
--- /dev/null
+++ b/aos/ipc_lib/queue_racer.cc
@@ -0,0 +1,309 @@
+#include "aos/ipc_lib/queue_racer.h"
+
+#include <inttypes.h>
+#include <string.h>
+#include <limits>
+
+#include "aos/event.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace {
+
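+// Each message sent during a race carries one of these as its entire payload:
+// the sender threads memcpy a ThreadPlusCount in before Send, and CheckReads
+// memcpys it back out so every message can be attributed to the thread and
+// per-thread count that produced it.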
+struct ThreadPlusCount {
+  int thread;
+  uint64_t count;
+};
+
+}  // namespace
+
+struct ThreadState {
+  ::std::thread thread;
+  Event ready;
+  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
+};
+
+QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
+                       uint64_t num_messages, LocklessQueueConfiguration config)
+    : memory_(memory),
+      num_threads_(num_threads),
+      num_messages_(num_messages),
+      config_(config) {
+  Reset();
+}
+
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+  const bool will_wrap = num_messages_ * num_threads_ *
+                             static_cast<uint64_t>(1 + write_wrap_count) >
+                         config_.queue_size;
+
+  // Clear out shmem.
+  Reset();
+  started_writes_ = 0;
+  finished_writes_ = 0;
+
+  // Event used to start all the threads processing at once.
+  Event run;
+
+  ::std::atomic<bool> poll_index;
+  poll_index = true;
+
+  // List of threads.
+  ::std::vector<ThreadState> threads(num_threads_);
+
+  ::std::thread queue_index_racer([this, &poll_index]() {
+    LocklessQueue queue(memory_, config_);
+
+    // Track the number of times we wrap, and cache the modulo.
+    uint64_t wrap_count = 0;
+    uint32_t last_queue_index = 0;
+    const uint32_t max_queue_index =
+        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+    while (poll_index) {
+      // We want to read everything backwards.  This will give us conservative
+      // bounds.  And with enough time and randomness, we will see all the cases
+      // we care to see.
+
+      // These 3 numbers look at the same thing, but at different points of time
+      // in the process.  The process (essentially) looks like:
+      //
+      // ++started_writes;
+      // ++latest_queue_index;
+      // ++finished_writes;
+      //
+      // We want to check that latest_queue_index is bounded by the number of
+      // writes started and finished.  Basically, we can say that
+      // finished_writes < latest_queue_index always.  And
+      // latest_queue_index < started_writes.  And everything always increases.
+      // So, if we let more time elapse between sampling finished_writes and
+      // latest_queue_index, we will only be relaxing our bounds, not
+      // invalidating the check.  The same goes for started_writes.
+      //
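+      // As a concrete example: if we sample finished_writes == 6 and
+      // started_writes == 9, then whatever latest_queue_index we observe in
+      // between must satisfy 6 <= latest_queue_index + 1 <= 9 (it is an index,
+      // so it reads one lower than the matching count).
+      //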
+      // So, grab them in order.
+      const uint64_t finished_writes = finished_writes_.load();
+      const uint32_t latest_queue_index_uint32_t = queue.LatestQueueIndex();
+      const uint64_t started_writes = started_writes_.load();
+
+      uint64_t latest_queue_index = latest_queue_index_uint32_t;
+
+      if (latest_queue_index_uint32_t != LocklessQueue::empty_queue_index()) {
+        // If we got smaller, we wrapped.
+        if (latest_queue_index_uint32_t < last_queue_index) {
+          ++wrap_count;
+        }
+        // And apply it.
+        latest_queue_index +=
+            static_cast<uint64_t>(max_queue_index) * wrap_count;
+        last_queue_index = latest_queue_index_uint32_t;
+      }
+
+      // For grins, check that we have always started more than we finished.
+      // Should never fail.
+      EXPECT_GE(started_writes, finished_writes);
+
+      // If we are at the beginning, the queue needs to always return empty.
+      if (started_writes == 0) {
+        EXPECT_EQ(latest_queue_index_uint32_t,
+                  LocklessQueue::empty_queue_index());
+        EXPECT_EQ(finished_writes, 0);
+      } else {
+        if (finished_writes == 0) {
+          // Plausible to be at the beginning.
+          if (latest_queue_index_uint32_t !=
+              LocklessQueue::empty_queue_index()) {
+            // Otherwise, we have started.  The latest queue index is always
+            // allowed to lag behind the number of started writes.
+            EXPECT_GE(started_writes, latest_queue_index + 1);
+          }
+        } else {
+          EXPECT_NE(latest_queue_index_uint32_t,
+                    LocklessQueue::empty_queue_index());
+          // latest_queue_index is an index, not a count.  So it always reads 1
+          // low.
+          EXPECT_GE(latest_queue_index + 1, finished_writes);
+        }
+      }
+    }
+  });
+
+  // Build up each thread and kick it off.
+  int thread_index = 0;
+  for (ThreadState &t : threads) {
+    if (will_wrap) {
+      t.event_count = ::std::numeric_limits<uint64_t>::max();
+    } else {
+      t.event_count = 0;
+    }
+    t.thread =
+        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
+          // Build up a sender.
+          LocklessQueue queue(memory_, config_);
+          LocklessQueue::Sender sender = queue.MakeSender();
+
+          // Signal that we are ready to start sending.
+          t.ready.Set();
+
+          // Wait until signaled to start running.
+          run.Wait();
+
+          // Gogogo!
+          for (uint64_t i = 0;
+               i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+               ++i) {
+            char data[sizeof(ThreadPlusCount)];
+            ThreadPlusCount tpc;
+            tpc.thread = thread_index;
+            tpc.count = i;
+
+            memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+            if (i % 0x800000 == 0x100000) {
+              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
+                      static_cast<double>(i) /
+                          static_cast<double>(num_messages_ *
+                                              (1 + write_wrap_count)) *
+                          100.0);
+            }
+
+            ++started_writes_;
+            sender.Send(data, sizeof(ThreadPlusCount));
+            ++finished_writes_;
+          }
+        });
+    ++thread_index;
+  }
+
+  // Wait until all the threads are ready.
+  for (ThreadState &t : threads) {
+    t.ready.Wait();
+  }
+
+  // And start them racing.
+  run.Set();
+
+  // Let all the threads finish before reading if we are supposed to not be
+  // racing reads.
+  if (!race_reads) {
+    for (ThreadState &t : threads) {
+      t.thread.join();
+    }
+    poll_index = false;
+    queue_index_racer.join();
+  }
+
+  CheckReads(race_reads, write_wrap_count, &threads);
+
+  // Reap all the threads.
+  if (race_reads) {
+    for (ThreadState &t : threads) {
+      t.thread.join();
+    }
+    poll_index = false;
+    queue_index_racer.join();
+  }
+
+  // Confirm that the number of writes matches the expected number of writes.
+  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+            started_writes_);
+  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+            finished_writes_);
+
+  // And that every thread sent the right number of messages.
+  for (ThreadState &t : threads) {
+    if (will_wrap) {
+      if (!race_reads) {
+        // If we are wrapping, there is a possibility that a thread writes
+        // everything *before* we can read any of it, and it all gets
+        // overwritten.
+        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+                    t.event_count == (1 + write_wrap_count) * num_messages_)
+            << ": Got " << t.event_count << " events, expected "
+            << (1 + write_wrap_count) * num_messages_;
+      }
+    } else {
+      ASSERT_EQ(t.event_count, num_messages_);
+    }
+  }
+}
+
+void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
+                            ::std::vector<ThreadState> *threads) {
+  // Now read back the results to double check.
+  LocklessQueue queue(memory_, config_);
+
+  const bool will_wrap =
+      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+
+  monotonic_clock::time_point last_monotonic_sent_time =
+      monotonic_clock::epoch();
+  uint64_t initial_i = 0;
+  if (will_wrap) {
+    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
+                queue.QueueSize();
+  }
+
+  for (uint64_t i = initial_i;
+       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
+    ::aos::monotonic_clock::time_point monotonic_sent_time;
+    ::aos::realtime_clock::time_point realtime_sent_time;
+    size_t length;
+    char read_data[1024];
+
+    // Handle overflowing the message count for the wrap test.
+    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
+                                       0xffffffffu, queue.QueueSize()));
+    LocklessQueue::ReadResult read_result =
+        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+                   &length, &(read_data[0]));
+
+    if (race_reads) {
+      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+        --i;
+        continue;
+      }
+    }
+
+    if (race_reads && will_wrap) {
+      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+        continue;
+      }
+    }
+    // Every message should be good.
+    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+
+    // And, confirm that time never went backwards.
+    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
+    last_monotonic_sent_time = monotonic_sent_time;
+
+    ThreadPlusCount tpc;
+    ASSERT_EQ(length, sizeof(ThreadPlusCount));
+    memcpy(&tpc, read_data, sizeof(ThreadPlusCount));
+
+    if (will_wrap) {
+      // The queue won't change out from under us, so we should get some amount
+      // of the tail end of the messages from a thread.
+      // Confirm that once we get our first message, they all show up.
+      if ((*threads)[tpc.thread].event_count ==
+          ::std::numeric_limits<uint64_t>::max()) {
+        (*threads)[tpc.thread].event_count = tpc.count;
+      }
+
+      if (race_reads) {
+        // Make sure nothing goes backwards.  Really not much we can do here.
+        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                                 << tpc.thread;
+        (*threads)[tpc.thread].event_count = tpc.count;
+      } else {
+        // Make sure nothing goes backwards.  Really not much we can do here.
+        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                                 << tpc.thread;
+      }
+    } else {
+      // Confirm that we see every message counter from every thread.
+      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
+                                                               << tpc.thread;
+    }
+    ++(*threads)[tpc.thread].event_count;
+  }
+}
+
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
new file mode 100644
index 0000000..59435b1
--- /dev/null
+++ b/aos/ipc_lib/queue_racer.h
@@ -0,0 +1,70 @@
+#ifndef AOS_IPC_LIB_QUEUE_RACER_H_
+#define AOS_IPC_LIB_QUEUE_RACER_H_
+
+#include <string.h>
+
+#include "aos/ipc_lib/lockless_queue.h"
+
+namespace aos {
+namespace ipc_lib {
+
+struct ThreadState;
+
+// Class to test the queue by spinning up a bunch of writing threads and racing
+// them together to all write at once.
+class QueueRacer {
+ public:
+  QueueRacer(LocklessQueueMemory *memory, int num_threads,
+             uint64_t num_messages, LocklessQueueConfiguration config);
+
+  // Runs an iteration of the race.
+  //
+  // This spins up num_threads, each of which sends num_messages.  These must
+  // both be able to fit in the queue without wrapping.
+  //
+  // Then, this reads back all the messages and confirms that all were received
+  // in order, and none were missed.
+  //
+  // If race_reads is set, start reading (and retry if data isn't ready yet)
+  // while writes are still happening.
+  //
+  // If wrap_writes is nonzero, write enough to overwrite old data.  This
+  // necessitates a looser check at the end.
+  //
+  // If both are set, run an even looser test.
+  void RunIteration(bool race_reads, int write_wrap_count);
+
+  size_t CurrentIndex() {
+    LocklessQueue queue(memory_, config_);
+    return queue.LatestQueueIndex();
+  }
+
+ private:
+  // Wipes the queue memory out so we get a clean start.
+  void Reset() { memset(memory_, 0, LocklessQueueMemorySize(config_)); }
+
+  // This is a separate method so that when any of the ASSERT_* checks fail, we
+  // still clean up all the threads.  Otherwise the unjoined threads would
+  // abort on the way out of RunIteration instead of letting us get all the way
+  // back to gtest.
+  void CheckReads(bool race_reads, int write_wrap_count,
+                  ::std::vector<ThreadState> *threads);
+
+  LocklessQueueMemory *memory_;
+  const uint64_t num_threads_;
+  const uint64_t num_messages_;
+
+  // The overall number of writes executed will always be between the two of
+  // these.  We can't atomically count writes, so we have to bound them.
+  //
+  // Number of writes about to be started.
+  ::std::atomic<uint64_t> started_writes_;
+  // Number of writes completed.
+  ::std::atomic<uint64_t> finished_writes_;
+
+  const LocklessQueueConfiguration config_;
+};
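+
+// Typical usage, mirroring lockless_queue_test.cc: construct a racer over a
+// block of queue memory and run iterations, e.g.
+//
+//   QueueRacer racer(memory, /*num_threads=*/10, /*num_messages=*/1000,
+//                    config);
+//   racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0);
+//
+// where memory points at LocklessQueueMemorySize(config) bytes of shared
+// memory.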
+
+}  // namespace ipc_lib
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_QUEUE_RACER_H_