Clean up memory barriers and documentation in lockless_queue

Various fixups to make it safe under my understanding of the C++ memory
model, and to make it easier to verify that this is actually the case.
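
The recurring pattern is an ownership handoff through a slot's tid
futex: whoever gives a slot up does a release store, whoever reuses it
does an acquire load, and the plain data next to the futex is ordered
by that pair, so the tid operations don't need to be seq_cst. Roughly,
as a standalone sketch (made-up Slot/ReleaseSlot/TryReuseSlot names,
not the real Sender/Watcher code):

    #include <atomic>
    #include <cstdint>

    struct Slot {
      std::atomic<uint32_t> tid{0};  // stands in for the tid futex
      uint32_t scratch = 0;          // plain data guarded by owning tid
    };

    // The owner publishes its data, then gives the slot up with a
    // release store.
    void ReleaseSlot(Slot *slot, uint32_t value) {
      slot->scratch = value;
      slot->tid.store(0, std::memory_order_release);
    }

    // A later user acquire-loads tid; on success, everything the
    // previous owner wrote before releasing is visible, so the plain
    // fields are safe to read before taking the slot over.
    bool TryReuseSlot(Slot *slot, uint32_t my_tid) {
      if (slot->tid.load(std::memory_order_acquire) != 0) {
        return false;
      }
      const uint32_t leftover = slot->scratch;
      static_cast<void>(leftover);
      slot->tid.store(my_tid, std::memory_order_relaxed);
      return true;
    }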

Change-Id: I94afdc3908c9b77a9e72a33abaae5c4a354de350
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index c2b0254..78f990d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -16,16 +16,31 @@
 
 namespace aos {
 namespace ipc_lib {
-
 namespace {
 
-void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
-  const int result = mutex_grab(&(memory->queue_setup_lock));
-  CHECK(result == 0 || result == 1);
-}
+class GrabQueueSetupLockOrDie {
+ public:
+  explicit GrabQueueSetupLockOrDie(LocklessQueueMemory *memory)
+      : memory_(memory) {
+    const int result = mutex_grab(&(memory->queue_setup_lock));
+    CHECK(result == 0 || result == 1) << ": " << result;
+  }
 
-// This must be called under the queue_setup_lock.
-void Cleanup(LocklessQueueMemory *memory) {
+  ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }
+
+  GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
+  GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;
+
+ private:
+  LocklessQueueMemory *const memory_;
+};
+
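+// Must be called under the queue_setup_lock. Taking a reference to the lock
+// holder forces callers to actually have one.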
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+  // Make sure we start looking at shared memory fresh right now. We'll handle
+  // people dying partway through by either cleaning up after them or not, but
+  // we want to ensure we clean up after anybody who has already died when we
+  // start.
+  aos_compiler_memory_barrier();
+
   const size_t num_senders = memory->num_senders();
   const size_t queue_size = memory->queue_size();
   const size_t num_messages = memory->num_messages();
@@ -54,14 +69,16 @@
   for (size_t i = 0; i < num_senders; ++i) {
     Sender *sender = memory->GetSender(i);
     const uint32_t tid =
-        __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+        __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
     if (tid & FUTEX_OWNER_DIED) {
       VLOG(3) << "Found an easy death for sender " << i;
+      // We can do a relaxed load here because we're the only person touching
+      // this sender at this point.
       const Index to_replace = sender->to_replace.RelaxedLoad();
       const Index scratch_index = sender->scratch_index.Load();
 
       // I find it easiest to think about this in terms of the set of observable
-      // states.  The main code follows the following states:
+      // states.  The main code progresses through the following states:
 
       // 1) scratch_index = xxx
       //    to_replace = invalid
@@ -92,14 +109,14 @@
           sender->to_replace.Invalidate();
 
           // And mark that we succeeded.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
           ++valid_senders;
         }
       } else {
         // 1) or 4).  Make sure we aren't corrupted and declare victory.
         CHECK(scratch_index.valid());
 
-        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
         ++valid_senders;
       }
     } else {
@@ -123,12 +140,16 @@
   while ((num_accounted_for + num_missing) != num_messages) {
     num_missing = 0;
     for (size_t i = 0; i < num_senders; ++i) {
-      Sender *sender = memory->GetSender(i);
+      Sender *const sender = memory->GetSender(i);
       const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
       if (tid & FUTEX_OWNER_DIED) {
         ++num_missing;
       } else {
+        // We can do a relaxed load here because we're the only person touching
+        // this sender at this point, if it matters. If it's not a dead sender,
+        // then any message it ever has will already be accounted for, so this
+        // will always be a NOP.
         const Index scratch_index = sender->scratch_index.RelaxedLoad();
         if (!accounted_for[scratch_index.message_index()]) {
           ++num_accounted_for;
@@ -138,6 +159,7 @@
     }
 
     for (size_t i = 0; i < queue_size; ++i) {
+      // Same logic as above for scratch_index applies here too.
       const Index index = memory->GetQueue(i)->RelaxedLoad();
       if (!accounted_for[index.message_index()]) {
         ++num_accounted_for;
@@ -151,8 +173,10 @@
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *sender = memory->GetSender(i);
       const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
       if (tid & FUTEX_OWNER_DIED) {
+        // We can do relaxed loads here because we're the only person touching
+        // this sender at this point.
         const Index scratch_index = sender->scratch_index.RelaxedLoad();
         const Index to_replace = sender->to_replace.RelaxedLoad();
 
@@ -170,7 +194,7 @@
           sender->to_replace.Invalidate();
 
           // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
 
           // And account for scratch_index.
           accounted_for[scratch_index.message_index()] = true;
@@ -188,7 +212,7 @@
           sender->to_replace.Invalidate();
 
           // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
 
           // And account for to_replace.
           accounted_for[to_replace.message_index()] = true;
@@ -208,6 +232,7 @@
 
 // Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
 // thread.
+// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
 int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
   return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
 }
@@ -215,25 +240,25 @@
 }  // namespace
 
 size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
-  // Round up the message size so following data is double aligned.  That should
-  // be overkill for most platforms.  And the checks below confirms it.
-  config.message_data_size = (config.message_data_size + 7) & ~0x7;
+  // Round up the message size so following data is aligned appropriately.
+  config.message_data_size =
+      LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
 
   // As we build up the size, confirm that everything is aligned to the
   // alignment requirements of the type.
   size_t size = sizeof(LocklessQueueMemory);
-  CHECK_EQ(size & (alignof(LocklessQueueMemory) - 1), 0u);
+  CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);
 
-  CHECK_EQ(size & (alignof(AtomicIndex) - 1), 0u);
+  CHECK_EQ(size % alignof(AtomicIndex), 0u);
   size += LocklessQueueMemory::SizeOfQueue(config);
 
-  CHECK_EQ(size & (alignof(Message) - 1), 0u);
+  CHECK_EQ(size % alignof(Message), 0u);
   size += LocklessQueueMemory::SizeOfMessages(config);
 
-  CHECK_EQ(size & (alignof(Watcher) - 1), 0u);
+  CHECK_EQ(size % alignof(Watcher), 0u);
   size += LocklessQueueMemory::SizeOfWatchers(config);
 
-  CHECK_EQ(size & (alignof(Sender) - 1), 0u);
+  CHECK_EQ(size % alignof(Sender), 0u);
   size += LocklessQueueMemory::SizeOfSenders(config);
 
   return size;
@@ -246,15 +271,15 @@
 
   // Grab the mutex.  We don't care if the previous reader died.  We are going
   // to check everything anyways.
-  GrabQueueSetupLockOrDie(memory);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);
 
   if (!memory->initialized) {
     // TODO(austin): Check these for out of bounds.
     memory->config.num_watchers = config.num_watchers;
     memory->config.num_senders = config.num_senders;
     memory->config.queue_size = config.queue_size;
-    // Round up to the nearest double word bytes.
-    memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+    memory->config.message_data_size =
+        LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
 
     const size_t num_messages = memory->num_messages();
     // There need to be at most MaxMessages() messages allocated.
@@ -278,18 +303,18 @@
 
     for (size_t i = 0; i < memory->num_senders(); ++i) {
       ::aos::ipc_lib::Sender *s = memory->GetSender(i);
-      s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+      // Nobody else can possibly be touching these because we haven't set
+      // initialized to true yet.
+      s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
       s->to_replace.RelaxedInvalidate();
     }
 
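+    // Keep the compiler from sinking any of the setup stores above past the
+    // initialized flag store below.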
+    aos_compiler_memory_barrier();
     // Signal everything is done.  This needs to be done last, so if we die, we
     // redo initialization.
-    // This is a full atomic (probably overkill), but this is at initialization
-    // time, so it is cheap.
-    memory->initialized.store(true);
+    memory->initialized = true;
   }
 
-  mutex_unlock(&(memory->queue_setup_lock));
   return memory;
 }
 
@@ -303,15 +328,15 @@
 LocklessQueue::~LocklessQueue() {
   CHECK_EQ(watcher_index_, -1);
 
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
   const int num_watchers = memory_->num_watchers();
-  // Cleanup is cheap.  Go for it anyways.
+  // Cleanup is cheap. The next user will do it anyways, so no need for us to do
+  // anything right now.
 
   // And confirm that nothing is owned by us.
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!mutex_islocked(&(memory_->GetWatcher(i)->tid)));
+    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
   }
-  mutex_unlock(&(memory_->queue_setup_lock));
 }
 
 size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
@@ -323,15 +348,17 @@
   // Since everything is self consistent, all we need to do is make sure nobody
   // else is running.  Someone dying will get caught in the generic consistency
   // check.
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
   const int num_watchers = memory_->num_watchers();
 
   // Now, find the first empty watcher and grab it.
   CHECK_EQ(watcher_index_, -1);
   for (int i = 0; i < num_watchers; ++i) {
+    // If we see a slot the kernel has marked as dead, everything we do reusing
+    // it needs to happen-after whatever that process did before dying.
     const uint32_t tid =
-        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_ACQUIRE);
+    if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
       watcher_index_ = i;
       break;
     }
@@ -339,7 +366,6 @@
 
   // Bail if we failed to find an open slot.
   if (watcher_index_ == -1) {
-    mutex_unlock(&(memory_->queue_setup_lock));
     return false;
   }
 
@@ -350,36 +376,27 @@
 
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
   // get rearranged afterwords.
-  //
-  // Since everything is done under the queue_setup_lock, this should always
-  // return immediately.
-  const int result = mutex_grab(&(w->tid));
-
-  mutex_unlock(&(memory_->queue_setup_lock));
-
-  // We should either get the lock, or the previous owner should have died.
-  // Anything else is a pretty serious error.
-  return result == 0 || result == 1;
+  death_notification_init(&(w->tid));
+  return true;
 }
 
 void LocklessQueue::UnregisterWakeup() {
   // Since everything is self consistent, all we need to do is make sure nobody
   // else is running.  Someone dying will get caught in the generic consistency
   // check.
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Make sure we are registered.
   CHECK_NE(watcher_index_, -1);
 
   // Make sure we still own the slot we are supposed to.
-  CHECK(mutex_islocked(&(memory_->GetWatcher(watcher_index_)->tid)));
+  CHECK(
+      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
 
   // The act of unlocking invalidates the entry.  Invalidate it.
-  mutex_unlock(&(memory_->GetWatcher(watcher_index_)->tid));
+  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
   // And internally forget the slot.
   watcher_index_ = -1;
-
-  mutex_unlock(&(memory_->queue_setup_lock));
 }
 
 int LocklessQueue::Wakeup(const int current_priority) {
@@ -395,22 +412,26 @@
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
     Watcher *w = memory_->GetWatcher(i);
-    // Start by reading the tid.  This needs to be atomic to force it to come
-    // first.
-    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
-    watcher_copy_[i].pid = w->pid;
-    watcher_copy_[i].priority = w->priority;
+    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+    // Force the load of the TID to come first.
+    aos_compiler_memory_barrier();
+    watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
+    watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
 
     // Use a priority of -1 to mean an invalid entry to make sorting easier.
     if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
       watcher_copy_[i].priority = -1;
-    } else if (watcher_copy_[i].tid !=
-               static_cast<pid_t>(
-                   __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
-      // Confirm that the watcher hasn't been re-used and modified while we read
-      // it.  If it has, mark it invalid again.
-      watcher_copy_[i].priority = -1;
-      watcher_copy_[i].tid = 0;
+    } else {
+      // Ensure all of this happens after we're done looking at the pid+priority
+      // in shared memory.
+      aos_compiler_memory_barrier();
+      if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
+                                      &(w->tid.futex), __ATOMIC_RELAXED))) {
+        // Confirm that the watcher hasn't been re-used and modified while we
+        // read it.  If it has, mark it invalid again.
+        watcher_copy_[i].priority = -1;
+        watcher_copy_[i].tid = 0;
+      }
     }
   }
 
@@ -464,15 +485,18 @@
 }
 
 LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
-  Cleanup(memory_);
+  Cleanup(memory_, grab_queue_setup_lock);
 
   const int num_senders = memory_->num_senders();
 
   for (int i = 0; i < num_senders; ++i) {
     ::aos::ipc_lib::Sender *s = memory->GetSender(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching the senders
+    // we're interested in.
     const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
     if (tid == 0) {
       sender_index_ = i;
@@ -486,17 +510,14 @@
 
   ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
 
-  // Atomically grab the mutex.  This signals that we are alive.  If the
-  // previous owner died, we don't care, and want to grab the mutex anyways.
-  const int result = mutex_grab(&(s->tid));
-  CHECK(result == 0 || result == 1);
-
-  mutex_unlock(&(memory->queue_setup_lock));
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(s->tid));
 }
 
 LocklessQueue::Sender::~Sender() {
   if (memory_ != nullptr) {
-    mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+    death_notification_release(&(memory_->GetSender(sender_index_)->tid));
   }
 }
 
@@ -532,9 +553,11 @@
   const size_t queue_size = memory_->queue_size();
   CHECK_LE(length, size());
 
-  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
-  Index scratch_index = sender->scratch_index.RelaxedLoad();
-  Message *message = memory_->GetMessage(scratch_index);
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+  // We can do a relaxed load on our sender because we're the only person
+  // modifying it right now.
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
 
   message->header.length = length;
 
@@ -545,7 +568,8 @@
 
     const QueueIndex incremented_queue_index = next_queue_index.Increment();
 
-    // TODO(austin): I think we can drop the barrier off this.
+    // This needs to synchronize with whoever the previous writer at this
+    // location was.
     const Index to_replace = memory_->LoadIndex(next_queue_index);
 
     const QueueIndex decremented_queue_index =
@@ -575,12 +599,8 @@
 
     // Confirm that the message is what it should be.
     {
-      // We just need this to be atomic and after the index has been calculated
-      // and before we exchange the index back in.  Both of those will be strong
-      // barriers, so this is fine.
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)
-              ->header.queue_index.RelaxedLoad(queue_size);
+          memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
@@ -599,22 +619,30 @@
     // queue_index store.
     const Index index_to_write(next_queue_index, scratch_index.message_index());
 
+    aos_compiler_memory_barrier();
+    // We're the only person who cares about our scratch index, besides somebody
+    // cleaning up after us.
     sender->scratch_index.RelaxedStore(index_to_write);
+    aos_compiler_memory_barrier();
 
     message->header.queue_index.Store(next_queue_index);
 
+    aos_compiler_memory_barrier();
     // The message is now filled out, and we have a confirmed slot to store
     // into.
     //
     // Start by writing down what we are going to pull out of the queue.  This
-    // was Invalid before now.
+    // was Invalid before now. The only person who will read this is whoever
+    // cleans up after us, so no synchronization necessary.
     sender->to_replace.RelaxedStore(to_replace);
+    aos_compiler_memory_barrier();
 
     // Then exchange the next index into the queue.
     if (!memory_->GetQueue(next_queue_index.Wrapped())
              ->CompareAndExchangeStrong(to_replace, index_to_write)) {
       // Aw, didn't succeed.  Retry.
       sender->to_replace.RelaxedInvalidate();
+      aos_compiler_memory_barrier();
       VLOG(3) << "Failed to wrap into queue";
       continue;
     }
@@ -623,12 +651,11 @@
     memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
                                                        incremented_queue_index);
 
+    aos_compiler_memory_barrier();
     // Now update the scratch space and record that we succeeded.
     sender->scratch_index.Store(to_replace);
-    // And then clear out the entry used to replace.  This just needs to be
-    // atomic.  It can't be moved above the store because that is a full
-    // barrier, but delaying it until later will only affect things if something
-    // died.
+    aos_compiler_memory_barrier();
+    // And then record that we succeeded, but definitely after the above store.
     sender->to_replace.RelaxedInvalidate();
     break;
   }
@@ -665,7 +692,7 @@
         // Someone has re-used this message between when we pulled it out of the
         // queue and when we grabbed its index.  It is pretty hard to deduce
         // what happened. Just try again.
-        Message *new_m = memory_->GetMessage(queue_index);
+        Message *const new_m = memory_->GetMessage(queue_index);
         if (m != new_m) {
           m = new_m;
           VLOG(3) << "Retrying, m doesn't match";
@@ -698,7 +725,7 @@
           VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
           return ReadResult::NOTHING_NEW;
         } else {
-          VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
+          VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
           return ReadResult::TOO_OLD;
         }
       }
@@ -719,6 +746,7 @@
   // want.  This means it didn't change out from under us.
   // If something changed out from under us, we were reading it much too late in
   // it's lifetime.
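+  // Prevent the compiler from reordering the reads of the message data above
+  // past this final check of the queue index.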
+  aos_compiler_memory_barrier();
   const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
   if (final_queue_index != queue_index) {
     VLOG(3) << "Changed out from under us.  Reading " << std::hex
@@ -778,8 +806,7 @@
   ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
   ::std::cout << "  aos_mutex queue_setup_lock = "
               << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
-  ::std::cout << "  ::std::atomic<bool> initialized = " << memory->initialized
-              << ::std::endl;
+  ::std::cout << "  bool initialized = " << memory->initialized << ::std::endl;
   ::std::cout << "  config {" << ::std::endl;
   ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
               << ::std::endl;