Support node specific remaps in LogReader
When we have a bunch of nodes running similar pieces of software, we end
up with a bunch of channels which are all the same, bar a prefix. The
applications then end up with node specific maps to avoid being
different everywhere.
When renaming those in replay, we need to recreate both the node
specific map, and take the map into account when renaming. We don't
want to leak the naming pattern up.
Change-Id: I6a6a0556bf49664679f443a21eaed23d024a515f
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 5912ac6..59968c3 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -271,6 +271,7 @@
shard_count = 4,
deps = [
":event_loop_param_test",
+ ":message_counter",
":ping_lib",
":pong_lib",
":simulated_event_loop",
@@ -341,3 +342,12 @@
"//aos/logging:log_message_fbs",
],
)
+
+cc_library(
+ name = "message_counter",
+ hdrs = ["message_counter.h"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":event_loop",
+ ],
+)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 56bef43..48409bb 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -155,6 +155,7 @@
],
deps = [
":logger",
+ "//aos/events:message_counter",
"//aos/events:ping_lib",
"//aos/events:pong_lib",
"//aos/events:simulated_event_loop",
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 424153a..3e6c24b 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1421,6 +1421,50 @@
<< type;
}
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix) {
+ VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
+ const Channel *remapped_channel =
+ configuration::GetChannel(logged_configuration(), name, type, "", node);
+ CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+ << "\", \"type\": \"" << type << "\"}";
+ VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+ << "\"}";
+ VLOG(1) << "Remapped "
+ << aos::configuration::StrippedChannelToString(remapped_channel);
+
+ // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
+ // we want it to degrade if the heuristics fail to just work.
+ //
+ // The easiest way to do this is going to be incredibly specific and verbose.
+ // Look up /spray, to /0/spray. Then, prefix the result with /original to get
+ // /original/0/spray. Then, create a map from /original/spray to
+ // /original/0/spray for just the type we were asked for.
+ if (name != remapped_channel->name()->string_view()) {
+ MapT new_map;
+ new_map.match = std::make_unique<ChannelT>();
+ new_map.match->name = absl::StrCat(add_prefix, name);
+ new_map.match->type = type;
+ if (node != nullptr) {
+ new_map.match->source_node = node->name()->str();
+ }
+ new_map.rename = std::make_unique<ChannelT>();
+ new_map.rename->name =
+ absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+ maps_.emplace_back(std::move(new_map));
+ }
+
+ const size_t channel_index =
+ configuration::ChannelIndex(logged_configuration(), remapped_channel);
+ CHECK_EQ(0u, remapped_channels_.count(channel_index))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(remapped_channel);
+ remapped_channels_[channel_index] =
+ absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+ MakeRemappedConfig();
+}
+
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
@@ -1489,10 +1533,46 @@
&new_config_fbb));
}
// Create the Configuration containing the new channels that we want to add.
- const auto new_name_vector_offsets =
+ const auto new_channel_vector_offsets =
new_config_fbb.CreateVector(channel_offsets);
+
+ // Now create the new maps.
+ std::vector<flatbuffers::Offset<Map>> map_offsets;
+ for (const MapT &map : maps_) {
+ const flatbuffers::Offset<flatbuffers::String> match_name_offset =
+ new_config_fbb.CreateString(map.match->name);
+ const flatbuffers::Offset<flatbuffers::String> match_type_offset =
+ new_config_fbb.CreateString(map.match->type);
+ const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
+ new_config_fbb.CreateString(map.rename->name);
+ flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
+ if (!map.match->source_node.empty()) {
+ match_source_node_offset =
+ new_config_fbb.CreateString(map.match->source_node);
+ }
+ Channel::Builder match_builder(new_config_fbb);
+ match_builder.add_name(match_name_offset);
+ match_builder.add_type(match_type_offset);
+ if (!map.match->source_node.empty()) {
+ match_builder.add_source_node(match_source_node_offset);
+ }
+ const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
+
+ Channel::Builder rename_builder(new_config_fbb);
+ rename_builder.add_name(rename_name_offset);
+ const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
+
+ Map::Builder map_builder(new_config_fbb);
+ map_builder.add_match(match_offset);
+ map_builder.add_rename(rename_offset);
+ map_offsets.emplace_back(map_builder.Finish());
+ }
+
+ const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+
ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_name_vector_offsets);
+ new_config_builder.add_channels(new_channel_vector_offsets);
+ new_config_builder.add_maps(new_maps_offsets);
new_config_fbb.Finish(new_config_builder.Finish());
const FlatbufferDetachedBuffer<Configuration> new_name_config =
new_config_fbb.Release();
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a3189db..ba76424 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -258,11 +258,28 @@
RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
}
+ // Remaps the provided channel, though this respects node mappings, and
+ // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
+ // /original/aos -> /original/pi1/aos on the same node after renaming, just
+ // like you would hope.
+ //
+ // TODO(austin): If you have 2 nodes remapping something to the same channel,
+ // this doesn't handle that. No use cases exist yet for that, so it isn't
+ // being done yet.
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix = "/original");
template <typename T>
- bool HasChannel(std::string_view name) {
+ void RemapLoggedChannel(std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+ }
+
+ template <typename T>
+ bool HasChannel(std::string_view name, const Node *node = nullptr) {
return configuration::GetChannel(log_file_header()->configuration(), name,
T::GetFullyQualifiedName(), "",
- nullptr) != nullptr;
+ node) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
@@ -583,6 +600,7 @@
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
std::map<size_t, std::string> remapped_channels_;
+ std::vector<MapT> maps_;
// Number of nodes which still have data to send. This is used to figure out
// when to exit.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 47fe9e2..0622650 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,6 +1,7 @@
#include "aos/events/logging/logger.h"
#include "aos/events/event_loop.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
@@ -14,6 +15,7 @@
namespace testing {
namespace chrono = std::chrono;
+using aos::testing::MessageCounter;
class LoggerTest : public ::testing::Test {
public:
@@ -983,6 +985,70 @@
::testing::UnorderedElementsAreArray(structured_logfiles_));
}
+// Tests that if we remap a remapped channel, it shows up correctly.
+TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(structured_logfiles_);
+
+ // Remap just on pi1.
+ reader.RemapLoggedChannel<aos::timing::Report>(
+ "/aos", configuration::GetNode(reader.configuration(), "pi1"));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+ // else should have moved.
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> full_pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ full_pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+ pi2_event_loop->SkipTimingReport();
+
+ MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
+ "/aos");
+ MessageCounter<aos::timing::Report> full_pi1_timing_report(
+ full_pi1_event_loop.get(), "/pi1/aos");
+ MessageCounter<aos::timing::Report> pi1_original_timing_report(
+ pi1_event_loop.get(), "/original/aos");
+ MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
+ full_pi1_event_loop.get(), "/original/pi1/aos");
+ MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
+ "/aos");
+
+ log_reader_factory.Run();
+
+ EXPECT_EQ(pi1_timing_report.count(), 0u);
+ EXPECT_EQ(full_pi1_timing_report.count(), 0u);
+ EXPECT_NE(pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(pi2_timing_report.count(), 0u);
+
+ reader.Deregister();
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
diff --git a/aos/events/message_counter.h b/aos/events/message_counter.h
new file mode 100644
index 0000000..418c1eb
--- /dev/null
+++ b/aos/events/message_counter.h
@@ -0,0 +1,28 @@
+#ifndef AOS_EVENTS_MESSAGE_COUNTER_H_
+#define AOS_EVENTS_MESSAGE_COUNTER_H_
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace testing {
+
+// Simple class to count messages on a channel easily. This only counts
+// messages published while running.
+template <typename T>
+class MessageCounter {
+ public:
+ MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+ event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+ }
+
+ // Returns the number of messages seen so far.
+ size_t count() const { return count_; }
+
+ private:
+ size_t count_ = 0;
+};
+
+} // namespace testing
+} // namespace aos
+
+#endif // AOS_EVENTS_MESSAGE_COUNTER_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index cf48c34..c7ab942 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/event_loop_param_test.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
@@ -259,19 +260,6 @@
0.0);
}
-template <typename T>
-class MessageCounter {
- public:
- MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
- event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
- }
-
- size_t count() const { return count_; }
-
- private:
- size_t count_ = 0;
-};
-
// Tests that ping and pong work when on 2 different nodes, and the message
// gateway messages are sent out as expected.
TEST(SimulatedEventLoopTest, MultinodePingPong) {