Pull multi-node timestamp estimation into a class
There is enough code there and enough interactions to justify an
interface. Put it behind an interface to make it much more clear and
easier to enforce what updates when and with what guarentees.
This should provide no functional change.
Change-Id: Idd2c1849c2641df828c26860c61e238f47d87f9e
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 13ba42e..ade8355 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -450,6 +450,21 @@
],
)
+cc_library(
+ name = "multinode_timestamp_filter",
+ srcs = ["multinode_timestamp_filter.cc"],
+ hdrs = ["multinode_timestamp_filter.h"],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":timestamp_filter",
+ "//aos:configuration",
+ "//aos/events:simulated_event_loop",
+ "//aos/time",
+ "//third_party/gmp",
+ "@org_tuxfamily_eigen//:eigen",
+ ],
+)
+
cc_test(
name = "timestamp_filter_test",
srcs = [
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
new file mode 100644
index 0000000..4b36287
--- /dev/null
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -0,0 +1,516 @@
+#include "aos/network/multinode_timestamp_filter.h"
+
+#include <map>
+
+#include "aos/configuration.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+DEFINE_bool(timestamps_to_csv, false,
+ "If true, write all the time synchronization information to a set "
+ "of CSV files in /tmp/. This should only be needed when debugging "
+ "time synchronization.");
+
+namespace aos {
+namespace message_bridge {
+namespace {
+Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> in) {
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> result =
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Zero(in.rows(),
+ in.cols());
+ for (int i = 0; i < in.rows(); ++i) {
+ for (int j = 0; j < in.cols(); ++j) {
+ result(i, j) = in(i, j).get_d();
+ }
+ }
+ return result;
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
+ aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
+ // Least squares solve for the slopes and offsets.
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
+ (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
+ aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
+
+ VLOG(1) << "Took "
+ << std::chrono::duration<double>(end_time - start_time).count()
+ << " seconds to invert";
+
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
+ inv.block(0, 0, inv.rows(), 1);
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
+ inv.block(0, 1, inv.rows(), inv.cols() - 1) *
+ mpq_offsets.block(1, 0, inv.rows() - 1, 1);
+
+ mpq_solution_offset *= mpq_class(1, 1000000000);
+
+ return std::make_tuple(ToDouble(mpq_solution_slope),
+ ToDouble(mpq_solution_offset));
+}
+} // namespace
+
+void MultiNodeNoncausalOffsetEstimator::Start(
+ SimulatedEventLoopFactory *factory) {
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ std::get<0>(filter.second)
+ .SetFirstFwdTime(
+ factory->GetNodeEventLoopFactory(node_a)->monotonic_now());
+ std::get<0>(filter.second)
+ .SetFirstRevTime(
+ factory->GetNodeEventLoopFactory(node_b)->monotonic_now());
+ }
+}
+
+message_bridge::NoncausalOffsetEstimator *
+MultiNodeNoncausalOffsetEstimator::GetFilter(const Node *node_a,
+ const Node *node_b) {
+ CHECK_NE(node_a, node_b);
+ CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
+ CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
+
+ if (node_a > node_b) {
+ return GetFilter(node_b, node_a);
+ }
+
+ auto tuple = std::make_tuple(node_a, node_b);
+
+ auto it = filters_.find(tuple);
+
+ if (it == filters_.end()) {
+ auto &x =
+ filters_
+ .insert(std::make_pair(
+ tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+ node_a, node_b))))
+ .first->second;
+ if (FLAGS_timestamps_to_csv) {
+ std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+ node_b->name()->string_view()));
+ std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+ node_a->name()->string_view()));
+ }
+
+ return &std::get<0>(x);
+ } else {
+ return &std::get<0>(it->second);
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::LogFit(std::string_view prefix) {
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ VLOG(1)
+ << node->name()->string_view() << " now "
+ << event_loop_factory_->GetNodeEventLoopFactory(node)->monotonic_now()
+ << " distributed " << event_loop_factory_->distributed_now();
+ }
+
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ message_bridge::NoncausalOffsetEstimator *estimator =
+ &std::get<0>(filter.second);
+
+ if (estimator->a_timestamps().size() == 0 &&
+ estimator->b_timestamps().size() == 0) {
+ continue;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ estimator->LogFit(prefix);
+ }
+
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ NodeEventLoopFactory *node_a_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node_a);
+ NodeEventLoopFactory *node_b_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node_b);
+
+ const double recovered_slope =
+ slope(node_b_index) / slope(node_a_index) - 1.0;
+ const int64_t recovered_offset =
+ offset(node_b_index).count() - offset(node_a_index).count() *
+ slope(node_b_index) /
+ slope(node_a_index);
+
+ VLOG(2) << "Recovered slope " << std::setprecision(20) << recovered_slope
+ << " (error " << recovered_slope - estimator->fit().slope() << ") "
+ << " offset " << std::setprecision(20) << recovered_offset
+ << " (error "
+ << recovered_offset - estimator->fit().offset().count() << ")";
+
+ const aos::distributed_clock::time_point a0 =
+ node_a_factory->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[0]));
+ const aos::distributed_clock::time_point a1 =
+ node_a_factory->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[1]));
+
+ VLOG(2) << node_a->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << node_b_factory->FromDistributedClock(a0) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[0])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((a0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(2) << node_a->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << node_b_factory->FromDistributedClock(a1) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[1])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((event_loop_factory_->distributed_now() <= a1)
+ ? ""
+ : " Before now, investigate");
+
+ const aos::distributed_clock::time_point b0 =
+ node_b_factory->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[0]));
+ const aos::distributed_clock::time_point b1 =
+ node_b_factory->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[1]));
+
+ VLOG(2) << node_b->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << node_a_factory->FromDistributedClock(b0)
+ << ((b0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(2) << node_b->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << node_a_factory->FromDistributedClock(b1)
+ << ((event_loop_factory_->distributed_now() <= b1)
+ ? ""
+ : " Before now, investigate");
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::UpdateOffsets() {
+ VLOG(2) << "Samples are " << offset_matrix_;
+ VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+ VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
+ << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
+
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ event_loop_factory_->GetNodeEventLoopFactory(node)->SetDistributedOffset(
+ offset(node_index), slope(node_index));
+
+ VLOG(1) << "Offset for node " << node_index << " "
+ << node->name()->string_view() << " is "
+ << aos::distributed_clock::time_point(offset(node_index))
+ << " slope " << std::setprecision(9) << std::fixed
+ << slope(node_index);
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::Initialize(
+ const Configuration *logged_configuration) {
+ logged_configuration_ = logged_configuration;
+ // We need to now seed our per-node time offsets and get everything set up
+ // to run.
+ const size_t num_nodes = nodes_count();
+
+ // It is easiest to solve for per node offsets with a matrix rather than
+ // trying to solve the equations by hand. So let's get after it.
+ //
+ // Now, build up the map matrix.
+ //
+ // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+ map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+ slope_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+
+ offset_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ last_valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+
+ time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+
+ // All times should average out to the distributed clock.
+ for (int i = 0; i < map_matrix_.cols(); ++i) {
+ // 1/num_nodes.
+ map_matrix_(0, i) = mpq_class(1, num_nodes);
+ }
+ valid_matrix_(0) = true;
+
+ {
+ // Now, add the a - b -> sample elements.
+ size_t i = 1;
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ // -a
+ map_matrix_(i, node_a_index) = mpq_class(-1);
+ // +b
+ map_matrix_(i, node_b_index) = mpq_class(1);
+
+ // -> sample
+ std::get<0>(filter.second)
+ .set_slope_pointer(&slope_matrix_(i, node_a_index));
+ std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+ valid_matrix_(i) = false;
+ std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
+
+ ++i;
+ }
+ }
+
+ const size_t connected_nodes = ConnectedNodes();
+
+ // We don't need to support isolated nodes until someone has a real use
+ // case.
+ CHECK_EQ(connected_nodes, num_nodes)
+ << ": There is a node which isn't communicating with the rest.";
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+MultiNodeNoncausalOffsetEstimator::SolveOffsets() {
+ // TODO(austin): Split this out and unit tests a bit better. When we do
+ // partial node subsets and also try to optimize this again would be a good
+ // time.
+ //
+ // TODO(austin): CHECK that the number doesn't change over time. We can freak
+ // out if that happens.
+
+ // Start by counting how many node pairs we have an offset estimated for.
+ int nonzero_offset_count = 1;
+ for (int i = 1; i < valid_matrix_.rows(); ++i) {
+ if (valid_matrix_(i) != 0) {
+ ++nonzero_offset_count;
+ }
+ }
+
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+
+ // If there are missing rows, we can't solve the original problem and instead
+ // need to filter the matrix to remove the missing rows and solve a simplified
+ // problem. What this means practically is that we might have pairs of nodes
+ // which are communicating, but we don't have timestamps between. But we can
+ // have multiple paths in our graph between 2 nodes, so we can still solve
+ // time without the missing timestamp.
+ //
+ // In the following example, we can drop any of the last 3 rows, and still
+ // solve.
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ // [ 0 1 -1-m2 ] [obc]
+ if (nonzero_offset_count != offset_matrix_.rows()) {
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, map_matrix_.cols());
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
+
+ std::vector<bool> valid_nodes(nodes_count(), false);
+
+ size_t destination_row = 0;
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
+ }
+ mpq_offsets(0) = mpq_class(0);
+ ++destination_row;
+
+ for (int i = 1; i < offset_matrix_.rows(); ++i) {
+ // Copy over the first row, i.e. the row which says that all times average
+ // to the distributed time. And then copy over all valid rows.
+ if (valid_matrix_(i)) {
+ mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
+
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
+ if (mpq_map(destination_row, j) != 0) {
+ valid_nodes[j] = true;
+ }
+ }
+
+ ++destination_row;
+ }
+ }
+
+ VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
+ VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Compute (and cache) the current connectivity. If we have N nodes
+ // configured, but logs only from one of them, we want to assume that the
+ // rest of the nodes match the distributed clock exactly.
+ //
+ // If data shows up later for them, we will CHECK when time jumps.
+ //
+ // TODO(austin): Once we have more info on what cases are reasonable, we can
+ // open up the restrictions.
+ if (valid_matrix_ != last_valid_matrix_) {
+ Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
+ full_piv(mpq_map);
+ const size_t connected_nodes = full_piv.rank();
+
+ size_t valid_node_count = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ const bool valid_node = valid_nodes[i];
+ if (valid_node) {
+ ++valid_node_count;
+ } else {
+ LOG(WARNING)
+ << "Node "
+ << logged_configuration()->nodes()->Get(i)->name()->string_view()
+ << " has no observations, setting to distributed clock.";
+ }
+ }
+
+ // Confirm that the set of nodes we have connected matches the rank.
+ // Otherwise a<->b and c<->d would count as 4 but really is 3.
+ CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
+ connected_nodes)
+ << ": Ambiguous nodes.";
+
+ last_valid_matrix_ = valid_matrix_;
+ cached_valid_node_count_ = valid_node_count;
+ }
+
+ // There are 2 cases. Either all the nodes are connected with each other by
+ // actual data, or we have isolated nodes. We want to force the isolated
+ // nodes to match the distributed clock exactly, and to solve for the other
+ // nodes.
+ if (cached_valid_node_count_ == 0) {
+ // Cheat. If there are no valid nodes, the slopes are 1, and offset is 0,
+ // ie, just be the distributed clock.
+ return std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+ } else if (cached_valid_node_count_ == nodes_count()) {
+ return Solve(mpq_map, mpq_offsets);
+ } else {
+ // Strip out any columns (nodes) which aren't relevant. Solve the
+ // simplified problem, then set any nodes which were missing back to slope
+ // 1, offset 0 (ie the distributed clock).
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>
+ valid_node_mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, cached_valid_node_count_);
+
+ {
+ // Only copy over the columns with valid nodes in them.
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ valid_node_mpq_map.col(column) = mpq_map.col(i);
+
+ ++column;
+ }
+ }
+ // The 1/n needs to be based on the number of nodes being solved.
+ // Recreate it here.
+ for (int j = 0; j < valid_node_mpq_map.cols(); ++j) {
+ valid_node_mpq_map(0, j) = mpq_class(1, cached_valid_node_count_);
+ }
+ }
+
+ VLOG(1) << "Reduced node filtered map "
+ << ToDouble(valid_node_mpq_map).format(HeavyFmt);
+ VLOG(1) << "Reduced node filtered offsets "
+ << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Solve the simplified problem now.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ valid_result = Solve(valid_node_mpq_map, mpq_offsets);
+
+ // And expand the results back into a solution matrix.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ result = std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+
+ {
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ std::get<0>(result)(i) = std::get<0>(valid_result)(column);
+ std::get<1>(result)(i) = std::get<1>(valid_result)(column);
+
+ ++column;
+ }
+ }
+ }
+
+ return result;
+ }
+ } else {
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ map_matrix_ + slope_matrix_;
+ VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
+ VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+
+ return Solve(mpq_map, offset_matrix_);
+ }
+}
+
+size_t MultiNodeNoncausalOffsetEstimator::ConnectedNodes() {
+ // Rank of the map matrix tells you if all the nodes are in communication
+ // with each other, which tells you if the offsets are observable.
+ return Eigen::FullPivLU<
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(
+ map_matrix_)
+ .rank();
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
new file mode 100644
index 0000000..0954448
--- /dev/null
+++ b/aos/network/multinode_timestamp_filter.h
@@ -0,0 +1,149 @@
+#ifndef AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
+#define AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
+
+#include <map>
+#include <string_view>
+
+#include "Eigen/Dense"
+#include "aos/configuration.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/time/time.h"
+#include "third_party/gmp/gmpxx.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to hold a NoncausalOffsetEstimator per pair of communicating nodes, and
+// to estimate and set the overall time of all nodes.
+class MultiNodeNoncausalOffsetEstimator {
+ public:
+ MultiNodeNoncausalOffsetEstimator(
+ SimulatedEventLoopFactory *event_loop_factory)
+ : event_loop_factory_(event_loop_factory) {}
+
+ // Constructs all the matricies. Needs to be called after the log files have
+ // been opened and all the filters have been created.
+ void Initialize(const Configuration *logged_configuration);
+
+ // Returns the filter for a pair of nodes. The same filter will be returned
+ // for a pair of nodes, regardless of argument order.
+ message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+ const Node *node_b);
+
+ // Prints out debug information about the line segments for each node.
+ void LogFit(std::string_view prefix);
+
+ // Captures the start time.
+ void Start(SimulatedEventLoopFactory *factory);
+
+ // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ SolveOffsets();
+
+ // Returns the number of nodes.
+ size_t nodes_count() const {
+ return !configuration::MultiNode(logged_configuration())
+ ? 1u
+ : logged_configuration()->nodes()->size();
+ }
+
+ // Returns the offset from the monotonic clock for a node to the distributed
+ // clock. monotonic = distributed * slope() + offset();
+ double slope(int node_index) const {
+ CHECK_LT(node_index, time_slope_matrix_.rows())
+ << ": Got too high of a node index.";
+ return time_slope_matrix_(node_index);
+ }
+ std::chrono::nanoseconds offset(int node_index) const {
+ CHECK_LT(node_index, time_offset_matrix_.rows())
+ << ": Got too high of a node index.";
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(time_offset_matrix_(node_index)));
+ }
+
+ // Returns the configuration that was logged.
+ const aos::Configuration *logged_configuration() const {
+ return logged_configuration_;
+ }
+
+ // Returns the configuration that we are replaying into.
+ const aos::Configuration *configuration() const {
+ return event_loop_factory_->configuration();
+ }
+
+ // Returns the number of nodes connected to each other.
+ size_t ConnectedNodes();
+
+ // Recomputes the offsets and sets them on event_loop_factory_.
+ void UpdateOffsets();
+
+ private:
+ SimulatedEventLoopFactory *event_loop_factory_;
+ const Configuration *logged_configuration_;
+
+ // List of filters for a connection. The pointer to the first node will be
+ // less than the second node.
+ std::map<std::tuple<const Node *, const Node *>,
+ std::tuple<NoncausalOffsetEstimator>>
+ filters_;
+
+ // We have 2 types of equations to do a least squares regression over to fully
+ // constrain our time function.
+ //
+ // One is simple. The distributed clock is the average of all the clocks.
+ // (ta + tb + tc + td) / num_nodes = t_distributed
+ //
+ // The second is a bit more complicated. Our basic time conversion function
+ // is:
+ // tb = ta + (ta * slope + offset)
+ // We can rewrite this as follows
+ // tb - (1 + slope) * ta = offset
+ //
+ // From here, we have enough equations to solve for t{a,b,c,...} We want to
+ // take as an input the offsets and slope, and solve for the per-node times as
+ // a function of the distributed clock.
+ //
+ // We need to massage our equations to make this work. If we solve for the
+ // per-node times at two set distributed clock times, we will be able to
+ // recreate the linear function (we know it is linear). We can do a similar
+ // thing by breaking our equation up into:
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ //
+ // This solves to:
+ //
+ // [ta] [ a00 a01 a02] [t_distributed]
+ // [tb] = [ a10 a11 a12] * [oab]
+ // [tc] [ a20 a21 a22] [oac]
+ //
+ // and can be split into:
+ //
+ // [ta] [ a00 ] [a01 a02]
+ // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+ // [tc] [ a20 ] [a21 a22] [oac]
+ //
+ // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+ // offset_matrix_ will be in nanoseconds.
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+ // Matrix tracking which offsets are valid.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+ // Matrix tracking the last valid matrix we used to determine connected nodes.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+ size_t cached_valid_node_count_ = 0;
+
+ // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
+ // t is in seconds.
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 116607c..bb18095 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -699,40 +699,72 @@
}
void NoncausalOffsetEstimator::LogFit(std::string_view prefix) {
- LOG(INFO)
- << prefix << " " << node_a_->name()->string_view() << " from "
- << node_b_->name()->string_view() << " slope " << std::setprecision(20)
- << fit_.slope() << " offset " << fit_.offset().count() << " a [("
- << std::get<0>(a_.timestamps()[0]) << " -> "
- << std::get<1>(a_.timestamps()[0]).count() << "ns), ("
- << std::get<0>(a_.timestamps()[1]) << " -> "
- << std::get<1>(a_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
- << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<0>(a_.timestamps()[1]) - std::get<0>(a_.timestamps()[0]))
- .count()
- << "ms, do: " << std::fixed << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<1>(a_.timestamps()[1]) - std::get<1>(a_.timestamps()[0]))
- .count()
- << "ms}]";
- LOG(INFO)
- << prefix << " " << node_a_->name()->string_view() << " from "
- << node_b_->name()->string_view() << " slope " << std::setprecision(20)
- << fit_.slope() << " offset " << fit_.offset().count() << " b [("
- << std::get<0>(b_.timestamps()[0]) << " -> "
- << std::get<1>(b_.timestamps()[0]).count() << "ns), ("
- << std::get<0>(b_.timestamps()[1]) << " -> "
- << std::get<1>(b_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
- << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<0>(b_.timestamps()[1]) - std::get<0>(b_.timestamps()[0]))
- .count()
- << "ms, do: " << std::fixed << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<1>(b_.timestamps()[1]) - std::get<1>(b_.timestamps()[0]))
- .count()
- << "ms}]";
+ if (a_.timestamps().size() >= 2u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " a [("
+ << std::get<0>(a_.timestamps()[0]) << " -> "
+ << std::get<1>(a_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(a_.timestamps()[1]) << " -> "
+ << std::get<1>(a_.timestamps()[1]).count()
+ << "ns) => {dt: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(a_.timestamps()[1]) -
+ std::get<0>(a_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(a_.timestamps()[1]) -
+ std::get<1>(a_.timestamps()[0]))
+ .count()
+ << "ms}]";
+ } else if (a_.timestamps().size() == 1u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " a [("
+ << std::get<0>(a_.timestamps()[0]) << " -> "
+ << std::get<1>(a_.timestamps()[0]).count() << "ns)";
+ } else {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " no samples.";
+ }
+ if (b_.timestamps().size() >= 2u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " b [("
+ << std::get<0>(b_.timestamps()[0]) << " -> "
+ << std::get<1>(b_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(b_.timestamps()[1]) << " -> "
+ << std::get<1>(b_.timestamps()[1]).count()
+ << "ns) => {dt: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(b_.timestamps()[1]) -
+ std::get<0>(b_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(b_.timestamps()[1]) -
+ std::get<1>(b_.timestamps()[0]))
+ .count()
+ << "ms}]";
+ } else if (b_.timestamps().size() == 1u) {
+ LOG(INFO) << prefix << " " << node_b_->name()->string_view() << " from "
+ << node_a_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " b [("
+ << std::get<0>(b_.timestamps()[0]) << " -> "
+ << std::get<1>(b_.timestamps()[0]).count() << "ns)";
+ } else {
+ LOG(INFO) << prefix << " " << node_b_->name()->string_view() << " from "
+ << node_a_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " no samples.";
+ }
}
void NoncausalOffsetEstimator::Refit() {