Pull multi-node timestamp estimation into a class
There is enough code there and enough interactions to justify an
interface. Put it behind an interface to make it much more clear and
easier to enforce what updates when and with what guarentees.
This should provide no functional change.
Change-Id: Idd2c1849c2641df828c26860c61e238f47d87f9e
diff --git a/aos/configuration.cc b/aos/configuration.cc
index ad2e295..b8a38bc 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -878,6 +878,16 @@
return nullptr;
}
+const Node *GetNode(const Configuration *config, size_t node_index) {
+ if (!MultiNode(config)) {
+ CHECK_EQ(node_index, 0u) << ": Invalid node in a single node world.";
+ return nullptr;
+ } else {
+ CHECK_LT(node_index, config->nodes()->size());
+ return config->nodes()->Get(node_index);
+ }
+}
+
const Node *GetNodeOrDie(const Configuration *config, const Node *node) {
if (!MultiNode(config)) {
CHECK(node == nullptr) << ": Provided a node in a single node world.";
diff --git a/aos/configuration.h b/aos/configuration.h
index d10d16d..a7eff21 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -83,6 +83,9 @@
// can't be found.
const Node *GetNode(const Configuration *config, std::string_view name);
const Node *GetNode(const Configuration *config, const Node *node);
+// Returns the node with the provided index. This works in both a single and
+// multi-node world.
+const Node *GetNode(const Configuration *config, size_t node_index);
// Returns a matching node, or nullptr if the provided node is nullptr and we
// are in a single node world.
const Node *GetNodeOrDie(const Configuration *config, const Node *node);
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 45ec394..7fa6ec2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -169,7 +169,6 @@
srcs = [
"log_namer.cc",
"logger.cc",
- "logger_math.cc",
],
hdrs = [
"eigen_mpq.h",
@@ -185,6 +184,7 @@
"//aos/events:event_loop",
"//aos/events:simulated_event_loop",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:multinode_timestamp_filter",
"//aos/network:remote_message_fbs",
"//aos/network:remote_message_schema",
"//aos/network:team_number",
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 654c7ce..671fbea 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,7 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
@@ -27,10 +28,7 @@
"If true, drop any forwarding entries with missing data. If "
"false, CHECK.");
-DEFINE_bool(timestamps_to_csv, false,
- "If true, write all the time synchronization information to a set "
- "of CSV files in /tmp/. This should only be needed when debugging "
- "time synchronization.");
+DECLARE_bool(timestamps_to_csv);
DEFINE_bool(skip_order_validation, false,
"If true, ignore any out of orderness in replay");
@@ -993,9 +991,6 @@
LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
"is destroyed";
}
- if (offset_fp_ != nullptr) {
- fclose(offset_fp_);
- }
// Zero out some buffers. It's easy to do use-after-frees on these, so make
// it more obvious.
if (remapped_configuration_buffer_) {
@@ -1051,6 +1046,9 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
remapped_configuration_ = event_loop_factory_->configuration();
+ filters_ =
+ std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+ event_loop_factory_);
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
@@ -1110,86 +1108,12 @@
"you sure that the replay config matches the original config?";
}
- // We need to now seed our per-node time offsets and get everything set up
- // to run.
- const size_t num_nodes = nodes_count();
-
- // It is easiest to solve for per node offsets with a matrix rather than
- // trying to solve the equations by hand. So let's get after it.
- //
- // Now, build up the map matrix.
- //
- // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
- map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
- filters_.size() + 1, num_nodes);
- slope_matrix_ =
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
- filters_.size() + 1, num_nodes);
-
- offset_matrix_ =
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
- valid_matrix_ =
- Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
- last_valid_matrix_ =
- Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
-
- time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
- time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
-
- // All times should average out to the distributed clock.
- for (int i = 0; i < map_matrix_.cols(); ++i) {
- // 1/num_nodes.
- map_matrix_(0, i) = mpq_class(1, num_nodes);
- }
- valid_matrix_(0) = true;
-
- {
- // Now, add the a - b -> sample elements.
- size_t i = 1;
- for (std::pair<const std::tuple<const Node *, const Node *>,
- std::tuple<message_bridge::NoncausalOffsetEstimator>>
- &filter : filters_) {
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- const size_t node_a_index =
- configuration::GetNodeIndex(configuration(), node_a);
- const size_t node_b_index =
- configuration::GetNodeIndex(configuration(), node_b);
-
- // -a
- map_matrix_(i, node_a_index) = mpq_class(-1);
- // +b
- map_matrix_(i, node_b_index) = mpq_class(1);
-
- // -> sample
- std::get<0>(filter.second)
- .set_slope_pointer(&slope_matrix_(i, node_a_index));
- std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
-
- valid_matrix_(i) = false;
- std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
-
- ++i;
- }
- }
+ filters_->Initialize(logged_configuration());
for (std::unique_ptr<State> &state : states_) {
state->SeedSortedMessages();
}
- // Rank of the map matrix tells you if all the nodes are in communication
- // with each other, which tells you if the offsets are observable.
- const size_t connected_nodes =
- Eigen::FullPivLU<
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
- .rank();
-
- // We don't need to support isolated nodes until someone has a real use
- // case.
- CHECK_EQ(connected_nodes, num_nodes)
- << ": There is a node which isn't communicating with the rest.";
-
// And solve.
UpdateOffsets();
@@ -1265,184 +1189,24 @@
}
if (FLAGS_timestamps_to_csv) {
- for (std::pair<const std::tuple<const Node *, const Node *>,
- std::tuple<message_bridge::NoncausalOffsetEstimator>>
- &filter : filters_) {
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- std::get<0>(filter.second)
- .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
- ->monotonic_now());
- std::get<0>(filter.second)
- .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
- ->monotonic_now());
- }
+ filters_->Start(event_loop_factory);
}
}
void LogReader::UpdateOffsets() {
- VLOG(2) << "Samples are " << offset_matrix_;
- VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
- std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
- Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
- "]");
- VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
- << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
-
- size_t node_index = 0;
- for (std::unique_ptr<State> &state : states_) {
- state->SetDistributedOffset(offset(node_index), slope(node_index));
- VLOG(1) << "Offset for node " << node_index << " "
- << MaybeNodeName(state->event_loop()->node()) << "is "
- << aos::distributed_clock::time_point(offset(node_index))
- << " slope " << std::setprecision(9) << std::fixed
- << slope(node_index);
- ++node_index;
- }
+ filters_->UpdateOffsets();
if (VLOG_IS_ON(1)) {
- LogFit("Offset is");
- }
-}
-
-void LogReader::LogFit(std::string_view prefix) {
- for (std::unique_ptr<State> &state : states_) {
- VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
- << state->monotonic_now() << " distributed "
- << event_loop_factory_->distributed_now();
- }
-
- for (std::pair<const std::tuple<const Node *, const Node *>,
- std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
- filters_) {
- message_bridge::NoncausalOffsetEstimator *estimator =
- &std::get<0>(filter.second);
-
- if (estimator->a_timestamps().size() == 0 &&
- estimator->b_timestamps().size() == 0) {
- continue;
- }
-
- if (VLOG_IS_ON(1)) {
- estimator->LogFit(prefix);
- }
-
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- const size_t node_a_index =
- configuration::GetNodeIndex(configuration(), node_a);
- const size_t node_b_index =
- configuration::GetNodeIndex(configuration(), node_b);
-
- const double recovered_slope =
- slope(node_b_index) / slope(node_a_index) - 1.0;
- const int64_t recovered_offset =
- offset(node_b_index).count() - offset(node_a_index).count() *
- slope(node_b_index) /
- slope(node_a_index);
-
- VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
- << " (error " << recovered_slope - estimator->fit().slope() << ") "
- << " offset " << std::setprecision(20) << recovered_offset
- << " (error "
- << recovered_offset - estimator->fit().offset().count() << ")";
-
- const aos::distributed_clock::time_point a0 =
- states_[node_a_index]->ToDistributedClock(
- std::get<0>(estimator->a_timestamps()[0]));
- const aos::distributed_clock::time_point a1 =
- states_[node_a_index]->ToDistributedClock(
- std::get<0>(estimator->a_timestamps()[1]));
-
- VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
- << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
- << " distributed -> " << node_b->name()->string_view() << " "
- << states_[node_b_index]->FromDistributedClock(a0) << " should be "
- << aos::monotonic_clock::time_point(
- std::chrono::nanoseconds(static_cast<int64_t>(
- std::get<0>(estimator->a_timestamps()[0])
- .time_since_epoch()
- .count() *
- (1.0 + estimator->fit().slope()))) +
- estimator->fit().offset())
- << ((a0 <= event_loop_factory_->distributed_now())
- ? ""
- : " After now, investigate");
- VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
- << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
- << " distributed -> " << node_b->name()->string_view() << " "
- << states_[node_b_index]->FromDistributedClock(a1) << " should be "
- << aos::monotonic_clock::time_point(
- std::chrono::nanoseconds(static_cast<int64_t>(
- std::get<0>(estimator->a_timestamps()[1])
- .time_since_epoch()
- .count() *
- (1.0 + estimator->fit().slope()))) +
- estimator->fit().offset())
- << ((event_loop_factory_->distributed_now() <= a1)
- ? ""
- : " Before now, investigate");
-
- const aos::distributed_clock::time_point b0 =
- states_[node_b_index]->ToDistributedClock(
- std::get<0>(estimator->b_timestamps()[0]));
- const aos::distributed_clock::time_point b1 =
- states_[node_b_index]->ToDistributedClock(
- std::get<0>(estimator->b_timestamps()[1]));
-
- VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
- << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
- << " distributed -> " << node_a->name()->string_view() << " "
- << states_[node_a_index]->FromDistributedClock(b0)
- << ((b0 <= event_loop_factory_->distributed_now())
- ? ""
- : " After now, investigate");
- VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
- << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
- << " distributed -> " << node_a->name()->string_view() << " "
- << states_[node_a_index]->FromDistributedClock(b1)
- << ((event_loop_factory_->distributed_now() <= b1)
- ? ""
- : " Before now, investigate");
+ filters_->LogFit("Offset is");
}
}
message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
const Node *node_a, const Node *node_b) {
- CHECK_NE(node_a, node_b);
- CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
- CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
-
- if (node_a > node_b) {
- return GetFilter(node_b, node_a);
+ if (filters_) {
+ return filters_->GetFilter(node_a, node_b);
}
-
- auto tuple = std::make_tuple(node_a, node_b);
-
- auto it = filters_.find(tuple);
-
- if (it == filters_.end()) {
- auto &x =
- filters_
- .insert(std::make_pair(
- tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
- node_a, node_b))))
- .first->second;
- if (FLAGS_timestamps_to_csv) {
- std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
- "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
- node_b->name()->string_view()));
- std::get<0>(x).SetRevCsvFileName(absl::StrCat(
- "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
- node_a->name()->string_view()));
- }
-
- return &std::get<0>(x);
- } else {
- return &std::get<0>(it->second);
- }
+ return nullptr;
}
void LogReader::Register(EventLoop *event_loop) {
@@ -1519,7 +1283,7 @@
return;
}
if (VLOG_IS_ON(1)) {
- LogFit("Offset was");
+ filters_->LogFit("Offset was");
}
bool update_time;
@@ -1601,31 +1365,6 @@
timestamped_message.monotonic_remote_time)
<< ") " << state->DebugString();
}
-
- if (FLAGS_timestamps_to_csv) {
- if (offset_fp_ == nullptr) {
- offset_fp_ = fopen("/tmp/offsets.csv", "w");
- fprintf(
- offset_fp_,
- "# time_since_start, offset node 0, offset node 1, ...\n");
- first_time_ = timestamped_message.realtime_event_time;
- }
-
- fprintf(offset_fp_, "%.9f",
- std::chrono::duration_cast<std::chrono::duration<double>>(
- timestamped_message.realtime_event_time - first_time_)
- .count());
- for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
- fprintf(offset_fp_, ", %.9f",
- time_offset_matrix_(i, 0) +
- time_slope_matrix_(i, 0) *
- chrono::duration<double>(
- event_loop_factory_->distributed_now()
- .time_since_epoch())
- .count());
- }
- fprintf(offset_fp_, "\n");
- }
}
// If we have access to the factory, use it to fix the realtime time.
@@ -1681,6 +1420,8 @@
// which involves time before changing it. That especially includes
// sending the message.
if (update_time) {
+ filters_->LogFit("");
+
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "updating offsets";
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d042f90..8b31e6c 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -19,6 +19,7 @@
#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
@@ -487,13 +488,6 @@
// the header is likely distracting.
SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
- // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
- std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
- Eigen::Matrix<double, Eigen::Dynamic, 1>>
- SolveOffsets();
-
- void LogFit(std::string_view prefix);
-
// State per node.
class State {
public:
@@ -555,18 +549,6 @@
return node_event_loop_factory_->ToDistributedClock(time);
}
- monotonic_clock::time_point FromDistributedClock(
- distributed_clock::time_point time) {
- return node_event_loop_factory_->FromDistributedClock(time);
- }
-
- // Sets the offset (and slope) from the distributed clock.
- void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
- double distributed_slope) {
- node_event_loop_factory_->SetDistributedOffset(distributed_offset,
- distributed_slope);
- }
-
// Returns the current time on the remote node which sends messages on
// channel_index.
monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
@@ -709,89 +691,14 @@
message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
const Node *node_b);
- // FILE to write offsets to (if populated).
- FILE *offset_fp_ = nullptr;
- // Timestamp of the first piece of data used for the horizontal axis on the
- // plot.
- aos::realtime_clock::time_point first_time_;
-
// List of filters for a connection. The pointer to the first node will be
// less than the second node.
- std::map<std::tuple<const Node *, const Node *>,
- std::tuple<message_bridge::NoncausalOffsetEstimator>>
- filters_;
-
- // Returns the offset from the monotonic clock for a node to the distributed
- // clock. monotonic = distributed * slope() + offset();
- double slope(int node_index) const {
- CHECK_LT(node_index, time_slope_matrix_.rows())
- << ": Got too high of a node index.";
- return time_slope_matrix_(node_index);
- }
- std::chrono::nanoseconds offset(int node_index) const {
- CHECK_LT(node_index, time_offset_matrix_.rows())
- << ": Got too high of a node index.";
- return std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(time_offset_matrix_(node_index)));
- }
+ std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
// Updates the offset matrix solution and sets the per-node distributed
// offsets in the factory.
void UpdateOffsets();
- // We have 2 types of equations to do a least squares regression over to fully
- // constrain our time function.
- //
- // One is simple. The distributed clock is the average of all the clocks.
- // (ta + tb + tc + td) / num_nodes = t_distributed
- //
- // The second is a bit more complicated. Our basic time conversion function
- // is:
- // tb = ta + (ta * slope + offset)
- // We can rewrite this as follows
- // tb - (1 + slope) * ta = offset
- //
- // From here, we have enough equations to solve for t{a,b,c,...} We want to
- // take as an input the offsets and slope, and solve for the per-node times as
- // a function of the distributed clock.
- //
- // We need to massage our equations to make this work. If we solve for the
- // per-node times at two set distributed clock times, we will be able to
- // recreate the linear function (we know it is linear). We can do a similar
- // thing by breaking our equation up into:
- //
- // [1/3 1/3 1/3 ] [ta] [t_distributed]
- // [ 1 -1-m1 0 ] [tb] = [oab]
- // [ 1 0 -1-m2 ] [tc] [oac]
- //
- // This solves to:
- //
- // [ta] [ a00 a01 a02] [t_distributed]
- // [tb] = [ a10 a11 a12] * [oab]
- // [tc] [ a20 a21 a22] [oac]
- //
- // and can be split into:
- //
- // [ta] [ a00 ] [a01 a02]
- // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
- // [tc] [ a20 ] [a21 a22] [oac]
- //
- // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
- // offset_matrix_ will be in nanoseconds.
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
- // Matrix tracking which offsets are valid.
- Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
- // Matrix tracking the last valid matrix we used to determine connected nodes.
- Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
- size_t cached_valid_node_count_ = 0;
-
- // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
- // t is in seconds.
- Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
-
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
deleted file mode 100644
index ea360cc..0000000
--- a/aos/events/logging/logger_math.cc
+++ /dev/null
@@ -1,240 +0,0 @@
-#include "aos/events/logging/logger.h"
-
-#include "Eigen/Dense"
-
-#include "third_party/gmp/gmpxx.h"
-
-namespace aos {
-namespace logger {
-
-namespace {
-Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> in) {
- Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> result =
- Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Zero(in.rows(),
- in.cols());
- for (int i = 0; i < in.rows(); ++i) {
- for (int j = 0; j < in.cols(); ++j) {
- result(i, j) = in(i, j).get_d();
- }
- }
- return result;
-}
-
-std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
- Eigen::Matrix<double, Eigen::Dynamic, 1>>
-Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
- const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
- aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
- // Least squares solve for the slopes and offsets.
- const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
- (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
- aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
-
- VLOG(1) << "Took "
- << std::chrono::duration<double>(end_time - start_time).count()
- << " seconds to invert";
-
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
- inv.block(0, 0, inv.rows(), 1);
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
- inv.block(0, 1, inv.rows(), inv.cols() - 1) *
- mpq_offsets.block(1, 0, inv.rows() - 1, 1);
-
- mpq_solution_offset *= mpq_class(1, 1000000000);
-
- return std::make_tuple(ToDouble(mpq_solution_slope),
- ToDouble(mpq_solution_offset));
-}
-} // namespace
-
-// This is slow to compile, so we put it in a separate file. More parallelism
-// and less change.
-std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
- Eigen::Matrix<double, Eigen::Dynamic, 1>>
-LogReader::SolveOffsets() {
- // TODO(austin): Split this out and unit tests a bit better. When we do
- // partial node subsets and also try to optimize this again would be a good
- // time.
- //
- // TODO(austin): CHECK that the number doesn't change over time. We can freak
- // out if that happens.
-
- // Start by counting how many node pairs we have an offset estimated for.
- int nonzero_offset_count = 1;
- for (int i = 1; i < valid_matrix_.rows(); ++i) {
- if (valid_matrix_(i) != 0) {
- ++nonzero_offset_count;
- }
- }
-
- Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
- "]");
-
- // If there are missing rows, we can't solve the original problem and instead
- // need to filter the matrix to remove the missing rows and solve a simplified
- // problem. What this means practically is that we might have pairs of nodes
- // which are communicating, but we don't have timestamps between. But we can
- // have multiple paths in our graph between 2 nodes, so we can still solve
- // time without the missing timestamp.
- //
- // In the following example, we can drop any of the last 3 rows, and still
- // solve.
- //
- // [1/3 1/3 1/3 ] [ta] [t_distributed]
- // [ 1 -1-m1 0 ] [tb] = [oab]
- // [ 1 0 -1-m2 ] [tc] [oac]
- // [ 0 1 -1-m2 ] [obc]
- if (nonzero_offset_count != offset_matrix_.rows()) {
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
- nonzero_offset_count, map_matrix_.cols());
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
- Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
-
- std::vector<bool> valid_nodes(nodes_count(), false);
-
- size_t destination_row = 0;
- for (int j = 0; j < map_matrix_.cols(); ++j) {
- mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
- }
- mpq_offsets(0) = mpq_class(0);
- ++destination_row;
-
- for (int i = 1; i < offset_matrix_.rows(); ++i) {
- // Copy over the first row, i.e. the row which says that all times average
- // to the distributed time. And then copy over all valid rows.
- if (valid_matrix_(i)) {
- mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
-
- for (int j = 0; j < map_matrix_.cols(); ++j) {
- mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
- if (mpq_map(destination_row, j) != 0) {
- valid_nodes[j] = true;
- }
- }
-
- ++destination_row;
- }
- }
-
- VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
- VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
-
- // Compute (and cache) the current connectivity. If we have N nodes
- // configured, but logs only from one of them, we want to assume that the
- // rest of the nodes match the distributed clock exactly.
- //
- // If data shows up later for them, we will CHECK when time jumps.
- //
- // TODO(austin): Once we have more info on what cases are reasonable, we can
- // open up the restrictions.
- if (valid_matrix_ != last_valid_matrix_) {
- Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
- full_piv(mpq_map);
- const size_t connected_nodes = full_piv.rank();
-
- size_t valid_node_count = 0;
- for (size_t i = 0; i < valid_nodes.size(); ++i) {
- const bool valid_node = valid_nodes[i];
- if (valid_node) {
- ++valid_node_count;
- } else {
- LOG(WARNING)
- << "Node "
- << logged_configuration()->nodes()->Get(i)->name()->string_view()
- << " has no observations, setting to distributed clock.";
- }
- }
-
- // Confirm that the set of nodes we have connected matches the rank.
- // Otherwise a<->b and c<->d would count as 4 but really is 3.
- CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
- connected_nodes)
- << ": Ambiguous nodes.";
-
- last_valid_matrix_ = valid_matrix_;
- cached_valid_node_count_ = valid_node_count;
- }
-
- // There are 2 cases. Either all the nodes are connected with each other by
- // actual data, or we have isolated nodes. We want to force the isolated
- // nodes to match the distributed clock exactly, and to solve for the other
- // nodes.
- if (cached_valid_node_count_ == 0) {
- // Cheat. If there are no valid nodes, the slopes are 1, and offset is 0,
- // ie, just be the distributed clock.
- return std::make_tuple(
- Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
- Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
- } else if (cached_valid_node_count_ == nodes_count()) {
- return Solve(mpq_map, mpq_offsets);
- } else {
- // Strip out any columns (nodes) which aren't relevant. Solve the
- // simplified problem, then set any nodes which were missing back to slope
- // 1, offset 0 (ie the distributed clock).
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>
- valid_node_mpq_map =
- Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
- nonzero_offset_count, cached_valid_node_count_);
-
- {
- // Only copy over the columns with valid nodes in them.
- size_t column = 0;
- for (size_t i = 0; i < valid_nodes.size(); ++i) {
- if (valid_nodes[i]) {
- valid_node_mpq_map.col(column) = mpq_map.col(i);
-
- ++column;
- }
- }
- // The 1/n needs to be based on the number of nodes being solved.
- // Recreate it here.
- for (int j = 0; j < valid_node_mpq_map.cols(); ++j) {
- valid_node_mpq_map(0, j) = mpq_class(1, cached_valid_node_count_);
- }
- }
-
- VLOG(1) << "Reduced node filtered map "
- << ToDouble(valid_node_mpq_map).format(HeavyFmt);
- VLOG(1) << "Reduced node filtered offsets "
- << ToDouble(mpq_offsets).format(HeavyFmt);
-
- // Solve the simplified problem now.
- std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
- Eigen::Matrix<double, Eigen::Dynamic, 1>>
- valid_result = Solve(valid_node_mpq_map, mpq_offsets);
-
- // And expand the results back into a solution matrix.
- std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
- Eigen::Matrix<double, Eigen::Dynamic, 1>>
- result = std::make_tuple(
- Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
- Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
-
- {
- size_t column = 0;
- for (size_t i = 0; i < valid_nodes.size(); ++i) {
- if (valid_nodes[i]) {
- std::get<0>(result)(i) = std::get<0>(valid_result)(column);
- std::get<1>(result)(i) = std::get<1>(valid_result)(column);
-
- ++column;
- }
- }
- }
-
- return result;
- }
- } else {
- const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
- map_matrix_ + slope_matrix_;
- VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
- VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
-
- return Solve(mpq_map, offset_matrix_);
- }
-}
-
-} // namespace logger
-} // namespace aos
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 13ba42e..ade8355 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -450,6 +450,21 @@
],
)
+cc_library(
+ name = "multinode_timestamp_filter",
+ srcs = ["multinode_timestamp_filter.cc"],
+ hdrs = ["multinode_timestamp_filter.h"],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":timestamp_filter",
+ "//aos:configuration",
+ "//aos/events:simulated_event_loop",
+ "//aos/time",
+ "//third_party/gmp",
+ "@org_tuxfamily_eigen//:eigen",
+ ],
+)
+
cc_test(
name = "timestamp_filter_test",
srcs = [
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
new file mode 100644
index 0000000..4b36287
--- /dev/null
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -0,0 +1,516 @@
+#include "aos/network/multinode_timestamp_filter.h"
+
+#include <map>
+
+#include "aos/configuration.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+DEFINE_bool(timestamps_to_csv, false,
+ "If true, write all the time synchronization information to a set "
+ "of CSV files in /tmp/. This should only be needed when debugging "
+ "time synchronization.");
+
+namespace aos {
+namespace message_bridge {
+namespace {
+Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> in) {
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> result =
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Zero(in.rows(),
+ in.cols());
+ for (int i = 0; i < in.rows(); ++i) {
+ for (int j = 0; j < in.cols(); ++j) {
+ result(i, j) = in(i, j).get_d();
+ }
+ }
+ return result;
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
+ aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
+ // Least squares solve for the slopes and offsets.
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
+ (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
+ aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
+
+ VLOG(1) << "Took "
+ << std::chrono::duration<double>(end_time - start_time).count()
+ << " seconds to invert";
+
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
+ inv.block(0, 0, inv.rows(), 1);
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
+ inv.block(0, 1, inv.rows(), inv.cols() - 1) *
+ mpq_offsets.block(1, 0, inv.rows() - 1, 1);
+
+ mpq_solution_offset *= mpq_class(1, 1000000000);
+
+ return std::make_tuple(ToDouble(mpq_solution_slope),
+ ToDouble(mpq_solution_offset));
+}
+} // namespace
+
+void MultiNodeNoncausalOffsetEstimator::Start(
+ SimulatedEventLoopFactory *factory) {
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ std::get<0>(filter.second)
+ .SetFirstFwdTime(
+ factory->GetNodeEventLoopFactory(node_a)->monotonic_now());
+ std::get<0>(filter.second)
+ .SetFirstRevTime(
+ factory->GetNodeEventLoopFactory(node_b)->monotonic_now());
+ }
+}
+
+message_bridge::NoncausalOffsetEstimator *
+MultiNodeNoncausalOffsetEstimator::GetFilter(const Node *node_a,
+ const Node *node_b) {
+ CHECK_NE(node_a, node_b);
+ CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
+ CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
+
+ if (node_a > node_b) {
+ return GetFilter(node_b, node_a);
+ }
+
+ auto tuple = std::make_tuple(node_a, node_b);
+
+ auto it = filters_.find(tuple);
+
+ if (it == filters_.end()) {
+ auto &x =
+ filters_
+ .insert(std::make_pair(
+ tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+ node_a, node_b))))
+ .first->second;
+ if (FLAGS_timestamps_to_csv) {
+ std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+ node_b->name()->string_view()));
+ std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+ node_a->name()->string_view()));
+ }
+
+ return &std::get<0>(x);
+ } else {
+ return &std::get<0>(it->second);
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::LogFit(std::string_view prefix) {
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ VLOG(1)
+ << node->name()->string_view() << " now "
+ << event_loop_factory_->GetNodeEventLoopFactory(node)->monotonic_now()
+ << " distributed " << event_loop_factory_->distributed_now();
+ }
+
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ message_bridge::NoncausalOffsetEstimator *estimator =
+ &std::get<0>(filter.second);
+
+ if (estimator->a_timestamps().size() == 0 &&
+ estimator->b_timestamps().size() == 0) {
+ continue;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ estimator->LogFit(prefix);
+ }
+
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ NodeEventLoopFactory *node_a_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node_a);
+ NodeEventLoopFactory *node_b_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node_b);
+
+ const double recovered_slope =
+ slope(node_b_index) / slope(node_a_index) - 1.0;
+ const int64_t recovered_offset =
+ offset(node_b_index).count() - offset(node_a_index).count() *
+ slope(node_b_index) /
+ slope(node_a_index);
+
+ VLOG(2) << "Recovered slope " << std::setprecision(20) << recovered_slope
+ << " (error " << recovered_slope - estimator->fit().slope() << ") "
+ << " offset " << std::setprecision(20) << recovered_offset
+ << " (error "
+ << recovered_offset - estimator->fit().offset().count() << ")";
+
+ const aos::distributed_clock::time_point a0 =
+ node_a_factory->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[0]));
+ const aos::distributed_clock::time_point a1 =
+ node_a_factory->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[1]));
+
+ VLOG(2) << node_a->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << node_b_factory->FromDistributedClock(a0) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[0])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((a0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(2) << node_a->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << node_b_factory->FromDistributedClock(a1) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[1])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((event_loop_factory_->distributed_now() <= a1)
+ ? ""
+ : " Before now, investigate");
+
+ const aos::distributed_clock::time_point b0 =
+ node_b_factory->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[0]));
+ const aos::distributed_clock::time_point b1 =
+ node_b_factory->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[1]));
+
+ VLOG(2) << node_b->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << node_a_factory->FromDistributedClock(b0)
+ << ((b0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(2) << node_b->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << node_a_factory->FromDistributedClock(b1)
+ << ((event_loop_factory_->distributed_now() <= b1)
+ ? ""
+ : " Before now, investigate");
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::UpdateOffsets() {
+ VLOG(2) << "Samples are " << offset_matrix_;
+ VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+ VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
+ << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
+
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ event_loop_factory_->GetNodeEventLoopFactory(node)->SetDistributedOffset(
+ offset(node_index), slope(node_index));
+
+ VLOG(1) << "Offset for node " << node_index << " "
+ << node->name()->string_view() << " is "
+ << aos::distributed_clock::time_point(offset(node_index))
+ << " slope " << std::setprecision(9) << std::fixed
+ << slope(node_index);
+ }
+}
+
+void MultiNodeNoncausalOffsetEstimator::Initialize(
+ const Configuration *logged_configuration) {
+ logged_configuration_ = logged_configuration;
+ // We need to now seed our per-node time offsets and get everything set up
+ // to run.
+ const size_t num_nodes = nodes_count();
+
+ // It is easiest to solve for per node offsets with a matrix rather than
+ // trying to solve the equations by hand. So let's get after it.
+ //
+ // Now, build up the map matrix.
+ //
+ // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+ map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+ slope_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+
+ offset_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ last_valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+
+ time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+
+ // All times should average out to the distributed clock.
+ for (int i = 0; i < map_matrix_.cols(); ++i) {
+ // 1/num_nodes.
+ map_matrix_(0, i) = mpq_class(1, num_nodes);
+ }
+ valid_matrix_(0) = true;
+
+ {
+ // Now, add the a - b -> sample elements.
+ size_t i = 1;
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ // -a
+ map_matrix_(i, node_a_index) = mpq_class(-1);
+ // +b
+ map_matrix_(i, node_b_index) = mpq_class(1);
+
+ // -> sample
+ std::get<0>(filter.second)
+ .set_slope_pointer(&slope_matrix_(i, node_a_index));
+ std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+ valid_matrix_(i) = false;
+ std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
+
+ ++i;
+ }
+ }
+
+ const size_t connected_nodes = ConnectedNodes();
+
+ // We don't need to support isolated nodes until someone has a real use
+ // case.
+ CHECK_EQ(connected_nodes, num_nodes)
+ << ": There is a node which isn't communicating with the rest.";
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+MultiNodeNoncausalOffsetEstimator::SolveOffsets() {
+ // TODO(austin): Split this out and unit tests a bit better. When we do
+ // partial node subsets and also try to optimize this again would be a good
+ // time.
+ //
+ // TODO(austin): CHECK that the number doesn't change over time. We can freak
+ // out if that happens.
+
+ // Start by counting how many node pairs we have an offset estimated for.
+ int nonzero_offset_count = 1;
+ for (int i = 1; i < valid_matrix_.rows(); ++i) {
+ if (valid_matrix_(i) != 0) {
+ ++nonzero_offset_count;
+ }
+ }
+
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+
+ // If there are missing rows, we can't solve the original problem and instead
+ // need to filter the matrix to remove the missing rows and solve a simplified
+ // problem. What this means practically is that we might have pairs of nodes
+ // which are communicating, but we don't have timestamps between. But we can
+ // have multiple paths in our graph between 2 nodes, so we can still solve
+ // time without the missing timestamp.
+ //
+ // In the following example, we can drop any of the last 3 rows, and still
+ // solve.
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ // [ 0 1 -1-m2 ] [obc]
+ if (nonzero_offset_count != offset_matrix_.rows()) {
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, map_matrix_.cols());
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
+
+ std::vector<bool> valid_nodes(nodes_count(), false);
+
+ size_t destination_row = 0;
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
+ }
+ mpq_offsets(0) = mpq_class(0);
+ ++destination_row;
+
+ for (int i = 1; i < offset_matrix_.rows(); ++i) {
+ // Copy over the first row, i.e. the row which says that all times average
+ // to the distributed time. And then copy over all valid rows.
+ if (valid_matrix_(i)) {
+ mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
+
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
+ if (mpq_map(destination_row, j) != 0) {
+ valid_nodes[j] = true;
+ }
+ }
+
+ ++destination_row;
+ }
+ }
+
+ VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
+ VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Compute (and cache) the current connectivity. If we have N nodes
+ // configured, but logs only from one of them, we want to assume that the
+ // rest of the nodes match the distributed clock exactly.
+ //
+ // If data shows up later for them, we will CHECK when time jumps.
+ //
+ // TODO(austin): Once we have more info on what cases are reasonable, we can
+ // open up the restrictions.
+ if (valid_matrix_ != last_valid_matrix_) {
+ Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
+ full_piv(mpq_map);
+ const size_t connected_nodes = full_piv.rank();
+
+ size_t valid_node_count = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ const bool valid_node = valid_nodes[i];
+ if (valid_node) {
+ ++valid_node_count;
+ } else {
+ LOG(WARNING)
+ << "Node "
+ << logged_configuration()->nodes()->Get(i)->name()->string_view()
+ << " has no observations, setting to distributed clock.";
+ }
+ }
+
+ // Confirm that the set of nodes we have connected matches the rank.
+ // Otherwise a<->b and c<->d would count as 4 but really is 3.
+ CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
+ connected_nodes)
+ << ": Ambiguous nodes.";
+
+ last_valid_matrix_ = valid_matrix_;
+ cached_valid_node_count_ = valid_node_count;
+ }
+
+ // There are 2 cases. Either all the nodes are connected with each other by
+ // actual data, or we have isolated nodes. We want to force the isolated
+ // nodes to match the distributed clock exactly, and to solve for the other
+ // nodes.
+ if (cached_valid_node_count_ == 0) {
+ // Cheat. If there are no valid nodes, the slopes are 1, and offset is 0,
+ // ie, just be the distributed clock.
+ return std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+ } else if (cached_valid_node_count_ == nodes_count()) {
+ return Solve(mpq_map, mpq_offsets);
+ } else {
+ // Strip out any columns (nodes) which aren't relevant. Solve the
+ // simplified problem, then set any nodes which were missing back to slope
+ // 1, offset 0 (ie the distributed clock).
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>
+ valid_node_mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, cached_valid_node_count_);
+
+ {
+ // Only copy over the columns with valid nodes in them.
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ valid_node_mpq_map.col(column) = mpq_map.col(i);
+
+ ++column;
+ }
+ }
+ // The 1/n needs to be based on the number of nodes being solved.
+ // Recreate it here.
+ for (int j = 0; j < valid_node_mpq_map.cols(); ++j) {
+ valid_node_mpq_map(0, j) = mpq_class(1, cached_valid_node_count_);
+ }
+ }
+
+ VLOG(1) << "Reduced node filtered map "
+ << ToDouble(valid_node_mpq_map).format(HeavyFmt);
+ VLOG(1) << "Reduced node filtered offsets "
+ << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Solve the simplified problem now.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ valid_result = Solve(valid_node_mpq_map, mpq_offsets);
+
+ // And expand the results back into a solution matrix.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ result = std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+
+ {
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ std::get<0>(result)(i) = std::get<0>(valid_result)(column);
+ std::get<1>(result)(i) = std::get<1>(valid_result)(column);
+
+ ++column;
+ }
+ }
+ }
+
+ return result;
+ }
+ } else {
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ map_matrix_ + slope_matrix_;
+ VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
+ VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+
+ return Solve(mpq_map, offset_matrix_);
+ }
+}
+
+size_t MultiNodeNoncausalOffsetEstimator::ConnectedNodes() {
+ // Rank of the map matrix tells you if all the nodes are in communication
+ // with each other, which tells you if the offsets are observable.
+ return Eigen::FullPivLU<
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(
+ map_matrix_)
+ .rank();
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
new file mode 100644
index 0000000..0954448
--- /dev/null
+++ b/aos/network/multinode_timestamp_filter.h
@@ -0,0 +1,149 @@
+#ifndef AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
+#define AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
+
+#include <map>
+#include <string_view>
+
+#include "Eigen/Dense"
+#include "aos/configuration.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/time/time.h"
+#include "third_party/gmp/gmpxx.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to hold a NoncausalOffsetEstimator per pair of communicating nodes, and
+// to estimate and set the overall time of all nodes.
+class MultiNodeNoncausalOffsetEstimator {
+ public:
+ MultiNodeNoncausalOffsetEstimator(
+ SimulatedEventLoopFactory *event_loop_factory)
+ : event_loop_factory_(event_loop_factory) {}
+
+ // Constructs all the matricies. Needs to be called after the log files have
+ // been opened and all the filters have been created.
+ void Initialize(const Configuration *logged_configuration);
+
+ // Returns the filter for a pair of nodes. The same filter will be returned
+ // for a pair of nodes, regardless of argument order.
+ message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+ const Node *node_b);
+
+ // Prints out debug information about the line segments for each node.
+ void LogFit(std::string_view prefix);
+
+ // Captures the start time.
+ void Start(SimulatedEventLoopFactory *factory);
+
+ // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ SolveOffsets();
+
+ // Returns the number of nodes.
+ size_t nodes_count() const {
+ return !configuration::MultiNode(logged_configuration())
+ ? 1u
+ : logged_configuration()->nodes()->size();
+ }
+
+ // Returns the offset from the monotonic clock for a node to the distributed
+ // clock. monotonic = distributed * slope() + offset();
+ double slope(int node_index) const {
+ CHECK_LT(node_index, time_slope_matrix_.rows())
+ << ": Got too high of a node index.";
+ return time_slope_matrix_(node_index);
+ }
+ std::chrono::nanoseconds offset(int node_index) const {
+ CHECK_LT(node_index, time_offset_matrix_.rows())
+ << ": Got too high of a node index.";
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(time_offset_matrix_(node_index)));
+ }
+
+ // Returns the configuration that was logged.
+ const aos::Configuration *logged_configuration() const {
+ return logged_configuration_;
+ }
+
+ // Returns the configuration that we are replaying into.
+ const aos::Configuration *configuration() const {
+ return event_loop_factory_->configuration();
+ }
+
+ // Returns the number of nodes connected to each other.
+ size_t ConnectedNodes();
+
+ // Recomputes the offsets and sets them on event_loop_factory_.
+ void UpdateOffsets();
+
+ private:
+ SimulatedEventLoopFactory *event_loop_factory_;
+ const Configuration *logged_configuration_;
+
+ // List of filters for a connection. The pointer to the first node will be
+ // less than the second node.
+ std::map<std::tuple<const Node *, const Node *>,
+ std::tuple<NoncausalOffsetEstimator>>
+ filters_;
+
+ // We have 2 types of equations to do a least squares regression over to fully
+ // constrain our time function.
+ //
+ // One is simple. The distributed clock is the average of all the clocks.
+ // (ta + tb + tc + td) / num_nodes = t_distributed
+ //
+ // The second is a bit more complicated. Our basic time conversion function
+ // is:
+ // tb = ta + (ta * slope + offset)
+ // We can rewrite this as follows
+ // tb - (1 + slope) * ta = offset
+ //
+ // From here, we have enough equations to solve for t{a,b,c,...} We want to
+ // take as an input the offsets and slope, and solve for the per-node times as
+ // a function of the distributed clock.
+ //
+ // We need to massage our equations to make this work. If we solve for the
+ // per-node times at two set distributed clock times, we will be able to
+ // recreate the linear function (we know it is linear). We can do a similar
+ // thing by breaking our equation up into:
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ //
+ // This solves to:
+ //
+ // [ta] [ a00 a01 a02] [t_distributed]
+ // [tb] = [ a10 a11 a12] * [oab]
+ // [tc] [ a20 a21 a22] [oac]
+ //
+ // and can be split into:
+ //
+ // [ta] [ a00 ] [a01 a02]
+ // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+ // [tc] [ a20 ] [a21 a22] [oac]
+ //
+ // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+ // offset_matrix_ will be in nanoseconds.
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+ // Matrix tracking which offsets are valid.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+ // Matrix tracking the last valid matrix we used to determine connected nodes.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+ size_t cached_valid_node_count_ = 0;
+
+ // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
+ // t is in seconds.
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MULTINODE_TIMESTAMP_FILTER_H_
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 116607c..bb18095 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -699,40 +699,72 @@
}
void NoncausalOffsetEstimator::LogFit(std::string_view prefix) {
- LOG(INFO)
- << prefix << " " << node_a_->name()->string_view() << " from "
- << node_b_->name()->string_view() << " slope " << std::setprecision(20)
- << fit_.slope() << " offset " << fit_.offset().count() << " a [("
- << std::get<0>(a_.timestamps()[0]) << " -> "
- << std::get<1>(a_.timestamps()[0]).count() << "ns), ("
- << std::get<0>(a_.timestamps()[1]) << " -> "
- << std::get<1>(a_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
- << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<0>(a_.timestamps()[1]) - std::get<0>(a_.timestamps()[0]))
- .count()
- << "ms, do: " << std::fixed << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<1>(a_.timestamps()[1]) - std::get<1>(a_.timestamps()[0]))
- .count()
- << "ms}]";
- LOG(INFO)
- << prefix << " " << node_a_->name()->string_view() << " from "
- << node_b_->name()->string_view() << " slope " << std::setprecision(20)
- << fit_.slope() << " offset " << fit_.offset().count() << " b [("
- << std::get<0>(b_.timestamps()[0]) << " -> "
- << std::get<1>(b_.timestamps()[0]).count() << "ns), ("
- << std::get<0>(b_.timestamps()[1]) << " -> "
- << std::get<1>(b_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
- << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<0>(b_.timestamps()[1]) - std::get<0>(b_.timestamps()[0]))
- .count()
- << "ms, do: " << std::fixed << std::setprecision(6)
- << std::chrono::duration<double, std::milli>(
- std::get<1>(b_.timestamps()[1]) - std::get<1>(b_.timestamps()[0]))
- .count()
- << "ms}]";
+ if (a_.timestamps().size() >= 2u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " a [("
+ << std::get<0>(a_.timestamps()[0]) << " -> "
+ << std::get<1>(a_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(a_.timestamps()[1]) << " -> "
+ << std::get<1>(a_.timestamps()[1]).count()
+ << "ns) => {dt: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(a_.timestamps()[1]) -
+ std::get<0>(a_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(a_.timestamps()[1]) -
+ std::get<1>(a_.timestamps()[0]))
+ .count()
+ << "ms}]";
+ } else if (a_.timestamps().size() == 1u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " a [("
+ << std::get<0>(a_.timestamps()[0]) << " -> "
+ << std::get<1>(a_.timestamps()[0]).count() << "ns)";
+ } else {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " no samples.";
+ }
+ if (b_.timestamps().size() >= 2u) {
+ LOG(INFO) << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " b [("
+ << std::get<0>(b_.timestamps()[0]) << " -> "
+ << std::get<1>(b_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(b_.timestamps()[1]) << " -> "
+ << std::get<1>(b_.timestamps()[1]).count()
+ << "ns) => {dt: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(b_.timestamps()[1]) -
+ std::get<0>(b_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(b_.timestamps()[1]) -
+ std::get<1>(b_.timestamps()[0]))
+ .count()
+ << "ms}]";
+ } else if (b_.timestamps().size() == 1u) {
+ LOG(INFO) << prefix << " " << node_b_->name()->string_view() << " from "
+ << node_a_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " b [("
+ << std::get<0>(b_.timestamps()[0]) << " -> "
+ << std::get<1>(b_.timestamps()[0]).count() << "ns)";
+ } else {
+ LOG(INFO) << prefix << " " << node_b_->name()->string_view() << " from "
+ << node_a_->name()->string_view() << " slope "
+ << std::setprecision(20) << fit_.slope() << " offset "
+ << fit_.offset().count() << " no samples.";
+ }
}
void NoncausalOffsetEstimator::Refit() {