Allow renaming logged channels for replay

If we change an application to use a new channel for an existing
message, we need a way to support replaying old logs through it.
RenameLoggedChannel() provides an API to map the original channel to
the new one, updating the logged configuration as needed.

Since global maps for the new channel need not follow the same patterns
as the old one, any relevant maps must be specified by the user.

Change-Id: I68e9d2fe16bceaa60972a3f762f955e583c80255
Signed-off-by: Sanjay Narayanan <sanjay.narayanan@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f1784bf..18337be 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -166,10 +166,12 @@
                 UnorderedElementsAre(
                     std::make_tuple("/pi1/aos",
                                     "aos.message_bridge.ServerStatistics", 1),
-                    std::make_tuple("/test", "aos.examples.Ping", 1)))
+                    std::make_tuple("/test", "aos.examples.Ping", 1),
+                    std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
         << " : " << logfiles_[2];
     {
       std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+          std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
           std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
           std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
                           1)};
@@ -185,12 +187,13 @@
     }
     {
       std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+          std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
           std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
           std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
                           20),
           std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
                           199),
-          std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+          std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
           std::make_tuple("/test", "aos.examples.Ping", 2000)};
       if (!std::get<0>(GetParam()).shared) {
         channel_counts.push_back(
@@ -237,8 +240,10 @@
 
     // Timing reports and pongs.
     EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
-                UnorderedElementsAre(std::make_tuple(
-                    "/pi2/aos", "aos.message_bridge.ServerStatistics", 1)))
+                UnorderedElementsAre(
+                    std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
+                    std::make_tuple("/pi2/aos",
+                                    "aos.message_bridge.ServerStatistics", 1)))
         << " : " << logfiles_[7];
     EXPECT_THAT(
         CountChannelsData(config, logfiles_[8]),
@@ -247,12 +252,13 @@
     EXPECT_THAT(
         CountChannelsData(config, logfiles_[9]),
         UnorderedElementsAre(
+            std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
             std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
                             20),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
                             200),
-            std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
+            std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
             std::make_tuple("/test", "aos.examples.Pong", 2000)))
         << " : " << logfiles_[9];
     // And ping timestamps.
@@ -946,7 +952,7 @@
   VerifyParts(sorted_parts);
 }
 
-// Tests that if we remap a remapped channel, it shows up correctly.
+// Tests that if we remap a logged channel, it shows up correctly.
 TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
   time_converter_.StartEqual();
   {
@@ -1025,6 +1031,117 @@
   reader.Deregister();
 }
 
+// Tests that if we rename a logged channel, it shows up correctly.
+TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
+  std::vector<std::string> actual_filenames;
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  LogReader reader(SortParts(actual_filenames));
+
+  // Rename just on pi2. Add some global maps just to verify they get added in
+  // the config and used correctly.
+  std::vector<MapT> maps;
+  {
+    MapT map;
+    map.match = std::make_unique<ChannelT>();
+    map.match->name = "/foo*";
+    map.match->source_node = "pi1";
+    map.rename = std::make_unique<ChannelT>();
+    map.rename->name = "/pi1/foo";
+    maps.emplace_back(std::move(map));
+  }
+  {
+    MapT map;
+    map.match = std::make_unique<ChannelT>();
+    map.match->name = "/foo*";
+    map.match->source_node = "pi2";
+    map.rename = std::make_unique<ChannelT>();
+    map.rename->name = "/pi2/foo";
+    maps.emplace_back(std::move(map));
+  }
+  {
+    MapT map;
+    map.match = std::make_unique<ChannelT>();
+    map.match->name = "/foo";
+    map.match->type = "aos.examples.Ping";
+    map.rename = std::make_unique<ChannelT>();
+    map.rename->name = "/foo/renamed";
+    maps.emplace_back(std::move(map));
+  }
+  reader.RenameLoggedChannel<aos::examples::Ping>(
+      "/aos", configuration::GetNode(reader.configuration(), "pi2"),
+      "/pi2/foo/renamed", maps);
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
+  // Note: An extra channel gets remapped automatically due to a timestamp
+  // channel being LOCAL_LOGGER'd.
+  const bool shared = std::get<0>(GetParam()).shared;
+  ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
+  EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
+            "/pi2/foo/renamed");
+  EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
+            "aos.examples.Ping");
+  if (!shared) {
+    EXPECT_EQ(remapped_channels[0]->name()->string_view(),
+              "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+              "aos-message_bridge-Timestamp");
+    EXPECT_EQ(remapped_channels[0]->type()->string_view(),
+              "aos.message_bridge.RemoteMessage");
+  }
+
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  // Confirm we can read the data on the renamed channel, just for pi2. Nothing
+  // else should have moved.
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  pi2_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> full_pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  full_pi2_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  pi1_event_loop->SkipTimingReport();
+
+  MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
+  MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
+                                                       "/foo");
+  MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
+      full_pi2_event_loop.get(), "/pi2/foo/renamed");
+  MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
+
+  log_reader_factory.Run();
+
+  EXPECT_EQ(pi2_ping.count(), 0u);
+  EXPECT_NE(pi2_renamed_ping.count(), 0u);
+  EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
+  EXPECT_NE(pi1_ping.count(), 0u);
+
+  reader.Deregister();
+}
+
 // Tests that we can remap a forwarded channel as well.
 TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
   time_converter_.StartEqual();
@@ -1102,6 +1219,109 @@
   reader.Deregister();
 }
 
+// Tests that we can rename a forwarded channel as well.
+TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
+  std::vector<std::string> actual_filenames;
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  LogReader reader(SortParts(actual_filenames));
+
+  std::vector<MapT> maps;
+  {
+    MapT map;
+    map.match = std::make_unique<ChannelT>();
+    map.match->name = "/production*";
+    map.match->source_node = "pi1";
+    map.rename = std::make_unique<ChannelT>();
+    map.rename->name = "/pi1/production";
+    maps.emplace_back(std::move(map));
+  }
+  {
+    MapT map;
+    map.match = std::make_unique<ChannelT>();
+    map.match->name = "/production*";
+    map.match->source_node = "pi2";
+    map.rename = std::make_unique<ChannelT>();
+    map.rename->name = "/pi2/production";
+    maps.emplace_back(std::move(map));
+  }
+  reader.RenameLoggedChannel<aos::examples::Ping>(
+      "/test", configuration::GetNode(reader.configuration(), "pi1"),
+      "/pi1/production", maps);
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  // Confirm we can read the data on the renamed channel, on both the source
+  // node and the remote node. In case of split timestamp channels, confirm that
+  // we receive the timestamp messages on the renamed channel as well.
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> full_pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  full_pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  pi2_event_loop->SkipTimingReport();
+
+  MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
+  MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
+  MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
+                                                  "/pi1/production");
+  MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
+                                                  "/pi1/production");
+
+  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+      pi1_renamed_ping_timestamp;
+  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+      pi1_ping_timestamp;
+  if (!shared()) {
+    pi1_renamed_ping_timestamp =
+        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+            pi1_event_loop.get(),
+            "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
+    pi1_ping_timestamp =
+        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+            pi1_event_loop.get(),
+            "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
+  }
+
+  log_reader_factory.Run();
+
+  EXPECT_EQ(pi1_ping.count(), 0u);
+  EXPECT_EQ(pi2_ping.count(), 0u);
+  EXPECT_NE(pi1_renamed_ping.count(), 0u);
+  EXPECT_NE(pi2_renamed_ping.count(), 0u);
+  if (!shared()) {
+    EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
+    EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
+  }
+
+  reader.Deregister();
+}
+
 // Tests that we observe all the same events in log replay (for a given node)
 // whether we just register an event loop for that node or if we register a full
 // event loop factory.