Add a noncausal timestamp filter
This implements one half of the per node pair filter. This filters
offset samples from a single VPU to build up an estimate of the minimum
offset (plus network delay) which we will use in another class to
estimate the average.
Change-Id: I3222978843777bb91b01008e2461722c51f52485
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 117b0e6..3fca7e6 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -1,6 +1,7 @@
#include "aos/network/timestamp_filter.h"
#include <chrono>
+#include <iomanip>
#include <tuple>
#include "absl/strings/str_cat.h"
@@ -20,6 +21,17 @@
"sample_contribution, time_contribution\n");
}
+void PrintNoncausalTimestampFilterHeader(FILE *fp) {
+ fprintf(fp,
+ "# time_since_start, sample_ns, filtered_offset, offset, "
+ "velocity, filtered_velocity, velocity_contribution, "
+ "sample_contribution, time_contribution\n");
+}
+
+void PrintNoncausalTimestampFilterSamplesHeader(FILE *fp) {
+ fprintf(fp, "# time_since_start, sample_ns, offset\n");
+}
+
} // namespace
void TimestampFilter::Set(aos::monotonic_clock::time_point monotonic_now,
@@ -427,9 +439,8 @@
Line Line::Fit(
const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> a,
const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> b) {
- mpq_class slope =
- FromInt64((std::get<1>(b) - std::get<1>(a)).count()) /
- FromInt64((std::get<0>(b) - std::get<0>(a)).count());
+ mpq_class slope = FromInt64((std::get<1>(b) - std::get<1>(a)).count()) /
+ FromInt64((std::get<0>(b) - std::get<0>(a)).count());
slope.canonicalize();
mpq_class offset =
FromInt64(std::get<1>(a).count()) -
@@ -480,5 +491,167 @@
return f;
}
+NoncausalTimestampFilter::~NoncausalTimestampFilter() {
+ // Destroy the filter by popping until empty. This will trigger any
+ // timestamps to be written to the files.
+ while (timestamps_.size() != 0u) {
+ PopFront();
+ }
+ if (fp_) {
+ fclose(fp_);
+ }
+
+ if (samples_fp_) {
+ fclose(samples_fp_);
+ }
+}
+
+Line NoncausalTimestampFilter::FitLine() {
+ DCHECK_GE(timestamps_.size(), 1u);
+ if (timestamps_.size() == 1) {
+ Line fit(std::get<1>(timestamps_[0]), 0.0);
+ return fit;
+ } else {
+ return Line::Fit(timestamps_[0], timestamps_[1]);
+ }
+}
+
+bool NoncausalTimestampFilter::Sample(
+ aos::monotonic_clock::time_point monotonic_now,
+ chrono::nanoseconds sample_ns) {
+ if (samples_fp_ && first_time_ != aos::monotonic_clock::min_time) {
+ fprintf(samples_fp_, "%.9f, %.9f\n",
+ chrono::duration_cast<chrono::duration<double>>(monotonic_now -
+ first_time_)
+ .count(),
+ chrono::duration_cast<chrono::duration<double>>(sample_ns).count());
+ }
+
+ // The first sample is easy. Just do it!
+ if (timestamps_.size() == 0) {
+ timestamps_.emplace_back(std::make_pair(monotonic_now, sample_ns));
+ return true;
+ } else {
+ // Future samples get quite a bit harder. We want the line to track the
+ // highest point without volating the slope constraint.
+ std::tuple<aos::monotonic_clock::time_point, chrono::nanoseconds> back =
+ timestamps_.back();
+
+ aos::monotonic_clock::duration dt = monotonic_now - std::get<0>(back);
+ aos::monotonic_clock::duration doffset = sample_ns - std::get<1>(back);
+
+ // If the point is higher than the max negative slope, the slope will either
+ // adhere to our constraint, or will be too positive. If it is too
+ // positive, we need to back propagate and remove offending points which
+ // were too low rather than reject this new point. We never want a point to
+ // be higher than the line.
+ if (-dt * kMaxVelocity() <= doffset) {
+ // Back propagate the max velocity and remove any elements violating the
+ // velocity constraint.
+ while (dt * kMaxVelocity() < doffset && timestamps_.size() > 1u) {
+ timestamps_.pop_back();
+
+ back = timestamps_.back();
+ dt = monotonic_now - std::get<0>(back);
+ doffset = sample_ns - std::get<1>(back);
+ }
+
+ // TODO(austin): Refuse to modify the 0th element after we have used it.
+ timestamps_.emplace_back(std::make_pair(monotonic_now, sample_ns));
+
+ // If we are early in the log file, the filter hasn't had time to get
+ // started. We might only have 2 samples, and the first sample was
+ // incredibly delayed, violating our velocity constraint. In that case,
+ // modify the first sample (rather than remove it) to retain the knowledge
+ // of the velocity, but adhere to the constraints.
+ if (dt * kMaxVelocity() < doffset) {
+ CHECK_EQ(timestamps_.size(), 2u);
+ const aos::monotonic_clock::duration adjusted_initial_time =
+ sample_ns - aos::monotonic_clock::duration(
+ static_cast<aos::monotonic_clock::duration::rep>(
+ dt.count() * kMaxVelocity()));
+
+ VLOG(1) << csv_file_name_ << " slope " << std::setprecision(20)
+ << FitLine().slope() << " offset " << FitLine().offset().count()
+ << " a [(" << std::get<0>(timestamps()[0]) << " -> "
+ << std::get<1>(timestamps()[0]).count() << "ns), ("
+ << std::get<0>(timestamps()[1]) << " -> "
+ << std::get<1>(timestamps()[1]).count()
+ << "ns) => {dt: " << std::fixed << std::setprecision(6)
+ << chrono::duration<double, std::milli>(
+ std::get<0>(timestamps()[1]) -
+ std::get<0>(timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << chrono::duration<double, std::milli>(
+ std::get<1>(timestamps()[1]) -
+ std::get<1>(timestamps()[0]))
+ .count()
+ << "ms}]";
+ VLOG(1) << "Back is out of range, clipping from "
+ << std::get<1>(timestamps_[0]).count() << " to "
+ << adjusted_initial_time.count();
+
+ std::get<1>(timestamps_[0]) = adjusted_initial_time;
+ }
+ if (timestamps_.size() == 2) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
+bool NoncausalTimestampFilter::Pop(
+ aos::monotonic_clock::time_point time) {
+ bool removed = false;
+ // When the timestamp which is the end of the line is popped, we want to
+ // drop it off the list. Hence the >=
+ while (timestamps_.size() >= 2 && time >= std::get<0>(timestamps_[1])) {
+ PopFront();
+ removed = true;
+ }
+ return removed;
+}
+
+void NoncausalTimestampFilter::SetFirstTime(
+ aos::monotonic_clock::time_point time) {
+ first_time_ = time;
+ if (fp_) {
+ fp_ = freopen(NULL, "wb", fp_);
+ PrintNoncausalTimestampFilterHeader(fp_);
+ }
+ if (samples_fp_) {
+ samples_fp_ = freopen(NULL, "wb", samples_fp_);
+ PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
+ }
+}
+
+void NoncausalTimestampFilter::SetCsvFileName(std::string_view name) {
+ csv_file_name_ = name;
+ fp_ = fopen(absl::StrCat(csv_file_name_, ".csv").c_str(), "w");
+ samples_fp_ =
+ fopen(absl::StrCat(csv_file_name_, "_samples.csv").c_str(), "w");
+ PrintNoncausalTimestampFilterHeader(fp_);
+ PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
+}
+
+void NoncausalTimestampFilter::MaybeWriteTimestamp(
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+ timestamp) {
+ if (fp_ && first_time_ != aos::monotonic_clock::min_time) {
+ fprintf(fp_, "%.9f, %.9f, %.9f\n",
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ std::get<0>(timestamp) - first_time_)
+ .count(),
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ std::get<0>(timestamp).time_since_epoch())
+ .count(),
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ std::get<1>(timestamp))
+ .count());
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index c82f9a6..3f5d7d2 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -5,6 +5,7 @@
#include <algorithm>
#include <chrono>
#include <cmath>
+#include <deque>
#include "aos/time/time.h"
#include "glog/logging.h"
@@ -304,6 +305,80 @@
// tb = O(ta) + ta
Line AverageFits(Line fa, Line fb);
+// This class implements a noncausal timestamp filter. It tracks the maximum
+// points while enforcing both a maximum positive and negative slope constraint.
+// It does this by building up a buffer of samples, and removing any samples
+// which would create segments where the slope is invalid. As long as the
+// filter is seeded with enough future samples, the start won't change.
+//
+// We want the offset to be defined as tb = O(ta) + ta. For this to work, the
+// offset is (tb - ta). If we assume tb = O(ta) + ta + network_delay, then
+// O(ta) = tb - ta - network_delay. This means that the fastest time a message
+// can be delivered is going to be when the offset is the most positive.
+//
+// TODO(austin): Figure out how to track when we have used something from this
+// filter, and when we do that, enforce that it can't change anymore. That will
+// help us find when we haven't buffered far enough in the future.
+class NoncausalTimestampFilter {
+ public:
+ ~NoncausalTimestampFilter();
+
+ // Returns a line fit to the oldest 2 points in the timestamp list if
+ // available, or the only point (assuming 0 slope) if not available.
+ Line FitLine();
+
+ // Adds a new sample to our filtered timestamp list.
+ // Returns true if adding the sample changed the output from FitLine().
+ bool Sample(aos::monotonic_clock::time_point monotonic_now,
+ std::chrono::nanoseconds sample_ns);
+
+ // Removes any old timestamps from our timestamps list.
+ // Returns true if adding the sample changed the output from FitLine().
+ bool Pop(aos::monotonic_clock::time_point time);
+
+ // Returns the current list of timestamps in our list.
+ const std::deque<
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+ ×tamps() {
+ return timestamps_;
+ }
+
+ void Debug() {
+ for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+ timestamp : timestamps_) {
+ LOG(INFO) << std::get<0>(timestamp) << " offset "
+ << std::get<1>(timestamp).count();
+ }
+ }
+
+ // Sets the starting point and filename to log samples to. These functions
+ // are only used when doing CSV file logging to debug the filter.
+ void SetFirstTime(aos::monotonic_clock::time_point time);
+ void SetCsvFileName(std::string_view name);
+
+ private:
+ // Removes the oldest timestamp.
+ void PopFront() {
+ MaybeWriteTimestamp(timestamps_.front());
+ timestamps_.pop_front();
+ }
+
+ // Writes a timestamp to the file if it is reasonable.
+ void MaybeWriteTimestamp(
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
+ timestamp);
+
+ std::deque<
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+ timestamps_;
+
+ std::string csv_file_name_;
+ FILE *fp_ = nullptr;
+ FILE *samples_fp_ = nullptr;
+
+ aos::monotonic_clock::time_point first_time_ = aos::monotonic_clock::min_time;
+};
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index c834850..b46f526 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -160,6 +160,156 @@
}
}
+// Tests that 2 samples results in the correct line between them, and the
+// correct intermediate as it is being built.
+TEST(NoncausalTimestampFilterTest, SingleSample) {
+ const monotonic_clock::time_point ta(chrono::nanoseconds(100000));
+ const monotonic_clock::time_point tb(chrono::nanoseconds(200000));
+
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::nanoseconds(1000));
+ EXPECT_EQ(filter.timestamps().size(), 1u);
+
+ {
+ Line l1 = filter.FitLine();
+
+ EXPECT_EQ(l1.mpq_offset(), mpq_class(1000));
+ EXPECT_EQ(l1.mpq_slope(), mpq_class(0));
+ }
+
+ filter.Sample(tb, chrono::nanoseconds(1100));
+ EXPECT_EQ(filter.timestamps().size(), 2u);
+
+ {
+ Line l2 = filter.FitLine();
+ EXPECT_EQ(l2.mpq_offset(), mpq_class(900));
+ EXPECT_EQ(l2.mpq_slope(), mpq_class(1, 1000));
+ }
+}
+
+// Tests that invalid samples get clipped as expected.
+TEST(NoncausalTimestampFilterTest, ClippedSample) {
+ const monotonic_clock::time_point ta(chrono::milliseconds(0));
+ const monotonic_clock::time_point tb(chrono::milliseconds(1));
+ const monotonic_clock::time_point tc(chrono::milliseconds(2));
+
+ {
+ // A positive slope of 1 ms/second is properly applied.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, chrono::microseconds(2));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+
+ {
+ Line l2 = filter.FitLine();
+ EXPECT_EQ(l2.mpq_offset(), mpq_class(1000));
+ EXPECT_EQ(l2.mpq_slope(), mpq_class(1, 1000));
+ }
+ }
+
+ {
+ // A negative slope of 1 ms/second is properly applied.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, chrono::microseconds(0));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+
+ {
+ Line l2 = filter.FitLine();
+ EXPECT_EQ(l2.mpq_offset(), mpq_class(1000));
+ EXPECT_EQ(l2.mpq_slope(), -mpq_class(1, 1000));
+ }
+ }
+
+ {
+ // Too much negative is ignored.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, -chrono::microseconds(1));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 1u);
+ }
+
+ {
+ // Too much positive pulls up the first point.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, chrono::microseconds(3));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+
+ EXPECT_EQ(std::get<1>(filter.timestamps()[0]), chrono::microseconds(2));
+ EXPECT_EQ(std::get<1>(filter.timestamps()[1]), chrono::microseconds(3));
+ }
+
+ {
+ // Too much positive slope removes points.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, chrono::microseconds(1));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+
+ // Now add a sample with a slope of 0.002. This should back propagate and
+ // remove the middle point since it violates our constraints.
+ filter.Sample(tc, chrono::microseconds(3));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+
+ EXPECT_EQ(std::get<1>(filter.timestamps()[0]), chrono::microseconds(1));
+ EXPECT_EQ(std::get<1>(filter.timestamps()[1]), chrono::microseconds(3));
+ }
+}
+
+// Tests that removing points from the filter works as expected.
+TEST(NoncausalTimestampFilterTest, PointRemoval) {
+ const monotonic_clock::time_point t_before(-chrono::milliseconds(1));
+ const monotonic_clock::time_point ta(chrono::milliseconds(0));
+ const monotonic_clock::time_point tb(chrono::milliseconds(1));
+ const monotonic_clock::time_point tc(chrono::milliseconds(2));
+
+ // A positive slope of 1 ms/second is properly applied.
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, chrono::microseconds(1));
+ filter.Debug();
+ filter.Sample(tb, chrono::microseconds(2));
+ filter.Debug();
+ filter.Sample(tc, chrono::microseconds(1));
+ filter.Debug();
+ ASSERT_EQ(filter.timestamps().size(), 3u);
+
+ // Before or in the middle of the first line segment shouldn't change the
+ // number of points.
+ EXPECT_FALSE(filter.Pop(t_before));
+ ASSERT_EQ(filter.timestamps().size(), 3u);
+
+ EXPECT_FALSE(filter.Pop(ta));
+ ASSERT_EQ(filter.timestamps().size(), 3u);
+
+ EXPECT_FALSE(filter.Pop(ta + chrono::microseconds(100)));
+ ASSERT_EQ(filter.timestamps().size(), 3u);
+
+ // The second point should trigger a pop, since the offset computed using the
+ // points won't change when it is used, and any times after (even 1-2 ns
+ // later) would be wrong.
+ EXPECT_TRUE(filter.Pop(tb));
+ ASSERT_EQ(filter.timestamps().size(), 2u);
+}
+
} // namespace testing
} // namespace message_bridge
} // namespace aos