Make LogReader::OnStart able to start applications at times
There are 2 main features missing from OnStart to make it truly useful
at scale.
1) We need to be able to start applications...
2) Only replaying a time range (typically on the RT clock) is very
helpful.
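Add support for 1 by running the OnStart/OnEnd callbacks inside
AllowApplicationCreationDuring when an event loop factory is available.
2 is trickier. I considered putting the logic outside LogReader, but
in the end, the bookkeeping for starting and stopping is bad enough that
it really helps to have LogReader manage and synchronize it, so it is
exposed through SetStartTime/SetEndTime and the --start_time/--end_time
flags. Maybe we can refactor it back out later.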
Change-Id: I4ddf6e18819d3aadd02f38776bbc7aef843a96b0
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index e58ccd2..4e89a0e 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -39,6 +39,13 @@
time_estimation_buffer_seconds, 2.0,
"The time to buffer ahead in the log file to accurately reconstruct time.");
+DEFINE_string(
+ start_time, "",
+ "If set, start at this point in time in the log on the realtime clock.");
+DEFINE_string(
+ end_time, "",
+ "If set, end at this point in time in the log on the realtime clock.");
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -131,6 +138,79 @@
using message_bridge::RemoteMessage;
} // namespace
+// Class to manage triggering events on the realtime clock while replaying
+// logs. Since the realtime clock can only change when we get a message, we
+// only need to update our timers when new messages are read.
+class EventNotifier {
+ public:
+ EventNotifier(EventLoop *event_loop, std::function<void()> fn,
+ std::string_view name,
+ realtime_clock::time_point realtime_event_time)
+ : event_loop_(event_loop),
+ fn_(std::move(fn)),
+ realtime_event_time_(realtime_event_time) {
+ CHECK(event_loop_);
+ event_timer_ = event_loop->AddTimer([this]() { HandleTime(); });
+
+ if (event_loop_->node() != nullptr) {
+ event_timer_->set_name(
+ absl::StrCat(event_loop_->node()->name()->string_view(), "_", name));
+ } else {
+ event_timer_->set_name(name);
+ }
+ }
+
+ ~EventNotifier() { event_timer_->Disable(); }
+
+ // Returns the realtime clock time at which the callback should trigger.
+ realtime_clock::time_point realtime_event_time() const {
+ return realtime_event_time_;
+ }
+
+ // Observes the next message; once it is at or after the event time, either
+ // calls the callback immediately or schedules the timer to call it.
+ void ObserveNextMessage(monotonic_clock::time_point monotonic_message_time,
+ realtime_clock::time_point realtime_message_time) {
+ if (realtime_message_time < realtime_event_time_) {
+ return;
+ }
+ if (called_) {
+ return;
+ }
+
+ // Map the realtime event time onto the monotonic clock using the offset
+ // between the two clocks at this message, now that we know the event is
+ // not after the next message.
+ const monotonic_clock::time_point candidate_monotonic =
+ (realtime_event_time_ - realtime_message_time) + monotonic_message_time;
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+ if (candidate_monotonic < monotonic_now) {
+ // Whoops, the wakeup time is already in the past. Just do it now.
+ HandleTime();
+ } else {
+ event_timer_->Setup(candidate_monotonic);
+ }
+ }
+
+ private:
+ void HandleTime() {
+ if (!called_) {
+ called_ = true;
+ fn_();
+ }
+ }
+
+ EventLoop *event_loop_ = nullptr;
+ TimerHandler *event_timer_ = nullptr;
+ std::function<void()> fn_;
+
+ const realtime_clock::time_point realtime_event_time_ =
+ realtime_clock::min_time;
+
+ bool called_ = false;
+};
+
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
: LogReader(SortParts({std::string(filename)}), replay_configuration) {}
@@ -139,6 +219,9 @@
const Configuration *replay_configuration)
: log_files_(std::move(log_files)),
replay_configuration_(replay_configuration) {
+ SetStartTime(FLAGS_start_time);
+ SetEndTime(FLAGS_end_time);
+
CHECK_GT(log_files_.size(), 0u);
{
// Validate that we have the same config everwhere. This will be true if
@@ -329,8 +412,15 @@
VLOG(1) << "Starting " << MaybeNodeName(node()) << "at time "
<< monotonic_start_time(boot_count());
- for (size_t i = 0; i < on_starts_.size(); ++i) {
- on_starts_[i]();
+ auto fn = [this]() {
+ for (size_t i = 0; i < on_starts_.size(); ++i) {
+ on_starts_[i]();
+ }
+ };
+ if (event_loop_factory_) {
+ event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
+ } else {
+ fn();
}
stopped_ = false;
started_ = true;
@@ -358,12 +448,19 @@
void LogReader::State::RunOnEnd() {
VLOG(1) << "Ending " << MaybeNodeName(node()) << "at time "
<< monotonic_start_time(boot_count());
- for (size_t i = 0; i < on_ends_.size(); ++i) {
- on_ends_[i]();
+ auto fn = [this]() {
+ for (size_t i = 0; i < on_ends_.size(); ++i) {
+ on_ends_[i]();
+ }
+ };
+ if (event_loop_factory_) {
+ event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
+ } else {
+ fn();
}
stopped_ = true;
- started_ = false;
+ started_ = true;
}
void LogReader::Register() {
@@ -397,7 +494,8 @@
node);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
- event_loop_factory_->GetNodeEventLoopFactory(node));
+ event_loop_factory_->GetNodeEventLoopFactory(node),
+ event_loop_factory_);
state->SetChannelCount(logged_configuration()->channels()->size());
timestamp_mappers.emplace_back(state->timestamp_mapper());
@@ -489,6 +587,11 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
RegisterWithoutStarting(event_loop_factory);
+ StartAfterRegister(event_loop_factory);
+}
+
+void LogReader::StartAfterRegister(
+ SimulatedEventLoopFactory *event_loop_factory) {
// We want to start the log file at the last start time of the log files
// from all the nodes. Compute how long each node's simulation needs to run
// to move time to this point.
@@ -588,6 +691,10 @@
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
+ if (!event_loop) {
+ state->ClearTimeFlags();
+ }
+
state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
@@ -889,7 +996,7 @@
<< "is on the next boot, " << next_time << " now is "
<< state->monotonic_now();
CHECK(event_loop_factory_);
- state->RunOnEnd();
+ state->NotifyLogfileEnd();
return;
}
VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
@@ -900,7 +1007,7 @@
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
- state->RunOnEnd();
+ state->NotifyLogfileEnd();
// Set a timer up immediately after now to die. If we don't do this,
// then the senders waiting on the message we just read will never get
// called.
@@ -917,7 +1024,13 @@
if (state->OldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
- event_loop->AddTimer([state]() { state->RunOnStart(); }));
+ event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
+ if (start_time_ != realtime_clock::min_time) {
+ state->SetStartTimeFlag(start_time_);
+ }
+ if (end_time_ != realtime_clock::max_time) {
+ state->SetEndTimeFlag(end_time_);
+ }
event_loop->OnRun([state]() {
BootTimestamp next_time = state->OldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
@@ -927,6 +1040,40 @@
}
}
+void LogReader::SetEndTime(std::string end_time) {
+ if (end_time.empty()) {
+ SetEndTime(realtime_clock::max_time);
+ } else {
+ std::optional<aos::realtime_clock::time_point> parsed_end_time =
+ aos::realtime_clock::FromString(end_time);
+ CHECK(parsed_end_time) << ": Failed to parse end time '" << end_time
+ << "'. Expected a date in the format of "
+ "2021-01-15_15-30-35.000000000.";
+ SetEndTime(*parsed_end_time);
+ }
+}
+
+void LogReader::SetEndTime(realtime_clock::time_point end_time) {
+ end_time_ = end_time;
+}
+
+void LogReader::SetStartTime(std::string start_time) {
+ if (start_time.empty()) {
+ SetStartTime(realtime_clock::min_time);
+ } else {
+ std::optional<aos::realtime_clock::time_point> parsed_start_time =
+ aos::realtime_clock::FromString(start_time);
+ CHECK(parsed_start_time) << ": Failed to parse start time '" << start_time
+ << "'. Expected a date in the format of "
+ "2021-01-15_15-30-35.000000000.";
+ SetStartTime(*parsed_start_time);
+ }
+}
+
+void LogReader::SetStartTime(realtime_clock::time_point start_time) {
+ start_time_ = start_time;
+}
+
void LogReader::Deregister() {
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
@@ -1312,8 +1459,10 @@
}
void LogReader::State::SetNodeEventLoopFactory(
- NodeEventLoopFactory *node_event_loop_factory) {
+ NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoopFactory *event_loop_factory) {
node_event_loop_factory_ = node_event_loop_factory;
+ event_loop_factory_ = event_loop_factory;
}
void LogReader::State::SetChannelCount(size_t count) {
@@ -1670,7 +1819,7 @@
return result;
}
-BootTimestamp LogReader::State::OldestMessageTime() const {
+BootTimestamp LogReader::State::OldestMessageTime() {
if (timestamp_mapper_ == nullptr) {
return BootTimestamp::max_time();
}
@@ -1680,6 +1829,12 @@
}
VLOG(2) << MaybeNodeName(node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
+
+ if (result_ptr->monotonic_event_time.boot == boot_count()) {
+ ObserveNextMessage(result_ptr->monotonic_event_time.time,
+ result_ptr->realtime_event_time);
+ }
+
return result_ptr->monotonic_event_time;
}
@@ -1692,11 +1847,12 @@
void LogReader::State::Deregister() {
if (started_ && !stopped_) {
- RunOnEnd();
+ NotifyLogfileEnd();
}
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}
+ ClearTimeFlags();
channel_timestamp_loggers_.clear();
timestamp_loggers_.clear();
event_loop_unique_ptr_.reset();
@@ -1705,5 +1861,75 @@
node_event_loop_factory_ = nullptr;
}
+void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
+ if (start_time != realtime_clock::min_time) {
+ start_event_notifier_ = std::make_unique<EventNotifier>(
+ event_loop_, [this]() { NotifyFlagStart(); }, "flag_start", start_time);
+ }
+}
+
+void LogReader::State::SetEndTimeFlag(realtime_clock::time_point end_time) {
+ if (end_time != realtime_clock::max_time) {
+ end_event_notifier_ = std::make_unique<EventNotifier>(
+ event_loop_, [this]() { NotifyFlagEnd(); }, "flag_end", end_time);
+ }
+}
+
+void LogReader::State::ObserveNextMessage(
+ monotonic_clock::time_point monotonic_event,
+ realtime_clock::time_point realtime_event) {
+ if (start_event_notifier_) {
+ start_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
+ }
+ if (end_event_notifier_) {
+ end_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
+ }
+}
+
+void LogReader::State::ClearTimeFlags() {
+ start_event_notifier_.reset();
+ end_event_notifier_.reset();
+}
+
+void LogReader::State::NotifyLogfileStart() {
+ if (start_event_notifier_) {
+ if (start_event_notifier_->realtime_event_time() >
+ realtime_start_time(boot_count())) {
+ VLOG(1) << "Skipping, " << start_event_notifier_->realtime_event_time()
+ << " > " << realtime_start_time(boot_count());
+ return;
+ }
+ }
+ if (found_last_message_) {
+ VLOG(1) << "Last message already found, bailing";
+ return;
+ }
+ RunOnStart();
+}
+
+void LogReader::State::NotifyFlagStart() {
+ if (start_event_notifier_->realtime_event_time() >=
+ realtime_start_time(boot_count())) {
+ RunOnStart();
+ }
+}
+
+void LogReader::State::NotifyLogfileEnd() {
+ if (found_last_message_) {
+ return;
+ }
+
+ if (!stopped_ && started_) {
+ RunOnEnd();
+ }
+}
+
+void LogReader::State::NotifyFlagEnd() {
+ if (!stopped_ && started_) {
+ RunOnEnd();
+ SetFoundLastMessage(true);
+ }
+}
+
} // namespace logger
} // namespace aos