Run watchers even if a timer is firing constantly

Also make sure to drain the signalfd each time through.

There can only be so many outstanding signals on a system, and this
bounds that much better.  It also handles events which take too long to
handle and other events are ready to be handled.

Change-Id: I24ea65356c7316818ce70f34a84174426b85d329
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 39ce8db..61a4149 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -19,7 +19,6 @@
 #include "aos/events/timing_statistics.h"
 #include "aos/init.h"
 #include "aos/ipc_lib/lockless_queue.h"
-#include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
 #include "aos/stl_mutex/stl_mutex.h"
 #include "aos/util/file.h"
@@ -654,7 +653,7 @@
         shm_event_loop_(shm_event_loop),
         event_(this) {
     shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      // The timer may fire spurriously.  HandleEvent on the event loop will
+      // The timer may fire spuriously.  HandleEvent on the event loop will
       // call the callback if it is needed.  It may also have called it when
       // processing some other event, and the kernel decided to deliver this
       // wakeup anyways.
@@ -845,22 +844,82 @@
   on_run_.push_back(::std::move(on_run));
 }
 
+// This is a bit tricky because watchers can generate new events at any time (as
+// long as it's in the past). We want to check the watchers at least once before
+// declaring there are no events to handle, and we want to check them again if
+// event processing takes long enough that we find an event after that point in
+// time to handle.
 void ShmEventLoop::HandleEvent() {
-  // Update all the times for handlers.
-  for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
-    ShmWatcherState *watcher =
-        reinterpret_cast<ShmWatcherState *>(base_watcher.get());
-
-    watcher->CheckForNewData();
+  // Time through which we've checked for new events in watchers.
+  monotonic_clock::time_point checked_until = monotonic_clock::min_time;
+  if (!signalfd_) {
+    // Nothing to check, so we can bail out immediately once we're out of
+    // events.
+    CHECK(watchers_.empty());
+    checked_until = monotonic_clock::max_time;
   }
 
+  // Loop until we run out of events to check.
   while (true) {
-    if (EventCount() == 0 ||
-        PeekEvent()->event_time() > monotonic_clock::now()) {
+    // Time of the next event we know about. If this is before checked_until, we
+    // know there aren't any new events before the next one that we already know
+    // about, so no need to check the watchers.
+    monotonic_clock::time_point next_time = monotonic_clock::max_time;
+
+    if (EventCount() == 0) {
+      if (checked_until != monotonic_clock::min_time) {
+        // No events, and we've already checked the watchers at least once, so
+        // we're all done.
+        //
+        // There's a small chance that a watcher has gotten another event in
+        // between checked_until and now. If so, then the signalfd will be
+        // triggered now and we'll re-enter HandleEvent immediately. This is
+        // unlikely though, so we don't want to spend time checking all the
+        // watchers unnecessarily.
+        break;
+      }
+    } else {
+      next_time = PeekEvent()->event_time();
+    }
+    const auto now = monotonic_clock::now();
+
+    if (next_time > checked_until) {
+      // Read all of the signals, because there's no point in waking up again
+      // immediately to handle each one if we've fallen behind.
+      //
+      // This is safe before checking for new data on the watchers. If a signal
+      // is cleared here, the corresponding CheckForNewData() call below will
+      // pick it up.
+      while (true) {
+        const signalfd_siginfo result = signalfd_->Read();
+        if (result.ssi_signo == 0) {
+          break;
+        }
+        CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
+      }
+
+      // Check all the watchers for new events.
+      for (std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+        ShmWatcherState *const watcher =
+            reinterpret_cast<ShmWatcherState *>(base_watcher.get());
+
+        watcher->CheckForNewData();
+      }
+      if (EventCount() == 0) {
+        // Still no events, all done now.
+        break;
+      }
+
+      checked_until = now;
+      // Check for any new events we found.
+      next_time = PeekEvent()->event_time();
+    }
+
+    if (next_time > now) {
       break;
     }
 
-    EventLoopEvent *event = PopEvent();
+    EventLoopEvent *const event = PopEvent();
     event->HandleEvent();
   }
 }
@@ -961,21 +1020,10 @@
 void ShmEventLoop::Run() {
   SignalHandler::global()->Register(this);
 
-  std::unique_ptr<ipc_lib::SignalFd> signalfd;
-
   if (watchers_.size() > 0) {
-    signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
+    signalfd_.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
 
-    epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
-      signalfd_siginfo result = signalfd_ptr->Read();
-      CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
-
-      // TODO(austin): We should really be checking *everything*, not just
-      // watchers, and calling the oldest thing first.  That will improve
-      // determinism a lot.
-
-      HandleEvent();
-    });
+    epoll_.OnReadable(signalfd_->fd(), [this]() { HandleEvent(); });
   }
 
   MaybeScheduleTimingReports();
@@ -1034,8 +1082,8 @@
   }
 
   if (watchers_.size() > 0) {
-    epoll_.DeleteFd(signalfd->fd());
-    signalfd.reset();
+    epoll_.DeleteFd(signalfd_->fd());
+    signalfd_.reset();
   }
 
   SignalHandler::global()->Unregister(this);
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index b76b4fe..845857c 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -8,6 +8,7 @@
 #include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/event_loop_generated.h"
+#include "aos/ipc_lib/signalfd.h"
 
 DECLARE_string(application_name);
 DECLARE_string(shm_base);
@@ -151,6 +152,9 @@
   const Node *const node_;
 
   internal::EPoll epoll_;
+
+  // Only set during Run().
+  std::unique_ptr<ipc_lib::SignalFd> signalfd_;
 };
 
 }  // namespace aos