Merge changes I94afdc39,Id92f11a3,I403535de,I468c5a35,I1f1749a6, ...

* changes:
  Clean up memory barriers and documentation in lockless_queue
  Align the shared memory data better
  Use acquire/release instead of seq_cst
  Fix compiler_memory_barrier
  Use dedicated functions for using futexes to wait for a task to die
  Make signalfd wrapper more robust
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 908cb80..57adc6d 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -164,6 +164,19 @@
     ],
 )
 
+cc_test(
+    name = "signalfd_test",
+    srcs = [
+        "signalfd_test.cc",
+    ],
+    deps = [
+        ":signalfd",
+        "//aos/testing:googletest",
+        "//aos/testing:test_logging",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
 cc_library(
     name = "index",
     srcs = ["index.cc"],
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index efb1fb1..d5c3d4e 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -851,6 +851,44 @@
   return (value & FUTEX_TID_MASK) == tid;
 }
 
+void death_notification_init(aos_mutex *m) {
+  const uint32_t tid = get_tid();
+  if (kPrintOperations) {
+    printf("%" PRId32 ": %p death_notification start\n", tid, m);
+  }
+  my_robust_list::Adder adder(m);
+  {
+    RunObservers run_observers(m, true);
+    CHECK(compare_and_swap(&m->futex, 0, tid));
+  }
+  adder.Add();
+}
+
+void death_notification_release(aos_mutex *m) {
+  RunObservers run_observers(m, true);
+
+#ifndef NDEBUG
+  // Verify it's "locked", like it should be.
+  {
+    const uint32_t tid = get_tid();
+    if (kPrintOperations) {
+      printf("%" PRId32 ": %p death_notification release\n", tid, m);
+    }
+    const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
+    assert((value & ~FUTEX_WAITERS) == tid);
+  }
+#endif
+
+  my_robust_list::Remover remover(m);
+  ANNOTATE_HAPPENS_BEFORE(m);
+  const int ret = sys_futex_unlock_pi(&m->futex);
+  if (ret != 0) {
+    my_robust_list::robust_head.pending_next = 0;
+    errno = -ret;
+    PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << &m->futex << ") failed";
+  }
+}
+
 int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
   RunObservers run_observers(c, false);
   const uint32_t tid = get_tid();
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
index 2824516..8290faa 100644
--- a/aos/ipc_lib/aos_sync.h
+++ b/aos/ipc_lib/aos_sync.h
@@ -26,7 +26,7 @@
 // No initialization is necessary.
 typedef aos_futex aos_condition;
 
-// For use with the mutex_ functions.
+// For use with the mutex_ or death_notification_ functions.
 // futex must be initialized to 0.
 // No initialization is necessary for next and previous.
 // Under ThreadSanitizer, pthread_mutex_init must be initialized to false.
@@ -88,6 +88,25 @@
 // checking mutexes as they are destroyed to catch problems with that early and
 // stack-based recursive mutex locking.
 bool mutex_islocked(const aos_mutex *m);
+
+// The death_notification_ functions are designed for one thread to wait for
+// another thread (possibly in a different process) to end. This can mean
+// exiting gracefully or dying at any point. They use a standard aos_mutex, but
+// this mutex may not be used with any of the mutex_ functions.
+
+// Initializes a variable which can be used to wait for this thread to die.
+// This can only be called once after initializing *m.
+void death_notification_init(aos_mutex *m);
+
+// Manually triggers a death notification for this thread.
+// This thread must have previously called death_notification_init(m).
+void death_notification_release(aos_mutex *m);
+
+// Returns whether or not m is held by the current thread.
+// This is mainly useful as a debug assertion.
+inline bool death_notification_is_held(aos_mutex *m) {
+  return mutex_islocked(m);
+}
 #endif
 
 // The futex_ functions are similar to the mutex_ ones but different.
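
The intended usage pattern for the death_notification_ API documented above is
roughly the following. This is a minimal sketch, not code from this change;
DoWork() and the way "slot" ends up in shared memory are assumptions for
illustration only.

    #include "aos/ipc_lib/aos_sync.h"
    #include "glog/logging.h"

    void DoWork();  // Hypothetical work function.

    // "slot" lives in shared memory and its futex starts at 0, as required
    // above.
    void Worker(aos_mutex *slot) {
      // Claim the slot. From here on, if this task dies, the kernel's robust
      // futex handling marks slot->futex with FUTEX_OWNER_DIED for others to
      // observe.
      death_notification_init(slot);

      DoWork();

      // Still ours; mainly useful as a debug assertion.
      CHECK(death_notification_is_held(slot));

      // Manually trigger the notification, e.g. on graceful shutdown.
      death_notification_release(slot);
    }

On the watching side, the cleanup code in lockless_queue.cc below detects death
by loading the futex value and checking for FUTEX_OWNER_DIED.
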
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index b1afdc8..7d979ea 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -138,8 +138,12 @@
   }
 
   // Full bidirectional barriers here.
-  QueueIndex Load(uint32_t count) { return QueueIndex(index_.load(), count); }
-  inline void Store(QueueIndex value) { index_.store(value.index_); }
+  QueueIndex Load(uint32_t count) {
+    return QueueIndex(index_.load(::std::memory_order_acquire), count);
+  }
+  inline void Store(QueueIndex value) {
+    index_.store(value.index_, ::std::memory_order_release);
+  }
 
   // Invalidates the element unconditionally.
   inline void Invalidate() { Store(QueueIndex::Invalid()); }
@@ -147,7 +151,8 @@
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
   inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
-    return index_.compare_exchange_strong(expected.index_, index.index_);
+    return index_.compare_exchange_strong(expected.index_, index.index_,
+                                          ::std::memory_order_acq_rel);
   }
 
  private:
@@ -221,16 +226,18 @@
   // Invalidates the index atomically, but without any ordering constraints.
   void RelaxedInvalidate() { RelaxedStore(Index::Invalid()); }
 
-  // Full bidirectional barriers here.
+  // Acquire/release barriers here.
   void Invalidate() { Store(Index::Invalid()); }
-  void Store(Index index) { index_.store(index.index_); }
-  Index Load() { return Index(index_.load()); }
-
+  void Store(Index index) {
+    index_.store(index.index_, ::std::memory_order_release);
+  }
+  Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
 
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
   inline bool CompareAndExchangeStrong(Index expected, Index index) {
-    return index_.compare_exchange_strong(expected.index_, index.index_);
+    return index_.compare_exchange_strong(expected.index_, index.index_,
+                                          ::std::memory_order_acq_rel);
   }
 
  private:
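
For reference, the acquire/release pairing these wrappers now rely on (instead
of the default seq_cst) looks like this in isolation. A generic sketch, not
code from this change:

    #include <atomic>
    #include <cstdint>

    std::atomic<uint32_t> index{0};
    uint32_t payload = 0;

    void Writer() {
      payload = 42;                               // plain write
      index.store(1, std::memory_order_release);  // publishes payload
    }

    void Reader() {
      if (index.load(std::memory_order_acquire) == 1) {
        // The acquire load synchronizes-with the release store, so payload is
        // guaranteed to read 42 here; no seq_cst total order is needed for
        // this producer/consumer handoff.
        (void)payload;
      }
    }
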
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index c2b0254..78f990d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -16,16 +16,31 @@
 
 namespace aos {
 namespace ipc_lib {
-
 namespace {
 
-void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
-  const int result = mutex_grab(&(memory->queue_setup_lock));
-  CHECK(result == 0 || result == 1);
-}
+class GrabQueueSetupLockOrDie {
+ public:
+  GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) : memory_(memory) {
+    const int result = mutex_grab(&(memory->queue_setup_lock));
+    CHECK(result == 0 || result == 1) << ": " << result;
+  }
 
-// This must be called under the queue_setup_lock.
-void Cleanup(LocklessQueueMemory *memory) {
+  ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }
+
+  GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
+  GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;
+
+ private:
+  LocklessQueueMemory *const memory_;
+};
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+  // Make sure we start looking at shared memory fresh right now. We'll handle
+  // people dying partway through by either cleaning up after them or not, but
+  // we want to ensure we clean up after anybody who has already died when we
+  // start.
+  aos_compiler_memory_barrier();
+
   const size_t num_senders = memory->num_senders();
   const size_t queue_size = memory->queue_size();
   const size_t num_messages = memory->num_messages();
@@ -54,14 +69,16 @@
   for (size_t i = 0; i < num_senders; ++i) {
     Sender *sender = memory->GetSender(i);
     const uint32_t tid =
-        __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+        __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
     if (tid & FUTEX_OWNER_DIED) {
       VLOG(3) << "Found an easy death for sender " << i;
+      // We can do a relaxed load here because we're the only person touching
+      // this sender at this point.
       const Index to_replace = sender->to_replace.RelaxedLoad();
       const Index scratch_index = sender->scratch_index.Load();
 
       // I find it easiest to think about this in terms of the set of observable
-      // states.  The main code follows the following states:
+      // states.  The main code progresses through the following states:
 
       // 1) scratch_index = xxx
       //    to_replace = invalid
@@ -92,14 +109,14 @@
           sender->to_replace.Invalidate();
 
           // And mark that we succeeded.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
           ++valid_senders;
         }
       } else {
         // 1) or 4).  Make sure we aren't corrupted and declare victory.
         CHECK(scratch_index.valid());
 
-        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
         ++valid_senders;
       }
     } else {
@@ -123,12 +140,16 @@
   while ((num_accounted_for + num_missing) != num_messages) {
     num_missing = 0;
     for (size_t i = 0; i < num_senders; ++i) {
-      Sender *sender = memory->GetSender(i);
+      Sender *const sender = memory->GetSender(i);
       const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
       if (tid & FUTEX_OWNER_DIED) {
         ++num_missing;
       } else {
+        // We can do a relaxed load here because we're the only person touching
+        // this sender at this point, if it matters. If it's not a dead sender,
+        // then any message it ever has will already be accounted for, so this
+        // will always be a NOP.
         const Index scratch_index = sender->scratch_index.RelaxedLoad();
         if (!accounted_for[scratch_index.message_index()]) {
           ++num_accounted_for;
@@ -138,6 +159,7 @@
     }
 
     for (size_t i = 0; i < queue_size; ++i) {
+      // Same logic as above for scratch_index applies here too.
       const Index index = memory->GetQueue(i)->RelaxedLoad();
       if (!accounted_for[index.message_index()]) {
         ++num_accounted_for;
@@ -151,8 +173,10 @@
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *sender = memory->GetSender(i);
       const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
       if (tid & FUTEX_OWNER_DIED) {
+        // We can do relaxed loads here because we're the only person touching
+        // this sender at this point.
         const Index scratch_index = sender->scratch_index.RelaxedLoad();
         const Index to_replace = sender->to_replace.RelaxedLoad();
 
@@ -170,7 +194,7 @@
           sender->to_replace.Invalidate();
 
           // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
 
           // And account for scratch_index.
           accounted_for[scratch_index.message_index()] = true;
@@ -188,7 +212,7 @@
           sender->to_replace.Invalidate();
 
           // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
 
           // And account for to_replace.
           accounted_for[to_replace.message_index()] = true;
@@ -208,6 +232,7 @@
 
 // Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
 // thread.
+// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
 int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
   return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
 }
@@ -215,25 +240,25 @@
 }  // namespace
 
 size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
-  // Round up the message size so following data is double aligned.  That should
-  // be overkill for most platforms.  And the checks below confirms it.
-  config.message_data_size = (config.message_data_size + 7) & ~0x7;
+  // Round up the message size so following data is aligned appropriately.
+  config.message_data_size =
+      LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
 
   // As we build up the size, confirm that everything is aligned to the
   // alignment requirements of the type.
   size_t size = sizeof(LocklessQueueMemory);
-  CHECK_EQ(size & (alignof(LocklessQueueMemory) - 1), 0u);
+  CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);
 
-  CHECK_EQ(size & (alignof(AtomicIndex) - 1), 0u);
+  CHECK_EQ(size % alignof(AtomicIndex), 0u);
   size += LocklessQueueMemory::SizeOfQueue(config);
 
-  CHECK_EQ(size & (alignof(Message) - 1), 0u);
+  CHECK_EQ(size % alignof(Message), 0u);
   size += LocklessQueueMemory::SizeOfMessages(config);
 
-  CHECK_EQ(size & (alignof(Watcher) - 1), 0u);
+  CHECK_EQ(size % alignof(Watcher), 0u);
   size += LocklessQueueMemory::SizeOfWatchers(config);
 
-  CHECK_EQ(size & (alignof(Sender) - 1), 0u);
+  CHECK_EQ(size % alignof(Sender), 0u);
   size += LocklessQueueMemory::SizeOfSenders(config);
 
   return size;
@@ -246,15 +271,15 @@
 
   // Grab the mutex.  We don't care if the previous reader died.  We are going
   // to check everything anyways.
-  GrabQueueSetupLockOrDie(memory);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);
 
   if (!memory->initialized) {
     // TODO(austin): Check these for out of bounds.
     memory->config.num_watchers = config.num_watchers;
     memory->config.num_senders = config.num_senders;
     memory->config.queue_size = config.queue_size;
-    // Round up to the nearest double word bytes.
-    memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+    memory->config.message_data_size =
+        LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
 
     const size_t num_messages = memory->num_messages();
     // There need to be at most MaxMessages() messages allocated.
@@ -278,18 +303,18 @@
 
     for (size_t i = 0; i < memory->num_senders(); ++i) {
       ::aos::ipc_lib::Sender *s = memory->GetSender(i);
-      s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+      // Nobody else can possibly be touching these because we haven't set
+      // initialized to true yet.
+      s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
       s->to_replace.RelaxedInvalidate();
     }
 
+    aos_compiler_memory_barrier();
     // Signal everything is done.  This needs to be done last, so if we die, we
     // redo initialization.
-    // This is a full atomic (probably overkill), but this is at initialization
-    // time, so it is cheap.
-    memory->initialized.store(true);
+    memory->initialized = true;
   }
 
-  mutex_unlock(&(memory->queue_setup_lock));
   return memory;
 }
 
@@ -303,15 +328,15 @@
 LocklessQueue::~LocklessQueue() {
   CHECK_EQ(watcher_index_, -1);
 
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
   const int num_watchers = memory_->num_watchers();
-  // Cleanup is cheap.  Go for it anyways.
+  // Cleanup is cheap. The next user will do it anyways, so no need for us to do
+  // anything right now.
 
   // And confirm that nothing is owned by us.
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!mutex_islocked(&(memory_->GetWatcher(i)->tid)));
+    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
   }
-  mutex_unlock(&(memory_->queue_setup_lock));
 }
 
 size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
@@ -323,15 +348,17 @@
   // Since everything is self consistent, all we need to do is make sure nobody
   // else is running.  Someone dying will get caught in the generic consistency
   // check.
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
   const int num_watchers = memory_->num_watchers();
 
   // Now, find the first empty watcher and grab it.
   CHECK_EQ(watcher_index_, -1);
   for (int i = 0; i < num_watchers; ++i) {
+    // If we see a slot the kernel has marked as dead, everything we do reusing
+    // it needs to happen-after whatever that process did before dying.
     const uint32_t tid =
-        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+        __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_ACQUIRE);
+    if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
       watcher_index_ = i;
       break;
     }
@@ -339,7 +366,6 @@
 
   // Bail if we failed to find an open slot.
   if (watcher_index_ == -1) {
-    mutex_unlock(&(memory_->queue_setup_lock));
     return false;
   }
 
@@ -350,36 +376,27 @@
 
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
   // get rearranged afterwords.
-  //
-  // Since everything is done under the queue_setup_lock, this should always
-  // return immediately.
-  const int result = mutex_grab(&(w->tid));
-
-  mutex_unlock(&(memory_->queue_setup_lock));
-
-  // We should either get the lock, or the previous owner should have died.
-  // Anything else is a pretty serious error.
-  return result == 0 || result == 1;
+  death_notification_init(&(w->tid));
+  return true;
 }
 
 void LocklessQueue::UnregisterWakeup() {
   // Since everything is self consistent, all we need to do is make sure nobody
   // else is running.  Someone dying will get caught in the generic consistency
   // check.
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Make sure we are registered.
   CHECK_NE(watcher_index_, -1);
 
   // Make sure we still own the slot we are supposed to.
-  CHECK(mutex_islocked(&(memory_->GetWatcher(watcher_index_)->tid)));
+  CHECK(
+      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
 
   // The act of unlocking invalidates the entry.  Invalidate it.
-  mutex_unlock(&(memory_->GetWatcher(watcher_index_)->tid));
+  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
   // And internally forget the slot.
   watcher_index_ = -1;
-
-  mutex_unlock(&(memory_->queue_setup_lock));
 }
 
 int LocklessQueue::Wakeup(const int current_priority) {
@@ -395,22 +412,26 @@
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
     Watcher *w = memory_->GetWatcher(i);
-    // Start by reading the tid.  This needs to be atomic to force it to come
-    // first.
-    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
-    watcher_copy_[i].pid = w->pid;
-    watcher_copy_[i].priority = w->priority;
+    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+    // Force the load of the TID to come first.
+    aos_compiler_memory_barrier();
+    watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
+    watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
 
     // Use a priority of -1 to mean an invalid entry to make sorting easier.
     if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
       watcher_copy_[i].priority = -1;
-    } else if (watcher_copy_[i].tid !=
-               static_cast<pid_t>(
-                   __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
-      // Confirm that the watcher hasn't been re-used and modified while we read
-      // it.  If it has, mark it invalid again.
-      watcher_copy_[i].priority = -1;
-      watcher_copy_[i].tid = 0;
+    } else {
+      // Ensure all of this happens after we're done looking at the pid+priority
+      // in shared memory.
+      aos_compiler_memory_barrier();
+      if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
+                                      &(w->tid.futex), __ATOMIC_RELAXED))) {
+        // Confirm that the watcher hasn't been re-used and modified while we
+        // read it.  If it has, mark it invalid again.
+        watcher_copy_[i].priority = -1;
+        watcher_copy_[i].tid = 0;
+      }
     }
   }
 
@@ -464,15 +485,18 @@
 }
 
 LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
-  GrabQueueSetupLockOrDie(memory_);
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
-  Cleanup(memory_);
+  Cleanup(memory_, grab_queue_setup_lock);
 
   const int num_senders = memory_->num_senders();
 
   for (int i = 0; i < num_senders; ++i) {
     ::aos::ipc_lib::Sender *s = memory->GetSender(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching senders which
+    // we're interested in.
     const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
     if (tid == 0) {
       sender_index_ = i;
@@ -486,17 +510,14 @@
 
   ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
 
-  // Atomically grab the mutex.  This signals that we are alive.  If the
-  // previous owner died, we don't care, and want to grab the mutex anyways.
-  const int result = mutex_grab(&(s->tid));
-  CHECK(result == 0 || result == 1);
-
-  mutex_unlock(&(memory->queue_setup_lock));
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(s->tid));
 }
 
 LocklessQueue::Sender::~Sender() {
   if (memory_ != nullptr) {
-    mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+    death_notification_release(&(memory_->GetSender(sender_index_)->tid));
   }
 }
 
@@ -532,9 +553,11 @@
   const size_t queue_size = memory_->queue_size();
   CHECK_LE(length, size());
 
-  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
-  Index scratch_index = sender->scratch_index.RelaxedLoad();
-  Message *message = memory_->GetMessage(scratch_index);
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+  // We can do a relaxed load on our sender because we're the only person
+  // modifying it right now.
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
 
   message->header.length = length;
 
@@ -545,7 +568,8 @@
 
     const QueueIndex incremented_queue_index = next_queue_index.Increment();
 
-    // TODO(austin): I think we can drop the barrier off this.
+    // This needs to synchronize with whoever the previous writer at this
+    // location was.
     const Index to_replace = memory_->LoadIndex(next_queue_index);
 
     const QueueIndex decremented_queue_index =
@@ -575,12 +599,8 @@
 
     // Confirm that the message is what it should be.
     {
-      // We just need this to be atomic and after the index has been calculated
-      // and before we exchange the index back in.  Both of those will be strong
-      // barriers, so this is fine.
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)
-              ->header.queue_index.RelaxedLoad(queue_size);
+          memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
@@ -599,22 +619,30 @@
     // queue_index store.
     const Index index_to_write(next_queue_index, scratch_index.message_index());
 
+    aos_compiler_memory_barrier();
+    // We're the only person who cares about our scratch index, besides somebody
+    // cleaning up after us.
     sender->scratch_index.RelaxedStore(index_to_write);
+    aos_compiler_memory_barrier();
 
     message->header.queue_index.Store(next_queue_index);
 
+    aos_compiler_memory_barrier();
     // The message is now filled out, and we have a confirmed slot to store
     // into.
     //
     // Start by writing down what we are going to pull out of the queue.  This
-    // was Invalid before now.
+    // was Invalid before now. Only person who will read this is whoever cleans
+    // up after us, so no synchronization necessary.
     sender->to_replace.RelaxedStore(to_replace);
+    aos_compiler_memory_barrier();
 
     // Then exchange the next index into the queue.
     if (!memory_->GetQueue(next_queue_index.Wrapped())
              ->CompareAndExchangeStrong(to_replace, index_to_write)) {
       // Aw, didn't succeed.  Retry.
       sender->to_replace.RelaxedInvalidate();
+      aos_compiler_memory_barrier();
       VLOG(3) << "Failed to wrap into queue";
       continue;
     }
@@ -623,12 +651,11 @@
     memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
                                                        incremented_queue_index);
 
+    aos_compiler_memory_barrier();
     // Now update the scratch space and record that we succeeded.
     sender->scratch_index.Store(to_replace);
-    // And then clear out the entry used to replace.  This just needs to be
-    // atomic.  It can't be moved above the store because that is a full
-    // barrier, but delaying it until later will only affect things if something
-    // died.
+    aos_compiler_memory_barrier();
+    // And then record that we succeeded, but definitely after the above store.
     sender->to_replace.RelaxedInvalidate();
     break;
   }
@@ -665,7 +692,7 @@
         // Someone has re-used this message between when we pulled it out of the
         // queue and when we grabbed its index.  It is pretty hard to deduce
         // what happened. Just try again.
-        Message *new_m = memory_->GetMessage(queue_index);
+        Message *const new_m = memory_->GetMessage(queue_index);
         if (m != new_m) {
           m = new_m;
           VLOG(3) << "Retrying, m doesn't match";
@@ -698,7 +725,7 @@
           VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
           return ReadResult::NOTHING_NEW;
         } else {
-          VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
+          VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
           return ReadResult::TOO_OLD;
         }
       }
@@ -719,6 +746,7 @@
   // want.  This means it didn't change out from under us.
   // If something changed out from under us, we were reading it much too late in
   // it's lifetime.
+  aos_compiler_memory_barrier();
   const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
   if (final_queue_index != queue_index) {
     VLOG(3) << "Changed out from under us.  Reading " << std::hex
@@ -778,8 +806,7 @@
   ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
   ::std::cout << "  aos_mutex queue_setup_lock = "
               << PrintMutex(&memory->queue_setup_lock) << ::std::endl;
-  ::std::cout << "  ::std::atomic<bool> initialized = " << memory->initialized
-              << ::std::endl;
+  ::std::cout << "  bool initialized = " << memory->initialized << ::std::endl;
   ::std::cout << "  config {" << ::std::endl;
   ::std::cout << "    size_t num_watchers = " << memory->config.num_watchers
               << ::std::endl;
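
The ordering argument running through the Cleanup() and Send() changes above
boils down to the following pairing. This is a simplified, hypothetical
stand-in for one sender slot, not the queue code itself:

    #include <cstdint>

    struct SenderSlot {
      uint32_t tid_futex;      // 0 means "clean"; otherwise the owner's tid.
      uint32_t scratch_index;  // Only written by the owning sender.
    };

    // Sender side: finish writing the slot's private state, then publish it
    // with a release store of the futex word.
    void SenderFinish(SenderSlot *slot, uint32_t new_scratch) {
      slot->scratch_index = new_scratch;
      __atomic_store_n(&slot->tid_futex, 0, __ATOMIC_RELEASE);
    }

    // Cleanup side: the acquire load pairs with that release store, so once we
    // see the published value, the sender's earlier writes are visible and
    // plain or relaxed loads of its state are sufficient. The change relies on
    // an analogous guarantee when the kernel sets FUTEX_OWNER_DIED for a dead
    // task.
    uint32_t CleanupPeek(SenderSlot *slot) {
      const uint32_t tid = __atomic_load_n(&slot->tid_futex, __ATOMIC_ACQUIRE);
      if (tid == 0) {
        return slot->scratch_index;
      }
      return 0;
    }
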
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index fcc5d79..4a8523b 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,7 +4,6 @@
 #include <signal.h>
 #include <sys/signalfd.h>
 #include <sys/types.h>
-#include <atomic>
 #include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
@@ -20,15 +19,16 @@
   // then this watcher is invalid.  The futex variable will then hold the tid of
   // the watcher, or FUTEX_OWNER_DIED if the task died.
   //
-  // Note: this is modified with a lock held, but is always able to be read.
+  // Note: this is only modified with the queue_setup_lock lock held, but may
+  // always be read.
   // Any state modification should happen before the lock is acquired.
   aos_mutex tid;
 
   // PID of the watcher.
-  pid_t pid;
+  std::atomic<pid_t> pid;
 
   // RT priority of the watcher.
-  int priority;
+  std::atomic<int> priority;
 };
 
 // Structure to hold the state required to send messages.
@@ -37,7 +37,8 @@
   // this sender is invalid.  The futex variable will then hold the tid of the
   // sender, or FUTEX_OWNER_DIED if the task died.
   //
-  // Note: this is modified with a lock held, but is always able to be read.
+  // Note: this is only modified with the queue_setup_lock lock held, but may
+  // always be read.
   aos_mutex tid;
 
   // Index of the message we will be filling out.
@@ -59,7 +60,7 @@
     // Note: a value of 0xffffffff always means that the contents aren't valid.
     AtomicQueueIndex queue_index;
 
-    // Timestamp of the message.  Needs to be atomically incrementing in the
+    // Timestamp of the message.  Needs to be monotonically incrementing in the
     // queue, which means that time needs to be re-sampled every time a write
     // fails.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 162403a..0c0973c 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -2,33 +2,37 @@
 #define AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
 
 #include <sys/types.h>
-#include <atomic>
+#include <cstddef>
 
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/index.h"
+#include "aos/ipc_lib/lockless_queue.h"
 #include "aos/time/time.h"
 
 namespace aos {
 namespace ipc_lib {
 
 struct LocklessQueueMemory {
+  // This is held during initialization. Cleanup after dead processes happens
+  // during this initialization process.
+  //
   // A lot of things get easier if the only lockless writes are when messages
   // are published.  Do note, the datastructures protected by this lock need to
   // be consistent at all times because a reader (or writer) may read them
   // regardless of if the lock is held or not.
-  //
-  // Any non-constant time operations need to be done at queue startup time,
-  // including cleanup.  Cleanup is done when the next queue is opened.
   aos_mutex queue_setup_lock;
-  ::std::atomic<bool> initialized;
+  // Tracks that one process has completed initialization once.
+  // Only accessed under queue_setup_lock.
+  bool initialized;
 
   LocklessQueueConfiguration config;
 
-  // Size of the watchers list.
+  // Size of the watcher list.
   size_t num_watchers() const { return config.num_watchers; }
   // Size of the sender list.
   size_t num_senders() const { return config.num_senders; }
 
+  // Number of messages logically in the queue at a time.
   // List of pointers into the messages list.
   size_t queue_size() const { return config.queue_size; }
 
@@ -55,59 +59,60 @@
   // Watcher watchers[config.num_watchers];
   // Sender senders[config.num_senders];
 
+  static constexpr size_t kDataAlignment = alignof(std::max_align_t);
+
   // Aligned pointer to where the data starts.
-  // Use a 64 bit type to require 64 bit alignment of the data inside.
-  uint64_t data[];
+  // Make sure it's of a type which can safely alias with the actual datatype,
+  // and is aligned enough for any of the structs we will access this memory as.
+  alignas(kDataAlignment) char data[];
 
   // Memory size functions for all 4 lists.
   size_t SizeOfQueue() { return SizeOfQueue(config); }
   static size_t SizeOfQueue(LocklessQueueConfiguration config) {
-    return sizeof(AtomicIndex) * config.queue_size;
+    return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
   }
 
   size_t SizeOfMessages() { return SizeOfMessages(config); }
   static size_t SizeOfMessages(LocklessQueueConfiguration config) {
-    return config.message_size() * config.num_messages();
+    return AlignmentRoundUp(config.message_size() * config.num_messages());
   }
 
   size_t SizeOfWatchers() { return SizeOfWatchers(config); }
   static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
-    return sizeof(Watcher) * config.num_watchers;
+    return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
   }
 
   size_t SizeOfSenders() { return SizeOfSenders(config); }
   static size_t SizeOfSenders(LocklessQueueConfiguration config) {
-    return sizeof(Sender) * config.num_senders;
+    return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
   }
 
   // Getters for each of the 4 lists.
   Sender *GetSender(size_t sender_index) {
-    return reinterpret_cast<Sender *>(
-        reinterpret_cast<uintptr_t>(&data[0]) + SizeOfQueue() +
-        SizeOfMessages() + SizeOfWatchers() + sender_index * sizeof(Sender));
+    return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
+                                      SizeOfMessages() + SizeOfWatchers() +
+                                      sender_index * sizeof(Sender));
   }
 
   Watcher *GetWatcher(size_t watcher_index) {
-    return reinterpret_cast<Watcher *>(reinterpret_cast<uintptr_t>(&data[0]) +
-                                       SizeOfQueue() + SizeOfMessages() +
+    return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
+                                       SizeOfMessages() +
                                        watcher_index * sizeof(Watcher));
   }
 
   AtomicIndex *GetQueue(uint32_t index) {
-    return reinterpret_cast<AtomicIndex *>(
-        reinterpret_cast<uintptr_t>(&data[0]) + sizeof(AtomicIndex) * index);
+    return reinterpret_cast<AtomicIndex *>(&data[0] +
+                                           sizeof(AtomicIndex) * index);
   }
 
   // There are num_senders + queue_size messages.  The free list is really the
   // sender list, since those are messages available to be filled in and sent.
   // This removes the need to find lost messages when a sender dies.
   Message *GetMessage(Index index) {
-    return reinterpret_cast<Message *>(reinterpret_cast<uintptr_t>(&data[0]) +
-                                       SizeOfQueue() +
+    return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }
 
-
   // Helpers to fetch messages from the queue.
   Index LoadIndex(QueueIndex index) {
     return GetQueue(index.Wrapped())->Load();
@@ -115,6 +120,10 @@
   Message *GetMessage(QueueIndex index) {
     return GetMessage(LoadIndex(index));
   }
+
+  static constexpr size_t AlignmentRoundUp(size_t in) {
+    return (in + kDataAlignment - 1) / kDataAlignment * kDataAlignment;
+  }
 };
 
 }  // namespace ipc_lib
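
AlignmentRoundUp above is ordinary round-up-to-a-multiple arithmetic. A
standalone illustration of the same formula, using 16 as an assumed value of
kDataAlignment (alignof(std::max_align_t) is 16 on common 64-bit Linux targets,
but may differ elsewhere):

    #include <cstddef>

    // (in + alignment - 1) / alignment * alignment, with alignment fixed at 16.
    constexpr size_t RoundUpTo16(size_t in) { return (in + 16 - 1) / 16 * 16; }

    static_assert(RoundUpTo16(0) == 0, "already aligned");
    static_assert(RoundUpTo16(1) == 16, "rounds up to the next multiple");
    static_assert(RoundUpTo16(16) == 16, "multiples are unchanged");
    static_assert(RoundUpTo16(17) == 32, "rounds up to the next multiple");
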
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index af95598..363bf4a 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -12,21 +12,35 @@
 
 SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
   // Build up the mask with the provided signals.
-  sigemptyset(&mask_);
+  CHECK_EQ(0, sigemptyset(&blocked_mask_));
   for (int signal : signals) {
-    sigaddset(&mask_, signal);
+    CHECK_EQ(0, sigaddset(&blocked_mask_, signal));
   }
   // Then build a signalfd.  Make it nonblocking so it works well with an epoll
   // loop, and have it close on exec.
-  PCHECK((fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != 0);
+  PCHECK((fd_ = signalfd(-1, &blocked_mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != -1);
   // Now that we have a consumer of the signal, block the signals so the
-  // signalfd gets them.
-  pthread_sigmask(SIG_BLOCK, &mask_, nullptr);
+  // signalfd gets them. Record which ones we actually blocked, so we can
+  // unblock just those later.
+  sigset_t old_mask;
+  CHECK_EQ(0, pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
+  for (int signal : signals) {
+    if (sigismember(&old_mask, signal)) {
+      CHECK_EQ(0, sigdelset(&blocked_mask_, signal));
+    }
+  }
 }
 
 SignalFd::~SignalFd() {
-  // Unwind the constructor.  Unblock the signals and close the fd.
-  pthread_sigmask(SIG_UNBLOCK, &mask_, nullptr);
+  // Unwind the constructor. Unblock the signals and close the fd. Verify nobody
+  // else unblocked the signals we're supposed to unblock in the meantime.
+  sigset_t old_mask;
+  CHECK_EQ(0, pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
+  sigset_t unblocked_mask;
+  CHECK_EQ(0, sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
+  if (memcmp(&unblocked_mask, &blocked_mask_, sizeof(unblocked_mask)) != 0) {
+    LOG(FATAL) << "Some other code unblocked one or more of our signals";
+  }
   PCHECK(close(fd_) == 0);
 }
 
@@ -39,6 +53,8 @@
   // by setting the signal number to 0.
   if (ret != static_cast<int>(sizeof(signalfd_siginfo))) {
     result.ssi_signo = 0;
+  } else {
+    CHECK_NE(0u, result.ssi_signo);
   }
   return result;
 }
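
For context, the pattern SignalFd wraps (and which the extra checking above
hardens) looks roughly like this with the raw Linux calls. A standalone sketch,
not the class itself:

    #include <signal.h>
    #include <sys/signalfd.h>
    #include <unistd.h>

    #include <cstdio>

    int main() {
      // Build a mask holding the signal(s) to route through the fd.
      sigset_t mask;
      sigemptyset(&mask);
      sigaddset(&mask, SIGUSR1);

      // Create the descriptor, then block the signals so the signalfd (rather
      // than normal delivery) receives them -- the same order the constructor
      // above uses.
      const int fd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
      if (fd == -1) return 1;
      pthread_sigmask(SIG_BLOCK, &mask, nullptr);

      // A pending signal is read out as a signalfd_siginfo struct.
      signalfd_siginfo info;
      const ssize_t n = read(fd, &info, sizeof(info));
      if (n == static_cast<ssize_t>(sizeof(info))) {
        printf("got signal %u\n", info.ssi_signo);
      }

      // Unwind: unblock the signals and close the fd, as the destructor does.
      pthread_sigmask(SIG_UNBLOCK, &mask, nullptr);
      close(fd);
      return 0;
    }
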
diff --git a/aos/ipc_lib/signalfd.h b/aos/ipc_lib/signalfd.h
index 7d2021a..b3a9063 100644
--- a/aos/ipc_lib/signalfd.h
+++ b/aos/ipc_lib/signalfd.h
@@ -29,7 +29,8 @@
  private:
   int fd_ = -1;
 
-  sigset_t mask_;
+  // The signals we blocked in the constructor.
+  sigset_t blocked_mask_;
 };
 
 }  // namespace ipc_lib
diff --git a/aos/ipc_lib/signalfd_test.cc b/aos/ipc_lib/signalfd_test.cc
new file mode 100644
index 0000000..b4fe74b
--- /dev/null
+++ b/aos/ipc_lib/signalfd_test.cc
@@ -0,0 +1,76 @@
+#include "aos/ipc_lib/signalfd.h"
+
+#include "gtest/gtest.h"
+#include "glog/logging.h"
+#include "aos/testing/test_logging.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+// Tests in this file use separate threads to isolate all manipulation of signal
+// masks between test cases.
+
+// Verify that SignalFd leaves an already-blocked signal blocked afterwards.
+TEST(SignalFdTest, LeaveSignalBlocked) {
+  ::aos::testing::EnableTestLogging();
+  std::thread thread([]() {
+    {
+      sigset_t test_mask;
+      CHECK_EQ(0, sigemptyset(&test_mask));
+      CHECK_EQ(0, sigaddset(&test_mask, SIGUSR1));
+      PCHECK(sigprocmask(SIG_BLOCK, &test_mask, nullptr) == 0);
+    }
+    SignalFd({SIGUSR1});
+    {
+      sigset_t blocked_now;
+      PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+      ASSERT_TRUE(sigismember(&blocked_now, SIGUSR1));
+    }
+  });
+  thread.join();
+}
+
+// Verify that SignalFd actually blocks the requested signals, and unblocks them
+// afterwards.
+TEST(SignalFdTest, BlockSignal) {
+  ::aos::testing::EnableTestLogging();
+  std::thread thread([]() {
+    {
+      sigset_t blocked_now;
+      PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+      ASSERT_FALSE(sigismember(&blocked_now, SIGUSR1));
+    }
+    {
+      SignalFd signalfd({SIGUSR1});
+      sigset_t blocked_now;
+      PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+      ASSERT_TRUE(sigismember(&blocked_now, SIGUSR1));
+    }
+    {
+      sigset_t blocked_now;
+      PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+      ASSERT_FALSE(sigismember(&blocked_now, SIGUSR1));
+    }
+  });
+  thread.join();
+}
+
+// Verify that SignalFd responds correctly when some other code unblocks one of
+// its signals.
+TEST(SignalFdDeathTest, ExternalUnblockSignal) {
+  ::aos::testing::EnableTestLogging();
+  EXPECT_DEATH(
+      {
+        SignalFd signalfd({SIGUSR1});
+        sigset_t test_mask;
+        CHECK_EQ(0, sigemptyset(&test_mask));
+        CHECK_EQ(0, sigaddset(&test_mask, SIGUSR1));
+        PCHECK(sigprocmask(SIG_UNBLOCK, &test_mask, nullptr) == 0);
+      },
+      "Some other code unblocked one or more of our signals");
+}
+
+}  // namespace testing
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/util/compiler_memory_barrier.h b/aos/util/compiler_memory_barrier.h
index 33da511..6be3c59 100644
--- a/aos/util/compiler_memory_barrier.h
+++ b/aos/util/compiler_memory_barrier.h
@@ -5,7 +5,7 @@
 // Using this function makes it clearer what you're doing and easier to be
 // portable.
 static inline void aos_compiler_memory_barrier(void) {
-  __asm__ __volatile__("" ::: "memory");
+  __asm__ __volatile__("" ::: "memory", "cc");
 }
 
 #endif  // AOS_UTIL_COMPILER_MEMORY_BARRIER_H_
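
aos_compiler_memory_barrier() only constrains the compiler: it may not reorder
or cache memory accesses across the call (and, with the added "cc" clobber,
must not assume the condition codes survive), but no CPU fence is emitted. That
is why the lockless_queue.cc changes above pair it with relaxed stores only
where the cross-CPU ordering already comes from a futex or atomic operation
elsewhere. A minimal sketch of that kind of use, with hypothetical variables:

    #include "aos/util/compiler_memory_barrier.h"

    #include <stdint.h>

    uint32_t scratch_value;
    uint32_t published_flag;

    void PublishForSameThreadObservers(uint32_t value) {
      scratch_value = value;
      // Keep the compiler from sinking the scratch_value write below the flag
      // write. Another CPU still needs an acquire/release (or futex) pairing
      // somewhere else to reliably observe this ordering.
      aos_compiler_memory_barrier();
      published_flag = 1;
    }
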