Change LogReader API to be able to replace messages

The mutation API in LogReader was not able to express dropping messages,
or growing messages.  This enables more aggressive mutation.

Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 2cf8ffe..c252616 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -8,6 +8,7 @@
 #include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
+#include "aos/flatbuffers/aligned_allocator.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/timestamp_generated.h"
 #include "aos/testing/tmpdir.h"
@@ -600,7 +601,7 @@
 }
 
 // MultinodeLoggerTest that tests the mutate callback works across multiple
-// nodes with remapping
+// nodes with remapping.
 TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
   time_converter_.StartEqual();
   std::vector<std::string> actual_filenames;
@@ -629,14 +630,18 @@
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
-  int pong_count = 0;
+  int pong_count = 10;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&pong_count](aos::examples::Pong *pong) {
-                                 pong->mutate_value(pong->value() + 1);
-                                 pong_count = pong->value();
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count](
+          aos::examples::Pong *pong,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        pong->mutate_value(pong_count + 1);
+        ++pong_count;
+        return *timestamped_message.data;
+      });
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
@@ -698,14 +703,18 @@
 
   LogReader reader(sorted_parts, &config_.message());
 
-  int pong_count = 0;
+  int pong_count = 10;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&pong_count](aos::examples::Pong *pong) {
-                                 pong->mutate_value(pong->value() + 1);
-                                 pong_count = pong->value();
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count](
+          aos::examples::Pong *pong,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        pong->mutate_value(pong_count + 1);
+        ++pong_count;
+        return *timestamped_message.data;
+      });
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
@@ -772,11 +781,15 @@
   int ping_count = 0;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&ping_count](aos::examples::Ping *ping) {
-                                 ++ping_count;
-                                 ping->mutate_value(ping_count);
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Ping>(
+      "/test",
+      [&ping_count](
+          aos::examples::Ping *ping,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        ++ping_count;
+        ping->mutate_value(ping_count);
+        return *timestamped_message.data;
+      });
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -821,6 +834,291 @@
   reader.Deregister();
 }
 
+// MultinodeLoggerTest that tests the mutate callback can fully replace the
+// message.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackReplacement) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int pong_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which replaces the pong message before the message is sent.
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count, &data_ptr](aos::examples::Pong *pong,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+        CHECK(pong_static->FromFlatbuffer(*pong));
+
+        pong_static->set_value(pong_count + 101);
+        ++pong_count;
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_pong_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+                &data_ptr](const examples::Pong &pong) {
+        ++pi1_pong_count;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // only is guarenteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(pong_count + 100, pong.value());
+        EXPECT_EQ(pi1_pong_count + 101, pong.value());
+      });
+
+  pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+                                        &data_ptr](const examples::Pong &pong) {
+    // Same goes for the forwarded side, that should be the same contents too.
+    EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+    EXPECT_EQ(pong_count + 100, pong.value());
+  });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+}
+
+// MultinodeLoggerTest that tests the mutate callback can delete messages by
+// returning nullptr.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackDelete) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int pong_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which mutates the value of the pong message before the
+  // message is sent which is the feature we are testing here
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count, &data_ptr](aos::examples::Pong *pong,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+        CHECK(pong_static->FromFlatbuffer(*pong));
+
+        pong_static->set_value(pong_count + 101);
+        ++pong_count;
+
+        if ((pong_count % 2) == 0) {
+          data_ptr = nullptr;
+          return nullptr;
+        }
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_pong_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+                &data_ptr](const examples::Pong &pong) {
+        pi1_pong_count += 2;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // only is guarenteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(pong_count + 100, pong.value());
+        EXPECT_EQ(pi1_pong_count + 101, pong.value());
+      });
+
+  pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+                                        &data_ptr](const examples::Pong &pong) {
+    // Same goes for the forwarded side, that should be the same contents too.
+    EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+    EXPECT_EQ(pong_count + 100, pong.value());
+  });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+  // Since we count up by 2 each time we get a message, and the last pong gets
+  // dropped since it is an odd number we expect the number on pi1 to be 1 less.
+  EXPECT_EQ(pi1_pong_count, 2010);
+}
+
+// MultinodeLoggerTest that tests that non-forwarded channels are able to be
+// mutated.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackNotForwarded) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int ping_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which mutates the value of the pong message before the
+  // message is sent which is the feature we are testing here
+  reader.AddBeforeSendCallback<aos::examples::Ping>(
+      "/pi1/aos",
+      [&ping_count, &data_ptr](aos::examples::Ping *ping,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PingStatic> ping_static(&allocator);
+        CHECK(ping_static->FromFlatbuffer(*ping));
+
+        ping_static->set_value(ping_count + 101);
+        ++ping_count;
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_ping_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/aos", [&pi1_event_loop, &ping_count, &pi1_ping_count,
+               &data_ptr](const examples::Ping &ping) {
+        ++pi1_ping_count;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // only is guarenteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(ping_count + 100, ping.value());
+        EXPECT_EQ(pi1_ping_count + 101, ping.value());
+      });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(ping_count, 2011);
+}
+
 // Tests that we do not allow adding callbacks after Register is called
 TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
   time_converter_.StartEqual();
@@ -848,9 +1146,13 @@
   reader.Register(&log_reader_factory);
   EXPECT_DEATH(
       {
-        reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
-          LOG(FATAL) << "This should not be called";
-        });
+        reader.AddBeforeSendCallback<aos::examples::Pong>(
+            "/test",
+            [](aos::examples::Pong *,
+               const TimestampedMessage &timestamped_message) -> SharedSpan {
+              LOG(FATAL) << "This should not be called";
+              return *timestamped_message.data;
+            });
       },
       "Cannot add callbacks after calling Register");
   reader.Deregister();