Implement a lockless queue

This uses atomics to swap pointers from an array.  It is safe against
process death and concurrent writes.  It also uses signals to wake up any
processes waiting for new data.

https://docs.google.com/document/d/10xulameLtEqjBFkm54UcN-5N-w5Q_XFNILvNf1Jl1Y4/edit#

Change-Id: Ie7b2aea6f869c1d84e0705aadb21d33fa8241b60
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
new file mode 100644
index 0000000..fb9c0b6
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -0,0 +1,878 @@
+#include "aos/ipc_lib/lockless_queue.h"
+
+#include <linux/futex.h>
+#include <sys/types.h>
+#include <syscall.h>
+#include <unistd.h>
+#include <algorithm>
+#include <iomanip>
+#include <iostream>
+#include <sstream>
+
+#include "aos/init.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/logging/logging.h"
+#include "aos/util/compiler_memory_barrier.h"
+
+namespace aos {
+namespace ipc_lib {
+
+namespace {
+
+constexpr bool kDebug = false;
+
+// Grabs the queue setup lock, aborting (via CHECK) on any unexpected result.
+// A result of 1 means the previous holder died while holding it; that is
+// acceptable here because callers re-check all of the state anyways.
+void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
+  const int grab_result = mutex_grab(&(memory->queue_setup_lock));
+  CHECK(grab_result == 0 || grab_result == 1);
+}
+
+// This must be called under the queue_setup_lock.
+//
+// Recovers shared-memory consistency after any sender process died in the
+// middle of Send().  Each dead sender (FUTEX_OWNER_DIED set in its tid futex)
+// is rolled forwards or backwards until it owns exactly one message through
+// scratch_index with to_replace invalid, and then its tid is cleared so the
+// sender slot can be reused.
+void Cleanup(LocklessQueueMemory *memory) {
+  const size_t num_senders = memory->num_senders();
+  const size_t queue_size = memory->queue_size();
+  const size_t num_messages = memory->num_messages();
+
+  // There are a large number of crazy cases here for how things can go wrong
+  // and how we have to recover.  They either require us to keep extra track of
+  // what is going on, slowing down the send path, or require a large number of
+  // cases.
+  //
+  // The solution here is to not over-think it.  This is running while not real
+  // time during construction.  It is allowed to be slow.  It will also very
+  // rarely trigger.  There is a small uS window where process death is
+  // ambiguous.
+  //
+  // So, build up a list N long, where N is the number of messages.  Search
+  // through the entire queue and the sender list (ignoring any dead senders),
+  // and mark down which ones we have seen.  Once we have seen all the messages
+  // except the N dead senders, we know which messages are dead.  Because the
+  // queue is active while we do this, it may take a couple of go arounds to see
+  // everything.
+
+  // Do the easy case.  Find all senders who have died.  See if they are either
+  // consistent already, or if they have copied over to_replace to the scratch
+  // index, but haven't cleared to_replace.  Count them.
+  size_t valid_senders = 0;
+  for (size_t i = 0; i < num_senders; ++i) {
+    Sender *sender = memory->GetSender(i);
+    const uint32_t tid =
+        __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+    if (tid & FUTEX_OWNER_DIED) {
+      if (kDebug) {
+        printf("Found an easy death for sender %zu\n", i);
+      }
+      const Index to_replace = sender->to_replace.RelaxedLoad();
+      const Index scratch_index = sender->scratch_index.Load();
+
+      // I find it easiest to think about this in terms of the set of observable
+      // states.  The main code follows the following states:
+
+      // 1) scratch_index = xxx
+      //    to_replace = invalid
+      // This is unambiguous.  Already good.
+
+      // 2) scratch_index = xxx
+      //    to_replace = yyy
+      // Very ambiguous.  Is xxx or yyy the correct one?  Need to either roll
+      // this forwards or backwards.
+
+      // 3) scratch_index = yyy
+      //    to_replace = yyy
+      // We are in the act of moving to_replace to scratch_index, but didn't
+      // finish.  Easy.
+
+      // 4) scratch_index = yyy
+      //    to_replace = invalid
+      // Finished, but died.  Looks like 1)
+
+      // Any cleanup code needs to follow the same set of states to be robust to
+      // death, so death can be restarted.
+
+      // Could be 2) or 3).
+      if (to_replace.valid()) {
+        // 3)
+        if (to_replace == scratch_index) {
+          // Just need to invalidate to_replace to finish.
+          sender->to_replace.Invalidate();
+
+          // And mark that we succeeded.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          ++valid_senders;
+        }
+      } else {
+        // 1) or 4).  Make sure we aren't corrupted and declare victory.
+        CHECK(scratch_index.valid());
+
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+        ++valid_senders;
+      }
+    } else {
+      // Not dead.
+      ++valid_senders;
+    }
+  }
+
+  // If all the senders are (or were made) good, there is no need to do the hard
+  // case.
+  if (valid_senders == num_senders) {
+    return;
+  }
+
+  if (kDebug) {
+    printf("Starting hard cleanup\n");
+  }
+
+  size_t num_accounted_for = 0;
+  size_t num_missing = 0;
+  ::std::vector<bool> accounted_for(num_messages, false);
+
+  // Keep scanning until every message is either accounted for or known to be
+  // held by a dead sender.  The queue is live while we scan, so this can take
+  // multiple passes.
+  while ((num_accounted_for + num_missing) != num_messages) {
+    num_missing = 0;
+    for (size_t i = 0; i < num_senders; ++i) {
+      Sender *sender = memory->GetSender(i);
+      const uint32_t tid =
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+      if (tid & FUTEX_OWNER_DIED) {
+        ++num_missing;
+      } else {
+        const Index scratch_index = sender->scratch_index.RelaxedLoad();
+        if (!accounted_for[scratch_index.message_index()]) {
+          ++num_accounted_for;
+        }
+        accounted_for[scratch_index.message_index()] = true;
+      }
+    }
+
+    for (size_t i = 0; i < queue_size; ++i) {
+      const Index index = memory->GetQueue(i)->RelaxedLoad();
+      if (!accounted_for[index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[index.message_index()] = true;
+    }
+  }
+
+  // Now resolve the dead senders one at a time.  Every pass must resolve at
+  // least one; an ambiguous sender becomes unambiguous once the others are
+  // fixed up.
+  while (num_missing != 0) {
+    const size_t starting_num_missing = num_missing;
+    for (size_t i = 0; i < num_senders; ++i) {
+      Sender *sender = memory->GetSender(i);
+      const uint32_t tid =
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+      if (tid & FUTEX_OWNER_DIED) {
+        const Index scratch_index = sender->scratch_index.RelaxedLoad();
+        const Index to_replace = sender->to_replace.RelaxedLoad();
+
+        // Candidate.  The message index must be strictly less than the size;
+        // <= would let the accounted_for indexing below read one element past
+        // the end of the vector.
+        CHECK_LT(to_replace.message_index(), accounted_for.size());
+        if (accounted_for[to_replace.message_index()]) {
+          if (kDebug) {
+            printf("Sender %zu died, to_replace is already accounted for\n", i);
+          }
+          // If both are accounted for, we are corrupt...
+          CHECK(!accounted_for[scratch_index.message_index()]);
+
+          // to_replace is already accounted for.  This means that we didn't
+          // atomically insert scratch_index into the queue yet.  So
+          // invalidate to_replace.
+          sender->to_replace.Invalidate();
+
+          // And then mark this sender clean.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+
+          // And account for scratch_index.
+          accounted_for[scratch_index.message_index()] = true;
+          --num_missing;
+          ++num_accounted_for;
+        } else if (accounted_for[scratch_index.message_index()]) {
+          if (kDebug) {
+            printf("Sender %zu died, scratch_index is already accounted for\n",
+                   i);
+          }
+          // scratch_index is accounted for.  That means we did the insert,
+          // but didn't record it.
+          CHECK(to_replace.valid());
+          // Finish the transaction.  Copy to_replace, then clear it.
+
+          sender->scratch_index.Store(to_replace);
+          sender->to_replace.Invalidate();
+
+          // And then mark this sender clean.
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+
+          // And account for to_replace.
+          accounted_for[to_replace.message_index()] = true;
+          --num_missing;
+          ++num_accounted_for;
+        } else {
+          if (kDebug) {
+            printf("Sender %zu died, neither is accounted for\n", i);
+          }
+          // Ambiguous.  There will be an unambiguous one somewhere that we
+          // can do first.
+        }
+      }
+    }
+    // CHECK that we are making progress.
+    CHECK_NE(num_missing, starting_num_missing);
+  }
+}
+
+// Thin wrapper for the rt_tgsigqueueinfo syscall, which glibc does not
+// expose.  It queues a signal (with payload) to one specific thread of a
+// specific thread group, rather than to the whole process.
+int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
+  const long kernel_result =
+      syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
+  return static_cast<int>(kernel_result);
+}
+
+}  // namespace
+
+// Computes the number of bytes of shared memory required for a queue with
+// the given configuration.  The regions are laid out in order: header, queue
+// of AtomicIndex, messages, watchers, senders.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
+  // Round the message size up to a multiple of 8 bytes so whatever follows
+  // the variable-length message data stays double-word aligned.
+  config.message_data_size = (config.message_data_size + 7) & ~0x7;
+
+  // Accumulate the region sizes in layout order, verifying at each step that
+  // the running offset satisfies the alignment of the region being appended.
+  size_t size = sizeof(LocklessQueueMemory);
+  CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);
+
+  CHECK_EQ(size % alignof(AtomicIndex), 0u);
+  size += LocklessQueueMemory::SizeOfQueue(config);
+
+  CHECK_EQ(size % alignof(Message), 0u);
+  size += LocklessQueueMemory::SizeOfMessages(config);
+
+  CHECK_EQ(size % alignof(Watcher), 0u);
+  size += LocklessQueueMemory::SizeOfWatchers(config);
+
+  CHECK_EQ(size % alignof(Sender), 0u);
+  size += LocklessQueueMemory::SizeOfSenders(config);
+
+  return size;
+}
+
+// Initializes the shared memory region if it has not already been
+// initialized (guarded by queue_setup_lock and the initialized flag), and
+// returns the memory pointer.  Multiple processes may race to call this; the
+// lock serializes them and only the first does the work.
+LocklessQueueMemory *InitializeLocklessQueueMemory(
+    LocklessQueueMemory *memory, LocklessQueueConfiguration config) {
+  // Everything should be zero initialized already.  So we just need to fill
+  // everything out properly.
+
+  // Grab the mutex.  We don't care if the previous reader died.  We are going
+  // to check everything anyways.
+  GrabQueueSetupLockOrDie(memory);
+
+  if (!memory->initialized) {
+    // TODO(austin): Check these for out of bounds.
+    memory->config.num_watchers = config.num_watchers;
+    memory->config.num_senders = config.num_senders;
+    memory->config.queue_size = config.queue_size;
+    // Round up to the nearest double word bytes.
+    memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+
+    const size_t num_messages = memory->num_messages();
+    // There need to be at most MaxMessages() messages allocated.
+    CHECK_LE(num_messages, Index::MaxMessages());
+
+    // Mark every message as not being in the queue yet.
+    for (size_t i = 0; i < num_messages; ++i) {
+      memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i))
+          ->header.queue_index.Invalidate();
+    }
+
+    for (size_t i = 0; i < memory->queue_size(); ++i) {
+      // Make the initial counter be the furthest away number.  That means that
+      // index 0 should be 0xffff, 1 should be 0, etc.
+      memory->GetQueue(i)->Store(Index(QueueIndex::Zero(memory->queue_size())
+                                           .IncrementBy(i)
+                                           .DecrementBy(memory->queue_size()),
+                                       i));
+    }
+
+    memory->next_queue_index.Invalidate();
+
+    // Each sender's scratch message lives after the queue_size messages used
+    // to populate the queue itself.
+    for (size_t i = 0; i < memory->num_senders(); ++i) {
+      ::aos::ipc_lib::Sender *s = memory->GetSender(i);
+      s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+      s->to_replace.RelaxedInvalidate();
+    }
+
+    // Signal everything is done.  This needs to be done last, so if we die, we
+    // redo initialization.
+    // This is a full atomic (probably overkill), but this is at initialization
+    // time, so it is cheap.
+    memory->initialized.store(true);
+  }
+
+  mutex_unlock(&(memory->queue_setup_lock));
+  return memory;
+}
+
+// Attaches to (initializing if necessary) the shared memory region, and
+// caches the pid/uid used to fill out siginfo when sending wakeup signals.
+LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
+                             LocklessQueueConfiguration config)
+    : memory_(InitializeLocklessQueueMemory(memory, config)),
+      watcher_copy_(memory_->num_watchers()),
+      pid_(getpid()),
+      uid_(getuid()) {}
+
+LocklessQueue::~LocklessQueue() {
+  // Every RegisterWakeup() must have been matched by an UnregisterWakeup().
+  CHECK_EQ(watcher_index_, -1);
+
+  GrabQueueSetupLockOrDie(memory_);
+
+  // And confirm under the lock that nothing is still owned by us.
+  const int num_watchers = memory_->num_watchers();
+  for (int i = 0; i < num_watchers; ++i) {
+    Watcher *const watcher = memory_->GetWatcher(i);
+    CHECK(!mutex_islocked(&(watcher->tid)));
+  }
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+}
+
+// Number of slots in the circular queue.
+size_t LocklessQueue::QueueSize() const {
+  return memory_->queue_size();
+}
+
+// Registers this thread to be sent kWakeupSignal (at the given scheduling
+// priority) when new data arrives.  Returns false if every watcher slot is
+// already taken.
+bool LocklessQueue::RegisterWakeup(int priority) {
+  // TODO(austin): Make sure signal coalescing is turned on.  We don't need
+  // duplicates.  That will improve performance under high load.
+
+  // Since everything is self consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
+  GrabQueueSetupLockOrDie(memory_);
+  const int num_watchers = memory_->num_watchers();
+
+  // Now, find the first empty watcher and grab it.
+  CHECK_EQ(watcher_index_, -1);
+  for (int i = 0; i < num_watchers; ++i) {
+    const uint32_t tid =
+        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
+    // A slot is free if nobody owns it (0) or if its owner died.
+    if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+      watcher_index_ = i;
+      break;
+    }
+  }
+
+  // Bail if we failed to find an open slot.
+  if (watcher_index_ == -1) {
+    mutex_unlock(&(memory_->queue_setup_lock));
+    return false;
+  }
+
+  Watcher *w = memory_->GetWatcher(watcher_index_);
+
+  w->pid = getpid();
+  w->priority = priority;
+
+  // Grabbing a mutex is a compiler and memory barrier, so nothing before will
+  // get rearranged afterwords.
+  //
+  // Since everything is done under the queue_setup_lock, this should always
+  // return immediately.
+  const int result = mutex_grab(&(w->tid));
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+
+  // We should either get the lock, or the previous owner should have died.
+  // Anything else is a pretty serious error.
+  return result == 0 || result == 1;
+}
+
+// Releases the watcher slot claimed by RegisterWakeup().
+void LocklessQueue::UnregisterWakeup() {
+  // Serialize with everyone else touching the watcher list.  Someone dying
+  // will get caught in the generic consistency check.
+  GrabQueueSetupLockOrDie(memory_);
+
+  // We must have registered previously...
+  CHECK_NE(watcher_index_, -1);
+
+  // ...and must still own the slot we think we own.
+  Watcher *const watcher = memory_->GetWatcher(watcher_index_);
+  CHECK(mutex_islocked(&(watcher->tid)));
+
+  // Unlocking the tid mutex is what marks the slot as free again.
+  mutex_unlock(&(watcher->tid));
+  // And forget the slot locally as well.
+  watcher_index_ = -1;
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+}
+
+// Sends kWakeupSignal to every registered watcher, temporarily boosting our
+// own RT priority to the highest watcher priority (when we are RT) to avoid
+// priority inversion while signaling.  Returns the number of signals sent.
+int LocklessQueue::Wakeup(const int current_priority) {
+  const size_t num_watchers = memory_->num_watchers();
+
+  CHECK_EQ(watcher_copy_.size(), num_watchers);
+
+  // Grab a copy so it won't change out from underneath us, and we can sort it
+  // nicely in C++.
+  // Do note that there is still a window where the process can die *after* we
+  // read everything.  We will still PI boost and send a signal to the thread in
+  // question.  There is no way without pidfd's to close this window, and
+  // creating a pidfd is likely not RT.
+  for (size_t i = 0; i < num_watchers; ++i) {
+    Watcher *w = memory_->GetWatcher(i);
+    // Start by reading the tid.  This needs to be atomic to force it to come
+    // first.
+    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
+    watcher_copy_[i].pid = w->pid;
+    watcher_copy_[i].priority = w->priority;
+
+    // Use a priority of -1 to mean an invalid entry to make sorting easier.
+    if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
+      watcher_copy_[i].priority = -1;
+    } else if (watcher_copy_[i].tid !=
+               static_cast<pid_t>(
+                   __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
+      // Confirm that the watcher hasn't been re-used and modified while we read
+      // it.  If it has, mark it invalid again.
+      watcher_copy_[i].priority = -1;
+      watcher_copy_[i].tid = 0;
+    }
+  }
+
+  // Now sort.
+  ::std::sort(watcher_copy_.begin(), watcher_copy_.end(),
+              [](const WatcherCopy &a, const WatcherCopy &b) {
+                return a.priority > b.priority;
+              });
+
+  int count = 0;
+  // Guard against a configuration with 0 watchers; watcher_copy_[0] on an
+  // empty vector would be out of bounds.
+  if (!watcher_copy_.empty() && watcher_copy_[0].priority != -1) {
+    const int max_priority =
+        ::std::max(current_priority, watcher_copy_[0].priority);
+    // Boost if we are RT and there is a higher priority sender out there.
+    // Otherwise we might run into priority inversions.
+    if (max_priority > current_priority && current_priority > 0) {
+      SetCurrentThreadRealtimePriority(max_priority);
+    }
+
+    // Build up the siginfo to send.
+    siginfo_t uinfo;
+    memset(&uinfo, 0, sizeof(uinfo));
+
+    uinfo.si_code = SI_QUEUE;
+    uinfo.si_pid = pid_;
+    uinfo.si_uid = uid_;
+    uinfo.si_value.sival_int = 0;
+
+    for (const WatcherCopy &watcher_copy : watcher_copy_) {
+      // The first -1 priority means we are at the end of the valid list.
+      if (watcher_copy.priority == -1) {
+        break;
+      }
+
+      // Send the signal.  Target just the thread that sent it so that we can
+      // support multiple watchers in a process (when someone creates multiple
+      // event loops in different threads).
+      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.tid, kWakeupSignal,
+                        &uinfo);
+
+      ++count;
+    }
+
+    // Drop back down if we were boosted.
+    if (max_priority > current_priority && current_priority > 0) {
+      SetCurrentThreadRealtimePriority(current_priority);
+    }
+  }
+
+  return count;
+}
+
+// Claims a free sender slot under the queue_setup_lock, LOG(FATAL)ing if
+// none is available.
+LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+  GrabQueueSetupLockOrDie(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_);
+
+  // Scan for the first sender slot nobody owns (tid == 0).  Cleanup() above
+  // has already zeroed the tid of any sender that died.
+  const int num_senders = memory_->num_senders();
+  for (int i = 0; i < num_senders; ++i) {
+    ::aos::ipc_lib::Sender *const candidate = memory_->GetSender(i);
+    const uint32_t tid =
+        __atomic_load_n(&(candidate->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      sender_index_ = i;
+      break;
+    }
+  }
+
+  if (sender_index_ == -1) {
+    LOG(FATAL, "Too many senders\n");
+  }
+
+  // Atomically grab the mutex.  This signals that we are alive.  If the
+  // previous owner died, we don't care, and want to grab the mutex anyways.
+  ::aos::ipc_lib::Sender *const s = memory_->GetSender(sender_index_);
+  const int grab_result = mutex_grab(&(s->tid));
+  CHECK(grab_result == 0 || grab_result == 1);
+
+  mutex_unlock(&(memory_->queue_setup_lock));
+}
+
+LocklessQueue::Sender::~Sender() {
+  // memory_ may be nullptr (presumably after ownership was transferred
+  // elsewhere); there is nothing to release in that case.
+  if (memory_ == nullptr) {
+    return;
+  }
+  // Release our sender slot so another process can claim it.
+  mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+}
+
+// Constructs a Sender bound to this queue's shared memory region.
+LocklessQueue::Sender LocklessQueue::MakeSender() {
+  return Sender(memory_);
+}
+
+// Returns the index unchanged when valid; otherwise returns its cleared
+// (zeroed) form so callers can start from the beginning of the queue.
+QueueIndex ZeroOrValid(QueueIndex index) {
+  return index.valid() ? index : index.Clear();
+}
+
+// Publishes one message.  Outline:
+//   1) Fill out the message in our private scratch slot (invisible to
+//      readers until it is swapped into the queue).
+//   2) Claim the next queue index.
+//   3) Compare-and-exchange our scratch slot into the queue, taking the
+//      replaced slot as our new scratch slot.
+// The stores to scratch_index/to_replace around the exchange are ordered so
+// that a death at any point leaves a state Cleanup() can recognize and
+// repair.
+void LocklessQueue::Sender::Send(const char *data, size_t length) {
+  const size_t queue_size = memory_->queue_size();
+  CHECK_LE(length, memory_->message_data_size());
+
+  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
+  Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *message = memory_->GetMessage(scratch_index);
+
+  // Mark the scratch message as not-in-queue while we fill it out.
+  message->header.queue_index.Invalidate();
+
+  message->header.length = length;
+  memcpy(&message->data[0], data, length);
+
+  // Retry loop: each iteration attempts to claim a slot and exchange our
+  // scratch message into the queue.
+  while (true) {
+    const QueueIndex actual_next_queue_index =
+        memory_->next_queue_index.Load(queue_size);
+    const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);
+
+    const QueueIndex incremented_queue_index = next_queue_index.Increment();
+
+    // TODO(austin): I think we can drop the barrier off this.
+    const Index to_replace = memory_->LoadIndex(next_queue_index);
+
+    const QueueIndex decremented_queue_index =
+        next_queue_index.DecrementBy(queue_size);
+
+    // See if we got beat.  If we did, try to atomically update
+    // next_queue_index in case the previous writer failed and retry.
+    if (!to_replace.IsPlausible(decremented_queue_index)) {
+      // We don't care about the result.  It will either succeed, or we got
+      // beat in fixing it and just need to give up and try again.  If we got
+      // beat multiple times, the only way progress can be made is if the queue
+      // is updated as well.  This means that if we retry reading
+      // next_queue_index, we will be at most off by one and can retry.
+      //
+      // Both require no further action from us.
+      //
+      // TODO(austin): If we are having fairness issues under contention, we
+      // could have a mode bit in next_queue_index, and could use a lock or some
+      // other form of PI boosting to let the higher priority task win.
+      memory_->next_queue_index.CompareAndExchangeStrong(
+          actual_next_queue_index, incremented_queue_index);
+
+      if (kDebug) {
+        printf("We were beat.  Try again.  Was %x, is %x\n", to_replace.get(),
+               decremented_queue_index.index());
+      }
+      continue;
+    }
+
+    // Confirm that the message is what it should be.
+    {
+      // We just need this to be atomic and after the index has been calculated
+      // and before we exchange the index back in.  Both of those will be strong
+      // barriers, so this is fine.
+      const QueueIndex previous_index =
+          memory_->GetMessage(to_replace)
+              ->header.queue_index.RelaxedLoad(queue_size);
+      if (previous_index != decremented_queue_index && previous_index.valid()) {
+        // Retry.
+        if (kDebug) {
+          printf(
+              "Something fishy happened, queue index doesn't match.  Retrying. "
+              " Previous index was %x, should be %x\n",
+              previous_index.index(), decremented_queue_index.index());
+        }
+        continue;
+      }
+    }
+
+    message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
+    message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
+    // Before we are fully done filling out the message, update the Sender state
+    // with the new index to write.  This re-uses the barrier for the
+    // queue_index store.
+    const Index index_to_write(next_queue_index,
+                               scratch_index.message_index());
+
+    sender->scratch_index.RelaxedStore(index_to_write);
+
+    message->header.queue_index.Store(next_queue_index);
+
+    // The message is now filled out, and we have a confirmed slot to store
+    // into.
+    //
+    // Start by writing down what we are going to pull out of the queue.  This
+    // was Invalid before now.
+    sender->to_replace.RelaxedStore(to_replace);
+
+    // Then exchange the next index into the queue.
+    if (!memory_->GetQueue(next_queue_index.Wrapped())
+             ->CompareAndExchangeStrong(to_replace, index_to_write)) {
+      // Aw, didn't succeed.  Retry.
+      sender->to_replace.RelaxedInvalidate();
+      if (kDebug) {
+        printf("Failed to wrap into queue\n");
+      }
+      continue;
+    }
+
+    // Then update next_queue_index to save the next user some computation time.
+    memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
+                                                       incremented_queue_index);
+
+    // Now update the scratch space and record that we succeeded.
+    sender->scratch_index.Store(to_replace);
+    // And then clear out the entry used to replace.  This just needs to be
+    // atomic.  It can't be moved above the store because that is a full
+    // barrier, but delaying it until later will only affect things if something
+    // died.
+    sender->to_replace.RelaxedInvalidate();
+    break;
+  }
+}
+
+// Copies the message stored at uint32_queue_index out of the queue.  The
+// queue_index in the message header is checked before and after the copy to
+// detect the message being overwritten mid-read (TOO_OLD), being not yet
+// written (NOTHING_NEW), or reading cleanly (GOOD).
+LocklessQueue::ReadResult LocklessQueue::Read(
+    uint32_t uint32_queue_index,
+    ::aos::monotonic_clock::time_point *monotonic_sent_time,
+    ::aos::realtime_clock::time_point *realtime_sent_time, size_t *length,
+    char *data) {
+  const size_t queue_size = memory_->queue_size();
+
+  // Build up the QueueIndex.
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+
+  // Read the message stored at the requested location.
+  Index mi = memory_->LoadIndex(queue_index);
+  Message *m = memory_->GetMessage(mi);
+
+  while (true) {
+    // We need to confirm that the data doesn't change while we are reading it.
+    // Do that by first confirming that the message points to the queue index we
+    // want.
+    const QueueIndex starting_queue_index =
+        m->header.queue_index.Load(queue_size);
+    if (starting_queue_index != queue_index) {
+      // If we found a message that is exactly 1 loop old, we just wrapped.
+      if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
+        if (kDebug) {
+          printf("Matches: %x, %x\n", starting_queue_index.index(),
+                 queue_index.DecrementBy(queue_size).index());
+        }
+        return ReadResult::NOTHING_NEW;
+      } else {
+        // Someone has re-used this message between when we pulled it out of the
+        // queue and when we grabbed its index.  It is pretty hard to deduce
+        // what happened. Just try again.
+        Message *new_m = memory_->GetMessage(queue_index);
+        if (m != new_m) {
+          m = new_m;
+          if (kDebug) {
+            printf("Retrying, m doesn't match\n");
+          }
+          continue;
+        }
+
+        // We have confirmed that message still points to the same message. This
+        // means that the message didn't get swapped out from under us, so
+        // starting_queue_index is correct.
+        //
+        // Either we got too far behind (signaled by this being a valid
+        // message), or this is one of the initial messages which are invalid.
+        if (starting_queue_index.valid()) {
+          if (kDebug) {
+            printf("Too old.  Tried for %x, got %x, behind by %d\n",
+                   queue_index.index(), starting_queue_index.index(),
+                   starting_queue_index.index() - queue_index.index());
+          }
+          return ReadResult::TOO_OLD;
+        }
+
+        if (kDebug) {
+          printf("Initial\n");
+        }
+
+        // There isn't a valid message at this location.
+        //
+        // If someone asks for one of the messages within the first go around,
+        // then they need to wait.  They got ahead.  Otherwise, they are
+        // asking for something crazy, like something before the beginning of
+        // the queue.  Tell them that they are behind.
+        if (uint32_queue_index < memory_->queue_size()) {
+          if (kDebug) {
+            printf("Near zero, %x\n", uint32_queue_index);
+          }
+          return ReadResult::NOTHING_NEW;
+        } else {
+          if (kDebug) {
+            printf("not near zero, %x\n", uint32_queue_index);
+          }
+          return ReadResult::TOO_OLD;
+        }
+      }
+    }
+    if (kDebug) {
+      printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
+    }
+    break;
+  }
+
+  // Then read the data out.
+  *monotonic_sent_time = m->header.monotonic_sent_time;
+  *realtime_sent_time = m->header.realtime_sent_time;
+  memcpy(data, &m->data[0], m->header.length);
+  *length = m->header.length;
+
+  // And finally, confirm that the message *still* points to the queue index we
+  // want.  This means it didn't change out from under us.
+  // If something changed out from under us, we were reading it much too late in
+  // it's lifetime.
+  const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+  if (final_queue_index != queue_index) {
+    if (kDebug) {
+      printf(
+          "Changed out from under us.  Reading %x, finished with %x, delta: "
+          "%d\n",
+          queue_index.index(), final_queue_index.index(),
+          final_queue_index.index() - queue_index.index());
+    }
+    return ReadResult::TOO_OLD;
+  }
+
+  return ReadResult::GOOD;
+}
+
+// Returns the index of the newest message, or empty_queue_index() when
+// nothing has been sent yet (next_queue_index is still the invalid
+// sentinel).  At worst this is off by one, since a concurrent send may race
+// with the load.
+uint32_t LocklessQueue::LatestQueueIndex() {
+  const size_t queue_size = memory_->queue_size();
+
+  const QueueIndex next_queue_index =
+      memory_->next_queue_index.Load(queue_size);
+  if (!next_queue_index.valid()) {
+    return empty_queue_index();
+  }
+  // The newest message lives one slot behind the next index to be written.
+  return next_queue_index.DecrementBy(1u).index();
+}
+
+namespace {
+
+// Renders the state of an aos_mutex as a human-readable string.  Not safe to
+// use while the mutex is being changed.
+::std::string PrintMutex(aos_mutex *mutex) {
+  const auto futex_value = mutex->futex;
+  ::std::stringstream s;
+  s << "aos_mutex(" << ::std::hex << futex_value;
+
+  if (futex_value != 0) {
+    s << ":";
+    if (futex_value & FUTEX_OWNER_DIED) {
+      s << "FUTEX_OWNER_DIED|";
+    }
+    // Still in hex mode from the insertion above.
+    s << "tid=" << (futex_value & FUTEX_TID_MASK);
+  }
+
+  s << ")";
+  return s.str();
+}
+
+}  // namespace
+
+// Dumps the entire shared memory region to stdout for debugging.  Like
+// PrintMutex, this is not safe to call while the queue is being actively
+// modified.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory) {
+  const size_t queue_size = memory->queue_size();
+  ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
+  ::std::cout << "  aos_mutex queue_setup_lock = "
+              << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
+  ::std::cout << "  ::std::atomic<bool> initialized = " << memory->initialized
+              << ::std::endl;
+  ::std::cout << "  config {" << ::std::endl;
+  ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
+              << ::std::endl;
+  ::std::cout << "    size_t num_senders = " << memory->config.num_senders
+              << ::std::endl;
+  ::std::cout << "    size_t queue_size = " << memory->config.queue_size
+              << ::std::endl;
+  ::std::cout << "    size_t message_data_size = "
+              << memory->config.message_data_size << ::std::endl;
+
+  ::std::cout << "    AtomicQueueIndex next_queue_index = "
+              << memory->next_queue_index.Load(queue_size).DebugString()
+              << ::std::endl;
+
+  ::std::cout << "  }" << ::std::endl;
+  ::std::cout << "  AtomicIndex queue[" << queue_size << "] {" << ::std::endl;
+  for (size_t i = 0; i < queue_size; ++i) {
+    ::std::cout << "    [" << i << "] -> "
+                << memory->GetQueue(i)->Load().DebugString() << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+  ::std::cout << "  Message messages[" << memory->num_messages() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_messages(); ++i) {
+    // NOTE(review): Index(i, i) supplies i for both the queue-index and
+    // message-index parts; presumably GetMessage() only uses the message
+    // index here -- confirm against lockless_queue_memory.h.
+    Message *m = memory->GetMessage(Index(i, i));
+    ::std::cout << "    [" << i << "] -> Message {" << ::std::endl;
+    ::std::cout << "      Header {" << ::std::endl;
+    ::std::cout << "        AtomicQueueIndex queue_index = "
+                << m->header.queue_index.Load(queue_size).DebugString()
+                << ::std::endl;
+    ::std::cout << "        size_t length = " << m->header.length
+                << ::std::endl;
+    ::std::cout << "      }" << ::std::endl;
+    ::std::cout << "      data: {";
+
+    for (size_t j = 0; j < m->header.length; ++j) {
+      char data = m->data[j];
+      if (j != 0) {
+        ::std::cout << " ";
+      }
+      // NOTE(review): printable bytes are streamed as char, so the setw/hex
+      // manipulators have no effect on them -- confirm this output is
+      // intended.  Also, isprint() on a possibly-negative char value should
+      // probably go through an unsigned char cast.
+      if (::std::isprint(data)) {
+        ::std::cout << ::std::setfill(' ') << ::std::setw(2) << ::std::hex
+                    << data;
+      } else {
+        ::std::cout << "0x" << ::std::setfill('0') << ::std::setw(2)
+                    << ::std::hex << (static_cast<unsigned>(data) & 0xff);
+      }
+    }
+    ::std::cout << ::std::setfill(' ') << ::std::dec << "}" << ::std::endl;
+    ::std::cout << "    }," << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+  for (size_t i = 0; i < memory->num_senders(); ++i) {
+    Sender *s = memory->GetSender(i);
+    ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&s->tid)
+                << ::std::endl;
+    ::std::cout << "      AtomicIndex scratch_index = "
+                << s->scratch_index.Load().DebugString() << ::std::endl;
+    ::std::cout << "      AtomicIndex to_replace = "
+                << s->to_replace.Load().DebugString() << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_watchers(); ++i) {
+    Watcher *w = memory->GetWatcher(i);
+    ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&w->tid)
+                << ::std::endl;
+    ::std::cout << "      pid_t pid = " << w->pid << ::std::endl;
+    ::std::cout << "      int priority = " << w->priority << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
+  ::std::cout << "}" << ::std::endl;
+}
+
+}  // namespace ipc_lib
+}  // namespace aos