Add NodeEventLoopFactory

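This lets us create event loops on separate nodes which can't
communicate with each other.  SimulatedEventLoopFactory no longer takes
a node at construction; callers now pass the node to MakeEventLoop()
instead, and LogReader sets the realtime offset through the per-node
NodeEventLoopFactory.

The next step is to add a message proxy between the nodes, and then
teach the logger to replay onto multiple nodes.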

Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
   reader.Register();
 
   std::unique_ptr<aos::EventLoop> printer_event_loop =
-      reader.event_loop_factory()->MakeEventLoop("printer");
+      reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
   printer_event_loop->SkipTimingReport();
 
   bool found_channel = false;
@@ -66,7 +66,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             } else {
               std::cout << context.realtime_event_time << " ("
                         << context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             }
           });
       found_channel = true;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
 
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
-      std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+      std::make_unique<SimulatedEventLoopFactory>(configuration());
   Register(event_loop_factory_unique_ptr_.get());
 }
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+  node_event_loop_factory_ =
+      event_loop_factory_->GetNodeEventLoopFactory(node());
+  event_loop_unique_ptr_ =
+      event_loop_factory->MakeEventLoop("log_reader", node());
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (event_loop_factory_ != nullptr) {
-          event_loop_factory_->SetRealtimeOffset(
+        if (node_event_loop_factory_ != nullptr) {
+          node_event_loop_factory_->SetRealtimeOffset(
               monotonic_clock::time_point(chrono::nanoseconds(
                   channel_data.message().monotonic_sent_time())),
               realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
   event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
+  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
 }
 
 void LogReader::MakeRemappedConfig() {
+  CHECK(!event_loop_)
+      << ": Can't change the mapping after the events are scheduled.";
+
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
   std::vector<std::unique_ptr<RawSender>> channels_;
 
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+  NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
-        event_loop_factory_(&config_.message(), "pi1"),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop(
+            "ping", configuration::GetNode(event_loop_factory_.configuration(),
+                                           "pi1"))),
         ping_(ping_event_loop_.get()) {}
 
   // Config and factory.
@@ -233,8 +235,10 @@
   LOG(INFO) << "Logging data to " << logfile;
 
   {
+    const Node *pi1 =
+        configuration::GetNode(event_loop_factory_.configuration(), "pi1");
     std::unique_ptr<EventLoop> pong_event_loop =
-        event_loop_factory_.MakeEventLoop("pong");
+        event_loop_factory_.MakeEventLoop("pong", pi1);
 
     std::unique_ptr<aos::RawSender> pong_sender(
         pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
 
     DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger");
+        event_loop_factory_.MakeEventLoop("logger", pi1);
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
@@ -276,20 +280,23 @@
   // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
   // messages.  This won't work today yet until the log reading code gets
   // significantly better.
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
   reader.Register(&log_reader_factory);
 
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
   ASSERT_NE(reader.node(), nullptr);
   EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
 
   std::unique_ptr<EventLoop> test_event_loop =
-      log_reader_factory.MakeEventLoop("test");
+      log_reader_factory.MakeEventLoop("test", pi1);
 
   int ping_count = 10;
   int pong_count = 10;