Enable renaming logged locations

This adds two main features:
-Renaming logged locations so that they don't interfere with attempted
 replay.
-Using a different (than logged) configuration for the replay so that we
 can take advantage of updated schemas or the such.

Change-Id: I81a0da10fd60a32be2e7ea87ffe98d9b3043198c
diff --git a/aos/configuration.h b/aos/configuration.h
index 7bf8203..a078dea 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -49,6 +49,16 @@
                                  const Node *node) {
   return GetChannel(&config.message(), name, type, application_name, node);
 }
+// Convenience wrapper for getting a channel from a specified config if you
+// already have the name/type in a Channel object--this is useful if you Channel
+// object you have does not point to memory within config.
+inline const Channel *GetChannel(const Configuration *config,
+                                 const Channel *channel,
+                                 const std::string_view application_name,
+                                 const Node *node) {
+  return GetChannel(config, channel->name()->string_view(),
+                    channel->type()->string_view(), application_name, node);
+}
 
 // Returns the Node out of the config with the matching name, or nullptr if it
 // can't be found.
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3e790b6..486cda9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -219,20 +219,40 @@
   writer_->Flush();
 }
 
-LogReader::LogReader(std::string_view filename)
-    : sorted_message_reader_(filename) {
-  channels_.resize(configuration()->channels()->size());
+LogReader::LogReader(std::string_view filename,
+                     const Configuration *replay_configuration)
+    : sorted_message_reader_(filename),
+      replay_configuration_(replay_configuration) {
+  channels_.resize(logged_configuration()->channels()->size());
 }
 
 LogReader::~LogReader() {
   Deregister();
 }
 
-const Configuration *LogReader::configuration() const {
+const Configuration *LogReader::logged_configuration() const {
   return sorted_message_reader_.configuration();
 }
 
-const Node *LogReader::node() const { return sorted_message_reader_.node(); }
+const Configuration *LogReader::configuration() const {
+  CHECK(remapped_configuration_ != nullptr)
+      << ": Need to call Register() before the remapped config will be "
+         "generated.";
+  return remapped_configuration_;
+}
+
+const Node *LogReader::node() const {
+  // Because the Node pointer will only be valid if it actually points to memory
+  // owned by remapped_configuration_, we need to wait for the
+  // remapped_configuration_ to be populated before accessing it.
+  CHECK(remapped_configuration_ != nullptr)
+      << ": Need to call Register before the node() pointer will be valid.";
+  if (sorted_message_reader_.node() == nullptr) {
+    return nullptr;
+  }
+  return configuration::GetNode(
+      configuration(), sorted_message_reader_.node()->name()->string_view());
+}
 
 monotonic_clock::time_point LogReader::monotonic_start_time() {
   return sorted_message_reader_.monotonic_start_time();
@@ -243,6 +263,7 @@
 }
 
 void LogReader::Register() {
+  MakeRemappedConfig();
   event_loop_factory_unique_ptr_ =
       std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
   Register(event_loop_factory_unique_ptr_.get());
@@ -268,13 +289,21 @@
   event_loop_->SkipTimingReport();
 
   for (size_t i = 0; i < channels_.size(); ++i) {
-    CHECK_EQ(configuration()->channels()->Get(i)->name(),
-             event_loop_->configuration()->channels()->Get(i)->name());
-    CHECK_EQ(configuration()->channels()->Get(i)->type(),
-             event_loop_->configuration()->channels()->Get(i)->type());
+    const Channel *const original_channel =
+        logged_configuration()->channels()->Get(i);
 
-    channels_[i] = event_loop_->MakeRawSender(
-        event_loop_->configuration()->channels()->Get(i));
+    std::string_view channel_name = original_channel->name()->string_view();
+    std::string_view channel_type = original_channel->type()->string_view();
+    // If the channel is remapped, find the correct channel name to use.
+    if (remapped_channels_.count(i) > 0) {
+      VLOG(2) << "Got remapped channel on "
+              << configuration::CleanedChannelToString(original_channel);
+      channel_name = remapped_channels_[i];
+    }
+    VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+    channels_[i] = event_loop_->MakeRawSender(CHECK_NOTNULL(
+        configuration::GetChannel(event_loop_->configuration(), channel_name,
+                                  channel_type, "", nullptr)));
   }
 
   timer_handler_ = event_loop_->AddTimer([this]() {
@@ -352,5 +381,109 @@
   event_loop_factory_ = nullptr;
 }
 
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+                                   std::string_view add_prefix) {
+  CHECK(remapped_configuration_ == nullptr)
+      << "Must call RemapLoggedChannel before calling Register().";
+  for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
+    const Channel *const channel = logged_configuration()->channels()->Get(ii);
+    if (channel->name()->str() == name &&
+        channel->type()->string_view() == type) {
+      CHECK_EQ(0u, remapped_channels_.count(ii))
+          << "Already remapped channel "
+          << configuration::CleanedChannelToString(channel);
+      remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+      VLOG(1) << "Remapping channel "
+              << configuration::CleanedChannelToString(channel)
+              << " to have name " << remapped_channels_[ii];
+      return;
+    }
+  }
+  LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
+             << type;
+}
+
+void LogReader::MakeRemappedConfig() {
+  // If no remapping occurred and we are using the original config, then there
+  // is nothing interesting to do here.
+  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
+    remapped_configuration_ = sorted_message_reader_.configuration();
+    return;
+  }
+  // Config to copy Channel definitions from. Use the specified
+  // replay_configuration_ if it has been provided.
+  const Configuration *const base_config = replay_configuration_ == nullptr
+                                               ? logged_configuration()
+                                               : replay_configuration_;
+  // The remapped config will be identical to the base_config, except that it
+  // will have a bunch of extra channels in the channel list, which are exact
+  // copies of the remapped channels, but with different names.
+  // Because the flatbuffers API is a pain to work with, this requires a bit of
+  // a song-and-dance to get copied over.
+  // The order of operations is to:
+  // 1) Make a flatbuffer builder for a config that will just contain a list of
+  //    the new channels that we want to add.
+  // 2) For each channel that we are remapping:
+  //    a) Make a buffer/builder and construct into it a Channel table that only
+  //       contains the new name for the channel.
+  //    b) Merge the new channel with just the name into the channel that we are
+  //       trying to copy, built in the flatbuffer builder made in 1. This gives
+  //       us the new channel definition that we need.
+  // 3) Using this list of offsets, build the Configuration of just new
+  //    Channels.
+  // 4) Merge the Configuration with the new Channels into the base_config.
+  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
+  //    chance to sanitize the config.
+
+  // This is the builder that we use for the config containing all the new
+  // channels.
+  flatbuffers::FlatBufferBuilder new_config_fbb;
+  new_config_fbb.ForceDefaults(1);
+  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+  for (auto &pair : remapped_channels_) {
+    // This is the builder that we use for creating the Channel with just the
+    // new name.
+    flatbuffers::FlatBufferBuilder new_name_fbb;
+    new_name_fbb.ForceDefaults(1);
+    const flatbuffers::Offset<flatbuffers::String> name_offset =
+        new_name_fbb.CreateString(pair.second);
+    ChannelBuilder new_name_builder(new_name_fbb);
+    new_name_builder.add_name(name_offset);
+    new_name_fbb.Finish(new_name_builder.Finish());
+    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
+    // Retrieve the channel that we want to copy, confirming that it is actually
+    // present in base_config.
+    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+        base_config, logged_configuration()->channels()->Get(pair.first), "",
+        nullptr));
+    // Actually create the new channel and put it into the vector of Offsets
+    // that we will use to create the new Configuration.
+    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
+        reinterpret_cast<const flatbuffers::Table *>(base_channel),
+        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
+        &new_config_fbb));
+  }
+  // Create the Configuration containing the new channels that we want to add.
+  const auto
+      new_name_vector_offsets = new_config_fbb.CreateVector(channel_offsets);
+  ConfigurationBuilder new_config_builder(new_config_fbb);
+  new_config_builder.add_channels(new_name_vector_offsets);
+  new_config_fbb.Finish(new_config_builder.Finish());
+  const FlatbufferDetachedBuffer<Configuration> new_name_config =
+      new_config_fbb.Release();
+  // Merge the new channels configuration into the base_config, giving us the
+  // remapped configuration.
+  remapped_configuration_buffer_ =
+      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+          MergeFlatBuffers<Configuration>(base_config,
+                                          &new_name_config.message()));
+  // Call MergeConfiguration to deal with sanitizing the config.
+  remapped_configuration_buffer_ =
+      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+          configuration::MergeConfiguration(*remapped_configuration_buffer_));
+
+  remapped_configuration_ = &remapped_configuration_buffer_->message();
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e5b91ac..1740b1f 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -59,7 +59,12 @@
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public:
-  LogReader(std::string_view filename);
+  // If you want to supply a new configuration that will be used for replay
+  // (e.g., to change message rates, or to populate an updated schema), then
+  // pass it in here. It must provide all the channels that the original logged
+  // config did.
+  LogReader(std::string_view filename,
+            const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
   // Registers everything, but also updates the real time time in sync.  Runs
@@ -81,6 +86,8 @@
   void Deregister();
 
   // Returns the configuration from the log file.
+  const Configuration *logged_configuration() const;
+  // Returns the configuration being used for replay.
   const Configuration *configuration() const;
 
   // Returns the node that this log file was created on.
@@ -90,6 +97,18 @@
   monotonic_clock::time_point monotonic_start_time();
   realtime_clock::time_point realtime_start_time();
 
+  // Causes the logger to publish the provided channel on a different name so
+  // that replayed applications can publish on the proper channel name without
+  // interference. This operates on raw channel names, without any node or
+  // application specific mappings.
+  void RemapLoggedChannel(std::string_view name, std::string_view type,
+                          std::string_view add_prefix = "/original");
+  template <typename T>
+  void RemapLoggedChannel(std::string_view name,
+                          std::string_view add_prefix = "/original") {
+    RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
+  }
+
   SimulatedEventLoopFactory *event_loop_factory() {
     return event_loop_factory_;
   }
@@ -102,6 +121,9 @@
  private:
   // Queues at least max_out_of_order_duration_ messages into channels_.
   void QueueMessages();
+  // Handle constructing a configuration with all the additional remapped
+  // channels from calls to RemapLoggedChannel.
+  void MakeRemappedConfig();
 
   // Log chunk reader.
   SortedMessageReader sorted_message_reader_;
@@ -117,6 +139,14 @@
 
   std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
   SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+
+  // Map of channel indices to new name. The channel index will be an index into
+  // logged_configuration(), and the string key will be the name of the channel
+  // to send on instead of the logged channel name.
+  std::map<size_t, std::string> remapped_channels_;
+
+  const Configuration *remapped_configuration_ = nullptr;
+  const Configuration *replay_configuration_ = nullptr;
 };
 
 }  // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index d96d4c6..e1808b1 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -59,10 +59,12 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(logfile);
+  // Even though it doesn't make any difference here, exercise the logic for
+  // passing in a separate config.
+  LogReader reader(logfile, &config_.message());
 
-  LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
-  EXPECT_EQ(reader.node(), nullptr);
+  // Confirm that we can remap logged channels to point to new buses.
+  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
@@ -78,8 +80,8 @@
   int ping_count = 10;
   int pong_count = 10;
 
-  // Confirm that the ping value matches.
-  test_event_loop->MakeWatcher("/test",
+  // Confirm that the ping value matches in the remapped channel location.
+  test_event_loop->MakeWatcher("/original/test",
                                [&ping_count](const examples::Ping &ping) {
                                  EXPECT_EQ(ping.value(), ping_count + 1);
                                  ++ping_count;