Make sure OnEnd is always called.

OnEnd was not being called if a log file had missing data and terminated
early.  Many applications rely on OnEnd to be notified that a log has
ended, and use it to trigger cleanup or to write data to disk.  This
seems like a reasonable assumption worth supporting.

Add a test to confirm this behavior explicitly, then add the check to
every test (number of starts == number of stops), then fix the bug.

Change-Id: I1b0aeeb963f813c83a625690827034394a2c379c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 850a564..50fddb0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -381,9 +381,6 @@
 
   stopped_ = true;
   started_ = true;
-  if (message_queuer_.has_value()) {
-    message_queuer_->StopPushing();
-  }
 }
 
 std::vector<
@@ -623,8 +620,9 @@
                                state->realtime_start_time(0));
     }
     VLOG(1) << "Start time is " << state->monotonic_start_time(0)
-            << " for node '" << MaybeNodeName(state->event_loop()->node())
-            << "' now " << state->monotonic_now();
+            << " for node '" << MaybeNodeName(state->node()) << "' now "
+            << (state->event_loop() != nullptr ? state->monotonic_now()
+                                               : monotonic_clock::min_time);
   }
 
   if (FLAGS_timestamps_to_csv) {
@@ -1024,6 +1022,9 @@
                      "from one of the nodes it is logged on.";
             }
           }
+          // The log file is now done, prod the callbacks.
+          state->NotifyLogfileEnd();
+
           // Now that we found the end of one channel, artificially stop the
           // rest by setting the found_last_message bit.  It is confusing when
           // part of your data gets replayed but not all.  The rest of them will
@@ -1955,6 +1956,10 @@
     SetFoundLastMessage(true);
     CHECK(notice_realtime_end_);
     notice_realtime_end_();
+
+    if (message_queuer_.has_value()) {
+      message_queuer_->StopPushing();
+    }
   }
 }