Add NodeEventLoopFactory

This lets us create event loops on separate nodes which can't
communicate with each other.  Next step is to add a message proxy
between them, then teach the logger to replay onto multiple nodes.

Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a40c47c..095add5 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -591,6 +591,7 @@
   }
   return nullptr;
 }
+
 const Node *GetMyNode(const Configuration *config) {
   const std::string hostname = (FLAGS_override_hostname.size() > 0)
                                    ? FLAGS_override_hostname
@@ -616,6 +617,8 @@
   return nullptr;
 }
 
+bool MultiNode(const Configuration *config) { return config->has_nodes(); }
+
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
   if (node == nullptr) {
     return true;
diff --git a/aos/configuration.h b/aos/configuration.h
index ef30fce..88687ad 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -69,6 +69,9 @@
 const Node *GetNodeFromHostname(const Configuration *config,
                                 std::string_view name);
 
+// Returns true if we are running in a multinode configuration.
+bool MultiNode(const Configuration *config);
+
 // Returns true if the provided channel is sendable on the provided node.
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node);
 // Returns true if the provided channel is able to be watched or fetched on the
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index ea5fd3d..4f24d78 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -90,17 +90,17 @@
     flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
         JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
 
-    my_node_ = my_node;
+    my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
 
-  std::string_view my_node() const { return my_node_; }
+  const Node *my_node() const { return my_node_; }
 
   const Configuration *configuration() { return &flatbuffer_.message(); }
 
  private:
   FlatbufferDetachedBuffer<Configuration> flatbuffer_;
 
-  std::string my_node_;
+  const Node *my_node_ = nullptr;
 };
 
 class AbstractEventLoopTestBase
@@ -134,7 +134,7 @@
 
   const Configuration *configuration() { return factory_->configuration(); }
 
-  std::string_view my_node() const { return factory_->my_node(); }
+  const Node *my_node() const { return factory_->my_node(); }
 
   // Ends the given event loop at the given time from now.
   void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index b5a530e..57f20ae 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -8,7 +8,7 @@
 namespace aos {
 
 EventScheduler::Token EventScheduler::Schedule(
-    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+    distributed_clock::time_point time, ::std::function<void()> callback) {
   return events_list_.emplace(time, callback);
 }
 
@@ -16,9 +16,9 @@
   events_list_.erase(token);
 }
 
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
-  const ::aos::monotonic_clock::time_point end_time =
-      monotonic_now() + duration;
+void EventScheduler::RunFor(distributed_clock::duration duration) {
+  const distributed_clock::time_point end_time =
+      distributed_now() + duration;
   is_running_ = true;
   for (std::function<void()> &on_run : on_run_) {
     on_run();
@@ -26,7 +26,7 @@
   on_run_.clear();
   while (!events_list_.empty() && is_running_) {
     auto iter = events_list_.begin();
-    ::aos::monotonic_clock::time_point next_time = iter->first;
+    distributed_clock::time_point next_time = iter->first;
     if (next_time > end_time) {
       break;
     }
@@ -53,4 +53,11 @@
   }
 }
 
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now) {
+  // Print it the same way we print a monotonic time.  Literally.
+  stream << monotonic_clock::time_point(now.time_since_epoch());
+  return stream;
+}
+
 }  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 432f4ad..400a307 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -14,15 +14,43 @@
 
 namespace aos {
 
+// This clock is the basis for distributed time.  It is used to synchronize time
+// between multiple nodes.  This is a new type so conversions to and from the
+// monotonic and realtime clocks aren't implicit.
+class distributed_clock {
+ public:
+  typedef ::std::chrono::nanoseconds::rep rep;
+  typedef ::std::chrono::nanoseconds::period period;
+  typedef ::std::chrono::nanoseconds duration;
+  typedef ::std::chrono::time_point<distributed_clock> time_point;
+
+  // This clock is the base clock for the simulation and everything is synced to
+  // it.  It never jumps.
+  static constexpr bool is_steady = true;
+
+  // Returns the epoch (0).
+  static constexpr time_point epoch() { return time_point(zero()); }
+
+  static constexpr duration zero() { return duration(0); }
+
+  static constexpr time_point min_time{
+      time_point(duration(::std::numeric_limits<duration::rep>::min()))};
+  static constexpr time_point max_time{
+      time_point(duration(::std::numeric_limits<duration::rep>::max()))};
+};
+
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now);
+
 class EventScheduler {
  public:
   using ChannelType =
-      std::multimap<monotonic_clock::time_point, std::function<void()>>;
+      std::multimap<distributed_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
 
   // Schedule an event with a callback function
   // Returns an iterator to the event
-  Token Schedule(monotonic_clock::time_point time,
+  Token Schedule(distributed_clock::time_point time,
                  std::function<void()> callback);
 
   // Schedules a callback when the event scheduler starts.
@@ -38,30 +66,17 @@
   // Runs until exited.
   void Run();
   // Runs for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   void Exit() { is_running_ = false; }
 
   bool is_running() const { return is_running_; }
 
-  monotonic_clock::time_point monotonic_now() const { return now_; }
-
-  realtime_clock::time_point realtime_now() const {
-    return realtime_clock::time_point(monotonic_now().time_since_epoch() +
-                                      realtime_offset_);
-  }
-
-  // Sets realtime clock to realtime_now for a given monotonic clock.
-  void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
-                         realtime_clock::time_point realtime_now) {
-    realtime_offset_ =
-        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
-  }
+  distributed_clock::time_point distributed_now() const { return now_; }
 
  private:
   // Current execution time.
-  monotonic_clock::time_point now_ = monotonic_clock::epoch();
-  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+  distributed_clock::time_point now_ = distributed_clock::epoch();
 
   std::vector<std::function<void()>> on_run_;
 
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
   reader.Register();
 
   std::unique_ptr<aos::EventLoop> printer_event_loop =
-      reader.event_loop_factory()->MakeEventLoop("printer");
+      reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
   printer_event_loop->SkipTimingReport();
 
   bool found_channel = false;
@@ -66,7 +66,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             } else {
               std::cout << context.realtime_event_time << " ("
                         << context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             }
           });
       found_channel = true;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
 
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
-      std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+      std::make_unique<SimulatedEventLoopFactory>(configuration());
   Register(event_loop_factory_unique_ptr_.get());
 }
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+  node_event_loop_factory_ =
+      event_loop_factory_->GetNodeEventLoopFactory(node());
+  event_loop_unique_ptr_ =
+      event_loop_factory->MakeEventLoop("log_reader", node());
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (event_loop_factory_ != nullptr) {
-          event_loop_factory_->SetRealtimeOffset(
+        if (node_event_loop_factory_ != nullptr) {
+          node_event_loop_factory_->SetRealtimeOffset(
               monotonic_clock::time_point(chrono::nanoseconds(
                   channel_data.message().monotonic_sent_time())),
               realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
   event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
+  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
 }
 
 void LogReader::MakeRemappedConfig() {
+  CHECK(!event_loop_)
+      << ": Can't change the mapping after the events are scheduled.";
+
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
   std::vector<std::unique_ptr<RawSender>> channels_;
 
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+  NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
-        event_loop_factory_(&config_.message(), "pi1"),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop(
+            "ping", configuration::GetNode(event_loop_factory_.configuration(),
+                                           "pi1"))),
         ping_(ping_event_loop_.get()) {}
 
   // Config and factory.
@@ -233,8 +235,10 @@
   LOG(INFO) << "Logging data to " << logfile;
 
   {
+    const Node *pi1 =
+        configuration::GetNode(event_loop_factory_.configuration(), "pi1");
     std::unique_ptr<EventLoop> pong_event_loop =
-        event_loop_factory_.MakeEventLoop("pong");
+        event_loop_factory_.MakeEventLoop("pong", pi1);
 
     std::unique_ptr<aos::RawSender> pong_sender(
         pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
 
     DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger");
+        event_loop_factory_.MakeEventLoop("logger", pi1);
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
@@ -276,20 +280,23 @@
   // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
   // messages.  This won't work today yet until the log reading code gets
   // significantly better.
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
   reader.Register(&log_reader_factory);
 
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
   ASSERT_NE(reader.node(), nullptr);
   EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
 
   std::unique_ptr<EventLoop> test_event_loop =
-      log_reader_factory.MakeEventLoop("test");
+      log_reader_factory.MakeEventLoop("test", pi1);
 
   int ping_count = 10;
   int pong_count = 10;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..0d8a488 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,6 +36,7 @@
  public:
   SimulatedWatcher(
       SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn);
 
@@ -59,6 +60,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedWatcher> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -102,10 +104,6 @@
 
   const Channel *channel() const { return channel_; }
 
-  ::aos::monotonic_clock::time_point monotonic_now() const {
-    return scheduler_->monotonic_now();
-  }
-
  private:
   const Channel *channel_;
 
@@ -186,12 +184,12 @@
     message_ = MakeSimulatedMessage(simulated_channel_->max_size());
 
     // Now fill in the message.  size is already populated above, and
-    // queue_index will be populated in queue_.  Put this at the back of the
-    // data segment.
+    // queue_index will be populated in simulated_channel_.  Put this at the
+    // back of the data segment.
     memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
 
-    return Send(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index);
+    return DoSend(size, monotonic_remote_time, realtime_remote_time,
+                  remote_queue_index);
   }
 
  private:
@@ -204,9 +202,11 @@
 
 class SimulatedFetcher : public RawFetcher {
  public:
-  explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
-      : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
-  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+  explicit SimulatedFetcher(EventLoop *event_loop,
+                            SimulatedChannel *simulated_channel)
+      : RawFetcher(event_loop, simulated_channel->channel()),
+        simulated_channel_(simulated_channel) {}
+  ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
     if (msgs_.size() == 0) {
@@ -222,8 +222,8 @@
     if (msgs_.size() == 0) {
       // TODO(austin): Can we just do this logic unconditionally?  It is a lot
       // simpler.  And call clear, obviously.
-      if (!msg_ && queue_->latest_message()) {
-        SetMsg(queue_->latest_message());
+      if (!msg_ && simulated_channel_->latest_message()) {
+        SetMsg(simulated_channel_->latest_message());
         return std::make_pair(true, event_loop()->monotonic_now());
       } else {
         return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +260,7 @@
     msgs_.emplace_back(buffer);
   }
 
-  SimulatedChannel *queue_;
+  SimulatedChannel *simulated_channel_;
   std::shared_ptr<SimulatedMessage> msg_;
 
   // Messages queued up but not in use.
@@ -270,6 +270,7 @@
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 NodeEventLoopFactory *node_event_loop_factory,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
   ~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +286,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 
   monotonic_clock::time_point base_;
@@ -294,6 +296,7 @@
 class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
  public:
   SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             NodeEventLoopFactory *node_event_loop_factory,
                              SimulatedEventLoop *simulated_event_loop,
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
@@ -309,6 +312,7 @@
   EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 };
 
@@ -316,6 +320,7 @@
  public:
   explicit SimulatedEventLoop(
       EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -324,6 +329,7 @@
       const Node *node, pid_t tid)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
+        node_event_loop_factory_(node_event_loop_factory),
         channels_(channels),
         raw_event_loops_(raw_event_loops),
         node_(node),
@@ -361,11 +367,11 @@
   }
 
   ::aos::monotonic_clock::time_point monotonic_now() override {
-    return scheduler_->monotonic_now();
+    return node_event_loop_factory_->monotonic_now();
   }
 
   ::aos::realtime_clock::time_point realtime_now() override {
-    return scheduler_->realtime_now();
+    return node_event_loop_factory_->realtime_now();
   }
 
   ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +385,17 @@
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
     CHECK(!is_running());
-    return NewTimer(::std::unique_ptr<TimerHandler>(
-        new SimulatedTimerHandler(scheduler_, this, callback)));
+    return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+        scheduler_, node_event_loop_factory_, this, callback)));
   }
 
   PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
                                    const monotonic_clock::duration interval,
                                    const monotonic_clock::duration offset =
                                        ::std::chrono::seconds(0)) override {
-    return NewPhasedLoop(
-        ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
-            scheduler_, this, callback, interval, offset)));
+    return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+        new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+                                       this, callback, interval, offset)));
   }
 
   void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +439,7 @@
   pid_t GetTid() override { return tid_; }
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *raw_event_loops_;
@@ -459,17 +466,13 @@
   }
 }
 
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
-  return send_delay_;
-}
-
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
   TakeWatcher(channel);
 
-  std::unique_ptr<SimulatedWatcher> shm_watcher(
-      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+      this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -511,12 +514,13 @@
 
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-    const Channel *channel,
+    NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +578,10 @@
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ =
-      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
-                           [this]() { simulated_event_loop_->HandleEvent(); });
+  token_ = scheduler_->Schedule(
+      node_event_loop_factory_->ToDistributedClock(
+          event_time + simulated_event_loop_->send_delay()),
+      [this]() { simulated_event_loop_->HandleEvent(); });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -622,12 +627,13 @@
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void()> fn)
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +645,12 @@
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
     token_ = scheduler_->Schedule(
-        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(monotonic_now),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
     token_ = scheduler_->Schedule(
-        base, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   }
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +663,8 @@
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(
-        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base_),
+        [this]() { simulated_event_loop_->HandleEvent(); });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
   } else {
@@ -674,13 +683,15 @@
 }
 
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+    const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +713,76 @@
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
   token_ = scheduler_->Schedule(
-      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+      node_event_loop_factory_->ToDistributedClock(sleep_time),
+      [this]() { simulated_event_loop_->HandleEvent(); });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
 
+NodeEventLoopFactory::NodeEventLoopFactory(
+    EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+    const Node *node,
+    std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+        *raw_event_loops)
+    : scheduler_(scheduler),
+      factory_(factory),
+      node_(node),
+      raw_event_loops_(raw_event_loops) {}
+
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
-  CHECK(!configuration_->has_nodes())
-      << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, std::string_view node_name)
-    : SimulatedEventLoopFactory(
-          configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, const Node *node)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
-  if (node != nullptr) {
-    CHECK(configuration_->has_nodes())
-        << ": Got a configuration with no nodes and node \""
-        << node->name()->string_view() << "\" was selected.";
-    bool found = false;
-    for (const Node *node : *configuration_->nodes()) {
-      if (node == node_) {
-        found = true;
-        break;
-      }
+    : configuration_(CHECK_NOTNULL(configuration)) {
+  if (configuration::MultiNode(configuration_)) {
+    for (const Node *node : *configuration->nodes()) {
+      nodes_.emplace_back(node);
     }
-    CHECK(found) << ": node must be a pointer in the configuration.";
+  } else {
+    nodes_.emplace_back(nullptr);
+  }
+
+  for (const Node *node : nodes_) {
+    node_factories_.emplace_back(
+        new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
   }
 }
 
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+    const Node *node) {
+  auto result = std::find_if(
+      node_factories_.begin(), node_factories_.end(),
+      [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+        return node_factory->node() == node;
+      });
+
+  CHECK(result != node_factories_.end())
+      << ": Failed to find node " << FlatbufferToJson(node);
+
+  return result->get();
+}
+
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+    std::string_view name, const Node *node) {
+  if (node == nullptr) {
+    CHECK(!configuration::MultiNode(configuration()))
+        << ": Can't make a single node event loop in a multi-node world.";
+  } else {
+    CHECK(configuration::MultiNode(configuration()))
+        << ": Can't make a multi-node event loop in a single-node world.";
+  }
+  return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
     std::string_view name) {
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+      scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+      node_, tid));
   result->set_name(name);
-  result->set_send_delay(send_delay_);
+  result->set_send_delay(factory_->send_delay());
   return std::move(result);
 }
 
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..019172b 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,162 @@
 // Class for simulated fetchers.
 class SimulatedChannel;
 
+class NodeEventLoopFactory;
+
+// There are 2 concepts needed to support multi-node simulations.
+//  1) The node.  This is implemented with NodeEventLoopFactory.
+//  2) The "robot" which runs multiple nodes.  This is implemented with
+//     SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory takes an optional Node
+// argument if you want to make event loops without interacting with the
+// NodeEventLoopFactory object.
+//
+// The basic flow goes something like as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory.MakeEventLoop("ping");
+//
+// The distributed_clock is used to be the base time.  NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
 class SimulatedEventLoopFactory {
  public:
   // Constructs a SimulatedEventLoopFactory with the provided configuration.
   // This configuration must remain in scope for the lifetime of the factory and
   // all sub-objects.
   SimulatedEventLoopFactory(const Configuration *configuration);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            std::string_view node_name);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            const Node *node);
   ~SimulatedEventLoopFactory();
 
-  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+  // Creates an event loop.  If running in a multi-node environment, node needs
+  // to point to the node to create this event loop on.
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+                                             const Node *node = nullptr);
+
+  // Returns the NodeEventLoopFactory for the provided node.  The returned
+  // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+  // lifetime identical to the factory.
+  NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
 
   // Starts executing the event loops unconditionally.
   void Run();
   // Executes the event loops for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   // Stops executing all event loops.  Meant to be called from within an event
   // loop handler.
   void Exit() { scheduler_.Exit(); }
 
-  // Sets the simulated send delay for the factory.
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  // Sets the simulated send delay for all messages sent within a single node.
   void set_send_delay(std::chrono::nanoseconds send_delay);
-  std::chrono::nanoseconds send_delay() const;
+  std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+  // Sets the simulated network delay for messages forwarded between nodes.
+  void set_network_delay(std::chrono::nanoseconds network_delay);
+  std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+  // Returns the clock used to synchronize the nodes.
+  distributed_clock::time_point distributed_now() const {
+    return scheduler_.distributed_now();
+  }
+
+  // Returns the configuration used for everything.
+  const Configuration *configuration() const { return configuration_; }
+
+ private:
+  const Configuration *const configuration_;
+  EventScheduler scheduler_;
+  // List of event loops to manage running and not running for.
+  // The function is a callback used to set and clear the running bool on each
+  // event loop.
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      raw_event_loops_;
+
+  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+  std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+  std::vector<const Node *> nodes_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
   const Node *node() const { return node_; }
 
-  monotonic_clock::time_point monotonic_now() const {
-    return scheduler_.monotonic_now();
-  }
-  realtime_clock::time_point realtime_now() const {
-    return scheduler_.realtime_now();
-  }
-
   // Sets realtime clock to realtime_now for a given monotonic clock.
   void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
                          realtime_clock::time_point realtime_now) {
-    scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+    realtime_offset_ =
+        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
   }
 
- private:
-  const Configuration *const configuration_;
-  EventScheduler scheduler_;
-  // Map from name, type to queue.
-  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
-  // List of event loops to manage running and not running for.
-  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
-      raw_event_loops_;
+  // Returns the current time on both clocks.
+  inline monotonic_clock::time_point monotonic_now() const;
+  inline realtime_clock::time_point realtime_now() const;
 
-  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  // Converts a time to the distributed clock for scheduling and cross-node time
+  // measurement.
+  inline distributed_clock::time_point ToDistributedClock(
+      monotonic_clock::time_point time) const;
+
+ private:
+  friend class SimulatedEventLoopFactory;
+  NodeEventLoopFactory(
+      EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+      const Node *node,
+      std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+          *raw_event_loops);
+
+  EventScheduler *const scheduler_;
+  SimulatedEventLoopFactory *const factory_;
 
   const Node *const node_;
 
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      *const raw_event_loops_;
+
+  std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+  // Map from name, type to queue.
+  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+  // pid so we get unique timing reports.
   pid_t tid_ = 0;
 };
 
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+  return monotonic_clock::time_point(
+      factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+  return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+                                    realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+    monotonic_clock::time_point time) const {
+  return distributed_clock::time_point(time.time_since_epoch() -
+                                       monotonic_offset_);
+}
+
 }  // namespace aos
 
 #endif  // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 4328d6f..b373aa6 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -15,11 +15,11 @@
  public:
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
 
   void Run() override { event_loop_factory_->Run(); }
@@ -38,8 +38,8 @@
   void MaybeMake() {
     if (!event_loop_factory_) {
       if (configuration()->has_nodes()) {
-        event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
-            configuration(), my_node());
+        event_loop_factory_ =
+            std::make_unique<SimulatedEventLoopFactory>(configuration());
       } else {
         event_loop_factory_ =
             std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -64,12 +64,13 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  scheduler.Schedule(::aos::monotonic_clock::now(),
-                      [&counter]() { counter += 1; });
+  scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                     [&counter]() { counter += 1; });
   scheduler.Run();
   EXPECT_EQ(counter, 1);
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 1);
@@ -80,8 +81,9 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 0);
@@ -100,8 +102,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
 }
 
@@ -125,8 +125,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
   EXPECT_EQ(counter, 10);
 }