Really fix log sorting

We were queueing all the forward timestamps, but the reverse-direction
timestamps were not being queued and were breaking the sort.  So, we
now queue node_b far enough to cover the reverse direction as well.
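
The per-filter queueing now works roughly like the sketch below.  This
is condensed from the multinode_timestamp_filter.cc hunk that follows;
the enclosing loops, null checks, and the startup special cases are
elided, but the names match the real code:

  // Figure out which node is on the other end of this filter.
  const size_t node_b_index = configuration::GetNodeIndex(
      timestamp_mappers_[node_a_index]->configuration(),
      configuration::GetNode(
          timestamp_mappers_[node_a_index]->configuration(),
          filter.filter->node_b()));

  // Queue both directions until the filter has a full, unobserved line.
  timestamp_mappers_[node_a_index]->QueueUntilCondition(
      [&filter]() { return filter.filter->has_unobserved_line(); });
  timestamp_mappers_[node_b_index]->QueueUntilCondition(
      [&filter]() { return filter.filter->has_unobserved_line(); });

  // Then buffer past the end of that line on both nodes so the filter
  // doesn't invalidate the point later.
  if (filter.filter->has_unobserved_line()) {
    timestamp_mappers_[node_a_index]->QueueUntil(
        filter.filter->unobserved_line_end() +
        time_estimation_buffer_seconds_);
    timestamp_mappers_[node_b_index]->QueueUntil(
        filter.filter->unobserved_line_remote_end() +
        time_estimation_buffer_seconds_);
  }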

Change-Id: I94fab9bb2b49eb439d4b576b19177b3d13703fd2
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index aa580e7..bc11ffc 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -863,6 +863,15 @@
   BitSet64 all_live_nodes(problem.size());
   const BitSet64 all_nodes = ~BitSet64(problem.size());
 
+  for (size_t node_index = 0; node_index < timestamp_mappers_.size();
+       ++node_index) {
+    if (timestamp_mappers_[node_index] != nullptr) {
+      // Make sure we have enough data queued such that if we are going to
+      // have a timestamp on this filter, we do have a timestamp queued.
+      timestamp_mappers_[node_index]->QueueFor(time_estimation_buffer_seconds_);
+    }
+  }
+
   {
     size_t node_a_index = 0;
     for (const auto &filters : filters_per_node_) {
@@ -879,10 +888,6 @@
           problem.add_filter(node_a_index, filter.filter, filter.b_index);
 
           if (timestamp_mappers_[node_a_index] != nullptr) {
-            // Make sure we have enough data queued such that if we are going to
-            // have a timestamp on this filter, we do have a timestamp queued.
-            timestamp_mappers_[node_a_index]->QueueFor(
-                time_estimation_buffer_seconds_);
             // Now, we have cases at startup where we have a couple of points
             // followed by a long gap, followed by the body of the data.  We are
             // extrapolating, then adding the new data, and finding that time
@@ -896,15 +901,34 @@
             // to queue those until we hit 2, because that'll force us to read
             // everything into memory.  So, only queue for the filters which
             // have data in them.
-            if (filter.filter->timestamps_size() == 1u) {
-              timestamp_mappers_[node_a_index]->QueueUntilCondition(
-                  [&filter]() {
-                    return filter.filter->timestamps_size() >= 2u;
-                  });
-            }
-            if (filter.filter->timestamps_size() >= 2u) {
+
+            size_t node_b_index = configuration::GetNodeIndex(
+                timestamp_mappers_[node_a_index]->configuration(),
+                configuration::GetNode(
+                    timestamp_mappers_[node_a_index]->configuration(),
+                    filter.filter->node_b()));
+
+            // Timestamps can come from either node.  When a message is
+            // delivered to a node, it can have the timestamp time attached to
+            // that message as well.  That means that we have to queue both
+            // nodes until we have 2 unobserved points from both nodes.
+            timestamp_mappers_[node_a_index]->QueueUntilCondition(
+                [&filter]() { return filter.filter->has_unobserved_line(); });
+
+            timestamp_mappers_[node_b_index]->QueueUntilCondition(
+                [&filter]() { return filter.filter->has_unobserved_line(); });
+
+            // If we actually found a line, make sure to buffer to the desired
+            // distance past that last point so the filter doesn't try to
+            // invalidate the point.  Do this for both nodes to pick up all the
+            // timestamps.
+            if (filter.filter->has_unobserved_line()) {
               timestamp_mappers_[node_a_index]->QueueUntil(
-                  std::get<0>(filter.filter->timestamp(1u)) +
+                  filter.filter->unobserved_line_end() +
+                  time_estimation_buffer_seconds_);
+
+              timestamp_mappers_[node_b_index]->QueueUntil(
+                  filter.filter->unobserved_line_remote_end() +
                   time_estimation_buffer_seconds_);
             }
           }
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 5605246..b4455a5 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -1008,6 +1008,11 @@
   }
 }
 
+std::string NoncausalTimestampFilter::NodeNames() const {
+  return absl::StrCat(node_a_->name()->string_view(), " -> ",
+                      node_b_->name()->string_view());
+}
+
 bool NoncausalTimestampFilter::ValidateSolution(
     aos::monotonic_clock::time_point ta,
     aos::monotonic_clock::time_point tb) const {
@@ -1021,9 +1026,8 @@
     const chrono::nanoseconds offset =
         NoncausalTimestampFilter::ExtrapolateOffset(reference_timestamp, ta);
     if (offset + ta > tb) {
-      LOG(ERROR) << node_a_->name()->string_view() << " -> "
-                 << node_b_->name()->string_view() << " "
-                 << TimeString(ta, offset) << " > solution time " << tb;
+      LOG(ERROR) << NodeNames() << " " << TimeString(ta, offset)
+                 << " > solution time " << tb;
       return false;
     }
     return true;
@@ -1036,9 +1040,8 @@
       NoncausalTimestampFilter::InterpolateOffset(points.first, points.second,
                                                   ta);
   if (offset + ta > tb) {
-    LOG(ERROR) << node_a_->name()->string_view() << " -> "
-               << node_b_->name()->string_view() << " "
-               << TimeString(ta, offset) << " > solution time " << tb;
+    LOG(ERROR) << NodeNames() << " " << TimeString(ta, offset)
+               << " > solution time " << tb;
     LOG(ERROR) << "Bracketing times are " << TimeString(points.first) << " and "
                << TimeString(points.second);
     return false;
@@ -1058,20 +1061,20 @@
 
   // The first sample is easy.  Just do it!
   if (timestamps_.size() == 0) {
-    VLOG(1) << node_a_->name()->string_view() << " -> "
-            << node_b_->name()->string_view() << " Initial sample of "
+    VLOG(1) << NodeNames() << " Initial sample of "
             << TimeString(monotonic_now, sample_ns);
     timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns, false));
-    CHECK(!fully_frozen_) << ": " << node_a_->name()->string_view() << " -> "
-                          << node_b_->name()->string_view()
-                          << " Returned a horizontal line previously and then "
-                             "got a new sample at "
-                          << monotonic_now << ", "
-                          << chrono::duration<double>(
-                                 monotonic_now - std::get<0>(timestamps_[0]))
-                                 .count()
-                          << " seconds after the last sample at "
-                          << std::get<0>(timestamps_[0]) << ".";
+    CHECK(!fully_frozen_)
+        << ": " << NodeNames()
+        << " Returned a horizontal line previously and then "
+           "got a new sample at "
+        << monotonic_now << ", "
+        << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+               .count()
+        << " seconds after the last sample at " << std::get<0>(timestamps_[0])
+        << ".  Increase --time_estimation_buffer_seconds to at least "
+        << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+               .count();
     return;
   }
 
@@ -1084,9 +1087,8 @@
   aos::monotonic_clock::duration doffset = sample_ns - std::get<1>(back);
 
   if (dt == chrono::nanoseconds(0) && doffset == chrono::nanoseconds(0)) {
-    VLOG(1) << node_a_->name()->string_view() << " -> "
-            << node_b_->name()->string_view() << " Duplicate sample of O("
-            << monotonic_now << ") = " << sample_ns.count() << ", remote time "
+    VLOG(1) << NodeNames() << " Duplicate sample of O(" << monotonic_now
+            << ") = " << sample_ns.count() << ", remote time "
             << monotonic_now + sample_ns;
 
     return;
@@ -1099,26 +1101,27 @@
     // negative slope, the point violates our constraint and will never be worth
     // considering.  Ignore it.
     if (doffset < -dt * kMaxVelocity()) {
-      VLOG(1) << std::setprecision(1) << std::fixed
-              << node_a_->name()->string_view() << " -> "
-              << node_b_->name()->string_view() << " Rejected sample of "
-              << TimeString(monotonic_now, sample_ns) << " because "
-              << doffset.count() << " < " << (-dt * kMaxVelocity()).count()
-              << " len " << timestamps_.size();
+      VLOG(1) << std::setprecision(1) << std::fixed << NodeNames()
+              << " Rejected sample of " << TimeString(monotonic_now, sample_ns)
+              << " because " << doffset.count() << " < "
+              << (-dt * kMaxVelocity()).count() << " len "
+              << timestamps_.size();
       return;
     }
 
     // Be overly conservative here.  It either won't make a difference, or
     // will give us an error with an actual useful time difference.
     CHECK(!fully_frozen_)
-        << ": " << node_a_->name()->string_view() << " -> "
-        << node_b_->name()->string_view()
+        << ": " << NodeNames()
         << " Returned a horizontal line previously and then got a new "
            "sample at "
         << monotonic_now << ", "
         << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
                .count()
-        << " seconds after the last sample at " << std::get<0>(timestamps_[0]);
+        << " seconds after the last sample at " << std::get<0>(timestamps_[0])
+        << ".  Increase --time_estimation_buffer_seconds to at least "
+        << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+               .count();
 
     // Back propagate the max velocity and remove any elements violating the
     // velocity constraint.  This is to handle the case where the offsets were
@@ -1132,11 +1135,16 @@
     // In this case, point 3 is now violating our constraint and we need to
     // remove it.  This is the non-causal part of the filter.
     while (dt * kMaxVelocity() < doffset && timestamps_.size() > 1u) {
-      CHECK(!std::get<2>(back)) << ": " << node_a_->name()->string_view()
-                                << " -> " << node_b_->name()->string_view()
-                                << " Can't pop an already frozen sample.";
-      VLOG(1) << node_a_->name()->string_view() << " -> "
-              << node_b_->name()->string_view()
+      CHECK(!std::get<2>(back))
+          << ": " << NodeNames() << " Can't pop an already frozen sample "
+          << TimeString(back) << " while inserting "
+          << TimeString(monotonic_now, sample_ns) << ", "
+          << chrono::duration<double>(monotonic_now - std::get<0>(back)).count()
+          << " seconds in the past.  Increase --time_estimation_buffer_seconds "
+             "to at least "
+          << chrono::duration<double>(monotonic_now - std::get<0>(back))
+                 .count();
+      VLOG(1) << NodeNames()
               << " Removing now invalid sample during back propegation of "
               << TimeString(back);
       timestamps_.pop_back();
@@ -1163,9 +1171,11 @@
   CHECK(it != timestamps_.end());
 
   CHECK(!std::get<2>(*(it)))
-      << ": " << node_a_->name()->string_view() << " -> "
-      << node_b_->name()->string_view() << " Tried to insert " << monotonic_now
-      << " before " << std::get<0>(*it) << ", which is a frozen time.";
+      << ": " << NodeNames() << " Tried to insert " << monotonic_now
+      << " before " << std::get<0>(*it)
+      << ", which is frozen in the past.  Increase "
+         "--time_estimation_buffer_seconds to at least "
+      << chrono::duration<double>(std::get<0>(*it) - monotonic_now).count();
 
   if (it == timestamps_.begin()) {
     // We are being asked to add at the beginning.
@@ -1175,8 +1185,7 @@
       const chrono::nanoseconds doffset = original_offset - sample_ns;
 
       if (dt == chrono::nanoseconds(0) && doffset >= chrono::nanoseconds(0)) {
-        VLOG(1) << node_a_->name()->string_view() << " -> "
-                << node_b_->name()->string_view() << " Redundant timestamp "
+        VLOG(1) << NodeNames() << " Redundant timestamp "
                 << TimeString(monotonic_now, sample_ns) << " because "
                 << TimeString(timestamps_.front())
                 << " is at the same time and a better solution.";
@@ -1184,8 +1193,7 @@
       }
     }
 
-    VLOG(1) << node_a_->name()->string_view() << " -> "
-            << node_b_->name()->string_view() << " Added sample at beginning "
+    VLOG(1) << NodeNames() << " Added sample at beginning "
             << TimeString(monotonic_now, sample_ns);
     timestamps_.insert(it, std::make_tuple(monotonic_now, sample_ns, false));
 
@@ -1198,10 +1206,9 @@
         const chrono::nanoseconds doffset = std::get<1>(*second) - sample_ns;
 
         if (doffset < -dt * kMaxVelocity()) {
-          VLOG(1) << node_a_->name()->string_view() << " -> "
-                  << node_b_->name()->string_view()
-                  << " Removing redundant sample of " << TimeString(*second)
-                  << " because " << TimeString(timestamps_.front())
+          VLOG(1) << NodeNames() << " Removing redundant sample of "
+                  << TimeString(*second) << " because "
+                  << TimeString(timestamps_.front())
                   << " would make the slope too negative.";
           timestamps_.erase(second);
           continue;
@@ -1226,10 +1233,8 @@
               std::get<1>(*third) - std::get<1>(*second);
 
           if (doffset > dt * kMaxVelocity()) {
-            VLOG(1) << node_a_->name()->string_view() << " -> "
-                    << node_b_->name()->string_view()
-                    << " Removing invalid sample of " << TimeString(*second)
-                    << " because " << TimeString(*third)
+            VLOG(1) << NodeNames() << " Removing invalid sample of "
+                    << TimeString(*second) << " because " << TimeString(*third)
                     << " would make the slope too positive.";
             timestamps_.erase(second);
             continue;
@@ -1241,10 +1246,8 @@
     }
     return;
   } else {
-    VLOG(1) << node_a_->name()->string_view() << " -> "
-            << node_b_->name()->string_view() << " Found the next time "
-            << std::get<0>(*(it - 1)) << " < " << monotonic_now << " < "
-            << std::get<0>(*it);
+    VLOG(1) << NodeNames() << " Found the next time " << std::get<0>(*(it - 1))
+            << " < " << monotonic_now << " < " << std::get<0>(*it);
 
     {
       chrono::nanoseconds prior_dt = monotonic_now - std::get<0>(*(it - 1));
@@ -1254,16 +1257,14 @@
 
       // If we are worse than either the previous or next point, discard.
       if (prior_doffset < -prior_dt * kMaxVelocity()) {
-        VLOG(1) << node_a_->name()->string_view() << " -> "
-                << node_b_->name()->string_view() << " Ignoring timestamp "
+        VLOG(1) << NodeNames() << " Ignoring timestamp "
                 << TimeString(monotonic_now, sample_ns) << " because "
                 << TimeString(*(it - 1))
                 << " is before and the slope would be too negative.";
         return;
       }
       if (next_doffset > next_dt * kMaxVelocity()) {
-        VLOG(1) << node_a_->name()->string_view() << " -> "
-                << node_b_->name()->string_view() << " Ignoring timestamp "
+        VLOG(1) << NodeNames() << " Ignoring timestamp "
                 << TimeString(monotonic_now, sample_ns) << " because "
                 << TimeString(*it)
                 << " is following and the slope would be too positive.";
@@ -1276,9 +1277,7 @@
     // new.
     auto middle_it = timestamps_.insert(
         it, std::make_tuple(monotonic_now, sample_ns, false));
-    VLOG(1) << node_a_->name()->string_view() << " -> "
-            << node_b_->name()->string_view() << " Inserted "
-            << TimeString(*middle_it);
+    VLOG(1) << NodeNames() << " Inserted " << TimeString(*middle_it);
 
     while (middle_it != timestamps_.end() && middle_it != timestamps_.begin()) {
       auto next_it =
@@ -1294,8 +1293,7 @@
             std::get<1>(*next_it) - std::get<1>(*middle_it);
 
         if (next_doffset < -next_dt * kMaxVelocity()) {
-          VLOG(1) << node_a_->name()->string_view() << " -> "
-                  << node_b_->name()->string_view()
+          VLOG(1) << NodeNames()
                   << " Next slope is too negative, removing next point "
                   << TimeString(*next_it);
           next_it = timestamps_.erase(next_it);
@@ -1315,9 +1313,11 @@
 
         if (prior_doffset > prior_dt * kMaxVelocity()) {
           CHECK(!std::get<2>(*prior_it))
-              << ": " << node_a_->name()->string_view() << " -> "
-              << node_b_->name()->string_view()
-              << " Can't pop an already frozen sample.";
+              << ": " << NodeNames()
+              << " Can't pop an already frozen sample.  Increase "
+                 "--time_estimation_buffer_seconds to at least "
+              << chrono::duration<double>(prior_dt).count();
+
           VLOG(1) << "Prior slope is too positive, removing prior point "
                   << TimeString(*prior_it);
           prior_it = timestamps_.erase(prior_it);
@@ -1342,6 +1342,28 @@
   return removed;
 }
 
+monotonic_clock::time_point NoncausalTimestampFilter::unobserved_line_end()
+    const {
+  if (has_unobserved_line()) {
+    return std::get<0>(timestamp(next_to_consume_ + 1));
+  }
+  return monotonic_clock::min_time;
+}
+
+monotonic_clock::time_point
+NoncausalTimestampFilter::unobserved_line_remote_end() const {
+  if (has_unobserved_line()) {
+    const std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> t =
+        timestamp(next_to_consume_ + 1);
+    return std::get<0>(t) + std::get<1>(t);
+  }
+  return monotonic_clock::min_time;
+}
+
+bool NoncausalTimestampFilter::has_unobserved_line() const {
+  return next_to_consume_ + 1 < timestamps_.size();
+}
+
 std::optional<std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
 NoncausalTimestampFilter::Observe() const {
   if (timestamps_.empty() || next_to_consume_ >= timestamps_.size()) {
@@ -1372,10 +1394,12 @@
   }
 
   if (timestamps_.empty()) {
+    VLOG(1) << NodeNames() << " fully_frozen_";
     fully_frozen_ = true;
   } else if (node_monotonic_now > std::get<0>(timestamps_.back())) {
     // We've been asked to freeze past the last point.  It isn't safe to add any
     // more points or we will change this region.
+    VLOG(1) << NodeNames() << " fully_frozen_";
     fully_frozen_ = true;
   }
 }
@@ -1392,11 +1416,13 @@
   }
 
   if (timestamps_.empty()) {
+    VLOG(1) << NodeNames() << " fully_frozen_";
     fully_frozen_ = true;
   } else if (remote_monotonic_now > std::get<0>(timestamps_.back()) +
                                         std::get<1>(timestamps_.back())) {
     // We've been asked to freeze past the last point.  It isn't safe to add any
     // more points or we will change this region.
+    VLOG(1) << NodeNames() << " fully_frozen_";
     fully_frozen_ = true;
   }
 }
@@ -1417,16 +1443,13 @@
 
 void NoncausalTimestampFilter::SetCsvFileName(std::string_view name) {
   fp_ = fopen(absl::StrCat(name, ".csv").c_str(), "w");
-  samples_fp_ =
-      fopen(absl::StrCat(name, "_samples.csv").c_str(), "w");
+  samples_fp_ = fopen(absl::StrCat(name, "_samples.csv").c_str(), "w");
   PrintNoncausalTimestampFilterHeader(fp_);
   PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
 }
 
 void NoncausalTimestampFilter::PopFront() {
-  VLOG(1) << node_a_->name()->string_view() << " -> "
-          << node_b_->name()->string_view() << " Popped sample of "
-          << TimeString(timestamp(0));
+  VLOG(1) << NodeNames() << " Popped sample of " << TimeString(timestamp(0));
   MaybeWriteTimestamp(timestamp(0));
   timestamps_.pop_front();
   has_popped_ = true;
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 7a9b7fa..9f01efc 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -343,6 +343,9 @@
 
   size_t timestamps_size() const { return timestamps_.size(); }
 
+  // Returns a debug string with the nodes this filter represents.
+  std::string NodeNames() const;
+
   void Debug() {
     size_t count = 0;
     for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds,
@@ -366,6 +369,15 @@
   void FreezeUntil(aos::monotonic_clock::time_point node_monotonic_now);
   void FreezeUntilRemote(aos::monotonic_clock::time_point remote_monotonic_now);
 
+  // Returns true if there is a full line which hasn't been observed.
+  bool has_unobserved_line() const;
+  // Returns the time of the second point in the unobserved line, or min_time if
+  // there is no line.
+  monotonic_clock::time_point unobserved_line_end() const;
+  // Returns the time of the second point in the unobserved line on the remote
+  // node, or min_time if there is no line.
+  monotonic_clock::time_point unobserved_line_remote_end() const;
+
   // Returns the next timestamp in the queue if available without incrementing
   // the pointer.  This, Consume, and FreezeUntil work together to allow
   // tracking and freezing timestamps which have been combined externally.
@@ -415,6 +427,9 @@
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
       monotonic_clock::time_point ta_base, double ta);
 
+  const Node *node_a() const { return node_a_; }
+  const Node *node_b() const { return node_b_; }
+
  private:
   // Removes the oldest timestamp.
   void PopFront();
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index caecccd..0e47cc6 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -625,9 +625,11 @@
     ASSERT_EQ(filter.timestamps_size(), 2u);
     filter.FreezeUntil(tb);
 
-    EXPECT_DEATH({ filter.Sample(tb, oa); },
-                 "Tried to insert 0.100000000sec before 0.100000000sec, which "
-                 "is a frozen time");
+    EXPECT_DEATH(
+        { filter.Sample(tb, oa); },
+        "test_a -> test_b Tried to insert 0.100000000sec before "
+        "0.100000000sec, which is frozen "
+        "in the past.  Increase --time_estimation_buffer_seconds to at least");
   }
 
   {
@@ -641,7 +643,8 @@
 
     EXPECT_DEATH(
         { filter.Sample(tc, oc); },
-        "Returned a horizontal line previously and then got a new sample at "
+        "test_a -> test_b Returned a horizontal line previously and then got a "
+        "new sample at "
         "0.200000000sec, 0.2 seconds after the last sample at 0.000000000sec");
   }
 
@@ -655,8 +658,9 @@
     filter.FreezeUntil(tc);
 
     EXPECT_DEATH({ filter.Sample(tb, ob); },
-                 "Tried to insert 0.100000000sec before 0.200000000sec, which "
-                 "is a frozen time");
+                 "test_a -> test_b Tried to insert 0.100000000sec before "
+                 "0.200000000sec, which "
+                 "is frozen");
   }
 
   {
@@ -670,11 +674,13 @@
     ASSERT_EQ(filter.timestamps_size(), 3u);
     filter.FreezeUntil(tb);
 
-    EXPECT_DEATH({ filter.Sample(tb, oa); },
-                 "Tried to insert 0.100000000sec before 0.100000000sec, which "
-                 "is a frozen time");
+    EXPECT_DEATH(
+        { filter.Sample(tb, oa); },
+        "test_a -> test_b Tried to insert 0.100000000sec before "
+        "0.100000000sec, which "
+        "is frozen in the past.  Increase --time_estimation_buffer_seconds");
     EXPECT_DEATH({ filter.Sample(tb + chrono::nanoseconds(1), oa); },
-                 "Can't pop an already frozen sample");
+                 "test_a -> test_b Can't pop an already frozen sample");
   }
 }