Merge "Support node specific remaps in LogReader"
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 5912ac6..59968c3 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -271,6 +271,7 @@
shard_count = 4,
deps = [
":event_loop_param_test",
+ ":message_counter",
":ping_lib",
":pong_lib",
":simulated_event_loop",
@@ -341,3 +342,12 @@
"//aos/logging:log_message_fbs",
],
)
+
+cc_library(
+ name = "message_counter",
+ hdrs = ["message_counter.h"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":event_loop",
+ ],
+)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 56bef43..48409bb 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -155,6 +155,7 @@
],
deps = [
":logger",
+ "//aos/events:message_counter",
"//aos/events:ping_lib",
"//aos/events:pong_lib",
"//aos/events:simulated_event_loop",
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 0a1e543..aee3f8b 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1569,6 +1569,50 @@
<< type;
}
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+                                   const Node *node,
+                                   std::string_view add_prefix) {
+  VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
+  // Look the channel up as seen from node, so that node's maps get applied.
+  const Channel *remapped_channel =
+      configuration::GetChannel(logged_configuration(), name, type, "", node);
+  CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+                                     << "\", \"type\": \"" << type << "\"}";
+  VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+          << "\"}";
+  VLOG(1) << "Remapped "
+          << aos::configuration::StrippedChannelToString(remapped_channel);
+
+  const std::string_view resolved_name =
+      remapped_channel->name()->string_view();
+  std::string prefixed_name = absl::StrCat(add_prefix, resolved_name);
+
+  // When the lookup went through a map (e.g. /spray -> /0/spray on node 0),
+  // keep the short name usable after the rename: add a map from the prefixed
+  // requested name (/original/spray) to the prefixed resolved name
+  // (/original/0/spray), restricted to this type and, if given, this node.
+  if (resolved_name != name) {
+    MapT prefixed_map;
+    prefixed_map.match = std::make_unique<ChannelT>();
+    prefixed_map.match->name = absl::StrCat(add_prefix, name);
+    prefixed_map.match->type = type;
+    if (node != nullptr) {
+      prefixed_map.match->source_node = node->name()->str();
+    }
+    prefixed_map.rename = std::make_unique<ChannelT>();
+    prefixed_map.rename->name = prefixed_name;
+    maps_.emplace_back(std::move(prefixed_map));
+  }
+
+  const size_t channel_index =
+      configuration::ChannelIndex(logged_configuration(), remapped_channel);
+  CHECK_EQ(0u, remapped_channels_.count(channel_index))
+      << "Already remapped channel "
+      << configuration::CleanedChannelToString(remapped_channel);
+  remapped_channels_[channel_index] = std::move(prefixed_name);
+  MakeRemappedConfig();
+}
+
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
@@ -1637,10 +1681,46 @@
&new_config_fbb));
}
// Create the Configuration containing the new channels that we want to add.
- const auto new_name_vector_offsets =
+ const auto new_channel_vector_offsets =
new_config_fbb.CreateVector(channel_offsets);
+
+ // Now create the new maps.
+ std::vector<flatbuffers::Offset<Map>> map_offsets;
+ for (const MapT &map : maps_) {
+ const flatbuffers::Offset<flatbuffers::String> match_name_offset =
+ new_config_fbb.CreateString(map.match->name);
+ const flatbuffers::Offset<flatbuffers::String> match_type_offset =
+ new_config_fbb.CreateString(map.match->type);
+ const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
+ new_config_fbb.CreateString(map.rename->name);
+ flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
+ if (!map.match->source_node.empty()) {
+ match_source_node_offset =
+ new_config_fbb.CreateString(map.match->source_node);
+ }
+ Channel::Builder match_builder(new_config_fbb);
+ match_builder.add_name(match_name_offset);
+ match_builder.add_type(match_type_offset);
+ if (!map.match->source_node.empty()) {
+ match_builder.add_source_node(match_source_node_offset);
+ }
+ const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
+
+ Channel::Builder rename_builder(new_config_fbb);
+ rename_builder.add_name(rename_name_offset);
+ const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
+
+ Map::Builder map_builder(new_config_fbb);
+ map_builder.add_match(match_offset);
+ map_builder.add_rename(rename_offset);
+ map_offsets.emplace_back(map_builder.Finish());
+ }
+
+ const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+
ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_name_vector_offsets);
+ new_config_builder.add_channels(new_channel_vector_offsets);
+ new_config_builder.add_maps(new_maps_offsets);
new_config_fbb.Finish(new_config_builder.Finish());
const FlatbufferDetachedBuffer<Configuration> new_name_config =
new_config_fbb.Release();
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 2f6f931..70c9f60 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -300,11 +300,28 @@
RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
}
+ // Remaps the provided channel as seen from node. name is resolved through
+ // that node's maps first, and an equivalent map is added under add_prefix.
+ // So if /aos -> /pi1/aos on one node, then after renaming
+ // /original/aos -> /original/pi1/aos on that same node.
+ //
+ // TODO(austin): If you have 2 nodes remapping something to the same channel,
+ // this doesn't handle that. No use cases exist yet for that, so it isn't
+ // being done yet.
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix = "/original");
template <typename T>
- bool HasChannel(std::string_view name) {
+ void RemapLoggedChannel(std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+ }
+
+ template <typename T>
+ bool HasChannel(std::string_view name, const Node *node = nullptr) {
return configuration::GetChannel(log_file_header()->configuration(), name,
T::GetFullyQualifiedName(), "",
- nullptr) != nullptr;
+ node) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
@@ -625,6 +642,7 @@
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
std::map<size_t, std::string> remapped_channels_;
+ std::vector<MapT> maps_;
// Number of nodes which still have data to send. This is used to figure out
// when to exit.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index be77f38..d4b6424 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,6 +1,7 @@
#include "aos/events/logging/logger.h"
#include "aos/events/event_loop.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
@@ -14,6 +15,7 @@
namespace testing {
namespace chrono = std::chrono;
+using aos::testing::MessageCounter;
class LoggerTest : public ::testing::Test {
public:
@@ -1018,6 +1020,70 @@
::testing::UnorderedElementsAreArray(structured_logfiles_));
}
+// Tests that remapping an already node-mapped channel on one node works.
+TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(structured_logfiles_);
+
+ // Remap the timing report channel, but only as seen from pi1.
+ reader.RemapLoggedChannel<aos::timing::Report>(
+ "/aos", configuration::GetNode(reader.configuration(), "pi1"));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. The
+ // unprefixed pi1 names should see nothing, and pi2 should not have moved.
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> full_pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ full_pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+ pi2_event_loop->SkipTimingReport();
+
+ MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
+ "/aos");
+ MessageCounter<aos::timing::Report> full_pi1_timing_report(
+ full_pi1_event_loop.get(), "/pi1/aos");
+ MessageCounter<aos::timing::Report> pi1_original_timing_report(
+ pi1_event_loop.get(), "/original/aos");
+ MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
+ full_pi1_event_loop.get(), "/original/pi1/aos");
+ MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
+ "/aos");
+
+ log_reader_factory.Run();
+
+ EXPECT_EQ(pi1_timing_report.count(), 0u);
+ EXPECT_EQ(full_pi1_timing_report.count(), 0u);
+ EXPECT_NE(pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(pi2_timing_report.count(), 0u);
+
+ reader.Deregister();
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
diff --git a/aos/events/message_counter.h b/aos/events/message_counter.h
new file mode 100644
index 0000000..418c1eb
--- /dev/null
+++ b/aos/events/message_counter.h
@@ -0,0 +1,39 @@
+#ifndef AOS_EVENTS_MESSAGE_COUNTER_H_
+#define AOS_EVENTS_MESSAGE_COUNTER_H_
+
+#include <cstddef>
+#include <string_view>
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace testing {
+
+// Simple class to count messages on a channel easily.  This only counts
+// messages published while running.
+//
+// The watcher registered in the constructor captures `this`, so the counter
+// must not be copied or moved and must outlive the EventLoop's callbacks.
+template <typename T>
+class MessageCounter {
+ public:
+  // Registers a no-arg watcher for T on channel name which bumps the count.
+  MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+    event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+  }
+
+  // Copies would leave the watcher incrementing the original object.
+  MessageCounter(const MessageCounter &) = delete;
+  MessageCounter &operator=(const MessageCounter &) = delete;
+
+  // Returns the number of messages seen so far.
+  size_t count() const { return count_; }
+
+ private:
+  size_t count_ = 0;
+};
+
+}  // namespace testing
+}  // namespace aos
+
+#endif  // AOS_EVENTS_MESSAGE_COUNTER_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 375b8fd..81937ca 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/event_loop_param_test.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
@@ -259,19 +260,6 @@
0.0);
}
-template <typename T>
-class MessageCounter {
- public:
- MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
- event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
- }
-
- size_t count() const { return count_; }
-
- private:
- size_t count_ = 0;
-};
-
// Tests that ping and pong work when on 2 different nodes, and the message
// gateway messages are sent out as expected.
TEST(SimulatedEventLoopTest, MultinodePingPong) {