Add realtime replay support to SimulatedEventLoopFactory

"realtime" is heavily overloaded here, but this adds support
for making it so that you can play a SimulatedEventLoopFactory at
realtime speed (rather than just "as fast as possible"). This
can be useful in a variety of situations (e.g., debugging
tooling that will run in realtime on a robot).

Adds a demonstration of using this in an piece of AOS tooling for
plotting (this change also makes it so that that binary no longer spins
at 100% CPU indefinitely by consequence of better integrating
the EPoll object into the log replay).

Change-Id: Ia01ecd850a50c9b78dd72bfb0e8862672a716067
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 278934b..be65594 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -373,6 +373,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_logging",
+        ":epoll",
         ":event_loop",
         ":simple_channel",
         "//aos:init",
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..e7b9641 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -161,7 +161,7 @@
   RunOnRun();
 
   // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this, end_time]() {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -170,7 +170,7 @@
       if (std::get<0>(reboots_.front()) > end_time) {
         // Reboot is after our end time, give up.
         is_running_ = false;
-        break;
+        return;
       }
 
       CHECK_LE(now_,
@@ -180,14 +180,14 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
 
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time ||
         std::get<0>(oldest_event) > end_time) {
       is_running_ = false;
-      break;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -202,7 +202,7 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
+  });
 
   now_ = end_time;
 
@@ -213,8 +213,8 @@
   logging::ScopedLogRestorer prev_logger;
   RunOnStartup();
   RunOnRun();
-  // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this]() {
+    // Run all the sub-event-schedulers.
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -227,11 +227,12 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time) {
-      break;
+      is_running_ = false;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -246,13 +247,57 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
-
-  is_running_ = false;
+  });
 
   RunStopped();
 }
 
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+  internal::TimerFd timerfd;
+  CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+  distributed_clock::time_point last_distributed_clock =
+      std::get<0>(OldestEvent());
+  monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+  timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+  epoll_.OnReadable(
+      timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+                     &timerfd, loop_body]() {
+        const uint64_t read_result = timerfd.Read();
+        if (!is_running_) {
+          epoll_.Quit();
+          return;
+        }
+        CHECK_EQ(read_result, 1u);
+        // Call loop_body() at least once; if we are in infinite-speed replay,
+        // we don't actually want/need the context switches from the epoll
+        // setup, so just loop.
+        // Note: The performance impacts of this code have not been carefully
+        // inspected (e.g., how much does avoiding the context-switch help; does
+        // the timerfd_settime call matter).
+        // This is deliberately written to support the user changing replay
+        // rates dynamically.
+        do {
+          loop_body();
+          if (is_running_) {
+            const monotonic_clock::time_point next_trigger =
+                last_monotonic_clock +
+                std::chrono::duration_cast<std::chrono::nanoseconds>(
+                    (now_ - last_distributed_clock) / replay_rate_);
+            timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+            last_monotonic_clock = next_trigger;
+            last_distributed_clock = now_;
+          } else {
+            epoll_.Quit();
+          }
+        } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+                 is_running_);
+      });
+
+  epoll_.Run();
+  epoll_.DeleteFd(timerfd.fd());
+}
+
 std::tuple<distributed_clock::time_point, EventScheduler *>
 EventSchedulerScheduler::OldestEvent() {
   distributed_clock::time_point min_event_time = distributed_clock::max_time;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d91a75e..b14d0f8 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -8,6 +8,7 @@
 #include <utility>
 #include <vector>
 
+#include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/boot_timestamp.h"
 #include "aos/logging/implementations.h"
@@ -277,6 +278,13 @@
   // exactly the same.
   void RunFor(distributed_clock::duration duration);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
+  internal::EPoll *epoll() { return &epoll_; }
+
   // Returns the current distributed time.
   distributed_clock::time_point distributed_now() const { return now_; }
 
@@ -328,6 +336,12 @@
   // Returns the next event time and scheduler on which to run it.
   std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
 
+  // Handles running loop_body repeatedly until complete. loop_body should
+  // return the next time at which it wants to be called, and set is_running_ to
+  // false once we should stop.
+  template <typename F>
+  void RunMaybeRealtimeLoop(F loop_body);
+
   // True if we are running.
   bool is_running_ = false;
   // The current time.
@@ -339,6 +353,9 @@
   std::vector<std::tuple<distributed_clock::time_point,
                          std::vector<logger::BootTimestamp>>>
       reboots_;
+
+  double replay_rate_ = std::numeric_limits<double>::infinity();
+  internal::EPoll epoll_;
 };
 
 inline distributed_clock::time_point EventScheduler::distributed_now() const {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index bdee44f..1ad60c9 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -2208,5 +2208,12 @@
   }
 }
 
+void LogReader::SetRealtimeReplayRate(double replay_rate) {
+  CHECK(event_loop_factory_ != nullptr)
+      << ": Can't set replay rate without an event loop factory (have you "
+         "called Register()?).";
+  event_loop_factory_->SetRealtimeReplayRate(replay_rate);
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 6ea3193..157249c 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -231,6 +231,13 @@
     exit_on_finish_ = exit_on_finish;
   }
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  // Only applies when running against a SimulatedEventLoopFactory.
+  void SetRealtimeReplayRate(double replay_rate);
+
  private:
   void Register(EventLoop *event_loop, const Node *node);
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e0ed7a0..c218df6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -756,6 +756,10 @@
   }
 }
 
+void SimulatedEventLoopFactory::SetRealtimeReplayRate(double replay_rate) {
+  scheduler_scheduler_.SetReplayRate(replay_rate);
+}
+
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9200044..e6ba4bf 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -133,6 +133,15 @@
   // starts up without stopping execution.
   void AllowApplicationCreationDuring(std::function<void()> fn);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetRealtimeReplayRate(double replay_rate);
+
+  // Access to the internal scheduler's epoll object for realtime replay.
+  internal::EPoll *scheduler_epoll() { return scheduler_scheduler_.epoll(); }
+
  private:
   friend class NodeEventLoopFactory;