aos: Refactor lockless_queue ownership into a class

This sets us up to make it more robust with a safer API.  Our end goal
is to more reliably detect thread deaths.

Change-Id: I00fad59a4d77eec31f0f51c85834b0892d2d2462
Co-authored-By: Austin Schuh <austin.schuh@bluerivertech.com>
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fd3a305..0752e2f 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -172,9 +172,7 @@
   size_t valid_senders = 0;
   for (size_t i = 0; i < num_senders; ++i) {
     Sender *sender = memory->GetSender(i);
-    const uint32_t tid =
-        __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
-    if (!(tid & FUTEX_OWNER_DIED)) {
+    if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
       // Not dead.
       ++valid_senders;
       continue;
@@ -227,7 +225,7 @@
       // always be a NOP if it's in 1), verified by a DCHECK.
       memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
 
-      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+      sender->ownership_tracker.ForceClear();
       ++valid_senders;
       continue;
     }
@@ -245,7 +243,7 @@
       aos_compiler_memory_barrier();
 
       // And mark that we succeeded.
-      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+      sender->ownership_tracker.ForceClear();
       ++valid_senders;
       continue;
     }
@@ -259,13 +257,11 @@
   // read it before it's set.
   for (size_t i = 0; i < num_pinners; ++i) {
     Pinner *const pinner = memory->GetPinner(i);
-    const uint32_t tid =
-        __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
-    if (!(tid & FUTEX_OWNER_DIED)) {
+    if (!pinner->ownership_tracker.LoadAcquire().OwnerIsDead()) {
       continue;
     }
     pinner->pinned.Invalidate();
-    __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+    pinner->ownership_tracker.ForceClear();
   }
 
   // If all the senders are (or were made) good, there is no need to do the hard
@@ -284,9 +280,7 @@
     num_missing = 0;
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *const sender = memory->GetSender(i);
-      const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
-      if (tid & FUTEX_OWNER_DIED) {
+      if (sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
         if (!need_recovery[i]) {
           return false;
         }
@@ -331,9 +325,7 @@
     const size_t starting_num_missing = num_missing;
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *sender = memory->GetSender(i);
-      const uint32_t tid =
-          __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
-      if (!(tid & FUTEX_OWNER_DIED)) {
+      if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
         CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
         continue;
       }
@@ -368,7 +360,7 @@
             ->header.queue_index.RelaxedInvalidate();
 
         // And then mark this sender clean.
-        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        sender->ownership_tracker.ForceClear();
         need_recovery[i] = false;
 
         // And account for scratch_index.
@@ -394,7 +386,7 @@
         sender->to_replace.Invalidate();
 
         // And then mark this sender clean.
-        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        sender->ownership_tracker.ForceClear();
         need_recovery[i] = false;
 
         // And account for to_replace.
@@ -437,6 +429,25 @@
 
 }  // namespace
 
+// Test-only helper which simulates the death of the thread owning `mutex`.
+//
+// If the TID stored in the futex word of `mutex` matches `tid`, the word is
+// overwritten with just FUTEX_OWNER_DIED (dropping the TID and any other
+// flag bits) and true is returned.  Otherwise the word is left untouched and
+// false is returned.
+bool PretendThatOwnerIsDeadForTesting(aos_mutex *mutex, pid_t tid) {
+  // The futex word may be inspected concurrently by other tasks, so access it
+  // with the atomic builtins instead of plain loads and stores.  The release
+  // store pairs with the acquire loads used when checking for dead owners.
+  const uint32_t futex_value =
+      __atomic_load_n(&(mutex->futex), __ATOMIC_ACQUIRE);
+  if (static_cast<pid_t>(futex_value & FUTEX_TID_MASK) != tid) {
+    return false;
+  }
+  __atomic_store_n(&(mutex->futex), FUTEX_OWNER_DIED, __ATOMIC_RELEASE);
+  return true;
+}
+
 size_t LocklessQueueConfiguration::message_size() const {
   // Round up the message size so following data is aligned appropriately.
   // Make sure to leave space to align the message data. It will be aligned
@@ -685,11 +685,10 @@
   CHECK_NE(watcher_index_, -1);
 
   // Make sure we still own the slot we are supposed to.
-  CHECK(
-      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+  CHECK(memory_->GetWatcher(watcher_index_)->ownership_tracker.IsHeldBySelf());
 
   // The act of unlocking invalidates the entry.  Invalidate it.
-  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+  memory_->GetWatcher(watcher_index_)->ownership_tracker.Release();
   // And internally forget the slot.
   watcher_index_ = -1;
 
@@ -699,7 +698,7 @@
   // And confirm that nothing is owned by us.
   const int num_watchers = memory_->num_watchers();
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+    CHECK(!memory_->GetWatcher(i)->ownership_tracker.IsHeldBySelf())
         << ": " << i;
   }
 }
@@ -732,14 +731,14 @@
   for (int i = 0; i < num_watchers; ++i) {
     // If we see a slot the kernel has marked as dead, everything we do reusing
     // it needs to happen-after whatever that process did before dying.
-    auto *const futex = &(memory_->GetWatcher(i)->tid.futex);
-    const uint32_t tid = __atomic_load_n(futex, __ATOMIC_ACQUIRE);
-    if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
+    auto *const ownership_tracker =
+        &(memory_->GetWatcher(i)->ownership_tracker);
+    if (ownership_tracker->LoadAcquire().IsUnclaimedOrOwnerIsDead()) {
       watcher_index_ = i;
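-      // Relaxed is OK here because we're the only task going to touch it
-      // between here and the write in death_notification_init below (other
-      // recovery is blocked by us holding the setup lock).
-      __atomic_store_n(futex, 0, __ATOMIC_RELAXED);
+      // No stronger ordering is required here because we're the only task
+      // going to touch it between here and the ownership_tracker.Acquire()
+      // call below (other recovery is blocked by us holding the setup lock).
+      ownership_tracker->ForceClear();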
       break;
     }
   }
@@ -756,7 +755,7 @@
 
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
-  // get rearranged afterwords.
-  death_notification_init(&(w->tid));
+  // get rearranged afterwards.
+  w->ownership_tracker.Acquire();
 }
 
 LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
@@ -778,25 +777,25 @@
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
     const Watcher *w = memory_->GetWatcher(i);
-    watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+    watcher_copy_[i].ownership_snapshot = w->ownership_tracker.LoadRelaxed();
     // Force the load of the TID to come first.
     aos_compiler_memory_barrier();
     watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
     watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
 
     // Use a priority of -1 to mean an invalid entry to make sorting easier.
-    if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
+    if (watcher_copy_[i].ownership_snapshot.OwnerIsDead() ||
+        watcher_copy_[i].ownership_snapshot.IsUnclaimed()) {
       watcher_copy_[i].priority = -1;
     } else {
       // Ensure all of this happens after we're done looking at the pid+priority
       // in shared memory.
       aos_compiler_memory_barrier();
-      if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
-                                      &(w->tid.futex), __ATOMIC_RELAXED))) {
+      if (watcher_copy_[i].ownership_snapshot !=
+          w->ownership_tracker.LoadRelaxed()) {
         // Confirm that the watcher hasn't been re-used and modified while we
         // read it.  If it has, mark it invalid again.
         watcher_copy_[i].priority = -1;
-        watcher_copy_[i].tid = 0;
       }
     }
   }
@@ -835,8 +834,8 @@
       // Send the signal.  Target just the thread that sent it so that we can
       // support multiple watchers in a process (when someone creates multiple
       // event loops in different threads).
-      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.tid, kWakeupSignal,
-                        &uinfo);
+      rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.ownership_snapshot.tid(),
+                        kWakeupSignal, &uinfo);
 
       ++count;
     }
@@ -872,8 +871,7 @@
     // This doesn't need synchronization because we're the only process doing
     // initialization right now, and nobody else will be touching senders which
     // we're interested in.
-    const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0) {
+    if (s->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
       sender_index_ = i;
       break;
     }
@@ -888,7 +886,7 @@
 
   // Indicate that we are now alive by taking over the slot. If the previous
   // owner died, we still want to do this.
-  death_notification_init(&(sender->tid));
+  sender->ownership_tracker.Acquire();
 
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
@@ -899,7 +897,7 @@
 LocklessQueueSender::~LocklessQueueSender() {
   if (sender_index_ != -1) {
     CHECK(memory_ != nullptr);
-    death_notification_release(&(memory_->GetSender(sender_index_)->tid));
+    memory_->GetSender(sender_index_)->ownership_tracker.Release();
   }
 }
 
@@ -1183,8 +1181,7 @@
     // This doesn't need synchronization because we're the only process doing
     // initialization right now, and nobody else will be touching pinners which
     // we're interested in.
-    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
-    if (tid == 0) {
+    if (p->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
       pinner_index_ = i;
       break;
     }
@@ -1200,7 +1197,7 @@
 
   // Indicate that we are now alive by taking over the slot. If the previous
   // owner died, we still want to do this.
-  death_notification_init(&(p->tid));
+  p->ownership_tracker.Acquire();
 }
 
 LocklessQueuePinner::~LocklessQueuePinner() {
@@ -1208,7 +1205,7 @@
     CHECK(memory_ != nullptr);
     memory_->GetPinner(pinner_index_)->pinned.Invalidate();
     aos_compiler_memory_barrier();
-    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+    memory_->GetPinner(pinner_index_)->ownership_tracker.Release();
   }
 }
 
@@ -1622,8 +1619,8 @@
   for (size_t i = 0; i < memory->num_senders(); ++i) {
     const Sender *s = memory->GetSender(i);
     ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
-    ::std::cout << "      aos_mutex tid = " << PrintMutex(&s->tid)
-                << ::std::endl;
+    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
+                << s->ownership_tracker.DebugString() << ::std::endl;
     ::std::cout << "      AtomicIndex scratch_index = "
                 << s->scratch_index.Load().DebugString() << ::std::endl;
     ::std::cout << "      AtomicIndex to_replace = "
@@ -1637,8 +1634,8 @@
   for (size_t i = 0; i < memory->num_pinners(); ++i) {
     const Pinner *p = memory->GetPinner(i);
     ::std::cout << "    [" << i << "] -> Pinner {" << ::std::endl;
-    ::std::cout << "      aos_mutex tid = " << PrintMutex(&p->tid)
-                << ::std::endl;
+    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
+                << p->ownership_tracker.DebugString() << ::std::endl;
     ::std::cout << "      AtomicIndex scratch_index = "
                 << p->scratch_index.Load().DebugString() << ::std::endl;
     ::std::cout << "      AtomicIndex pinned = "
@@ -1653,8 +1650,8 @@
   for (size_t i = 0; i < memory->num_watchers(); ++i) {
     const Watcher *w = memory->GetWatcher(i);
     ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
-    ::std::cout << "      aos_mutex tid = " << PrintMutex(&w->tid)
-                << ::std::endl;
+    ::std::cout << "      RobustOwnershipTracker ownership_tracker = "
+                << w->ownership_tracker.DebugString() << ::std::endl;
     ::std::cout << "      pid_t pid = " << w->pid << ::std::endl;
     ::std::cout << "      int priority = " << w->priority << ::std::endl;
     ::std::cout << "    }" << ::std::endl;