Switch magic timestamp channel to RemoteMessage

In order to track reboots and such, we really need to communicate more
information from the message_bridge to the logger.  Extending
MessageHeader to carry it would be nice, but it would be a shame to
have to modify MessageHeader for this.  Switch the remote timestamp
channels over to a new RemoteMessage channel instead, and add code to
rename the channel when replaying.

There are no known log files with a MessageHeader actually logged, so
most of this should be pretty safe.  I've tested this on an old log file
by hand.

Change-Id: If81b31869b95040151d833d20ec3eb8623ab1cd4
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 0aa37e7..a8bf77c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,8 @@
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/logging/uuid.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/remote_message_schema.h"
 #include "aos/network/team_number.h"
 #include "aos/time/time.h"
 #include "aos/util/file.h"
@@ -47,7 +49,69 @@
   CHECK(result);
   return result.value();
 }
+
+// Copies c into fbb without its schema and returns the new channel's offset.
+// A non-empty new_name or new_type overrides the copied name or type.
+flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
+                                         std::string_view new_name,
+                                         std::string_view new_type,
+                                         flatbuffers::FlatBufferBuilder *fbb) {
+  // Strings and vectors have to be created before the Channel::Builder starts.
+  flatbuffers::Offset<flatbuffers::String> name_offset =
+      fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
+                                               : new_name);
+  flatbuffers::Offset<flatbuffers::String> type_offset =
+      fbb->CreateSharedString(new_type.empty() ? c->type()->string_view()
+                                               : new_type);
+  flatbuffers::Offset<flatbuffers::String> source_node_offset =
+      c->has_source_node()
+          ? fbb->CreateSharedString(c->source_node()->string_view())
+          : 0;
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+      destination_nodes_offset =
+          aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+      logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
+
+  Channel::Builder channel_builder(*fbb);
+  channel_builder.add_name(name_offset);
+  channel_builder.add_type(type_offset);
+  if (c->has_frequency()) {
+    channel_builder.add_frequency(c->frequency());
+  }
+  if (c->has_max_size()) {
+    channel_builder.add_max_size(c->max_size());
+  }
+  if (c->has_num_senders()) {
+    channel_builder.add_num_senders(c->num_senders());
+  }
+  if (c->has_num_watchers()) {
+    channel_builder.add_num_watchers(c->num_watchers());
+  }
+  if (!source_node_offset.IsNull()) {
+    channel_builder.add_source_node(source_node_offset);
+  }
+  if (!destination_nodes_offset.IsNull()) {
+    channel_builder.add_destination_nodes(destination_nodes_offset);
+  }
+  if (c->has_logger()) {
+    channel_builder.add_logger(c->logger());
+  }
+  if (!logger_nodes_offset.IsNull()) {
+    channel_builder.add_logger_nodes(logger_nodes_offset);
+  }
+  if (c->has_read_method()) {
+    channel_builder.add_read_method(c->read_method());
+  }
+  if (c->has_num_readers()) {
+    channel_builder.add_num_readers(c->num_readers());
+  }
+  return channel_builder.Finish();
+}
+
 namespace chrono = std::chrono;
+using message_bridge::RemoteMessage;
 }  // namespace
 
 Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
@@ -83,7 +147,7 @@
     const Channel *channel = configuration::GetChannel(
         event_loop->configuration(),
         absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
-        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+        RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
         event_loop_->node());
 
     CHECK(channel != nullptr)
@@ -137,7 +201,7 @@
           channel, event_loop_->node(), event_loop_->node());
     }
 
-    // Now, detect a MessageHeader timestamp logger where we should just log the
+    // Now, detect a RemoteMessage timestamp logger where we should just log the
     // contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
@@ -607,8 +671,8 @@
         flatbuffers::FlatBufferBuilder fbb;
         fbb.ForceDefaults(true);
 
-        const MessageHeader *msg =
-            flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+        const RemoteMessage *msg =
+            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
 
         logger::MessageHeader::Builder message_header_builder(fbb);
 
@@ -735,11 +799,22 @@
     for (const Node *remote_node : timestamp_logger_nodes) {
       const std::string channel = absl::StrCat(
           "/aos/remote_timestamps/", remote_node->name()->string_view());
-      CHECK(HasChannel<logger::MessageHeader>(channel, node))
-          << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
-          << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
-          << node->name()->string_view();
-      RemapLoggedChannel<logger::MessageHeader>(channel, node);
+      // See if the log file is an old log with MessageHeader channels in it, or
+      // a newer log with RemoteMessage.  If we find an older log, rename the
+      // type too along with the name.
+      if (HasChannel<MessageHeader>(channel, node)) {
+        CHECK(!HasChannel<RemoteMessage>(channel, node))
+            << ": Can't have both a MessageHeader and RemoteMessage remote "
+               "timestamp channel.";
+        RemapLoggedChannel<MessageHeader>(channel, node, "/original",
+                                          "aos.message_bridge.RemoteMessage");
+      } else {
+        CHECK(HasChannel<RemoteMessage>(channel, node))
+            << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+            << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
+            << node->name()->string_view();
+        RemapLoggedChannel<RemoteMessage>(channel, node);
+      }
     }
   }
 
@@ -1239,7 +1314,7 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-    aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+    aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
 
     State *source_state = nullptr;
 
@@ -1542,7 +1617,8 @@
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
-                                   std::string_view add_prefix) {
+                                   std::string_view add_prefix,
+                                   std::string_view new_type) {
   for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
     const Channel *const channel = logged_configuration()->channels()->Get(ii);
     if (channel->name()->str() == name &&
@@ -1550,10 +1626,14 @@
       CHECK_EQ(0u, remapped_channels_.count(ii))
           << "Already remapped channel "
           << configuration::CleanedChannelToString(channel);
-      remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+      RemappedChannel remapped_channel;
+      remapped_channel.remapped_name =
+          std::string(add_prefix) + std::string(name);
+      remapped_channel.new_type = new_type;
+      remapped_channels_[ii] = std::move(remapped_channel);
       VLOG(1) << "Remapping channel "
               << configuration::CleanedChannelToString(channel)
-              << " to have name " << remapped_channels_[ii];
+              << " to have name " << remapped_channels_[ii].remapped_name;
       MakeRemappedConfig();
       return;
     }
@@ -1564,7 +1644,8 @@
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                    const Node *node,
-                                   std::string_view add_prefix) {
+                                   std::string_view add_prefix,
+                                   std::string_view new_type) {
   VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
   const Channel *remapped_channel =
       configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1601,8 +1682,13 @@
   CHECK_EQ(0u, remapped_channels_.count(channel_index))
       << "Already remapped channel "
       << configuration::CleanedChannelToString(remapped_channel);
-  remapped_channels_[channel_index] =
-      absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+
+  RemappedChannel remapped_channel_struct;
+  remapped_channel_struct.remapped_name =
+      std::string(add_prefix) +
+      std::string(remapped_channel->name()->string_view());
+  remapped_channel_struct.new_type = new_type;
+  remapped_channels_[channel_index] = std::move(remapped_channel_struct);
   MakeRemappedConfig();
 }
 
@@ -1625,73 +1711,96 @@
   const Configuration *const base_config = replay_configuration_ == nullptr
                                                ? logged_configuration()
                                                : replay_configuration_;
-  // The remapped config will be identical to the base_config, except that it
-  // will have a bunch of extra channels in the channel list, which are exact
-  // copies of the remapped channels, but with different names.
-  // Because the flatbuffers API is a pain to work with, this requires a bit of
-  // a song-and-dance to get copied over.
-  // The order of operations is to:
-  // 1) Make a flatbuffer builder for a config that will just contain a list of
-  //    the new channels that we want to add.
-  // 2) For each channel that we are remapping:
-  //    a) Make a buffer/builder and construct into it a Channel table that only
-  //       contains the new name for the channel.
-  //    b) Merge the new channel with just the name into the channel that we are
-  //       trying to copy, built in the flatbuffer builder made in 1. This gives
-  //       us the new channel definition that we need.
-  // 3) Using this list of offsets, build the Configuration of just new
-  //    Channels.
-  // 4) Merge the Configuration with the new Channels into the base_config.
-  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
-  //    chance to sanitize the config.
+
+  // Create a config with all the channels, but un-sorted/merged.  Collect up
+  // the schemas while we do this.  Call MergeConfiguration to sort everything,
+  // and then merge it all in together.
 
   // This is the builder that we use for the config containing all the new
   // channels.
-  flatbuffers::FlatBufferBuilder new_config_fbb;
-  new_config_fbb.ForceDefaults(true);
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
   std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+
+  CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+      << ": Merging logic needs to be updated when the number of channel "
+         "fields changes.";
+
+  // List of schemas.
+  std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
+  // Make sure our new RemoteMessage schema is in there for old logs without it.
+  schema_map.insert(std::make_pair(
+      RemoteMessage::GetFullyQualifiedName(),
+      FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
+          message_bridge::RemoteMessageSchema()))));
+
+  // Reconstruct the remapped channels.
   for (auto &pair : remapped_channels_) {
-    // This is the builder that we use for creating the Channel with just the
-    // new name.
-    flatbuffers::FlatBufferBuilder new_name_fbb;
-    new_name_fbb.ForceDefaults(true);
-    const flatbuffers::Offset<flatbuffers::String> name_offset =
-        new_name_fbb.CreateString(pair.second);
-    ChannelBuilder new_name_builder(new_name_fbb);
-    new_name_builder.add_name(name_offset);
-    new_name_fbb.Finish(new_name_builder.Finish());
-    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
-    // Retrieve the channel that we want to copy, confirming that it is
-    // actually present in base_config.
-    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+    const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
         base_config, logged_configuration()->channels()->Get(pair.first), "",
         nullptr));
-    // Actually create the new channel and put it into the vector of Offsets
-    // that we will use to create the new Configuration.
-    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
-        reinterpret_cast<const flatbuffers::Table *>(base_channel),
-        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
-        &new_config_fbb));
+    channel_offsets.emplace_back(
+        CopyChannel(c, pair.second.remapped_name, "", &fbb));
   }
-  // Create the Configuration containing the new channels that we want to add.
-  const auto new_channel_vector_offsets =
-      new_config_fbb.CreateVector(channel_offsets);
 
-  // Now create the new maps.
+  // Now reconstruct the original channels, translating types as needed
+  for (const Channel *c : *base_config->channels()) {
+    // Search for a mapping channel.
+    std::string_view new_type = "";
+    for (auto &pair : remapped_channels_) {
+      const Channel *const remapped_channel =
+          logged_configuration()->channels()->Get(pair.first);
+      if (remapped_channel->name()->string_view() == c->name()->string_view() &&
+          remapped_channel->type()->string_view() == c->type()->string_view()) {
+        new_type = pair.second.new_type;
+        break;
+      }
+    }
+
+    // Copy everything over.
+    channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
+
+    // Add the schema if it doesn't exist.
+    if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
+      CHECK(c->has_schema());
+      schema_map.insert(std::make_pair(c->type()->string_view(),
+                                       RecursiveCopyFlatBuffer(c->schema())));
+    }
+  }
+
+  // The MergeConfiguration API takes a vector, not a map.  Convert.
+  std::vector<FlatbufferVector<reflection::Schema>> schemas;
+  while (!schema_map.empty()) {
+    schemas.emplace_back(std::move(schema_map.begin()->second));
+    schema_map.erase(schema_map.begin());
+  }
+
+  // Create the Configuration containing the new channels that we want to add.
+  const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+      channels_offset =
+          channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
+
+  // Copy over the old maps.
   std::vector<flatbuffers::Offset<Map>> map_offsets;
+  if (base_config->maps()) {
+    for (const Map *map : *base_config->maps()) {
+      map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
+    }
+  }
+
+  // Now create the new maps.  These are second so they take effect first.
   for (const MapT &map : maps_) {
     const flatbuffers::Offset<flatbuffers::String> match_name_offset =
-        new_config_fbb.CreateString(map.match->name);
+        fbb.CreateString(map.match->name);
     const flatbuffers::Offset<flatbuffers::String> match_type_offset =
-        new_config_fbb.CreateString(map.match->type);
+        fbb.CreateString(map.match->type);
     const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
-        new_config_fbb.CreateString(map.rename->name);
+        fbb.CreateString(map.rename->name);
     flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
     if (!map.match->source_node.empty()) {
-      match_source_node_offset =
-          new_config_fbb.CreateString(map.match->source_node);
+      match_source_node_offset = fbb.CreateString(map.match->source_node);
     }
-    Channel::Builder match_builder(new_config_fbb);
+    Channel::Builder match_builder(fbb);
     match_builder.add_name(match_name_offset);
     match_builder.add_type(match_type_offset);
     if (!map.match->source_node.empty()) {
@@ -1699,36 +1808,66 @@
     }
     const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
 
-    Channel::Builder rename_builder(new_config_fbb);
+    Channel::Builder rename_builder(fbb);
     rename_builder.add_name(rename_name_offset);
     const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
 
-    Map::Builder map_builder(new_config_fbb);
+    Map::Builder map_builder(fbb);
     map_builder.add_match(match_offset);
     map_builder.add_rename(rename_offset);
     map_offsets.emplace_back(map_builder.Finish());
   }
 
-  const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+      maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
 
-  ConfigurationBuilder new_config_builder(new_config_fbb);
-  new_config_builder.add_channels(new_channel_vector_offsets);
-  new_config_builder.add_maps(new_maps_offsets);
-  new_config_fbb.Finish(new_config_builder.Finish());
-  const FlatbufferDetachedBuffer<Configuration> new_name_config =
-      new_config_fbb.Release();
-  // Merge the new channels configuration into the base_config, giving us the
-  // remapped configuration.
+  // And copy everything else over.
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+      nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
+
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+      applications_offset =
+          aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
+
+  // Now insert everything else in unmodified.
+  ConfigurationBuilder configuration_builder(fbb);
+  if (!channels_offset.IsNull()) {
+    configuration_builder.add_channels(channels_offset);
+  }
+  if (!maps_offsets.IsNull()) {
+    configuration_builder.add_maps(maps_offsets);
+  }
+  if (!nodes_offset.IsNull()) {
+    configuration_builder.add_nodes(nodes_offset);
+  }
+  if (!applications_offset.IsNull()) {
+    configuration_builder.add_applications(applications_offset);
+  }
+
+  if (base_config->has_channel_storage_duration()) {
+    configuration_builder.add_channel_storage_duration(
+        base_config->channel_storage_duration());
+  }
+
+  CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
+      << ": Merging logic needs to be updated when the number of configuration "
+         "fields changes.";
+
+  fbb.Finish(configuration_builder.Finish());
+
+  // Clean it up and return it!  By using MergeConfiguration here, we'll
+  // actually get a deduplicated config for free too.
+  FlatbufferDetachedBuffer<Configuration> new_merged_config =
+      configuration::MergeConfiguration(
+          FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
+
   remapped_configuration_buffer_ =
       std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
-          MergeFlatBuffers<Configuration>(base_config,
-                                          &new_name_config.message()));
-  // Call MergeConfiguration to deal with sanitizing the config.
-  remapped_configuration_buffer_ =
-      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
-          configuration::MergeConfiguration(*remapped_configuration_buffer_));
+          configuration::MergeConfiguration(new_merged_config, schemas));
 
   remapped_configuration_ = &remapped_configuration_buffer_->message();
+
+  // TODO(austin): Lazily re-build to save CPU?
 }
 
 const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
@@ -1741,7 +1880,7 @@
   if (remapped_channels_.count(channel_index) > 0) {
     VLOG(3) << "Got remapped channel on "
             << configuration::CleanedChannelToString(channel);
-    channel_name = remapped_channels_[channel_index];
+    channel_name = remapped_channels_[channel_index].remapped_name;
   }
 
   VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
@@ -1786,7 +1925,7 @@
     size_t logged_channel_index, size_t factory_channel_index,
     std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+    aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
   channels_[logged_channel_index] = std::move(sender);
   filters_[logged_channel_index] = filter;
   remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1874,12 +2013,12 @@
         timestamp);
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
-    aos::Sender<MessageHeader>::Builder builder =
+    aos::Sender<RemoteMessage>::Builder builder =
         remote_timestamp_senders_[timestamped_message.channel_index]
             ->MakeBuilder();
 
-    logger::MessageHeader::Builder message_header_builder =
-        builder.MakeBuilder<logger::MessageHeader>();
+    RemoteMessage::Builder message_header_builder =
+        builder.MakeBuilder<RemoteMessage>();
 
     message_header_builder.add_channel_index(
         factory_channel_index_[timestamped_message.channel_index]);
@@ -1905,7 +2044,7 @@
   return true;
 }
 
-aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
     const Node *delivered_node) {
   auto sender = remote_timestamp_senders_map_.find(delivered_node);
 
@@ -1913,7 +2052,7 @@
     sender = remote_timestamp_senders_map_
                  .emplace(std::make_pair(
                      delivered_node,
-                     event_loop()->MakeSender<MessageHeader>(
+                     event_loop()->MakeSender<RemoteMessage>(
                          absl::StrCat("/aos/remote_timestamps/",
                                       delivered_node->name()->string_view()))))
                  .first;
@@ -1984,7 +2123,8 @@
     timestamp_mapper_->PopFront();
 
     // Skip any messages without forwarding information.
-    if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
+    if (timestamped_message.monotonic_remote_time !=
+        monotonic_clock::min_time) {
       // Got a forwarding timestamp!
       filter = filters_[timestamped_message.channel_index];