Automatically remap repeatedly remapped channels

If you replay a log multiple times and are remapping a channel
/foo on each run, this will now cascade the remaps so that on
the first replay you got
/foo -> /original/foo
and then on the next replay you get
/original/foo -> /original/original/foo
/foo -> /original/foo
and so on.

This is needed to handle logs where timestamp channels are not
NOT_LOGGED.

Change-Id: I158735709504ee1d3ebb7a9678c03bbb0a53ba5c
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 87ad72c..f80f28d 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -288,6 +288,10 @@
               RemoteMessage::GetFullyQualifiedName(), "", node, true);
 
           if (timestamp_channel != nullptr) {
+            // If for some reason a timestamp channel is not NOT_LOGGED (which
+            // is unusual), then remap the channel so that the replayed channel
+            // doesn't overlap with the special separate replay we do for
+            // timestamps.
             if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
               RemapLoggedChannel<RemoteMessage>(
                   timestamp_channel->name()->string_view(), node);
@@ -1278,9 +1282,40 @@
   event_loop_factory_ = nullptr;
 }
 
+namespace {
+// Checks if the specified channel name/type exists in the config and, depending
+// on the value of conflict_handling, calls conflict_handler or just dies.
+template <typename F>
+void CheckAndHandleRemapConflict(std::string_view new_name,
+                                 std::string_view new_type,
+                                 const Configuration *config,
+                                 LogReader::RemapConflict conflict_handling,
+                                 F conflict_handler) {
+  const Channel *existing_channel =
+      configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
+  if (existing_channel != nullptr) {
+    switch (conflict_handling) {
+      case LogReader::RemapConflict::kDisallow:
+        LOG(FATAL)
+            << "Channel "
+            << configuration::StrippedChannelToString(existing_channel)
+            << " is already used--you can't remap a logged channel to it.";
+        break;
+      case LogReader::RemapConflict::kCascade:
+        LOG(INFO) << "Automatically remapping "
+                  << configuration::StrippedChannelToString(existing_channel)
+                  << " to avoid conflicts.";
+        conflict_handler();
+        break;
+    }
+  }
+}
+}  // namespace
+
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                    std::string_view add_prefix,
-                                   std::string_view new_type) {
+                                   std::string_view new_type,
+                                   RemapConflict conflict_handling) {
   for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
     const Channel *const channel = logged_configuration()->channels()->Get(ii);
     if (channel->name()->str() == name &&
@@ -1292,6 +1327,16 @@
       remapped_channel.remapped_name =
           std::string(add_prefix) + std::string(name);
       remapped_channel.new_type = new_type;
+      const std::string_view remapped_type =
+          remapped_channel.new_type.empty() ? type : remapped_channel.new_type;
+      CheckAndHandleRemapConflict(
+          remapped_channel.remapped_name, remapped_type,
+          remapped_configuration_, conflict_handling,
+          [this, &remapped_channel, remapped_type, add_prefix,
+           conflict_handling]() {
+            RemapLoggedChannel(remapped_channel.remapped_name, remapped_type,
+                               add_prefix, "", conflict_handling);
+          });
       remapped_channels_[ii] = std::move(remapped_channel);
       VLOG(1) << "Remapping channel "
               << configuration::CleanedChannelToString(channel)
@@ -1307,7 +1352,8 @@
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                    const Node *node,
                                    std::string_view add_prefix,
-                                   std::string_view new_type) {
+                                   std::string_view new_type,
+                                   RemapConflict conflict_handling) {
   VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
   const Channel *remapped_channel =
       configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1350,6 +1396,15 @@
       std::string(add_prefix) +
       std::string(remapped_channel->name()->string_view());
   remapped_channel_struct.new_type = new_type;
+  const std::string_view remapped_type = new_type.empty() ? type : new_type;
+  CheckAndHandleRemapConflict(
+      remapped_channel_struct.remapped_name, remapped_type,
+      remapped_configuration_, conflict_handling,
+      [this, &remapped_channel_struct, remapped_type, node, add_prefix,
+       conflict_handling]() {
+        RemapLoggedChannel(remapped_channel_struct.remapped_name, remapped_type,
+                           node, add_prefix, "", conflict_handling);
+      });
   remapped_channels_[channel_index] = std::move(remapped_channel_struct);
   MakeRemappedConfig();
 }
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 9e44996..07cca48 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -156,18 +156,36 @@
   void SetEndTime(std::string end_time);
   void SetEndTime(realtime_clock::time_point end_time);
 
+  // Enum to use for indicating how RemapLoggedChannel behaves when there is
+  // already a channel with the remapped name (e.g., as may happen when
+  // replaying a logfile that was itself generated from replay).
+  enum class RemapConflict {
+    // LOG(FATAL) on conflicts in remappings.
+    kDisallow,
+    // If we run into a conflict, attempt to remap the channel we would be
+    // overriding (and continue to do so if remapping *that* channel also
+    // generates a conflict).
+    // This will mean that if we repeatedly replay a log, we will end up
+    // stacking more and more /original's on the start of the oldest version
+    // of the channels.
+    kCascade
+  };
+
   // Causes the logger to publish the provided channel on a different name so
   // that replayed applications can publish on the proper channel name without
   // interference. This operates on raw channel names, without any node or
   // application specific mappings.
-  void RemapLoggedChannel(std::string_view name, std::string_view type,
-                          std::string_view add_prefix = "/original",
-                          std::string_view new_type = "");
+  void RemapLoggedChannel(
+      std::string_view name, std::string_view type,
+      std::string_view add_prefix = "/original", std::string_view new_type = "",
+      RemapConflict conflict_handling = RemapConflict::kCascade);
   template <typename T>
-  void RemapLoggedChannel(std::string_view name,
-                          std::string_view add_prefix = "/original",
-                          std::string_view new_type = "") {
-    RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type);
+  void RemapLoggedChannel(
+      std::string_view name, std::string_view add_prefix = "/original",
+      std::string_view new_type = "",
+      RemapConflict conflict_handling = RemapConflict::kCascade) {
+    RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
+                       conflict_handling);
   }
 
   // Remaps the provided channel, though this respects node mappings, and
@@ -179,16 +197,17 @@
   // TODO(austin): If you have 2 nodes remapping something to the same channel,
   // this doesn't handle that.  No use cases exist yet for that, so it isn't
   // being done yet.
-  void RemapLoggedChannel(std::string_view name, std::string_view type,
-                          const Node *node,
-                          std::string_view add_prefix = "/original",
-                          std::string_view new_type = "");
+  void RemapLoggedChannel(
+      std::string_view name, std::string_view type, const Node *node,
+      std::string_view add_prefix = "/original", std::string_view new_type = "",
+      RemapConflict conflict_handling = RemapConflict::kCascade);
   template <typename T>
-  void RemapLoggedChannel(std::string_view name, const Node *node,
-                          std::string_view add_prefix = "/original",
-                          std::string_view new_type = "") {
+  void RemapLoggedChannel(
+      std::string_view name, const Node *node,
+      std::string_view add_prefix = "/original", std::string_view new_type = "",
+      RemapConflict conflict_handling = RemapConflict::kCascade) {
     RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
-                       new_type);
+                       new_type, conflict_handling);
   }
 
   template <typename T>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index ed7d150..ab0ec3a 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -566,11 +566,14 @@
   bool shared;
   // sha256 of the config.
   std::string_view sha256;
+  // sha256 of the relogged config
+  std::string_view relogged_sha256;
 };
 
 std::ostream &operator<<(std::ostream &ostream, const ConfigParams &params) {
   ostream << "{config: \"" << params.config << "\", shared: " << params.shared
-          << ", sha256: \"" << params.sha256 << "\"}";
+          << ", sha256: \"" << params.sha256 << "\", relogged_sha256: \""
+          << params.relogged_sha256 << "\"}";
   return ostream;
 }
 
@@ -754,8 +757,8 @@
       unlink((file + ".xz").c_str());
     }
 
-    for (const auto &file :
-         MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2")) {
+    for (const auto &file : MakeLogFiles(tmp_dir_ + "/relogged1",
+                                         tmp_dir_ + "/relogged2", 3, 3, true)) {
       unlink(file.c_str());
     }
 
@@ -775,12 +778,14 @@
   std::vector<std::string> MakeLogFiles(std::string logfile_base1,
                                         std::string logfile_base2,
                                         size_t pi1_data_count = 3,
-                                        size_t pi2_data_count = 3) {
+                                        size_t pi2_data_count = 3,
+                                        bool relogged_config = false) {
+    std::string_view sha256 = relogged_config
+                                  ? std::get<0>(GetParam()).relogged_sha256
+                                  : std::get<0>(GetParam()).sha256;
     std::vector<std::string> result;
-    result.emplace_back(absl::StrCat(
-        logfile_base1, "_", std::get<0>(GetParam()).sha256, Extension()));
-    result.emplace_back(absl::StrCat(
-        logfile_base2, "_", std::get<0>(GetParam()).sha256, Extension()));
+    result.emplace_back(absl::StrCat(logfile_base1, "_", sha256, Extension()));
+    result.emplace_back(absl::StrCat(logfile_base2, "_", sha256, Extension()));
     for (size_t i = 0; i < pi1_data_count; ++i) {
       result.emplace_back(
           absl::StrCat(logfile_base1, "_pi1_data.part", i, Extension()));
@@ -1322,24 +1327,40 @@
                                     "aos.message_bridge.ServerStatistics", 1),
                     std::make_tuple("/test", "aos.examples.Ping", 1)))
         << " : " << logfiles_[2];
-    EXPECT_THAT(
-        CountChannelsData(config, logfiles_[3]),
-        UnorderedElementsAre(
-            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
-            std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                            1)))
-        << " : " << logfiles_[3];
-    EXPECT_THAT(
-        CountChannelsData(config, logfiles_[4]),
-        UnorderedElementsAre(
-            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
-            std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
-                            20),
-            std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                            199),
-            std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
-            std::make_tuple("/test", "aos.examples.Ping", 2000)))
-        << " : " << logfiles_[4];
+    {
+      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
+          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                          1)};
+      if (!std::get<0>(GetParam()).shared) {
+        channel_counts.push_back(
+            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+                            "aos-message_bridge-Timestamp",
+                            "aos.message_bridge.RemoteMessage", 1));
+      }
+      EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+                  ::testing::UnorderedElementsAreArray(channel_counts))
+          << " : " << logfiles_[3];
+    }
+    {
+      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
+          std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                          20),
+          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                          199),
+          std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+          std::make_tuple("/test", "aos.examples.Ping", 2000)};
+      if (!std::get<0>(GetParam()).shared) {
+        channel_counts.push_back(
+            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+                            "aos-message_bridge-Timestamp",
+                            "aos.message_bridge.RemoteMessage", 199));
+      }
+      EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+                  ::testing::UnorderedElementsAreArray(channel_counts))
+          << " : " << logfiles_[4];
+    }
     // Timestamps for pong
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
                 UnorderedElementsAre())
@@ -2111,9 +2132,18 @@
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
-  ASSERT_EQ(remapped_channels.size(), 1u);
+  // Note: An extra channel gets remapped automatically due to a timestamp
+  // channel being LOCAL_LOGGER'd.
+  ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
   EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
   EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
+  if (!std::get<0>(GetParam()).shared) {
+    EXPECT_EQ(remapped_channels[1]->name()->string_view(),
+              "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+              "aos-message_bridge-Timestamp");
+    EXPECT_EQ(remapped_channels[1]->type()->string_view(),
+              "aos.message_bridge.RemoteMessage");
+  }
 
   reader.Register(&log_reader_factory);
 
@@ -2285,9 +2315,12 @@
     const Channel *channel =
         full_event_loop->configuration()->channels()->Get(ii);
     // We currently don't support replaying remote timestamp channels in
-    // realtime replay.
+    // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
+    // in which case it gets auto-remapped and replayed on a /original channel).
     if (channel->name()->string_view().find("remote_timestamp") !=
-        std::string_view::npos) {
+            std::string_view::npos &&
+        channel->name()->string_view().find("/original") ==
+            std::string_view::npos) {
       continue;
     }
     if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
@@ -2640,8 +2673,19 @@
   // And verify that we can run the LogReader over the relogged files without
   // hitting any fatal errors.
   {
-    LogReader relogged_reader(SortParts(
-        MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2")));
+    LogReader relogged_reader(SortParts(MakeLogFiles(
+        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+    relogged_reader.Register();
+
+    relogged_reader.event_loop_factory()->Run();
+  }
+  // And confirm that we can read the logged file using the reader's
+  // configuration.
+  {
+    LogReader relogged_reader(
+        SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
+                               3, 3, true)),
+        reader.configuration());
     relogged_reader.Register();
 
     relogged_reader.event_loop_factory()->Run();
@@ -3668,27 +3712,29 @@
 constexpr std::string_view kCombinedConfigSha1(
     "46f8daa7d84eb999c8d3584b79f4a289fd46e3a0b47a08bdbee9c7e3b89b4aff");
 constexpr std::string_view kSplitConfigSha1(
-    "2558663e9414383e264510ce733505a40be99d2f43e299819417944f06b899ea");
+    "53978a9c089e8f5a525503ce18aca6146a7a9dee450b6067580ed952a1ff08dd");
+constexpr std::string_view kReloggedSplitConfigSha1(
+    "037fb89ada330db8908314b26bc7d96a6b3264984c189471d23b70ab6443ff96");
 
 INSTANTIATE_TEST_SUITE_P(
     All, MultinodeLoggerTest,
-    ::testing::Combine(::testing::Values(
-                           ConfigParams{
-                               "multinode_pingpong_combined_config.json", true,
-                               kCombinedConfigSha1},
-                           ConfigParams{"multinode_pingpong_split_config.json",
-                                        false, kSplitConfigSha1}),
-                       ::testing::ValuesIn(SupportedCompressionAlgorithms())));
+    ::testing::Combine(
+        ::testing::Values(
+            ConfigParams{"multinode_pingpong_combined_config.json", true,
+                         kCombinedConfigSha1, kCombinedConfigSha1},
+            ConfigParams{"multinode_pingpong_split_config.json", false,
+                         kSplitConfigSha1, kReloggedSplitConfigSha1}),
+        ::testing::ValuesIn(SupportedCompressionAlgorithms())));
 
 INSTANTIATE_TEST_SUITE_P(
     All, MultinodeLoggerDeathTest,
-    ::testing::Combine(::testing::Values(
-                           ConfigParams{
-                               "multinode_pingpong_combined_config.json", true,
-                               kCombinedConfigSha1},
-                           ConfigParams{"multinode_pingpong_split_config.json",
-                                        false, kSplitConfigSha1}),
-                       ::testing::ValuesIn(SupportedCompressionAlgorithms())));
+    ::testing::Combine(
+        ::testing::Values(
+            ConfigParams{"multinode_pingpong_combined_config.json", true,
+                         kCombinedConfigSha1, kCombinedConfigSha1},
+            ConfigParams{"multinode_pingpong_split_config.json", false,
+                         kSplitConfigSha1, kReloggedSplitConfigSha1}),
+        ::testing::ValuesIn(SupportedCompressionAlgorithms())));
 
 // Tests that we can relog with a different config.  This makes most sense when
 // you are trying to edit a log and want to use channel renaming + the original
diff --git a/aos/events/logging/multinode_pingpong_split.json b/aos/events/logging/multinode_pingpong_split.json
index c696ec2..f5bd9cb 100644
--- a/aos/events/logging/multinode_pingpong_split.json
+++ b/aos/events/logging/multinode_pingpong_split.json
@@ -100,7 +100,8 @@
     {
       "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
       "type": "aos.message_bridge.RemoteMessage",
-      "logger": "NOT_LOGGED",
+      /* Log one remote timestamp channel to ensure everythings still works. */
+      "logger": "LOCAL_LOGGER",
       "num_senders": 2,
       "source_node": "pi1"
     },