Buffer using --time_estimation_buffer_seconds in multinode_timestamp_filter
When time estimation is having trouble sorting, we can end up throwing
out old timestamps too quickly. When that happens, we crank up
--time_estimation_buffer_seconds to compensate, so tie the history
buffer to that flag as well.
Change-Id: I05004b9dc956bc8f207ecc1564736d87a286be15
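
For reference, the plumbing below boils down to converting the double-valued
flag into a chrono::nanoseconds duration and handing it to the estimator. A
minimal sketch of that conversion, mirroring the logger.cc hunk below
(TimeEstimationBuffer is a hypothetical helper name, not part of the patch):

  #include <chrono>

  namespace chrono = std::chrono;

  // Convert the double-valued --time_estimation_buffer_seconds flag value
  // into the chrono::nanoseconds duration the estimator now takes.
  chrono::nanoseconds TimeEstimationBuffer(
      double time_estimation_buffer_seconds) {
    return chrono::duration_cast<chrono::nanoseconds>(
        chrono::duration<double>(time_estimation_buffer_seconds));
  }
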
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1168e6f..f9da914 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -132,6 +132,10 @@
const std::vector<aos::logger::LogFile> logfiles =
aos::logger::SortParts(unsorted_logfiles);
+  for (const auto &logfile : logfiles) {
+    VLOG(1) << logfile;
+ }
+
aos::logger::LogReader reader(logfiles);
aos::FastStringBuilder builder;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 7e822de..99c853e 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1111,7 +1111,9 @@
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_, logged_configuration(),
- FLAGS_skip_order_validation);
+ FLAGS_skip_order_validation,
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index c338bf3..1d90792 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -262,9 +262,10 @@
CHECK(!times_.empty())
<< ": Found no times to do timestamp estimation, please investigate.";
- // Keep at least 50 points and 10 seconds of time.
- while (times_.size() > 50 &&
- std::get<0>(times_.front()) + chrono::seconds(10) <
+  // Keep at least kHistoryMinCount points and time_estimation_buffer_seconds_
+  // of time.  This should handle any reasonable amount of history.
+ while (times_.size() > kHistoryMinCount &&
+ std::get<0>(times_.front()) + time_estimation_buffer_seconds_ <
std::get<0>(times_.back())) {
times_.pop_front();
have_popped_ = true;
@@ -420,10 +421,12 @@
}
MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
SimulatedEventLoopFactory *event_loop_factory,
- const Configuration *logged_configuration, bool skip_order_validation)
+ const Configuration *logged_configuration, bool skip_order_validation,
+ chrono::nanoseconds time_estimation_buffer_seconds)
: InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
? 1u
- : logged_configuration->nodes()->size()),
+ : logged_configuration->nodes()->size(),
+ time_estimation_buffer_seconds),
event_loop_factory_(event_loop_factory),
logged_configuration_(logged_configuration),
skip_order_validation_(skip_order_validation) {
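
For clarity, a standalone sketch of the trimming policy the first hunk of
multinode_timestamp_filter.cc above implements; the deque element type,
function name, and single time coordinate are simplified stand-ins for the
estimator's internals:

  #include <chrono>
  #include <cstddef>
  #include <deque>
  #include <tuple>

  constexpr int kHistoryMinCount = 500;
  using Sample = std::tuple<std::chrono::nanoseconds, double>;

  void TrimHistory(std::deque<Sample> *times,
                   std::chrono::nanoseconds time_estimation_buffer) {
    // Pop old points only while more than the minimum count remain *and* the
    // oldest point is more than the buffer duration behind the newest point.
    // Raising --time_estimation_buffer_seconds therefore also keeps more
    // history here.
    while (times->size() > static_cast<std::size_t>(kHistoryMinCount) &&
           std::get<0>(times->front()) + time_estimation_buffer <
               std::get<0>(times->back())) {
      times->pop_front();
    }
  }
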
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 5e4c690..f834c79 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -177,7 +177,16 @@
// multiple nodes using a list of points and interpolation.
class InterpolatedTimeConverter : public TimeConverter {
public:
- InterpolatedTimeConverter(size_t node_count) : node_count_(node_count) {}
+  static constexpr std::chrono::nanoseconds kDefaultHistoryDuration{
+      std::chrono::seconds(10)};
+ static constexpr int kHistoryMinCount{500};
+
+ InterpolatedTimeConverter(
+ size_t node_count,
+ std::chrono::nanoseconds time_estimation_buffer_seconds =
+ kDefaultHistoryDuration)
+ : node_count_(node_count),
+ time_estimation_buffer_seconds_(time_estimation_buffer_seconds) {}
virtual ~InterpolatedTimeConverter() {}
@@ -221,6 +230,12 @@
// unknown.
bool have_popped_ = false;
+  // The amount of time to buffer when estimating, so we don't throw data out
+  // of our queue too soon.  --time_estimation_buffer_seconds is indicative of
+  // how much to buffer everywhere, so reuse it here until there is evidence
+  // that a separate knob is needed.
+ const std::chrono::nanoseconds time_estimation_buffer_seconds_;
+
protected:
// If true, NextTimestamp returned nothing, so don't bother checking again.
// (This also enforces that we don't find more time data after calling it
@@ -269,7 +284,8 @@
public:
MultiNodeNoncausalOffsetEstimator(
SimulatedEventLoopFactory *event_loop_factory,
- const Configuration *logged_configuration, bool skip_order_validation);
+ const Configuration *logged_configuration, bool skip_order_validation,
+ std::chrono::nanoseconds time_estimation_buffer_seconds);
~MultiNodeNoncausalOffsetEstimator() override;
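
The test updates below lean on the following arithmetic, restated here with
the constants from the header above (this block is illustrative and not part
of the patch):

  #include <chrono>

  constexpr std::chrono::nanoseconds kDefaultHistoryDuration{
      std::chrono::seconds(10)};
  constexpr int kHistoryMinCount = 500;
  constexpr std::chrono::nanoseconds kDt =
      kDefaultHistoryDuration / (kHistoryMinCount * 2);

  // Each step is 10 ms, so kHistoryMinCount * 4 = 2000 steps span 20 seconds:
  // twice the history duration, enough to force the origin to be trimmed.
  static_assert(kDt == std::chrono::milliseconds(10), "kDt is 10 ms");
  static_assert(kHistoryMinCount * 4 * kDt == std::chrono::seconds(20),
                "the test covers twice the history duration");
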
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 524ef19..f8f14e5 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -351,50 +351,54 @@
const distributed_clock::time_point de = distributed_clock::epoch();
const monotonic_clock::time_point me = monotonic_clock::epoch();
+ constexpr auto kDefaultHistoryDuration =
+ InterpolatedTimeConverter::kDefaultHistoryDuration;
+ constexpr chrono::nanoseconds kDt =
+ kDefaultHistoryDuration /
+ (InterpolatedTimeConverter::kHistoryMinCount * 2);
+
TestingTimeConverter time_converter(3u);
time_converter.StartEqual();
// Test that 2 timestamps interpolate correctly.
- for (int i = 0; i < 200; ++i) {
- time_converter.AddMonotonic({chrono::milliseconds(100),
- chrono::milliseconds(100),
- chrono::milliseconds(100)});
+ for (int i = 0; i < InterpolatedTimeConverter::kHistoryMinCount * 4; ++i) {
+ time_converter.AddMonotonic({kDt, kDt, kDt});
}
-  // Force 5 seconds to be read.
+  // Force the first half of the history duration to be read.
EXPECT_EQ(
- de + chrono::milliseconds(5000),
- time_converter.ToDistributedClock(0, me + chrono::milliseconds(5000)));
+ de + kDefaultHistoryDuration / 2,
+ time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration / 2));
EXPECT_EQ(
- me + chrono::milliseconds(5000),
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(5000)));
+ me + kDefaultHistoryDuration / 2,
+ time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration / 2));
// Double check we can read things from before the start
EXPECT_EQ(
- de - chrono::milliseconds(100),
- time_converter.ToDistributedClock(0, me - chrono::milliseconds(100)));
+ de - kDt,
+ time_converter.ToDistributedClock(0, me - kDt));
EXPECT_EQ(
- me - chrono::milliseconds(100),
- time_converter.FromDistributedClock(0, de - chrono::milliseconds(100)));
+ me - kDt,
+ time_converter.FromDistributedClock(0, de - kDt));
// And at and after the origin.
EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
EXPECT_EQ(
- de + chrono::milliseconds(100),
- time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+      de + kDt,
+ time_converter.ToDistributedClock(0, me + kDt));
EXPECT_EQ(
- me + chrono::milliseconds(100),
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+      me + kDt,
+ time_converter.FromDistributedClock(0, de + kDt));
-  // Force 10.1 seconds now. This will forget the 0th point at the origin.
+  // Force just past the history duration. This will forget the origin point.
EXPECT_EQ(
- de + chrono::milliseconds(10100),
- time_converter.ToDistributedClock(0, me + chrono::milliseconds(10100)));
+ de + kDefaultHistoryDuration + kDt,
+ time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration + kDt));
EXPECT_EQ(
- me + chrono::milliseconds(10100),
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(10100)));
+ me + kDefaultHistoryDuration + kDt,
+ time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration + kDt));
// Yup, can't read the origin anymore.
EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
@@ -404,11 +408,11 @@
// But can still read the next point.
EXPECT_EQ(
- de + chrono::milliseconds(100),
- time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+ de + kDt,
+ time_converter.ToDistributedClock(0, me + kDt));
EXPECT_EQ(
- me + chrono::milliseconds(100),
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+ me + kDt,
+ time_converter.FromDistributedClock(0, de + kDt));
}
// Tests unity time with 1 node.