Actually exit LogReader at end_time
We were ceasing *replay* of messages in the logfile, but still parsed
and read the entire file. On sufficiently large logfiles (or logs with
slow read speeds), this can cause significant slow-down as we spin
without doing anything.
Change-Id: Ice836ca598b2bc003bd203f87143c76f9835f6c5
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d3ce16b..fb63e79 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -617,8 +617,8 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node, State::ThreadedBuffering::kNo,
- MaybeMakeReplayChannelIndicies(node));
+ filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndicies(node));
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -808,8 +808,8 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node, State::ThreadedBuffering::kYes,
- MaybeMakeReplayChannelIndicies(node));
+ filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndicies(node));
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
@@ -964,7 +964,7 @@
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0 &&
event_loop_factory_ != nullptr) {
- CHECK_NOTNULL(event_loop_factory_)->Exit();
+ event_loop_factory_->Exit();
}
return;
}
@@ -1233,6 +1233,7 @@
}
if (end_time_ != realtime_clock::max_time) {
state->SetEndTimeFlag(end_time_);
+ ++live_nodes_with_realtime_time_end_;
}
event_loop->OnRun([state]() {
BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
@@ -1745,9 +1746,11 @@
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node, LogReader::State::ThreadedBuffering threading,
+ std::function<void()> notice_realtime_end, const Node *node,
+ LogReader::State::ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies)
: timestamp_mapper_(std::move(timestamp_mapper)),
+ notice_realtime_end_(notice_realtime_end),
node_(node),
multinode_filters_(multinode_filters),
threading_(threading),
@@ -2347,6 +2350,8 @@
if (!stopped_ && started_) {
RunOnEnd();
SetFoundLastMessage(true);
+ CHECK(notice_realtime_end_);
+ notice_realtime_end_();
}
}
@@ -2372,5 +2377,14 @@
event_loop_factory_->SetRealtimeReplayRate(replay_rate);
}
+void LogReader::NoticeRealtimeEnd() {
+ CHECK_GE(live_nodes_with_realtime_time_end_, 1u);
+ --live_nodes_with_realtime_time_end_;
+ if (live_nodes_with_realtime_time_end_ == 0 && exit_on_finish() &&
+ event_loop_factory_ != nullptr) {
+ event_loop_factory_->Exit();
+ }
+}
+
} // namespace logger
} // namespace aos