Add NodeEventLoopFactory

This lets us create event loops on separate nodes.  Event loops on
different nodes can't communicate with each other yet.  The next step is
to add a message proxy between them, and then teach the logger to replay
onto multiple nodes.

Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..0d8a488 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,6 +36,7 @@
  public:
   SimulatedWatcher(
       SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn);
 
@@ -59,6 +60,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedWatcher> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -102,10 +104,6 @@
 
   const Channel *channel() const { return channel_; }
 
-  ::aos::monotonic_clock::time_point monotonic_now() const {
-    return scheduler_->monotonic_now();
-  }
-
  private:
   const Channel *channel_;
 
@@ -186,12 +184,12 @@
     message_ = MakeSimulatedMessage(simulated_channel_->max_size());
 
     // Now fill in the message.  size is already populated above, and
-    // queue_index will be populated in queue_.  Put this at the back of the
-    // data segment.
+    // queue_index will be populated in simulated_channel_.  Put this at the
+    // back of the data segment.
     memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
 
-    return Send(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index);
+    return DoSend(size, monotonic_remote_time, realtime_remote_time,
+                  remote_queue_index);
   }
 
  private:
@@ -204,9 +202,11 @@
 
 class SimulatedFetcher : public RawFetcher {
  public:
-  explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
-      : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
-  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+  explicit SimulatedFetcher(EventLoop *event_loop,
+                            SimulatedChannel *simulated_channel)
+      : RawFetcher(event_loop, simulated_channel->channel()),
+        simulated_channel_(simulated_channel) {}
+  ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
     if (msgs_.size() == 0) {
@@ -222,8 +222,8 @@
     if (msgs_.size() == 0) {
       // TODO(austin): Can we just do this logic unconditionally?  It is a lot
       // simpler.  And call clear, obviously.
-      if (!msg_ && queue_->latest_message()) {
-        SetMsg(queue_->latest_message());
+      if (!msg_ && simulated_channel_->latest_message()) {
+        SetMsg(simulated_channel_->latest_message());
         return std::make_pair(true, event_loop()->monotonic_now());
       } else {
         return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +260,7 @@
     msgs_.emplace_back(buffer);
   }
 
-  SimulatedChannel *queue_;
+  SimulatedChannel *simulated_channel_;
   std::shared_ptr<SimulatedMessage> msg_;
 
   // Messages queued up but not in use.
@@ -270,6 +270,7 @@
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 NodeEventLoopFactory *node_event_loop_factory,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
   ~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +286,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 
   monotonic_clock::time_point base_;
@@ -294,6 +296,7 @@
 class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
  public:
   SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             NodeEventLoopFactory *node_event_loop_factory,
                              SimulatedEventLoop *simulated_event_loop,
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
@@ -309,6 +312,7 @@
   EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 };
 
@@ -316,6 +320,7 @@
  public:
   explicit SimulatedEventLoop(
       EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -324,6 +329,7 @@
       const Node *node, pid_t tid)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
+        node_event_loop_factory_(node_event_loop_factory),
         channels_(channels),
         raw_event_loops_(raw_event_loops),
         node_(node),
@@ -361,11 +367,11 @@
   }
 
   ::aos::monotonic_clock::time_point monotonic_now() override {
-    return scheduler_->monotonic_now();
+    return node_event_loop_factory_->monotonic_now();
   }
 
   ::aos::realtime_clock::time_point realtime_now() override {
-    return scheduler_->realtime_now();
+    return node_event_loop_factory_->realtime_now();
   }
 
   ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +385,17 @@
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
     CHECK(!is_running());
-    return NewTimer(::std::unique_ptr<TimerHandler>(
-        new SimulatedTimerHandler(scheduler_, this, callback)));
+    return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+        scheduler_, node_event_loop_factory_, this, callback)));
   }
 
   PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
                                    const monotonic_clock::duration interval,
                                    const monotonic_clock::duration offset =
                                        ::std::chrono::seconds(0)) override {
-    return NewPhasedLoop(
-        ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
-            scheduler_, this, callback, interval, offset)));
+    return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+        new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+                                       this, callback, interval, offset)));
   }
 
   void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +439,7 @@
   pid_t GetTid() override { return tid_; }
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *raw_event_loops_;
@@ -459,17 +466,13 @@
   }
 }
 
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
-  return send_delay_;
-}
-
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
   TakeWatcher(channel);
 
-  std::unique_ptr<SimulatedWatcher> shm_watcher(
-      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+      this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -511,12 +514,13 @@
 
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-    const Channel *channel,
+    NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +578,10 @@
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ =
-      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
-                           [this]() { simulated_event_loop_->HandleEvent(); });
+  token_ = scheduler_->Schedule(
+      node_event_loop_factory_->ToDistributedClock(
+          event_time + simulated_event_loop_->send_delay()),
+      [this]() { simulated_event_loop_->HandleEvent(); });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -622,12 +627,13 @@
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void()> fn)
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +645,12 @@
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
     token_ = scheduler_->Schedule(
-        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(monotonic_now),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
     token_ = scheduler_->Schedule(
-        base, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   }
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +663,8 @@
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(
-        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base_),
+        [this]() { simulated_event_loop_->HandleEvent(); });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
   } else {
@@ -674,13 +683,15 @@
 }
 
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+    const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +713,76 @@
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
   token_ = scheduler_->Schedule(
-      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+      node_event_loop_factory_->ToDistributedClock(sleep_time),
+      [this]() { simulated_event_loop_->HandleEvent(); });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
 
+NodeEventLoopFactory::NodeEventLoopFactory(
+    EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+    const Node *node,
+    std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+        *raw_event_loops)
+    : scheduler_(scheduler),
+      factory_(factory),
+      node_(node),
+      raw_event_loops_(raw_event_loops) {}
+
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
-  CHECK(!configuration_->has_nodes())
-      << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, std::string_view node_name)
-    : SimulatedEventLoopFactory(
-          configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, const Node *node)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
-  if (node != nullptr) {
-    CHECK(configuration_->has_nodes())
-        << ": Got a configuration with no nodes and node \""
-        << node->name()->string_view() << "\" was selected.";
-    bool found = false;
-    for (const Node *node : *configuration_->nodes()) {
-      if (node == node_) {
-        found = true;
-        break;
-      }
+    : configuration_(CHECK_NOTNULL(configuration)) {
+  if (configuration::MultiNode(configuration_)) {
+    for (const Node *node : *configuration->nodes()) {
+      nodes_.emplace_back(node);
     }
-    CHECK(found) << ": node must be a pointer in the configuration.";
+  } else {
+    nodes_.emplace_back(nullptr);
+  }
+
+  for (const Node *node : nodes_) {
+    node_factories_.emplace_back(
+        new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
   }
 }
 
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+    const Node *node) {
+  auto result = std::find_if(
+      node_factories_.begin(), node_factories_.end(),
+      [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+        return node_factory->node() == node;
+      });
+
+  CHECK(result != node_factories_.end())
+      << ": Failed to find node " << FlatbufferToJson(node);
+
+  return result->get();
+}
+
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+    std::string_view name, const Node *node) {
+  if (node == nullptr) {
+    CHECK(!configuration::MultiNode(configuration()))
+        << ": Can't make a single node event loop in a multi-node world.";
+  } else {
+    CHECK(configuration::MultiNode(configuration()))
+        << ": Can't make a multi-node event loop in a single-node world.";
+  }
+  return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
     std::string_view name) {
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+      scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+      node_, tid));
   result->set_name(name);
-  result->set_send_delay(send_delay_);
+  result->set_send_delay(factory_->send_delay());
   return std::move(result);
 }