Fix race in ShmEventLoop
ShmEventLoop tries to make sure that messages are replayed in order and
that wakeups aren't dropped. Each time it wakes up, it roughly does this:
* Grab the time
* Process all the wakeups
* Look at all the queues for new messages
* Process messages (and timers) up until the time grabbed above
* Repeat until there is nothing to do
* Go back to sleep
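There is a small race. If a message is published after the time is
grabbed but before wakeups are processed, its wakeup gets consumed, but
its timestamp is later than the grabbed time, so we won't process it and
will go to sleep with it still pending. The fix is to grab the time after
the wakeups are processed, and to go around the loop again whenever the
queue check found new data that is newer than the grabbed time.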
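The race and the retry can be sketched with a toy model. This is only an
illustration, not the ShmEventLoop code: HandledBeforeSleep, the fake
clock, and the vectors standing in for the queues are all hypothetical
names invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy model of one wake cycle. Every name here is hypothetical; none of this
// is the AOS API. Returns how many messages were handled before "sleeping".
int HandledBeforeSleep(bool retry_on_new_data) {
  int64_t clock = 0;             // fake monotonic clock, ticks on each use
  std::vector<int64_t> queue;    // publish timestamps, in publish order
  std::size_t read_index = 0;    // first queue entry not yet fetched
  std::vector<int64_t> pending;  // fetched but not yet handled
  bool published = false;
  int handled = 0;

  while (true) {
    const int64_t now = ++clock;  // grab the time

    // Simulate the race: one message lands just after the time was grabbed.
    if (!published) {
      queue.push_back(++clock);
      published = true;
    }

    // Look at the queue for new messages, tracking whether any were found.
    bool new_data = false;
    while (read_index < queue.size()) {
      pending.push_back(queue[read_index++]);
      new_data = true;
    }

    // Handle everything stamped at or before the grabbed time.
    std::size_t ready = 0;
    while (ready < pending.size() && pending[ready] <= now) {
      ++ready;
    }
    handled += static_cast<int>(ready);
    pending.erase(pending.begin(),
                  pending.begin() + static_cast<std::ptrdiff_t>(ready));

    // A freshly fetched message newer than `now` is still pending: go around
    // again instead of sleeping on it.
    if (!pending.empty() && new_data && retry_on_new_data) {
      continue;
    }
    break;  // go back to sleep
  }
  return handled;
}
```

In this model, HandledBeforeSleep(false) returns 0 (the message is left
unprocessed when the loop sleeps) and HandledBeforeSleep(true) returns 1.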
Change-Id: Ic4a64b6abf376192ebe415fe3c83bd7edc5889db
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 1352249..5be5077 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -819,7 +819,8 @@
} else {
next_time = PeekEvent()->event_time();
}
- const auto now = monotonic_clock::now();
+ monotonic_clock::time_point now;
+ bool new_data = false;
if (next_time > checked_until) {
// Read all of the signals, because there's no point in waking up again
@@ -835,13 +836,19 @@
}
CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
}
+ // This is the latest time for which we can guarantee that any message
+ // published before it will be noticed by the checks below.
+ now = monotonic_clock::now();
// Check all the watchers for new events.
for (std::unique_ptr<WatcherState> &base_watcher : watchers_) {
ShmWatcherState *const watcher =
reinterpret_cast<ShmWatcherState *>(base_watcher.get());
- watcher->CheckForNewData();
+ // Track if we got a message.
+ if (watcher->CheckForNewData()) {
+ new_data = true;
+ }
}
if (EventCount() == 0) {
// Still no events, all done now.
@@ -851,9 +858,19 @@
checked_until = now;
// Check for any new events we found.
next_time = PeekEvent()->event_time();
+ } else {
+ now = monotonic_clock::now();
}
if (next_time > now) {
+ // Ok, we got a message with a timestamp *after* we wrote down time. We
+ // need to process it (otherwise we will go to sleep without processing
+ // it), but we also need to make sure no other messages have come in
+ // before it that we would process out of order. Just go around again to
+ // redo the checks.
+ if (new_data) {
+ continue;
+ }
break;
}