Add an InterpolatedTimeConverter to wrap time conversion
This gives us an interface that we can use in EventScheduler to query
the current time, and helpers to interpolate time from a list of points.
Change-Id: I44bd5f0f795604c63a29cb1e3f65815b790edb6a
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index c067048..959caea 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -43,6 +43,23 @@
std::ostream &operator<<(std::ostream &stream,
const aos::distributed_clock::time_point &now);
+// Interface for accurately converting times between a node's monotonic clock
+// and the distributed clock, in both directions.
+class TimeConverter {
+ public:
+ virtual ~TimeConverter() {}
+
+ // Converts a monotonic time on the node selected by node_index to the
+ // distributed clock, for scheduling and cross-node time measurement.
+ virtual distributed_clock::time_point ToDistributedClock(
+ size_t node_index, monotonic_clock::time_point time) = 0;
+
+ // Converts a distributed clock time to the monotonic clock of the node
+ // selected by node_index.
+ virtual monotonic_clock::time_point FromDistributedClock(
+ size_t node_index, distributed_clock::time_point time) = 0;
+};
+
class EventSchedulerScheduler;
class EventScheduler {
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 6053260..d8fc474 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -494,6 +494,18 @@
],
)
+cc_library(
+ name = "testing_time_converter",
+ testonly = True,
+ srcs = ["testing_time_converter.cc"],
+ hdrs = ["testing_time_converter.h"],
+ deps = [
+ ":multinode_timestamp_filter",
+ "//aos/events:simulated_event_loop",
+ "//aos/time",
+ ],
+)
+
cc_test(
name = "multinode_timestamp_filter_test",
srcs = [
@@ -502,6 +514,7 @@
target_compatible_with = ["@platforms//os:linux"],
deps = [
":multinode_timestamp_filter",
+ ":testing_time_converter",
":timestamp_filter",
"//aos/testing:googletest",
"@com_github_stevengj_nlopt//:nlopt",
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 4e8e656..6a63b0c 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -1,6 +1,7 @@
#include "aos/network/multinode_timestamp_filter.h"
#include <chrono>
+#include <functional>
#include <map>
#include "absl/strings/str_join.h"
@@ -194,6 +195,187 @@
}
}
+void InterpolatedTimeConverter::QueueUntil(
+ std::function<
+ bool(const std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>> &)>
+ not_done) {
+ while (!at_end_ && (times_.empty() || not_done(times_.back()))) {
+ std::optional<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+ next_time = NextTimestamp();
+ if (!next_time) {
+ VLOG(1) << "Last timestamp, calling it quits";
+ at_end_ = true;  // Stops this loop from polling NextTimestamp again.
+ break;
+ }
+ VLOG(1) << "Fetched next timestamp while solving.";
+ for (monotonic_clock::time_point t : std::get<1>(*next_time)) {
+ VLOG(1) << " " << t;
+ }
+ CHECK_EQ(node_count_, std::get<1>(*next_time).size());  // One per node.
+ times_.emplace_back(std::move(*next_time));
+ }
+ // Conversion needs at least one point to anchor to.
+ CHECK(!times_.empty())
+ << ": Found no times to do timestamp estimation, please investigate.";
+ // Forget old points, keeping at least 50 points and 10 seconds of time.
+ while (times_.size() > 50 &&
+ std::get<0>(times_.front()) + chrono::seconds(10) <
+ std::get<0>(times_.back())) {
+ times_.pop_front();
+ have_popped_ = true;
+ }
+}
+
+distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
+ size_t node_index, monotonic_clock::time_point time) {
+ CHECK_LT(node_index, node_count_);
+ // If there is only one node, time estimation makes no sense. Just return
+ // unity time.
+ if (node_count_ == 1u) {
+ return distributed_clock::epoch() + time.time_since_epoch();
+ }
+
+ // Make sure there are enough timestamps in the queue.
+ QueueUntil(
+ [time, node_index](
+ const std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>> &t) {
+ return std::get<1>(t)[node_index] < time;
+ });
+
+ // With a single point, or before the first point, extrapolate with a fixed
+ // offset (unity slope) so time doesn't jump when timestamp 2 happens.
+ if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
+ if (time < std::get<1>(times_[0])[node_index]) {
+ CHECK(!have_popped_)
+ << ": Trying to interpolate time " << time
+ << " but we have forgotten the relevant points already.";
+ }
+ const distributed_clock::time_point result =
+ time - std::get<1>(times_[0])[node_index] + std::get<0>(times_[0]);
+ VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+ }
+
+ // Now, find the segment to interpolate on. Search from the back since that's
+ // where most of the times we care about will be; stop at the first segment.
+ size_t index = times_.size() - 2u;
+ while (index > 0u) {
+ if (std::get<1>(times_[index])[node_index] <= time) {
+ break;
+ }
+ --index;
+ }
+
+ // Interpolate with the two of these.
+ const distributed_clock::time_point d0 = std::get<0>(times_[index]);
+ const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+
+ const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
+ const monotonic_clock::time_point t1 =
+ std::get<1>(times_[index + 1])[node_index];
+
+ const chrono::nanoseconds dt = (t1 - t0);
+ // dt is the divisor below, so it must not be zero.
+ CHECK_NE(dt.count(), 0u) << " t0 " << t0 << " t1 " << t1 << " d0 " << d0
+ << " d1 " << d1 << " looking up monotonic " << time;
+ // Basic interpolation between 2 points look like
+ // p0.d + (t - p0.t) * (p1.d - p0.d) / (p1.t - p0.t)
+ // This can be multiplied out with integer arithmetic to get exact results.
+ // Since we are using integer arithmetic, we want to round to the nearest, not
+ // towards 0. To do that, we want to add half of the denominator when > 0,
+ // and subtract when < 0 so we round correctly. Multiply before dividing so
+ // we don't round early, and use 128 bit arithmetic to guarantee that 64 bit
+ // multiplication fits.
+ absl::int128 numerator =
+ absl::int128((time - t0).count()) * absl::int128((d1 - d0).count());
+ numerator += numerator > 0 ? absl::int128(dt.count() / 2)
+ : -absl::int128(dt.count() / 2);
+ const distributed_clock::time_point result =
+ d0 + std::chrono::nanoseconds(
+ static_cast<int64_t>(numerator / absl::int128(dt.count())));
+ VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+}
+
+monotonic_clock::time_point InterpolatedTimeConverter::FromDistributedClock(
+ size_t node_index, distributed_clock::time_point time) {
+ CHECK_LT(node_index, node_count_);
+ // If there is only one node, time estimation makes no sense. Just return
+ // unity time.
+ if (node_count_ == 1u) {
+ return monotonic_clock::epoch() + time.time_since_epoch();
+ }
+
+ // Make sure there are enough timestamps in the queue.
+ QueueUntil(
+ [time](const std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>> &t) {
+ return std::get<0>(t) < time;
+ });
+ // With one point, or before the first point, use a fixed offset.
+ if (times_.size() == 1u || time < std::get<0>(times_[0])) {
+ if (time < std::get<0>(times_[0])) {
+ CHECK(!have_popped_)
+ << ": Trying to interpolate time " << time
+ << " but we have forgotten the relevant points already.";
+ }
+ monotonic_clock::time_point result =
+ time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index];
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+ }
+
+ // Now, find the segment to interpolate on. Search from the back since that's
+ // where most of the times we care about will be; stop at the first segment.
+ size_t index = times_.size() - 2u;
+ while (index > 0u) {
+ if (std::get<0>(times_[index]) <= time) {
+ break;
+ }
+ --index;
+ }
+
+ // Interpolate with the two of these.
+ const distributed_clock::time_point d0 = std::get<0>(times_[index]);
+ const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+
+ const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
+ const monotonic_clock::time_point t1 =
+ std::get<1>(times_[index + 1])[node_index];
+
+ const chrono::nanoseconds dd = d1 - d0;
+ // dd is the divisor below, so it must not be zero.
+ CHECK_NE(dd.count(), 0u) << " t0 " << t0 << " t1 " << t1 << " d0 " << d0
+ << " d1 " << d1 << " looking up distributed "
+ << time;
+
+ // Basic interpolation between 2 points look like
+ // p0.t + (t - p0.d) * (p1.t - p0.t) / (p1.d - p0.d)
+ // This can be multiplied out with integer arithmetic to get exact results.
+ // Since we are using integer arithmetic, we want to round to the nearest, not
+ // towards 0. To do that, we want to add half of the denominator when > 0,
+ // and subtract when < 0 so we round correctly. Multiply before dividing so
+ // we don't round early, and use 128 bit arithmetic to guarantee that 64 bit
+ // multiplication fits.
+ absl::int128 numerator =
+ absl::int128((time - d0).count()) * absl::int128((t1 - t0).count());
+ numerator += numerator > 0 ? absl::int128(dd.count() / 2)
+ : -absl::int128(dd.count() / 2);
+
+ const monotonic_clock::time_point result =
+ t0 + std::chrono::nanoseconds(
+ static_cast<int64_t>(numerator / absl::int128(dd.count())));
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+}
+
void MultiNodeNoncausalOffsetEstimator::Start(
SimulatedEventLoopFactory *factory) {
for (std::pair<const std::tuple<const Node *, const Node *>,
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 3e00d87..e9c1d08 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -1,6 +1,7 @@
#ifndef AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
#define AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
+#include <functional>
#include <map>
#include <string_view>
@@ -117,6 +118,61 @@
std::vector<std::vector<FilterPair>> filters_;
};
+// Helpers to convert times between the monotonic and distributed clocks for
+// multiple nodes using a list of points and interpolation.
+class InterpolatedTimeConverter : public TimeConverter {
+ public:
+ InterpolatedTimeConverter(size_t node_count) : node_count_(node_count) {}
+
+ virtual ~InterpolatedTimeConverter() {}
+
+ // Converts a time to the distributed clock for scheduling and cross-node
+ // time measurement.
+ distributed_clock::time_point ToDistributedClock(
+ size_t node_index, monotonic_clock::time_point time) override;
+
+ // Takes the distributed time and converts it to the monotonic clock for this
+ // node.
+ monotonic_clock::time_point FromDistributedClock(
+ size_t node_index, distributed_clock::time_point time) override;
+
+ private:
+ // Returns the next timestamp, or nullopt if there isn't one. It is assumed
+ // that if there isn't one, there never will be one.
+ // A timestamp is a sample of the distributed clock and a corresponding point
+ // on every monotonic clock for all the nodes in the factory that this will be
+ // hooked up to.
+ virtual std::optional<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+ NextTimestamp() = 0;
+
+ // Queues timestamps while not_done returns true for the last time in the
+ // queue and NextTimestamp keeps returning data.
+ void QueueUntil(
+ std::function<
+ bool(const std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>> &)>
+ not_done);
+
+ // The number of nodes every queued timestamp must cover.
+ const size_t node_count_;
+
+ // Queued timestamps, oldest first.
+ std::deque<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+ times_;
+
+ // If true, we have popped data from times_, so anything before the start is
+ // unknown.
+ bool have_popped_ = false;
+
+ protected:
+ // If true, NextTimestamp returned nothing, so don't bother checking again.
+ // (This also enforces that we don't find more time data after calling it
+ // quits.)
+ bool at_end_ = false;
+};
+
// Class to hold a NoncausalOffsetEstimator per pair of communicating nodes, and
// to estimate and set the overall time of all nodes.
class MultiNodeNoncausalOffsetEstimator {
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 28d0183..69f76c5 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -7,6 +7,7 @@
#include "aos/json_to_flatbuffer.h"
#include "aos/macros.h"
#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/network/testing_time_converter.h"
#include "gtest/gtest.h"
namespace aos {
@@ -133,6 +134,193 @@
}
}
+// Tests that an InterpolatedTimeConverter with a single timestamp applies a
+// fixed offset per node. 1 second should be 1 second everywhere.
+TEST(InterpolatedTimeConverterTest, OneTime) {
+ const distributed_clock::time_point de = distributed_clock::epoch();
+ const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+ TestingTimeConverter time_converter(3u);
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(0),
+ {me + chrono::seconds(1), me + chrono::seconds(10),
+ me + chrono::seconds(1000)});
+ // One second before the point, on every node and in both directions.
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1)),
+ me + chrono::seconds(0));
+ EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1)),
+ me + chrono::seconds(9));
+ EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1)),
+ me + chrono::seconds(999));
+ EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(0)),
+ de - chrono::seconds(1));
+ EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(9)),
+ de - chrono::seconds(1));
+ EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(999)),
+ de - chrono::seconds(1));
+ // And exactly at the point.
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de),
+ me + chrono::seconds(1));
+ EXPECT_EQ(time_converter.FromDistributedClock(1, de),
+ me + chrono::seconds(10));
+ EXPECT_EQ(time_converter.FromDistributedClock(2, de),
+ me + chrono::seconds(1000));
+ EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(1)), de);
+ EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(10)), de);
+ EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(1000)),
+ de);
+}
+
+// Tests that actual interpolation works as expected for multiple timestamps.
+TEST(InterpolatedTimeConverterTest, Interpolation) {
+ const distributed_clock::time_point de = distributed_clock::epoch();
+ const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+ TestingTimeConverter time_converter(3u);
+ // Test that 2 timestamps interpolate correctly.
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(0),
+ {me + chrono::seconds(1), me + chrono::seconds(10),
+ me + chrono::seconds(1000)});
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(1),
+ {me + chrono::seconds(2), me + chrono::seconds(11),
+ me + chrono::seconds(1001)});
+ // Halfway between the two points should be halfway on every clock.
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(500)),
+ me + chrono::milliseconds(1500));
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(1, de + chrono::milliseconds(500)),
+ me + chrono::milliseconds(10500));
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(2, de + chrono::milliseconds(500)),
+ me + chrono::milliseconds(1000500));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(1500)),
+ de + chrono::milliseconds(500));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(1, me + chrono::milliseconds(10500)),
+ de + chrono::milliseconds(500));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(2, me + chrono::milliseconds(1000500)),
+ de + chrono::milliseconds(500));
+
+ // And that we can interpolate between points not at the start.
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(2),
+ {me + chrono::seconds(3) - chrono::milliseconds(2),
+ me + chrono::seconds(12) - chrono::milliseconds(2),
+ me + chrono::seconds(1002)});
+
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(3),
+ {me + chrono::seconds(4) - chrono::milliseconds(4),
+ me + chrono::seconds(13) - chrono::milliseconds(2),
+ me + chrono::seconds(1003) - chrono::milliseconds(2)});
+ // Halfway between the last two points.
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(2500)),
+ me + chrono::milliseconds(3497));
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(1, de + chrono::milliseconds(2500)),
+ me + chrono::milliseconds(12498));
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(2, de + chrono::milliseconds(2500)),
+ me + chrono::milliseconds(1002499));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(3497)),
+ de + chrono::milliseconds(2500));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(1, me + chrono::milliseconds(12498)),
+ de + chrono::milliseconds(2500));
+ EXPECT_EQ(
+ time_converter.ToDistributedClock(2, me + chrono::milliseconds(1002499)),
+ de + chrono::milliseconds(2500));
+}
+
+// Tests that reading times before the start of our interpolation points
+// explodes once old points have been forgotten.
+TEST(InterpolatedTimeConverterDeathTest, ReadLostTime) {
+ const distributed_clock::time_point de = distributed_clock::epoch();
+ const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+ TestingTimeConverter time_converter(3u);
+ time_converter.StartEqual();
+
+ // Queue 200 points spaced 100 ms apart on every clock.
+ for (int i = 0; i < 200; ++i) {
+ time_converter.AddMonotonic({chrono::milliseconds(100),
+ chrono::milliseconds(100),
+ chrono::milliseconds(100)});
+ }
+
+ // Force 5 seconds to be read.
+ EXPECT_EQ(
+ de + chrono::milliseconds(5000),
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(5000)));
+ EXPECT_EQ(
+ me + chrono::milliseconds(5000),
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(5000)));
+
+ // Double check we can read things from before the start
+ EXPECT_EQ(
+ de - chrono::milliseconds(100),
+ time_converter.ToDistributedClock(0, me - chrono::milliseconds(100)));
+ EXPECT_EQ(
+ me - chrono::milliseconds(100),
+ time_converter.FromDistributedClock(0, de - chrono::milliseconds(100)));
+
+ // And at and after the origin.
+ EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
+ EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
+
+ EXPECT_EQ(
+ de + chrono::milliseconds(100),
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+ EXPECT_EQ(
+ me + chrono::milliseconds(100),
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+
+ // Force 10.1 seconds now. This will forget the 0th point at the origin.
+ EXPECT_EQ(
+ de + chrono::milliseconds(10100),
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(10100)));
+ EXPECT_EQ(
+ me + chrono::milliseconds(10100),
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(10100)));
+
+ // Yup, can't read the origin anymore.
+ EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
+ "forgotten");
+ EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de); },
+ "forgotten");
+
+ // But can still read the next point.
+ EXPECT_EQ(
+ de + chrono::milliseconds(100),
+ time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+ EXPECT_EQ(
+ me + chrono::milliseconds(100),
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+}
+
+// Tests unity time with 1 node: the queued timestamp is ignored.
+TEST(InterpolatedTimeConverterTest, SingleNodeTime) {
+ const distributed_clock::time_point de = distributed_clock::epoch();
+ const monotonic_clock::time_point me = monotonic_clock::epoch();
+
+ TestingTimeConverter time_converter(1u);
+ time_converter.AddNextTimestamp(de + chrono::seconds(0),
+ {me + chrono::seconds(1)});
+ // The queued 1 second offset is not applied; both epochs line up.
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de), me);
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de + chrono::seconds(100)),
+ me + chrono::seconds(100));
+ // The conversions above must not have consumed the queued timestamp.
+ EXPECT_TRUE(time_converter.NextTimestamp());
+}
+
} // namespace testing
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
new file mode 100644
index 0000000..0dd0cb3
--- /dev/null
+++ b/aos/network/testing_time_converter.cc
@@ -0,0 +1,105 @@
+#include "aos/network/testing_time_converter.h"
+
+#include <chrono>
+#include <deque>
+#include <optional>
+#include <tuple>
+
+#include "aos/events/event_scheduler.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+namespace chrono = std::chrono;
+
+TestingTimeConverter ::TestingTimeConverter(size_t node_count)
+ : InterpolatedTimeConverter(node_count),
+ last_monotonic_(node_count, monotonic_clock::epoch()) {  // Start at epoch.
+ CHECK_GE(node_count, 1u);  // Need at least one node.
+}
+
+TestingTimeConverter::~TestingTimeConverter() {
+ if (at_end_) {  // The end was reported, so nothing may have been queued since.
+ CHECK(!NextTimestamp()) << ": At the end but there is more data.";
+ }
+}
+
+void TestingTimeConverter::StartEqual() {
+ CHECK(first_);  // Only valid before any other point has been added.
+ first_ = false;
+ ts_.emplace_back(std::make_tuple(last_distributed_, last_monotonic_));
+}
+
+chrono::nanoseconds TestingTimeConverter::AddMonotonic(
+ std::vector<monotonic_clock::duration> times) {
+ CHECK_EQ(times.size(), last_monotonic_.size());
+ for (size_t i = 0; i < times.size(); ++i) {
+ CHECK_GT(times[i].count(), 0);  // Every clock must move forwards.
+ last_monotonic_[i] += times[i];
+ }
+ chrono::nanoseconds dt(0);
+ if (!first_) {
+ dt = *std::max_element(times.begin(), times.end());  // Largest step wins.
+ last_distributed_ += dt;
+ } else {
+ first_ = false;  // The first point leaves the distributed clock alone.
+ }
+ ts_.emplace_back(std::make_tuple(last_distributed_, last_monotonic_));
+ return dt;
+}
+
+chrono::nanoseconds TestingTimeConverter::AddMonotonic(
+ std::vector<monotonic_clock::time_point> times) {
+ CHECK_EQ(times.size(), last_monotonic_.size());
+ chrono::nanoseconds dt(0);
+ if (!first_) {
+ dt = times[0] - last_monotonic_[0];
+ for (size_t i = 0; i < times.size(); ++i) {
+ CHECK_GT(times[i], last_monotonic_[i]);  // Time must always go forwards.
+ dt = std::max(dt, times[i] - last_monotonic_[i]);  // Largest per-node step.
+ }
+ last_distributed_ += dt;
+ last_monotonic_ = times;
+ } else {
+ first_ = false;  // The first point leaves the distributed clock alone.
+ last_monotonic_ = times;
+ }
+ ts_.emplace_back(std::make_tuple(last_distributed_, std::move(times)));
+ return dt;
+}
+
+void TestingTimeConverter::AddNextTimestamp(
+ distributed_clock::time_point time,
+ std::vector<monotonic_clock::time_point> times) {
+ CHECK_EQ(times.size(), last_monotonic_.size());
+ if (!first_) {  // After the first point, every clock must strictly advance.
+ CHECK_GT(time, last_distributed_);
+ for (size_t i = 0; i < times.size(); ++i) {
+ CHECK_GT(times[i], last_monotonic_[i]);
+ }
+ } else {
+ first_ = false;
+ }
+ last_distributed_ = time;
+ last_monotonic_ = times;
+
+ ts_.emplace_back(std::make_tuple(time, std::move(times)));
+}
+
+std::optional<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+TestingTimeConverter::NextTimestamp() {
+ CHECK(!first_) << ": Tried to pull a timestamp before one was added. This "
+ "is unlikely to be what you want.";
+ if (ts_.empty()) {
+ return std::nullopt;
+ }
+ auto result = std::move(ts_.front());  // Move out; the slot is popped next.
+ ts_.pop_front();
+ return result;
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
new file mode 100644
index 0000000..6d9fd8f
--- /dev/null
+++ b/aos/network/testing_time_converter.h
@@ -0,0 +1,61 @@
+#ifndef AOS_NETWORK_TESTING_TIME_CONVERTER_H_
+#define AOS_NETWORK_TESTING_TIME_CONVERTER_H_
+
+#include <deque>
+#include <optional>
+#include <tuple>
+
+#include "aos/events/event_scheduler.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Simple class which uses InterpolatedTimeConverter to produce an
+// interpolated timeline. Should only be used for testing.
+class TestingTimeConverter final : public InterpolatedTimeConverter {
+ public:
+ explicit TestingTimeConverter(size_t node_count);
+
+ virtual ~TestingTimeConverter();
+
+ // Starts all nodes equal to the distributed clock. Must be called first.
+ void StartEqual();
+
+ // Elapses each node's clock by the provided duration, and returns the
+ // duration that the distributed clock elapsed by.
+ std::chrono::nanoseconds AddMonotonic(
+ std::vector<monotonic_clock::duration> times);
+
+ // Sets time on each node's clock to the provided times, and returns the
+ // duration that the distributed clock elapsed by. Note: time must always go
+ // forwards.
+ std::chrono::nanoseconds AddMonotonic(
+ std::vector<monotonic_clock::time_point> times);
+
+ // Adds a distributed to monotonic clock mapping to the queue.
+ void AddNextTimestamp(distributed_clock::time_point time,
+ std::vector<monotonic_clock::time_point> times);
+ // Pops and returns the oldest queued timestamp, or nullopt if none remain.
+ std::optional<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+ NextTimestamp() override;
+
+ private:
+ // Queued timestamps not yet returned by NextTimestamp, oldest first.
+ std::deque<std::tuple<distributed_clock::time_point,
+ std::vector<monotonic_clock::time_point>>>
+ ts_;
+
+ // True until the first timestamp has been added.
+ bool first_ = true;
+ // The most recently added time on each clock.
+ distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
+ std::vector<monotonic_clock::time_point> last_monotonic_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_TESTING_TIME_CONVERTER_H_