Automatically remap repeatedly remapped channels
If you replay a log multiple times and are remapping a channel
/foo on each run, this will now cascade the remaps so that on
the first replay you got
/foo -> /original/foo
and then on the next replay you get
/original/foo -> /original/original/foo
/foo -> /original/foo
and so on.
This is needed to handle logs where timestamp channels are not
NOT_LOGGED.
Change-Id: I158735709504ee1d3ebb7a9678c03bbb0a53ba5c
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 87ad72c..f80f28d 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -288,6 +288,10 @@
RemoteMessage::GetFullyQualifiedName(), "", node, true);
if (timestamp_channel != nullptr) {
+ // If for some reason a timestamp channel is not NOT_LOGGED (which
+ // is unusual), then remap the channel so that the replayed channel
+ // doesn't overlap with the special separate replay we do for
+ // timestamps.
if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
RemapLoggedChannel<RemoteMessage>(
timestamp_channel->name()->string_view(), node);
@@ -1278,9 +1282,40 @@
event_loop_factory_ = nullptr;
}
+namespace {
+// Checks if the specified channel name/type exists in the config and, depending
+// on the value of conflict_handling, calls conflict_handler or just dies.
+template <typename F>
+void CheckAndHandleRemapConflict(std::string_view new_name,
+ std::string_view new_type,
+ const Configuration *config,
+ LogReader::RemapConflict conflict_handling,
+ F conflict_handler) {
+ const Channel *existing_channel =
+ configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
+ if (existing_channel != nullptr) {
+ switch (conflict_handling) {
+ case LogReader::RemapConflict::kDisallow:
+ LOG(FATAL)
+ << "Channel "
+ << configuration::StrippedChannelToString(existing_channel)
+ << " is already used--you can't remap a logged channel to it.";
+ break;
+ case LogReader::RemapConflict::kCascade:
+ LOG(INFO) << "Automatically remapping "
+ << configuration::StrippedChannelToString(existing_channel)
+ << " to avoid conflicts.";
+ conflict_handler();
+ break;
+ }
+ }
+}
+} // namespace
+
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
std::string_view add_prefix,
- std::string_view new_type) {
+ std::string_view new_type,
+ RemapConflict conflict_handling) {
for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
const Channel *const channel = logged_configuration()->channels()->Get(ii);
if (channel->name()->str() == name &&
@@ -1292,6 +1327,16 @@
remapped_channel.remapped_name =
std::string(add_prefix) + std::string(name);
remapped_channel.new_type = new_type;
+ const std::string_view remapped_type =
+ remapped_channel.new_type.empty() ? type : remapped_channel.new_type;
+ CheckAndHandleRemapConflict(
+ remapped_channel.remapped_name, remapped_type,
+ remapped_configuration_, conflict_handling,
+ [this, &remapped_channel, remapped_type, add_prefix,
+ conflict_handling]() {
+ RemapLoggedChannel(remapped_channel.remapped_name, remapped_type,
+ add_prefix, "", conflict_handling);
+ });
remapped_channels_[ii] = std::move(remapped_channel);
VLOG(1) << "Remapping channel "
<< configuration::CleanedChannelToString(channel)
@@ -1307,7 +1352,8 @@
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
std::string_view add_prefix,
- std::string_view new_type) {
+ std::string_view new_type,
+ RemapConflict conflict_handling) {
VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
const Channel *remapped_channel =
configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1350,6 +1396,15 @@
std::string(add_prefix) +
std::string(remapped_channel->name()->string_view());
remapped_channel_struct.new_type = new_type;
+ const std::string_view remapped_type = new_type.empty() ? type : new_type;
+ CheckAndHandleRemapConflict(
+ remapped_channel_struct.remapped_name, remapped_type,
+ remapped_configuration_, conflict_handling,
+ [this, &remapped_channel_struct, remapped_type, node, add_prefix,
+ conflict_handling]() {
+ RemapLoggedChannel(remapped_channel_struct.remapped_name, remapped_type,
+ node, add_prefix, "", conflict_handling);
+ });
remapped_channels_[channel_index] = std::move(remapped_channel_struct);
MakeRemappedConfig();
}
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 9e44996..07cca48 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -156,18 +156,36 @@
void SetEndTime(std::string end_time);
void SetEndTime(realtime_clock::time_point end_time);
+ // Enum to use for indicating how RemapLoggedChannel behaves when there is
+ // already a channel with the remapped name (e.g., as may happen when
+ // replaying a logfile that was itself generated from replay).
+ enum class RemapConflict {
+ // LOG(FATAL) on conflicts in remappings.
+ kDisallow,
+ // If we run into a conflict, attempt to remap the channel we would be
+ // overriding (and continue to do so if remapping *that* channel also
+ // generates a conflict).
+ // This will mean that if we repeatedly replay a log, we will end up
+ // stacking more and more /original's on the start of the oldest version
+ // of the channels.
+ kCascade
+ };
+
// Causes the logger to publish the provided channel on a different name so
// that replayed applications can publish on the proper channel name without
// interference. This operates on raw channel names, without any node or
// application specific mappings.
- void RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix = "/original",
- std::string_view new_type = "");
+ void RemapLoggedChannel(
+ std::string_view name, std::string_view type,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade);
template <typename T>
- void RemapLoggedChannel(std::string_view name,
- std::string_view add_prefix = "/original",
- std::string_view new_type = "") {
- RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type);
+ void RemapLoggedChannel(
+ std::string_view name, std::string_view add_prefix = "/original",
+ std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade) {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
+ conflict_handling);
}
// Remaps the provided channel, though this respects node mappings, and
@@ -179,16 +197,17 @@
// TODO(austin): If you have 2 nodes remapping something to the same channel,
// this doesn't handle that. No use cases exist yet for that, so it isn't
// being done yet.
- void RemapLoggedChannel(std::string_view name, std::string_view type,
- const Node *node,
- std::string_view add_prefix = "/original",
- std::string_view new_type = "");
+ void RemapLoggedChannel(
+ std::string_view name, std::string_view type, const Node *node,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade);
template <typename T>
- void RemapLoggedChannel(std::string_view name, const Node *node,
- std::string_view add_prefix = "/original",
- std::string_view new_type = "") {
+ void RemapLoggedChannel(
+ std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original", std::string_view new_type = "",
+ RemapConflict conflict_handling = RemapConflict::kCascade) {
RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
- new_type);
+ new_type, conflict_handling);
}
template <typename T>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index ed7d150..ab0ec3a 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -566,11 +566,14 @@
bool shared;
// sha256 of the config.
std::string_view sha256;
+ // sha256 of the relogged config
+ std::string_view relogged_sha256;
};
std::ostream &operator<<(std::ostream &ostream, const ConfigParams ¶ms) {
ostream << "{config: \"" << params.config << "\", shared: " << params.shared
- << ", sha256: \"" << params.sha256 << "\"}";
+ << ", sha256: \"" << params.sha256 << "\", relogged_sha256: \""
+ << params.relogged_sha256 << "\"}";
return ostream;
}
@@ -754,8 +757,8 @@
unlink((file + ".xz").c_str());
}
- for (const auto &file :
- MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2")) {
+ for (const auto &file : MakeLogFiles(tmp_dir_ + "/relogged1",
+ tmp_dir_ + "/relogged2", 3, 3, true)) {
unlink(file.c_str());
}
@@ -775,12 +778,14 @@
std::vector<std::string> MakeLogFiles(std::string logfile_base1,
std::string logfile_base2,
size_t pi1_data_count = 3,
- size_t pi2_data_count = 3) {
+ size_t pi2_data_count = 3,
+ bool relogged_config = false) {
+ std::string_view sha256 = relogged_config
+ ? std::get<0>(GetParam()).relogged_sha256
+ : std::get<0>(GetParam()).sha256;
std::vector<std::string> result;
- result.emplace_back(absl::StrCat(
- logfile_base1, "_", std::get<0>(GetParam()).sha256, Extension()));
- result.emplace_back(absl::StrCat(
- logfile_base2, "_", std::get<0>(GetParam()).sha256, Extension()));
+ result.emplace_back(absl::StrCat(logfile_base1, "_", sha256, Extension()));
+ result.emplace_back(absl::StrCat(logfile_base2, "_", sha256, Extension()));
for (size_t i = 0; i < pi1_data_count; ++i) {
result.emplace_back(
absl::StrCat(logfile_base1, "_pi1_data.part", i, Extension()));
@@ -1322,24 +1327,40 @@
"aos.message_bridge.ServerStatistics", 1),
std::make_tuple("/test", "aos.examples.Ping", 1)))
<< " : " << logfiles_[2];
- EXPECT_THAT(
- CountChannelsData(config, logfiles_[3]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 1)))
- << " : " << logfiles_[3];
- EXPECT_THAT(
- CountChannelsData(config, logfiles_[4]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
- 20),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 199),
- std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Ping", 2000)))
- << " : " << logfiles_[4];
+ {
+ std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 1)};
+ if (!std::get<0>(GetParam()).shared) {
+ channel_counts.push_back(
+ std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", 1));
+ }
+ EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+ ::testing::UnorderedElementsAreArray(channel_counts))
+ << " : " << logfiles_[3];
+ }
+ {
+ std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+ 20),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 199),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Ping", 2000)};
+ if (!std::get<0>(GetParam()).shared) {
+ channel_counts.push_back(
+ std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", 199));
+ }
+ EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+ ::testing::UnorderedElementsAreArray(channel_counts))
+ << " : " << logfiles_[4];
+ }
// Timestamps for pong
EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
UnorderedElementsAre())
@@ -2111,9 +2132,18 @@
log_reader_factory.set_send_delay(chrono::microseconds(0));
std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
- ASSERT_EQ(remapped_channels.size(), 1u);
+ // Note: An extra channel gets remapped automatically due to a timestamp
+ // channel being LOCAL_LOGGER'd.
+ ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
+ if (!std::get<0>(GetParam()).shared) {
+ EXPECT_EQ(remapped_channels[1]->name()->string_view(),
+ "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp");
+ EXPECT_EQ(remapped_channels[1]->type()->string_view(),
+ "aos.message_bridge.RemoteMessage");
+ }
reader.Register(&log_reader_factory);
@@ -2285,9 +2315,12 @@
const Channel *channel =
full_event_loop->configuration()->channels()->Get(ii);
// We currently don't support replaying remote timestamp channels in
- // realtime replay.
+ // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
+ // in which case it gets auto-remapped and replayed on a /original channel).
if (channel->name()->string_view().find("remote_timestamp") !=
- std::string_view::npos) {
+ std::string_view::npos &&
+ channel->name()->string_view().find("/original") ==
+ std::string_view::npos) {
continue;
}
if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
@@ -2640,8 +2673,19 @@
// And verify that we can run the LogReader over the relogged files without
// hitting any fatal errors.
{
- LogReader relogged_reader(SortParts(
- MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2")));
+ LogReader relogged_reader(SortParts(MakeLogFiles(
+ tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+ relogged_reader.Register();
+
+ relogged_reader.event_loop_factory()->Run();
+ }
+ // And confirm that we can read the logged file using the reader's
+ // configuration.
+ {
+ LogReader relogged_reader(
+ SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
+ 3, 3, true)),
+ reader.configuration());
relogged_reader.Register();
relogged_reader.event_loop_factory()->Run();
@@ -3668,27 +3712,29 @@
constexpr std::string_view kCombinedConfigSha1(
"46f8daa7d84eb999c8d3584b79f4a289fd46e3a0b47a08bdbee9c7e3b89b4aff");
constexpr std::string_view kSplitConfigSha1(
- "2558663e9414383e264510ce733505a40be99d2f43e299819417944f06b899ea");
+ "53978a9c089e8f5a525503ce18aca6146a7a9dee450b6067580ed952a1ff08dd");
+constexpr std::string_view kReloggedSplitConfigSha1(
+ "037fb89ada330db8908314b26bc7d96a6b3264984c189471d23b70ab6443ff96");
INSTANTIATE_TEST_SUITE_P(
All, MultinodeLoggerTest,
- ::testing::Combine(::testing::Values(
- ConfigParams{
- "multinode_pingpong_combined_config.json", true,
- kCombinedConfigSha1},
- ConfigParams{"multinode_pingpong_split_config.json",
- false, kSplitConfigSha1}),
- ::testing::ValuesIn(SupportedCompressionAlgorithms())));
+ ::testing::Combine(
+ ::testing::Values(
+ ConfigParams{"multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1, kCombinedConfigSha1},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1, kReloggedSplitConfigSha1}),
+ ::testing::ValuesIn(SupportedCompressionAlgorithms())));
INSTANTIATE_TEST_SUITE_P(
All, MultinodeLoggerDeathTest,
- ::testing::Combine(::testing::Values(
- ConfigParams{
- "multinode_pingpong_combined_config.json", true,
- kCombinedConfigSha1},
- ConfigParams{"multinode_pingpong_split_config.json",
- false, kSplitConfigSha1}),
- ::testing::ValuesIn(SupportedCompressionAlgorithms())));
+ ::testing::Combine(
+ ::testing::Values(
+ ConfigParams{"multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1, kCombinedConfigSha1},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1, kReloggedSplitConfigSha1}),
+ ::testing::ValuesIn(SupportedCompressionAlgorithms())));
// Tests that we can relog with a different config. This makes most sense when
// you are trying to edit a log and want to use channel renaming + the original
diff --git a/aos/events/logging/multinode_pingpong_split.json b/aos/events/logging/multinode_pingpong_split.json
index c696ec2..f5bd9cb 100644
--- a/aos/events/logging/multinode_pingpong_split.json
+++ b/aos/events/logging/multinode_pingpong_split.json
@@ -100,7 +100,8 @@
{
"name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
"type": "aos.message_bridge.RemoteMessage",
- "logger": "NOT_LOGGED",
+ /* Log one remote timestamp channel to ensure everythings still works. */
+ "logger": "LOCAL_LOGGER",
"num_senders": 2,
"source_node": "pi1"
},