Add realtime replay support to SimulatedEventLoopFactory

"realtime" is heavily overloaded here, but this adds support
for making it so that you can play a SimulatedEventLoopFactory at
realtime speed (rather than just "as fast as possible"). This
can be useful in a variety of situations (e.g., debugging
tooling that will run in realtime on a robot).

Adds a demonstration of using this in an piece of AOS tooling for
plotting (this change also makes it so that that binary no longer spins
at 100% CPU indefinitely by consequence of better integrating
the EPoll object into the log replay).

Change-Id: Ia01ecd850a50c9b78dd72bfb0e8862672a716067
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 278934b..be65594 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -373,6 +373,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_logging",
+        ":epoll",
         ":event_loop",
         ":simple_channel",
         "//aos:init",
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..e7b9641 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -161,7 +161,7 @@
   RunOnRun();
 
   // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this, end_time]() {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -170,7 +170,7 @@
       if (std::get<0>(reboots_.front()) > end_time) {
         // Reboot is after our end time, give up.
         is_running_ = false;
-        break;
+        return;
       }
 
       CHECK_LE(now_,
@@ -180,14 +180,14 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
 
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time ||
         std::get<0>(oldest_event) > end_time) {
       is_running_ = false;
-      break;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -202,7 +202,7 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
+  });
 
   now_ = end_time;
 
@@ -213,8 +213,8 @@
   logging::ScopedLogRestorer prev_logger;
   RunOnStartup();
   RunOnRun();
-  // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this]() {
+    // Run all the sub-event-schedulers.
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -227,11 +227,12 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time) {
-      break;
+      is_running_ = false;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -246,13 +247,57 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
-
-  is_running_ = false;
+  });
 
   RunStopped();
 }
 
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+  internal::TimerFd timerfd;
+  CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+  distributed_clock::time_point last_distributed_clock =
+      std::get<0>(OldestEvent());
+  monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+  timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+  epoll_.OnReadable(
+      timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+                     &timerfd, loop_body]() {
+        const uint64_t read_result = timerfd.Read();
+        if (!is_running_) {
+          epoll_.Quit();
+          return;
+        }
+        CHECK_EQ(read_result, 1u);
+        // Call loop_body() at least once; if we are in infinite-speed replay,
+        // we don't actually want/need the context switches from the epoll
+        // setup, so just loop.
+        // Note: The performance impacts of this code have not been carefully
+        // inspected (e.g., how much does avoiding the context-switch help; does
+        // the timerfd_settime call matter).
+        // This is deliberately written to support the user changing replay
+        // rates dynamically.
+        do {
+          loop_body();
+          if (is_running_) {
+            const monotonic_clock::time_point next_trigger =
+                last_monotonic_clock +
+                std::chrono::duration_cast<std::chrono::nanoseconds>(
+                    (now_ - last_distributed_clock) / replay_rate_);
+            timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+            last_monotonic_clock = next_trigger;
+            last_distributed_clock = now_;
+          } else {
+            epoll_.Quit();
+          }
+        } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+                 is_running_);
+      });
+
+  epoll_.Run();
+  epoll_.DeleteFd(timerfd.fd());
+}
+
 std::tuple<distributed_clock::time_point, EventScheduler *>
 EventSchedulerScheduler::OldestEvent() {
   distributed_clock::time_point min_event_time = distributed_clock::max_time;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d91a75e..b14d0f8 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -8,6 +8,7 @@
 #include <utility>
 #include <vector>
 
+#include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/boot_timestamp.h"
 #include "aos/logging/implementations.h"
@@ -277,6 +278,13 @@
   // exactly the same.
   void RunFor(distributed_clock::duration duration);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
+  internal::EPoll *epoll() { return &epoll_; }
+
   // Returns the current distributed time.
   distributed_clock::time_point distributed_now() const { return now_; }
 
@@ -328,6 +336,12 @@
   // Returns the next event time and scheduler on which to run it.
   std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
 
+  // Handles running loop_body repeatedly until complete. loop_body should
+  // return the next time at which it wants to be called, and set is_running_ to
+  // false once we should stop.
+  template <typename F>
+  void RunMaybeRealtimeLoop(F loop_body);
+
   // True if we are running.
   bool is_running_ = false;
   // The current time.
@@ -339,6 +353,9 @@
   std::vector<std::tuple<distributed_clock::time_point,
                          std::vector<logger::BootTimestamp>>>
       reboots_;
+
+  double replay_rate_ = std::numeric_limits<double>::infinity();
+  internal::EPoll epoll_;
 };
 
 inline distributed_clock::time_point EventScheduler::distributed_now() const {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index bdee44f..1ad60c9 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -2208,5 +2208,12 @@
   }
 }
 
+void LogReader::SetRealtimeReplayRate(double replay_rate) {
+  CHECK(event_loop_factory_ != nullptr)
+      << ": Can't set replay rate without an event loop factory (have you "
+         "called Register()?).";
+  event_loop_factory_->SetRealtimeReplayRate(replay_rate);
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 6ea3193..157249c 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -231,6 +231,13 @@
     exit_on_finish_ = exit_on_finish;
   }
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  // Only applies when running against a SimulatedEventLoopFactory.
+  void SetRealtimeReplayRate(double replay_rate);
+
  private:
   void Register(EventLoop *event_loop, const Node *node);
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e0ed7a0..c218df6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -756,6 +756,10 @@
   }
 }
 
+void SimulatedEventLoopFactory::SetRealtimeReplayRate(double replay_rate) {
+  scheduler_scheduler_.SetReplayRate(replay_rate);
+}
+
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9200044..e6ba4bf 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -133,6 +133,15 @@
   // starts up without stopping execution.
   void AllowApplicationCreationDuring(std::function<void()> fn);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetRealtimeReplayRate(double replay_rate);
+
+  // Access to the internal scheduler's epoll object for realtime replay.
+  internal::EPoll *scheduler_epoll() { return scheduler_scheduler_.epoll(); }
+
  private:
   friend class NodeEventLoopFactory;
 
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1cc8e17..34e40a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -17,6 +17,9 @@
 DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
 DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
 DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
+DEFINE_double(
+    replay_rate, -1,
+    "-1 to replay as fast as possible; 1.0 = realtime, 0.5 = half speed.");
 
 int main(int argc, char **argv) {
   aos::InitGoogle(&argc, &argv);
@@ -31,6 +34,12 @@
 
   reader.Register();
 
+  // If going for "as fast as possible" don't actually use infinity, because we
+  // don't want the log reading blocking our use of the epoll handlers.
+  reader.SetRealtimeReplayRate(FLAGS_replay_rate == -1.0
+                                   ? std::numeric_limits<double>::max()
+                                   : FLAGS_replay_rate);
+
   std::unique_ptr<aos::EventLoop> event_loop;
 
   if (FLAGS_node.empty()) {
@@ -55,13 +64,12 @@
   }
 
   aos::web_proxy::WebProxy web_proxy(
-      event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
+      event_loop.get(),
+      reader.event_loop_factory()->scheduler_epoll(),
+      aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
 
   web_proxy.SetDataPath(FLAGS_data_dir.c_str());
 
-  // Keep the web proxy alive past when we finish reading the logfile.
-  reader.set_exit_on_finish(false);
-
   if (FLAGS_monotonic_end_time > 0) {
     event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
         ->Setup(aos::monotonic_clock::time_point(
@@ -70,4 +78,11 @@
   }
 
   reader.event_loop_factory()->Run();
+
+  // Keep the web proxy alive past when we finish reading the logfile, but crank
+  // down the replay rate so that we don't peg our entire CPU just trying to
+  // service timers in the web proxy code.
+  reader.set_exit_on_finish(false);
+  reader.SetRealtimeReplayRate(1.0);
+  reader.event_loop_factory()->Run();
 }
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0c1d1dc..2b57c05 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -77,6 +77,8 @@
            int per_channel_buffer_size_bytes);
   WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
            int per_channel_buffer_size_bytes);
+  WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+           StoreHistory store_history, int per_channel_buffer_size_bytes);
   ~WebProxy();
 
   void SetDataPath(const char *path) { server_.setStaticPath(path); }
@@ -85,9 +87,6 @@
   void StopRecording();
 
  private:
-  WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
-           StoreHistory store_history, int per_channel_buffer_size_bytes);
-
   aos::internal::EPoll internal_epoll_;
   aos::internal::EPoll *const epoll_;
   ::seasocks::Server server_;
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 545740d..33d999e 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,8 @@
       event_loop_factory_(&config_.message()),
       event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
       plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
-      web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
+      web_proxy_(event_loop_.get(), event_loop_factory_.scheduler_epoll(),
+                 aos::web_proxy::StoreHistory::kYes, -1),
       builder_(plot_sender_.MakeBuilder()) {
   web_proxy_.SetDataPath(kDataPath);
   event_loop_->SkipTimingReport();
@@ -41,7 +42,11 @@
       ColorWheelColor{.name = "white", .color = {1, 1, 1}});
 }
 
-void Plotter::Spin() { event_loop_factory_.Run(); }
+void Plotter::Spin() {
+  // Set non-infinite replay rate to avoid pegging a full CPU.
+  event_loop_factory_.SetRealtimeReplayRate(1.0);
+  event_loop_factory_.Run();
+}
 
 void Plotter::Title(std::string_view title) {
   title_ = builder_.fbb()->CreateString(title);