Add NodeEventLoopFactory
This lets us create event loops on separate nodes. Event loops on
different nodes can't communicate with each other yet. The next step is to
add a message proxy between the nodes, and then teach the logger to replay
onto multiple nodes.
Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
reader.Register();
std::unique_ptr<aos::EventLoop> printer_event_loop =
- reader.event_loop_factory()->MakeEventLoop("printer");
+ reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
printer_event_loop->SkipTimingReport();
bool found_channel = false;
@@ -66,7 +66,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
} else {
std::cout << context.realtime_event_time << " ("
<< context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
}
});
found_channel = true;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
- std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+ std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+ node_event_loop_factory_ =
+ event_loop_factory_->GetNodeEventLoopFactory(node());
+ event_loop_unique_ptr_ =
+ event_loop_factory->MakeEventLoop("log_reader", node());
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
"this.";
// If we have access to the factory, use it to fix the realtime time.
- if (event_loop_factory_ != nullptr) {
- event_loop_factory_->SetRealtimeOffset(
+ if (node_event_loop_factory_ != nullptr) {
+ node_event_loop_factory_->SetRealtimeOffset(
monotonic_clock::time_point(chrono::nanoseconds(
channel_data.message().monotonic_sent_time())),
realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
event_loop_ = nullptr;
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
+ node_event_loop_factory_ = nullptr;
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
}
void LogReader::MakeRemappedConfig() {
+ CHECK(!event_loop_)
+ << ": Can't change the mapping after the events are scheduled.";
+
// If no remapping occurred and we are using the original config, then there
// is nothing interesting to do here.
if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
std::vector<std::unique_ptr<RawSender>> channels_;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+ NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
MultinodeLoggerTest()
: config_(aos::configuration::ReadConfig(
"aos/events/logging/multinode_pingpong_config.json")),
- event_loop_factory_(&config_.message(), "pi1"),
- ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop(
+ "ping", configuration::GetNode(event_loop_factory_.configuration(),
+ "pi1"))),
ping_(ping_event_loop_.get()) {}
// Config and factory.
@@ -233,8 +235,10 @@
LOG(INFO) << "Logging data to " << logfile;
{
+ const Node *pi1 =
+ configuration::GetNode(event_loop_factory_.configuration(), "pi1");
std::unique_ptr<EventLoop> pong_event_loop =
- event_loop_factory_.MakeEventLoop("pong");
+ event_loop_factory_.MakeEventLoop("pong", pi1);
std::unique_ptr<aos::RawSender> pong_sender(
pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
- event_loop_factory_.MakeEventLoop("logger");
+ event_loop_factory_.MakeEventLoop("logger", pi1);
event_loop_factory_.RunFor(chrono::milliseconds(95));
@@ -276,20 +280,23 @@
// TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
// messages. This won't work today yet until the log reading code gets
// significantly better.
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+ SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
// log file.
reader.Register(&log_reader_factory);
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
ASSERT_NE(reader.node(), nullptr);
EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
std::unique_ptr<EventLoop> test_event_loop =
- log_reader_factory.MakeEventLoop("test");
+ log_reader_factory.MakeEventLoop("test", pi1);
int ping_count = 10;
int pong_count = 10;