Use the new solver to compute time

Now that all the infrastructure exists, hook it up.  Track which nodes
are connected, if there are any orphaned nodes, and everything else the
old code used to do.

This doesn't yet handle single directions going and coming.

Change-Id: I658347797384f7608870d231a3ebbb2c05dad1dc
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 646f597..9533cb5 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -303,6 +303,7 @@
         ":pong_lib",
         ":simulated_event_loop",
         "//aos/network:remote_message_fbs",
+        "//aos/network:testing_time_converter",
         "//aos/testing:googletest",
     ],
 )
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index f5d6c92..49b170d 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -44,13 +44,12 @@
 void EventScheduler::CallOldestEvent() {
   CHECK_GT(events_list_.size(), 0u);
   auto iter = events_list_.begin();
-  monotonic_now_ = iter->first;
-  monotonic_now_valid_ = true;
+  CHECK_EQ(monotonic_now(), iter->first)
+      << ": Time is wrong on node " << node_index_;
 
   ::std::function<void()> callback = ::std::move(iter->second);
   events_list_.erase(iter);
   callback();
-  monotonic_now_valid_ = false;
 }
 
 void EventScheduler::RunOnRun() {
@@ -158,6 +157,10 @@
     }
   }
 
+  if (min_scheduler) {
+    VLOG(1) << "Oldest event " << min_event_time << " on scheduler "
+            << min_scheduler->node_index_;
+  }
   return std::make_tuple(min_event_time, min_scheduler);
 }
 
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 959caea..397b5f0 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -68,6 +68,13 @@
       std::multimap<monotonic_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
 
+  // Sets the time converter in use for this scheduler (and the corresponding
+  // node index)
+  void SetTimeConverter(size_t node_index, TimeConverter *converter) {
+    node_index_ = node_index;
+    converter_ = converter;
+  }
+
   // Schedule an event with a callback function
   // Returns an iterator to the event
   Token Schedule(monotonic_clock::time_point time,
@@ -98,53 +105,25 @@
   // measurement.
   distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const {
-    return distributed_clock::epoch() +
-           std::chrono::duration_cast<std::chrono::nanoseconds>(
-               (time.time_since_epoch() - distributed_offset_) /
-               distributed_slope_);
+    return converter_->ToDistributedClock(node_index_, time);
   }
 
   // Takes the distributed time and converts it to the monotonic clock for this
   // node.
   monotonic_clock::time_point FromDistributedClock(
       distributed_clock::time_point time) const {
-    return monotonic_clock::epoch() +
-           std::chrono::duration_cast<std::chrono::nanoseconds>(
-               time.time_since_epoch() * distributed_slope_) +
-           distributed_offset_;
+    return converter_->FromDistributedClock(node_index_, time);
   }
 
   // Returns the current monotonic time on this node calculated from the
   // distributed clock.
   inline monotonic_clock::time_point monotonic_now() const;
 
-  // Sets the offset between the distributed and monotonic clock.
-  //   monotonic = distributed * slope + offset;
-  void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
-                            double distributed_slope) {
-    // TODO(austin): Use a starting point to improve precision.
-    // TODO(austin): Make slope be the slope of the offset, not the input,
-    // throught the calculation process.
-    distributed_offset_ = distributed_offset;
-    distributed_slope_ = distributed_slope;
-
-    // Once we update the offset, now isn't going to be valid anymore.
-    // TODO(austin): Probably should instead use the piecewise linear function
-    // and evaluate it correctly.
-    monotonic_now_valid_ = false;
-  }
-
  private:
   friend class EventSchedulerScheduler;
   // Current execution time.
-  bool monotonic_now_valid_ = false;
   monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
 
-  // Offset to the distributed clock.
-  //   distributed = monotonic + offset;
-  std::chrono::nanoseconds distributed_offset_ = std::chrono::seconds(0);
-  double distributed_slope_ = 1.0;
-
   // List of functions to run (once) when running.
   std::vector<std::function<void()>> on_run_;
 
@@ -154,6 +133,29 @@
 
   // Pointer to the actual scheduler.
   EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
+
+  // Node index handle to be handed back to the TimeConverter.  This lets the
+  // same time converter be used for all the nodes, and the node index
+  // distinguish which one.
+  size_t node_index_ = 0;
+
+  // Converts time by doing nothing to it.
+  class UnityConverter final : public TimeConverter {
+   public:
+    distributed_clock::time_point ToDistributedClock(
+        size_t /*node_index*/, monotonic_clock::time_point time) override {
+      return distributed_clock::epoch() + time.time_since_epoch();
+    }
+
+    monotonic_clock::time_point FromDistributedClock(
+        size_t /*node_index*/, distributed_clock::time_point time) override {
+      return monotonic_clock::epoch() + time.time_since_epoch();
+    }
+  };
+
+  UnityConverter unity_converter_;
+
+  TimeConverter *converter_ = &unity_converter_;
 };
 
 // We need a heap of heaps...
@@ -211,22 +213,7 @@
 };
 
 inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
-  // Make sure we stay in sync.
-  if (monotonic_now_valid_) {
-    // We want time to be smooth, so confirm that it doesn't change too much
-    // while handling an event.
-    //
-    // There are 2 sources of error.  There are numerical precision and interger
-    // rounding problems going from the monotonic clock to the distributed clock
-    // and back again.  When we update the time function as well to transition
-    // line segments, we have a slight jump as well.
-    CHECK_NEAR(monotonic_now_,
-               FromDistributedClock(scheduler_scheduler_->distributed_now()),
-               std::chrono::nanoseconds(2));
-    return monotonic_now_;
-  } else {
-    return FromDistributedClock(scheduler_scheduler_->distributed_now());
-  }
+  return FromDistributedClock(scheduler_scheduler_->distributed_now());
 }
 
 inline bool EventScheduler::is_running() const {
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index e2523db..1ad0a84 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -8,10 +8,52 @@
 
 namespace chrono = std::chrono;
 
+// Legacy time converter for keeping old tests working.  Has numerical precision
+// problems.
+class SlopeOffsetTimeConverter final : public TimeConverter {
+ public:
+  SlopeOffsetTimeConverter(size_t nodes_count)
+      : distributed_offset_(nodes_count, std::chrono::seconds(0)),
+        distributed_slope_(nodes_count, 1.0) {}
+
+  // Sets the offset between the distributed and monotonic clock.
+  //   monotonic = distributed * slope + offset;
+  void SetDistributedOffset(size_t node_index,
+                            std::chrono::nanoseconds distributed_offset,
+                            double distributed_slope) {
+    distributed_offset_[node_index] = distributed_offset;
+    distributed_slope_[node_index] = distributed_slope;
+  }
+
+  distributed_clock::time_point ToDistributedClock(
+      size_t node_index, monotonic_clock::time_point time) override {
+    return distributed_clock::epoch() +
+           std::chrono::duration_cast<std::chrono::nanoseconds>(
+               (time.time_since_epoch() - distributed_offset_[node_index]) /
+               distributed_slope_[node_index]);
+  }
+
+  monotonic_clock::time_point FromDistributedClock(
+      size_t node_index, distributed_clock::time_point time) override {
+    return monotonic_clock::epoch() +
+           std::chrono::duration_cast<std::chrono::nanoseconds>(
+               time.time_since_epoch() * distributed_slope_[node_index]) +
+           distributed_offset_[node_index];
+  }
+
+ private:
+  // Offset to the distributed clock.
+  //   distributed = monotonic + offset;
+  std::vector<std::chrono::nanoseconds> distributed_offset_;
+  std::vector<double> distributed_slope_;
+};
+
 // Tests that the default parameters (slope of 1, offest of 0) behave as
 // an identity.
 TEST(EventSchedulerTest, IdentityTimeConversion) {
+  SlopeOffsetTimeConverter time(1);
   EventScheduler s;
+  s.SetTimeConverter(0u, &time);
   EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
             monotonic_clock::epoch());
 
@@ -22,15 +64,16 @@
   EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch()),
             distributed_clock::epoch());
 
-  EXPECT_EQ(
-      s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(1)),
-      distributed_clock::epoch() + chrono::seconds(1));
+  EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(1)),
+            distributed_clock::epoch() + chrono::seconds(1));
 }
 
 // Tests that a non-unity slope is computed correctly.
 TEST(EventSchedulerTest, DoubleTimeConversion) {
+  SlopeOffsetTimeConverter time(1);
   EventScheduler s;
-  s.SetDistributedOffset(std::chrono::seconds(7), 2.0);
+  s.SetTimeConverter(0u, &time);
+  time.SetDistributedOffset(0u, std::chrono::seconds(7), 2.0);
 
   EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
             monotonic_clock::epoch() + chrono::seconds(7));
@@ -42,9 +85,8 @@
   EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(7)),
             distributed_clock::epoch());
 
-  EXPECT_EQ(
-      s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(9)),
-      distributed_clock::epoch() + chrono::seconds(1));
+  EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(9)),
+            distributed_clock::epoch() + chrono::seconds(1));
 }
 
 }  // namespace aos
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 5d21ae4..0d8fe0b 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -307,6 +307,7 @@
         "//aos/events:ping_lib",
         "//aos/events:pong_lib",
         "//aos/events:simulated_event_loop",
+        "//aos/network:testing_time_converter",
         "//aos/testing:googletest",
         "//aos/testing:tmpdir",
     ],
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 00322e1..2fe5473 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -766,8 +766,8 @@
   // Only set it if this node delivers to the peer timestamp_mapper.  Otherwise
   // we could needlessly save data.
   if (node_data->any_delivered) {
-    LOG(INFO) << "Registering on node " << node() << " for peer node "
-              << timestamp_mapper->node();
+    VLOG(1) << "Registering on node " << node() << " for peer node "
+            << timestamp_mapper->node();
     CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
 
     timestamp_mapper->nodes_data_[node()].peer = this;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index d4f4abd..2e0f5db 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1101,7 +1101,8 @@
   remapped_configuration_ = event_loop_factory_->configuration();
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
-          event_loop_factory_);
+          event_loop_factory_, logged_configuration(),
+          FLAGS_skip_order_validation);
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1130,6 +1131,7 @@
 
     state->SetChannelCount(logged_configuration()->channels()->size());
   }
+  event_loop_factory_->SetTimeConverter(filters_.get());
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1161,15 +1163,12 @@
            "you sure that the replay config matches the original config?";
   }
 
-  filters_->Initialize(logged_configuration());
+  filters_->CheckGraph();
 
   for (std::unique_ptr<State> &state : states_) {
     state->SeedSortedMessages();
   }
 
-  // And solve.
-  UpdateOffsets();
-
   // We want to start the log file at the last start time of the log files
   // from all the nodes.  Compute how long each node's simulation needs to run
   // to move time to this point.
@@ -1191,6 +1190,9 @@
         start_time, state->ToDistributedClock(state->monotonic_start_time()));
   }
 
+  // TODO(austin): If a node doesn't have a start time, we might not queue
+  // enough.  If this happens, we'll explode with a frozen error eventually.
+
   CHECK_GE(start_time, distributed_clock::epoch())
       << ": Hmm, we have a node starting before the start of time.  Offset "
          "everything.";
@@ -1255,14 +1257,6 @@
   }
 }
 
-void LogReader::UpdateOffsets() {
-  filters_->UpdateOffsets();
-
-  if (VLOG_IS_ON(1)) {
-    filters_->LogFit("Offset is");
-  }
-}
-
 message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
     const Node *node_a, const Node *node_b) {
   if (filters_) {
@@ -1344,9 +1338,6 @@
       }
       return;
     }
-    if (VLOG_IS_ON(1)) {
-      filters_->LogFit("Offset was");
-    }
 
     bool update_time;
     TimestampedMessage timestamped_message = state->PopOldest(&update_time);
@@ -1388,8 +1379,13 @@
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
+          //
+          // TODO(austin): <= means that the cause message (which we know) could
+          // happen after the effect even though we know they are at the same
+          // time.  I doubt anyone will notice for a bit, but we should really
+          // fix that.
           if (!FLAGS_skip_order_validation) {
-            CHECK_LT(
+            CHECK_LE(
                 timestamped_message.monotonic_remote_time,
                 state->monotonic_remote_now(timestamped_message.channel_index))
                 << state->event_loop()->node()->name()->string_view() << " to "
@@ -1401,7 +1397,7 @@
                        logged_configuration()->channels()->Get(
                            timestamped_message.channel_index))
                 << " " << state->DebugString();
-          } else if (timestamped_message.monotonic_remote_time >=
+          } else if (timestamped_message.monotonic_remote_time >
                      state->monotonic_remote_now(
                          timestamped_message.channel_index)) {
             LOG(WARNING)
@@ -1492,7 +1488,6 @@
                        return state->monotonic_now();
                      });
 
-      UpdateOffsets();
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
               << state->monotonic_now();
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e3f5380..4213066 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -741,10 +741,6 @@
   // less than the second node.
   std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
 
-  // Updates the offset matrix solution and sets the per-node distributed
-  // offsets in the factory.
-  void UpdateOffsets();
-
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 05bafe4..16fb2ab 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -7,6 +7,7 @@
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/remote_message_generated.h"
+#include "aos/network/testing_time_converter.h"
 #include "aos/network/timestamp_generated.h"
 #include "aos/testing/tmpdir.h"
 #include "aos/util/file.h"
@@ -427,11 +428,16 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
+        time_converter_(configuration::NodesCount(&config_.message())),
         event_loop_factory_(&config_.message()),
         pi1_(
             configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi1_index_(configuration::GetNodeIndex(
+            event_loop_factory_.configuration(), pi1_)),
         pi2_(
             configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+        pi2_index_(configuration::GetNodeIndex(
+            event_loop_factory_.configuration(), pi2_)),
         tmp_dir_(aos::testing::TestTmpDir()),
         logfile_base1_(tmp_dir_ + "/multi_logfile1"),
         logfile_base2_(tmp_dir_ + "/multi_logfile2"),
@@ -474,6 +480,8 @@
         ping_(ping_event_loop_.get()),
         pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
         pong_(pong_event_loop_.get()) {
+    event_loop_factory_.SetTimeConverter(&time_converter_);
+
     // Go through and remove the logfiles if they already exist.
     for (const auto file : logfiles_) {
       unlink(file.c_str());
@@ -637,10 +645,13 @@
 
   // Config and factory.
   aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  message_bridge::TestingTimeConverter time_converter_;
   SimulatedEventLoopFactory event_loop_factory_;
 
-  const Node *pi1_;
-  const Node *pi2_;
+  const Node *const pi1_;
+  const size_t pi1_index_;
+  const Node *const pi2_;
+  const size_t pi2_index_;
 
   std::string tmp_dir_;
   std::string logfile_base1_;
@@ -723,6 +734,7 @@
 
 // Tests that we can write and read simple multi-node log files.
 TEST_F(MultinodeLoggerTest, SimpleMultiNode) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1054,6 +1066,7 @@
 // Test that if we feed the replay with a mismatched node list that we die on
 // the LogReader constructor.
 TEST_F(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1086,6 +1099,7 @@
 // Tests that we can read log files where they don't start at the same monotonic
 // time.
 TEST_F(MultinodeLoggerTest, StaggeredStart) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1187,58 +1201,78 @@
 // match correctly.  While we are here, also test that different ending times
 // also is readable.
 TEST_F(MultinodeLoggerTest, MismatchedClocks) {
+  // TODO(austin): Negate...
+  const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
+
+  time_converter_.AddMonotonic({monotonic_clock::epoch(),
+                                monotonic_clock::epoch() + initial_pi2_offset});
+  // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
+  // skew to be 200 uS/s
+  const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(95),
+       chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
+  // Run another 200 ms to have one logger start first.
+  const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(200), chrono::milliseconds(200)});
+  // Slew one way then the other at the same 200 uS/S slew rate.  Make sure we
+  // go far enough to cause problems if this isn't accounted for.
+  const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(20000),
+       chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
+  const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(40000),
+       chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
+  const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(400), chrono::milliseconds(400)});
+
   {
     LoggerState pi2_logger = MakeLogger(pi2_);
 
+    NodeEventLoopFactory *pi1 =
+        event_loop_factory_.GetNodeEventLoopFactory(pi1_);
     NodeEventLoopFactory *pi2 =
         event_loop_factory_.GetNodeEventLoopFactory(pi2_);
     LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
               << pi2->realtime_now() << " distributed "
               << pi2->ToDistributedClock(pi2->monotonic_now());
 
-    const chrono::nanoseconds initial_pi2_offset = -chrono::seconds(1000);
-    chrono::nanoseconds pi2_offset = initial_pi2_offset;
-
-    pi2->SetDistributedOffset(-pi2_offset, 1.0);
     LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
               << pi2->realtime_now() << " distributed "
               << pi2->ToDistributedClock(pi2->monotonic_now());
 
-    for (int i = 0; i < 95; ++i) {
-      pi2_offset += chrono::nanoseconds(200);
-      pi2->SetDistributedOffset(-pi2_offset, 1.0);
-      event_loop_factory_.RunFor(chrono::milliseconds(1));
-    }
+    event_loop_factory_.RunFor(startup_sleep1);
 
     StartLogger(&pi2_logger);
 
-    event_loop_factory_.RunFor(chrono::milliseconds(200));
+    event_loop_factory_.RunFor(startup_sleep2);
 
     {
       // Run pi1's logger for only part of the time.
       LoggerState pi1_logger = MakeLogger(pi1_);
 
       StartLogger(&pi1_logger);
+      event_loop_factory_.RunFor(logger_run1);
 
-      for (int i = 0; i < 20000; ++i) {
-        pi2_offset += chrono::nanoseconds(200);
-        pi2->SetDistributedOffset(-pi2_offset, 1.0);
-        event_loop_factory_.RunFor(chrono::milliseconds(1));
-      }
+      // Make sure we slewed time far enough so that the difference is greater
+      // than the network delay.  This confirms that if we sort incorrectly, it
+      // would show in the results.
+      EXPECT_LT(
+          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          -event_loop_factory_.send_delay() -
+              event_loop_factory_.network_delay());
 
-      EXPECT_GT(pi2_offset - initial_pi2_offset,
-                event_loop_factory_.send_delay() +
-                    event_loop_factory_.network_delay());
+      event_loop_factory_.RunFor(logger_run2);
 
-      for (int i = 0; i < 40000; ++i) {
-        pi2_offset -= chrono::nanoseconds(200);
-        pi2->SetDistributedOffset(-pi2_offset, 1.0);
-        event_loop_factory_.RunFor(chrono::milliseconds(1));
-      }
+      // And now check that we went far enough the other way to make sure we
+      // cover both problems.
+      EXPECT_GT(
+          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          event_loop_factory_.send_delay() +
+              event_loop_factory_.network_delay());
     }
 
     // And log a bit more on pi2.
-    event_loop_factory_.RunFor(chrono::milliseconds(400));
+    event_loop_factory_.RunFor(logger_run3);
   }
 
   LogReader reader(SortParts(logfiles_));
@@ -1341,6 +1375,7 @@
 
 // Tests that we can sort a bunch of parts into the pre-determined sorted parts.
 TEST_F(MultinodeLoggerTest, SortParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1361,6 +1396,7 @@
 // Tests that we can sort a bunch of parts with an empty part.  We should ignore
 // it and remove it from the sorted list.
 TEST_F(MultinodeLoggerTest, SortEmptyParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1388,6 +1424,7 @@
 // Tests that we can sort a bunch of parts with an empty .xz file in there.  The
 // empty file should be ignored.
 TEST_F(MultinodeLoggerTest, SortEmptyCompressedParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1416,6 +1453,7 @@
 // Tests that we can sort a bunch of parts with the end missing off a compressed
 // file.  We should use the part we can read.
 TEST_F(MultinodeLoggerTest, SortTruncatedCompressedParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1447,6 +1485,7 @@
 
 // Tests that if we remap a remapped channel, it shows up correctly.
 TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1513,6 +1552,7 @@
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
 TEST_F(MultinodeLoggerTest, MessageHeader) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1741,15 +1781,13 @@
 // Tests that we properly populate and extract the logger_start time by setting
 // up a clock difference between 2 nodes and looking at the resulting parts.
 TEST_F(MultinodeLoggerTest, LoggerStartTime) {
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
 
-    NodeEventLoopFactory *pi2 =
-        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
-
-    pi2->SetDistributedOffset(chrono::seconds(1000), 1.0);
-
     StartLogger(&pi1_logger);
     StartLogger(&pi2_logger);
 
@@ -1785,6 +1823,7 @@
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
 TEST_F(MultinodeLoggerDeathTest, RemoteReboot) {
+  time_converter_.StartEqual();
   std::string pi2_boot1;
   std::string pi2_boot2;
   {
@@ -1828,8 +1867,13 @@
 // unavailable.
 TEST_F(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
   event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
-  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
-      chrono::seconds(1000), 0.99999);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
+
+  time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000),
+       chrono::milliseconds(10000) - chrono::milliseconds(1)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
 
@@ -1849,8 +1893,36 @@
 // unavailable.
 TEST_F(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
   event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
-  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
-      chrono::seconds(500), 1.00001);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(500)});
+
+  time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000),
+       chrono::milliseconds(10000) + chrono::milliseconds(1)});
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+  }
+
+  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
+  // to confirm the right thing happened.
+  ConfirmReadable(pi1_single_direction_logfiles_);
+}
+
+// Tests that we properly handle a dead node.  Do this by just disconnecting it
+// and only using one nodes of logs.
+TEST_F(MultinodeLoggerTest, DeadNode) {
+  event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Disconnect(pi1_);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 8621851..750df39 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -1034,6 +1034,13 @@
   return result->get();
 }
 
+void SimulatedEventLoopFactory::SetTimeConverter(
+    TimeConverter *time_converter) {
+  for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
+    factory->SetTimeConverter(time_converter);
+  }
+}
+
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
     std::string_view name, const Node *node) {
   if (node == nullptr) {
@@ -1092,9 +1099,7 @@
   }
 }
 
-void SimulatedEventLoopFactory::Exit() {
-  scheduler_scheduler_.Exit();
-}
+void SimulatedEventLoopFactory::Exit() { scheduler_scheduler_.Exit(); }
 
 void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
   CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 8494593..4a2b289 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -73,6 +73,9 @@
   // lifetime identical to the factory.
   NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
 
+  // Sets the time converter for all nodes.
+  void SetTimeConverter(TimeConverter *time_converter);
+
   // Starts executing the event loops unconditionally.
   void Run();
   // Executes the event loops for a duration.
@@ -166,16 +169,19 @@
 
   // Converts a time to the distributed clock for scheduling and cross-node time
   // measurement.
+  // Note: converting time too far in the future can cause problems when
+  // replaying logs.  Only convert times in the present or near past.
   inline distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const;
   inline monotonic_clock::time_point FromDistributedClock(
       distributed_clock::time_point time) const;
 
-  // Sets the offset between the monotonic clock and the central distributed
-  // clock.  distributed_clock = monotonic_clock + offset.
-  void SetDistributedOffset(std::chrono::nanoseconds monotonic_offset,
-                            double monotonic_slope) {
-    scheduler_.SetDistributedOffset(monotonic_offset, monotonic_slope);
+  // Sets the class used to convert time.  This pointer must out-live the
+  // SimulatedEventLoopFactory.
+  void SetTimeConverter(TimeConverter *time_converter) {
+    scheduler_.SetTimeConverter(
+        configuration::GetNodeIndex(factory_->configuration(), node_),
+        time_converter);
   }
 
   // Returns the boot UUID for this node.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 18e7cc9..0fb757b 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -11,6 +11,7 @@
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/remote_message_generated.h"
+#include "aos/network/testing_time_converter.h"
 #include "aos/network/timestamp_generated.h"
 #include "gtest/gtest.h"
 
@@ -690,15 +691,27 @@
       aos::configuration::ReadConfig(ConfigPrefix() +
                                      "events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
+  ASSERT_EQ(pi1_index, 0u);
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
+  ASSERT_EQ(pi2_index, 1u);
   const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+  const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
+  ASSERT_EQ(pi3_index, 2u);
 
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
   NodeEventLoopFactory *pi2_factory =
       simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+  pi2_factory->SetTimeConverter(&time);
 
   constexpr chrono::milliseconds kOffset{1501};
-  pi2_factory->SetDistributedOffset(kOffset, 1.0);
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
+       monotonic_clock::epoch()});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -711,15 +724,15 @@
   // Wait to let timestamp estimation start up before looking for the results.
   simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
 
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
   std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
 
   std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
 
-  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
-
   // Confirm the offsets are being recovered correctly.
   int pi1_server_statistics_count = 0;
   pi1_pong_counter_event_loop->MakeWatcher(
@@ -1188,16 +1201,32 @@
       aos::configuration::ReadConfig(ConfigPrefix() +
                                      "events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
+  ASSERT_EQ(pi1_index, 0u);
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
+  ASSERT_EQ(pi2_index, 1u);
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+  const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
+  ASSERT_EQ(pi3_index, 2u);
 
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
   NodeEventLoopFactory *pi2_factory =
       simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+  pi2_factory->SetTimeConverter(&time);
 
-  // Move the pi far into the future so the slope is significant.  And set it to
-  // something reasonable.
   constexpr chrono::milliseconds kOffset{150100};
-  pi2_factory->SetDistributedOffset(kOffset, 1.0001);
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
+       monotonic_clock::epoch()});
+  time.AddNextTimestamp(
+      distributed_clock::epoch() + chrono::seconds(10),
+      {monotonic_clock::epoch() + chrono::milliseconds(9999),
+       monotonic_clock::epoch() + kOffset + chrono::seconds(10),
+       monotonic_clock::epoch() + chrono::milliseconds(9999)});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);