Fix AOS logging when using non-EventLoop configuration

Fixed crash in aos::Logger when reading back a log that was made
by providing a different configuration to the Logger than was
used by the event loop.

Change event_loop_to_logged_channel_index_ to use
`std::optional<uint32_t>` instead of `int`. Channel indices
should be uint32_t. Changed to optional because the channel can
be missing from the configuration specified in the logger
Constructor.

Change-Id: I5203b390689cab27cd79ee35b029cfdd0f98dfd8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 6cae0d4..4de5332 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1729,5 +1729,39 @@
   return MergeWithConfig(config, new_channel_config);
 }
 
+FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
+    const Configuration &configuration,
+    std::function<bool(const Channel &)> should_include_channel) {
+  // create new_configuration1, containing everything except the `channels`
+  // field.
+  FlatbufferDetachedBuffer<Configuration> new_configuration1 =
+      RecursiveCopyFlatBuffer(&configuration);
+  new_configuration1.mutable_message()->clear_channels();
+
+  // create new_configuration2, containing only the `channels` field.
+  flatbuffers::FlatBufferBuilder fbb;
+  std::vector<flatbuffers::Offset<Channel>> new_channels_vec;
+  for (const auto &channel : *configuration.channels()) {
+    CHECK_NOTNULL(channel);
+    if (should_include_channel(*channel)) {
+      new_channels_vec.push_back(RecursiveCopyFlatBuffer(channel, &fbb));
+    }
+  }
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+      new_channels_offset = fbb.CreateVector(new_channels_vec);
+  Configuration::Builder new_configuration2_builder(fbb);
+  new_configuration2_builder.add_channels(new_channels_offset);
+  fbb.Finish(new_configuration2_builder.Finish());
+  FlatbufferDetachedBuffer<Configuration> new_configuration2 = fbb.Release();
+
+  // Merge the configuration containing channels with the configuration
+  // containing everything else, creating a complete configuration.
+  const aos::FlatbufferDetachedBuffer<Configuration> raw_subset_configuration =
+      MergeFlatBuffers(&new_configuration1.message(),
+                       &new_configuration2.message());
+
+  // Use MergeConfiguration to clean up redundant schemas.
+  return configuration::MergeConfiguration(raw_subset_configuration);
+}
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 01b9e7c..7ef2e6e 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -248,6 +248,12 @@
     aos::FlatbufferVector<reflection::Schema> schema,
     const aos::Node *source_node = nullptr, ChannelT overrides = {});
 
+// Build a new configuration that only contains the channels we want to
+// include. This is useful for excluding obsolete or deprecated channels, so
+// they don't appear in the configuration when reading the log.
+FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
+    const Configuration &configuration,
+    std::function<bool(const Channel &)> should_include_channel);
 }  // namespace configuration
 
 // Compare and equality operators for Channel.  Note: these only check the name
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index e5fe303..eb08d78 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -1166,4 +1166,68 @@
   ASSERT_EQ(971, channel->frequency());
 }
 
+// Create a new configuration with the specified channel removed.
+// Initially there must be exactly one channel in the base_config that matches
+// the criteria. Check to make sure the new configuration has one less channel,
+// and that channel is the specified channel.
+void TestGetPartialConfiguration(const Configuration &base_config,
+                                 std::string_view test_channel_name,
+                                 std::string_view test_channel_type) {
+  const Channel *channel_from_base_config = GetChannel(
+      &base_config, test_channel_name, test_channel_type, "", nullptr);
+  ASSERT_TRUE(channel_from_base_config != nullptr);
+
+  const FlatbufferDetachedBuffer<Configuration> new_config =
+      configuration::GetPartialConfiguration(
+          base_config,
+          // should_include_channel function
+          [test_channel_name, test_channel_type](const Channel &channel) {
+            if (channel.name()->string_view() == test_channel_name &&
+                channel.type()->string_view() == test_channel_type) {
+              LOG(INFO) << "Omitting channel from save_log, channel: "
+                        << channel.name()->string_view() << ", "
+                        << channel.type()->string_view();
+              return false;
+            }
+            return true;
+          });
+
+  EXPECT_EQ(new_config.message().channels()->size(),
+            base_config.channels()->size() - 1);
+
+  channel_from_base_config = GetChannel(&base_config, test_channel_name,
+                                        test_channel_type, "", nullptr);
+  EXPECT_TRUE(channel_from_base_config != nullptr);
+
+  const Channel *channel_from_new_config =
+      GetChannel(new_config, test_channel_name, test_channel_type, "", nullptr);
+  EXPECT_TRUE(channel_from_new_config == nullptr);
+}
+
+// Tests that we can use a utility to remove individual channels from a
+// single-node config.
+TEST_F(ConfigurationTest, RemoveChannelsFromConfigSingleNode) {
+  FlatbufferDetachedBuffer<Configuration> base_config =
+      ReadConfig(ArtifactPath("aos/testdata/config1.json"));
+
+  constexpr std::string_view test_channel_name = "/foo2";
+  constexpr std::string_view test_channel_type = ".aos.bar";
+
+  TestGetPartialConfiguration(base_config.message(), test_channel_name,
+                              test_channel_type);
+}
+
+// Tests that we can use a utility to remove individual channels from a
+// multi-node config.
+TEST_F(ConfigurationTest, RemoveChannelsFromConfigMultiNode) {
+  FlatbufferDetachedBuffer<Configuration> base_config =
+      ReadConfig(ArtifactPath("aos/testdata/good_multinode.json"));
+
+  constexpr std::string_view test_channel_name = "/batman";
+  constexpr std::string_view test_channel_type = ".aos.baz";
+
+  TestGetPartialConfiguration(base_config.message(), test_channel_name,
+                              test_channel_type);
+}
+
 }  // namespace aos::configuration::testing
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index f840c22..ece4055 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -41,23 +41,7 @@
   // from the channel index that the event loop uses to the channel index in
   // the config in the log file.
   event_loop_to_logged_channel_index_.resize(
-      event_loop->configuration()->channels()->size(), -1);
-  for (size_t event_loop_channel_index = 0;
-       event_loop_channel_index <
-       event_loop->configuration()->channels()->size();
-       ++event_loop_channel_index) {
-    const Channel *event_loop_channel =
-        event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
-    const Channel *logged_channel = aos::configuration::GetChannel(
-        configuration_, event_loop_channel->name()->string_view(),
-        event_loop_channel->type()->string_view(), "", node_);
-
-    if (logged_channel != nullptr) {
-      event_loop_to_logged_channel_index_[event_loop_channel_index] =
-          configuration::ChannelIndex(configuration_, logged_channel);
-    }
-  }
+      event_loop->configuration()->channels()->size());
 
   // Map to match source channels with the timestamp logger, if the contents
   // should be reliable, and a list of all channels logged on it to be treated
@@ -92,6 +76,18 @@
         const Channel *const timestamp_logger_channel =
             finder.ForChannel(channel, connection);
 
+        const Channel *logged_channel = aos::configuration::GetChannel(
+            configuration_, channel->name()->string_view(),
+            channel->type()->string_view(), "", node_);
+        if (logged_channel == nullptr) {
+          // The channel doesn't exist in configuration_, so don't log it.
+          continue;
+        }
+        if (!should_log(logged_channel)) {
+          // The channel didn't pass our `should_log` check, so don't log it.
+          continue;
+        }
+
         auto it = timestamp_logger_channels.find(timestamp_logger_channel);
         if (it != timestamp_logger_channels.end()) {
           CHECK(!is_split);
@@ -120,27 +116,41 @@
     }
   }
 
-  for (size_t channel_index = 0;
-       channel_index < configuration_->channels()->size(); ++channel_index) {
-    const Channel *const config_channel =
-        configuration_->channels()->Get(channel_index);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
     // The MakeRawFetcher method needs a channel which is in the event loop
     // configuration() object, not the configuration_ object.  Go look that up
     // from the config.
-    const Channel *channel = aos::configuration::GetChannel(
-        event_loop_->configuration(), config_channel->name()->string_view(),
-        config_channel->type()->string_view(), "", event_loop_->node());
-    CHECK(channel != nullptr)
-        << ": Failed to look up channel "
-        << aos::configuration::CleanedChannelToString(config_channel);
-    if (!should_log(config_channel)) {
+    const Channel *config_channel = aos::configuration::GetChannel(
+        configuration_, channel->name()->string_view(),
+        channel->type()->string_view(), "", node_);
+
+    if (config_channel == nullptr) {
+      // The channel doesn't exist in configuration_, so don't log the
+      // timestamps.
       continue;
     }
+    if (!should_log(config_channel)) {
+      // The channel didn't pass our `should_log` check, so don't log the
+      // timestamps.
+      continue;
+    }
+
+    const uint32_t channel_index =
+        configuration::ChannelIndex(configuration_, config_channel);
 
     FetcherStruct fs;
     fs.channel_index = channel_index;
     fs.channel = channel;
 
+    event_loop_to_logged_channel_index_[event_loop_channel_index] =
+        channel_index;
+
     const bool is_local =
         configuration::ChannelIsSendableOnNode(config_channel, node_);
 
@@ -759,9 +769,11 @@
     CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
     // Translate from the channel index that the event loop uses to the
     // channel index in the log file.
-    const int channel_index =
+    const std::optional<uint32_t> channel_index =
         event_loop_to_logged_channel_index_[msg->channel_index()];
 
+    CHECK(channel_index.has_value());
+
     const aos::monotonic_clock::time_point monotonic_timestamp_time =
         f.fetcher->context().monotonic_event_time;
 
@@ -784,8 +796,8 @@
             chrono::nanoseconds(msg->monotonic_sent_time())),
         reliable, monotonic_timestamp_time);
 
-    RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
-                                event_loop_);
+    RemoteMessageCopier coppier(msg, channel_index.value(),
+                                monotonic_timestamp_time, event_loop_);
 
     contents_writer->CopyRemoteTimestampMessage(
         &coppier, UUID::FromVector(msg->boot_uuid()), start,
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index eb69d7e..a9c4cb9 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -203,9 +203,13 @@
     std::unique_ptr<RawFetcher> fetcher;
     bool written = false;
 
-    // Channel index to log to.
+    // Index of the channel in the logged configuration (not necessarily the
+    // event loop configuration).
     int channel_index = -1;
+
+    // Channel from the event_loop configuration.
     const Channel *channel = nullptr;
+
     const Node *timestamp_node = nullptr;
 
     LogType log_type = LogType::kLogMessage;
@@ -245,7 +249,13 @@
 
   // Vector mapping from the channel index from the event loop to the logged
   // channel index.
-  std::vector<int> event_loop_to_logged_channel_index_;
+  // When using the constructor that allows manually specifying the
+  // configuration, that configuration may have different channels than the
+  // event loop's configuration. When there is a channel that is included in the
+  // event loop configuration but not in the specified configuration, the value
+  // in this mapping will be nullopt for that channel. Nullopt will result in
+  // that channel not being included in the output log's configuration or data.
+  std::vector<std::optional<uint32_t>> event_loop_to_logged_channel_index_;
 
   // Start/Restart write configuration into LogNamer space.
   std::string WriteConfiguration(LogNamer *log_namer);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 0166b72..326fa6d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1824,6 +1824,7 @@
       size_t monotonic_remote_boot = 0xffffff;
 
       if (msg->monotonic_remote_time.has_value()) {
+        CHECK_LT(msg->channel_index, source_node_index_.size());
         const Node *node = parts().config->nodes()->Get(
             source_node_index_[msg->channel_index]);
 
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 3c438e3..427347d 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -3124,6 +3124,107 @@
   }
 }
 
+// Tests that we can relog with a subset of the original config. This is useful
+// for excluding obsolete or deprecated channels, so they don't appear in the
+// configuration when reading the log.
+TEST_P(MultinodeLoggerTest, LogPartialConfig) {
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  auto sorted_parts = SortParts(logfiles_);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+  LogReader reader(sorted_parts);
+  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  const FlatbufferDetachedBuffer<Configuration> partial_configuration_buffer =
+      configuration::GetPartialConfiguration(
+          *reader.event_loop_factory()->configuration(),
+          [](const Channel &channel) {
+            if (channel.name()->string_view().starts_with("/original/")) {
+              LOG(INFO) << "Omitting channel from save_log, channel: "
+                        << channel.name()->string_view() << ", "
+                        << channel.type()->string_view();
+              return false;
+            }
+            return true;
+          });
+
+  // And confirm we can re-create a log again, while checking the contents.
+  std::vector<std::string> log_files;
+  {
+    const Configuration *partial_configuration =
+        &(partial_configuration_buffer.message());
+
+    LoggerState pi1_logger =
+        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
+                   &log_reader_factory, partial_configuration);
+    LoggerState pi2_logger =
+        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
+                   &log_reader_factory, partial_configuration);
+
+    pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
+    pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
+
+    log_reader_factory.Run();
+
+    for (auto &x : pi1_logger.log_namer->all_filenames()) {
+      log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
+    }
+    for (auto &x : pi2_logger.log_namer->all_filenames()) {
+      log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
+    }
+  }
+
+  reader.Deregister();
+
+  // And verify that we can run the LogReader over the relogged files without
+  // hitting any fatal errors.
+  {
+    auto sorted_parts = SortParts(log_files);
+    EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+    LogReader relogged_reader(sorted_parts);
+    relogged_reader.Register();
+
+    relogged_reader.event_loop_factory()->Run();
+  }
+}
+
 // Tests that we properly replay a log where the start time for a node is
 // before any data on the node.  This can happen if the logger starts before
 // data is published.  While the scenario below is a bit convoluted, we have