Add an InterpolatedTimeConverter to wrap time conversion

This gives us an interface that we can use in EventScheduler to query
the current time, and helpers to interpolate time from a list of points.

Change-Id: I44bd5f0f795604c63a29cb1e3f65815b790edb6a
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index c067048..959caea 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -43,6 +43,23 @@
 std::ostream &operator<<(std::ostream &stream,
                          const aos::distributed_clock::time_point &now);
 
+// Interface for accurately converting times between a node's monotonic clock
+// and the distributed clock.
+class TimeConverter {
+ public:
+  virtual ~TimeConverter() = default;
+
+  // Maps time on the monotonic clock of node node_index to the distributed
+  // clock, for scheduling and cross-node time measurement.
+  virtual distributed_clock::time_point ToDistributedClock(
+      size_t node_index, monotonic_clock::time_point time) = 0;
+
+  // Maps time on the distributed clock back to the monotonic clock of node
+  // node_index.
+  virtual monotonic_clock::time_point FromDistributedClock(
+      size_t node_index, distributed_clock::time_point time) = 0;
+};
+
 class EventSchedulerScheduler;
 
 class EventScheduler {
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 6053260..d8fc474 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -494,6 +494,18 @@
     ],
 )
 
+cc_library(
+    name = "testing_time_converter",
+    testonly = True,
+    srcs = ["testing_time_converter.cc"],
+    hdrs = ["testing_time_converter.h"],
+    deps = [
+        ":multinode_timestamp_filter",
+        "//aos/events:simulated_event_loop",
+        "//aos/time",
+    ],
+)
+
 cc_test(
     name = "multinode_timestamp_filter_test",
     srcs = [
@@ -502,6 +514,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":multinode_timestamp_filter",
+        ":testing_time_converter",
         ":timestamp_filter",
         "//aos/testing:googletest",
         "@com_github_stevengj_nlopt//:nlopt",
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 4e8e656..6a63b0c 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -1,6 +1,7 @@
 #include "aos/network/multinode_timestamp_filter.h"
 
 #include <chrono>
+#include <functional>
 #include <map>
 
 #include "absl/strings/str_join.h"
@@ -194,6 +195,187 @@
   }
 }
 
+void InterpolatedTimeConverter::QueueUntil(
+    std::function<
+        bool(const std::tuple<distributed_clock::time_point,
+                              std::vector<monotonic_clock::time_point>> &)>
+        not_done) {
+  // Pull points until the newest one satisfies the caller or the source runs
+  // dry.  at_end_ latches so that we never ask for more points afterwards.
+  while (!at_end_ && (times_.empty() || not_done(times_.back()))) {
+    auto next_time = NextTimestamp();
+    if (!next_time) {
+      VLOG(1) << "Last timestamp, calling it quits";
+      at_end_ = true;
+      break;
+    }
+    VLOG(1) << "Fetched next timestamp while solving.";
+    for (monotonic_clock::time_point t : std::get<1>(*next_time)) {
+      VLOG(1) << "  " << t;
+    }
+    CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+    times_.emplace_back(std::move(*next_time));
+  }
+
+  CHECK(!times_.empty())
+      << ": Found no times to do timestamp estimation, please investigate.";
+  // Forget old points, but hold on to at least 50 of them and 10 seconds.
+  while (times_.size() > 50 &&
+         std::get<0>(times_.front()) + chrono::seconds(10) <
+             std::get<0>(times_.back())) {
+    times_.pop_front();
+    have_popped_ = true;
+  }
+}
+
+distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
+    size_t node_index, monotonic_clock::time_point time) {
+  CHECK_LT(node_index, node_count_);
+  // If there is only one node, time estimation makes no sense.  Just return
+  // unity time.
+  if (node_count_ == 1u) {
+    return distributed_clock::epoch() + time.time_since_epoch();
+  }
+
+  // Queue points until the newest one is at or after time on this node.
+  QueueUntil(
+      [time, node_index](
+          const std::tuple<distributed_clock::time_point,
+                           std::vector<monotonic_clock::time_point>> &t) {
+        return std::get<1>(t)[node_index] < time;
+      });
+
+  // Before the first point (or with only 1 point), use a slope of 1 (0 drift)
+  // from the first point, otherwise time jumps when timestamp 2 happens.
+  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
+    if (time < std::get<1>(times_[0])[node_index]) {
+      CHECK(!have_popped_)
+          << ": Trying to interpolate time " << time
+          << " but we have forgotten the relevant points already.";
+    }
+    const distributed_clock::time_point result =
+        time - std::get<1>(times_[0])[node_index] + std::get<0>(times_[0]);
+    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+            << result;
+    return result;
+  }
+
+  // Now, find the corresponding timestamps.  Search from the back since that's
+  // where most of the times we care about will be.
+  size_t index = times_.size() - 2u;
+  while (index > 0u) {
+    if (std::get<1>(times_[index])[node_index] <= time) {
+      break;
+    }
+    --index;
+  }
+
+  // Interpolate between points index and index + 1.
+  const distributed_clock::time_point d0 = std::get<0>(times_[index]);
+  const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+
+  const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
+  const monotonic_clock::time_point t1 =
+      std::get<1>(times_[index + 1])[node_index];
+
+  const chrono::nanoseconds dt = (t1 - t0);
+
+  CHECK_NE(dt.count(), 0u) << " t0 " << t0 << " t1 " << t1 << " d0 " << d0
+                           << " d1 " << d1 << " looking up monotonic " << time;
+  // Basic interpolation between 2 points looks like
+  //  p0.d + (t - p0.t) * (p1.d - p0.d) / (p1.t - p0.t)
+  // This can be multiplied out with integer arithmetic to get exact results.
+  // Since we are using integer arithmetic, we want to round to the nearest, not
+  // towards 0.  To do that, add half of the denominator when the numerator is
+  // > 0, and subtract it otherwise so we round correctly.  Multiply before
+  // dividing so we don't round early, and use 128 bit arithmetic to guarantee
+  // that the 64 bit multiplication fits.
+  absl::int128 numerator =
+      absl::int128((time - t0).count()) * absl::int128((d1 - d0).count());
+  numerator += numerator > 0 ? absl::int128(dt.count() / 2)
+                             : -absl::int128(dt.count() / 2);
+  const distributed_clock::time_point result =
+      d0 + std::chrono::nanoseconds(
+               static_cast<int64_t>(numerator / absl::int128(dt.count())));
+  VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+          << result;
+  return result;
+}
+
+monotonic_clock::time_point InterpolatedTimeConverter::FromDistributedClock(
+    size_t node_index, distributed_clock::time_point time) {
+  CHECK_LT(node_index, node_count_);
+  // If there is only one node, time estimation makes no sense.  Just return
+  // unity time.
+  if (node_count_ == 1u) {
+    return monotonic_clock::epoch() + time.time_since_epoch();
+  }
+
+  // Queue points until the newest one is at or after time.
+  QueueUntil(
+      [time](const std::tuple<distributed_clock::time_point,
+                              std::vector<monotonic_clock::time_point>> &t) {
+        return std::get<0>(t) < time;
+      });
+
+  if (times_.size() == 1u || time < std::get<0>(times_[0])) {
+    if (time < std::get<0>(times_[0])) {
+      CHECK(!have_popped_)
+          << ": Trying to interpolate time " << time
+          << " but we have forgotten the relevant points already.";
+    }
+    monotonic_clock::time_point result =
+        time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index];
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
+            << result;
+    return result;
+  }
+
+  // Now, find the corresponding timestamps.  Search from the back since that's
+  // where most of the times we care about will be.
+  size_t index = times_.size() - 2u;
+  while (index > 0u) {
+    if (std::get<0>(times_[index]) <= time) {
+      break;
+    }
+    --index;
+  }
+
+  // Interpolate between points index and index + 1.
+  const distributed_clock::time_point d0 = std::get<0>(times_[index]);
+  const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+
+  const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
+  const monotonic_clock::time_point t1 =
+      std::get<1>(times_[index + 1])[node_index];
+
+  const chrono::nanoseconds dd = d1 - d0;
+
+  CHECK_NE(dd.count(), 0u) << " t0 " << t0 << " t1 " << t1 << " d0 " << d0
+                           << " d1 " << d1 << " looking up distributed "
+                           << time;
+
+  // Basic interpolation between 2 points looks like
+  //  p0.t + (t - p0.d) * (p1.t - p0.t) / (p1.d - p0.d)
+  // This can be multiplied out with integer arithmetic to get exact results.
+  // Since we are using integer arithmetic, we want to round to the nearest, not
+  // towards 0.  To do that, add half of the denominator when the numerator is
+  // > 0, and subtract it otherwise so we round correctly.  Multiply before
+  // dividing so we don't round early, and use 128 bit arithmetic to guarantee
+  // that the 64 bit multiplication fits.
+  absl::int128 numerator =
+      absl::int128((time - d0).count()) * absl::int128((t1 - t0).count());
+  numerator += numerator > 0 ? absl::int128(dd.count() / 2)
+                             : -absl::int128(dd.count() / 2);
+
+  const monotonic_clock::time_point result =
+      t0 + std::chrono::nanoseconds(
+               static_cast<int64_t>(numerator / absl::int128(dd.count())));
+  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
+          << result;
+  return result;
+}
+
 void MultiNodeNoncausalOffsetEstimator::Start(
     SimulatedEventLoopFactory *factory) {
   for (std::pair<const std::tuple<const Node *, const Node *>,
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 3e00d87..e9c1d08 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -1,6 +1,7 @@
 #ifndef AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
 #define AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
 
+#include <functional>
 #include <map>
 #include <string_view>
 
@@ -117,6 +118,61 @@
   std::vector<std::vector<FilterPair>> filters_;
 };
 
+// Helpers to convert times between the monotonic and distributed clocks for
+// multiple nodes using a list of points and interpolation.
+class InterpolatedTimeConverter : public TimeConverter {
+ public:
+  explicit InterpolatedTimeConverter(size_t node_count)
+      : node_count_(node_count) {}
+  ~InterpolatedTimeConverter() override = default;
+
+  // Converts a time to the distributed clock for scheduling and cross-node
+  // time measurement.
+  distributed_clock::time_point ToDistributedClock(
+      size_t node_index, monotonic_clock::time_point time) override;
+
+  // Takes the distributed time and converts it to the monotonic clock for this
+  // node.
+  monotonic_clock::time_point FromDistributedClock(
+      size_t node_index, distributed_clock::time_point time) override;
+
+ private:
+  // Returns the next timestamp, or nullopt if there isn't one. It is assumed
+  // that if there isn't one, there never will be one.
+  // A timestamp is a sample of the distributed clock and a corresponding point
+  // on every monotonic clock for all the nodes in the factory that this will be
+  // hooked up to.
+  virtual std::optional<std::tuple<distributed_clock::time_point,
+                                   std::vector<monotonic_clock::time_point>>>
+  NextTimestamp() = 0;
+
+  // Queues timestamps until not_done returns false for the newest queued one
+  // (or there are no more), and then forgets sufficiently old timestamps.
+  void QueueUntil(
+      std::function<
+          bool(const std::tuple<distributed_clock::time_point,
+                                std::vector<monotonic_clock::time_point>> &)>
+          not_done);
+
+  // The number of nodes to enforce.
+  const size_t node_count_;
+
+  // Queued timestamps, oldest first.
+  std::deque<std::tuple<distributed_clock::time_point,
+                        std::vector<monotonic_clock::time_point>>>
+      times_;
+
+  // If true, we have popped data from times_, so anything before the start is
+  // unknown.
+  bool have_popped_ = false;
+
+ protected:
+  // If true, NextTimestamp returned nothing, so don't bother checking again.
+  // (This also enforces that we don't find more time data after calling it
+  // quits.)
+  bool at_end_ = false;
+};
+
 // Class to hold a NoncausalOffsetEstimator per pair of communicating nodes, and
 // to estimate and set the overall time of all nodes.
 class MultiNodeNoncausalOffsetEstimator {
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 28d0183..69f76c5 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -7,6 +7,7 @@
 #include "aos/json_to_flatbuffer.h"
 #include "aos/macros.h"
 #include "aos/network/multinode_timestamp_filter.h"
+#include "aos/network/testing_time_converter.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -133,6 +134,193 @@
   }
 }
 
+// Tests that an InterpolatedTimeConverter with a single timestamp converts
+// with a slope of 1 on every node.  1 second should be 1 second everywhere.
+TEST(InterpolatedTimeConverterTest, OneTime) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+  TestingTimeConverter time_converter(3u);
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(0),
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
+
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1)),
+            me + chrono::seconds(0));
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1)),
+            me + chrono::seconds(9));
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1)),
+            me + chrono::seconds(999));
+  EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(0)),
+            de - chrono::seconds(1));
+  EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(9)),
+            de - chrono::seconds(1));
+  EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(999)),
+            de - chrono::seconds(1));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de),
+            me + chrono::seconds(1));
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de),
+            me + chrono::seconds(10));
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de),
+            me + chrono::seconds(1000));
+  EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(1)), de);
+  EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(10)), de);
+  EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(1000)),
+            de);
+}
+
+// Tests that actual interpolation works as expected for multiple timestamps.
+TEST(InterpolatedTimeConverterTest, Interpolation) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+  TestingTimeConverter time_converter(3u);
+  // Test that 2 timestamps interpolate correctly, checked at their midpoint.
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(0),
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(1),
+      {me + chrono::seconds(2), me + chrono::seconds(11),
+       me + chrono::seconds(1001)});
+
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(500)),
+      me + chrono::milliseconds(1500));
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(1, de + chrono::milliseconds(500)),
+      me + chrono::milliseconds(10500));
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(2, de + chrono::milliseconds(500)),
+      me + chrono::milliseconds(1000500));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(1500)),
+      de + chrono::milliseconds(500));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(1, me + chrono::milliseconds(10500)),
+      de + chrono::milliseconds(500));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(2, me + chrono::milliseconds(1000500)),
+      de + chrono::milliseconds(500));
+
+  // And between later points, where each node advances at a different rate.
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(2),
+      {me + chrono::seconds(3) - chrono::milliseconds(2),
+       me + chrono::seconds(12) - chrono::milliseconds(2),
+       me + chrono::seconds(1002)});
+
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(3),
+      {me + chrono::seconds(4) - chrono::milliseconds(4),
+       me + chrono::seconds(13) - chrono::milliseconds(2),
+       me + chrono::seconds(1003) - chrono::milliseconds(2)});
+
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(2500)),
+      me + chrono::milliseconds(3497));
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(1, de + chrono::milliseconds(2500)),
+      me + chrono::milliseconds(12498));
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(2, de + chrono::milliseconds(2500)),
+      me + chrono::milliseconds(1002499));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(3497)),
+      de + chrono::milliseconds(2500));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(1, me + chrono::milliseconds(12498)),
+      de + chrono::milliseconds(2500));
+  EXPECT_EQ(
+      time_converter.ToDistributedClock(2, me + chrono::milliseconds(1002499)),
+      de + chrono::milliseconds(2500));
+}
+
+// Tests that reading a time from before our oldest interpolation point
+// explodes once old points have been forgotten.
+TEST(InterpolatedTimeConverterDeathTest, ReadLostTime) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+  TestingTimeConverter time_converter(3u);
+  time_converter.StartEqual();
+
+  // Queue 200 more points, 100 ms apart on every clock (20 seconds in total).
+  for (int i = 0; i < 200; ++i) {
+    time_converter.AddMonotonic({chrono::milliseconds(100),
+                                 chrono::milliseconds(100),
+                                 chrono::milliseconds(100)});
+  }
+
+  // Force 5 seconds to be read.
+  EXPECT_EQ(
+      de + chrono::milliseconds(5000),
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(5000)));
+  EXPECT_EQ(
+      me + chrono::milliseconds(5000),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(5000)));
+
+  // Double check we can read things from before the start.
+  EXPECT_EQ(
+      de - chrono::milliseconds(100),
+      time_converter.ToDistributedClock(0, me - chrono::milliseconds(100)));
+  EXPECT_EQ(
+      me - chrono::milliseconds(100),
+      time_converter.FromDistributedClock(0, de - chrono::milliseconds(100)));
+
+  // And at and after the origin.
+  EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
+  EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
+
+  EXPECT_EQ(
+      de + chrono::milliseconds(100),
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+  EXPECT_EQ(
+      me + chrono::milliseconds(100),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+
+  // Force 10.1 seconds now.  This will forget the 0th point at the origin.
+  EXPECT_EQ(
+      de + chrono::milliseconds(10100),
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(10100)));
+  EXPECT_EQ(
+      me + chrono::milliseconds(10100),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(10100)));
+
+  // Yup, can't read the origin anymore.
+  EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
+               "forgotten");
+  EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de); },
+               "forgotten");
+
+  // But can still read the next point, which is now the oldest one we hold.
+  EXPECT_EQ(
+      de + chrono::milliseconds(100),
+      time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+  EXPECT_EQ(
+      me + chrono::milliseconds(100),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+}
+
+// Tests that 1 node gets unity time without consuming any queued timestamps.
+TEST(InterpolatedTimeConverterTest, SingleNodeTime) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+  TestingTimeConverter time_converter(1u);
+  time_converter.AddNextTimestamp(de + chrono::seconds(0),
+                                  {me + chrono::seconds(1)});
+
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de), me);
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de + chrono::seconds(100)),
+            me + chrono::seconds(100));
+
+  EXPECT_TRUE(time_converter.NextTimestamp());
+}
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
new file mode 100644
index 0000000..0dd0cb3
--- /dev/null
+++ b/aos/network/testing_time_converter.cc
@@ -0,0 +1,105 @@
+#include "aos/network/testing_time_converter.h"
+
+#include <chrono>
+#include <deque>
+#include <optional>
+#include <tuple>
+
+#include "aos/events/event_scheduler.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+namespace chrono = std::chrono;
+
+TestingTimeConverter::TestingTimeConverter(size_t node_count)
+    : InterpolatedTimeConverter(node_count),
+      last_monotonic_(node_count, monotonic_clock::epoch()) {
+  CHECK_GE(node_count, 1u);
+}
+
+TestingTimeConverter::~TestingTimeConverter() {
+  // Once the converter has hit the end, no more points may have been queued.
+  if (!at_end_) return;
+  CHECK(!NextTimestamp()) << ": At the end but there is more data.";
+}
+
+void TestingTimeConverter::StartEqual() {
+  CHECK(first_);
+  first_ = false;
+  ts_.emplace_back(last_distributed_, last_monotonic_);
+}
+
+chrono::nanoseconds TestingTimeConverter::AddMonotonic(
+    std::vector<monotonic_clock::duration> times) {
+  CHECK_EQ(times.size(), last_monotonic_.size());
+  // Every node's clock has to move strictly forwards.
+  for (size_t i = 0; i < times.size(); ++i) {
+    CHECK_GT(times[i].count(), 0);
+    last_monotonic_[i] += times[i];
+  }
+  // The first point only anchors the clocks.  After that, the distributed
+  // clock advances by the largest step taken by any node.
+  chrono::nanoseconds elapsed(0);
+  if (!first_) elapsed = *std::max_element(times.begin(), times.end());
+  first_ = false;
+  last_distributed_ += elapsed;
+  ts_.emplace_back(last_distributed_, last_monotonic_);
+  return elapsed;
+}
+
+chrono::nanoseconds TestingTimeConverter::AddMonotonic(
+    std::vector<monotonic_clock::time_point> times) {
+  CHECK_EQ(times.size(), last_monotonic_.size());
+  chrono::nanoseconds dt(0);
+  if (!first_) {
+    // The distributed clock advances by the largest amount any node's clock
+    // moved since the previous point, matching the duration overload.
+    for (size_t i = 0; i < times.size(); ++i) {
+      CHECK_GT(times[i], last_monotonic_[i]);
+      dt = std::max(dt, times[i] - last_monotonic_[i]);
+    }
+    last_distributed_ += dt;
+  } else {
+    first_ = false;
+  }
+  last_monotonic_ = times;
+  ts_.emplace_back(std::make_tuple(last_distributed_, std::move(times)));
+  return dt;
+}
+
+void TestingTimeConverter::AddNextTimestamp(
+    distributed_clock::time_point time,
+    std::vector<monotonic_clock::time_point> times) {
+  CHECK_EQ(times.size(), last_monotonic_.size());
+  // After the first point, every clock has to move strictly forwards.
+  if (!first_) {
+    CHECK_GT(time, last_distributed_);
+    for (size_t i = 0; i < times.size(); ++i) {
+      CHECK_GT(times[i], last_monotonic_[i]);
+    }
+  }
+  first_ = false;
+
+  last_distributed_ = time;
+  last_monotonic_ = times;
+  ts_.emplace_back(time, std::move(times));
+}
+
+std::optional<std::tuple<distributed_clock::time_point,
+                         std::vector<monotonic_clock::time_point>>>
+TestingTimeConverter::NextTimestamp() {
+  CHECK(!first_) << ": Tried to pull a timestamp before one was added.  This "
+                    "is unlikely to be what you want.";
+  // An empty queue means there is nothing (more) to hand out.
+  if (ts_.empty()) return std::nullopt;
+  // Move the oldest point out of the queue rather than copying its vector.
+  auto oldest = std::move(ts_.front());
+  ts_.pop_front();
+  return oldest;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
new file mode 100644
index 0000000..6d9fd8f
--- /dev/null
+++ b/aos/network/testing_time_converter.h
@@ -0,0 +1,61 @@
+#ifndef AOS_NETWORK_TESTING_TIME_CONVERTER_H_
+#define AOS_NETWORK_TESTING_TIME_CONVERTER_H_
+
+#include <deque>
+#include <optional>
+#include <tuple>
+
+#include "aos/events/event_scheduler.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Simple class which uses InterpolatedTimeConverter to produce an
+// interpolated timeline.  Should only be used for testing.
+class TestingTimeConverter final : public InterpolatedTimeConverter {
+ public:
+  explicit TestingTimeConverter(size_t node_count);
+
+  ~TestingTimeConverter() override;
+
+  // Starts all nodes equal to the distributed clock.
+  void StartEqual();
+
+  // Elapses each node's clock by the provided duration, and returns the
+  // duration that the distributed clock elapsed by.
+  std::chrono::nanoseconds AddMonotonic(
+      std::vector<monotonic_clock::duration> times);
+
+  // Sets time on each node's clock to the provided times, and returns the
+  // duration that the distributed clock elapsed by.  Note: time must always go
+  // forwards.
+  std::chrono::nanoseconds AddMonotonic(
+      std::vector<monotonic_clock::time_point> times);
+
+  // Adds a distributed to monotonic clock mapping to the queue.
+  void AddNextTimestamp(distributed_clock::time_point time,
+                        std::vector<monotonic_clock::time_point> times);
+
+  std::optional<std::tuple<distributed_clock::time_point,
+                           std::vector<monotonic_clock::time_point>>>
+  NextTimestamp() override;
+
+ private:
+  // Timestamps added but not yet handed out by NextTimestamp().
+  std::deque<std::tuple<distributed_clock::time_point,
+                        std::vector<monotonic_clock::time_point>>>
+      ts_;
+
+  // True until the first timestamp has been added.
+  bool first_ = true;
+  // The most recently added times on all clocks.
+  distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
+  std::vector<monotonic_clock::time_point> last_monotonic_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_TESTING_TIME_CONVERTER_H_