Add realtime replay support to SimulatedEventLoopFactory

"realtime" is heavily overloaded here, but this adds support
for making it so that you can play a SimulatedEventLoopFactory at
realtime speed (rather than just "as fast as possible"). This
can be useful in a variety of situations (e.g., debugging
tooling that will run in realtime on a robot).

Adds a demonstration of using this in an piece of AOS tooling for
plotting (this change also makes it so that that binary no longer spins
at 100% CPU indefinitely by consequence of better integrating
the EPoll object into the log replay).

Change-Id: Ia01ecd850a50c9b78dd72bfb0e8862672a716067
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..e7b9641 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -161,7 +161,7 @@
   RunOnRun();
 
   // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this, end_time]() {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -170,7 +170,7 @@
       if (std::get<0>(reboots_.front()) > end_time) {
         // Reboot is after our end time, give up.
         is_running_ = false;
-        break;
+        return;
       }
 
       CHECK_LE(now_,
@@ -180,14 +180,14 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
 
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time ||
         std::get<0>(oldest_event) > end_time) {
       is_running_ = false;
-      break;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -202,7 +202,7 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
+  });
 
   now_ = end_time;
 
@@ -213,8 +213,8 @@
   logging::ScopedLogRestorer prev_logger;
   RunOnStartup();
   RunOnRun();
-  // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this]() {
+    // Run all the sub-event-schedulers.
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -227,11 +227,12 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time) {
-      break;
+      is_running_ = false;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -246,13 +247,57 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
-
-  is_running_ = false;
+  });
 
   RunStopped();
 }
 
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+  internal::TimerFd timerfd;
+  CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+  distributed_clock::time_point last_distributed_clock =
+      std::get<0>(OldestEvent());
+  monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+  timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+  epoll_.OnReadable(
+      timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+                     &timerfd, loop_body]() {
+        const uint64_t read_result = timerfd.Read();
+        if (!is_running_) {
+          epoll_.Quit();
+          return;
+        }
+        CHECK_EQ(read_result, 1u);
+        // Call loop_body() at least once; if we are in infinite-speed replay,
+        // we don't actually want/need the context switches from the epoll
+        // setup, so just loop.
+        // Note: The performance impacts of this code have not been carefully
+        // inspected (e.g., how much does avoiding the context-switch help; does
+        // the timerfd_settime call matter).
+        // This is deliberately written to support the user changing replay
+        // rates dynamically.
+        do {
+          loop_body();
+          if (is_running_) {
+            const monotonic_clock::time_point next_trigger =
+                last_monotonic_clock +
+                std::chrono::duration_cast<std::chrono::nanoseconds>(
+                    (now_ - last_distributed_clock) / replay_rate_);
+            timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+            last_monotonic_clock = next_trigger;
+            last_distributed_clock = now_;
+          } else {
+            epoll_.Quit();
+          }
+        } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+                 is_running_);
+      });
+
+  epoll_.Run();
+  epoll_.DeleteFd(timerfd.fd());
+}
+
 std::tuple<distributed_clock::time_point, EventScheduler *>
 EventSchedulerScheduler::OldestEvent() {
   distributed_clock::time_point min_event_time = distributed_clock::max_time;