Enable renaming logged locations
This adds two main features:
-Renaming logged locations so that they don't interfere with attempted
replay.
-Using a different (than logged) configuration for the replay so that we
can take advantage of updated schemas or the such.
Change-Id: I81a0da10fd60a32be2e7ea87ffe98d9b3043198c
diff --git a/aos/configuration.h b/aos/configuration.h
index 7bf8203..a078dea 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -49,6 +49,16 @@
const Node *node) {
return GetChannel(&config.message(), name, type, application_name, node);
}
+// Convenience wrapper for getting a channel from a specified config if you
+// already have the name/type in a Channel object--this is useful if you Channel
+// object you have does not point to memory within config.
+inline const Channel *GetChannel(const Configuration *config,
+ const Channel *channel,
+ const std::string_view application_name,
+ const Node *node) {
+ return GetChannel(config, channel->name()->string_view(),
+ channel->type()->string_view(), application_name, node);
+}
// Returns the Node out of the config with the matching name, or nullptr if it
// can't be found.
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3e790b6..486cda9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -219,20 +219,40 @@
writer_->Flush();
}
-LogReader::LogReader(std::string_view filename)
- : sorted_message_reader_(filename) {
- channels_.resize(configuration()->channels()->size());
+LogReader::LogReader(std::string_view filename,
+ const Configuration *replay_configuration)
+ : sorted_message_reader_(filename),
+ replay_configuration_(replay_configuration) {
+ channels_.resize(logged_configuration()->channels()->size());
}
LogReader::~LogReader() {
Deregister();
}
-const Configuration *LogReader::configuration() const {
+const Configuration *LogReader::logged_configuration() const {
return sorted_message_reader_.configuration();
}
-const Node *LogReader::node() const { return sorted_message_reader_.node(); }
+const Configuration *LogReader::configuration() const {
+ CHECK(remapped_configuration_ != nullptr)
+ << ": Need to call Register() before the remapped config will be "
+ "generated.";
+ return remapped_configuration_;
+}
+
+const Node *LogReader::node() const {
+ // Because the Node pointer will only be valid if it actually points to memory
+ // owned by remapped_configuration_, we need to wait for the
+ // remapped_configuration_ to be populated before accessing it.
+ CHECK(remapped_configuration_ != nullptr)
+ << ": Need to call Register before the node() pointer will be valid.";
+ if (sorted_message_reader_.node() == nullptr) {
+ return nullptr;
+ }
+ return configuration::GetNode(
+ configuration(), sorted_message_reader_.node()->name()->string_view());
+}
monotonic_clock::time_point LogReader::monotonic_start_time() {
return sorted_message_reader_.monotonic_start_time();
@@ -243,6 +263,7 @@
}
void LogReader::Register() {
+ MakeRemappedConfig();
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
Register(event_loop_factory_unique_ptr_.get());
@@ -268,13 +289,21 @@
event_loop_->SkipTimingReport();
for (size_t i = 0; i < channels_.size(); ++i) {
- CHECK_EQ(configuration()->channels()->Get(i)->name(),
- event_loop_->configuration()->channels()->Get(i)->name());
- CHECK_EQ(configuration()->channels()->Get(i)->type(),
- event_loop_->configuration()->channels()->Get(i)->type());
+ const Channel *const original_channel =
+ logged_configuration()->channels()->Get(i);
- channels_[i] = event_loop_->MakeRawSender(
- event_loop_->configuration()->channels()->Get(i));
+ std::string_view channel_name = original_channel->name()->string_view();
+ std::string_view channel_type = original_channel->type()->string_view();
+ // If the channel is remapped, find the correct channel name to use.
+ if (remapped_channels_.count(i) > 0) {
+ VLOG(2) << "Got remapped channel on "
+ << configuration::CleanedChannelToString(original_channel);
+ channel_name = remapped_channels_[i];
+ }
+ VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+ channels_[i] = event_loop_->MakeRawSender(CHECK_NOTNULL(
+ configuration::GetChannel(event_loop_->configuration(), channel_name,
+ channel_type, "", nullptr)));
}
timer_handler_ = event_loop_->AddTimer([this]() {
@@ -352,5 +381,109 @@
event_loop_factory_ = nullptr;
}
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+ std::string_view add_prefix) {
+ CHECK(remapped_configuration_ == nullptr)
+ << "Must call RemapLoggedChannel before calling Register().";
+ for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
+ const Channel *const channel = logged_configuration()->channels()->Get(ii);
+ if (channel->name()->str() == name &&
+ channel->type()->string_view() == type) {
+ CHECK_EQ(0u, remapped_channels_.count(ii))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(channel);
+ remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+ VLOG(1) << "Remapping channel "
+ << configuration::CleanedChannelToString(channel)
+ << " to have name " << remapped_channels_[ii];
+ return;
+ }
+ }
+ LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
+ << type;
+}
+
+void LogReader::MakeRemappedConfig() {
+ // If no remapping occurred and we are using the original config, then there
+ // is nothing interesting to do here.
+ if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
+ remapped_configuration_ = sorted_message_reader_.configuration();
+ return;
+ }
+ // Config to copy Channel definitions from. Use the specified
+ // replay_configuration_ if it has been provided.
+ const Configuration *const base_config = replay_configuration_ == nullptr
+ ? logged_configuration()
+ : replay_configuration_;
+ // The remapped config will be identical to the base_config, except that it
+ // will have a bunch of extra channels in the channel list, which are exact
+ // copies of the remapped channels, but with different names.
+ // Because the flatbuffers API is a pain to work with, this requires a bit of
+ // a song-and-dance to get copied over.
+ // The order of operations is to:
+ // 1) Make a flatbuffer builder for a config that will just contain a list of
+ // the new channels that we want to add.
+ // 2) For each channel that we are remapping:
+ // a) Make a buffer/builder and construct into it a Channel table that only
+ // contains the new name for the channel.
+ // b) Merge the new channel with just the name into the channel that we are
+ // trying to copy, built in the flatbuffer builder made in 1. This gives
+ // us the new channel definition that we need.
+ // 3) Using this list of offsets, build the Configuration of just new
+ // Channels.
+ // 4) Merge the Configuration with the new Channels into the base_config.
+ // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
+ // chance to sanitize the config.
+
+ // This is the builder that we use for the config containing all the new
+ // channels.
+ flatbuffers::FlatBufferBuilder new_config_fbb;
+ new_config_fbb.ForceDefaults(1);
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (auto &pair : remapped_channels_) {
+ // This is the builder that we use for creating the Channel with just the
+ // new name.
+ flatbuffers::FlatBufferBuilder new_name_fbb;
+ new_name_fbb.ForceDefaults(1);
+ const flatbuffers::Offset<flatbuffers::String> name_offset =
+ new_name_fbb.CreateString(pair.second);
+ ChannelBuilder new_name_builder(new_name_fbb);
+ new_name_builder.add_name(name_offset);
+ new_name_fbb.Finish(new_name_builder.Finish());
+ const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
+ // Retrieve the channel that we want to copy, confirming that it is actually
+ // present in base_config.
+ const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+ base_config, logged_configuration()->channels()->Get(pair.first), "",
+ nullptr));
+ // Actually create the new channel and put it into the vector of Offsets
+ // that we will use to create the new Configuration.
+ channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
+ reinterpret_cast<const flatbuffers::Table *>(base_channel),
+ reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
+ &new_config_fbb));
+ }
+ // Create the Configuration containing the new channels that we want to add.
+ const auto
+ new_name_vector_offsets = new_config_fbb.CreateVector(channel_offsets);
+ ConfigurationBuilder new_config_builder(new_config_fbb);
+ new_config_builder.add_channels(new_name_vector_offsets);
+ new_config_fbb.Finish(new_config_builder.Finish());
+ const FlatbufferDetachedBuffer<Configuration> new_name_config =
+ new_config_fbb.Release();
+ // Merge the new channels configuration into the base_config, giving us the
+ // remapped configuration.
+ remapped_configuration_buffer_ =
+ std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+ MergeFlatBuffers<Configuration>(base_config,
+ &new_name_config.message()));
+ // Call MergeConfiguration to deal with sanitizing the config.
+ remapped_configuration_buffer_ =
+ std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+ configuration::MergeConfiguration(*remapped_configuration_buffer_));
+
+ remapped_configuration_ = &remapped_configuration_buffer_->message();
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e5b91ac..1740b1f 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -59,7 +59,12 @@
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
- LogReader(std::string_view filename);
+ // If you want to supply a new configuration that will be used for replay
+ // (e.g., to change message rates, or to populate an updated schema), then
+ // pass it in here. It must provide all the channels that the original logged
+ // config did.
+ LogReader(std::string_view filename,
+ const Configuration *replay_configuration = nullptr);
~LogReader();
// Registers everything, but also updates the real time time in sync. Runs
@@ -81,6 +86,8 @@
void Deregister();
// Returns the configuration from the log file.
+ const Configuration *logged_configuration() const;
+ // Returns the configuration being used for replay.
const Configuration *configuration() const;
// Returns the node that this log file was created on.
@@ -90,6 +97,18 @@
monotonic_clock::time_point monotonic_start_time();
realtime_clock::time_point realtime_start_time();
+ // Causes the logger to publish the provided channel on a different name so
+ // that replayed applications can publish on the proper channel name without
+ // interference. This operates on raw channel names, without any node or
+ // application specific mappings.
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ std::string_view add_prefix = "/original");
+ template <typename T>
+ void RemapLoggedChannel(std::string_view name,
+ std::string_view add_prefix = "/original") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
+ }
+
SimulatedEventLoopFactory *event_loop_factory() {
return event_loop_factory_;
}
@@ -102,6 +121,9 @@
private:
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
+ // Handle constructing a configuration with all the additional remapped
+ // channels from calls to RemapLoggedChannel.
+ void MakeRemappedConfig();
// Log chunk reader.
SortedMessageReader sorted_message_reader_;
@@ -117,6 +139,14 @@
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+
+ // Map of channel indices to new name. The channel index will be an index into
+ // logged_configuration(), and the string key will be the name of the channel
+ // to send on instead of the logged channel name.
+ std::map<size_t, std::string> remapped_channels_;
+
+ const Configuration *remapped_configuration_ = nullptr;
+ const Configuration *replay_configuration_ = nullptr;
};
} // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index d96d4c6..e1808b1 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -59,10 +59,12 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(logfile);
+ // Even though it doesn't make any difference here, exercise the logic for
+ // passing in a separate config.
+ LogReader reader(logfile, &config_.message());
- LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
- EXPECT_EQ(reader.node(), nullptr);
+ // Confirm that we can remap logged channels to point to new buses.
+ reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
// This sends out the fetched messages and advances time to the start of the
// log file.
@@ -78,8 +80,8 @@
int ping_count = 10;
int pong_count = 10;
- // Confirm that the ping value matches.
- test_event_loop->MakeWatcher("/test",
+ // Confirm that the ping value matches in the remapped channel location.
+ test_event_loop->MakeWatcher("/original/test",
[&ping_count](const examples::Ping &ping) {
EXPECT_EQ(ping.value(), ping_count + 1);
++ping_count;