Support queueing data in TimestampMapper and viewing timestamps
When solving for the starting time of a log, we don't always have enough
data queued on all the nodes. We can either do gymnastics to queue
enough data, or we can teach MultiNodeNoncausalOffsetEstimator how to
enqueue more data on each node when asked to evaluate times after the
pre-loaded range.
Teaching MultiNodeNoncausalOffsetEstimator how to automatically enqueue
more data will be more robust in the long term, so let's go that route.
This commit gives us enough hooks to remove the std::deque from inside
LogReader::State and directly use TimestampMapper. That in turn should
let us automatically enqueue more data when needed, and add it to the
filters automatically.
Change-Id: Ife1248d0b515d9ea9088ace84fa020a351289d0c
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 979821e..fc7635a 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -815,8 +815,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -824,12 +830,20 @@
{
std::deque<TimestampedMessage> output0;
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
output0.emplace_back(std::move(*mapper0.Front()));
mapper0.PopFront();
EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 2u);
+ EXPECT_EQ(mapper1_count, 0u);
output0.emplace_back(std::move(*mapper0.Front()));
mapper0.PopFront();
EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
@@ -839,6 +853,9 @@
mapper0.PopFront();
EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
ASSERT_TRUE(mapper0.Front() == nullptr);
EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
@@ -853,13 +870,22 @@
SCOPED_TRACE("Trying node1 now");
std::deque<TimestampedMessage> output1;
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
output1.emplace_back(std::move(*mapper1.Front()));
mapper1.PopFront();
EXPECT_EQ(mapper1.sorted_until(),
e + chrono::seconds(100) + chrono::milliseconds(1900));
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 2u);
output1.emplace_back(std::move(*mapper1.Front()));
mapper1.PopFront();
EXPECT_EQ(mapper1.sorted_until(),
@@ -870,8 +896,14 @@
mapper1.PopFront();
EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
ASSERT_TRUE(mapper1.Front() == nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
EXPECT_EQ(output1[0].monotonic_event_time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
EXPECT_TRUE(output1[0].data.Verify());
@@ -920,8 +952,14 @@
ASSERT_EQ(parts.size(), 1u);
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -975,6 +1013,9 @@
EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
EXPECT_TRUE(output1[2].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
}
// Tests that we can match timestamps on delivered messages. By doing this in
@@ -1009,8 +1050,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -1076,6 +1123,9 @@
EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
EXPECT_TRUE(output0[2].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
}
// Tests that we return just the timestamps if we couldn't find the data and the
@@ -1108,8 +1158,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -1147,6 +1203,9 @@
e + chrono::seconds(100) + chrono::milliseconds(3000));
EXPECT_TRUE(output1[2].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 3u);
}
// Tests that we return just the timestamps if we couldn't find the data and the
@@ -1179,8 +1238,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -1218,6 +1283,9 @@
e + chrono::seconds(100) + chrono::milliseconds(3000));
EXPECT_FALSE(output1[2].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 3u);
}
// Tests that we handle a message which failed to forward or be logged.
@@ -1251,8 +1319,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -1276,6 +1350,9 @@
e + chrono::seconds(100) + chrono::milliseconds(3000));
EXPECT_TRUE(output1[1].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 2u);
}
// Tests that we properly sort log files with duplicate timestamps.
@@ -1313,8 +1390,14 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
mapper0.AddPeer(&mapper1);
mapper1.AddPeer(&mapper0);
@@ -1343,6 +1426,9 @@
e + chrono::seconds(100) + chrono::milliseconds(3000));
EXPECT_TRUE(output1[3].data.Verify());
}
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 4u);
}
// Tests that we report the expected monotonic and realtime start times.
@@ -1360,11 +1446,15 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_});
+ size_t mapper0_count = 0;
TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
EXPECT_EQ(mapper0.realtime_start_time(),
realtime_clock::time_point(chrono::seconds(1000)));
+ EXPECT_EQ(mapper0_count, 0u);
}
// Tests that when a peer isn't registered, we treat that as if there was no
@@ -1397,7 +1487,10 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
+ size_t mapper1_count = 0;
TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
{
std::deque<TimestampedMessage> output1;
@@ -1423,6 +1516,161 @@
e + chrono::seconds(100) + chrono::milliseconds(3000));
EXPECT_FALSE(output1[2].data.Verify());
}
+ EXPECT_EQ(mapper1_count, 3u);
+}
+
+// Tests that, on both nodes, QueueUntil() fires the timestamp callback once
+// per message it queues, and that Front()/PopFront() don't fire it again.
+TEST_F(TimestampMapperTest, QueueUntilNode0) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {  // Writers are scoped to this block; the files are read back after it.
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(  // Same event time as the previous message.
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ size_t mapper0_count = 0;  // Number of timestamp callbacks seen on pi1.
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;  // Number of timestamp callbacks seen on pi2.
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);  // Nothing is queued before the first call.
+ EXPECT_EQ(mapper1_count, 0u);
+ mapper0.QueueUntil(e + chrono::milliseconds(1000));
+ EXPECT_EQ(mapper0_count, 3u);  // 2 at 1000 ms, plus presumably the 2000 ms one.
+ EXPECT_EQ(mapper1_count, 0u);  // The peer's callback is not expected to fire.
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);  // Front() fired no further callbacks.
+ EXPECT_EQ(mapper1_count, 0u);
+
+ mapper0.QueueUntil(e + chrono::milliseconds(1500));
+ EXPECT_EQ(mapper0_count, 3u);  // No additional callbacks for 1500 ms.
+ EXPECT_EQ(mapper1_count, 0u);
+
+ mapper0.QueueUntil(e + chrono::milliseconds(2500));
+ EXPECT_EQ(mapper0_count, 4u);  // One more callback for 2500 ms.
+ EXPECT_EQ(mapper1_count, 0u);
+
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+
+ EXPECT_EQ(mapper0_count, 4u);  // Draining all 4 fired no further callbacks.
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_EQ(output0[3].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[3].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 4u);  // pi1's count must stay at 4 from here on.
+ EXPECT_EQ(mapper1_count, 0u);
+ mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 3u);  // Same 3 / 3 / 4 progression as pi1 above.
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 3u);  // Front() fired no further callbacks.
+
+ mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1500));
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 3u);  // No additional callbacks for +1500 ms.
+
+ mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(2500));
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 4u);  // One more callback for +2500 ms.
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 4u);
+
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 4u);  // Draining all 4 fired no further callbacks.
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 4u);
+ EXPECT_EQ(mapper1_count, 4u);  // Front() at end of data fired none either.
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_EQ(output1[3].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[3].data.Verify());
+ }
}
} // namespace testing