Buffer using --time_estimation_buffer_seconds in multinode_timestamp_filter

When time is having trouble sorting, we can sometimes throw out old
times too fast. When we are doing this, we crank up
--time_estimation_buffer_seconds to compensate, so tie the buffer to
this quantity as well.

Change-Id: I05004b9dc956bc8f207ecc1564736d87a286be15
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1168e6f..f9da914 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -132,6 +132,10 @@
   const std::vector<aos::logger::LogFile> logfiles =
       aos::logger::SortParts(unsorted_logfiles);
 
+  for (auto &it : logfiles) {
+    VLOG(1) << it;
+  }
+
   aos::logger::LogReader reader(logfiles);
 
   aos::FastStringBuilder builder;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 7e822de..99c853e 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1111,7 +1111,9 @@
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop_factory_, logged_configuration(),
-          FLAGS_skip_order_validation);
+          FLAGS_skip_order_validation,
+          chrono::duration_cast<chrono::nanoseconds>(
+              chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index c338bf3..1d90792 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -262,9 +262,10 @@
 
   CHECK(!times_.empty())
       << ": Found no times to do timestamp estimation, please investigate.";
-  // Keep at least 50 points and 10 seconds of time.
-  while (times_.size() > 50 &&
-         std::get<0>(times_.front()) + chrono::seconds(10) <
+  // Keep at least 500 points and time_estimation_buffer_seconds seconds of
+  // time.  This should be enough to handle any reasonable amount of history.
+  while (times_.size() > kHistoryMinCount &&
+         std::get<0>(times_.front()) + time_estimation_buffer_seconds_ <
              std::get<0>(times_.back())) {
     times_.pop_front();
     have_popped_ = true;
@@ -420,10 +421,12 @@
 }
 MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
     SimulatedEventLoopFactory *event_loop_factory,
-    const Configuration *logged_configuration, bool skip_order_validation)
+    const Configuration *logged_configuration, bool skip_order_validation,
+    chrono::nanoseconds time_estimation_buffer_seconds)
     : InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
                                     ? 1u
-                                    : logged_configuration->nodes()->size()),
+                                    : logged_configuration->nodes()->size(),
+                                time_estimation_buffer_seconds),
       event_loop_factory_(event_loop_factory),
       logged_configuration_(logged_configuration),
       skip_order_validation_(skip_order_validation) {
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 5e4c690..f834c79 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -177,7 +177,16 @@
 // multiple nodes using a list of points and interpolation.
 class InterpolatedTimeConverter : public TimeConverter {
  public:
-  InterpolatedTimeConverter(size_t node_count) : node_count_(node_count) {}
+  static constexpr std::chrono::nanoseconds kDefaultHistoryDuration{
+      10000000000};
+  static constexpr int kHistoryMinCount{500};
+
+  InterpolatedTimeConverter(
+      size_t node_count,
+      std::chrono::nanoseconds time_estimation_buffer_seconds =
+          kDefaultHistoryDuration)
+      : node_count_(node_count),
+        time_estimation_buffer_seconds_(time_estimation_buffer_seconds) {}
 
   virtual ~InterpolatedTimeConverter() {}
 
@@ -221,6 +230,12 @@
   // unknown.
   bool have_popped_ = false;
 
+  // The amount of time to buffer when estimating.  We care so we don't throw
+  // data out of our queue too soon.  This time is indicative of how much to
+  // buffer everywhere, so let's latch onto it as well until proven that there
+  // is a different metric.
+  const std::chrono::nanoseconds time_estimation_buffer_seconds_;
+
  protected:
   // If true, NextTimestamp returned nothing, so don't bother checking again.
   // (This also enforces that we don't find more time data after calling it
@@ -269,7 +284,8 @@
  public:
   MultiNodeNoncausalOffsetEstimator(
       SimulatedEventLoopFactory *event_loop_factory,
-      const Configuration *logged_configuration, bool skip_order_validation);
+      const Configuration *logged_configuration, bool skip_order_validation,
+      std::chrono::nanoseconds time_estimation_buffer_seconds);
 
   ~MultiNodeNoncausalOffsetEstimator() override;
 
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 524ef19..f8f14e5 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -351,50 +351,54 @@
   const distributed_clock::time_point de = distributed_clock::epoch();
   const monotonic_clock::time_point me = monotonic_clock::epoch();
 
+  constexpr auto kDefaultHistoryDuration =
+      InterpolatedTimeConverter::kDefaultHistoryDuration;
+  constexpr chrono::nanoseconds kDt =
+      kDefaultHistoryDuration /
+      (InterpolatedTimeConverter::kHistoryMinCount * 2);
+
   TestingTimeConverter time_converter(3u);
   time_converter.StartEqual();
 
   // Test that 2 timestamps interpolate correctly.
-  for (int i = 0; i < 200; ++i) {
-    time_converter.AddMonotonic({chrono::milliseconds(100),
-                                 chrono::milliseconds(100),
-                                 chrono::milliseconds(100)});
+  for (int i = 0; i < InterpolatedTimeConverter::kHistoryMinCount * 4; ++i) {
+    time_converter.AddMonotonic({kDt, kDt, kDt});
   }
 
   // Force 5 seconds to be read.
   EXPECT_EQ(
-      de + chrono::milliseconds(5000),
-      time_converter.ToDistributedClock(0, me + chrono::milliseconds(5000)));
+      de + kDefaultHistoryDuration / 2,
+      time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration / 2));
   EXPECT_EQ(
-      me + chrono::milliseconds(5000),
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(5000)));
+      me + kDefaultHistoryDuration / 2,
+      time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration / 2));
 
   // Double check we can read things from before the start
   EXPECT_EQ(
-      de - chrono::milliseconds(100),
-      time_converter.ToDistributedClock(0, me - chrono::milliseconds(100)));
+      de - kDt,
+      time_converter.ToDistributedClock(0, me - kDt));
   EXPECT_EQ(
-      me - chrono::milliseconds(100),
-      time_converter.FromDistributedClock(0, de - chrono::milliseconds(100)));
+      me - kDt,
+      time_converter.FromDistributedClock(0, de - kDt));
 
   // And at and after the origin.
   EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
   EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
 
   EXPECT_EQ(
-      de + chrono::milliseconds(100),
-      time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+      de + chrono::milliseconds(10),
+      time_converter.ToDistributedClock(0, me + kDt));
   EXPECT_EQ(
-      me + chrono::milliseconds(100),
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+      me + chrono::milliseconds(10),
+      time_converter.FromDistributedClock(0, de + kDt));
 
   // Force 10.1 seconds now.  This will forget the 0th point at the origin.
   EXPECT_EQ(
-      de + chrono::milliseconds(10100),
-      time_converter.ToDistributedClock(0, me + chrono::milliseconds(10100)));
+      de + kDefaultHistoryDuration + kDt,
+      time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration + kDt));
   EXPECT_EQ(
-      me + chrono::milliseconds(10100),
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(10100)));
+      me + kDefaultHistoryDuration + kDt,
+      time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration + kDt));
 
   // Yup, can't read the origin anymore.
   EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
@@ -404,11 +408,11 @@
 
   // But can still read the next point.
   EXPECT_EQ(
-      de + chrono::milliseconds(100),
-      time_converter.ToDistributedClock(0, me + chrono::milliseconds(100)));
+      de + kDt,
+      time_converter.ToDistributedClock(0, me + kDt));
   EXPECT_EQ(
-      me + chrono::milliseconds(100),
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(100)));
+      me + kDt,
+      time_converter.FromDistributedClock(0, de + kDt));
 }
 
 // Tests unity time with 1 node.