Refactor channel remapping out of LogReader
This change takes all the logic to rename and remap channels in
LogReader and places it in a new class: ConfigRemapper. LogReader still
retains the same remapping/renaming API for backwards compatibility, but
now the args are basically just passed along to ConfigRemapper. This was
done to allow configs to be remapped without a corresponding log file.
Change-Id: Ia32d8a3e640e94af0b52397ccd322149588d6da4
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index d4c361f..cafe49f 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -422,10 +422,12 @@
],
hdrs = [
"log_reader.h",
+ "replay_channels.h",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
+ ":config_remapper",
":log_namer",
":log_writer",
":logfile_utils",
@@ -451,6 +453,58 @@
],
)
+cc_library(
+ name = "config_remapper",
+ srcs = [
+ "config_remapper.cc",
+ ],
+ hdrs = [
+ "config_remapper.h",
+ "replay_channels.h",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":logfile_utils",
+ ":logger_fbs",
+ "//aos/events:event_loop",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:multinode_timestamp_filter",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:remote_message_schema",
+ "//aos/network:team_number",
+ "//aos/network:timestamp_filter",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_google_absl//absl/strings",
+ ],
+)
+
+cc_test(
+ name = "config_remapper_test",
+ srcs = ["config_remapper_test.cc"],
+ copts = select({
+ "//tools:cpu_k8": ["-DLZMA=1"],
+ "//tools:cpu_arm64": ["-DLZMA=1"],
+ "//conditions:default": [],
+ }),
+ data = [
+ ":multinode_pingpong_combined_config",
+ ":multinode_pingpong_split3_config",
+ ":multinode_pingpong_split4_config",
+ ":multinode_pingpong_split4_mixed1_config",
+ ":multinode_pingpong_split4_mixed2_config",
+ ":multinode_pingpong_split4_reliable_config",
+ ":multinode_pingpong_split_config",
+ ":multinode_pingpong_triangle_split_config",
+ "//aos/events:pingpong_config",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":config_remapper",
+ ":multinode_logger_test_lib",
+ ],
+)
+
cc_binary(
name = "log_cat",
srcs = [
diff --git a/aos/events/logging/config_remapper.cc b/aos/events/logging/config_remapper.cc
new file mode 100644
index 0000000..21f45d5
--- /dev/null
+++ b/aos/events/logging/config_remapper.cc
@@ -0,0 +1,679 @@
+#include "aos/events/logging/config_remapper.h"
+
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/remote_message_schema.h"
+#include "aos/network/team_number.h"
+#include "aos/network/timestamp_channel.h"
+
+namespace aos {
+using message_bridge::RemoteMessage;
+
+namespace {
+// Checks if the specified channel name/type exists in the config and, depending
+// on the value of conflict_handling, calls conflict_handler or just dies.
+template <typename F>
+void CheckAndHandleRemapConflict(
+ std::string_view new_name, std::string_view new_type,
+ const Configuration *config,
+ ConfigRemapper::RemapConflict conflict_handling, F conflict_handler) {
+ const Channel *existing_channel =
+ configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
+ if (existing_channel != nullptr) {
+ switch (conflict_handling) {
+ case ConfigRemapper::RemapConflict::kDisallow:
+ LOG(FATAL)
+ << "Channel "
+ << configuration::StrippedChannelToString(existing_channel)
+ << " is already used--you can't remap an original channel to it.";
+ break;
+ case ConfigRemapper::RemapConflict::kCascade:
+ VLOG(1) << "Automatically remapping "
+ << configuration::StrippedChannelToString(existing_channel)
+ << " to avoid conflicts.";
+ conflict_handler();
+ break;
+ }
+ }
+}
+} // namespace
+
+namespace configuration {
+// We don't really want to expose this publicly, but log reader doesn't really
+// want to re-implement it.
+void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<Map>> *maps,
+ std::string *name, std::string_view type, const Node *node);
+} // namespace configuration
+
+bool CompareChannels(const Channel *c,
+ ::std::pair<std::string_view, std::string_view> p) {
+ int name_compare = c->name()->string_view().compare(p.first);
+ if (name_compare == 0) {
+ return c->type()->string_view() < p.second;
+ } else if (name_compare < 0) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool EqualsChannels(const Channel *c,
+ ::std::pair<std::string_view, std::string_view> p) {
+ return c->name()->string_view() == p.first &&
+ c->type()->string_view() == p.second;
+}
+// Copies the channel, removing the schema as we go. If new_name is provided,
+// it is used instead of the name inside the channel. If new_type is provided,
+// it is used instead of the type in the channel.
+flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
+ std::string_view new_name,
+ std::string_view new_type,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
+ : new_name);
+ flatbuffers::Offset<flatbuffers::String> type_offset =
+ fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
+ : 0;
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset =
+ RecursiveCopyVectorTable(c->destination_nodes(), fbb);
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ logger_nodes_offset = CopyVectorSharedString(c->logger_nodes(), fbb);
+
+ Channel::Builder channel_builder(*fbb);
+ channel_builder.add_name(name_offset);
+ channel_builder.add_type(type_offset);
+ if (c->has_frequency()) {
+ channel_builder.add_frequency(c->frequency());
+ }
+ if (c->has_max_size()) {
+ channel_builder.add_max_size(c->max_size());
+ }
+ if (c->has_num_senders()) {
+ channel_builder.add_num_senders(c->num_senders());
+ }
+ if (c->has_num_watchers()) {
+ channel_builder.add_num_watchers(c->num_watchers());
+ }
+ if (!source_node_offset.IsNull()) {
+ channel_builder.add_source_node(source_node_offset);
+ }
+ if (!destination_nodes_offset.IsNull()) {
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ }
+ if (c->has_logger()) {
+ channel_builder.add_logger(c->logger());
+ }
+ if (!logger_nodes_offset.IsNull()) {
+ channel_builder.add_logger_nodes(logger_nodes_offset);
+ }
+ if (c->has_read_method()) {
+ channel_builder.add_read_method(c->read_method());
+ }
+ if (c->has_num_readers()) {
+ channel_builder.add_num_readers(c->num_readers());
+ }
+ if (c->has_channel_storage_duration()) {
+ channel_builder.add_channel_storage_duration(c->channel_storage_duration());
+ }
+ return channel_builder.Finish();
+}
+
+ConfigRemapper::ConfigRemapper(const Configuration *config,
+ const Configuration *replay_config,
+ const logger::ReplayChannels *replay_channels)
+ : remapped_configuration_(config),
+ original_configuration_(config),
+ replay_configuration_(replay_config),
+ replay_channels_(replay_channels) {
+ MakeRemappedConfig();
+
+ // If any remote timestamp channel was not marked NOT_LOGGED, then remap that
+ // channel to avoid the redundant logged data. Also, this loop handles the
+ // MessageHeader to RemoteMessae name change.
+ // Note: This path is mainly for backwards compatibility reasons, and should
+ // not be necessary for any new logs.
+ for (const Node *node : configuration::GetNodes(original_configuration())) {
+ message_bridge::ChannelTimestampFinder finder(original_configuration(),
+ "log_reader", node);
+
+ absl::btree_set<std::string_view> remote_nodes;
+
+ for (const Channel *channel : *original_configuration()->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, node)) {
+ continue;
+ }
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ node)) {
+ // Start by seeing if the split timestamp channels are being used for
+ // this message.
+ const Channel *timestamp_channel = configuration::GetChannel(
+ original_configuration(),
+ finder.SplitChannelName(channel, connection),
+ RemoteMessage::GetFullyQualifiedName(), "", node, true);
+
+ if (timestamp_channel != nullptr) {
+ // If for some reason a timestamp channel is not NOT_LOGGED (which
+ // is unusual), then remap the channel so that the replayed channel
+ // doesn't overlap with the special separate replay we do for
+ // timestamps.
+ if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
+ RemapOriginalChannel<RemoteMessage>(
+ timestamp_channel->name()->string_view(), node);
+ }
+ continue;
+ }
+
+ // Otherwise collect this one up as a node to look for a combined
+ // channel from. It is more efficient to compare nodes than channels.
+ LOG(WARNING) << "Failed to find channel "
+ << finder.SplitChannelName(channel, connection)
+ << " on node " << FlatbufferToJson(node);
+ remote_nodes.insert(connection->name()->string_view());
+ }
+ }
+ }
+
+ std::vector<const Node *> timestamp_logger_nodes =
+ configuration::TimestampNodes(original_configuration(), node);
+ for (const std::string_view remote_node : remote_nodes) {
+ const std::string channel = finder.CombinedChannelName(remote_node);
+
+ // See if the log file is an old log with logger::MessageHeader channels
+ // in it, or a newer log with RemoteMessage. If we find an older log,
+ // rename the type too along with the name.
+ if (HasChannel<logger::MessageHeader>(channel, node)) {
+ CHECK(!HasChannel<RemoteMessage>(channel, node))
+ << ": Can't have both a logger::MessageHeader and RemoteMessage "
+ "remote "
+ "timestamp channel.";
+ // In theory, we should check NOT_LOGGED like RemoteMessage and be more
+ // careful about updating the config, but there are fewer and fewer logs
+ // with logger::MessageHeader remote messages, so it isn't worth the
+ // effort.
+ RemapOriginalChannel<logger::MessageHeader>(
+ channel, node, "/original", "aos.message_bridge.RemoteMessage");
+ } else {
+ CHECK(HasChannel<RemoteMessage>(channel, node))
+ << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+ << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
+ << node->name()->string_view();
+ // Only bother to remap if there's something on the channel. We can
+ // tell if the channel was marked NOT_LOGGED or not. This makes the
+ // config not change un-necesarily when we replay a log with NOT_LOGGED
+ // messages.
+ if (HasOriginalChannel<RemoteMessage>(channel, node)) {
+ RemapOriginalChannel<RemoteMessage>(channel, node);
+ }
+ }
+ }
+ }
+ if (replay_configuration_) {
+ CHECK_EQ(configuration::MultiNode(remapped_configuration()),
+ configuration::MultiNode(replay_configuration_))
+ << ": Log file and replay config need to both be multi or single "
+ "node.";
+ }
+}
+
+ConfigRemapper::~ConfigRemapper() {
+ // Zero out some buffers. It's easy to do use-after-frees on these, so make
+ // it more obvious.
+ if (remapped_configuration_buffer_) {
+ remapped_configuration_buffer_->Wipe();
+ }
+}
+
+const Configuration *ConfigRemapper::original_configuration() const {
+ return original_configuration_;
+}
+
+const Configuration *ConfigRemapper::remapped_configuration() const {
+ return remapped_configuration_;
+}
+
+void ConfigRemapper::set_configuration(const Configuration *configuration) {
+ remapped_configuration_ = configuration;
+}
+
+std::vector<const Channel *> ConfigRemapper::RemappedChannels() const {
+ std::vector<const Channel *> result;
+ result.reserve(remapped_channels_.size());
+ for (auto &pair : remapped_channels_) {
+ const Channel *const original_channel =
+ CHECK_NOTNULL(original_configuration()->channels()->Get(pair.first));
+
+ auto channel_iterator = std::lower_bound(
+ remapped_configuration_->channels()->cbegin(),
+ remapped_configuration_->channels()->cend(),
+ std::make_pair(std::string_view(pair.second.remapped_name),
+ original_channel->type()->string_view()),
+ CompareChannels);
+
+ CHECK(channel_iterator != remapped_configuration_->channels()->cend());
+ CHECK(EqualsChannels(
+ *channel_iterator,
+ std::make_pair(std::string_view(pair.second.remapped_name),
+ original_channel->type()->string_view())));
+ result.push_back(*channel_iterator);
+ }
+ return result;
+}
+
+const Channel *ConfigRemapper::RemapChannel(const EventLoop *event_loop,
+ const Node *node,
+ const Channel *channel) {
+ std::string_view channel_name = channel->name()->string_view();
+ std::string_view channel_type = channel->type()->string_view();
+ const int channel_index =
+ configuration::ChannelIndex(original_configuration(), channel);
+ // If the channel is remapped, find the correct channel name to use.
+ if (remapped_channels_.count(channel_index) > 0) {
+ VLOG(3) << "Got remapped channel on "
+ << configuration::CleanedChannelToString(channel);
+ channel_name = remapped_channels_[channel_index].remapped_name;
+ }
+
+ VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
+ const Channel *remapped_channel = configuration::GetChannel(
+ remapped_configuration(), channel_name, channel_type,
+ event_loop ? event_loop->name() : "log_reader", node);
+
+ CHECK(remapped_channel != nullptr)
+ << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
+ << channel_type << "\"} because it is not in the provided configuration.";
+
+ return remapped_channel;
+}
+
+void ConfigRemapper::RemapOriginalChannel(std::string_view name,
+ std::string_view type,
+ std::string_view add_prefix,
+ std::string_view new_type,
+ RemapConflict conflict_handling) {
+ RemapOriginalChannel(name, type, nullptr, add_prefix, new_type,
+ conflict_handling);
+}
+
+void ConfigRemapper::RemapOriginalChannel(std::string_view name,
+ std::string_view type,
+ const Node *node,
+ std::string_view add_prefix,
+ std::string_view new_type,
+ RemapConflict conflict_handling) {
+ if (node != nullptr) {
+ VLOG(1) << "Node is " << FlatbufferToJson(node);
+ }
+ if (replay_channels_ != nullptr) {
+ CHECK(std::find(replay_channels_->begin(), replay_channels_->end(),
+ std::make_pair(std::string{name}, std::string{type})) !=
+ replay_channels_->end())
+ << "Attempted to remap channel " << name << " " << type
+ << " which is not included in the replay channels passed to "
+ "ConfigRemapper.";
+ }
+ const Channel *remapped_channel =
+ configuration::GetChannel(original_configuration(), name, type, "", node);
+ CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+ << "\", \"type\": \"" << type << "\"}";
+ VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+ << "\"}";
+ VLOG(1) << "Remapped "
+ << configuration::StrippedChannelToString(remapped_channel);
+
+ // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
+ // we want it to degrade if the heuristics fail to just work.
+ //
+ // The easiest way to do this is going to be incredibly specific and verbose.
+ // Look up /spray, to /0/spray. Then, prefix the result with /original to get
+ // /original/0/spray. Then, create a map from /original/spray to
+ // /original/0/spray for just the type we were asked for.
+ if (name != remapped_channel->name()->string_view()) {
+ MapT new_map;
+ new_map.match = std::make_unique<ChannelT>();
+ new_map.match->name = absl::StrCat(add_prefix, name);
+ new_map.match->type = type;
+ if (node != nullptr) {
+ new_map.match->source_node = node->name()->str();
+ }
+ new_map.rename = std::make_unique<ChannelT>();
+ new_map.rename->name =
+ absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+ maps_.emplace_back(std::move(new_map));
+ }
+
+ // Then remap the original channel to the prefixed channel.
+ const size_t channel_index =
+ configuration::ChannelIndex(original_configuration(), remapped_channel);
+ CHECK_EQ(0u, remapped_channels_.count(channel_index))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(remapped_channel);
+
+ RemappedChannel remapped_channel_struct;
+ remapped_channel_struct.remapped_name =
+ std::string(add_prefix) +
+ std::string(remapped_channel->name()->string_view());
+ remapped_channel_struct.new_type = new_type;
+ const std::string_view remapped_type = new_type.empty() ? type : new_type;
+ CheckAndHandleRemapConflict(
+ remapped_channel_struct.remapped_name, remapped_type,
+ remapped_configuration_, conflict_handling,
+ [this, &remapped_channel_struct, remapped_type, node, add_prefix,
+ conflict_handling]() {
+ RemapOriginalChannel(remapped_channel_struct.remapped_name,
+ remapped_type, node, add_prefix, "",
+ conflict_handling);
+ });
+ remapped_channels_[channel_index] = std::move(remapped_channel_struct);
+ MakeRemappedConfig();
+}
+
+void ConfigRemapper::RenameOriginalChannel(const std::string_view name,
+ const std::string_view type,
+ const std::string_view new_name,
+ const std::vector<MapT> &add_maps) {
+ RenameOriginalChannel(name, type, nullptr, new_name, add_maps);
+}
+
+void ConfigRemapper::RenameOriginalChannel(const std::string_view name,
+ const std::string_view type,
+ const Node *const node,
+ const std::string_view new_name,
+ const std::vector<MapT> &add_maps) {
+ if (node != nullptr) {
+ VLOG(1) << "Node is " << FlatbufferToJson(node);
+ }
+ // First find the channel and rename it.
+ const Channel *remapped_channel =
+ configuration::GetChannel(original_configuration(), name, type, "", node);
+ CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+ << "\", \"type\": \"" << type << "\"}";
+ VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+ << "\"}";
+ VLOG(1) << "Remapped "
+ << configuration::StrippedChannelToString(remapped_channel);
+
+ const size_t channel_index =
+ configuration::ChannelIndex(original_configuration(), remapped_channel);
+ CHECK_EQ(0u, remapped_channels_.count(channel_index))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(remapped_channel);
+
+ RemappedChannel remapped_channel_struct;
+ remapped_channel_struct.remapped_name = new_name;
+ remapped_channel_struct.new_type.clear();
+ remapped_channels_[channel_index] = std::move(remapped_channel_struct);
+
+ // Then add any provided maps.
+ for (const MapT &map : add_maps) {
+ maps_.push_back(map);
+ }
+
+ // Finally rewrite the config.
+ MakeRemappedConfig();
+}
+
+void ConfigRemapper::MakeRemappedConfig() {
+ // If no remapping occurred and we are using the original config, then there
+ // is nothing interesting to do here.
+ if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
+ remapped_configuration_ = original_configuration();
+ return;
+ }
+ // Config to copy Channel definitions from. Use the specified
+ // replay_configuration_ if it has been provided.
+ const Configuration *const base_config = replay_configuration_ == nullptr
+ ? original_configuration()
+ : replay_configuration_;
+
+ // Create a config with all the channels, but un-sorted/merged. Collect up
+ // the schemas while we do this. Call MergeConfiguration to sort everything,
+ // and then merge it all in together.
+
+ // This is the builder that we use for the config containing all the new
+ // channels.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ // List of schemas.
+ std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
+ // Make sure our new RemoteMessage schema is in there for old logs without it.
+ schema_map.insert(std::make_pair(
+ message_bridge::RemoteMessage::GetFullyQualifiedName(),
+ FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
+ message_bridge::RemoteMessageSchema()))));
+
+ // Reconstruct the remapped channels.
+ for (auto &pair : remapped_channels_) {
+ const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
+ base_config, original_configuration()->channels()->Get(pair.first), "",
+ nullptr));
+ channel_offsets.emplace_back(
+ CopyChannel(c, pair.second.remapped_name, "", &fbb));
+
+ if (c->has_destination_nodes()) {
+ for (const Connection *connection : *c->destination_nodes()) {
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ case LoggerConfig::NOT_LOGGED:
+ // There is no timestamp channel associated with this, so ignore it.
+ break;
+
+ case LoggerConfig::REMOTE_LOGGER:
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ // We want to make a split timestamp channel regardless of what type
+ // of log this used to be. No sense propagating the single
+ // timestamp channel.
+
+ CHECK(connection->has_timestamp_logger_nodes());
+ for (const flatbuffers::String *timestamp_logger_node :
+ *connection->timestamp_logger_nodes()) {
+ const Node *node =
+ configuration::GetNode(original_configuration(),
+ timestamp_logger_node->string_view());
+ message_bridge::ChannelTimestampFinder finder(
+ original_configuration(), "log_reader", node);
+
+ // We are assuming here that all the maps are setup correctly to
+ // handle arbitrary timestamps. Apply the maps for this node to
+ // see what name this ends up with.
+ std::string name = finder.SplitChannelName(
+ pair.second.remapped_name, c->type()->str(), connection);
+ std::string unmapped_name = name;
+ configuration::HandleMaps(original_configuration()->maps(), &name,
+ "aos.message_bridge.RemoteMessage",
+ node);
+ CHECK_NE(name, unmapped_name)
+ << ": Remote timestamp channel was not remapped, this is "
+ "very fishy";
+ flatbuffers::Offset<flatbuffers::String> channel_name_offset =
+ fbb.CreateString(name);
+ flatbuffers::Offset<flatbuffers::String> channel_type_offset =
+ fbb.CreateString("aos.message_bridge.RemoteMessage");
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ fbb.CreateString(timestamp_logger_node->string_view());
+
+ // Now, build a channel. Don't log it, 2 senders, and match the
+ // source frequency.
+ Channel::Builder channel_builder(fbb);
+ channel_builder.add_name(channel_name_offset);
+ channel_builder.add_type(channel_type_offset);
+ channel_builder.add_source_node(source_node_offset);
+ channel_builder.add_logger(LoggerConfig::NOT_LOGGED);
+ channel_builder.add_num_senders(2);
+ if (c->has_frequency()) {
+ channel_builder.add_frequency(c->frequency());
+ }
+ if (c->has_channel_storage_duration()) {
+ channel_builder.add_channel_storage_duration(
+ c->channel_storage_duration());
+ }
+ channel_offsets.emplace_back(channel_builder.Finish());
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ // Now reconstruct the original channels, translating types as needed
+ for (const Channel *c : *base_config->channels()) {
+ // Search for a mapping channel.
+ std::string_view new_type = "";
+ for (auto &pair : remapped_channels_) {
+ const Channel *const remapped_channel =
+ original_configuration()->channels()->Get(pair.first);
+ if (remapped_channel->name()->string_view() == c->name()->string_view() &&
+ remapped_channel->type()->string_view() == c->type()->string_view()) {
+ new_type = pair.second.new_type;
+ break;
+ }
+ }
+
+ // Copy everything over.
+ channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
+
+ // Add the schema if it doesn't exist.
+ if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
+ CHECK(c->has_schema());
+ schema_map.insert(std::make_pair(c->type()->string_view(),
+ RecursiveCopyFlatBuffer(c->schema())));
+ }
+ }
+
+ // The MergeConfiguration API takes a vector, not a map. Convert.
+ std::vector<FlatbufferVector<reflection::Schema>> schemas;
+ while (!schema_map.empty()) {
+ schemas.emplace_back(std::move(schema_map.begin()->second));
+ schema_map.erase(schema_map.begin());
+ }
+
+ // Create the Configuration containing the new channels that we want to add.
+ const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset =
+ channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
+
+ // Copy over the old maps.
+ std::vector<flatbuffers::Offset<Map>> map_offsets;
+ if (base_config->maps()) {
+ for (const Map *map : *base_config->maps()) {
+ map_offsets.emplace_back(RecursiveCopyFlatBuffer(map, &fbb));
+ }
+ }
+
+ // Now create the new maps. These are second so they take effect first.
+ for (const MapT &map : maps_) {
+ CHECK(!map.match->name.empty());
+ const flatbuffers::Offset<flatbuffers::String> match_name_offset =
+ fbb.CreateString(map.match->name);
+ flatbuffers::Offset<flatbuffers::String> match_type_offset;
+ if (!map.match->type.empty()) {
+ match_type_offset = fbb.CreateString(map.match->type);
+ }
+ flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
+ if (!map.match->source_node.empty()) {
+ match_source_node_offset = fbb.CreateString(map.match->source_node);
+ }
+ CHECK(!map.rename->name.empty());
+ const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
+ fbb.CreateString(map.rename->name);
+ Channel::Builder match_builder(fbb);
+ match_builder.add_name(match_name_offset);
+ if (!match_type_offset.IsNull()) {
+ match_builder.add_type(match_type_offset);
+ }
+ if (!match_source_node_offset.IsNull()) {
+ match_builder.add_source_node(match_source_node_offset);
+ }
+ const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
+
+ Channel::Builder rename_builder(fbb);
+ rename_builder.add_name(rename_name_offset);
+ const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
+
+ Map::Builder map_builder(fbb);
+ map_builder.add_match(match_offset);
+ map_builder.add_rename(rename_offset);
+ map_offsets.emplace_back(map_builder.Finish());
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
+
+ // And copy everything else over.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+ nodes_offset = RecursiveCopyVectorTable(base_config->nodes(), &fbb);
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+ applications_offset =
+ RecursiveCopyVectorTable(base_config->applications(), &fbb);
+
+ // Now insert everything else in unmodified.
+ ConfigurationBuilder configuration_builder(fbb);
+ if (!channels_offset.IsNull()) {
+ configuration_builder.add_channels(channels_offset);
+ }
+ if (!maps_offsets.IsNull()) {
+ configuration_builder.add_maps(maps_offsets);
+ }
+ if (!nodes_offset.IsNull()) {
+ configuration_builder.add_nodes(nodes_offset);
+ }
+ if (!applications_offset.IsNull()) {
+ configuration_builder.add_applications(applications_offset);
+ }
+
+ if (base_config->has_channel_storage_duration()) {
+ configuration_builder.add_channel_storage_duration(
+ base_config->channel_storage_duration());
+ }
+
+ CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
+ << ": Merging logic needs to be updated when the number of configuration "
+ "fields changes.";
+
+ fbb.Finish(configuration_builder.Finish());
+
+ // Clean it up and return it! By using MergeConfiguration here, we'll
+ // actually get a deduplicated config for free too.
+ FlatbufferDetachedBuffer<Configuration> new_merged_config =
+ configuration::MergeConfiguration(
+ FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
+
+ remapped_configuration_buffer_ =
+ std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+ configuration::MergeConfiguration(new_merged_config, schemas));
+
+ remapped_configuration_ = &remapped_configuration_buffer_->message();
+
+ // TODO(austin): Lazily re-build to save CPU?
+}
+
+} // namespace aos
diff --git a/aos/events/logging/config_remapper.h b/aos/events/logging/config_remapper.h
new file mode 100644
index 0000000..d1eb0c6
--- /dev/null
+++ b/aos/events/logging/config_remapper.h
@@ -0,0 +1,196 @@
+#ifndef AOS_EVENTS_LOGGING_CONFIG_REMAPPER_H_
+#define AOS_EVENTS_LOGGING_CONFIG_REMAPPER_H_
+
+#include <map>
+#include <string_view>
+#include <tuple>
+#include <vector>
+
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_channels.h"
+
+namespace aos {
+
+// This class is used for remapping and renaming channels with the passed in
+// configuration to the constructor. Typically, the templated versions of
+// RemapOriginalChannel and RenameOriginalChannel are the main functions to use
+// for type safety. After remapping and renaming, the remapped configuration can
+// be accessed through remapped_configuration(), and the original configuration
+// that is not mutated can be accessed through original_configuration.
+//
+// This class assumes no ownership over any pointers provided to it.
+//
+// Timestamp channels are automatically remapped on construction
+//
+// Note: This class does not need logfiles to function unlike LogReader. This
+// logic originally lived in LogReader and was refactored out into this class.
+// The same API for remapping and renaming still exists in LogReader which now
+// just passes along the args to this class.
+class ConfigRemapper {
+ public:
+ ConfigRemapper(const Configuration *config,
+ const Configuration *replay_config = nullptr,
+ const logger::ReplayChannels *replay_channels = nullptr);
+ ~ConfigRemapper();
+
+ // Map of channel indices to new name. The channel index will be an index into
+ // original_configuration(), and the string key will be the name of the
+ // channel to send on instead of the orignal channel name.
+ struct RemappedChannel {
+ std::string remapped_name;
+ std::string new_type;
+ };
+
+ // Enum to use for indicating how RemapOriginalChannel behaves when there is
+ // already a channel with the remapped name (e.g., as may happen when
+ // replaying a logfile that was itself generated from replay).
+ enum class RemapConflict {
+ // LOG(FATAL) on conflicts in remappings.
+ kDisallow,
+ // If we run into a conflict, attempt to remap the channel we would be
+ // overriding (and continue to do so if remapping *that* channel also
+ // generates a conflict).
+ // This will mean that if we repeatedly replay a log, we will end up
+ // stacking more and more /original's on the start of the oldest version
+ // of the channels.
+ kCascade
+ };
+
+ // Remaps a channel from the original configuration passed to the constructor
+ // to the given one. This operates on raw channel names, without any node or
+ // application specific mappings.
+ void RemapOriginalChannel(
+ std::string_view name, std::string_view type,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade);
+ template <typename T>
+ void RemapOriginalChannel(
+ std::string_view name, std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade) {
+ RemapOriginalChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
+ conflict_handling);
+ }
+
+ // Remaps the provided channel, though this respects node mappings, and
+ // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
+ // /original/aos -> /original/pi1/aos on the same node after renaming, just
+ // like you would hope. If new_type is not empty, the new channel will use
+ // the provided type instead. This allows for renaming messages.
+ //
+ // TODO(austin): If you have 2 nodes remapping something to the same channel,
+ // this doesn't handle that. No use cases exist yet for that, so it isn't
+ // being done yet.
+ void RemapOriginalChannel(
+ std::string_view name, std::string_view type, const Node *node,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade);
+
+ template <typename T>
+ void RemapOriginalChannel(
+ std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade) {
+ RemapOriginalChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
+ new_type, conflict_handling);
+ }
+
+ // Similar to RemapOriginalChannel(), but lets you specify a name for the new
+ // channel without constraints. By default, this will not add any maps for the
+ // new channel. Use add_maps to specify any maps you'd like added.
+ void RenameOriginalChannel(std::string_view name, std::string_view type,
+ std::string_view new_name,
+ const std::vector<MapT> &add_maps = {});
+ template <typename T>
+ void RenameOriginalChannel(std::string_view name, std::string_view new_name,
+ const std::vector<MapT> &add_maps = {}) {
+ RenameOriginalChannel(name, T::GetFullyQualifiedName(), new_name, add_maps);
+ }
+ // The following overloads are more suitable for multi-node configurations,
+ // and let you rename a channel on a specific node.
+ void RenameOriginalChannel(std::string_view name, std::string_view type,
+ const Node *node, std::string_view new_name,
+ const std::vector<MapT> &add_maps = {});
+ template <typename T>
+ void RenameOriginalChannel(std::string_view name, const Node *node,
+ std::string_view new_name,
+ const std::vector<MapT> &add_maps = {}) {
+ RenameOriginalChannel(name, T::GetFullyQualifiedName(), node, new_name,
+ add_maps);
+ }
+
+ template <typename T>
+ bool HasChannel(std::string_view name, const Node *node = nullptr) {
+ return HasChannel(name, T::GetFullyQualifiedName(), node);
+ }
+ bool HasChannel(std::string_view name, std::string_view type,
+ const Node *node) {
+ return configuration::GetChannel(original_configuration(), name, type, "",
+ node, true) != nullptr;
+ }
+
+ // Returns true if the channel exists on the node and was in the original
+ // config
+ template <typename T>
+ bool HasOriginalChannel(std::string_view name, const Node *node = nullptr) {
+ const Channel *channel =
+ configuration::GetChannel(original_configuration(), name,
+ T::GetFullyQualifiedName(), "", node, true);
+ if (channel == nullptr) return false;
+ return channel->logger() != LoggerConfig::NOT_LOGGED;
+ }
+
+ template <typename T>
+ void MaybeRemapOriginalChannel(std::string_view name,
+ const Node *node = nullptr) {
+ if (HasChannel<T>(name, node)) {
+ RemapOriginalChannel<T>(name, node);
+ }
+ }
+ template <typename T>
+ void MaybeRenameOriginalChannel(std::string_view name, const Node *node,
+ std::string_view new_name,
+ const std::vector<MapT> &add_maps = {}) {
+ if (HasChannel<T>(name, node)) {
+ RenameOriginalChannel<T>(name, node, new_name, add_maps);
+ }
+ }
+
+ const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
+ const Channel *channel);
+ // Returns a list of all the original channels from remapping.
+ std::vector<const Channel *> RemappedChannels() const;
+
+ void set_configuration(const Configuration *configuration);
+
+ // Returns the configuration that was originally passed to the constructor.
+ // This class does not own this pointer.
+ const Configuration *original_configuration() const;
+
+ // Returns the configuration that contains the remapping and renamings done on
+ // the original configuration. The pointer is invalidated whenever
+ // RemapOriginalChannel is called.
+ const Configuration *remapped_configuration() const;
+
+ private:
+ // Handle constructing a configuration with all the additional remapped
+ // channels from calls to RemapOriginalChannel.
+ void MakeRemappedConfig();
+
+ std::map<size_t, RemappedChannel> remapped_channels_;
+ std::vector<MapT> maps_;
+ std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
+ remapped_configuration_buffer_;
+
+ const Configuration *remapped_configuration_ = nullptr;
+ const Configuration *original_configuration_ = nullptr;
+ const Configuration *replay_configuration_ = nullptr;
+
+ const logger::ReplayChannels *replay_channels_ = nullptr;
+}; // class ConfigRemapper
+
+} // namespace aos
+#endif // AOS_EVENTS_LOGGING_CONFIG_REMAPPER_H_
diff --git a/aos/events/logging/config_remapper_test.cc b/aos/events/logging/config_remapper_test.cc
new file mode 100644
index 0000000..9649f19
--- /dev/null
+++ b/aos/events/logging/config_remapper_test.cc
@@ -0,0 +1,97 @@
+#include "aos/events/logging/config_remapper.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/event_loop_generated.h"
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/multinode_logger_test_lib.h"
+#include "aos/events/message_counter.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/timestamp_generated.h"
+#include "aos/testing/tmpdir.h"
+#include "multinode_logger_test_lib.h"
+
+namespace aos {
+namespace testing {
+using namespace logger::testing;
+using namespace logger;
+namespace chrono = std::chrono;
+
+using ConfigRemapperTest = MultinodeLoggerTest;
+
+INSTANTIATE_TEST_SUITE_P(
+ All, ConfigRemapperTest,
+ ::testing::Combine(
+ ::testing::Values(
+ ConfigParams{"multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1(), kCombinedConfigSha1()},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
+ ::testing::ValuesIn(SupportedCompressionAlgorithms())));
+
+// Tests that we can read a config and remap a channel
+TEST_P(ConfigRemapperTest, RemapOriginalChannel) {
+ ConfigRemapper remapper(&config_.message());
+
+ remapper.RemapOriginalChannel<examples::Ping>("/test");
+
+ const Channel *channel = configuration::GetChannel<examples::Ping>(
+ remapper.remapped_configuration(), "/original/test", "pi1", nullptr);
+ EXPECT_NE(channel, nullptr);
+ EXPECT_EQ(channel->name()->string_view(), "/original/test");
+ EXPECT_EQ(channel->type()->string_view(), "aos.examples.Ping");
+}
+
+// Tests that we can read a config and rename a channel
+TEST_P(ConfigRemapperTest, RenameOriginalChannel) {
+ ConfigRemapper remapper(&config_.message());
+
+ remapper.RenameOriginalChannel<examples::Ping>("/test", "/original/test");
+
+ const Channel *channel = configuration::GetChannel<examples::Ping>(
+ remapper.remapped_configuration(), "/original/test", "pi1", nullptr);
+ EXPECT_NE(channel, nullptr);
+ EXPECT_EQ(channel->name()->string_view(), "/original/test");
+ EXPECT_EQ(channel->type()->string_view(), "aos.examples.Ping");
+}
+
+// Tests that we can remap a channel specifying a certain node
+TEST_P(ConfigRemapperTest, RemapOriginalChannelWithNode) {
+ ConfigRemapper remapper(&config_.message());
+
+ const Node *node =
+ configuration::GetNode(remapper.remapped_configuration(), "pi1");
+
+ // Remap just on pi1.
+ remapper.RemapOriginalChannel<aos::timing::Report>("/aos", node);
+
+ const Channel *channel = configuration::GetChannel<aos::timing::Report>(
+ remapper.remapped_configuration(), "/original/pi1/aos", "pi1", node);
+ EXPECT_NE(channel, nullptr);
+ EXPECT_EQ(channel->name()->string_view(), "/original/pi1/aos");
+ EXPECT_EQ(channel->type()->string_view(), "aos.timing.Report");
+}
+
+// Tests that we can rename a channel specifying a certain node
+TEST_P(ConfigRemapperTest, RenameOriginalChannelWithNode) {
+ ConfigRemapper remapper(&config_.message());
+
+ const Node *node =
+ configuration::GetNode(remapper.remapped_configuration(), "pi1");
+
+ // Rename just on pi1.
+ remapper.RenameOriginalChannel<aos::timing::Report>("/aos", node,
+ "/original/pi1/aos");
+
+ const Channel *channel = configuration::GetChannel<aos::timing::Report>(
+ remapper.remapped_configuration(), "/original/pi1/aos", "pi1", node);
+ EXPECT_NE(channel, nullptr);
+ EXPECT_EQ(channel->name()->string_view(), "/original/pi1/aos");
+ EXPECT_EQ(channel->type()->string_view(), "aos.timing.Report");
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index e3dd904..b18873e 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -73,91 +73,6 @@
namespace logger {
namespace {
-bool CompareChannels(const Channel *c,
- ::std::pair<std::string_view, std::string_view> p) {
- int name_compare = c->name()->string_view().compare(p.first);
- if (name_compare == 0) {
- return c->type()->string_view() < p.second;
- } else if (name_compare < 0) {
- return true;
- } else {
- return false;
- }
-}
-
-bool EqualsChannels(const Channel *c,
- ::std::pair<std::string_view, std::string_view> p) {
- return c->name()->string_view() == p.first &&
- c->type()->string_view() == p.second;
-}
-
-// Copies the channel, removing the schema as we go. If new_name is provided,
-// it is used instead of the name inside the channel. If new_type is provided,
-// it is used instead of the type in the channel.
-flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
- std::string_view new_name,
- std::string_view new_type,
- flatbuffers::FlatBufferBuilder *fbb) {
- CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
- << ": Merging logic needs to be updated when the number of channel "
- "fields changes.";
-
- flatbuffers::Offset<flatbuffers::String> name_offset =
- fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
- : new_name);
- flatbuffers::Offset<flatbuffers::String> type_offset =
- fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
- flatbuffers::Offset<flatbuffers::String> source_node_offset =
- c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
- : 0;
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
- destination_nodes_offset =
- aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
-
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
- logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
-
- Channel::Builder channel_builder(*fbb);
- channel_builder.add_name(name_offset);
- channel_builder.add_type(type_offset);
- if (c->has_frequency()) {
- channel_builder.add_frequency(c->frequency());
- }
- if (c->has_max_size()) {
- channel_builder.add_max_size(c->max_size());
- }
- if (c->has_num_senders()) {
- channel_builder.add_num_senders(c->num_senders());
- }
- if (c->has_num_watchers()) {
- channel_builder.add_num_watchers(c->num_watchers());
- }
- if (!source_node_offset.IsNull()) {
- channel_builder.add_source_node(source_node_offset);
- }
- if (!destination_nodes_offset.IsNull()) {
- channel_builder.add_destination_nodes(destination_nodes_offset);
- }
- if (c->has_logger()) {
- channel_builder.add_logger(c->logger());
- }
- if (!logger_nodes_offset.IsNull()) {
- channel_builder.add_logger_nodes(logger_nodes_offset);
- }
- if (c->has_read_method()) {
- channel_builder.add_read_method(c->read_method());
- }
- if (c->has_num_readers()) {
- channel_builder.add_num_readers(c->num_readers());
- }
- if (c->has_channel_storage_duration()) {
- channel_builder.add_channel_storage_duration(c->channel_storage_duration());
- }
- return channel_builder.Finish();
-}
-
namespace chrono = std::chrono;
using message_bridge::RemoteMessage;
} // namespace
@@ -259,7 +174,9 @@
const ReplayChannels *replay_channels)
: log_files_(std::move(log_files)),
replay_configuration_(replay_configuration),
- replay_channels_(replay_channels) {
+ replay_channels_(replay_channels),
+ config_remapper_(log_files_.config(), replay_configuration_,
+ replay_channels_) {
SetStartTime(FLAGS_start_time);
SetEndTime(FLAGS_end_time);
@@ -274,95 +191,6 @@
"no messages will get replayed.";
}
- MakeRemappedConfig();
-
- // Remap all existing remote timestamp channels. They will be recreated, and
- // the data logged isn't relevant anymore.
- for (const Node *node : configuration::GetNodes(logged_configuration())) {
- message_bridge::ChannelTimestampFinder finder(logged_configuration(),
- "log_reader", node);
-
- absl::btree_set<std::string_view> remote_nodes;
-
- for (const Channel *channel : *logged_configuration()->channels()) {
- if (!configuration::ChannelIsSendableOnNode(channel, node)) {
- continue;
- }
- if (!channel->has_destination_nodes()) {
- continue;
- }
- for (const Connection *connection : *channel->destination_nodes()) {
- if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
- node)) {
- // Start by seeing if the split timestamp channels are being used for
- // this message. If so, remap them.
- const Channel *timestamp_channel = configuration::GetChannel(
- logged_configuration(),
- finder.SplitChannelName(channel, connection),
- RemoteMessage::GetFullyQualifiedName(), "", node, true);
-
- if (timestamp_channel != nullptr) {
- // If for some reason a timestamp channel is not NOT_LOGGED (which
- // is unusual), then remap the channel so that the replayed channel
- // doesn't overlap with the special separate replay we do for
- // timestamps.
- if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
- RemapLoggedChannel<RemoteMessage>(
- timestamp_channel->name()->string_view(), node);
- }
- continue;
- }
-
- // Otherwise collect this one up as a node to look for a combined
- // channel from. It is more efficient to compare nodes than channels.
- LOG(WARNING) << "Failed to find channel "
- << finder.SplitChannelName(channel, connection)
- << " on node " << aos::FlatbufferToJson(node);
- remote_nodes.insert(connection->name()->string_view());
- }
- }
- }
-
- std::vector<const Node *> timestamp_logger_nodes =
- configuration::TimestampNodes(logged_configuration(), node);
- for (const std::string_view remote_node : remote_nodes) {
- const std::string channel = finder.CombinedChannelName(remote_node);
-
- // See if the log file is an old log with MessageHeader channels in it, or
- // a newer log with RemoteMessage. If we find an older log, rename the
- // type too along with the name.
- if (HasChannel<MessageHeader>(channel, node)) {
- CHECK(!HasChannel<RemoteMessage>(channel, node))
- << ": Can't have both a MessageHeader and RemoteMessage remote "
- "timestamp channel.";
- // In theory, we should check NOT_LOGGED like RemoteMessage and be more
- // careful about updating the config, but there are fewer and fewer logs
- // with MessageHeader remote messages, so it isn't worth the effort.
- RemapLoggedChannel<MessageHeader>(channel, node, "/original",
- "aos.message_bridge.RemoteMessage");
- } else {
- CHECK(HasChannel<RemoteMessage>(channel, node))
- << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
- << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
- << node->name()->string_view();
- // Only bother to remap if there's something on the channel. We can
- // tell if the channel was marked NOT_LOGGED or not. This makes the
- // config not change un-necesarily when we replay a log with NOT_LOGGED
- // messages.
- if (HasLoggedChannel<RemoteMessage>(channel, node)) {
- RemapLoggedChannel<RemoteMessage>(channel, node);
- }
- }
- }
- }
-
- if (replay_configuration) {
- CHECK_EQ(configuration::MultiNode(configuration()),
- configuration::MultiNode(replay_configuration))
- << ": Log file and replay config need to both be multi or single "
- "node.";
- }
-
if (!configuration::MultiNode(configuration())) {
states_.resize(1);
} else {
@@ -392,19 +220,14 @@
LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
"is destroyed";
}
- // Zero out some buffers. It's easy to do use-after-frees on these, so make
- // it more obvious.
- if (remapped_configuration_buffer_) {
- remapped_configuration_buffer_->Wipe();
- }
}
const Configuration *LogReader::logged_configuration() const {
- return log_files_.config();
+ return config_remapper_.original_configuration();
}
const Configuration *LogReader::configuration() const {
- return remapped_configuration_;
+ return config_remapper_.remapped_configuration();
}
std::vector<const Node *> LogReader::LoggedNodes() const {
@@ -601,7 +424,7 @@
void LogReader::RegisterWithoutStarting(
SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- remapped_configuration_ = event_loop_factory_->configuration();
+ config_remapper_.set_configuration(event_loop_factory_->configuration());
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
@@ -705,7 +528,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
- RemapChannel(state->event_loop(), node, channel);
+ config_remapper_.RemapChannel(state->event_loop(), node, channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
@@ -889,7 +712,7 @@
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
- const Channel *channel = RemapChannel(
+ const Channel *channel = config_remapper_.RemapChannel(
event_loop, node,
logged_configuration()->channels()->Get(logged_channel_index));
@@ -1303,22 +1126,21 @@
// Checks if the specified channel name/type exists in the config and, depending
// on the value of conflict_handling, calls conflict_handler or just dies.
template <typename F>
-void CheckAndHandleRemapConflict(std::string_view new_name,
- std::string_view new_type,
- const Configuration *config,
- LogReader::RemapConflict conflict_handling,
- F conflict_handler) {
+void CheckAndHandleRemapConflict(
+ std::string_view new_name, std::string_view new_type,
+ const Configuration *config,
+ ConfigRemapper::RemapConflict conflict_handling, F conflict_handler) {
const Channel *existing_channel =
configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
if (existing_channel != nullptr) {
switch (conflict_handling) {
- case LogReader::RemapConflict::kDisallow:
+ case ConfigRemapper::RemapConflict::kDisallow:
LOG(FATAL)
<< "Channel "
<< configuration::StrippedChannelToString(existing_channel)
<< " is already used--you can't remap a logged channel to it.";
break;
- case LogReader::RemapConflict::kCascade:
+ case ConfigRemapper::RemapConflict::kCascade:
LOG(INFO) << "Automatically remapping "
<< configuration::StrippedChannelToString(existing_channel)
<< " to avoid conflicts.";
@@ -1329,88 +1151,29 @@
}
} // namespace
-void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix,
- std::string_view new_type,
- RemapConflict conflict_handling) {
- RemapLoggedChannel(name, type, nullptr, add_prefix, new_type,
- conflict_handling);
+void LogReader::RemapLoggedChannel(
+ std::string_view name, std::string_view type, std::string_view add_prefix,
+ std::string_view new_type,
+ ConfigRemapper::RemapConflict conflict_handling) {
+ CheckEventsAreNotScheduled();
+ config_remapper_.RemapOriginalChannel(name, type, nullptr, add_prefix,
+ new_type, conflict_handling);
}
-void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
- const Node *node,
- std::string_view add_prefix,
- std::string_view new_type,
- RemapConflict conflict_handling) {
- if (node != nullptr) {
- VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
- }
- if (replay_channels_ != nullptr) {
- CHECK(std::find(replay_channels_->begin(), replay_channels_->end(),
- std::make_pair(std::string{name}, std::string{type})) !=
- replay_channels_->end())
- << "Attempted to remap channel " << name << " " << type
- << " which is not included in the replay channels passed to LogReader.";
- }
- const Channel *remapped_channel =
- configuration::GetChannel(logged_configuration(), name, type, "", node);
- CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
- << "\", \"type\": \"" << type << "\"}";
- VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
- << "\"}";
- VLOG(1) << "Remapped "
- << aos::configuration::StrippedChannelToString(remapped_channel);
-
- // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
- // we want it to degrade if the heuristics fail to just work.
- //
- // The easiest way to do this is going to be incredibly specific and verbose.
- // Look up /spray, to /0/spray. Then, prefix the result with /original to get
- // /original/0/spray. Then, create a map from /original/spray to
- // /original/0/spray for just the type we were asked for.
- if (name != remapped_channel->name()->string_view()) {
- MapT new_map;
- new_map.match = std::make_unique<ChannelT>();
- new_map.match->name = absl::StrCat(add_prefix, name);
- new_map.match->type = type;
- if (node != nullptr) {
- new_map.match->source_node = node->name()->str();
- }
- new_map.rename = std::make_unique<ChannelT>();
- new_map.rename->name =
- absl::StrCat(add_prefix, remapped_channel->name()->string_view());
- maps_.emplace_back(std::move(new_map));
- }
-
- // Then remap the logged channel to the prefixed channel.
- const size_t channel_index =
- configuration::ChannelIndex(logged_configuration(), remapped_channel);
- CHECK_EQ(0u, remapped_channels_.count(channel_index))
- << "Already remapped channel "
- << configuration::CleanedChannelToString(remapped_channel);
-
- RemappedChannel remapped_channel_struct;
- remapped_channel_struct.remapped_name =
- std::string(add_prefix) +
- std::string(remapped_channel->name()->string_view());
- remapped_channel_struct.new_type = new_type;
- const std::string_view remapped_type = new_type.empty() ? type : new_type;
- CheckAndHandleRemapConflict(
- remapped_channel_struct.remapped_name, remapped_type,
- remapped_configuration_, conflict_handling,
- [this, &remapped_channel_struct, remapped_type, node, add_prefix,
- conflict_handling]() {
- RemapLoggedChannel(remapped_channel_struct.remapped_name, remapped_type,
- node, add_prefix, "", conflict_handling);
- });
- remapped_channels_[channel_index] = std::move(remapped_channel_struct);
- MakeRemappedConfig();
+void LogReader::RemapLoggedChannel(
+ std::string_view name, std::string_view type, const Node *node,
+ std::string_view add_prefix, std::string_view new_type,
+ ConfigRemapper::RemapConflict conflict_handling) {
+ CheckEventsAreNotScheduled();
+ config_remapper_.RemapOriginalChannel(name, type, node, add_prefix, new_type,
+ conflict_handling);
}
void LogReader::RenameLoggedChannel(const std::string_view name,
const std::string_view type,
const std::string_view new_name,
const std::vector<MapT> &add_maps) {
+ CheckEventsAreNotScheduled();
RenameLoggedChannel(name, type, nullptr, new_name, add_maps);
}
@@ -1419,284 +1182,17 @@
const Node *const node,
const std::string_view new_name,
const std::vector<MapT> &add_maps) {
- if (node != nullptr) {
- VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
- }
- // First find the channel and rename it.
- const Channel *remapped_channel =
- configuration::GetChannel(logged_configuration(), name, type, "", node);
- CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
- << "\", \"type\": \"" << type << "\"}";
- VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
- << "\"}";
- VLOG(1) << "Remapped "
- << aos::configuration::StrippedChannelToString(remapped_channel);
-
- const size_t channel_index =
- configuration::ChannelIndex(logged_configuration(), remapped_channel);
- CHECK_EQ(0u, remapped_channels_.count(channel_index))
- << "Already remapped channel "
- << configuration::CleanedChannelToString(remapped_channel);
-
- RemappedChannel remapped_channel_struct;
- remapped_channel_struct.remapped_name = new_name;
- remapped_channel_struct.new_type.clear();
- remapped_channels_[channel_index] = std::move(remapped_channel_struct);
-
- // Then add any provided maps.
- for (const MapT &map : add_maps) {
- maps_.push_back(map);
- }
-
- // Finally rewrite the config.
- MakeRemappedConfig();
+ CheckEventsAreNotScheduled();
+ config_remapper_.RenameOriginalChannel(name, type, node, new_name, add_maps);
}
-void LogReader::MakeRemappedConfig() {
+void LogReader::CheckEventsAreNotScheduled() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
CHECK(!state->event_loop())
<< ": Can't change the mapping after the events are scheduled.";
}
}
-
- // If no remapping occurred and we are using the original config, then there
- // is nothing interesting to do here.
- if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
- remapped_configuration_ = logged_configuration();
- return;
- }
- // Config to copy Channel definitions from. Use the specified
- // replay_configuration_ if it has been provided.
- const Configuration *const base_config = replay_configuration_ == nullptr
- ? logged_configuration()
- : replay_configuration_;
-
- // Create a config with all the channels, but un-sorted/merged. Collect up
- // the schemas while we do this. Call MergeConfiguration to sort everything,
- // and then merge it all in together.
-
- // This is the builder that we use for the config containing all the new
- // channels.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
- std::vector<flatbuffers::Offset<Channel>> channel_offsets;
-
- CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
- << ": Merging logic needs to be updated when the number of channel "
- "fields changes.";
-
- // List of schemas.
- std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
- // Make sure our new RemoteMessage schema is in there for old logs without it.
- schema_map.insert(std::make_pair(
- RemoteMessage::GetFullyQualifiedName(),
- FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
- message_bridge::RemoteMessageSchema()))));
-
- // Reconstruct the remapped channels.
- for (auto &pair : remapped_channels_) {
- const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
- base_config, logged_configuration()->channels()->Get(pair.first), "",
- nullptr));
- channel_offsets.emplace_back(
- CopyChannel(c, pair.second.remapped_name, "", &fbb));
-
- if (c->has_destination_nodes()) {
- for (const Connection *connection : *c->destination_nodes()) {
- switch (connection->timestamp_logger()) {
- case LoggerConfig::LOCAL_LOGGER:
- case LoggerConfig::NOT_LOGGED:
- // There is no timestamp channel associated with this, so ignore it.
- break;
-
- case LoggerConfig::REMOTE_LOGGER:
- case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
- // We want to make a split timestamp channel regardless of what type
- // of log this used to be. No sense propagating the single
- // timestamp channel.
-
- CHECK(connection->has_timestamp_logger_nodes());
- for (const flatbuffers::String *timestamp_logger_node :
- *connection->timestamp_logger_nodes()) {
- const Node *node = configuration::GetNode(
- logged_configuration(), timestamp_logger_node->string_view());
- message_bridge::ChannelTimestampFinder finder(
- logged_configuration(), "log_reader", node);
-
- // We are assuming here that all the maps are setup correctly to
- // handle arbitrary timestamps. Apply the maps for this node to
- // see what name this ends up with.
- std::string name = finder.SplitChannelName(
- pair.second.remapped_name, c->type()->str(), connection);
- std::string unmapped_name = name;
- configuration::HandleMaps(logged_configuration()->maps(), &name,
- "aos.message_bridge.RemoteMessage",
- node);
- CHECK_NE(name, unmapped_name)
- << ": Remote timestamp channel was not remapped, this is "
- "very fishy";
- flatbuffers::Offset<flatbuffers::String> channel_name_offset =
- fbb.CreateString(name);
- flatbuffers::Offset<flatbuffers::String> channel_type_offset =
- fbb.CreateString("aos.message_bridge.RemoteMessage");
- flatbuffers::Offset<flatbuffers::String> source_node_offset =
- fbb.CreateString(timestamp_logger_node->string_view());
-
- // Now, build a channel. Don't log it, 2 senders, and match the
- // source frequency.
- Channel::Builder channel_builder(fbb);
- channel_builder.add_name(channel_name_offset);
- channel_builder.add_type(channel_type_offset);
- channel_builder.add_source_node(source_node_offset);
- channel_builder.add_logger(LoggerConfig::NOT_LOGGED);
- channel_builder.add_num_senders(2);
- if (c->has_frequency()) {
- channel_builder.add_frequency(c->frequency());
- }
- if (c->has_channel_storage_duration()) {
- channel_builder.add_channel_storage_duration(
- c->channel_storage_duration());
- }
- channel_offsets.emplace_back(channel_builder.Finish());
- }
- break;
- }
- }
- }
- }
-
- // Now reconstruct the original channels, translating types as needed
- for (const Channel *c : *base_config->channels()) {
- // Search for a mapping channel.
- std::string_view new_type = "";
- for (auto &pair : remapped_channels_) {
- const Channel *const remapped_channel =
- logged_configuration()->channels()->Get(pair.first);
- if (remapped_channel->name()->string_view() == c->name()->string_view() &&
- remapped_channel->type()->string_view() == c->type()->string_view()) {
- new_type = pair.second.new_type;
- break;
- }
- }
-
- // Copy everything over.
- channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
-
- // Add the schema if it doesn't exist.
- if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
- CHECK(c->has_schema());
- schema_map.insert(std::make_pair(c->type()->string_view(),
- RecursiveCopyFlatBuffer(c->schema())));
- }
- }
-
- // The MergeConfiguration API takes a vector, not a map. Convert.
- std::vector<FlatbufferVector<reflection::Schema>> schemas;
- while (!schema_map.empty()) {
- schemas.emplace_back(std::move(schema_map.begin()->second));
- schema_map.erase(schema_map.begin());
- }
-
- // Create the Configuration containing the new channels that we want to add.
- const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
- channels_offset =
- channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
-
- // Copy over the old maps.
- std::vector<flatbuffers::Offset<Map>> map_offsets;
- if (base_config->maps()) {
- for (const Map *map : *base_config->maps()) {
- map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
- }
- }
-
- // Now create the new maps. These are second so they take effect first.
- for (const MapT &map : maps_) {
- CHECK(!map.match->name.empty());
- const flatbuffers::Offset<flatbuffers::String> match_name_offset =
- fbb.CreateString(map.match->name);
- flatbuffers::Offset<flatbuffers::String> match_type_offset;
- if (!map.match->type.empty()) {
- match_type_offset = fbb.CreateString(map.match->type);
- }
- flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
- if (!map.match->source_node.empty()) {
- match_source_node_offset = fbb.CreateString(map.match->source_node);
- }
- CHECK(!map.rename->name.empty());
- const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
- fbb.CreateString(map.rename->name);
- Channel::Builder match_builder(fbb);
- match_builder.add_name(match_name_offset);
- if (!match_type_offset.IsNull()) {
- match_builder.add_type(match_type_offset);
- }
- if (!match_source_node_offset.IsNull()) {
- match_builder.add_source_node(match_source_node_offset);
- }
- const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
-
- Channel::Builder rename_builder(fbb);
- rename_builder.add_name(rename_name_offset);
- const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
-
- Map::Builder map_builder(fbb);
- map_builder.add_match(match_offset);
- map_builder.add_rename(rename_offset);
- map_offsets.emplace_back(map_builder.Finish());
- }
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
- maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
-
- // And copy everything else over.
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
- nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
- applications_offset =
- aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
-
- // Now insert everything else in unmodified.
- ConfigurationBuilder configuration_builder(fbb);
- if (!channels_offset.IsNull()) {
- configuration_builder.add_channels(channels_offset);
- }
- if (!maps_offsets.IsNull()) {
- configuration_builder.add_maps(maps_offsets);
- }
- if (!nodes_offset.IsNull()) {
- configuration_builder.add_nodes(nodes_offset);
- }
- if (!applications_offset.IsNull()) {
- configuration_builder.add_applications(applications_offset);
- }
-
- if (base_config->has_channel_storage_duration()) {
- configuration_builder.add_channel_storage_duration(
- base_config->channel_storage_duration());
- }
-
- CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
- << ": Merging logic needs to be updated when the number of configuration "
- "fields changes.";
-
- fbb.Finish(configuration_builder.Finish());
-
- // Clean it up and return it! By using MergeConfiguration here, we'll
- // actually get a deduplicated config for free too.
- FlatbufferDetachedBuffer<Configuration> new_merged_config =
- configuration::MergeConfiguration(
- FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
-
- remapped_configuration_buffer_ =
- std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- configuration::MergeConfiguration(new_merged_config, schemas));
-
- remapped_configuration_ = &remapped_configuration_buffer_->message();
-
- // TODO(austin): Lazily re-build to save CPU?
}
std::unique_ptr<const ReplayChannelIndices>
@@ -1725,53 +1221,13 @@
}
std::vector<const Channel *> LogReader::RemappedChannels() const {
- std::vector<const Channel *> result;
- result.reserve(remapped_channels_.size());
- for (auto &pair : remapped_channels_) {
- const Channel *const logged_channel =
- CHECK_NOTNULL(logged_configuration()->channels()->Get(pair.first));
-
- auto channel_iterator = std::lower_bound(
- remapped_configuration_->channels()->cbegin(),
- remapped_configuration_->channels()->cend(),
- std::make_pair(std::string_view(pair.second.remapped_name),
- logged_channel->type()->string_view()),
- CompareChannels);
-
- CHECK(channel_iterator != remapped_configuration_->channels()->cend());
- CHECK(EqualsChannels(
- *channel_iterator,
- std::make_pair(std::string_view(pair.second.remapped_name),
- logged_channel->type()->string_view())));
- result.push_back(*channel_iterator);
- }
- return result;
+ return config_remapper_.RemappedChannels();
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
const Node *node,
const Channel *channel) {
- std::string_view channel_name = channel->name()->string_view();
- std::string_view channel_type = channel->type()->string_view();
- const int channel_index =
- configuration::ChannelIndex(logged_configuration(), channel);
- // If the channel is remapped, find the correct channel name to use.
- if (remapped_channels_.count(channel_index) > 0) {
- VLOG(3) << "Got remapped channel on "
- << configuration::CleanedChannelToString(channel);
- channel_name = remapped_channels_[channel_index].remapped_name;
- }
-
- VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
- const Channel *remapped_channel = configuration::GetChannel(
- configuration(), channel_name, channel_type,
- event_loop ? event_loop->name() : "log_reader", node);
-
- CHECK(remapped_channel != nullptr)
- << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
- << channel_type << "\"} because it is not in the provided configuration.";
-
- return remapped_channel;
+ return config_remapper_.RemapChannel(event_loop, node, channel);
}
LogReader::State::State(
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 0486665..228341d 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -15,9 +15,11 @@
#include "aos/condition.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_loop_tmpl.h"
+#include "aos/events/logging/config_remapper.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_channels.h"
#include "aos/events/logging/replay_timing_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
@@ -35,11 +37,6 @@
class EventNotifier;
-// Vector of pair of name and type of the channel
-using ReplayChannels = std::vector<std::pair<std::string, std::string>>;
-// Vector of channel indices
-using ReplayChannelIndices = std::vector<size_t>;
-
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
@@ -217,34 +214,21 @@
void SetEndTime(std::string end_time);
void SetEndTime(realtime_clock::time_point end_time);
- // Enum to use for indicating how RemapLoggedChannel behaves when there is
- // already a channel with the remapped name (e.g., as may happen when
- // replaying a logfile that was itself generated from replay).
- enum class RemapConflict {
- // LOG(FATAL) on conflicts in remappings.
- kDisallow,
- // If we run into a conflict, attempt to remap the channel we would be
- // overriding (and continue to do so if remapping *that* channel also
- // generates a conflict).
- // This will mean that if we repeatedly replay a log, we will end up
- // stacking more and more /original's on the start of the oldest version
- // of the channels.
- kCascade
- };
-
// Causes the logger to publish the provided channel on a different name so
// that replayed applications can publish on the proper channel name without
// interference. This operates on raw channel names, without any node or
// application specific mappings.
- void RemapLoggedChannel(
- std::string_view name, std::string_view type,
- std::string_view add_prefix = "/original", std::string_view new_type = "",
- RemapConflict conflict_handling = RemapConflict::kCascade);
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ ConfigRemapper::RemapConflict conflict_handling =
+ ConfigRemapper::RemapConflict::kCascade);
template <typename T>
- void RemapLoggedChannel(
- std::string_view name, std::string_view add_prefix = "/original",
- std::string_view new_type = "",
- RemapConflict conflict_handling = RemapConflict::kCascade) {
+ void RemapLoggedChannel(std::string_view name,
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ ConfigRemapper::RemapConflict conflict_handling =
+ ConfigRemapper::RemapConflict::kCascade) {
RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
conflict_handling);
}
@@ -257,15 +241,18 @@
// TODO(austin): If you have 2 nodes remapping something to the same channel,
// this doesn't handle that. No use cases exist yet for that, so it isn't
// being done yet.
- void RemapLoggedChannel(
- std::string_view name, std::string_view type, const Node *node,
- std::string_view add_prefix = "/original", std::string_view new_type = "",
- RemapConflict conflict_handling = RemapConflict::kCascade);
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ ConfigRemapper::RemapConflict conflict_handling =
+ ConfigRemapper::RemapConflict::kCascade);
template <typename T>
- void RemapLoggedChannel(
- std::string_view name, const Node *node,
- std::string_view add_prefix = "/original", std::string_view new_type = "",
- RemapConflict conflict_handling = RemapConflict::kCascade) {
+ void RemapLoggedChannel(std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ ConfigRemapper::RemapConflict conflict_handling =
+ ConfigRemapper::RemapConflict::kCascade) {
RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
new_type, conflict_handling);
}
@@ -325,11 +312,7 @@
// Returns true if the channel exists on the node and was logged.
template <typename T>
bool HasLoggedChannel(std::string_view name, const Node *node = nullptr) {
- const Channel *channel =
- configuration::GetChannel(logged_configuration(), name,
- T::GetFullyQualifiedName(), "", node, true);
- if (channel == nullptr) return false;
- return channel->logger() != LoggerConfig::NOT_LOGGED;
+ return config_remapper_.HasOriginalChannel<T>(name, node);
}
// Returns a list of all the original channels from remapping.
@@ -407,9 +390,10 @@
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
- // Handle constructing a configuration with all the additional remapped
- // channels from calls to RemapLoggedChannel.
- void MakeRemappedConfig();
+
+ // Checks if any states have their event loops initialized which indicates
+ // events have been scheduled
+ void CheckEventsAreNotScheduled();
// Returns the number of nodes.
size_t nodes_count() const {
@@ -911,22 +895,9 @@
// less than the second node.
std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
- std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
- remapped_configuration_buffer_;
-
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
- // Map of channel indices to new name. The channel index will be an index into
- // logged_configuration(), and the string key will be the name of the channel
- // to send on instead of the logged channel name.
- struct RemappedChannel {
- std::string remapped_name;
- std::string new_type;
- };
- std::map<size_t, RemappedChannel> remapped_channels_;
- std::vector<MapT> maps_;
-
// Number of nodes which still have data to send. This is used to figure out
// when to exit.
size_t live_nodes_ = 0;
@@ -935,7 +906,6 @@
// running and have yet to hit the realtime end time, if any.
size_t live_nodes_with_realtime_time_end_ = 0;
- const Configuration *remapped_configuration_ = nullptr;
const Configuration *replay_configuration_ = nullptr;
// If a ReplayChannels was passed to LogReader, this will hold the
@@ -956,6 +926,7 @@
realtime_clock::time_point start_time_ = realtime_clock::min_time;
realtime_clock::time_point end_time_ = realtime_clock::max_time;
+ ConfigRemapper config_remapper_;
};
} // namespace logger
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
index 8c7751e..4118550 100644
--- a/aos/events/logging/realtime_replay_test.cc
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -363,7 +363,7 @@
&config_.message(), &replay_channels);
EXPECT_DEATH(
reader.RemapLoggedChannel<aos::examples::Ping>("/fake", "/original"),
- "which is not included in the replay channels passed to LogReader");
+ "which is not included in the replay channels passed to");
}
} // namespace aos::logger::testing
diff --git a/aos/events/logging/replay_channels.h b/aos/events/logging/replay_channels.h
new file mode 100644
index 0000000..f9ec144
--- /dev/null
+++ b/aos/events/logging/replay_channels.h
@@ -0,0 +1,15 @@
+#ifndef AOS_EVENTS_LOGGING_REPLAY_CHANNELS_H_
+#define AOS_EVENTS_LOGGING_REPLAY_CHANNELS_H_
+
+#include <string>
+#include <vector>
+
+namespace aos {
+namespace logger {
+// Vector of pair of name and type of the channel
+using ReplayChannels = std::vector<std::pair<std::string, std::string>>;
+// Vector of channel indices
+using ReplayChannelIndices = std::vector<size_t>;
+} // namespace logger
+} // namespace aos
+#endif // AOS_EVENTS_LOGGING_REPLAY_CHANNELS_H_
diff --git a/aos/testdata/BUILD b/aos/testdata/BUILD
index 8b87682..076e6c9 100644
--- a/aos/testdata/BUILD
+++ b/aos/testdata/BUILD
@@ -27,7 +27,7 @@
"invalid_source_node.json",
"self_forward.json",
],
- visibility = ["//aos:__pkg__"],
+ visibility = ["//visibility:public"],
)
genrule(