Add Consume, Observe, and FreezeUntil to track timestamps
This lets us solve at every corner and track which corners have been
solved. It also then lets us report back which points have been
used and need to be frozen.
Change-Id: Id3133f661529a0e7da8b9d408678239dcf394fd4
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 0683077..1df9ee7 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -1043,6 +1043,58 @@
return removed;
}
+std::optional<std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
+NoncausalTimestampFilter::Observe() const {
+ if (timestamps_.empty() || next_to_consume_ >= timestamps_.size()) {
+ return std::nullopt;
+ }
+ return TrimTuple(timestamps_[next_to_consume_]);
+}
+
+std::optional<std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
+NoncausalTimestampFilter::Consume() {
+ if (timestamps_.empty() || next_to_consume_ >= timestamps_.size()) {
+ return std::nullopt;
+ }
+
+ auto result = TrimTuple(timestamps_[next_to_consume_]);
+ ++next_to_consume_;
+ return result;
+}
+
+void NoncausalTimestampFilter::FreezeUntil(
+ aos::monotonic_clock::time_point node_monotonic_now) {
+ for (size_t i = 0; i < timestamps_.size(); ++i) {
+ if (std::get<0>(timestamps_[i]) > node_monotonic_now) {
+ return;
+ }
+ std::get<2>(timestamps_[i]) = true;
+ }
+
+ if (timestamps_.size() < 2u) {
+ // This will evaluate to a line. We can't support adding points to a line
+ // yet.
+ fully_frozen_ = true;
+ }
+}
+
+void NoncausalTimestampFilter::FreezeUntilRemote(
+ aos::monotonic_clock::time_point remote_monotonic_now) {
+ for (size_t i = 0; i < timestamps_.size(); ++i) {
+ if (std::get<0>(timestamps_[i]) + std::get<1>(timestamps_[i]) >
+ remote_monotonic_now) {
+ return;
+ }
+ std::get<2>(timestamps_[i]) = true;
+ }
+
+ if (timestamps_.size() < 2u) {
+ // This will evaluate to a line. We can't support adding points to a line
+ // yet.
+ fully_frozen_ = true;
+ }
+}
+
void NoncausalTimestampFilter::Freeze() {
if (timestamps_.size() >= 1u) {
std::get<2>(timestamps_[0]) = true;
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 3771315..7a2c386 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -413,12 +413,15 @@
size_t timestamps_size() const { return timestamps_.size(); }
void Debug() {
+ size_t count = 0;
for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds,
bool>
timestamp : timestamps_) {
LOG(INFO) << std::get<0>(timestamp) << " offset "
<< std::get<1>(timestamp).count() << " frozen? "
- << std::get<2>(timestamp);
+ << std::get<2>(timestamp) << " consumed? "
+ << (count < next_to_consume_);
+ ++count;
}
}
@@ -432,6 +435,23 @@
// going forwards.
void Freeze();
+ // Marks all line segments up until the provided time on the provided node as
+ // used.
+ void FreezeUntil(aos::monotonic_clock::time_point node_monotonic_now);
+ void FreezeUntilRemote(aos::monotonic_clock::time_point remote_monotonic_now);
+
+ // Returns the next timestamp in the queue if available without incrementing
+ // the pointer. This, Consume, and FreezeUntil work together to allow
+ // tracking and freezing timestamps which have been combined externally.
+ std::optional<
+ std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
+ Observe() const;
+ // Returns the next timestamp in the queue if available, incrementing the
+ // pointer.
+ std::optional<
+ std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
+ Consume();
+
// Public for testing.
// Assuming that there are at least 2 points in timestamps_, finds the 2
// matching points.
@@ -473,6 +493,9 @@
void PopFront() {
MaybeWriteTimestamp(timestamps_.front());
timestamps_.pop_front();
+ if (next_to_consume_ > 0u) {
+ next_to_consume_--;
+ }
}
// Writes a timestamp to the file if it is reasonable.
@@ -490,6 +513,10 @@
std::chrono::nanoseconds, bool>>
timestamps_;
+ // The index of the next element in timestamps to consume. 0 means none have
+ // been consumed, and size() means all have been consumed.
+ size_t next_to_consume_ = 0;
+
// Holds any timestamps from before the start of the log to be flushed when we
// know when the log starts.
std::vector<
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index f3581d5..854bc78 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -211,6 +211,71 @@
}
}
+// Tests that 2 samples results in the correct line between them, and the
+// correct intermediate as it is being built.
+TEST(NoncausalTimestampFilterTest, PeekPop) {
+ const monotonic_clock::time_point ta(chrono::nanoseconds(100000));
+ const chrono::nanoseconds oa(chrono::nanoseconds(1000));
+ const monotonic_clock::time_point tb(chrono::nanoseconds(200000));
+ const chrono::nanoseconds ob(chrono::nanoseconds(1100));
+ const monotonic_clock::time_point tc(chrono::nanoseconds(300000));
+ const chrono::nanoseconds oc(chrono::nanoseconds(1010));
+
+ // Simple case, everything is done in order, nothing is dropped.
+ {
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, oa);
+ filter.Sample(tb, ob);
+ filter.Sample(tc, oc);
+
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(ta, oa));
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tb, ob));
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tc, oc));
+ EXPECT_FALSE(filter.Observe());
+ }
+
+ // Now try again while dropping ta after popping it.
+ {
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, oa);
+ filter.Sample(tb, ob);
+ filter.Sample(tc, oc);
+
+
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(ta, oa));
+
+ filter.Pop(tb);
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tb, ob));
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tc, oc));
+ EXPECT_FALSE(filter.Observe());
+ }
+
+ // Now try again while dropping ta before popping it.
+ {
+ NoncausalTimestampFilter filter;
+
+ filter.Sample(ta, oa);
+ filter.Sample(tb, ob);
+ filter.Sample(tc, oc);
+
+
+ filter.Pop(tb);
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tb, ob));
+ EXPECT_TRUE(filter.Observe());
+ EXPECT_EQ(*filter.Consume(), std::make_tuple(tc, oc));
+ EXPECT_FALSE(filter.Observe());
+ }
+}
+
// Tests that invalid samples get clipped as expected.
TEST(NoncausalTimestampFilterTest, ClippedSample) {
const monotonic_clock::time_point ta(chrono::milliseconds(0));