Support nodes rebooting

We have log files which span a reboot.  We want to be able to replay the
timeline across that reboot so we can run simulations and do everything
else interesting.

This requires a bunch of stuff, unfortunately.

The most visible piece is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it.  There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.

This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.

From there, everything else is just a massive amount of plumbing to
carry BootTimestamp through everything just short of the user.  Boot
UUIDs were put in TimeConverter so that everything related to rebooting
lives together in one place.

Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 48a71c6..0ed756c 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -520,6 +520,7 @@
         ":timestamp_filter",
         "//aos:configuration",
         "//aos/events:simulated_event_loop",
+        "//aos/events/logging:boot_timestamp",
         "//aos/events/logging:logfile_utils",
         "//aos/time",
         "@org_tuxfamily_eigen//:eigen",
@@ -547,6 +548,7 @@
     deps = [
         ":multinode_timestamp_filter",
         "//aos/events:simulated_event_loop",
+        "//aos/events/logging:boot_timestamp",
         "//aos/time",
     ],
 )
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index 69f474a..5236f82 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -127,6 +127,7 @@
 }
 
 int MessageBridgeClientStatus::FindClientIndex(std::string_view node_name) {
+  CHECK(statistics_.message().has_connections());
   for (size_t i = 0; i < statistics_.message().connections()->size(); ++i) {
     const ClientConnection *client_connection =
         statistics_.message().connections()->Get(i);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 3b5b997..65691cb 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -347,8 +347,26 @@
     VLOG(1) << "  " << t;
   }
 
-  // TODO(austin): Figure out how to communicate the reboot up to the factory.
   CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+
+  if (times_.empty()) {
+    for (BootTimestamp t : std::get<1>(*next_time)) {
+      CHECK_EQ(t.boot, 0u);
+    }
+  } else {
+    bool rebooted = false;
+    for (size_t i = 0; i < node_count_; ++i) {
+      if (std::get<1>(times_.back())[i].boot !=
+          std::get<1>(*next_time)[i].boot) {
+        rebooted = true;
+        break;
+      }
+    }
+    if (rebooted) {
+      CHECK(reboot_found_);
+      reboot_found_(std::get<0>(*next_time), std::get<1>(*next_time));
+    }
+  }
   times_.emplace_back(std::move(*next_time));
   return &times_.back();
 }
@@ -413,7 +431,7 @@
 }
 
 distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
-    size_t node_index, monotonic_clock::time_point time) {
+    size_t node_index, BootTimestamp time) {
   CHECK_LT(node_index, node_count_);
   // If there is only one node, time estimation makes no sense.  Just return
   // unity time.
@@ -425,19 +443,21 @@
   QueueUntil(
       [time, node_index](const std::tuple<distributed_clock::time_point,
                                           std::vector<BootTimestamp>> &t) {
-        return std::get<1>(t)[node_index].time < time;
+        return std::get<1>(t)[node_index] < time;
       });
 
   // Before the beginning needs to have 0 slope otherwise time jumps when
   // timestamp 2 happens.
-  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index].time) {
-    if (time < std::get<1>(times_[0])[node_index].time) {
+  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
+    if (time < std::get<1>(times_[0])[node_index]) {
       CHECK(!have_popped_)
           << ": Trying to interpolate time " << time
           << " but we have forgotten the relevant points already.";
     }
+    CHECK_EQ(time.boot, std::get<1>(times_[0])[node_index].boot);
     const distributed_clock::time_point result =
-        time - std::get<1>(times_[0])[node_index].time + std::get<0>(times_[0]);
+        time.time - std::get<1>(times_[0])[node_index].time +
+        std::get<0>(times_[0]);
     VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
             << result;
     return result;
@@ -448,7 +468,9 @@
   size_t index = times_.size() - 2u;
   while (index > 0u) {
     // TODO(austin): Binary search.
-    if (std::get<1>(times_[index])[node_index].time <= time) {
+    //LOG(INFO) << std::get<1>(times_[index])[node_index] << " <= " << time
+              //<< "?";
+    if (std::get<1>(times_[index])[node_index] <= time) {
       break;
     }
     --index;
@@ -457,38 +479,54 @@
   // Interpolate with the two of these.
   const distributed_clock::time_point d0 = std::get<0>(times_[index]);
   const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+  const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+  const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
 
-  // TODO(austin): We should extrapolate if the boot changes.
-  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
-           std::get<1>(times_[index + 1])[node_index].boot);
-  const monotonic_clock::time_point t0 =
-      std::get<1>(times_[index])[node_index].time;
-  const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index].time;
+  if (time > t1) {
+    const distributed_clock::time_point result = (time.time - t1.time) + d1;
+    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+            << result;
+    return result;
+  }
+
+  if (t0.boot != t1.boot) {
+    if (t0.boot == time.boot) {
+      const distributed_clock::time_point result = (time.time - t0.time) + d0;
+      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+              << result;
+      return result;
+    } else if (t1.boot == time.boot) {
+      const distributed_clock::time_point result = (time.time - t1.time) + d1;
+      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+              << result;
+      return result;
+    } else {
+      LOG(FATAL) << t0 << " <= " << time << " <= " << t1;
+    }
+  }
 
   const distributed_clock::time_point result =
-      message_bridge::ToDistributedClock(d0, d1, t0, t1, time);
+      message_bridge::ToDistributedClock(d0, d1, t0.time, t1.time, time.time);
 
   VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
           << result;
   return result;
 }
 
-monotonic_clock::time_point InterpolatedTimeConverter::FromDistributedClock(
-    size_t node_index, distributed_clock::time_point time) {
+BootTimestamp InterpolatedTimeConverter::FromDistributedClock(
+    size_t node_index, distributed_clock::time_point time, size_t boot_count) {
   CHECK_LT(node_index, node_count_);
   // If there is only one node, time estimation makes no sense.  Just return
   // unity time.
   if (node_count_ == 1u) {
-    return monotonic_clock::epoch() + time.time_since_epoch();
+    return BootTimestamp::epoch() + time.time_since_epoch();
   }
 
   // Make sure there are enough timestamps in the queue.
-  QueueUntil(
-      [time](const std::tuple<distributed_clock::time_point,
-                              std::vector<BootTimestamp>> &t) {
-        return std::get<0>(t) < time;
-      });
+  QueueUntil([time](const std::tuple<distributed_clock::time_point,
+                                     std::vector<BootTimestamp>> &t) {
+    return std::get<0>(t) < time;
+  });
 
   if (times_.size() == 1u || time < std::get<0>(times_[0])) {
     if (time < std::get<0>(times_[0])) {
@@ -498,16 +536,21 @@
     }
     monotonic_clock::time_point result =
         time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index].time;
-    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
-            << result;
-    return result;
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return {.boot = std::get<1>(times_[0])[node_index].boot, .time = result};
   }
 
   // Now, find the corresponding timestamps.  Search from the back since that's
   // where most of the times we care about will be.
   size_t index = times_.size() - 2u;
   while (index > 0u) {
-    if (std::get<0>(times_[index]) <= time) {
+    //LOG(INFO) << "Considering " << std::get<0>(times_[index + 1]) << " index "
+              //<< index << " vs " << time;
+    // If we are searching across a reboot, we want both the before and after
+    // time.  We will be asked to solve for the after, so make sure when a time
+    // matches exactly, we pick the time before, not the time after.
+    if (std::get<0>(times_[index]) < time) {
       break;
     }
     --index;
@@ -516,13 +559,39 @@
   // Interpolate with the two of these.
   const distributed_clock::time_point d0 = std::get<0>(times_[index]);
   const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+  const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+  const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
 
-  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
-           std::get<1>(times_[index + 1])[node_index].boot);
-  const monotonic_clock::time_point t0 =
-      std::get<1>(times_[index])[node_index].time;
-  const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index].time;
+  if (time == d1) {
+    if (boot_count == t1.boot) {
+      const BootTimestamp result = t1 + (time - d1);
+      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+              << boot_count << ") -> " << result;
+      return result;
+    } else {
+      CHECK_EQ(boot_count, t0.boot);
+      const BootTimestamp result = t0 + (time - d0);
+      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+              << boot_count << ") -> " << result;
+      return result;
+    }
+  }
+
+  //LOG(INFO) << "d0 " << d0 << " time " << time << " d1 " << d1 << " t0 " << t0
+            //<< " t1 " << t1;
+  if (time > d1) {
+    const BootTimestamp result = t1 + (time - d1);
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return result;
+  }
+
+  if (t0.boot != t1.boot) {
+    const BootTimestamp result = t0 + (time - d0);
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return result;
+  }
 
   const chrono::nanoseconds dd = d1 - d0;
 
@@ -538,21 +607,23 @@
   // and subtract when < 0 so we round correctly.  Multiply before dividing so
   // we don't round early, and use 128 bit arithmetic to guarantee that 64 bit
   // multiplication fits.
-  absl::int128 numerator =
-      absl::int128((time - d0).count()) * absl::int128((t1 - t0).count());
+  absl::int128 numerator = absl::int128((time - d0).count()) *
+                           absl::int128((t1.time - t0.time).count());
   numerator += numerator > 0 ? absl::int128(dd.count() / 2)
                              : -absl::int128(dd.count() / 2);
 
   const monotonic_clock::time_point result =
-      t0 + std::chrono::nanoseconds(
-               static_cast<int64_t>(numerator / absl::int128(dd.count())));
-  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
-          << result;
-  return result;
+      t0.time + std::chrono::nanoseconds(
+                    static_cast<int64_t>(numerator / absl::int128(dd.count())));
+  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+          << boot_count << ") -> " << result;
+  return {.boot = t0.boot, .time = result};
 }
+
 MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
     const Configuration *configuration,
-    const Configuration *logged_configuration, bool skip_order_validation,
+    const Configuration *logged_configuration,
+    std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
     chrono::nanoseconds time_estimation_buffer_seconds)
     : InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
                                     ? 1u
@@ -560,11 +631,26 @@
                                 time_estimation_buffer_seconds),
       configuration_(configuration),
       logged_configuration_(logged_configuration),
+      boots_(boots),
       skip_order_validation_(skip_order_validation) {
+  const bool multi_node = configuration::MultiNode(logged_configuration);
+  if (!boots_ && !multi_node) {
+    // This is a super old log.  Fake out boots by making them up.
+    LOG(WARNING) << "Old single node log without boot UUIDs, generating a "
+                    "random boot UUID.";
+    std::shared_ptr<logger::Boots> boots = std::make_shared<logger::Boots>();
+    const UUID random_boot_uuid = UUID::Random();
+    boots->boot_count_map.emplace(random_boot_uuid.ToString(), 0);
+    boots->boots =
+        std::vector<std::vector<std::string>>{{random_boot_uuid.ToString()}};
+    boots_ = boots;
+  }
+
+  CHECK(boots_) << ": Missing boots for " << NodesCount();
+  CHECK_EQ(boots_->boots.size(), NodesCount());
   filters_per_node_.resize(NodesCount());
   last_monotonics_.resize(NodesCount(), BootTimestamp::epoch());
-  if (FLAGS_timestamps_to_csv &&
-      configuration::MultiNode(logged_configuration)) {
+  if (FLAGS_timestamps_to_csv && multi_node) {
     fp_ = fopen("/tmp/timestamp_noncausal_offsets.csv", "w");
     fprintf(fp_, "# distributed");
     for (const Node *node : configuration::GetNodes(logged_configuration)) {
@@ -637,12 +723,23 @@
   if (!node_samples_.empty()) {
     for (NodeSamples &node : node_samples_) {
       for (SingleNodeSamples &timestamps : node.nodes) {
-        CHECK (timestamps.messages.empty());
+        CHECK(timestamps.messages.empty());
       }
     }
   }
 }
 
+UUID MultiNodeNoncausalOffsetEstimator::boot_uuid(size_t node_index,
+                                                  size_t boot_count) {
+  CHECK(boots_);
+  CHECK_LT(node_index, boots_->boots.size());
+  if (boot_count < boots_->boots[node_index].size()) {
+    return UUID::FromString(boots_->boots[node_index][boot_count]);
+  } else {
+    return UUID::Random();
+  }
+}
+
 void MultiNodeNoncausalOffsetEstimator::Start(
     SimulatedEventLoopFactory *factory) {
   std::vector<monotonic_clock::time_point> times;
@@ -1290,7 +1387,12 @@
                       << "ns";
             }
             VLOG(1) << "Ignoring because it is close enough.";
-            next_node_filter->Consume();
+            std::optional<
+                std::tuple<logger::BootTimestamp, logger::BootDuration>>
+                result = next_node_filter->Consume();
+            CHECK(result);
+            next_node_filter->Pop(std::get<0>(*result) -
+                                  time_estimation_buffer_seconds_);
             break;
           }
           // Somehow the new solution is better *and* worse than the old
@@ -1307,7 +1409,12 @@
           }
 
           if (skip_order_validation_) {
-            next_node_filter->Consume();
+            std::optional<
+                std::tuple<logger::BootTimestamp, logger::BootDuration>>
+                result = next_node_filter->Consume();
+            CHECK(result);
+            next_node_filter->Pop(std::get<0>(*result) -
+                                  time_estimation_buffer_seconds_);
             LOG(ERROR) << "Skipping because --skip_order_validation";
             break;
           } else {
@@ -1396,8 +1503,10 @@
       }
     }
     sample = *next_filter->Consume();
+    next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
   } else {
     sample = *next_filter->Consume();
+    next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
     // We found a good sample, so consume it.  If it is a duplicate, we still
     // want to consume it.  But, if this is the first time around, we want to
     // re-solve by recursing (once) to pickup the better base.
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index a52db7d..8d776b8 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -8,6 +8,7 @@
 #include "Eigen/Dense"
 #include "absl/container/btree_set.h"
 #include "aos/configuration.h"
+#include "aos/events/logging/boot_timestamp.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/timestamp_filter.h"
@@ -166,16 +167,17 @@
 
   // Converts a time to the distributed clock for scheduling and cross-node
   // time measurement.
-  // TODO(austin): Need to pass in boot.
   distributed_clock::time_point ToDistributedClock(
-      size_t node_index, monotonic_clock::time_point time) override;
+      size_t node_index, logger::BootTimestamp time) override;
 
   // Takes the distributed time and converts it to the monotonic clock for this
   // node.
-  monotonic_clock::time_point FromDistributedClock(
-      size_t node_index, distributed_clock::time_point time) override;
+  logger::BootTimestamp FromDistributedClock(size_t node_index,
+                                             distributed_clock::time_point time,
+                                             size_t boot_count) override;
 
   // Called whenever time passes this point and we can forget about it.
+  // TODO(austin): Pop here instead of in log reader.
   void ObserveTimePassed(distributed_clock::time_point time) override;
 
   // Queues 1 more timestammp in the interpolation list.  This is public for
@@ -283,7 +285,8 @@
  public:
   MultiNodeNoncausalOffsetEstimator(
       const Configuration *configuration,
-      const Configuration *logged_configuration, bool skip_order_validation,
+      const Configuration *logged_configuration,
+      std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
       std::chrono::nanoseconds time_estimation_buffer_seconds);
 
   ~MultiNodeNoncausalOffsetEstimator() override;
@@ -297,6 +300,8 @@
                            std::vector<logger::BootTimestamp>>>
   NextTimestamp() override;
 
+  UUID boot_uuid(size_t node_index, size_t boot_count) override;
+
   // Checks that all the nodes in the graph are connected.  Needs all filters to
   // be constructed first.
   void CheckGraph();
@@ -337,6 +342,8 @@
   const Configuration *configuration_;
   const Configuration *logged_configuration_;
 
+  std::shared_ptr<const logger::Boots> boots_;
+
   // If true, skip any validation which would trigger if we see evidance that
   // time estimation between nodes was incorrect.
   const bool skip_order_validation_;
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index f67d8de..c1fb456 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -82,20 +82,19 @@
 // results.  1 second should be 1 second everywhere.
 TEST(InterpolatedTimeConverterTest, OneTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(3u);
   time_converter.AddNextTimestamp(
       de + chrono::seconds(0),
-      {{.boot = 0, .time = me + chrono::seconds(1)},
-       {.boot = 0, .time = me + chrono::seconds(10)},
-       {.boot = 0, .time = me + chrono::seconds(1000)}});
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1), 0),
             me + chrono::seconds(0));
-  EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1), 0),
             me + chrono::seconds(9));
-  EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1), 0),
             me + chrono::seconds(999));
   EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(0)),
             de - chrono::seconds(1));
@@ -104,11 +103,11 @@
   EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(999)),
             de - chrono::seconds(1));
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0),
             me + chrono::seconds(1));
-  EXPECT_EQ(time_converter.FromDistributedClock(1, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de, 0),
             me + chrono::seconds(10));
-  EXPECT_EQ(time_converter.FromDistributedClock(2, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de, 0),
             me + chrono::seconds(1000));
   EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(1)), de);
   EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(10)), de);
@@ -119,29 +118,27 @@
 // Tests that actual interpolation works as expected for multiple timestamps.
 TEST(InterpolatedTimeConverterTest, Interpolation) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(3u);
   // Test that 2 timestamps interpolate correctly.
   time_converter.AddNextTimestamp(
       de + chrono::seconds(0),
-      {{.boot = 0, .time = me + chrono::seconds(1)},
-       {.boot = 0, .time = me + chrono::seconds(10)},
-       {.boot = 0, .time = me + chrono::seconds(1000)}});
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
   time_converter.AddNextTimestamp(
       de + chrono::seconds(1),
-      {{.boot = 0, .time = me + chrono::seconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(11)},
-       {.boot = 0, .time = me + chrono::seconds(1001)}});
+      {me + chrono::seconds(2), me + chrono::seconds(11),
+       me + chrono::seconds(1001)});
 
   EXPECT_EQ(
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(1500));
   EXPECT_EQ(
-      time_converter.FromDistributedClock(1, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(1, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(10500));
   EXPECT_EQ(
-      time_converter.FromDistributedClock(2, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(2, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(1000500));
   EXPECT_EQ(
       time_converter.ToDistributedClock(0, me + chrono::milliseconds(1500)),
@@ -156,26 +153,25 @@
   // And that we can interpolate between points not at the start.
   time_converter.AddNextTimestamp(
       de + chrono::seconds(2),
-      {{.boot = 0, .time = me + chrono::seconds(3) - chrono::milliseconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(12) - chrono::milliseconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(1002)}});
+      {me + chrono::seconds(3) - chrono::milliseconds(2),
+       me + chrono::seconds(12) - chrono::milliseconds(2),
+       me + chrono::seconds(1002)});
 
   time_converter.AddNextTimestamp(
       de + chrono::seconds(3),
-      {{.boot = 0, .time = me + chrono::seconds(4) - chrono::milliseconds(4)},
-       {.boot = 0, .time = me + chrono::seconds(13) - chrono::milliseconds(2)},
-       {.boot = 0,
-        .time = me + chrono::seconds(1003) - chrono::milliseconds(2)}});
+      {me + chrono::seconds(4) - chrono::milliseconds(4),
+       me + chrono::seconds(13) - chrono::milliseconds(2),
+       me + chrono::seconds(1003) - chrono::milliseconds(2)});
 
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(3497));
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(1, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(12498));
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(2, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(1002499));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(3497));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(12498));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(1002499));
   EXPECT_EQ(
       time_converter.ToDistributedClock(0, me + chrono::milliseconds(3497)),
       de + chrono::milliseconds(2500));
@@ -187,11 +183,87 @@
       de + chrono::milliseconds(2500));
 }
 
+// Tests that interpolation works across reboots.
+TEST(InterpolatedTimeConverterTest, RebootInterpolation) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
+  const BootTimestamp me2{.boot = 1u, .time = monotonic_clock::epoch()};
+
+  //LOG(FATAL) << "TODO(austin): Test ToDistributedClock too";
+
+  TestingTimeConverter time_converter(3u);
+  size_t reboot_counter = 0;
+  time_converter.set_reboot_found(
+      [&](distributed_clock::time_point,
+          const std::vector<logger::BootTimestamp> &) { ++reboot_counter; });
+  // Test that 2 timestamps interpolate correctly.
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(0),
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(1),
+      {me + chrono::seconds(2), me + chrono::seconds(11),
+       me + chrono::seconds(1001)});
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(2),
+      {me + chrono::seconds(3), me + chrono::seconds(12),
+       me + chrono::seconds(1002)});
+
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(3),
+      {me2 + chrono::seconds(4), me + chrono::seconds(13),
+       me + chrono::seconds(1003)});
+
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(4),
+      {me2 + chrono::seconds(5), me + chrono::seconds(14),
+       me + chrono::seconds(1004)});
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(3400));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(12400));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(1002400));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(3900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(12900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(1002900));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3000), 0),
+            me + chrono::seconds(4));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3000), 1),
+            me2 + chrono::seconds(4));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3900), 1),
+            me2 + chrono::milliseconds(4900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(3900), 0),
+            me + chrono::milliseconds(13900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(3900), 0),
+            me + chrono::milliseconds(1003900));
+  EXPECT_EQ(reboot_counter, 1u);
+}
+
 // Tests that reading times before the start of our interpolation points
 // explodes.
 TEST(InterpolatedTimeConverterDeathTest, ReadLostTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   constexpr auto kDefaultHistoryDuration =
       InterpolatedTimeConverter::kDefaultHistoryDuration;
@@ -211,22 +283,22 @@
   EXPECT_EQ(
       de + kDefaultHistoryDuration / 2,
       time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration / 2));
-  EXPECT_EQ(
-      me + kDefaultHistoryDuration / 2,
-      time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration / 2));
+  EXPECT_EQ(me + kDefaultHistoryDuration / 2,
+            time_converter.FromDistributedClock(
+                0, de + kDefaultHistoryDuration / 2, 0));
 
   // Double check we can read things from before the start
   EXPECT_EQ(de - kDt, time_converter.ToDistributedClock(0, me - kDt));
-  EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt));
+  EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt, 0));
 
   // And at and after the origin.
   EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
-  EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
+  EXPECT_EQ(me, time_converter.FromDistributedClock(0, de, 0));
 
   EXPECT_EQ(de + chrono::milliseconds(10),
             time_converter.ToDistributedClock(0, me + kDt));
   EXPECT_EQ(me + chrono::milliseconds(10),
-            time_converter.FromDistributedClock(0, de + kDt));
+            time_converter.FromDistributedClock(0, de + kDt, 0));
 
   // Now force ourselves to forget.
   time_converter.ObserveTimePassed(de + kDefaultHistoryDuration + kDt * 3 / 2);
@@ -234,26 +306,27 @@
   // Yup, can't read the origin anymore.
   EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
                "forgotten");
-  EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de); },
+  EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de, 0); },
                "forgotten");
 
   // But can still read the next point.
   EXPECT_EQ(de + kDt, time_converter.ToDistributedClock(0, me + kDt));
-  EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt));
+  EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt, 0));
 }
 
 // Tests unity time with 1 node.
 TEST(InterpolatedTimeConverterTest, SingleNodeTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(1u);
-  time_converter.AddNextTimestamp(
-      de + chrono::seconds(0), {{.boot = 0, .time = me + chrono::seconds(1)}});
+  time_converter.AddNextTimestamp(de + chrono::seconds(0),
+                                  {me + chrono::seconds(1)});
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de), me);
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de + chrono::seconds(100)),
-            me + chrono::seconds(100));
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0), me);
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(0, de + chrono::seconds(100), 0),
+      me + chrono::seconds(100));
 
   EXPECT_TRUE(time_converter.NextTimestamp());
 }
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
index 9558033..c175fe6 100644
--- a/aos/network/testing_time_converter.cc
+++ b/aos/network/testing_time_converter.cc
@@ -71,6 +71,22 @@
   return dt;
 }
 
+void TestingTimeConverter::RebootAt(size_t node_index,
+                                    distributed_clock::time_point t) {
+  CHECK(!first_);
+  CHECK_LT(node_index, last_monotonic_.size());
+  const chrono::nanoseconds dt = t - last_distributed_;
+  // Every node ages by dt; the rebooted node then starts its next boot at
+  // the monotonic epoch, and the resulting point is appended to ts_.
+  for (logger::BootTimestamp &node_time : last_monotonic_) {
+    node_time.time += dt;
+  }
+  ++last_monotonic_[node_index].boot;
+  last_monotonic_[node_index].time = monotonic_clock::epoch();
+  last_distributed_ = t;
+  ts_.emplace_back(std::make_tuple(last_distributed_, last_monotonic_));
+}
+
 void TestingTimeConverter::AddNextTimestamp(
     distributed_clock::time_point time,
     std::vector<logger::BootTimestamp> times) {
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
index 5ffdc01..20b8e16 100644
--- a/aos/network/testing_time_converter.h
+++ b/aos/network/testing_time_converter.h
@@ -6,6 +6,7 @@
 #include <tuple>
 
 #include "aos/events/event_scheduler.h"
+#include "aos/events/logging/boot_timestamp.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/time/time.h"
 
@@ -34,6 +35,8 @@
   std::chrono::nanoseconds AddMonotonic(
       std::vector<logger::BootTimestamp> times);
 
+  // Reboots node_index at t, restarting its monotonic clock from epoch.
+  void RebootAt(size_t node_index, distributed_clock::time_point t);
   // Adds a distributed to monotonic clock mapping to the queue.
   void AddNextTimestamp(distributed_clock::time_point time,
                         std::vector<logger::BootTimestamp> times);
@@ -42,6 +45,23 @@
                            std::vector<logger::BootTimestamp>>>
   NextTimestamp() override;
 
+  // Assigns uuid to (node_index, boot_count); dies if one is already set.
+  void set_boot_uuid(size_t node_index, size_t boot_count, UUID uuid) {
+    const auto key = std::make_pair(node_index, boot_count);
+    CHECK(boot_uuids_.emplace(key, std::move(uuid)).second)
+        << ": Duplicate boot";
+  }
+
+  // Returns the boot's UUID, creating and storing a random one if unset.
+  UUID boot_uuid(size_t node_index, size_t boot_count) override {
+    const auto key = std::make_pair(node_index, boot_count);
+    const auto existing = boot_uuids_.find(key);
+    if (existing != boot_uuids_.end()) return existing->second;
+    const auto inserted = boot_uuids_.emplace(key, UUID::Random());
+    CHECK(inserted.second);
+    return inserted.first->second;
+  }
+
  private:
   // List of timestamps.
   std::deque<std::tuple<distributed_clock::time_point,
@@ -53,6 +73,8 @@
   // The last times returned on all clocks.
   distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
   std::vector<logger::BootTimestamp> last_monotonic_;
+  // UUID of each boot, keyed by (node index, boot count).
+  std::map<std::pair<size_t, size_t>, UUID> boot_uuids_;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index bbd62c4..88b6ed0 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -65,11 +65,15 @@
 
 ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
     : event_loop_(event_loop) {
-  CHECK(configuration::MultiNode(event_loop_->configuration()));
+  if (event_loop_) {  // A null loop skips the multi-node validation.
+    CHECK(configuration::MultiNode(event_loop_->configuration()));
+  }
 }
 
 aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
     const Channel *channel, const Connection *connection) {
+  CHECK(event_loop_);
+
   ChannelTimestampFinder finder(event_loop_);
   // Look at any pre-created channel/connection pairs.
   {
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index 738ca10..b78c83a 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -43,6 +43,11 @@
 class ChannelTimestampSender {
  public:
   ChannelTimestampSender(aos::EventLoop *event_loop);
+  // Makes a sender with no event loop; SenderForChannel() CHECK-fails on it.
+  ChannelTimestampSender() : event_loop_(nullptr) {}
+  ChannelTimestampSender(ChannelTimestampSender &&other) noexcept = default;
+  ChannelTimestampSender &operator=(ChannelTimestampSender &&other) noexcept =
+      default;
 
   aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
                                                const Connection *connection);
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 59af3a0..c86dc8e 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -14,6 +14,8 @@
 namespace message_bridge {
 namespace {
 namespace chrono = std::chrono;
+using logger::BootDuration;
+using logger::BootTimestamp;
 
 std::string TimeString(const aos::monotonic_clock::time_point t,
                        std::chrono::nanoseconds o) {
@@ -55,7 +57,7 @@
   CHECK_GE(*ta, 0.0);
   CHECK_LT(*ta, 1.0);
 }
-void NormalizeTimestamps(logger::BootTimestamp *ta_base, double *ta) {
+void NormalizeTimestamps(BootTimestamp *ta_base, double *ta) {
   NormalizeTimestamps(&ta_base->time, ta);
 }
 
@@ -473,10 +475,10 @@
   return std::make_tuple(std::get<0>(t), std::get<1>(t));
 }
 
-std::pair<std::tuple<logger::BootTimestamp, logger::BootDuration>,
-          std::tuple<logger::BootTimestamp, logger::BootDuration>>
-NoncausalTimestampFilter::FindTimestamps(logger::BootTimestamp ta_base,
-                                         double ta, size_t sample_boot) const {
+std::pair<std::tuple<BootTimestamp, BootDuration>,
+          std::tuple<BootTimestamp, BootDuration>>
+NoncausalTimestampFilter::FindTimestamps(BootTimestamp ta_base, double ta,
+                                         size_t sample_boot) const {
   CHECK_GE(ta, 0.0);
   CHECK_LT(ta, 1.0);
 
@@ -731,9 +733,11 @@
          ((tb - ta) - offset.second);
 }
 
-std::string NoncausalTimestampFilter::DebugOffsetError(
-    logger::BootTimestamp ta_base, double ta, logger::BootTimestamp tb_base,
-    double tb, size_t node_a, size_t node_b) const {
+std::string NoncausalTimestampFilter::DebugOffsetError(BootTimestamp ta_base,
+                                                       double ta,
+                                                       BootTimestamp tb_base,
+                                                       double tb, size_t node_a,
+                                                       size_t node_b) const {
   NormalizeTimestamps(&ta_base, &ta);
   NormalizeTimestamps(&tb_base, &tb);
 
@@ -830,8 +834,8 @@
   return true;
 }
 
-void NoncausalTimestampFilter::Sample(logger::BootTimestamp monotonic_now_all,
-                                      logger::BootDuration sample_ns) {
+void NoncausalTimestampFilter::Sample(BootTimestamp monotonic_now_all,
+                                      BootDuration sample_ns) {
   filter(monotonic_now_all.boot, sample_ns.boot)
       ->Sample(monotonic_now_all.time, sample_ns.duration);
 }
@@ -1114,20 +1118,47 @@
   }
 }
 
-bool NoncausalTimestampFilter::Pop(logger::BootTimestamp time) {
-  // TODO(austin): Auto compute the second boot.
-  CHECK_LE(filters_.size(), 1u);
-  SingleFilter *f = filter(time.boot, 0);
+bool NoncausalTimestampFilter::Pop(BootTimestamp time) {
+  CHECK_GE(filters_.size(), 1u);
+
   VLOG(1) << NodeNames() << " Pop(" << time << ")";
   bool removed = false;
-  // When the timestamp which is the end of the line is popped, we want to
-  // drop it off the list.  Hence the >=
-  while (f->timestamps_size() >= 2 &&
-         time.time >= std::get<0>(f->timestamp(1))) {
-    f->PopFront();
-    removed = true;
+  while (true) {
+    DCHECK_LT(pop_filter_, filters_.size());
+    BootFilter *boot_filter = &filters_[pop_filter_];
+    size_t timestamps_size = 0;
+    while ((timestamps_size = boot_filter->filter.timestamps_size()) > 2) {
+      // When the timestamp which is the end of the line is popped, we want to
+      // drop it off the list.  Hence the <
+      if (time < BootTimestamp{
+                     .boot = static_cast<size_t>(boot_filter->boot.first),
+                     .time = std::get<0>(boot_filter->filter.timestamp(1))}) {
+        return removed;
+      }
+      boot_filter->filter.PopFront();
+      removed = true;
+    }
+
+    if (timestamps_size == 2) {
+      if (pop_filter_ + 1u >= filters_.size()) {
+        return removed;
+      }
+
+      // There is 1 more filter, see if there is enough data in it to switch
+      // over to it.  Its timestamps are on its own boot, not the current one.
+      BootFilter *next_filter = &filters_[pop_filter_ + 1];
+      if (next_filter->filter.timestamps_size() < 2u) {
+        return removed;
+      }
+      if (time < BootTimestamp{
+                     .boot = static_cast<size_t>(next_filter->boot.first),
+                     .time = std::get<0>(next_filter->filter.timestamp(1))}) {
+        return removed;
+      }
+    }
+    // Done with this boot's filter; continue popping from the next one.
+    ++pop_filter_;
   }
-  return removed;
 }
 
 void NoncausalTimestampFilter::SingleFilter::Debug() const {
@@ -1247,9 +1278,9 @@
   }
 }
 
-void NoncausalOffsetEstimator::Sample(
-    const Node *node, logger::BootTimestamp node_delivered_time,
-    logger::BootTimestamp other_node_sent_time) {
+void NoncausalOffsetEstimator::Sample(const Node *node,
+                                      BootTimestamp node_delivered_time,
+                                      BootTimestamp other_node_sent_time) {
   VLOG(1) << "Sample delivered         " << node_delivered_time << " sent "
           << other_node_sent_time << " " << node->name()->string_view()
           << " -> "
@@ -1268,8 +1299,8 @@
 }
 
 void NoncausalOffsetEstimator::ReverseSample(
-    const Node *node, logger::BootTimestamp node_sent_time,
-    logger::BootTimestamp other_node_delivered_time) {
+    const Node *node, BootTimestamp node_sent_time,
+    BootTimestamp other_node_delivered_time) {
   VLOG(1) << "Reverse sample delivered " << other_node_delivered_time
           << " sent " << node_sent_time << " "
           << ((node == node_a_) ? node_b_ : node_a_)->name()->string_view()
@@ -1287,27 +1318,5 @@
   }
 }
 
-bool NoncausalOffsetEstimator::Pop(const Node *node,
-                                   logger::BootTimestamp node_monotonic_now) {
-  if (node == node_a_) {
-    if (a_.Pop(node_monotonic_now)) {
-      VLOG(1) << "Popping forward sample to " << node_a_->name()->string_view()
-              << " from " << node_b_->name()->string_view() << " at "
-              << node_monotonic_now;
-      return true;
-    }
-  } else if (node == node_b_) {
-    if (b_.Pop(node_monotonic_now)) {
-      VLOG(1) << "Popping reverse sample to " << node_b_->name()->string_view()
-              << " from " << node_a_->name()->string_view() << " at "
-              << node_monotonic_now;
-      return true;
-    }
-  } else {
-    LOG(FATAL) << "Unknown node " << node->name()->string_view();
-  }
-  return false;
-}
-
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 166e811..0645139 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -554,6 +554,7 @@
                                  adjusted_initial_time);
         }
       }
+      CHECK_LT(i, timestamps_.size());
       return std::make_tuple(std::get<0>(timestamps_[i]),
                              std::get<1>(timestamps_[i]));
     }
@@ -673,6 +674,9 @@
   std::vector<BootFilter> filters_;
 
   size_t current_filter_ = 0;
+
+  // The filter to resume popping from.
+  size_t pop_filter_ = 0;
 };
 
 // This class holds 2 NoncausalTimestampFilter's and handles averaging the
@@ -712,10 +716,6 @@
   void ReverseSample(const Node *node, logger::BootTimestamp node_sent_time,
                      logger::BootTimestamp other_node_delivered_time);
 
-  // Removes old data points from a node before the provided time.
-  // Returns true if any points were popped.
-  bool Pop(const Node *node, logger::BootTimestamp node_monotonic_now);
-
  private:
   NoncausalTimestampFilter a_;
   NoncausalTimestampFilter b_;
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 5544711..37d02ac 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -1098,16 +1098,16 @@
   EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 3u);
   EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 3u);
 
-  estimator.Pop(node_a, ta2);
-  estimator.Pop(node_b, tb2);
+  estimator.GetFilter(node_a)->Pop(ta2);
+  estimator.GetFilter(node_b)->Pop(tb2);
   EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
   EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
 
-  // And dropping down to 1 point means 0 slope.
-  estimator.Pop(node_a, ta3);
-  estimator.Pop(node_b, tb3);
-  EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 1u);
-  EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 1u);
+  // And confirm we won't drop down to 1 point.
+  estimator.GetFilter(node_a)->Pop(ta3);
+  estimator.GetFilter(node_b)->Pop(tb3);
+  EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
+  EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
 }
 
 }  // namespace testing