Add LogReader ReplayChannels filtering

LogReader's constructor takes a new replay_channels argument containing the
name & type pairs of the channels to replay. As a part of construction,
LogReader takes replay_channels, acquires the channel indices of the
channels being replayed, and uses them when sending a message to check
whether that message's channel is included in replay_channels. This
functionality is contained within TimestampMapper, which takes a lambda to do
the actual filtering when calling Front().

Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
index 0cdf9fb..888b043 100644
--- a/aos/events/logging/realtime_replay_test.cc
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -1,12 +1,15 @@
 #include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/log_writer.h"
 #include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/testing/path.h"
 #include "aos/testing/tmpdir.h"
 #include "gtest/gtest.h"
 
+DECLARE_string(override_hostname);
+
 namespace aos::logger::testing {
 
 class RealtimeLoggerTest : public ::testing::Test {
@@ -18,12 +21,54 @@
         config_(aos::configuration::ReadConfig(config_file_)),
         event_loop_factory_(&config_.message()),
         ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
-        ping_(ping_event_loop_.get()) {
+        pong_event_loop_(event_loop_factory_.MakeEventLoop("pong")),
+        ping_(ping_event_loop_.get()),
+        pong_(pong_event_loop_.get()),
+        tmpdir_(aos::testing::TestTmpDir()),
+        base_name_(tmpdir_ + "/logfile/") {
     FLAGS_shm_base = shm_dir_;
 
-    // Nuke the shm dir, to ensure we aren't being affected by any preexisting
-    // tests.
+    // Nuke the shm and log dirs, to ensure we aren't being affected by any
+    // preexisting tests.
     aos::util::UnlinkRecursive(shm_dir_);
+    aos::util::UnlinkRecursive(base_name_);
+  }
+
+  gflags::FlagSaver flag_saver_;
+  std::string shm_dir_;
+
+  const std::string config_file_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+  // Factory plus Ping and Pong classes to generate a test logfile.
+  SimulatedEventLoopFactory event_loop_factory_;
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  std::unique_ptr<EventLoop> pong_event_loop_;
+  Ping ping_;
+  Pong pong_;
+  const std::string tmpdir_;
+  const std::string base_name_;
+};
+
+class RealtimeMultiNodeLoggerTest : public ::testing::Test {
+ protected:
+  RealtimeMultiNodeLoggerTest()
+      : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+        config_file_(aos::testing::ArtifactPath(
+            "aos/events/logging/multinode_pingpong_combined_config.json")),
+        config_(aos::configuration::ReadConfig(config_file_)),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop(
+            "pi1", configuration::GetNode(&config_.message(), "pi1"))),
+        ping_(ping_event_loop_.get()),
+        tmpdir_(aos::testing::TestTmpDir()),
+        base_name_(tmpdir_ + "/logfile/") {
+    FLAGS_shm_base = shm_dir_;
+
+    // Nuke the shm and log dirs, to ensure we aren't being affected by any
+    // preexisting tests.
+    aos::util::UnlinkRecursive(shm_dir_);
+    aos::util::UnlinkRecursive(base_name_);
   }
 
   gflags::FlagSaver flag_saver_;
@@ -36,12 +81,11 @@
   SimulatedEventLoopFactory event_loop_factory_;
   std::unique_ptr<EventLoop> ping_event_loop_;
   Ping ping_;
+  const std::string tmpdir_;
+  const std::string base_name_;
 };
 
 TEST_F(RealtimeLoggerTest, RealtimeReplay) {
-  const std::string tmpdir = aos::testing::TestTmpDir();
-  const std::string base_name = tmpdir + "/logfile/";
-  aos::util::UnlinkRecursive(base_name);
   {
     std::unique_ptr<EventLoop> logger_event_loop =
         event_loop_factory_.MakeEventLoop("logger");
@@ -51,11 +95,11 @@
     Logger logger(logger_event_loop.get());
     logger.set_separate_config(false);
     logger.set_polling_period(std::chrono::milliseconds(100));
-    logger.StartLoggingOnRun(base_name);
+    logger.StartLoggingOnRun(base_name_);
     event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
   }
 
-  LogReader reader(logger::SortParts(logger::FindLogs(base_name)));
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)));
   ShmEventLoop shm_event_loop(reader.configuration());
   reader.Register(&shm_event_loop);
   reader.OnEnd(shm_event_loop.node(),
@@ -73,4 +117,248 @@
   ASSERT_TRUE(ping_fetcher.Fetch());
   ASSERT_EQ(ping_fetcher->value(), 210);
 }
+
+// Tests that, with a single-node config, ReplayChannels causes only the
+// listed channel to be replayed and no messages on any other channel.
+TEST_F(RealtimeLoggerTest, SingleNodeReplayChannels) {
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingOnRun(base_name_);
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  ReplayChannels replay_channels{{"/test", "aos.examples.Ping"}};
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)),
+                   &config_.message(), &replay_channels);
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+  Fetcher<examples::Pong> pong_fetcher =
+      shm_event_loop.MakeFetcher<examples::Pong>("/test");
+
+  shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+      ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+  auto *const end_timer = shm_event_loop.AddTimer([&shm_event_loop]() {
+    LOG(INFO) << "All done, quitting now";
+    shm_event_loop.Exit();
+  });
+
+  // TODO(EricS): reader.OnEnd() is not working as expected when using a
+  // channel filter, so as a fallback exit from a timer after 3 seconds;
+  // that also leaves time for any unexpected message to show up.
+  size_t run_seconds = 3;
+  shm_event_loop.OnRun([&shm_event_loop, end_timer, run_seconds]() {
+    LOG(INFO) << "Quitting in: " << run_seconds;
+    end_timer->Setup(shm_event_loop.monotonic_now() +
+                     std::chrono::seconds(run_seconds));
+  });
+  shm_event_loop.Run();
+  reader.Deregister();
+
+  ASSERT_TRUE(ping_fetcher.Fetch());
+  ASSERT_EQ(ping_fetcher->value(), 210);
+  ASSERT_FALSE(pong_fetcher.Fetch());  // Pong is not in replay_channels.
+}
+
+// Tests ReplayChannels with a multi-node config: messages on the channel
+// listed in replay_channels (Ping on /test) are replayed.
+TEST_F(RealtimeMultiNodeLoggerTest, ReplayChannelsPingTest) {
+  FLAGS_override_hostname = "raspberrypi";
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop(
+            "logger", configuration::GetNode(&config_.message(), "pi1"));
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+
+    std::unique_ptr<MultiNodeLogNamer> namer =
+        std::make_unique<MultiNodeLogNamer>(
+            base_name_, &config_.message(), logger_event_loop.get(),
+            configuration::GetNode(&config_.message(), "pi1"));
+
+    logger.StartLogging(std::move(namer));
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  ReplayChannels replay_channels{{"/test", "aos.examples.Ping"}};
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)),
+                   &config_.message(), &replay_channels);
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+  shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+      ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+  shm_event_loop.Run();
+  reader.Deregister();
+
+  ASSERT_TRUE(ping_fetcher.Fetch());
+  ASSERT_EQ(ping_fetcher->value(), 210);
+}
+
+// Tests that when a channel included in ReplayChannels is remapped, its
+// messages are sent on the remapped channel.
+TEST_F(RealtimeMultiNodeLoggerTest, RemappedReplayChannelsTest) {
+  FLAGS_override_hostname = "raspberrypi";
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop(
+            "logger", configuration::GetNode(&config_.message(), "pi1"));
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+
+    std::unique_ptr<MultiNodeLogNamer> namer =
+        std::make_unique<MultiNodeLogNamer>(
+            base_name_, &config_.message(), logger_event_loop.get(),
+            configuration::GetNode(&config_.message(), "pi1"));
+
+    logger.StartLogging(std::move(namer));
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  ReplayChannels replay_channels{{"/test", "aos.examples.Ping"}};
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)),
+                   &config_.message(), &replay_channels);
+  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> original_ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/original/test");
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+  shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+      ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+  shm_event_loop.Run();
+  reader.Deregister();
+
+  ASSERT_TRUE(original_ping_fetcher.Fetch());
+  ASSERT_EQ(original_ping_fetcher->value(), 210);
+  ASSERT_FALSE(ping_fetcher.Fetch());  // Nothing expected on plain /test.
+}
+
+// Tests that messages are not replayed when their channel is not in the
+// ReplayChannels provided to LogReader. The channels used here do not
+// exist in the log being replayed, and there are no messages on those
+// channels either.
+TEST_F(RealtimeMultiNodeLoggerTest, DoesNotExistInReplayChannelsTest) {
+  FLAGS_override_hostname = "raspberrypi";
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop(
+            "logger", configuration::GetNode(&config_.message(), "pi1"));
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    std::unique_ptr<MultiNodeLogNamer> namer =
+        std::make_unique<MultiNodeLogNamer>(
+            base_name_, &config_.message(), logger_event_loop.get(),
+            configuration::GetNode(&config_.message(), "pi1"));
+
+    logger.StartLogging(std::move(namer));
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  ReplayChannels replay_channels{{"/test", "aos.examples.Pong"},
+                                 {"/test", "fake"},
+                                 {"fake", "aos.examples.Ping"}};
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)),
+                   &config_.message(), &replay_channels);
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+  auto *const end_timer = shm_event_loop.AddTimer([&shm_event_loop]() {
+    LOG(INFO) << "All done, quitting now";
+    shm_event_loop.Exit();
+  });
+
+  // TODO(#21): reader.OnEnd() is not working as expected when using
+  // replay_channels, so as a fallback exit from a timer after 3 seconds;
+  // that also leaves time for any unexpected message to show up.
+  size_t run_seconds = 3;
+  shm_event_loop.OnRun([&shm_event_loop, end_timer, run_seconds]() {
+    LOG(INFO) << "Quitting in: " << run_seconds;
+    end_timer->Setup(shm_event_loop.monotonic_now() +
+                     std::chrono::seconds(run_seconds));
+  });
+
+  shm_event_loop.Run();
+  reader.Deregister();
+  ASSERT_FALSE(ping_fetcher.Fetch());
+}
+
+using RealtimeMultiNodeLoggerDeathTest = RealtimeMultiNodeLoggerTest;
+
+// Tests that remapping a channel not included in the replay channels passed to
+// LogReader dies with an error, since this would indicate the user is trying
+// to use the channel being remapped.
+TEST_F(RealtimeMultiNodeLoggerDeathTest,
+       RemapLoggedChannelNotIncludedInReplayChannels) {
+  FLAGS_override_hostname = "raspberrypi";
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop(
+            "logger", configuration::GetNode(&config_.message(), "pi1"));
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+
+    std::unique_ptr<MultiNodeLogNamer> namer =
+        std::make_unique<MultiNodeLogNamer>(
+            base_name_, &config_.message(), logger_event_loop.get(),
+            configuration::GetNode(&config_.message(), "pi1"));
+
+    logger.StartLogging(std::move(namer));
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  ReplayChannels replay_channels{{"/test", "aos.examples.Ping"}};
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name_)),
+                   &config_.message(), &replay_channels);
+  EXPECT_DEATH(
+      reader.RemapLoggedChannel<aos::examples::Ping>("/fake", "/original"),
+      "which is not included in the replay channels passed to LogReader");
+}
+
 }  // namespace aos::logger::testing