Handle finishing a log file more gracefully

Once one file ended, we were spinning until the end and popping
continually.  This was resulting in us removing all the intermediate
data, and then adding it back in with the reverse timestamps.

There are 2 parts to the fix.  1 is detection and the other is fixing
the issue.

1) Instead of tracking a frozen bool, track the frozen time.  This more
   accurately captures what is going on, and handles things being popped
   off the front.  Also, make sure Validate won't certify an answer
   which needs data we popped.

2) When we are spinning to the end of the log, don't forget the
   timestamps yet.  We need those for solving the problem still.
   The current strategy likely keeps too much data at the end, but that
   sure beats keeping too little.

Future improvements should consider managing all of this inside
multinode timestamp filter so we aren't dependent on the logger getting
it right.

Change-Id: I323eff3e19c3b67258835b50a339939360de0444
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e10d911..272250e 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -2027,6 +2027,8 @@
     CHECK_NOTNULL(timer_);
     timer_->Setup(remote_timestamps_.front().monotonic_timestamp_time);
     scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+    CHECK_GE(scheduled_time_, event_loop_->monotonic_now())
+        << event_loop_->node()->name()->string_view();
   }
 }
 
@@ -2056,7 +2058,8 @@
 }
 
 void LogReader::RemoteMessageSender::SendTimestamp() {
-  CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+  CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_)
+      << event_loop_->node()->name()->string_view();
   CHECK(!remote_timestamps_.empty());
 
   // Send out all timestamps at the currently scheduled time.
@@ -2111,7 +2114,7 @@
 
     // TODO(austin): We probably want to push this down into the timestamp
     // mapper directly.
-    filter->Pop(event_loop_->node(), result.monotonic_event_time);
+    filter->Pop(event_loop_->node(), event_loop_->monotonic_now());
   }
   VLOG(1) << "Popped " << result
           << configuration::CleanedChannelToString(
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 7bc807a..362a473 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -179,7 +179,7 @@
     }
   }
 
-  if (VLOG_IS_ON(1)) {
+  if (VLOG_IS_ON(2)) {
     struct MyFormatter {
       void operator()(std::string *out, monotonic_clock::time_point t) const {
         std::stringstream ss;
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 6479607..498f111 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -21,11 +21,6 @@
   ss << "O(" << t << ") = " << o.count() << "ns, remote " << t + o;
   return ss.str();
 }
-std::string TimeString(const std::tuple<aos::monotonic_clock::time_point,
-                                        std::chrono::nanoseconds, bool>
-                           t) {
-  return TimeString(std::get<0>(t), std::get<1>(t));
-}
 
 std::string TimeString(
     const std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
@@ -520,11 +515,11 @@
           std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>
 NoncausalTimestampFilter::FindTimestamps(monotonic_clock::time_point ta) const {
   CHECK_GT(timestamps_size(), 1u);
-  auto it = std::upper_bound(timestamps_.begin() + 1, timestamps_.end() - 1, ta,
-                             [](monotonic_clock::time_point ta,
-                                std::tuple<aos::monotonic_clock::time_point,
-                                           std::chrono::nanoseconds, bool>
-                                    t) { return ta < std::get<0>(t); });
+  auto it = std::upper_bound(
+      timestamps_.begin() + 1, timestamps_.end() - 1, ta,
+      [](monotonic_clock::time_point ta,
+         std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+             t) { return ta < std::get<0>(t); });
 
   const size_t index = std::distance(timestamps_.begin(), it);
 
@@ -1016,6 +1011,11 @@
     aos::monotonic_clock::time_point ta,
     aos::monotonic_clock::time_point tb) const {
   CHECK_GT(timestamps_size(), 0u);
+  if (ta < std::get<0>(timestamp(0)) && has_popped_) {
+    LOG(ERROR) << NodeNames() << " O(" << ta
+               << ") is before the start and we have forgotten the answer.";
+    return false;
+  }
   if (IsOutsideSamples(ta, 0.)) {
     // Special case size = 1 or ta_base before first timestamp or
     // after last timestamp, so we need to extrapolate out
@@ -1062,7 +1062,7 @@
   if (timestamps_.size() == 0) {
     VLOG(1) << NodeNames() << " Initial sample of "
             << TimeString(monotonic_now, sample_ns);
-    timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns, false));
+    timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns));
     CHECK(!fully_frozen_)
         << ": " << NodeNames()
         << " Returned a horizontal line previously and then "
@@ -1071,15 +1071,21 @@
         << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
                .count()
         << " seconds after the last sample at " << std::get<0>(timestamps_[0])
-        << ".  Increase --time_estimation_buffer_seconds to at least "
+        << ".  Increase --time_estimation_buffer_seconds to greater than "
         << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
                .count();
     return;
   }
+  CHECK_GT(monotonic_now, frozen_time_)
+      << ": " << NodeNames() << " Tried to insert " << monotonic_now
+      << " before the frozen time of " << frozen_time_
+      << ".  Increase "
+         "--time_estimation_buffer_seconds to greater than "
+      << chrono::duration<double>(frozen_time_ - monotonic_now).count();
 
   // Future samples get quite a bit harder.  We want the line to track the
   // highest point without volating the slope constraint.
-  std::tuple<aos::monotonic_clock::time_point, chrono::nanoseconds, bool> back =
+  std::tuple<aos::monotonic_clock::time_point, chrono::nanoseconds> back =
       timestamps_.back();
 
   aos::monotonic_clock::duration dt = monotonic_now - std::get<0>(back);
@@ -1118,7 +1124,7 @@
         << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
                .count()
         << " seconds after the last sample at " << std::get<0>(timestamps_[0])
-        << ".  Increase --time_estimation_buffer_seconds to at least "
+        << ".  Increase --time_estimation_buffer_seconds to greater than "
         << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
                .count();
 
@@ -1134,13 +1140,13 @@
     // In this case, point 3 is now violating our constraint and we need to
     // remove it.  This is the non-causal part of the filter.
     while (dt * kMaxVelocity() < doffset && timestamps_.size() > 1u) {
-      CHECK(!std::get<2>(back))
+      CHECK(!frozen(std::get<0>(back)))
           << ": " << NodeNames() << " Can't pop an already frozen sample "
           << TimeString(back) << " while inserting "
           << TimeString(monotonic_now, sample_ns) << ", "
           << chrono::duration<double>(monotonic_now - std::get<0>(back)).count()
           << " seconds in the past.  Increase --time_estimation_buffer_seconds "
-             "to at least "
+             "to greater than "
           << chrono::duration<double>(monotonic_now - std::get<0>(back))
                  .count();
       VLOG(1) << NodeNames()
@@ -1153,7 +1159,9 @@
       doffset = sample_ns - std::get<1>(back);
     }
 
-    timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns, false));
+    VLOG(1) << NodeNames() << " Added sample of "
+            << TimeString(monotonic_now, sample_ns);
+    timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns));
     return;
   }
 
@@ -1163,18 +1171,14 @@
   auto it = std::lower_bound(
       timestamps_.begin(), timestamps_.end(), monotonic_now,
       [](const std::tuple<aos::monotonic_clock::time_point,
-                          std::chrono::nanoseconds, bool>
+                          std::chrono::nanoseconds>
              x,
          monotonic_clock::time_point t) { return std::get<0>(x) < t; });
 
   CHECK(it != timestamps_.end());
 
-  CHECK(!std::get<2>(*(it)))
-      << ": " << NodeNames() << " Tried to insert " << monotonic_now
-      << " before " << std::get<0>(*it)
-      << ", which is frozen in the past.  Increase "
-         "--time_estimation_buffer_seconds to at least "
-      << chrono::duration<double>(std::get<0>(*it) - monotonic_now).count();
+  // We shouldn't hit this one, but I really want to be sure...
+  CHECK(!frozen(std::get<0>(*(it))));
 
   if (it == timestamps_.begin()) {
     // We are being asked to add at the beginning.
@@ -1194,7 +1198,7 @@
 
     VLOG(1) << NodeNames() << " Added sample at beginning "
             << TimeString(monotonic_now, sample_ns);
-    timestamps_.insert(it, std::make_tuple(monotonic_now, sample_ns, false));
+    timestamps_.insert(it, std::make_tuple(monotonic_now, sample_ns));
 
     while (true) {
       // First point was too positive, so we need to remove points after it
@@ -1274,8 +1278,8 @@
     // Now, insert and start propagating forwards and backwards anything we've
     // made invalid.  Do this simultaneously so we keep discovering anything
     // new.
-    auto middle_it = timestamps_.insert(
-        it, std::make_tuple(monotonic_now, sample_ns, false));
+    auto middle_it =
+        timestamps_.insert(it, std::make_tuple(monotonic_now, sample_ns));
     VLOG(1) << NodeNames() << " Inserted " << TimeString(*middle_it);
 
     while (middle_it != timestamps_.end() && middle_it != timestamps_.begin()) {
@@ -1311,10 +1315,10 @@
             std::get<1>(*middle_it) - std::get<1>(*prior_it);
 
         if (prior_doffset > prior_dt * kMaxVelocity()) {
-          CHECK(!std::get<2>(*prior_it))
+          CHECK(!frozen(std::get<0>(*prior_it)))
               << ": " << NodeNames()
               << " Can't pop an already frozen sample.  Increase "
-                 "--time_estimation_buffer_seconds to at least "
+                 "--time_estimation_buffer_seconds to greater than "
               << chrono::duration<double>(prior_dt).count();
 
           VLOG(1) << "Prior slope is too positive, removing prior point "
@@ -1331,6 +1335,7 @@
 }
 
 bool NoncausalTimestampFilter::Pop(aos::monotonic_clock::time_point time) {
+  VLOG(1) << NodeNames() << " Pop(" << time << ")";
   bool removed = false;
   // When the timestamp which is the end of the line is popped, we want to
   // drop it off the list.  Hence the >=
@@ -1341,6 +1346,18 @@
   return removed;
 }
 
+void NoncausalTimestampFilter::Debug() {
+  size_t count = 0;
+  for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+           timestamp : timestamps_) {
+    LOG(INFO) << NodeNames() << " "
+              << TimeString(std::get<0>(timestamp), std::get<1>(timestamp))
+              << " frozen? " << frozen(std::get<0>(timestamp)) << " consumed? "
+              << (count < next_to_consume_);
+    ++count;
+  }
+}
+
 monotonic_clock::time_point NoncausalTimestampFilter::unobserved_line_end()
     const {
   if (has_unobserved_line()) {
@@ -1368,6 +1385,8 @@
   if (timestamps_.empty() || next_to_consume_ >= timestamps_.size()) {
     return std::nullopt;
   }
+  VLOG(1) << NodeNames() << " Observed sample of "
+          << TimeString(timestamp(next_to_consume_));
   return timestamp(next_to_consume_);
 }
 
@@ -1378,28 +1397,34 @@
   }
 
   auto result = timestamp(next_to_consume_);
+  VLOG(1) << NodeNames() << " Consumed sample of " << TimeString(result);
   ++next_to_consume_;
   return result;
 }
 
 void NoncausalTimestampFilter::FreezeUntil(
     aos::monotonic_clock::time_point node_monotonic_now) {
+  if (node_monotonic_now < frozen_time_) {
+    return;
+  }
   for (size_t i = 0; i < timestamps_.size(); ++i) {
     // Freeze 1 point past the match.
-    std::get<2>(timestamps_[i]) = true;
-    if (std::get<0>(timestamps_[i]) >= node_monotonic_now) {
+    if (std::get<0>(timestamp(i)) >= node_monotonic_now) {
+      frozen_time_ = std::get<0>(timestamp(i));
       return;
     }
   }
 
   if (timestamps_.empty()) {
-    VLOG(1) << NodeNames() << " fully_frozen_";
+    VLOG(1) << NodeNames() << " fully_frozen_, no timestamps.";
     fully_frozen_ = true;
   } else if (node_monotonic_now > std::get<0>(timestamps_.back())) {
     // We've been asked to freeze past the last point.  It isn't safe to add any
     // more points or we will change this region.
-    VLOG(1) << NodeNames() << " fully_frozen_";
+    VLOG(1) << NodeNames() << " fully_frozen_, after the end.";
     fully_frozen_ = true;
+  } else {
+    LOG(FATAL) << "How did we get here?";
   }
 }
 
@@ -1407,22 +1432,24 @@
     aos::monotonic_clock::time_point remote_monotonic_now) {
   for (size_t i = 0; i < timestamps_.size(); ++i) {
     // Freeze 1 point past the match.
-    std::get<2>(timestamps_[i]) = true;
     if (std::get<0>(timestamp(i)) + std::get<1>(timestamp(i)) >=
         remote_monotonic_now) {
+      frozen_time_ = std::max(std::get<0>(timestamp(i)), frozen_time_);
       return;
     }
   }
 
   if (timestamps_.empty()) {
-    VLOG(1) << NodeNames() << " fully_frozen_";
+    VLOG(1) << NodeNames() << " fully_frozen_, no timestamps.";
     fully_frozen_ = true;
   } else if (remote_monotonic_now > std::get<0>(timestamps_.back()) +
                                         std::get<1>(timestamps_.back())) {
     // We've been asked to freeze past the last point.  It isn't safe to add any
     // more points or we will change this region.
-    VLOG(1) << NodeNames() << " fully_frozen_";
+    VLOG(1) << NodeNames() << " fully_frozen_, after the end.";
     fully_frozen_ = true;
+  } else {
+    LOG(FATAL) << "How did we get here?";
   }
 }
 
@@ -1450,6 +1477,9 @@
 void NoncausalTimestampFilter::PopFront() {
   VLOG(1) << NodeNames() << " Popped sample of " << TimeString(timestamp(0));
   MaybeWriteTimestamp(timestamp(0));
+
+  // If we drop data, we shouldn't add anything before that point.
+  frozen_time_ = std::max(frozen_time_, std::get<0>(timestamp(0)));
   timestamps_.pop_front();
   has_popped_ = true;
   if (next_to_consume_ > 0u) {
@@ -1479,7 +1509,9 @@
     const Node *node, aos::monotonic_clock::time_point node_delivered_time,
     aos::monotonic_clock::time_point other_node_sent_time) {
   VLOG(1) << "Sample delivered         " << node_delivered_time << " sent "
-          << other_node_sent_time << " to " << node->name()->string_view();
+          << other_node_sent_time << " " << node->name()->string_view()
+          << " -> "
+          << ((node == node_a_) ? node_b_ : node_a_)->name()->string_view();
   if (node == node_a_) {
     a_.Sample(node_delivered_time, other_node_sent_time - node_delivered_time);
   } else if (node == node_b_) {
@@ -1493,8 +1525,9 @@
     const Node *node, aos::monotonic_clock::time_point node_sent_time,
     aos::monotonic_clock::time_point other_node_delivered_time) {
   VLOG(1) << "Reverse sample delivered " << other_node_delivered_time
-          << " sent " << node_sent_time << " from "
-          << node->name()->string_view();
+          << " sent " << node_sent_time << " "
+          << ((node == node_a_) ? node_b_ : node_a_)->name()->string_view()
+          << " -> " << node->name()->string_view();
   if (node == node_a_) {
     b_.Sample(other_node_delivered_time,
               node_sent_time - other_node_delivered_time);
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 9f01efc..6090fce 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -243,8 +243,7 @@
   ~NoncausalTimestampFilter();
 
   // Check whether the given timestamp falls within our current samples
-  bool IsOutsideSamples(monotonic_clock::time_point ta_base,
-                        double ta) const;
+  bool IsOutsideSamples(monotonic_clock::time_point ta_base, double ta) const;
 
   // Check whether the given timestamp lies after our current samples
   bool IsAfterSamples(monotonic_clock::time_point ta_base, double ta) const;
@@ -338,7 +337,11 @@
 
   // Returns if the timestamp is frozen or not.
   bool frozen(size_t index) const {
-    return fully_frozen_ || std::get<2>(timestamps_[index]);
+    return fully_frozen_ || std::get<0>(timestamps_[index]) <= frozen_time_;
+  }
+
+  bool frozen(aos::monotonic_clock::time_point t) const {
+    return t <= frozen_time_;
   }
 
   size_t timestamps_size() const { return timestamps_.size(); }
@@ -346,18 +349,7 @@
   // Returns a debug string with the nodes this filter represents.
   std::string NodeNames() const;
 
-  void Debug() {
-    size_t count = 0;
-    for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds,
-                    bool>
-             timestamp : timestamps_) {
-      LOG(INFO) << std::get<0>(timestamp) << " offset "
-                << std::get<1>(timestamp).count() << " frozen? "
-                << std::get<2>(timestamp) << " consumed? "
-                << (count < next_to_consume_);
-      ++count;
-    }
-  }
+  void Debug();
 
   // Sets the starting point and filename to log samples to.  These functions
   // are only used when doing CSV file logging to debug the filter.
@@ -447,11 +439,13 @@
 
   // Timestamp, offest, and then a boolean representing if this sample is frozen
   // and can't be modified or not.
-  // TODO(austin): Actually use and update the bool.
-  std::deque<std::tuple<aos::monotonic_clock::time_point,
-                        std::chrono::nanoseconds, bool>>
+  std::deque<
+      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
       timestamps_;
 
+  aos::monotonic_clock::time_point frozen_time_ =
+      aos::monotonic_clock::min_time;
+
   // The index of the next element in timestamps to consume.  0 means none have
   // been consumed, and size() means all have been consumed.
   size_t next_to_consume_ = 0;
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 0e47cc6..a7e1dcf 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -625,11 +625,11 @@
     ASSERT_EQ(filter.timestamps_size(), 2u);
     filter.FreezeUntil(tb);
 
-    EXPECT_DEATH(
-        { filter.Sample(tb, oa); },
-        "test_a -> test_b Tried to insert 0.100000000sec before "
-        "0.100000000sec, which is frozen "
-        "in the past.  Increase --time_estimation_buffer_seconds to at least");
+    EXPECT_DEATH({ filter.Sample(tb, oa); },
+                 "monotonic_now > frozen_time_ \\(0.100000000sec vs. "
+                 "0.100000000sec\\) : test_a -> test_b Tried to insert "
+                 "0.100000000sec before the frozen time of 0.100000000sec.  "
+                 "Increase --time_estimation_buffer_seconds to greater than 0");
   }
 
   {
@@ -658,9 +658,10 @@
     filter.FreezeUntil(tc);
 
     EXPECT_DEATH({ filter.Sample(tb, ob); },
-                 "test_a -> test_b Tried to insert 0.100000000sec before "
-                 "0.200000000sec, which "
-                 "is frozen");
+                 "monotonic_now > frozen_time_ \\(0.100000000sec vs. "
+                 "0.200000000sec\\) : test_a -> test_b Tried to insert "
+                 "0.100000000sec before the frozen time of 0.200000000sec.  "
+                 "Increase --time_estimation_buffer_seconds to greater than 0.1");
   }
 
   {
@@ -674,11 +675,11 @@
     ASSERT_EQ(filter.timestamps_size(), 3u);
     filter.FreezeUntil(tb);
 
-    EXPECT_DEATH(
-        { filter.Sample(tb, oa); },
-        "test_a -> test_b Tried to insert 0.100000000sec before "
-        "0.100000000sec, which "
-        "is frozen in the past.  Increase --time_estimation_buffer_seconds");
+    EXPECT_DEATH({ filter.Sample(tb, oa); },
+                 "monotonic_now > frozen_time_ \\(0.100000000sec vs. "
+                 "0.100000000sec\\) : test_a -> test_b Tried to insert "
+                 "0.100000000sec before the frozen time of 0.100000000sec.  "
+                 "Increase --time_estimation_buffer_seconds to greater than 0");
     EXPECT_DEATH({ filter.Sample(tb + chrono::nanoseconds(1), oa); },
                  "test_a -> test_b Can't pop an already frozen sample");
   }