Fix AOS logging when using non-EventLoop configuration
Fix a crash in aos::Logger when reading back a log that was written
with a configuration passed to the Logger that differs from the one
used by the event loop.
Change event_loop_to_logged_channel_index_ to use
`std::optional<uint32_t>` instead of `int`. Channel indices
should be uint32_t, and the value is optional because a channel can
be missing from the configuration specified in the Logger
constructor.
Change-Id: I5203b390689cab27cd79ee35b029cfdd0f98dfd8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 6cae0d4..4de5332 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1729,5 +1729,39 @@
return MergeWithConfig(config, new_channel_config);
}
+FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
+ const Configuration &configuration,
+ std::function<bool(const Channel &)> should_include_channel) {
+ // NOTE(review): configuration.channels() is dereferenced unchecked below.
+ // new_configuration1 is a copy of the input with `channels` cleared.
+ FlatbufferDetachedBuffer<Configuration> new_configuration1 =
+ RecursiveCopyFlatBuffer(&configuration);
+ new_configuration1.mutable_message()->clear_channels();
+
+ // new_configuration2 holds only the channels the predicate accepts.
+ flatbuffers::FlatBufferBuilder fbb;
+ std::vector<flatbuffers::Offset<Channel>> new_channels_vec;
+ for (const auto &channel : *configuration.channels()) {
+ CHECK_NOTNULL(channel);
+ if (should_include_channel(*channel)) {
+ new_channels_vec.push_back(RecursiveCopyFlatBuffer(channel, &fbb));
+ }
+ }
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ new_channels_offset = fbb.CreateVector(new_channels_vec);
+ Configuration::Builder new_configuration2_builder(fbb);
+ new_configuration2_builder.add_channels(new_channels_offset);
+ fbb.Finish(new_configuration2_builder.Finish());
+ FlatbufferDetachedBuffer<Configuration> new_configuration2 = fbb.Release();
+
+ // Merge the channels-only configuration with the everything-else
+ // configuration, producing a complete configuration.
+ const aos::FlatbufferDetachedBuffer<Configuration> raw_subset_configuration =
+ MergeFlatBuffers(&new_configuration1.message(),
+ &new_configuration2.message());
+
+ // Run the result through MergeConfiguration to clean up redundant schemas.
+ return configuration::MergeConfiguration(raw_subset_configuration);
+}
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 01b9e7c..7ef2e6e 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -248,6 +248,12 @@
aos::FlatbufferVector<reflection::Schema> schema,
const aos::Node *source_node = nullptr, ChannelT overrides = {});
+// Build a new configuration that only contains the channels we want to
+// include. This is useful for excluding obsolete or deprecated channels, so
+// they don't appear in the configuration when reading the log.
+FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
+ const Configuration &configuration,
+ std::function<bool(const Channel &)> should_include_channel);
} // namespace configuration
// Compare and equality operators for Channel. Note: these only check the name
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index e5fe303..eb08d78 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -1166,4 +1166,68 @@
ASSERT_EQ(971, channel->frequency());
}
+// Builds a partial configuration from base_config that excludes the channel
+// with the given name and type. That channel must exist in base_config.
+// Checks that the new configuration has exactly one fewer channel, and that
+// the excluded channel can no longer be found in it.
+void TestGetPartialConfiguration(const Configuration &base_config,
+ std::string_view test_channel_name,
+ std::string_view test_channel_type) {
+ const Channel *channel_from_base_config = GetChannel(
+ &base_config, test_channel_name, test_channel_type, "", nullptr);
+ ASSERT_TRUE(channel_from_base_config != nullptr);
+
+ const FlatbufferDetachedBuffer<Configuration> new_config =
+ configuration::GetPartialConfiguration(
+ base_config,
+ // should_include_channel: reject only the channel under test.
+ [test_channel_name, test_channel_type](const Channel &channel) {
+ if (channel.name()->string_view() == test_channel_name &&
+ channel.type()->string_view() == test_channel_type) {
+ LOG(INFO) << "Omitting channel from save_log, channel: "
+ << channel.name()->string_view() << ", "
+ << channel.type()->string_view();
+ return false;
+ }
+ return true;
+ });
+
+ EXPECT_EQ(new_config.message().channels()->size(),
+ base_config.channels()->size() - 1);
+
+ channel_from_base_config = GetChannel(&base_config, test_channel_name,
+ test_channel_type, "", nullptr);
+ EXPECT_TRUE(channel_from_base_config != nullptr);
+
+ const Channel *channel_from_new_config =
+ GetChannel(new_config, test_channel_name, test_channel_type, "", nullptr);
+ EXPECT_TRUE(channel_from_new_config == nullptr);
+}
+
+// Tests that GetPartialConfiguration can drop an individual channel from a
+// single-node config.
+TEST_F(ConfigurationTest, RemoveChannelsFromConfigSingleNode) {
+ FlatbufferDetachedBuffer<Configuration> base_config =
+ ReadConfig(ArtifactPath("aos/testdata/config1.json"));
+
+ constexpr std::string_view test_channel_name = "/foo2";
+ constexpr std::string_view test_channel_type = ".aos.bar";
+
+ TestGetPartialConfiguration(base_config.message(), test_channel_name,
+ test_channel_type);
+}
+
+// Tests that GetPartialConfiguration can drop an individual channel from a
+// multi-node config.
+TEST_F(ConfigurationTest, RemoveChannelsFromConfigMultiNode) {
+ FlatbufferDetachedBuffer<Configuration> base_config =
+ ReadConfig(ArtifactPath("aos/testdata/good_multinode.json"));
+
+ constexpr std::string_view test_channel_name = "/batman";
+ constexpr std::string_view test_channel_type = ".aos.baz";
+
+ TestGetPartialConfiguration(base_config.message(), test_channel_name,
+ test_channel_type);
+}
+
} // namespace aos::configuration::testing
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index f840c22..ece4055 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -41,23 +41,7 @@
// from the channel index that the event loop uses to the channel index in
// the config in the log file.
event_loop_to_logged_channel_index_.resize(
- event_loop->configuration()->channels()->size(), -1);
- for (size_t event_loop_channel_index = 0;
- event_loop_channel_index <
- event_loop->configuration()->channels()->size();
- ++event_loop_channel_index) {
- const Channel *event_loop_channel =
- event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
- const Channel *logged_channel = aos::configuration::GetChannel(
- configuration_, event_loop_channel->name()->string_view(),
- event_loop_channel->type()->string_view(), "", node_);
-
- if (logged_channel != nullptr) {
- event_loop_to_logged_channel_index_[event_loop_channel_index] =
- configuration::ChannelIndex(configuration_, logged_channel);
- }
- }
+ event_loop->configuration()->channels()->size());
// Map to match source channels with the timestamp logger, if the contents
// should be reliable, and a list of all channels logged on it to be treated
@@ -92,6 +76,18 @@
const Channel *const timestamp_logger_channel =
finder.ForChannel(channel, connection);
+ const Channel *logged_channel = aos::configuration::GetChannel(
+ configuration_, channel->name()->string_view(),
+ channel->type()->string_view(), "", node_);
+ if (logged_channel == nullptr) {
+ // The channel doesn't exist in configuration_, so don't log it.
+ continue;
+ }
+ if (!should_log(logged_channel)) {
+ // The channel didn't pass our `should_log` check, so don't log it.
+ continue;
+ }
+
auto it = timestamp_logger_channels.find(timestamp_logger_channel);
if (it != timestamp_logger_channels.end()) {
CHECK(!is_split);
@@ -120,27 +116,41 @@
}
}
- for (size_t channel_index = 0;
- channel_index < configuration_->channels()->size(); ++channel_index) {
- const Channel *const config_channel =
- configuration_->channels()->Get(channel_index);
+ for (size_t event_loop_channel_index = 0;
+ event_loop_channel_index <
+ event_loop->configuration()->channels()->size();
+ ++event_loop_channel_index) {
+ const Channel *channel =
+ event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
// The MakeRawFetcher method needs a channel which is in the event loop
// configuration() object, not the configuration_ object. Go look that up
// from the config.
- const Channel *channel = aos::configuration::GetChannel(
- event_loop_->configuration(), config_channel->name()->string_view(),
- config_channel->type()->string_view(), "", event_loop_->node());
- CHECK(channel != nullptr)
- << ": Failed to look up channel "
- << aos::configuration::CleanedChannelToString(config_channel);
- if (!should_log(config_channel)) {
+ const Channel *config_channel = aos::configuration::GetChannel(
+ configuration_, channel->name()->string_view(),
+ channel->type()->string_view(), "", node_);
+
+ if (config_channel == nullptr) {
+ // The channel doesn't exist in configuration_, so don't log the
+ // timestamps.
continue;
}
+ if (!should_log(config_channel)) {
+ // The channel didn't pass our `should_log` check, so don't log the
+ // timestamps.
+ continue;
+ }
+
+ const uint32_t channel_index =
+ configuration::ChannelIndex(configuration_, config_channel);
FetcherStruct fs;
fs.channel_index = channel_index;
fs.channel = channel;
+ event_loop_to_logged_channel_index_[event_loop_channel_index] =
+ channel_index;
+
const bool is_local =
configuration::ChannelIsSendableOnNode(config_channel, node_);
@@ -759,9 +769,11 @@
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
// Translate from the channel index that the event loop uses to the
// channel index in the log file.
- const int channel_index =
+ const std::optional<uint32_t> channel_index =
event_loop_to_logged_channel_index_[msg->channel_index()];
+ CHECK(channel_index.has_value());
+
const aos::monotonic_clock::time_point monotonic_timestamp_time =
f.fetcher->context().monotonic_event_time;
@@ -784,8 +796,8 @@
chrono::nanoseconds(msg->monotonic_sent_time())),
reliable, monotonic_timestamp_time);
- RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
- event_loop_);
+ RemoteMessageCopier coppier(msg, channel_index.value(),
+ monotonic_timestamp_time, event_loop_);
contents_writer->CopyRemoteTimestampMessage(
&coppier, UUID::FromVector(msg->boot_uuid()), start,
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index eb69d7e..a9c4cb9 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -203,9 +203,13 @@
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
- // Channel index to log to.
+ // Index of the channel in the logged configuration (not necessarily the
+ // event loop configuration).
int channel_index = -1;
+
+ // Channel from the event_loop configuration.
const Channel *channel = nullptr;
+
const Node *timestamp_node = nullptr;
LogType log_type = LogType::kLogMessage;
@@ -245,7 +249,13 @@
// Vector mapping from the channel index from the event loop to the logged
// channel index.
- std::vector<int> event_loop_to_logged_channel_index_;
+ // When using the constructor that allows manually specifying the
+ // configuration, that configuration may have different channels than the
+ // event loop's configuration. When there is a channel that is included in the
+ // event loop configuration but not in the specified configuration, the value
+ // in this mapping will be nullopt for that channel. Nullopt will result in
+ // that channel not being included in the output log's configuration or data.
+ std::vector<std::optional<uint32_t>> event_loop_to_logged_channel_index_;
// Start/Restart write configuration into LogNamer space.
std::string WriteConfiguration(LogNamer *log_namer);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 0166b72..326fa6d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1824,6 +1824,7 @@
size_t monotonic_remote_boot = 0xffffff;
if (msg->monotonic_remote_time.has_value()) {
+ CHECK_LT(msg->channel_index, source_node_index_.size());
const Node *node = parts().config->nodes()->Get(
source_node_index_[msg->channel_index]);
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 3c438e3..427347d 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -3124,6 +3124,107 @@
}
}
+// Tests that we can relog with a subset of the original config. This is useful
+// for excluding obsolete or deprecated channels, so that they don't appear in
+// the configuration when reading the relogged files.
+TEST_P(MultinodeLoggerTest, LogPartialConfig) {
+ time_converter_.StartEqual();
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ auto sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
+ reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+ EXPECT_THAT(reader.LoggedNodes(),
+ ::testing::ElementsAre(
+ configuration::GetNode(reader.logged_configuration(), pi1),
+ configuration::GetNode(reader.logged_configuration(), pi2)));
+
+ reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+ const FlatbufferDetachedBuffer<Configuration> partial_configuration_buffer =
+ configuration::GetPartialConfiguration(
+ *reader.event_loop_factory()->configuration(),
+ [](const Channel &channel) {
+ if (channel.name()->string_view().starts_with("/original/")) {
+ LOG(INFO) << "Omitting channel from save_log, channel: "
+ << channel.name()->string_view() << ", "
+ << channel.type()->string_view();
+ return false;
+ }
+ return true;
+ });
+
+ // Relog using the partial configuration and collect the new log file names.
+ std::vector<std::string> log_files;
+ {
+ const Configuration *partial_configuration =
+ &(partial_configuration_buffer.message());
+
+ LoggerState pi1_logger =
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
+ &log_reader_factory, partial_configuration);
+ LoggerState pi2_logger =
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
+ &log_reader_factory, partial_configuration);
+
+ pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
+ pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
+
+ log_reader_factory.Run();
+
+ for (auto &x : pi1_logger.log_namer->all_filenames()) {
+ log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
+ }
+ for (auto &x : pi2_logger.log_namer->all_filenames()) {
+ log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
+ }
+ }
+
+ reader.Deregister();
+
+ // Finally, verify that a LogReader can run over the relogged files without
+ // hitting any fatal errors.
+ {
+ auto sorted_parts = SortParts(log_files);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader relogged_reader(sorted_parts);
+ relogged_reader.Register();
+
+ relogged_reader.event_loop_factory()->Run();
+ }
+}
+
// Tests that we properly replay a log where the start time for a node is
// before any data on the node. This can happen if the logger starts before
// data is published. While the scenario below is a bit convoluted, we have