Add a noncausal timestamp filter

This implements one half of the per node pair filter.  This filters
offset samples from a single VPU to build up an estimate of the minimum
offset (plus network delay) which we will use in another class to
estimate the average.

Change-Id: I3222978843777bb91b01008e2461722c51f52485
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 117b0e6..3fca7e6 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -1,6 +1,7 @@
 #include "aos/network/timestamp_filter.h"
 
 #include <chrono>
+#include <iomanip>
 #include <tuple>
 
 #include "absl/strings/str_cat.h"
@@ -20,6 +21,17 @@
           "sample_contribution, time_contribution\n");
 }
 
+void PrintNoncausalTimestampFilterHeader(FILE *fp) {
+  fprintf(fp,
+          "# time_since_start, sample_ns, filtered_offset, offset, "
+          "velocity, filtered_velocity, velocity_contribution, "
+          "sample_contribution, time_contribution\n");
+}
+
+void PrintNoncausalTimestampFilterSamplesHeader(FILE *fp) {
+  fprintf(fp, "# time_since_start, sample_ns, offset\n");
+}
+
 }  // namespace
 
 void TimestampFilter::Set(aos::monotonic_clock::time_point monotonic_now,
@@ -427,9 +439,8 @@
 Line Line::Fit(
     const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> a,
     const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> b) {
-  mpq_class slope =
-      FromInt64((std::get<1>(b) - std::get<1>(a)).count()) /
-      FromInt64((std::get<0>(b) - std::get<0>(a)).count());
+  mpq_class slope = FromInt64((std::get<1>(b) - std::get<1>(a)).count()) /
+                    FromInt64((std::get<0>(b) - std::get<0>(a)).count());
   slope.canonicalize();
   mpq_class offset =
       FromInt64(std::get<1>(a).count()) -
@@ -480,5 +491,167 @@
   return f;
 }
 
+NoncausalTimestampFilter::~NoncausalTimestampFilter() {
+  // Destroy the filter by popping until empty.  This will trigger any
+  // timestamps to be written to the files.
+  while (timestamps_.size() != 0u) {
+    PopFront();
+  }
+  if (fp_) {
+    fclose(fp_);
+  }
+
+  if (samples_fp_) {
+    fclose(samples_fp_);
+  }
+}
+
+Line NoncausalTimestampFilter::FitLine() {
+  DCHECK_GE(timestamps_.size(), 1u);
+  if (timestamps_.size() == 1) {
+    Line fit(std::get<1>(timestamps_[0]), 0.0);
+    return fit;
+  } else {
+    return Line::Fit(timestamps_[0], timestamps_[1]);
+  }
+}
+
+bool NoncausalTimestampFilter::Sample(
+    aos::monotonic_clock::time_point monotonic_now,
+    chrono::nanoseconds sample_ns) {
+  if (samples_fp_ && first_time_ != aos::monotonic_clock::min_time) {
+    fprintf(samples_fp_, "%.9f, %.9f\n",
+            chrono::duration_cast<chrono::duration<double>>(monotonic_now -
+                                                            first_time_)
+                .count(),
+            chrono::duration_cast<chrono::duration<double>>(sample_ns).count());
+  }
+
+  // The first sample is easy.  Just do it!
+  if (timestamps_.size() == 0) {
+    timestamps_.emplace_back(std::make_pair(monotonic_now, sample_ns));
+    return true;
+  } else {
+    // Future samples get quite a bit harder.  We want the line to track the
+    // highest point without volating the slope constraint.
+    std::tuple<aos::monotonic_clock::time_point, chrono::nanoseconds> back =
+        timestamps_.back();
+
+    aos::monotonic_clock::duration dt = monotonic_now - std::get<0>(back);
+    aos::monotonic_clock::duration doffset = sample_ns - std::get<1>(back);
+
+    // If the point is higher than the max negative slope, the slope will either
+    // adhere to our constraint, or will be too positive.  If it is too
+    // positive, we need to back propagate and remove offending points which
+    // were too low rather than reject this new point.  We never want a point to
+    // be higher than the line.
+    if (-dt * kMaxVelocity() <= doffset) {
+      // Back propagate the max velocity and remove any elements violating the
+      // velocity constraint.
+      while (dt * kMaxVelocity() < doffset && timestamps_.size() > 1u) {
+        timestamps_.pop_back();
+
+        back = timestamps_.back();
+        dt = monotonic_now - std::get<0>(back);
+        doffset = sample_ns - std::get<1>(back);
+      }
+
+      // TODO(austin): Refuse to modify the 0th element after we have used it.
+      timestamps_.emplace_back(std::make_pair(monotonic_now, sample_ns));
+
+      // If we are early in the log file, the filter hasn't had time to get
+      // started.  We might only have 2 samples, and the first sample was
+      // incredibly delayed, violating our velocity constraint.  In that case,
+      // modify the first sample (rather than remove it) to retain the knowledge
+      // of the velocity, but adhere to the constraints.
+      if (dt * kMaxVelocity() < doffset) {
+        CHECK_EQ(timestamps_.size(), 2u);
+        const aos::monotonic_clock::duration adjusted_initial_time =
+            sample_ns - aos::monotonic_clock::duration(
+                            static_cast<aos::monotonic_clock::duration::rep>(
+                                dt.count() * kMaxVelocity()));
+
+        VLOG(1) << csv_file_name_ << " slope " << std::setprecision(20)
+                << FitLine().slope() << " offset " << FitLine().offset().count()
+                << " a [(" << std::get<0>(timestamps()[0]) << " -> "
+                << std::get<1>(timestamps()[0]).count() << "ns), ("
+                << std::get<0>(timestamps()[1]) << " -> "
+                << std::get<1>(timestamps()[1]).count()
+                << "ns) => {dt: " << std::fixed << std::setprecision(6)
+                << chrono::duration<double, std::milli>(
+                       std::get<0>(timestamps()[1]) -
+                       std::get<0>(timestamps()[0]))
+                       .count()
+                << "ms, do: " << std::fixed << std::setprecision(6)
+                << chrono::duration<double, std::milli>(
+                       std::get<1>(timestamps()[1]) -
+                       std::get<1>(timestamps()[0]))
+                       .count()
+                << "ms}]";
+        VLOG(1) << "Back is out of range, clipping from "
+                << std::get<1>(timestamps_[0]).count() << " to "
+                << adjusted_initial_time.count();
+
+        std::get<1>(timestamps_[0]) = adjusted_initial_time;
+      }
+      if (timestamps_.size() == 2) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
+
+bool NoncausalTimestampFilter::Pop(
+    aos::monotonic_clock::time_point time) {
+  bool removed = false;
+  // When the timestamp which is the end of the line is popped, we want to
+  // drop it off the list.  Hence the >=
+  while (timestamps_.size() >= 2 && time >= std::get<0>(timestamps_[1])) {
+    PopFront();
+    removed = true;
+  }
+  return removed;
+}
+
+void NoncausalTimestampFilter::SetFirstTime(
+    aos::monotonic_clock::time_point time) {
+  first_time_ = time;
+  if (fp_) {
+    fp_ = freopen(NULL, "wb", fp_);
+    PrintNoncausalTimestampFilterHeader(fp_);
+  }
+  if (samples_fp_) {
+    samples_fp_ = freopen(NULL, "wb", samples_fp_);
+    PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
+  }
+}
+
+void NoncausalTimestampFilter::SetCsvFileName(std::string_view name) {
+  csv_file_name_ = name;
+  fp_ = fopen(absl::StrCat(csv_file_name_, ".csv").c_str(), "w");
+  samples_fp_ =
+      fopen(absl::StrCat(csv_file_name_, "_samples.csv").c_str(), "w");
+  PrintNoncausalTimestampFilterHeader(fp_);
+  PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
+}
+
+void NoncausalTimestampFilter::MaybeWriteTimestamp(
+    std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+        timestamp) {
+  if (fp_ && first_time_ != aos::monotonic_clock::min_time) {
+    fprintf(fp_, "%.9f, %.9f, %.9f\n",
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                std::get<0>(timestamp) - first_time_)
+                .count(),
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                std::get<0>(timestamp).time_since_epoch())
+                .count(),
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                std::get<1>(timestamp))
+                .count());
+  }
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index c82f9a6..3f5d7d2 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -5,6 +5,7 @@
 #include <algorithm>
 #include <chrono>
 #include <cmath>
+#include <deque>
 
 #include "aos/time/time.h"
 #include "glog/logging.h"
@@ -304,6 +305,80 @@
 // tb = O(ta) + ta
 Line AverageFits(Line fa, Line fb);
 
+// This class implements a noncausal timestamp filter.  It tracks the maximum
+// points while enforcing both a maximum positive and negative slope constraint.
+// It does this by building up a buffer of samples, and removing any samples
+// which would create segments where the slope is invalid.  As long as the
+// filter is seeded with enough future samples, the start won't change.
+//
+// We want the offset to be defined as tb = O(ta) + ta.  For this to work, the
+// offset is (tb - ta).  If we assume tb = O(ta) + ta + network_delay, then
+// O(ta) = tb - ta - network_delay.  This means that the fastest time a message
+// can be delivered is going to be when the offset is the most positive.
+//
+// TODO(austin): Figure out how to track when we have used something from this
+// filter, and when we do that, enforce that it can't change anymore.  That will
+// help us find when we haven't buffered far enough in the future.
+class NoncausalTimestampFilter {
+ public:
+  ~NoncausalTimestampFilter();
+
+  // Returns a line fit to the oldest 2 points in the timestamp list if
+  // available, or the only point (assuming 0 slope) if not available.
+  Line FitLine();
+
+  // Adds a new sample to our filtered timestamp list.
+  // Returns true if adding the sample changed the output from FitLine().
+  bool Sample(aos::monotonic_clock::time_point monotonic_now,
+              std::chrono::nanoseconds sample_ns);
+
+  // Removes any old timestamps from our timestamps list.
+  // Returns true if adding the sample changed the output from FitLine().
+  bool Pop(aos::monotonic_clock::time_point time);
+
+  // Returns the current list of timestamps in our list.
+  const std::deque<
+      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+      &timestamps() {
+    return timestamps_;
+  }
+
+  void Debug() {
+    for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+             timestamp : timestamps_) {
+      LOG(INFO) << std::get<0>(timestamp) << " offset "
+                << std::get<1>(timestamp).count();
+    }
+  }
+
+  // Sets the starting point and filename to log samples to.  These functions
+  // are only used when doing CSV file logging to debug the filter.
+  void SetFirstTime(aos::monotonic_clock::time_point time);
+  void SetCsvFileName(std::string_view name);
+
+ private:
+  // Removes the oldest timestamp.
+  void PopFront() {
+    MaybeWriteTimestamp(timestamps_.front());
+    timestamps_.pop_front();
+  }
+
+  // Writes a timestamp to the file if it is reasonable.
+  void MaybeWriteTimestamp(
+      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+          timestamp);
+
+  std::deque<
+      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+      timestamps_;
+
+  std::string csv_file_name_;
+  FILE *fp_ = nullptr;
+  FILE *samples_fp_ = nullptr;
+
+  aos::monotonic_clock::time_point first_time_ = aos::monotonic_clock::min_time;
+};
+
 }  // namespace message_bridge
 }  // namespace aos
 
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index c834850..b46f526 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -160,6 +160,156 @@
   }
 }
 
+// Tests that 2 samples results in the correct line between them, and the
+// correct intermediate as it is being built.
+TEST(NoncausalTimestampFilterTest, SingleSample) {
+  const monotonic_clock::time_point ta(chrono::nanoseconds(100000));
+  const monotonic_clock::time_point tb(chrono::nanoseconds(200000));
+
+  NoncausalTimestampFilter filter;
+
+  filter.Sample(ta, chrono::nanoseconds(1000));
+  EXPECT_EQ(filter.timestamps().size(), 1u);
+
+  {
+    Line l1 = filter.FitLine();
+
+    EXPECT_EQ(l1.mpq_offset(), mpq_class(1000));
+    EXPECT_EQ(l1.mpq_slope(), mpq_class(0));
+  }
+
+  filter.Sample(tb, chrono::nanoseconds(1100));
+  EXPECT_EQ(filter.timestamps().size(), 2u);
+
+  {
+    Line l2 = filter.FitLine();
+    EXPECT_EQ(l2.mpq_offset(), mpq_class(900));
+    EXPECT_EQ(l2.mpq_slope(), mpq_class(1, 1000));
+  }
+}
+
+// Tests that invalid samples get clipped as expected.
+TEST(NoncausalTimestampFilterTest, ClippedSample) {
+  const monotonic_clock::time_point ta(chrono::milliseconds(0));
+  const monotonic_clock::time_point tb(chrono::milliseconds(1));
+  const monotonic_clock::time_point tc(chrono::milliseconds(2));
+
+  {
+    // A positive slope of 1 ms/second is properly applied.
+    NoncausalTimestampFilter filter;
+
+    filter.Sample(ta, chrono::microseconds(1));
+    filter.Debug();
+    filter.Sample(tb, chrono::microseconds(2));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 2u);
+
+    {
+      Line l2 = filter.FitLine();
+      EXPECT_EQ(l2.mpq_offset(), mpq_class(1000));
+      EXPECT_EQ(l2.mpq_slope(), mpq_class(1, 1000));
+    }
+  }
+
+  {
+    // A negative slope of 1 ms/second is properly applied.
+    NoncausalTimestampFilter filter;
+
+    filter.Sample(ta, chrono::microseconds(1));
+    filter.Debug();
+    filter.Sample(tb, chrono::microseconds(0));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 2u);
+
+    {
+      Line l2 = filter.FitLine();
+      EXPECT_EQ(l2.mpq_offset(), mpq_class(1000));
+      EXPECT_EQ(l2.mpq_slope(), -mpq_class(1, 1000));
+    }
+  }
+
+  {
+    // Too much negative is ignored.
+    NoncausalTimestampFilter filter;
+
+    filter.Sample(ta, chrono::microseconds(1));
+    filter.Debug();
+    filter.Sample(tb, -chrono::microseconds(1));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 1u);
+  }
+
+  {
+    // Too much positive pulls up the first point.
+    NoncausalTimestampFilter filter;
+
+    filter.Sample(ta, chrono::microseconds(1));
+    filter.Debug();
+    filter.Sample(tb, chrono::microseconds(3));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 2u);
+
+    EXPECT_EQ(std::get<1>(filter.timestamps()[0]), chrono::microseconds(2));
+    EXPECT_EQ(std::get<1>(filter.timestamps()[1]), chrono::microseconds(3));
+  }
+
+  {
+    // Too much positive slope removes points.
+    NoncausalTimestampFilter filter;
+
+    filter.Sample(ta, chrono::microseconds(1));
+    filter.Debug();
+    filter.Sample(tb, chrono::microseconds(1));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 2u);
+
+    // Now add a sample with a slope of 0.002.  This should back propagate and
+    // remove the middle point since it violates our constraints.
+    filter.Sample(tc, chrono::microseconds(3));
+    filter.Debug();
+    ASSERT_EQ(filter.timestamps().size(), 2u);
+
+    EXPECT_EQ(std::get<1>(filter.timestamps()[0]), chrono::microseconds(1));
+    EXPECT_EQ(std::get<1>(filter.timestamps()[1]), chrono::microseconds(3));
+  }
+}
+
+// Tests that removing points from the filter works as expected.
+TEST(NoncausalTimestampFilterTest, PointRemoval) {
+  const monotonic_clock::time_point t_before(-chrono::milliseconds(1));
+  const monotonic_clock::time_point ta(chrono::milliseconds(0));
+  const monotonic_clock::time_point tb(chrono::milliseconds(1));
+  const monotonic_clock::time_point tc(chrono::milliseconds(2));
+
+  // A positive slope of 1 ms/second is properly applied.
+  NoncausalTimestampFilter filter;
+
+  filter.Sample(ta, chrono::microseconds(1));
+  filter.Debug();
+  filter.Sample(tb, chrono::microseconds(2));
+  filter.Debug();
+  filter.Sample(tc, chrono::microseconds(1));
+  filter.Debug();
+  ASSERT_EQ(filter.timestamps().size(), 3u);
+
+  // Before or in the middle of the first line segment shouldn't change the
+  // number of points.
+  EXPECT_FALSE(filter.Pop(t_before));
+  ASSERT_EQ(filter.timestamps().size(), 3u);
+
+  EXPECT_FALSE(filter.Pop(ta));
+  ASSERT_EQ(filter.timestamps().size(), 3u);
+
+  EXPECT_FALSE(filter.Pop(ta + chrono::microseconds(100)));
+  ASSERT_EQ(filter.timestamps().size(), 3u);
+
+  // The second point should trigger a pop, since the offset computed using the
+  // points won't change when it is used, and any times after (even 1-2 ns
+  // later) would be wrong.
+  EXPECT_TRUE(filter.Pop(tb));
+  ASSERT_EQ(filter.timestamps().size(), 2u);
+}
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos