Add realtime replay support to SimulatedEventLoopFactory
"realtime" is heavily overloaded here, but this adds support
for making it so that you can play a SimulatedEventLoopFactory at
realtime speed (rather than just "as fast as possible"). This
can be useful in a variety of situations (e.g., debugging
tooling that will run in realtime on a robot).
Adds a demonstration of using this in a piece of AOS tooling for
plotting (this change also means that the binary no longer spins
at 100% CPU indefinitely, as a consequence of better integrating
the EPoll object into the log replay).
Change-Id: Ia01ecd850a50c9b78dd72bfb0e8862672a716067
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 278934b..be65594 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -373,6 +373,7 @@
visibility = ["//visibility:public"],
deps = [
":aos_logging",
+ ":epoll",
":event_loop",
":simple_channel",
"//aos:init",
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..e7b9641 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -161,7 +161,7 @@
RunOnRun();
// Run all the sub-event-schedulers.
- while (is_running_) {
+ RunMaybeRealtimeLoop([this, end_time]() {
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
if (!reboots_.empty() &&
@@ -170,7 +170,7 @@
if (std::get<0>(reboots_.front()) > end_time) {
// Reboot is after our end time, give up.
is_running_ = false;
- break;
+ return;
}
CHECK_LE(now_,
@@ -180,14 +180,14 @@
now_ = std::get<0>(reboots_.front());
Reboot();
reboots_.erase(reboots_.begin());
- continue;
+ return;
}
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time ||
std::get<0>(oldest_event) > end_time) {
is_running_ = false;
- break;
+ return;
}
// We get to pick our tradeoffs here. Either we assume that there are no
@@ -202,7 +202,7 @@
now_ = std::get<0>(oldest_event);
std::get<1>(oldest_event)->CallOldestEvent();
- }
+ });
now_ = end_time;
@@ -213,8 +213,8 @@
logging::ScopedLogRestorer prev_logger;
RunOnStartup();
RunOnRun();
- // Run all the sub-event-schedulers.
- while (is_running_) {
+ RunMaybeRealtimeLoop([this]() {
+ // Run all the sub-event-schedulers.
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
if (!reboots_.empty() &&
@@ -227,11 +227,12 @@
now_ = std::get<0>(reboots_.front());
Reboot();
reboots_.erase(reboots_.begin());
- continue;
+ return;
}
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time) {
- break;
+ is_running_ = false;
+ return;
}
// We get to pick our tradeoffs here. Either we assume that there are no
@@ -246,13 +247,57 @@
now_ = std::get<0>(oldest_event);
std::get<1>(oldest_event)->CallOldestEvent();
- }
-
- is_running_ = false;
+ });
RunStopped();
}
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+ internal::TimerFd timerfd;
+ CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+ distributed_clock::time_point last_distributed_clock =
+ std::get<0>(OldestEvent());
+ monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+ timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+ epoll_.OnReadable(
+ timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+ &timerfd, loop_body]() {
+ const uint64_t read_result = timerfd.Read();
+ if (!is_running_) {
+ epoll_.Quit();
+ return;
+ }
+ CHECK_EQ(read_result, 1u);
+ // Call loop_body() at least once; if we are in infinite-speed replay,
+ // we don't actually want/need the context switches from the epoll
+ // setup, so just loop.
+ // Note: The performance impacts of this code have not been carefully
+ // inspected (e.g., how much does avoiding the context-switch help; does
+ // the timerfd_settime call matter).
+ // This is deliberately written to support the user changing replay
+ // rates dynamically.
+ do {
+ loop_body();
+ if (is_running_) {
+ const monotonic_clock::time_point next_trigger =
+ last_monotonic_clock +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ (now_ - last_distributed_clock) / replay_rate_);
+ timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+ last_monotonic_clock = next_trigger;
+ last_distributed_clock = now_;
+ } else {
+ epoll_.Quit();
+ }
+ } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+ is_running_);
+ });
+
+ epoll_.Run();
+ epoll_.DeleteFd(timerfd.fd());
+}
+
std::tuple<distributed_clock::time_point, EventScheduler *>
EventSchedulerScheduler::OldestEvent() {
distributed_clock::time_point min_event_time = distributed_clock::max_time;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d91a75e..b14d0f8 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -8,6 +8,7 @@
#include <utility>
#include <vector>
+#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
#include "aos/logging/implementations.h"
@@ -277,6 +278,13 @@
// exactly the same.
void RunFor(distributed_clock::duration duration);
+ // Sets the realtime replay rate, which must be positive. 1.0 makes the
+ // scheduler try to play events in realtime; 0.5 runs at half speed. Use
+ // infinity (the default) to run as fast as possible. This can be changed
+ // during run-time.
+ void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
+ internal::EPoll *epoll() { return &epoll_; }
+
// Returns the current distributed time.
distributed_clock::time_point distributed_now() const { return now_; }
@@ -328,6 +336,12 @@
// Returns the next event time and scheduler on which to run it.
std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
+ // Handles running loop_body repeatedly until complete. loop_body should
+ // advance now_ to the time of the event it handled (it returns nothing), and
+ // set is_running_ to false once we should stop.
+ template <typename F>
+ void RunMaybeRealtimeLoop(F loop_body);
+
// True if we are running.
bool is_running_ = false;
// The current time.
@@ -339,6 +353,9 @@
std::vector<std::tuple<distributed_clock::time_point,
std::vector<logger::BootTimestamp>>>
reboots_;
+
+ double replay_rate_ = std::numeric_limits<double>::infinity();
+ internal::EPoll epoll_;
};
inline distributed_clock::time_point EventScheduler::distributed_now() const {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index bdee44f..1ad60c9 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -2208,5 +2208,12 @@
}
}
+void LogReader::SetRealtimeReplayRate(double replay_rate) {
+ CHECK(event_loop_factory_ != nullptr)
+ << ": Can't set replay rate without an event loop factory (have you "
+ "called Register()?).";
+ event_loop_factory_->SetRealtimeReplayRate(replay_rate);
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 6ea3193..157249c 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -231,6 +231,13 @@
exit_on_finish_ = exit_on_finish;
}
+ // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+ // try to play events in realtime. 0.5 will run at half speed. Use infinity
+ // (the default) to run as fast as possible. This can be changed during
+ // run-time.
+ // Only applies when running against a SimulatedEventLoopFactory.
+ void SetRealtimeReplayRate(double replay_rate);
+
private:
void Register(EventLoop *event_loop, const Node *node);
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e0ed7a0..c218df6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -756,6 +756,10 @@
}
}
+void SimulatedEventLoopFactory::SetRealtimeReplayRate(double replay_rate) {
+ scheduler_scheduler_.SetReplayRate(replay_rate);
+}
+
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9200044..e6ba4bf 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -133,6 +133,15 @@
// starts up without stopping execution.
void AllowApplicationCreationDuring(std::function<void()> fn);
+ // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+ // try to play events in realtime. 0.5 will run at half speed. Use infinity
+ // (the default) to run as fast as possible. This can be changed during
+ // run-time.
+ void SetRealtimeReplayRate(double replay_rate);
+
+ // Access to the internal scheduler's epoll object for realtime replay.
+ internal::EPoll *scheduler_epoll() { return scheduler_scheduler_.epoll(); }
+
private:
friend class NodeEventLoopFactory;
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1cc8e17..34e40a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -17,6 +17,9 @@
DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
+DEFINE_double(
+ replay_rate, -1,
+ "-1 to replay as fast as possible; 1.0 = realtime, 0.5 = half speed.");
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -31,6 +34,12 @@
reader.Register();
+ // If going for "as fast as possible" don't actually use infinity, because we
+ // don't want the log reading blocking our use of the epoll handlers.
+ reader.SetRealtimeReplayRate(FLAGS_replay_rate == -1.0
+ ? std::numeric_limits<double>::max()
+ : FLAGS_replay_rate);
+
std::unique_ptr<aos::EventLoop> event_loop;
if (FLAGS_node.empty()) {
@@ -55,13 +64,12 @@
}
aos::web_proxy::WebProxy web_proxy(
- event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
+ event_loop.get(),
+ reader.event_loop_factory()->scheduler_epoll(),
+ aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
web_proxy.SetDataPath(FLAGS_data_dir.c_str());
- // Keep the web proxy alive past when we finish reading the logfile.
- reader.set_exit_on_finish(false);
-
if (FLAGS_monotonic_end_time > 0) {
event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
->Setup(aos::monotonic_clock::time_point(
@@ -70,4 +78,11 @@
}
reader.event_loop_factory()->Run();
+
+ // Keep the web proxy alive past when we finish reading the logfile, but crank
+ // down the replay rate so that we don't peg our entire CPU just trying to
+ // service timers in the web proxy code.
+ reader.set_exit_on_finish(false);
+ reader.SetRealtimeReplayRate(1.0);
+ reader.event_loop_factory()->Run();
}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0c1d1dc..2b57c05 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -77,6 +77,8 @@
int per_channel_buffer_size_bytes);
WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
int per_channel_buffer_size_bytes);
+ WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+ StoreHistory store_history, int per_channel_buffer_size_bytes);
~WebProxy();
void SetDataPath(const char *path) { server_.setStaticPath(path); }
@@ -85,9 +87,6 @@
void StopRecording();
private:
- WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
- StoreHistory store_history, int per_channel_buffer_size_bytes);
-
aos::internal::EPoll internal_epoll_;
aos::internal::EPoll *const epoll_;
::seasocks::Server server_;
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 545740d..33d999e 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,8 @@
event_loop_factory_(&config_.message()),
event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
- web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
+ web_proxy_(event_loop_.get(), event_loop_factory_.scheduler_epoll(),
+ aos::web_proxy::StoreHistory::kYes, -1),
builder_(plot_sender_.MakeBuilder()) {
web_proxy_.SetDataPath(kDataPath);
event_loop_->SkipTimingReport();
@@ -41,7 +42,11 @@
ColorWheelColor{.name = "white", .color = {1, 1, 1}});
}
-void Plotter::Spin() { event_loop_factory_.Run(); }
+void Plotter::Spin() {
+ // Set non-infinite replay rate to avoid pegging a full CPU.
+ event_loop_factory_.SetRealtimeReplayRate(1.0);
+ event_loop_factory_.Run();
+}
void Plotter::Title(std::string_view title) {
title_ = builder_.fbb()->CreateString(title);