Actually exit LogReader at end_time

Previously, reaching end_time only stopped the *replay* of messages; the
reader still read and parsed the rest of the logfile. On sufficiently
large logfiles (or logs with slow read speeds), this caused a
significant slow-down, since we kept reading data that would never be
replayed.

Change-Id: Ice836ca598b2bc003bd203f87143c76f9835f6c5
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d3ce16b..fb63e79 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -617,8 +617,8 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node, State::ThreadedBuffering::kNo,
-        MaybeMakeReplayChannelIndicies(node));
+        filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+        State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndicies(node));
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -808,8 +808,8 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node, State::ThreadedBuffering::kYes,
-        MaybeMakeReplayChannelIndicies(node));
+        filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+        State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndicies(node));
     State *state = states_[node_index].get();
 
     state->SetChannelCount(logged_configuration()->channels()->size());
@@ -964,7 +964,7 @@
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (exit_on_finish_ && live_nodes_ == 0 &&
           event_loop_factory_ != nullptr) {
-        CHECK_NOTNULL(event_loop_factory_)->Exit();
+        event_loop_factory_->Exit();
       }
       return;
     }
@@ -1233,6 +1233,7 @@
     }
     if (end_time_ != realtime_clock::max_time) {
       state->SetEndTimeFlag(end_time_);
+      ++live_nodes_with_realtime_time_end_;
     }
     event_loop->OnRun([state]() {
       BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
@@ -1745,9 +1746,11 @@
 LogReader::State::State(
     std::unique_ptr<TimestampMapper> timestamp_mapper,
     message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-    const Node *node, LogReader::State::ThreadedBuffering threading,
+    std::function<void()> notice_realtime_end, const Node *node,
+    LogReader::State::ThreadedBuffering threading,
     std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies)
     : timestamp_mapper_(std::move(timestamp_mapper)),
+      notice_realtime_end_(notice_realtime_end),
       node_(node),
       multinode_filters_(multinode_filters),
       threading_(threading),
@@ -2347,6 +2350,8 @@
   if (!stopped_ && started_) {
     RunOnEnd();
     SetFoundLastMessage(true);
+    CHECK(notice_realtime_end_);
+    notice_realtime_end_();
   }
 }
 
@@ -2372,5 +2377,14 @@
   event_loop_factory_->SetRealtimeReplayRate(replay_rate);
 }
 
+void LogReader::NoticeRealtimeEnd() {  // Bound into State as a callback.
+  CHECK_GE(live_nodes_with_realtime_time_end_, 1u);  // Never decrement past 0.
+  --live_nodes_with_realtime_time_end_;  // Counted up when end_time_ is set.
+  if (live_nodes_with_realtime_time_end_ == 0 && exit_on_finish() &&
+      event_loop_factory_ != nullptr) {  // Last node: stop the factory.
+    event_loop_factory_->Exit();
+  }
+}
+
 }  // namespace logger
 }  // namespace aos