Make sure OnEnd is always called.

OnEnd was not being called if a log file had missing data and terminated
early.  A lot of applications use OnEnd to be notified that a log has
ended and to trigger cleanup or write-to-disk behavior.  This seems like
a reasonable assumption worth supporting.

Add a test to confirm this behavior explicitly, then add the check
(number of starts == number of stops) to every test, then fix the bug.

Change-Id: I1b0aeeb963f813c83a625690827034394a2c379c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 850a564..50fddb0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -381,9 +381,6 @@
 
   stopped_ = true;
   started_ = true;
-  if (message_queuer_.has_value()) {
-    message_queuer_->StopPushing();
-  }
 }
 
 std::vector<
@@ -623,8 +620,9 @@
                                state->realtime_start_time(0));
     }
     VLOG(1) << "Start time is " << state->monotonic_start_time(0)
-            << " for node '" << MaybeNodeName(state->event_loop()->node())
-            << "' now " << state->monotonic_now();
+            << " for node '" << MaybeNodeName(state->node()) << "' now "
+            << (state->event_loop() != nullptr ? state->monotonic_now()
+                                               : monotonic_clock::min_time);
   }
 
   if (FLAGS_timestamps_to_csv) {
@@ -1024,6 +1022,9 @@
                      "from one of the nodes it is logged on.";
             }
           }
+          // The log file is now done, prod the callbacks.
+          state->NotifyLogfileEnd();
+
           // Now that we found the end of one channel, artificially stop the
           // rest by setting the found_last_message bit.  It is confusing when
           // part of your data gets replayed but not all.  The rest of them will
@@ -1955,6 +1956,10 @@
     SetFoundLastMessage(true);
     CHECK(notice_realtime_end_);
     notice_realtime_end_();
+
+    if (message_queuer_.has_value()) {
+      message_queuer_->StopPushing();
+    }
   }
 }
 
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index a055006..4980b50 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -4276,6 +4276,48 @@
   // up in both logs correctly.
 }
 
+// Tests that OnEnd is still called without --skip_missing_forwarding_entries.
+TEST_P(MultinodeLoggerTest, SkipMissingForwardingEntries) {
+  if (file_strategy() == FileStrategy::kCombine) {
+    GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
+  }
+  time_converter_.AddMonotonic(
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
+
+  std::vector<std::string> filenames;
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    aos::monotonic_clock::time_point last_rotation_time =
+        pi1_logger.event_loop->monotonic_now();
+    pi1_logger.logger->set_on_logged_period(
+        [&](aos::monotonic_clock::time_point) {
+          const auto now = pi1_logger.event_loop->monotonic_now();
+          if (now > last_rotation_time + std::chrono::seconds(5)) {
+            pi1_logger.logger->Rotate();
+            last_rotation_time = now;
+          }
+        });
+
+    event_loop_factory_.RunFor(chrono::milliseconds(15000));
+    pi1_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Removing the last remote data part (pi2_data.part3) leaves timestamps with
+  // no matching data.
+  filenames.erase(std::remove_if(filenames.begin(), filenames.end(),
+                                 [](const std::string &s) {
+                                   return s.find("data/pi2_data.part3.bfbs") !=
+                                          std::string::npos;
+                                 }),
+                  filenames.end());
+
+  auto result = ConfirmReadable(filenames);
+}
+
 // Tests that when we have evidence of 2 boots, and then start logging, the
 // max_out_of_order_duration ends up reasonable on the boot with the start time.
 TEST(MultinodeLoggerLoopTest, PreviousBootData) {
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index 5574c6e..9a905b1 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -494,6 +494,9 @@
     reader.Deregister();
 
     for (auto x : result) {
+      EXPECT_EQ(x.first.size(), x.second.size())
+          << ": Got a different number of start and end times, that is very "
+             "bad.";
       for (auto y : x.first) {
         VLOG(1) << "Start " << y;
       }