aos: Detect lockless queue owner death more reliably
When the OOM killer kills us, or the process otherwise dies
aggressively, the robust futex cleanup doesn't happen. This results in
senders, watchers, or pinners getting leaked until reboot.
Fix this by checking that the tid exists and by tracking and
confirming that its start time matches the original start time. That
should let us catch the PID collision case reliably. We only need to do
the exhaustive check when constructing the queue, so it is OK for it to
be expensive.
Because we're changing the format in the SHMEM files here (i.e. adding
the `start_time_ticks` field) we need to bump the queue version
number.
Mostly written by Phil Schrader.
Change-Id: I6bff78b6933fed2e0163bcee26138b6a8af857ad
Co-authored-by: Austin Schuh <austin.schuh@bluerivertech.com>
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 9a5fe19..132da04 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -408,7 +408,8 @@
void set_timing_report(timing::Watcher *watcher);
void ResetReport();
- virtual void Startup(EventLoop *event_loop) = 0;
+ virtual void Construct() = 0;
+ virtual void Startup() = 0;
protected:
const int channel_index_;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 2fbb445..7f06ee0 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -557,10 +557,14 @@
event_loop_->RemoveEvent(&event_);
}
- void Startup(EventLoop *event_loop) override {
+ void Construct() override {
+ event_loop_->CheckCurrentThread();
+ CHECK(RegisterWakeup(event_loop_->runtime_realtime_priority()));
+ }
+
+ void Startup() override {
event_loop_->CheckCurrentThread();
simple_shm_fetcher_.PointAtNextQueueIndex();
- CHECK(RegisterWakeup(event_loop->runtime_realtime_priority()));
}
// Returns true if there is new data available.
@@ -1048,6 +1052,16 @@
if (!CPU_EQUAL(&affinity_, &default_affinity)) {
::aos::SetCurrentThreadAffinity(affinity_);
}
+
+ // Construct the watchers, but don't update the next pointer. This also
+ // cleans up any watchers that previously died, and puts the nonrt work
+ // before going realtime. After this happens, we will start queueing
+ // signals (which may be a bit of extra work to process, but won't cause any
+ // messages to be lost).
+ for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
+ watcher->Construct();
+ }
+
// Now, all the callbacks are setup. Lock everything into memory and go RT.
if (priority_ != 0) {
::aos::InitRT();
@@ -1059,9 +1073,10 @@
set_is_running(true);
// Now that we are realtime (but before the OnRun handlers run), snap the
- // queue index.
+ // queue index pointer to the newest message. This happens in RT so that we
+ // minimize the risk of losing messages.
for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
- watcher->Startup(this);
+ watcher->Startup();
}
// Now that we are RT, run all the OnRun handlers.
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 6b40b9d..2382f07 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -26,12 +26,12 @@
}
// Clean up anything left there before.
- unlink((FLAGS_shm_base + "/test/aos.TestMessage.v5").c_str());
- unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v5").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v5").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v5").c_str());
+ unlink((FLAGS_shm_base + "/test/aos.TestMessage.v6").c_str());
+ unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v6").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v6").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v6").c_str());
}
~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 7b84380..5cb67b4 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -100,7 +100,8 @@
void Handle() noexcept override;
- void Startup(EventLoop * /*event_loop*/) override {}
+ void Construct() override {}
+ void Startup() override {}
void Schedule(std::shared_ptr<SimulatedMessage> message);