Handle local messages sent before remote messages after reboot

We only generate points in the time interpolation function (and
therefore only discover reboots, since a reboot can only happen at a
point).  When there is data in a log before remote data, we don't
generate these points, and therefore can't replay the log.

F1118 00:05:39.808653    14 log_reader.cc:690] Check failed: monotonic_now == timestamped_message.monotonic_event_time.time : { "name": "pi2", "hostname": "raspberrypi2", "port": 9971 } Now 107.005000000sec trying to send {.boot=1, .time=107.000000000sec} failure node 1 (pi2) [
] queued_until {.boot=1, .time=109.210150000sec}
node 0 remote_data [
  channel 0 [
    {.channel_index=3, .queue_index={.boot=0, .index=91}, .timestamp={.boot=0, .time=9.200000000sec}, .data=0x555c65f23f80}
  ]
  channel 1 [
    {.channel_index=20, .queue_index={.boot=0, .index=622}, .timestamp={.boot=0, .time=9.210000000sec}, .data=0x555c66126a90}
  ]
] queued_until {.boot=0, .time=9.210350000sec}
*** Check failure stack trace: ***
    @     0x7fd804015aef  google::LogMessageFatal::~LogMessageFatal()
    @     0x7fd8045a7069  aos::logger::LogReader::RegisterDuringStartup()::$_3::operator()()
    @     0x7fd804258119  aos::TimerHandler::Call()
    @     0x7fd804257ead  aos::SimulatedTimerHandler::HandleEvent()
    @     0x7fd804263fb1  std::_Function_handler<>::_M_invoke()
    @     0x7fd804252c18  aos::EventScheduler::CallOldestEvent()
    @     0x7fd804253f54  aos::EventSchedulerScheduler::Run()
    @     0x7fd80425b472  aos::SimulatedEventLoopFactory::Run()
    @     0x555c641e780f  aos::logger::testing::ConfirmReadable()
    @     0x555c6421507f  aos::logger::testing::MultinodeRebootLoggerTest_LocalMessageBeforeRemoteBeforeStartAfterReboot_Test::TestBody()
    @     0x7fd803fa0ce4  testing::internal::HandleExceptionsInMethodIfSupported<>()
    @     0x7fd803fa0c21  testing::Test::Run()
    @     0x7fd803fa1fdf  testing::TestInfo::Run()
    @     0x7fd803fa2b07  testing::TestSuite::Run()
    @     0x7fd803fb4d17  testing::internal::UnitTestImpl::RunAllTests()
    @     0x7fd803fb4634  testing::internal::HandleExceptionsInMethodIfSupported<>()
    @     0x7fd803fb44bb  testing::UnitTest::Run()
    @     0x7fd804065010  main
    @     0x7fd803aa4d0a  __libc_start_main
    @     0x555c641e132a  _start

The fix is to generate a point for the first local timestamp too.

Change-Id: Ic1090d35375fce295d91bccc53f0d3ca8e472770
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9c7fe5d..a76b876 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -983,6 +983,8 @@
   if (oldest) {
     CHECK_GE(oldest->timestamp.time, last_message_time_);
     last_message_time_ = oldest->timestamp.time;
+    monotonic_oldest_time_ =
+        std::min(monotonic_oldest_time_, oldest->timestamp.time);
   } else {
     last_message_time_ = monotonic_clock::max_time;
   }
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 4c82e4d..ccd8bd7 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -550,6 +550,9 @@
   realtime_clock::time_point realtime_start_time() const {
     return realtime_start_time_;
   }
+  monotonic_clock::time_point monotonic_oldest_time() const {
+    return monotonic_oldest_time_;
+  }
 
   // The time this data is sorted until.
   monotonic_clock::time_point sorted_until() const { return sorted_until_; }
@@ -579,6 +582,7 @@
 
   realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
   monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
+  monotonic_clock::time_point monotonic_oldest_time_ = monotonic_clock::max_time;
 };
 
 // Class to concatenate multiple boots worth of logs into a single per-node
@@ -612,6 +616,10 @@
     CHECK_LT(boot, node_mergers_.size());
     return node_mergers_[boot]->realtime_start_time();
   }
+  monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
+    CHECK_LT(boot, node_mergers_.size());
+    return node_mergers_[boot]->monotonic_oldest_time();
+  }
 
   bool started() const {
     return node_mergers_[index_]->sorted_until() != monotonic_clock::min_time ||
@@ -665,6 +673,10 @@
   realtime_clock::time_point realtime_start_time(size_t boot) const {
     return boot_merger_.realtime_start_time(boot);
   }
+  // Returns the oldest timestamp on a message on this boot.
+  monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
+    return boot_merger_.monotonic_oldest_time(boot);
+  }
 
   // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
   // for each node.  Peers are used to look up the data for timestamps on this
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index b58933e..7bf9478 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2860,6 +2860,147 @@
   ConfirmReadable(filenames);
 }
 
+// Tests that local data before remote data after reboot is properly replayed.
+// We only trigger a reboot in the timestamp interpolation function when solving
+// the timestamp problem when we actually have a point in the function.  This
+// originally only happened when a point passes the noncausal filter.  At the
+// start of time for the second boot, if we aren't careful, we will have
+// messages which need to be published at times before the boot.  This happens
+// when a local message is in the log before a forwarded message, so there is no
+// point in the interpolation function.  This delays the reboot.  So, we need to
+// recreate that situation and make sure it doesn't come back.
+TEST(MultinodeRebootLoggerTest, LocalMessageBeforeRemoteBeforeStartAfterReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_split3_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi1->node());
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi2->node());
+  NodeEventLoopFactory *const pi3 =
+      event_loop_factory.GetNodeEventLoopFactory("pi3");
+  const size_t pi3_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi3->node());
+
+  const std::string kLogfile1_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile1/";
+  const std::string kLogfile2_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  const std::string kLogfile2_2 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
+  const std::string kLogfile3_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile3/";
+  util::UnlinkRecursive(kLogfile1_1);
+  util::UnlinkRecursive(kLogfile2_1);
+  util::UnlinkRecursive(kLogfile2_2);
+  util::UnlinkRecursive(kLogfile3_1);
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
+  const UUID pi3_boot0 = UUID::Random();
+  {
+    CHECK_EQ(pi1_index, 0u);
+    CHECK_EQ(pi2_index, 1u);
+    CHECK_EQ(pi3_index, 2u);
+
+    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
+    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
+    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
+    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
+
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch(),
+         BootTimestamp::epoch()});
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp::epoch() + reboot_time,
+         BootTimestamp{
+             .boot = 1,
+             .time = monotonic_clock::epoch() + reboot_time + chrono::seconds(100)},
+         BootTimestamp::epoch() + reboot_time});
+  }
+
+  std::vector<std::string> filenames;
+  {
+    LoggerState pi1_logger = LoggerState::MakeLogger(
+        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    LoggerState pi3_logger = LoggerState::MakeLogger(
+        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    {
+      // And now start the logger.
+      LoggerState pi2_logger = LoggerState::MakeLogger(
+          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+      pi1_logger.StartLogger(kLogfile1_1);
+      pi3_logger.StartLogger(kLogfile3_1);
+      pi2_logger.StartLogger(kLogfile2_1);
+
+      event_loop_factory.RunFor(chrono::milliseconds(1005));
+
+      // Now that we've got a start time in the past, turn on data.
+      std::unique_ptr<aos::EventLoop> ping_event_loop =
+          pi1->MakeEventLoop("ping");
+      Ping ping(ping_event_loop.get());
+
+      pi2->AlwaysStart<Pong>("pong");
+
+      event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+      pi2_logger.AppendAllFilenames(&filenames);
+
+      // Disable any remote messages on pi2.
+      pi1->Disconnect(pi2->node());
+      pi2->Disconnect(pi1->node());
+    }
+    event_loop_factory.RunFor(chrono::milliseconds(995));
+    // pi2 now reboots at 5 seconds.
+    {
+      event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+      // Make local stuff happen before we start logging and connect the remote.
+      pi2->AlwaysStart<Pong>("pong");
+      std::unique_ptr<aos::EventLoop> ping_event_loop =
+          pi1->MakeEventLoop("ping");
+      Ping ping(ping_event_loop.get());
+      event_loop_factory.RunFor(chrono::milliseconds(1005));
+
+      // Start logging again on pi2 after it is up.
+      LoggerState pi2_logger = LoggerState::MakeLogger(
+          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+      pi2_logger.StartLogger(kLogfile2_2);
+
+      // And allow remote messages now that we have some local ones.
+      pi1->Connect(pi2->node());
+      pi2->Connect(pi1->node());
+
+      event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+      event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+      pi2_logger.AppendAllFilenames(&filenames);
+    }
+
+    pi1_logger.AppendAllFilenames(&filenames);
+    pi3_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
+  // to confirm the right thing happened.
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  ConfirmReadable(filenames);
+}
+
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 4d18d6b..23a9d8e 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -202,7 +202,7 @@
 std::vector<BootTimestamp> TimestampProblem::SolveNewton() {
   constexpr int kMaxIterations = 200;
   MaybeUpdateNodeMapping();
-  VLOG(1) << "Solving for node " << solution_node_ << " at "
+  VLOG(2) << "Solving for node " << solution_node_ << " at "
           << base_clock(solution_node_);
   Eigen::VectorXd data = Eigen::VectorXd::Zero(live_nodes_);
 
@@ -210,7 +210,7 @@
   while (true) {
     Eigen::VectorXd step = Newton(data);
 
-    if (VLOG_IS_ON(1)) {
+    if (VLOG_IS_ON(2)) {
       // Print out the gradient ignoring the component removed by the equality
       // constraint.  This tells us what gradient we are depending to try to
       // finish our solution.
@@ -220,12 +220,12 @@
       Eigen::VectorXd adjusted_grad =
           Gradient(data) + step(live_nodes_) * constraint_jacobian.transpose();
 
-      VLOG(1) << "Adjusted grad " << solution_number << " -> "
+      VLOG(2) << "Adjusted grad " << solution_number << " -> "
               << std::setprecision(12) << std::fixed << std::setfill(' ')
               << adjusted_grad.transpose().format(kHeavyFormat);
     }
 
-    VLOG(1) << "Step " << solution_number << " -> " << std::setprecision(12)
+    VLOG(2) << "Step " << solution_number << " -> " << std::setprecision(12)
             << std::fixed << std::setfill(' ')
             << step.transpose().format(kHeavyFormat);
     // We got there if the max step is small (this is strongly correlated to the
@@ -267,7 +267,7 @@
     }
   }
 
-  VLOG(1) << "Solving for node " << solution_node_ << " of "
+  VLOG(2) << "Solving for node " << solution_node_ << " of "
           << base_clock(solution_node_) << " in " << solution_number
           << " cycles";
   std::vector<BootTimestamp> result(size());
@@ -277,11 +277,11 @@
       result[i].time = base_clock(i).time +
                        std::chrono::nanoseconds(static_cast<int64_t>(
                            std::round(data(NodeToFullSolutionIndex(i)))));
-      VLOG(1) << "live  " << result[i] << " "
+      VLOG(2) << "live  " << result[i] << " "
               << data(NodeToFullSolutionIndex(i));
     } else {
       result[i] = BootTimestamp::min_time();
-      VLOG(1) << "dead  " << result[i];
+      VLOG(2) << "dead  " << result[i];
     }
   }
   if (solution_number > kMaxIterations) {
@@ -458,7 +458,7 @@
     const distributed_clock::time_point result =
         time.time - std::get<1>(times_[0])[node_index].time +
         std::get<0>(times_[0]);
-    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+    VLOG(3) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
             << result;
     return result;
   }
@@ -484,7 +484,7 @@
 
   if (time > t1) {
     const distributed_clock::time_point result = (time.time - t1.time) + d1;
-    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+    VLOG(3) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
             << result;
     return result;
   }
@@ -492,12 +492,12 @@
   if (t0.boot != t1.boot) {
     if (t0.boot == time.boot) {
       const distributed_clock::time_point result = (time.time - t0.time) + d0;
-      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+      VLOG(3) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
               << result;
       return result;
     } else if (t1.boot == time.boot) {
       const distributed_clock::time_point result = (time.time - t1.time) + d1;
-      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+      VLOG(3) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
               << result;
       return result;
     } else {
@@ -508,7 +508,7 @@
   const distributed_clock::time_point result =
       message_bridge::ToDistributedClock(d0, d1, t0.time, t1.time, time.time);
 
-  VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+  VLOG(3) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
           << result;
   return result;
 }
@@ -536,7 +536,7 @@
     }
     monotonic_clock::time_point result =
         time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index].time;
-    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+    VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
             << boot_count << ") -> " << result;
     return {.boot = std::get<1>(times_[0])[node_index].boot, .time = result};
   }
@@ -565,13 +565,13 @@
   if (time == d1) {
     if (boot_count == t1.boot) {
       const BootTimestamp result = t1 + (time - d1);
-      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+      VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
               << boot_count << ") -> " << result;
       return result;
     } else {
       CHECK_EQ(boot_count, t0.boot);
       const BootTimestamp result = t0 + (time - d0);
-      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+      VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
               << boot_count << ") -> " << result;
       return result;
     }
@@ -581,14 +581,14 @@
             //<< " t1 " << t1;
   if (time > d1) {
     const BootTimestamp result = t1 + (time - d1);
-    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+    VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
             << boot_count << ") -> " << result;
     return result;
   }
 
   if (t0.boot != t1.boot) {
     const BootTimestamp result = t0 + (time - d0);
-    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+    VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
             << boot_count << ") -> " << result;
     return result;
   }
@@ -615,7 +615,7 @@
   const monotonic_clock::time_point result =
       t0.time + std::chrono::nanoseconds(
                     static_cast<int64_t>(numerator / absl::int128(dd.count())));
-  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+  VLOG(3) << "FromDistributedClock(" << node_index << ", " << time << ", "
           << boot_count << ") -> " << result;
   return {.boot = t0.boot, .time = result};
 }
@@ -1270,7 +1270,7 @@
   {
     size_t node_a_index = 0;
     for (const auto &filters : filters_per_node_) {
-      VLOG(1) << "Investigating filter for node " << node_a_index;
+      VLOG(2) << "Investigating filter for node " << node_a_index;
       BootTimestamp next_node_time = BootTimestamp::max_time();
       BootDuration next_node_duration;
       NoncausalTimestampFilter *next_node_filter = nullptr;
@@ -1282,7 +1282,7 @@
             filter.filter->Observe();
 
         if (candidate) {
-          VLOG(1) << "Candidate for node " << node_a_index << " filter "
+          VLOG(2) << "Candidate for node " << node_a_index << " filter "
                   << filter_index << " is " << std::get<0>(*candidate);
           if (std::get<0>(*candidate) < next_node_time) {
             next_node_time = std::get<0>(*candidate);
@@ -1311,7 +1311,7 @@
       const size_t next_boot = last_monotonics_[node_a_index].boot + 1;
       if (next_boot < boots_->boots[node_a_index].size() &&
           timestamp_mappers_[node_a_index] != nullptr) {
-        BootTimestamp next_start_time = BootTimestamp{
+        const BootTimestamp next_start_time = BootTimestamp{
             .boot = next_boot,
             .time = timestamp_mappers_[node_a_index]->monotonic_start_time(
                 next_boot)};
@@ -1321,10 +1321,27 @@
           next_node_time = next_start_time;
           next_node_filter = nullptr;
         }
+
+        // We need to make sure we have solutions as well for any local messages
+        // published before remote messages.  Find the oldest message for each
+        // boot and make sure there's a time there.  Boots can't overlap, so if
+        // we have evidence that there has been a reboot, we need to get that
+        // into the interpolation function.
+        const BootTimestamp next_oldest_time = BootTimestamp{
+            .boot = next_boot,
+            .time = timestamp_mappers_[node_a_index]->monotonic_oldest_time(
+                next_boot)};
+        if (next_oldest_time < next_node_time) {
+          VLOG(1) << "Candidate for node " << node_a_index
+                  << " is the next oldest time, " << next_oldest_time
+                  << " not applying yet";
+          next_node_time = next_oldest_time;
+          next_node_filter = nullptr;
+        }
       }
 
       if (next_node_filter != nullptr) {
-        VLOG(1) << "Trying " << next_node_time << " " << next_node_duration
+        VLOG(2) << "Trying " << next_node_time << " " << next_node_duration
                 << " for node " << node_a_index;
       } else {
         VLOG(1) << "Trying " << next_node_time << " for node " << node_a_index;