Automatically add split timestamp channels in RemapLoggedChannel

With the new split timestamp channels, we need a corresponding timestamp
channel to be available for timestamps to be sent on for all channels.
When a channel gets remapped, the timestamp channel its timestamps get
published to changes name.  This makes it so replaying that log file
fails because it can't find the corresponding timestamp channel.

We can require the user to create that channel themselves, but that is
pretty error-prone.  Make LogReader do it as part of RemapLoggedChannel
automatically.

While we are here, add a test for the expected behavior.

Change-Id: I1f920b5031c2cedc21f73775797acc6795a7882d
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 2eb3094..05555c6 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -39,6 +39,12 @@
     "The time to buffer ahead in the log file to accurately reconstruct time.");
 
 namespace aos {
+namespace configuration {
+// Forward-declared here instead of in a public header: we don't want to expose
+// this publicly, but log reader doesn't want to re-implement it either.
+void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
+                std::string *name, std::string_view type, const Node *node);
+}  // namespace configuration
 namespace logger {
 namespace {
 
@@ -892,6 +898,65 @@
         nullptr));
     channel_offsets.emplace_back(
         CopyChannel(c, pair.second.remapped_name, "", &fbb));
+
+    if (c->has_destination_nodes()) {
+      for (const Connection *connection : *c->destination_nodes()) {
+        switch (connection->timestamp_logger()) {
+          case LoggerConfig::LOCAL_LOGGER:
+          case LoggerConfig::NOT_LOGGED:
+            // There is no timestamp channel associated with this, so ignore it.
+            break;
+
+          case LoggerConfig::REMOTE_LOGGER:
+          case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+            // We want to make a split timestamp channel regardless of what type
+            // of log this used to be.  No sense propagating the single
+            // timestamp channel.
+
+            CHECK(connection->has_timestamp_logger_nodes());
+            for (const flatbuffers::String *timestamp_logger_node :
+                 *connection->timestamp_logger_nodes()) {
+              const Node *node = configuration::GetNode(
+                  logged_configuration(), timestamp_logger_node->string_view());
+              message_bridge::ChannelTimestampFinder finder(
+                  logged_configuration(), "log_reader", node);
+
+              // We are assuming here that all the maps are setup correctly to
+              // handle arbitrary timestamps.  Apply the maps for this node to
+              // see what name this ends up with.
+              std::string name = finder.SplitChannelName(
+                  pair.second.remapped_name, c->type()->str(), connection);
+              std::string unmapped_name = name;
+              configuration::HandleMaps(logged_configuration()->maps(), &name,
+                                        "aos.message_bridge.RemoteMessage",
+                                        node);
+              CHECK_NE(name, unmapped_name)
+                  << ": Remote timestamp channel was not remapped, this is "
+                     "very fishy";
+              flatbuffers::Offset<flatbuffers::String> channel_name_offset =
+                  fbb.CreateString(name);
+              flatbuffers::Offset<flatbuffers::String> channel_type_offset =
+                  fbb.CreateString("aos.message_bridge.RemoteMessage");
+              flatbuffers::Offset<flatbuffers::String> source_node_offset =
+                  fbb.CreateString(timestamp_logger_node->string_view());
+
+              // Now, build a channel.  Don't log it, 2 senders, and match the
+              // source frequency.
+              Channel::Builder channel_builder(fbb);
+              channel_builder.add_name(channel_name_offset);
+              channel_builder.add_type(channel_type_offset);
+              channel_builder.add_source_node(source_node_offset);
+              channel_builder.add_logger(LoggerConfig::NOT_LOGGED);
+              channel_builder.add_num_senders(2);
+              if (c->has_frequency()) {
+                channel_builder.add_frequency(c->frequency());
+              }
+              channel_offsets.emplace_back(channel_builder.Finish());
+            }
+            break;
+        }
+      }
+    }
   }
 
   // Now reconstruct the original channels, translating types as needed
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 13b2312..2b504f3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1767,6 +1767,83 @@
   reader.Deregister();
 }
 
+// Tests that remapping a forwarded channel also remaps its timestamp channel.
+TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(SortParts(logfiles_));
+
+  reader.RemapLoggedChannel<examples::Ping>("/test");
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  // Confirm Ping is replayed on /original/test on both nodes, and that nothing
+  // shows up on the un-remapped /test name.
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> full_pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  full_pi1_event_loop->SkipTimingReport();  // NOTE(review): otherwise unused.
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  pi2_event_loop->SkipTimingReport();
+
+  MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
+  MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
+  MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
+                                                   "/original/test");
+  MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
+                                                   "/original/test");
+
+  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+      pi1_original_ping_timestamp;
+  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+      pi1_ping_timestamp;
+  if (!shared()) {  // Presumably the split timestamp channel config; confirm.
+    pi1_original_ping_timestamp =
+        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+            pi1_event_loop.get(),
+            "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
+    pi1_ping_timestamp =
+        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+            pi1_event_loop.get(),
+            "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
+  }
+
+  log_reader_factory.Run();
+
+  EXPECT_EQ(pi1_ping.count(), 0u);
+  EXPECT_EQ(pi2_ping.count(), 0u);
+  EXPECT_NE(pi1_original_ping.count(), 0u);
+  EXPECT_NE(pi2_original_ping.count(), 0u);
+  if (!shared()) {
+    EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
+    EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
+  }
+
+  reader.Deregister();
+}
+
 // Tests that we properly recreate forwarded timestamps when replaying a log.
 // This should be enough that we can then re-run the logger and get a valid log
 // back.