Add an overall noncausal timestamp filter
This merges and averages the forward and reverse timestamp filters, and
manages the two of them in preparation for using them in the log reader.
Change-Id: If05f3468bf9de25c1a10bb4b5df5b3999d992b8c
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 8ea63fd..2f36854 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -360,6 +360,7 @@
srcs = ["timestamp_filter.cc"],
hdrs = ["timestamp_filter.h"],
deps = [
+ "//aos:configuration",
"//aos/time",
"//third_party/gmp",
"@com_google_absl//absl/strings",
@@ -373,6 +374,7 @@
],
deps = [
":timestamp_filter",
+ "//aos:json_to_flatbuffer",
"//aos/testing:googletest",
],
)
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 3fca7e6..ba8f9bd 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -5,6 +5,7 @@
#include <tuple>
#include "absl/strings/str_cat.h"
+#include "aos/configuration.h"
#include "aos/time/time.h"
#include "third_party/gmp/gmpxx.h"
@@ -653,5 +654,102 @@
}
}
+void NoncausalOffsetEstimator::Sample(
+ const Node *node, aos::monotonic_clock::time_point node_delivered_time,
+ aos::monotonic_clock::time_point other_node_sent_time) {
+ if (node == node_a_) {
+ if (a_.Sample(node_delivered_time,
+ other_node_sent_time - node_delivered_time)) {
+ Refit();
+ }
+ } else if (node == node_b_) {
+ if (b_.Sample(node_delivered_time,
+ other_node_sent_time - node_delivered_time)) {
+ Refit();
+ }
+ } else {
+ LOG(FATAL) << "Unknown node " << node->name()->string_view();
+ }
+}
+
+bool NoncausalOffsetEstimator::Pop(
+ const Node *node, aos::monotonic_clock::time_point node_monotonic_now) {
+ if (node == node_a_) {
+ if (a_.Pop(node_monotonic_now)) {
+ VLOG(1) << "Popping forward sample to " << node_a_->name()->string_view()
+ << " from " << node_b_->name()->string_view() << " at "
+ << node_monotonic_now;
+ Refit();
+ return true;
+ }
+ } else if (node == node_b_) {
+ if (b_.Pop(node_monotonic_now)) {
+ VLOG(1) << "Popping reverse sample to " << node_b_->name()->string_view()
+ << " from " << node_a_->name()->string_view() << " at "
+ << node_monotonic_now;
+ Refit();
+ return true;
+ }
+ } else {
+ LOG(FATAL) << "Unknown node " << node->name()->string_view();
+ }
+ return false;
+}
+
+void NoncausalOffsetEstimator::LogFit(std::string_view prefix) {
+ LOG(INFO)
+ << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope " << std::setprecision(20)
+ << fit_.slope() << " offset " << fit_.offset().count() << " a [("
+ << std::get<0>(a_.timestamps()[0]) << " -> "
+ << std::get<1>(a_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(a_.timestamps()[1]) << " -> "
+ << std::get<1>(a_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
+ << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(a_.timestamps()[1]) - std::get<0>(a_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(a_.timestamps()[1]) - std::get<1>(a_.timestamps()[0]))
+ .count()
+ << "ms}]";
+ LOG(INFO)
+ << prefix << " " << node_a_->name()->string_view() << " from "
+ << node_b_->name()->string_view() << " slope " << std::setprecision(20)
+ << fit_.slope() << " offset " << fit_.offset().count() << " b [("
+ << std::get<0>(b_.timestamps()[0]) << " -> "
+ << std::get<1>(b_.timestamps()[0]).count() << "ns), ("
+ << std::get<0>(b_.timestamps()[1]) << " -> "
+ << std::get<1>(b_.timestamps()[1]).count() << "ns) => {dt: " << std::fixed
+ << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<0>(b_.timestamps()[1]) - std::get<0>(b_.timestamps()[0]))
+ .count()
+ << "ms, do: " << std::fixed << std::setprecision(6)
+ << std::chrono::duration<double, std::milli>(
+ std::get<1>(b_.timestamps()[1]) - std::get<1>(b_.timestamps()[0]))
+ .count()
+ << "ms}]";
+}
+
+void NoncausalOffsetEstimator::Refit() {
+ if (a_.timestamps().size() == 0 || b_.timestamps().size() == 0) {
+ VLOG(1) << "Not fitting because there is no data";
+ return;
+ }
+ fit_ = AverageFits(a_.FitLine(), b_.FitLine());
+ if (offset_pointer_) {
+ *offset_pointer_ = fit_.mpq_offset();
+ }
+ if (slope_pointer_) {
+ *slope_pointer_ = -fit_.mpq_slope();
+ }
+
+ if (VLOG_IS_ON(1)) {
+ LogFit("Refitting to");
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 3f5d7d2..6994a8a 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -7,6 +7,7 @@
#include <cmath>
#include <deque>
+#include "aos/configuration.h"
#include "aos/time/time.h"
#include "glog/logging.h"
#include "third_party/gmp/gmpxx.h"
@@ -379,6 +380,74 @@
aos::monotonic_clock::time_point first_time_ = aos::monotonic_clock::min_time;
};
+// This class holds 2 NoncausalTimestampFilter's and handles averaging the
+// offsets and reporting them.
+class NoncausalOffsetEstimator {
+ public:
+ NoncausalOffsetEstimator(const Node *node_a, const Node *node_b)
+ : node_a_(node_a), node_b_(node_b) {}
+
+ // Updates the filter based on a sample from the provided node to the other
+ // node.
+ void Sample(const Node *node,
+ aos::monotonic_clock::time_point node_delivered_time,
+ aos::monotonic_clock::time_point other_node_sent_time);
+
+ // Removes old data points from a node before the provided time.
+ // Returns true if the line fit changes.
+ bool Pop(const Node *node,
+ aos::monotonic_clock::time_point node_monotonic_now);
+
+ // Returns a line for the oldest segment.
+ Line fit() const { return fit_; }
+
+ // Sets the locations to update when the fit changes.
+ void set_offset_pointer(mpq_class *offset_pointer) {
+ offset_pointer_ = offset_pointer;
+ }
+ void set_slope_pointer(mpq_class *slope_pointer) {
+ slope_pointer_ = slope_pointer;
+ }
+
+ // Returns the data points from each filter.
+ const std::deque<
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+ &a_timestamps() {
+ return a_.timestamps();
+ }
+ const std::deque<
+ std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
+ &b_timestamps() {
+ return b_.timestamps();
+ }
+
+ void SetFirstFwdTime(monotonic_clock::time_point time) {
+ a_.SetFirstTime(time);
+ }
+ void SetFwdCsvFileName(std::string_view name) { a_.SetCsvFileName(name); }
+ void SetFirstRevTime(monotonic_clock::time_point time) {
+ b_.SetFirstTime(time);
+ }
+ void SetRevCsvFileName(std::string_view name) { b_.SetCsvFileName(name); }
+
+ // Logs the fits and timestamps for all the filters.
+ void LogFit(std::string_view prefix);
+
+ private:
+ void Refit();
+
+ NoncausalTimestampFilter a_;
+ NoncausalTimestampFilter b_;
+
+ mpq_class *offset_pointer_ = nullptr;
+ mpq_class *slope_pointer_ = nullptr;
+
+ Line fit_{std::chrono::nanoseconds(0), 0.0};
+
+ const Node *const node_a_;
+ const Node *const node_b_;
+};
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index b46f526..b495e2d 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -2,6 +2,8 @@
#include <chrono>
+#include "aos/configuration.h"
+#include "aos/json_to_flatbuffer.h"
#include "aos/macros.h"
#include "gtest/gtest.h"
@@ -310,6 +312,68 @@
ASSERT_EQ(filter.timestamps().size(), 2u);
}
+// Run a couple of points through the estimator and confirm it works.
+TEST(NoncausalOffsetEstimatorTest, FullEstimator) {
+ const aos::FlatbufferDetachedBuffer<Node> node_a_buffer =
+ JsonToFlatbuffer<Node>("{\"name\": \"a\"}");
+ const aos::FlatbufferDetachedBuffer<Node> node_b_buffer =
+ JsonToFlatbuffer<Node>("{\"name\": \"b\"}");
+
+ const Node *node_a = &node_a_buffer.message();
+ const Node *node_b = &node_b_buffer.message();
+
+ const monotonic_clock::time_point ta1(chrono::milliseconds(1000));
+ const monotonic_clock::time_point ta2 = ta1 + chrono::milliseconds(10);
+ const monotonic_clock::time_point ta3 = ta1 + chrono::milliseconds(20);
+
+ const monotonic_clock::time_point tb1(chrono::milliseconds(4000));
+ const monotonic_clock::time_point tb2 =
+ tb1 + chrono::milliseconds(10) + chrono::nanoseconds(100);
+ const monotonic_clock::time_point tb3 = tb1 + chrono::milliseconds(20);
+
+ NoncausalOffsetEstimator estimator(node_a, node_b);
+
+ // Add 3 timestamps in and confirm that the slopes come out reasonably.
+ estimator.Sample(node_a, ta1, tb1);
+ estimator.Sample(node_b, tb1, ta1);
+ EXPECT_EQ(estimator.a_timestamps().size(), 1u);
+ EXPECT_EQ(estimator.b_timestamps().size(), 1u);
+
+ // 1 point -> a line.
+ EXPECT_EQ(estimator.fit().mpq_slope(), mpq_class(0));
+
+ estimator.Sample(node_a, ta2, tb2);
+ estimator.Sample(node_b, tb2, ta2);
+ EXPECT_EQ(estimator.a_timestamps().size(), 2u);
+ EXPECT_EQ(estimator.b_timestamps().size(), 2u);
+
+ // Adding the second point should slope up.
+ EXPECT_EQ(estimator.fit().mpq_slope(), mpq_class(1, 100000));
+
+ estimator.Sample(node_a, ta3, tb3);
+ estimator.Sample(node_b, tb3, ta3);
+ EXPECT_EQ(estimator.a_timestamps().size(), 3u);
+ EXPECT_EQ(estimator.b_timestamps().size(), 3u);
+
+ // And the third point shouldn't change anything.
+ EXPECT_EQ(estimator.fit().mpq_slope(), mpq_class(1, 100000));
+
+ estimator.Pop(node_a, ta2);
+ estimator.Pop(node_b, tb2);
+ EXPECT_EQ(estimator.a_timestamps().size(), 2u);
+ EXPECT_EQ(estimator.b_timestamps().size(), 2u);
+
+ // Dropping the first point should have the slope point back down.
+ EXPECT_EQ(estimator.fit().mpq_slope(), mpq_class(-1, 100000));
+
+ // And dropping down to 1 point means 0 slope.
+ estimator.Pop(node_a, ta3);
+ estimator.Pop(node_b, tb3);
+ EXPECT_EQ(estimator.a_timestamps().size(), 1u);
+ EXPECT_EQ(estimator.b_timestamps().size(), 1u);
+ EXPECT_EQ(estimator.fit().mpq_slope(), mpq_class(0));
+}
+
} // namespace testing
} // namespace message_bridge
} // namespace aos