Add LogReader ReplayChannels filtering

LogReader has a new input into its constructor which is replay_channels containing the
name & type pairs of channels to replay on. As a part of construction,
LogReader takes this replay_channels and acquires the channel indicies of
the channels being replayed and uses that to check on sending a message
if the channel is included in replay_channels or not. This functionality is
contained within TimestampMapper which takes a lambda to do the actual
filtering when calling Front().

Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 3d99757..f453798 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -24,8 +24,8 @@
 namespace logger {
 namespace testing {
 namespace chrono = std::chrono;
-using aos::testing::ArtifactPath;
 using aos::message_bridge::RemoteMessage;
+using aos::testing::ArtifactPath;
 
 // Adapter class to make it easy to test DetachedBufferWriter without adding
 // test only boilerplate to DetachedBufferWriter.
@@ -1036,6 +1036,143 @@
   }
 }
 
+// Tests that we filter messages using the channel filter callback
+TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    TestDetachedBufferWriter writer0(logfile0_);
+    writer0.QueueSpan(config0_.span());
+    TestDetachedBufferWriter writer1(logfile1_);
+    writer1.QueueSpan(config2_.span());
+
+    writer0.WriteSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.WriteSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.WriteSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  // mapper0 will not provide any messages while mapper1 will provide all
+  // messages due to the channel filter callbacks used
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  mapper0.set_replay_channels_callback(
+      [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+  mapper1.set_replay_channels_callback(
+      [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    // mapper0_count is now at 3 since the second message is not queued, but
+    // timestamp_callback needs to be called everytime even if Front() does not
+    // provide a message due to the replay_channels_callback.
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_TRUE(output0[0].data != nullptr);
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_TRUE(output0[1].data != nullptr);
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    // mapper1_count is now at 3 since the second message is not queued, but
+    // timestamp_callback needs to be called everytime even if Front() does not
+    // provide a message due to the replay_channels_callback.
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data != nullptr);
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[1].data != nullptr);
+  }
+}
 // Tests that a MessageHeader with monotonic_timestamp_time set gets properly
 // returned.
 TEST_F(TimestampMapperTest, MessageWithTimestampTime) {