Automatically add split timestamp channels in RemapLoggedChannel
With the new split timestamp channels, we need a corresponding timestamp
channel to be available for timestamps to be sent on for all channels.
When a channel gets remapped, the timestamp channel it's timestamps get
published to changes name. This makes it so replaying that log file
fails because it can't find the corresponding timestamp channel.
We can require the user to create that channel themselves, but that is
pretty error prone. Make LogReader do it as part of RemapLoggedChannel
automatically.
While we are here, add a test for the expected behavior.
Change-Id: I1f920b5031c2cedc21f73775797acc6795a7882d
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 25d59ff..b388bd3 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -272,55 +272,6 @@
return a->name()->string_view() == name;
}
-// Maps name for the provided maps. Modifies name.
-void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
- std::string *name, std::string_view type, const Node *node) {
- // For the same reason we merge configs in reverse order, we want to process
- // maps in reverse order. That lets the outer config overwrite channels from
- // the inner configs.
- for (auto i = maps->rbegin(); i != maps->rend(); ++i) {
- if (!i->has_match() || !i->match()->has_name()) {
- continue;
- }
- if (!i->has_rename() || !i->rename()->has_name()) {
- continue;
- }
-
- // Handle normal maps (now that we know that match and rename are filled
- // out).
- const std::string_view match_name = i->match()->name()->string_view();
- if (match_name != *name) {
- if (match_name.back() == '*' &&
- std::string_view(*name).substr(
- 0, std::min(name->size(), match_name.size() - 1)) ==
- match_name.substr(0, match_name.size() - 1)) {
- CHECK_EQ(match_name.find('*'), match_name.size() - 1);
- } else {
- continue;
- }
- }
-
- // Handle type specific maps.
- if (i->match()->has_type() && i->match()->type()->string_view() != type) {
- continue;
- }
-
- // Now handle node specific maps.
- if (node != nullptr && i->match()->has_source_node() &&
- i->match()->source_node()->string_view() !=
- node->name()->string_view()) {
- continue;
- }
-
- std::string new_name(i->rename()->name()->string_view());
- if (match_name.back() == '*') {
- new_name += std::string(name->substr(match_name.size() - 1));
- }
- VLOG(1) << "Renamed \"" << *name << "\" to \"" << new_name << "\"";
- *name = std::move(new_name);
- }
-}
-
void ValidateConfiguration(const Flatbuffer<Configuration> &config) {
// No imports should be left.
CHECK(!config.message().has_imports());
@@ -419,6 +370,55 @@
} // namespace
+// Maps name for the provided maps. Modifies name.
+void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
+ std::string *name, std::string_view type, const Node *node) {
+ // For the same reason we merge configs in reverse order, we want to process
+ // maps in reverse order. That lets the outer config overwrite channels from
+ // the inner configs.
+ for (auto i = maps->rbegin(); i != maps->rend(); ++i) {
+ if (!i->has_match() || !i->match()->has_name()) {
+ continue;
+ }
+ if (!i->has_rename() || !i->rename()->has_name()) {
+ continue;
+ }
+
+ // Handle normal maps (now that we know that match and rename are filled
+ // out).
+ const std::string_view match_name = i->match()->name()->string_view();
+ if (match_name != *name) {
+ if (match_name.back() == '*' &&
+ std::string_view(*name).substr(
+ 0, std::min(name->size(), match_name.size() - 1)) ==
+ match_name.substr(0, match_name.size() - 1)) {
+ CHECK_EQ(match_name.find('*'), match_name.size() - 1);
+ } else {
+ continue;
+ }
+ }
+
+ // Handle type specific maps.
+ if (i->match()->has_type() && i->match()->type()->string_view() != type) {
+ continue;
+ }
+
+ // Now handle node specific maps.
+ if (node != nullptr && i->match()->has_source_node() &&
+ i->match()->source_node()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ std::string new_name(i->rename()->name()->string_view());
+ if (match_name.back() == '*') {
+ new_name += std::string(name->substr(match_name.size() - 1));
+ }
+ VLOG(1) << "Renamed \"" << *name << "\" to \"" << new_name << "\"";
+ *name = std::move(new_name);
+ }
+}
+
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config) {
// auto_merge_config will contain all the fields of the Configuration that are
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 2eb3094..05555c6 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -39,6 +39,12 @@
"The time to buffer ahead in the log file to accurately reconstruct time.");
namespace aos {
+namespace configuration {
+// We don't really want to expose this publicly, but log reader doesn't really
+// want to re-implement it.
+void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
+ std::string *name, std::string_view type, const Node *node);
+}
namespace logger {
namespace {
@@ -892,6 +898,65 @@
nullptr));
channel_offsets.emplace_back(
CopyChannel(c, pair.second.remapped_name, "", &fbb));
+
+ if (c->has_destination_nodes()) {
+ for (const Connection *connection : *c->destination_nodes()) {
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ case LoggerConfig::NOT_LOGGED:
+ // There is no timestamp channel associated with this, so ignore it.
+ break;
+
+ case LoggerConfig::REMOTE_LOGGER:
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ // We want to make a split timestamp channel regardless of what type
+ // of log this used to be. No sense propagating the single
+ // timestamp channel.
+
+ CHECK(connection->has_timestamp_logger_nodes());
+ for (const flatbuffers::String *timestamp_logger_node :
+ *connection->timestamp_logger_nodes()) {
+ const Node *node = configuration::GetNode(
+ logged_configuration(), timestamp_logger_node->string_view());
+ message_bridge::ChannelTimestampFinder finder(
+ logged_configuration(), "log_reader", node);
+
+ // We are assuming here that all the maps are setup correctly to
+ // handle arbitrary timestamps. Apply the maps for this node to
+ // see what name this ends up with.
+ std::string name = finder.SplitChannelName(
+ pair.second.remapped_name, c->type()->str(), connection);
+ std::string unmapped_name = name;
+ configuration::HandleMaps(logged_configuration()->maps(), &name,
+ "aos.message_bridge.RemoteMessage",
+ node);
+ CHECK_NE(name, unmapped_name)
+ << ": Remote timestamp channel was not remapped, this is "
+ "very fishy";
+ flatbuffers::Offset<flatbuffers::String> channel_name_offset =
+ fbb.CreateString(name);
+ flatbuffers::Offset<flatbuffers::String> channel_type_offset =
+ fbb.CreateString("aos.message_bridge.RemoteMessage");
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ fbb.CreateString(timestamp_logger_node->string_view());
+
+ // Now, build a channel. Don't log it, 2 senders, and match the
+ // source frequency.
+ Channel::Builder channel_builder(fbb);
+ channel_builder.add_name(channel_name_offset);
+ channel_builder.add_type(channel_type_offset);
+ channel_builder.add_source_node(source_node_offset);
+ channel_builder.add_logger(LoggerConfig::NOT_LOGGED);
+ channel_builder.add_num_senders(2);
+ if (c->has_frequency()) {
+ channel_builder.add_frequency(c->frequency());
+ }
+ channel_offsets.emplace_back(channel_builder.Finish());
+ }
+ break;
+ }
+ }
+ }
}
// Now reconstruct the original channels, translating types as needed
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 13b2312..2b504f3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1767,6 +1767,83 @@
reader.Deregister();
}
+// Tests that we can remap a forwarded channel as well.
+TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
+ time_converter_.StartEqual();
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(SortParts(logfiles_));
+
+ reader.RemapLoggedChannel<examples::Ping>("/test");
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+ // else should have moved.
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> full_pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ full_pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+ pi2_event_loop->SkipTimingReport();
+
+ MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
+ MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
+ MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
+ "/original/test");
+ MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
+ "/original/test");
+
+ std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+ pi1_original_ping_timestamp;
+ std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
+ pi1_ping_timestamp;
+ if (!shared()) {
+ pi1_original_ping_timestamp =
+ std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+ pi1_event_loop.get(),
+ "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
+ pi1_ping_timestamp =
+ std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
+ pi1_event_loop.get(),
+ "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
+ }
+
+ log_reader_factory.Run();
+
+ EXPECT_EQ(pi1_ping.count(), 0u);
+ EXPECT_EQ(pi2_ping.count(), 0u);
+ EXPECT_NE(pi1_original_ping.count(), 0u);
+ EXPECT_NE(pi2_original_ping.count(), 0u);
+ if (!shared()) {
+ EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
+ EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
+ }
+
+ reader.Deregister();
+}
+
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index bc8c1d9..bbd62c4 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -12,11 +12,14 @@
std::string ChannelTimestampFinder::SplitChannelName(
const Channel *channel, const Connection *connection) {
- std::string type(channel->type()->string_view());
+ return SplitChannelName(channel->name()->string_view(), channel->type()->str(), connection);
+}
+
+std::string ChannelTimestampFinder::SplitChannelName(
+ std::string_view name, std::string type, const Connection *connection) {
std::replace(type.begin(), type.end(), '.', '-');
return absl::StrCat("/aos/remote_timestamps/",
- connection->name()->string_view(),
- channel->name()->string_view(), "/", type);
+ connection->name()->string_view(), name, "/", type);
}
std::string ChannelTimestampFinder::CombinedChannelName(
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index 1702632..738ca10 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -29,6 +29,8 @@
const Connection *connection);
std::string SplitChannelName(const Channel *channel,
const Connection *connection);
+ std::string SplitChannelName(std::string_view name, std::string type,
+ const Connection *connection);
std::string CombinedChannelName(std::string_view remote_node);
private: