Actually exit LogReader at end_time

We were ceasing *replay* of messages in the logfile at end_time, but
still parsed and read the rest of the file. On sufficiently large
logfiles (or logs with slow read speeds), this can cause significant
slow-down as we spin without doing anything.

Change-Id: Ice836ca598b2bc003bd203f87143c76f9835f6c5
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d3ce16b..fb63e79 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -617,8 +617,8 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node, State::ThreadedBuffering::kNo,
-        MaybeMakeReplayChannelIndicies(node));
+        filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+        State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndicies(node));
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -808,8 +808,8 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node, State::ThreadedBuffering::kYes,
-        MaybeMakeReplayChannelIndicies(node));
+        filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+        State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndicies(node));
     State *state = states_[node_index].get();
 
     state->SetChannelCount(logged_configuration()->channels()->size());
@@ -964,7 +964,7 @@
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (exit_on_finish_ && live_nodes_ == 0 &&
           event_loop_factory_ != nullptr) {
-        CHECK_NOTNULL(event_loop_factory_)->Exit();
+        event_loop_factory_->Exit();
       }
       return;
     }
@@ -1233,6 +1233,7 @@
     }
     if (end_time_ != realtime_clock::max_time) {
       state->SetEndTimeFlag(end_time_);
+      ++live_nodes_with_realtime_time_end_;
     }
     event_loop->OnRun([state]() {
       BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
@@ -1745,9 +1746,11 @@
 LogReader::State::State(
     std::unique_ptr<TimestampMapper> timestamp_mapper,
     message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-    const Node *node, LogReader::State::ThreadedBuffering threading,
+    std::function<void()> notice_realtime_end, const Node *node,
+    LogReader::State::ThreadedBuffering threading,
     std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies)
     : timestamp_mapper_(std::move(timestamp_mapper)),
+      notice_realtime_end_(notice_realtime_end),
       node_(node),
       multinode_filters_(multinode_filters),
       threading_(threading),
@@ -2347,6 +2350,8 @@
   if (!stopped_ && started_) {
     RunOnEnd();
     SetFoundLastMessage(true);
+    CHECK(notice_realtime_end_);
+    notice_realtime_end_();
   }
 }
 
@@ -2372,5 +2377,14 @@
   event_loop_factory_->SetRealtimeReplayRate(replay_rate);
 }
 
+void LogReader::NoticeRealtimeEnd() {
+  // A node reached the realtime end time, so it no longer counts as pending.
+  CHECK_GE(live_nodes_with_realtime_time_end_, 1u);
+  if (--live_nodes_with_realtime_time_end_ != 0u) return;
+  if (exit_on_finish() && event_loop_factory_ != nullptr) {
+    event_loop_factory_->Exit();
+  }
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 0d50fb9..fd5a935 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -260,6 +260,7 @@
   void set_exit_on_finish(bool exit_on_finish) {
     exit_on_finish_ = exit_on_finish;
   }
+  bool exit_on_finish() const { return exit_on_finish_; }
 
   // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
   // try to play events in realtime. 0.5 will run at half speed. Use infinity
@@ -289,6 +290,10 @@
                : logged_configuration()->nodes()->size();
   }
 
+  // Called when an individual node hits the realtime end time. Exits the event
+  // loop factory once every such node has, provided exit_on_finish is set.
+  void NoticeRealtimeEnd();
+
   const std::vector<LogFile> log_files_;
 
   // Class to manage sending RemoteMessages on the provided node after the
@@ -343,7 +348,8 @@
     enum class ThreadedBuffering { kYes, kNo };
     State(std::unique_ptr<TimestampMapper> timestamp_mapper,
           message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-          const Node *node, ThreadedBuffering threading,
+          std::function<void()> notice_realtime_end, const Node *node,
+          ThreadedBuffering threading,
           std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies);
 
     // Connects up the timestamp mappers.
@@ -667,6 +673,9 @@
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
     SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
 
+    // Callback for when this node hits its realtime end time.
+    std::function<void()> notice_realtime_end_;
+
     std::unique_ptr<EventLoop> event_loop_unique_ptr_;
     // Event loop.
     const Node *node_ = nullptr;
@@ -785,6 +794,10 @@
   // when to exit.
   size_t live_nodes_ = 0;
 
+  // Similar counter to live_nodes_, but counts the nodes that are running and
+  // have yet to hit the realtime end time, if one is set.
+  size_t live_nodes_with_realtime_time_end_ = 0;
+
   const Configuration *remapped_configuration_ = nullptr;
   const Configuration *replay_configuration_ = nullptr;