aos: Detect lockless queue owner death more reliably

When the OOM killer kills us, or the process otherwise dies
aggressively, the robust futex cleanup doesn't happen. This results in
senders, watchers, or pinners getting leaked until reboot.

Fix this by checking that the tid exists, and by tracking and
confirming that its start time matches the original start time.  That
should let us catch the PID collision case reliably.  We only need to do
the exhaustive check when constructing the queue, so it is OK for that
check to be expensive.

Because we're changing the format in the SHMEM files here (i.e. adding
the `start_time_ticks` field) we need to bump the queue version
number.

Mostly written by Phil Schrader.

Change-Id: I6bff78b6933fed2e0163bcee26138b6a8af857ad
Co-authored-by: Austin Schuh <austin.schuh@bluerivertech.com>
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 9a5fe19..132da04 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -408,7 +408,8 @@
   void set_timing_report(timing::Watcher *watcher);
   void ResetReport();
 
-  virtual void Startup(EventLoop *event_loop) = 0;
+  virtual void Construct() = 0;
+  virtual void Startup() = 0;
 
  protected:
   const int channel_index_;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 2fbb445..7f06ee0 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -557,10 +557,14 @@
     event_loop_->RemoveEvent(&event_);
   }
 
-  void Startup(EventLoop *event_loop) override {
+  void Construct() override {
+    event_loop_->CheckCurrentThread();
+    CHECK(RegisterWakeup(event_loop_->runtime_realtime_priority()));
+  }
+
+  void Startup() override {
     event_loop_->CheckCurrentThread();
     simple_shm_fetcher_.PointAtNextQueueIndex();
-    CHECK(RegisterWakeup(event_loop->runtime_realtime_priority()));
   }
 
   // Returns true if there is new data available.
@@ -1048,6 +1052,16 @@
     if (!CPU_EQUAL(&affinity_, &default_affinity)) {
       ::aos::SetCurrentThreadAffinity(affinity_);
     }
+
+    // Construct the watchers, but don't update the next pointer. This also
+    // cleans up any watchers that previously died, and puts the nonrt work
+    // before going realtime.  After this happens, we will start queueing
+    // signals (which may be a bit of extra work to process, but won't cause any
+    // messages to be lost).
+    for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
+      watcher->Construct();
+    }
+
     // Now, all the callbacks are setup.  Lock everything into memory and go RT.
     if (priority_ != 0) {
       ::aos::InitRT();
@@ -1059,9 +1073,10 @@
     set_is_running(true);
 
     // Now that we are realtime (but before the OnRun handlers run), snap the
-    // queue index.
+    // queue index pointer to the newest message. This happens in RT so that we
+    // minimize the risk of losing messages.
     for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
-      watcher->Startup(this);
+      watcher->Startup();
     }
 
     // Now that we are RT, run all the OnRun handlers.
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 6b40b9d..2382f07 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -26,12 +26,12 @@
     }
 
     // Clean up anything left there before.
-    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v5").c_str());
-    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v5").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v5").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v5").c_str());
+    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v6").c_str());
+    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v6").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v6").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v6").c_str());
   }
 
   ~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 7b84380..5cb67b4 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -100,7 +100,8 @@
 
   void Handle() noexcept override;
 
-  void Startup(EventLoop * /*event_loop*/) override {}
+  void Construct() override {}
+  void Startup() override {}
 
   void Schedule(std::shared_ptr<SimulatedMessage> message);
 
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index d7dee64..d51ee25 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -212,6 +212,7 @@
         "//aos/events:context",
         "//aos/time",
         "//aos/util:compiler_memory_barrier",
+        "//aos/util:top",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/strings",
         "@com_google_absl//absl/types:span",
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 0752e2f..f033e60 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -172,7 +172,7 @@
   size_t valid_senders = 0;
   for (size_t i = 0; i < num_senders; ++i) {
     Sender *sender = memory->GetSender(i);
-    if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
+    if (!sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
       // Not dead.
       ++valid_senders;
       continue;
@@ -257,7 +257,7 @@
   // read it before it's set.
   for (size_t i = 0; i < num_pinners; ++i) {
     Pinner *const pinner = memory->GetPinner(i);
-    if (!pinner->ownership_tracker.LoadAcquire().OwnerIsDead()) {
+    if (!pinner->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
       continue;
     }
     pinner->pinned.Invalidate();
@@ -280,7 +280,7 @@
     num_missing = 0;
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *const sender = memory->GetSender(i);
-      if (sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
+      if (sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
         if (!need_recovery[i]) {
           return false;
         }
@@ -325,7 +325,7 @@
     const size_t starting_num_missing = num_missing;
     for (size_t i = 0; i < num_senders; ++i) {
       Sender *sender = memory->GetSender(i);
-      if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
+      if (!sender->ownership_tracker.OwnerIsDefinitelyAbsolutelyDead()) {
         CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
         continue;
       }
@@ -733,7 +733,8 @@
     // it needs to happen-after whatever that process did before dying.
     auto *const ownership_tracker =
         &(memory_->GetWatcher(i)->ownership_tracker);
-    if (ownership_tracker->LoadAcquire().IsUnclaimedOrOwnerIsDead()) {
+    if (ownership_tracker->LoadAcquire().IsUnclaimed() ||
+        ownership_tracker->OwnerIsDefinitelyAbsolutelyDead()) {
       watcher_index_ = i;
       // Relaxed is OK here because we're the only task going to touch it
       // between here and the write in death_notification_init below (other
diff --git a/aos/ipc_lib/memory_mapped_queue.cc b/aos/ipc_lib/memory_mapped_queue.cc
index 79f27d9..594febb 100644
--- a/aos/ipc_lib/memory_mapped_queue.cc
+++ b/aos/ipc_lib/memory_mapped_queue.cc
@@ -17,7 +17,7 @@
 
 std::string ShmPath(std::string_view shm_base, const Channel *channel) {
   CHECK(channel->has_type());
-  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v5";
+  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v6";
 }
 
 void PageFaultDataWrite(char *data, size_t size) {
diff --git a/aos/ipc_lib/robust_ownership_tracker.h b/aos/ipc_lib/robust_ownership_tracker.h
index 968c2f7..6f0d222 100644
--- a/aos/ipc_lib/robust_ownership_tracker.h
+++ b/aos/ipc_lib/robust_ownership_tracker.h
@@ -2,10 +2,12 @@
 #define AOS_IPC_LIB_ROBUST_OWNERSHIP_TRACKER_H_
 
 #include <linux/futex.h>
+#include <sys/syscall.h>
 
 #include <string>
 
 #include "aos/ipc_lib/aos_sync.h"
+#include "aos/util/top.h"
 
 namespace aos::ipc_lib {
 
@@ -28,11 +30,6 @@
   // Returns true if no one has claimed ownership.
   bool IsUnclaimed() const { return futex_ == 0; }
 
-  // Returns true if either ownership hasn't been acquired or the owner died.
-  bool IsUnclaimedOrOwnerIsDead() const {
-    return IsUnclaimed() || OwnerIsDead();
-  }
-
   // Returns the thread ID (a.k.a. "tid") of the owning thread. Use this when
   // trying to access the /proc entry that corresponds to the owning thread for
   // example. Do not use the futex value directly.
@@ -55,19 +52,59 @@
 // All methods other than Load* must be accessed under a mutex.
 class RobustOwnershipTracker {
  public:
-  // Loads all the contents of the ownership tracker with Acquire memory
-  // ordering.
+  static constexpr uint64_t kNoStartTimeTicks =
+      std::numeric_limits<uint64_t>::max();
+
+  static uint64_t ReadStartTimeTicks(pid_t tid) {
+    if (tid == 0) {
+      return kNoStartTimeTicks;
+    }
+    std::optional<aos::util::ProcStat> proc_stat = util::ReadProcStat(tid);
+    if (!proc_stat.has_value()) {
+      return kNoStartTimeTicks;
+    }
+    return proc_stat->start_time_ticks;
+  }
+
+  // Loads the realtime-compatible contents of the ownership tracker with
+  // Acquire memory ordering.
   ThreadOwnerStatusSnapshot LoadAcquire() const {
     return ThreadOwnerStatusSnapshot(
         __atomic_load_n(&(mutex_.futex), __ATOMIC_ACQUIRE));
   }
 
-  // Loads all the contents of the ownership tracker with Relaxed memory order.
+  // Loads all the realtime-compatible contents of the ownership tracker with
+  // Relaxed memory order.
   ThreadOwnerStatusSnapshot LoadRelaxed() const {
     return ThreadOwnerStatusSnapshot(
         __atomic_load_n(&(mutex_.futex), __ATOMIC_RELAXED));
   }
 
+  // Checks both the robust futex and dredges through /proc to see if the thread
+  // is alive. As per the class description, this must only be called under a
+  // mutex. This must not be called in a realtime context and it is slow.
+  bool OwnerIsDefinitelyAbsolutelyDead() const {
+    auto loaded = LoadAcquire();
+    if (loaded.OwnerIsDead()) {
+      return true;
+    }
+    if (loaded.IsUnclaimed()) {
+      return false;
+    }
+    const uint64_t proc_start_time_ticks = ReadStartTimeTicks(loaded.tid());
+    if (proc_start_time_ticks == kNoStartTimeTicks) {
+      LOG(ERROR) << "Detected that PID " << loaded.tid() << " died.";
+      return true;
+    }
+
+    if (proc_start_time_ticks != start_time_ticks_) {
+      LOG(ERROR) << "Detected that PID " << loaded.tid()
+                 << " died from a starttime missmatch.";
+      return true;
+    }
+    return false;
+  }
+
   // Clears all ownership state.
   //
   // This should only really be called if you are 100% certain that the owner is
@@ -79,6 +116,7 @@
     // want the kernel to know about this element via the linked list the next
     // time someone takes ownership.
     __atomic_store_n(&(mutex_.futex), 0, __ATOMIC_RELEASE);
+    start_time_ticks_ = kNoStartTimeTicks;
   }
 
   // Returns true if this thread holds ownership.
@@ -86,7 +124,15 @@
 
   // Acquires ownership. Other threads will know that this thread holds the
   // ownership or be notified if this thread dies.
-  void Acquire() { death_notification_init(&mutex_); }
+  void Acquire() {
+    pid_t tid = syscall(SYS_gettid);
+    assert(tid > 0);
+    const uint64_t proc_start_time_ticks = ReadStartTimeTicks(tid);
+    CHECK_NE(proc_start_time_ticks, kNoStartTimeTicks);
+
+    start_time_ticks_ = proc_start_time_ticks;
+    death_notification_init(&mutex_);
+  }
 
   // Releases ownership.
   //
@@ -94,6 +140,7 @@
   void Release() {
     // Must be opposite order of Acquire.
     death_notification_release(&mutex_);
+    start_time_ticks_ = kNoStartTimeTicks;
   }
 
   // Marks the owner as dead if the specified tid is the current owner. In other
@@ -118,6 +165,9 @@
   //   appropriately.
   // - Owners can clean up after dead threads.
   aos_mutex mutex_;
+
+  // Thread's start time ticks.
+  std::atomic<uint64_t> start_time_ticks_;
 };
 
 }  // namespace aos::ipc_lib