Actually exit LogReader at end_time
We were ceasing *replay* of messages in the logfile, but still parsed
and read the entire file. On sufficiently large logfiles (or logs with
slow read speeds), this can cause significant slow-down as we spin
without doing anything.
Change-Id: Ice836ca598b2bc003bd203f87143c76f9835f6c5
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d3ce16b..fb63e79 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -617,8 +617,8 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node, State::ThreadedBuffering::kNo,
- MaybeMakeReplayChannelIndicies(node));
+ filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndicies(node));
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -808,8 +808,8 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node, State::ThreadedBuffering::kYes,
- MaybeMakeReplayChannelIndicies(node));
+ filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndicies(node));
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
@@ -964,7 +964,7 @@
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0 &&
event_loop_factory_ != nullptr) {
- CHECK_NOTNULL(event_loop_factory_)->Exit();
+ event_loop_factory_->Exit();
}
return;
}
@@ -1233,6 +1233,7 @@
}
if (end_time_ != realtime_clock::max_time) {
state->SetEndTimeFlag(end_time_);
+ ++live_nodes_with_realtime_time_end_;
}
event_loop->OnRun([state]() {
BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
@@ -1745,9 +1746,11 @@
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node, LogReader::State::ThreadedBuffering threading,
+ std::function<void()> notice_realtime_end, const Node *node,
+ LogReader::State::ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies)
: timestamp_mapper_(std::move(timestamp_mapper)),
+ notice_realtime_end_(notice_realtime_end),
node_(node),
multinode_filters_(multinode_filters),
threading_(threading),
@@ -2347,6 +2350,8 @@
if (!stopped_ && started_) {
RunOnEnd();
SetFoundLastMessage(true);
+ CHECK(notice_realtime_end_);
+ notice_realtime_end_();
}
}
@@ -2372,5 +2377,14 @@
event_loop_factory_->SetRealtimeReplayRate(replay_rate);
}
+void LogReader::NoticeRealtimeEnd() {
+ CHECK_GE(live_nodes_with_realtime_time_end_, 1u);
+ --live_nodes_with_realtime_time_end_;
+ if (live_nodes_with_realtime_time_end_ == 0 && exit_on_finish() &&
+ event_loop_factory_ != nullptr) {
+ event_loop_factory_->Exit();
+ }
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 0d50fb9..fd5a935 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -260,6 +260,7 @@
void set_exit_on_finish(bool exit_on_finish) {
exit_on_finish_ = exit_on_finish;
}
+ bool exit_on_finish() const { return exit_on_finish_; }
// Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
// try to play events in realtime. 0.5 will run at half speed. Use infinity
@@ -289,6 +290,10 @@
: logged_configuration()->nodes()->size();
}
+ // Handles when an individual node hits the realtime end time, exiting the
+ // entire event loop once all nodes are stopped.
+ void NoticeRealtimeEnd();
+
const std::vector<LogFile> log_files_;
// Class to manage sending RemoteMessages on the provided node after the
@@ -343,7 +348,8 @@
enum class ThreadedBuffering { kYes, kNo };
State(std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node, ThreadedBuffering threading,
+ std::function<void()> notice_realtime_end, const Node *node,
+ ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies);
// Connects up the timestamp mappers.
@@ -667,6 +673,9 @@
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+ // Callback for when this node hits its realtime end time.
+ std::function<void()> notice_realtime_end_;
+
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
// Event loop.
const Node *node_ = nullptr;
@@ -785,6 +794,10 @@
// when to exit.
size_t live_nodes_ = 0;
+ // Similar counter to live_nodes_, but for tracking which individual nodes are
+ // running and have yet to hit the realtime end time, if any.
+ size_t live_nodes_with_realtime_time_end_ = 0;
+
const Configuration *remapped_configuration_ = nullptr;
const Configuration *replay_configuration_ = nullptr;