Handle log files with only one direction of data
We have seen some log files where A -> B works, but B -> A hasn't
started up yet when interesting things are happening in the log. There
is enough information to order events causally between nodes, though not
well. So, use this information to fit what we have.
This doesn't yet handle transitioning between not having both directions
and having both directions. Both of those behaviors should result in a
jump in time and therefore a crash.
Change-Id: I684c1b61eb18efe2caaedc0e1e86f147c1e30f70
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b16ea6f..24c0427 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1424,8 +1424,6 @@
// which involves time before changing it. That especially includes
// sending the message.
if (update_time) {
- filters_->LogFit("");
-
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "updating offsets";
@@ -1436,20 +1434,10 @@
return state->monotonic_now();
});
- for (size_t i = 0; i < states_.size(); ++i) {
- VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "before "
- << states_[i]->monotonic_now();
- }
-
UpdateOffsets();
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
<< state->monotonic_now();
- for (size_t i = 0; i < states_.size(); ++i) {
- VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "after "
- << states_[i]->monotonic_now();
- }
-
// TODO(austin): We should be perfect.
const std::chrono::nanoseconds kTolerance{3};
if (!FLAGS_skip_order_validation) {
@@ -1462,7 +1450,7 @@
<< MaybeNodeName(states_[i]->event_loop()->node());
CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
<< ": Time changed too much on node "
- << states_[i]->event_loop()->node()->name()->string_view();
+ << MaybeNodeName(states_[i]->event_loop()->node());
}
} else {
if (next_time < state->monotonic_now()) {
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3f4893e..315a272 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -415,6 +415,13 @@
logfile_base_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs"}),
logfiles_(MakeLogFiles(logfile_base_)),
+ pi1_single_direction_logfiles_(
+ {logfile_base_ + "_pi1_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs"}),
structured_logfiles_{
std::vector<std::string>{logfiles_[0]},
std::vector<std::string>{logfiles_[1], logfiles_[2]},
@@ -549,6 +556,17 @@
EXPECT_THAT(sorted_parts[1].corrupted, ::testing::Eq(corrupted_parts));
}
+ void ConfirmReadable(const std::vector<std::string> &files) {
+ LogReader reader(SortParts(files));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ reader.Register(&log_reader_factory);
+
+ log_reader_factory.Run();
+
+ reader.Deregister();
+ }
+
void AddExtension(std::string_view extension) {
std::transform(logfiles_.begin(), logfiles_.end(), logfiles_.begin(),
[extension](const std::string &in) {
@@ -577,6 +595,7 @@
std::string logfile_base_;
std::vector<std::string> pi1_reboot_logfiles_;
std::vector<std::string> logfiles_;
+ std::vector<std::string> pi1_single_direction_logfiles_;
std::vector<std::vector<std::string>> structured_logfiles_;
@@ -1727,6 +1746,48 @@
pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
}
+// Tests that we properly handle one direction of message_bridge being
+// unavailable.
+TEST_F(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
+ event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
+ chrono::seconds(1000), 0.99999);
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ // Confirm that we can parse the result. LogReader has enough internal CHECKs
+ // to confirm the right thing happened.
+ ConfirmReadable(pi1_single_direction_logfiles_);
+}
+
+// Tests that we properly handle one direction of message_bridge being
+// unavailable.
+TEST_F(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
+ event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
+ chrono::seconds(500), 1.00001);
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ // Confirm that we can parse the result. LogReader has enough internal CHECKs
+ // to confirm the right thing happened.
+ ConfirmReadable(pi1_single_direction_logfiles_);
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 5579b9a..4f5e0e5 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -39,7 +39,7 @@
(mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
- VLOG(1) << "Took "
+ VLOG(3) << "Took "
<< std::chrono::duration<double>(end_time - start_time).count()
<< " seconds to invert";
@@ -215,13 +215,17 @@
}
void MultiNodeNoncausalOffsetEstimator::UpdateOffsets() {
- VLOG(2) << "Samples are " << offset_matrix_;
- VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ VLOG(1)
+ << node->name()->string_view() << " before "
+ << event_loop_factory_->GetNodeEventLoopFactory(node)->monotonic_now();
+ }
+ VLOG(1) << "Distributed " << event_loop_factory_->distributed_now();
+
std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
"]");
- VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
- << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
const Node *node = configuration::GetNode(configuration(), node_index);
@@ -242,6 +246,13 @@
// deviate from horizontal again.
filter.second.Freeze();
}
+
+ for (size_t node_index = 0; node_index < nodes_count(); ++node_index) {
+ const Node *node = configuration::GetNode(configuration(), node_index);
+ VLOG(1)
+ << node->name()->string_view() << " after "
+ << event_loop_factory_->GetNodeEventLoopFactory(node)->monotonic_now();
+ }
}
void MultiNodeNoncausalOffsetEstimator::Initialize(
@@ -498,8 +509,8 @@
} else {
const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
map_matrix_ + slope_matrix_;
- VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
- VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+ VLOG(2) << "map " << ToDouble(map_matrix_ + slope_matrix_).format(HeavyFmt);
+ VLOG(2) << "offsets " << ToDouble(offset_matrix_).format(HeavyFmt);
return Solve(mpq_map, offset_matrix_);
}
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index edb20a1..f9e63a3 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -492,6 +492,32 @@
return f;
}
+Line Invert(Line fb) {
+ // ta = Ob(tb) + tb
+ // tb = Oa(ta) + ta
+ // Ob(tb) = mb * tb + bb
+ // Oa(ta) = ma * ta + ba
+ //
+ // ta = mb * tb + tb + bb
+ // ta = (mb + 1) * tb + bb
+ // 1 / (mb + 1) ta - bb / (mb + 1) = tb
+ // ta + (-1 + 1 / (mb + 1)) ta - bb / (mb + 1) = tb
+ // ta + ((-mb - 1) / (mb + 1) + 1 / (mb + 1)) ta - bb / (mb + 1) = tb
+ // ta + -mb / (mb + 1) ta - bb / (mb + 1) = tb
+ //
+ // ma = -mb / (mb + 1)
+ // ba = -bb / (mb + 1)
+
+ mpq_class denom = (mpq_class(1) + fb.mpq_slope());
+ mpq_class ma = -fb.mpq_slope() / denom;
+ ma.canonicalize();
+
+ mpq_class ba = -fb.mpq_offset() / denom;
+
+ Line f(ba, ma);
+ return f;
+}
+
NoncausalTimestampFilter::~NoncausalTimestampFilter() {
// Destroy the filter by popping until empty. This will trigger any
// timestamps to be written to the files.
@@ -779,8 +805,8 @@
}
if (b_timestamps.size() >= 2u) {
LOG(INFO)
- << prefix << " " << node_a_->name()->string_view() << " from "
- << node_b_->name()->string_view() << " slope " << std::setprecision(20)
+ << prefix << " " << node_b_->name()->string_view() << " from "
+ << node_a_->name()->string_view() << " slope " << std::setprecision(20)
<< fit_.slope() << " offset " << fit_.offset().count() << " b [("
<< std::get<0>(b_timestamps[0]) << " -> "
<< std::get<1>(b_timestamps[0]).count() << "ns), ("
@@ -811,17 +837,33 @@
}
void NoncausalOffsetEstimator::Refit() {
- if (a_timestamps_size() == 0 || b_timestamps_size() == 0) {
+ if (a_timestamps_size() == 0 && b_timestamps_size() == 0) {
VLOG(1) << "Not fitting because there is no data";
return;
}
- fit_ = AverageFits(a_.FitLine(), b_.FitLine());
+
+ // If we only have one side of the timestamp estimation, we will be on the
+ // ragged edge of non-causal. Events will traverse the network in "0 ns".
+ // Combined with rounding errors, this causes sorting to not work. Assume
+ // some amount of network delay.
+ constexpr int kSmidgeOfTimeNs = 10;
+
+ if (a_timestamps_size() == 0) {
+ fit_ = Invert(b_.FitLine());
+ fit_.increment_mpq_offset(-mpq_class(kSmidgeOfTimeNs));
+ } else if (b_timestamps_size() == 0) {
+ fit_ = a_.FitLine();
+ fit_.increment_mpq_offset(-mpq_class(kSmidgeOfTimeNs));
+ } else {
+ fit_ = AverageFits(a_.FitLine(), b_.FitLine());
+ }
+
if (offset_pointer_) {
- VLOG(1) << "Setting offset to " << fit_.mpq_offset();
+ VLOG(2) << " Setting offset to " << fit_.mpq_offset();
*offset_pointer_ = fit_.mpq_offset();
}
if (slope_pointer_) {
- VLOG(1) << "Setting slope to " << fit_.mpq_slope();
+ VLOG(2) << " Setting slope to " << fit_.mpq_slope();
*slope_pointer_ = -fit_.mpq_slope();
}
if (valid_pointer_) {
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index fa52a6f..e9f2446 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -260,6 +260,7 @@
// Returns the full precision slopes and offsets.
mpq_class mpq_offset() const { return offset_; }
mpq_class mpq_slope() const { return slope_; }
+ void increment_mpq_offset(mpq_class increment) { offset_ += increment; }
// Returns the rounded offsets and slopes.
std::chrono::nanoseconds offset() const {
@@ -306,6 +307,18 @@
// tb = O(ta) + ta
Line AverageFits(Line fa, Line fb);
+// Inverts an offset line
+//
+// Oa(ta) = fa.slope * ta + fa.offset;
+// tb = Oa(ta) + ta;
+// Ob(tb) = fb.slope * tb + fb.offset;
+// ta = Ob(tb) + tb;
+//
+// This takes a line in the form ta = Ob(tb) + tb,
+// and returns one in the form tb = Oa(ta) + ta. This is a pure algebreic
+// reshuffling of terms.
+Line Invert(Line fb);
+
// This class implements a noncausal timestamp filter. It tracks the maximum
// points while enforcing both a maximum positive and negative slope constraint.
// It does this by building up a buffer of samples, and removing any samples
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 9cff160..d7d31ae 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -162,6 +162,26 @@
}
}
+// Tests that the Invert function returns sane results.
+TEST(LineTest, Invert) {
+ const monotonic_clock::time_point ta(chrono::nanoseconds(1000000000));
+ const monotonic_clock::time_point tb(chrono::nanoseconds(2001000000));
+
+ // Double inversion should get us back where we started. Make sure there are
+ // enough digits to catch rounding problems.
+ Line l1(mpq_class(1000000000), mpq_class(1, 1000));
+ Line l2 = Invert(l1);
+ Line l1_again = Invert(l2);
+
+ // Confirm we can convert time back and forth as expected.
+ EXPECT_EQ(l1.Eval(ta) + ta, tb);
+ EXPECT_EQ(l2.Eval(tb) + tb, ta);
+
+ // And we got back our original line.
+ EXPECT_EQ(l1.mpq_slope(), l1_again.mpq_slope());
+ EXPECT_EQ(l1.mpq_offset(), l1_again.mpq_offset());
+}
+
// Tests that 2 samples results in the correct line between them, and the
// correct intermediate as it is being built.
TEST(NoncausalTimestampFilterTest, SingleSample) {