Handle only one side coming back when rebooting.

We had a case where, after a reboot, messages were only flowing to the
logger node and the other direction never came back.  That situation was
never being observed, so we never solved for the next boot, and replay
failed.

Change-Id: Ia2bab0b0179afafdf2a41b2be4498f629f511b61
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 97e0946..cb0c629 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -54,7 +54,7 @@
   auto iter = events_list_.begin();
   const logger::BootTimestamp t =
       FromDistributedClock(scheduler_scheduler_->distributed_now());
-  VLOG(1) << "Got time back " << t;
+  VLOG(2) << "Got time back " << t;
   CHECK_EQ(t.boot, boot_count_);
   CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
 
@@ -274,7 +274,7 @@
   }
 
   if (min_scheduler) {
-    VLOG(1) << "Oldest event " << min_event_time << " on scheduler "
+    VLOG(2) << "Oldest event " << min_event_time << " on scheduler "
             << min_scheduler->node_index_;
   }
   return std::make_tuple(min_event_time, min_scheduler);
diff --git a/aos/events/logging/boot_timestamp.h b/aos/events/logging/boot_timestamp.h
index dac6533..7eead2e 100644
--- a/aos/events/logging/boot_timestamp.h
+++ b/aos/events/logging/boot_timestamp.h
@@ -22,6 +22,16 @@
   bool operator==(const BootDuration &m2) const {
     return boot == m2.boot && duration == m2.duration;
   }
+  bool operator!=(const BootDuration &m2) const {
+    return boot != m2.boot || duration != m2.duration;
+  }
+
+  static constexpr BootDuration max_time() {
+    return BootDuration{
+        .boot = std::numeric_limits<size_t>::max(),
+        .duration = monotonic_clock::duration(
+            ::std::numeric_limits<monotonic_clock::duration::rep>::max())};
+  }
 };
 
 // Simple class representing which boot and what monotonic time in that boot.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index e9f44b2..c346854 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4067,6 +4067,76 @@
   ConfirmReadable(filenames);
 }
 
+// Tests that we properly handle only one direction ever existing after a
+// reboot.
+TEST(MissingDirectionTest, OneDirectionAfterReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_split4_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi1->node());
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi2->node());
+  std::vector<std::string> filenames;
+
+  {
+    CHECK_EQ(pi1_index, 0u);
+    CHECK_EQ(pi2_index, 1u);
+
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp{.boot = 1,
+                       .time = monotonic_clock::epoch()},
+         BootTimestamp::epoch() + reboot_time});
+  }
+
+  const std::string kLogfile2_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  util::UnlinkRecursive(kLogfile2_1);
+
+
+  pi1->AlwaysStart<Ping>("ping");
+
+  // Pi1 sends to pi2.  Reboot pi1, but don't let pi2 connect to pi1.  This
+  // makes it such that we will only get timestamps from pi1 -> pi2 on the
+  // second boot.
+  {
+    LoggerState pi2_logger = LoggerState::MakeLogger(
+        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+    event_loop_factory.RunFor(chrono::milliseconds(95));
+
+    pi2_logger.StartLogger(kLogfile2_1);
+
+    event_loop_factory.RunFor(chrono::milliseconds(4000));
+
+    pi2->Disconnect(pi1->node());
+
+    event_loop_factory.RunFor(chrono::milliseconds(1000));
+    pi1->AlwaysStart<Ping>("ping");
+
+    event_loop_factory.RunFor(chrono::milliseconds(5000));
+    pi2_logger.AppendAllFilenames(&filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  ConfirmReadable(filenames);
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 0619b5e..262cb5c 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -43,6 +43,65 @@
 // ms/s.  Figure out how to define it.  Do this last.  This lets us handle
 // constraints going away, and constraints close in time.
 
+bool TimestampProblem::HasObservations(size_t node_a) const {
+  // Note, this function is probably overly conservative.  It requires every
+  // pair for a node to have data in at least one direction, rather than just
+  // enough pairs with data to observe the graph.  We can relax that when
+  // someone finds it is overly restrictive.
+
+  if (clock_offset_filter_for_node_[node_a].empty()) {
+    // Look for a filter going the other way whose node_b is our node.
+    bool found_filter = false;
+    for (size_t node_b = 0u; node_b < clock_offset_filter_for_node_.size();
+         ++node_b) {
+      for (const struct FilterPair &filter :
+           clock_offset_filter_for_node_[node_b]) {
+        if (filter.b_index == node_a) {
+          if (filter.filter->timestamps_size(base_clock_[node_b].boot,
+                                             base_clock_[node_a].boot) == 0u) {
+            // Found one without data, explode.
+            return false;
+          }
+          found_filter = true;
+        }
+      }
+    }
+    return found_filter;
+  }
+
+  for (const struct FilterPair &filter :
+       clock_offset_filter_for_node_[node_a]) {
+    // There's something in this direction, so we don't need to check the
+    // opposite direction to confirm we have observations.
+    if (filter.filter->timestamps_size(
+            base_clock_[node_a].boot, base_clock_[filter.b_index].boot) != 0u) {
+      continue;
+    }
+
+    // For a boot to exist, we need to have some observations between it and
+    // another boot.  We wouldn't bother to build a problem to solve for
+    // this node otherwise.  Confirm that is true so we at least get
+    // notified if that assumption falls apart.
+    bool valid = false;
+    for (const struct FilterPair &other_filter :
+         clock_offset_filter_for_node_[filter.b_index]) {
+      if (other_filter.b_index == node_a) {
+        // Found our match.  Confirm it has timestamps.
+        if (other_filter.filter->timestamps_size(
+                base_clock_[filter.b_index].boot, base_clock_[node_a].boot) !=
+            0u) {
+          valid = true;
+        }
+        break;
+      }
+    }
+    if (!valid) {
+      return false;
+    }
+  }
+  return true;
+}
+
 bool TimestampProblem::ValidateSolution(std::vector<BootTimestamp> solution) {
   bool success = true;
   for (size_t i = 0u; i < clock_offset_filter_for_node_.size(); ++i) {
@@ -1355,16 +1414,15 @@
 
 std::tuple<std::vector<MultiNodeNoncausalOffsetEstimator::CandidateTimes>, bool>
 MultiNodeNoncausalOffsetEstimator::MakeCandidateTimes() const {
-  bool boots_all_match = true;
   std::vector<CandidateTimes> candidate_times;
   candidate_times.resize(last_monotonics_.size());
 
   size_t node_a_index = 0;
-  size_t last_boot = std::numeric_limits<size_t>::max();
   for (const auto &filters : filters_per_node_) {
     VLOG(2) << "Investigating filter for node " << node_a_index;
     BootTimestamp next_node_time = BootTimestamp::max_time();
-    BootDuration next_node_duration;
+    BootDuration next_node_duration = BootDuration::max_time();
+    size_t b_index = std::numeric_limits<size_t>::max();
     NoncausalTimestampFilter *next_node_filter = nullptr;
     // Find the oldest time for each node in each filter, and solve for that
     // time.  That gives us the next timestamp for this node.
@@ -1379,23 +1437,13 @@
         if (std::get<0>(*candidate) < next_node_time) {
           next_node_time = std::get<0>(*candidate);
           next_node_duration = std::get<1>(*candidate);
+          b_index = filter.b_index;
           next_node_filter = filter.filter;
         }
       }
       ++filter_index;
     }
 
-    // Found no active filters.  Either this node is off, or disconnected, or
-    // we are before the log file starts or after the log file ends.
-    if (next_node_time == BootTimestamp::max_time()) {
-      candidate_times[node_a_index] =
-          CandidateTimes{.next_node_time = next_node_time,
-                         .next_node_duration = next_node_duration,
-                         .next_node_filter = next_node_filter};
-      ++node_a_index;
-      continue;
-    }
-
     // We want to make sure we solve explicitly for the start time for each
     // log.  This is useless (though not all that expensive) if it is in the
     // middle of a set of data since we are just adding an extra point in the
@@ -1419,6 +1467,8 @@
                 << " is the next startup time, " << next_start_time;
         next_node_time = next_start_time;
         next_node_filter = nullptr;
+        b_index = std::numeric_limits<size_t>::max();
+        next_node_duration = BootDuration::max_time();
       }
 
       // We need to make sure we have solutions as well for any local messages
@@ -1436,19 +1486,56 @@
                 << " not applying yet";
         next_node_time = next_oldest_time;
         next_node_filter = nullptr;
+        b_index = std::numeric_limits<size_t>::max();
+        next_node_duration = BootDuration::max_time();
       }
     }
-    if (last_boot != std::numeric_limits<size_t>::max()) {
-      boots_all_match &= (next_node_time.boot == last_boot);
-    }
-    last_boot = next_node_time.boot;
     candidate_times[node_a_index] =
         CandidateTimes{.next_node_time = next_node_time,
                        .next_node_duration = next_node_duration,
+                       .b_index = b_index,
                        .next_node_filter = next_node_filter};
     ++node_a_index;
   }
 
+  // Now that we have all the candidates, confirm everything matches.
+  bool boots_all_match = true;
+  for (size_t i = 0; i < candidate_times.size(); ++i) {
+    const CandidateTimes &candidate = candidate_times[i];
+    if (candidate.next_node_time == logger::BootTimestamp::max_time()) {
+      continue;
+    }
+
+    // First step, if the last solution's boot doesn't match the next solution,
+    // we've got a reboot incoming and can't sort well.  Fall back to the more
+    // basic exhaustive search.
+    if (candidate.next_node_time.boot != last_monotonics_[i].boot) {
+      boots_all_match = false;
+      break;
+    }
+
+    // And then check that the other node also hasn't rebooted.  We might not
+    // have both directions of timestamps, so this is our only clue.
+    if (candidate.next_node_duration == BootDuration::max_time()) {
+      continue;
+    }
+
+    DCHECK_LT(candidate.b_index, candidate_times.size());
+    if (candidate_times[candidate.b_index].next_node_time.boot !=
+        candidate.next_node_duration.boot) {
+      boots_all_match = false;
+      break;
+    }
+  }
+  if (VLOG_IS_ON(1)) {
+    LOG(INFO) << "Boots all match: " << boots_all_match;
+    for (size_t i = 0; i < candidate_times.size(); ++i) {
+      LOG(INFO) << "Candidate " << candidate_times[i].next_node_time
+                << " duration " << candidate_times[i].next_node_duration
+                << " (node " << candidate_times[i].b_index << ")";
+    }
+  }
+
   return std::make_tuple(candidate_times, boots_all_match);
 }
 
@@ -1572,6 +1659,7 @@
         candidate_times[node_a_index].next_node_duration;
     NoncausalTimestampFilter *next_node_filter =
         candidate_times[node_a_index].next_node_filter;
+    size_t b_index = candidate_times[node_a_index].b_index;
     if (next_node_time == BootTimestamp::max_time()) {
       continue;
     }
@@ -1586,8 +1674,16 @@
     // TODO(austin): If we start supporting only having 1 direction of
     // timestamps, we might need to change our assumptions around
     // BootTimestamp and BootDuration.
+    bool boots_match = next_node_time.boot == base_times[node_a_index].boot;
 
-    if (next_node_time.boot == base_times[node_a_index].boot) {
+    // Make sure the paired time also has a matching boot.
+    if (next_node_duration != BootDuration::max_time()) {
+      if (next_node_duration.boot != base_times[b_index].boot) {
+        boots_match = false;
+      }
+    }
+
+    if (boots_match) {
       // Optimize, and save the time into times if earlier than time.
       for (size_t node_index = 0; node_index < base_times.size();
            ++node_index) {
@@ -1609,6 +1705,16 @@
       // And we know our solution node will have the wrong boot, so replace
       // it entirely.
       problem->set_base_clock(node_a_index, next_node_time);
+
+      // And update the paired boot for the paired node.
+      if (next_node_duration != BootDuration::max_time()) {
+        if (next_node_duration.boot != base_times[b_index].boot) {
+          problem->set_base_clock(
+              b_index, BootTimestamp{.boot = next_node_duration.boot,
+                                     .time = next_node_time.time +
+                                             next_node_duration.duration});
+        }
+      }
     }
 
     std::vector<BootTimestamp> points(problem->size(),
@@ -1617,6 +1723,14 @@
       problem->Debug();
     }
     points[node_a_index] = next_node_time;
+
+    if (!problem->HasObservations(node_a_index)) {
+      VLOG(1) << "No observations, checking if there's a filter";
+      CHECK(next_node_filter == nullptr)
+          << ": No observations, but this isn't a start time.";
+      continue;
+    }
+
     std::tuple<std::vector<BootTimestamp>, size_t> solution =
         problem->SolveNewton(points);
 
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 7259fc8..f959d7b 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -61,6 +61,10 @@
   // Validates the solution, returning true if it meets all the constraints, and
   // false otherwise.
   bool ValidateSolution(std::vector<logger::BootTimestamp> solution);
+  // Returns true if the provided node has observations to solve for the
+  // provided boots.  Observations may be missing when we are trying to solve
+  // for a reboot to see if it is next, and haven't queued far enough.
+  bool HasObservations(size_t node_a) const;
 
   // LOGs a representation of the problem.
   void Debug();
@@ -331,7 +335,8 @@
  private:
   struct CandidateTimes {
     logger::BootTimestamp next_node_time = logger::BootTimestamp::max_time();
-    logger::BootDuration next_node_duration;
+    logger::BootDuration next_node_duration = logger::BootDuration::max_time();
+    size_t b_index = std::numeric_limits<size_t>::max();
     NoncausalTimestampFilter *next_node_filter = nullptr;
   };
 
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6486a1b..b51fe2f 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -696,7 +696,9 @@
     auto it =
         std::lower_bound(filters_.begin(), filters_.end(),
                          std::make_pair(boota, bootb), FilterLessThanLower);
-    CHECK(it != filters_.end());
+    if (it == filters_.end()) {
+      return nullptr;
+    }
     if (it->boot == std::make_pair(boota, bootb)) {
       return &it->filter;
     } else {