LogReader Before Send Callback

Adds the ability to add a callback to LogReader to mutate or act on a
message right before it is sent.

Change-Id: I94a6b9fa2074c0a9aa8ea23cbc979e6cbce4bd05
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 44736a5..677fc72 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -551,6 +551,253 @@
   reader.Deregister();
 }
 
+// Tests that a before-send callback can mutate messages during multi-node
+// replay when the Pong channel on pi1 has been remapped.
+TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+
+  LogReader reader(sorted_parts, &config_.message());
+  // Remap just on pi1.
+  reader.RemapLoggedChannel<examples::Pong>(
+      "/test", configuration::GetNode(reader.configuration(), "pi1"));
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  int pong_count = 0;
+  // Register the feature under test: a callback that increments each Pong's
+  // value just before it is sent and records the resulting value.
+  reader.AddBeforeSendCallback("/test",
+                               [&pong_count](aos::examples::Pong *pong) {
+                                 pong->mutate_value(pong->value() + 1);
+                                 pong_count = pong->value();
+                               });
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  pi1_event_loop->MakeWatcher("/original/test",
+                              [&pong_count](const examples::Pong &pong) {
+                                EXPECT_EQ(pong_count, pong.value());
+                              });
+
+  pi2_event_loop->MakeWatcher("/test",
+                              [&pong_count](const examples::Pong &pong) {
+                                EXPECT_EQ(pong_count, pong.value());
+                              });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+}
+
+// Tests that a before-send callback can mutate messages during multi-node
+// replay without any channel remapping.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int pong_count = 0;
+  // Register the feature under test: a callback that increments each Pong's
+  // value just before it is sent and records the resulting value.
+  reader.AddBeforeSendCallback("/test",
+                               [&pong_count](aos::examples::Pong *pong) {
+                                 pong->mutate_value(pong->value() + 1);
+                                 pong_count = pong->value();
+                               });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  pi1_event_loop->MakeWatcher("/test",
+                              [&pong_count](const examples::Pong &pong) {
+                                EXPECT_EQ(pong_count, pong.value());
+                              });
+
+  pi2_event_loop->MakeWatcher("/test",
+                              [&pong_count](const examples::Pong &pong) {
+                                EXPECT_EQ(pong_count, pong.value());
+                              });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+}
+
+// Tests that the before-send callback for a forwarded message is invoked only
+// on the sending node.
+TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(SortParts(logfiles_));
+
+  int ping_count = 0;
+  // Adds a callback which overwrites the value of each ping message with a
+  // running count before the message is sent; this is the feature under test.
+  reader.AddBeforeSendCallback("/test",
+                               [&ping_count](aos::examples::Ping *ping) {
+                                 ++ping_count;
+                                 ping->mutate_value(ping_count);
+                               });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  pi2_event_loop->SkipTimingReport();
+
+  MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
+  MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
+
+  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+      pi1_ping_timestamp;
+  if (!shared()) {
+    pi1_ping_timestamp =
+        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+            pi1_event_loop.get(),
+            "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
+  }
+
+  log_reader_factory.Run();
+
+  EXPECT_EQ(pi1_ping.count(), 2000u);
+  EXPECT_EQ(pi2_ping.count(), 2000u);
+  // If the BeforeSendCallback is called on both nodes, then the ping count
+  // would be 4002 instead of 2001.
+  EXPECT_EQ(ping_count, 2001u);
+  if (!shared()) {
+    EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
+  }
+
+  reader.Deregister();
+}
+
+// Tests that adding a before-send callback after Register() is a fatal error.
+TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+
+  LogReader reader(sorted_parts, &config_.message());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  reader.Register(&log_reader_factory);
+  EXPECT_DEATH(
+      {
+        reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
+          LOG(FATAL) << "This should not be called";
+        });
+      },
+      "Cannot add callbacks after calling Register");
+  reader.Deregister();
+}
+
 // Test that if we feed the replay with a mismatched node list that we die on
 // the LogReader constructor.
 TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {