Add LogReader ReplayChannels filtering
LogReader's constructor takes a new input, replay_channels, containing the
name & type pairs of the channels to replay on. As a part of construction,
LogReader takes replay_channels, acquires the channel indices of
the channels being replayed, and uses them when sending a message to check
whether the channel is included in replay_channels or not. This functionality is
contained within TimestampMapper, which takes a lambda to do the actual
filtering when Front() is called.
Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 3d99757..f453798 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -24,8 +24,8 @@
namespace logger {
namespace testing {
namespace chrono = std::chrono;
-using aos::testing::ArtifactPath;
using aos::message_bridge::RemoteMessage;
+using aos::testing::ArtifactPath;
// Adapter class to make it easy to test DetachedBufferWriter without adding
// test only boilerplate to DetachedBufferWriter.
@@ -1036,6 +1036,143 @@
}
}
+// Tests that messages rejected by replay_channels_callback are filtered out.
+TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ TestDetachedBufferWriter writer0(logfile0_);
+ writer0.QueueSpan(config0_.span());
+ TestDetachedBufferWriter writer1(logfile1_);
+ writer1.QueueSpan(config2_.span());
+
+ writer0.WriteSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.WriteSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.WriteSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ // Each filter returns false only while its mapper's timestamp callback count
+ // is 2, so both mappers are expected to skip their second message.
+ size_t mapper0_count = 0;
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ mapper0.set_replay_channels_callback(
+ [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
+ size_t mapper1_count = 0;
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+ mapper1.set_replay_channels_callback(
+ [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ // mapper0_count jumps to 3: the second message is filtered out rather than
+ // queued, but timestamp_callback must still run every time, even when
+ // replay_channels_callback keeps Front() from providing that message.
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+ EXPECT_TRUE(mapper0.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_event_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data != nullptr);
+
+ EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_event_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[1].data != nullptr);
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+
+ // mapper1_count jumps to 3 here for the same reason: the second message is
+ // filtered out, but timestamp_callback must still run every time, even when
+ // replay_channels_callback keeps Front() from providing that message.
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data != nullptr);
+
+ EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[1].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[1].data != nullptr);
+ }
+}
// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
// returned.
TEST_F(TimestampMapperTest, MessageWithTimestampTime) {