Add basic support for nodes

This adds the infrastructure and configuration language to describe a
multinode world.  This only checks that, when multiple nodes are set
up, everything is configured for multiple nodes and that we are
listening and sending data on the correct node per the configuration.

Change-Id: I658ba05620337a210d677c43e5eb840e05f96051
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 68b1b5e..4cbb07b 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -318,6 +318,16 @@
         << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
         << T::GetFullyQualifiedName() << "\" } not found in config.";
 
+    if (node() != nullptr) {
+      if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+        LOG(FATAL)
+            << "Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+            << T::GetFullyQualifiedName()
+            << "\" } is not able to be fetched on this node.  Check your "
+               "configuration.";
+      }
+    }
+
     return Fetcher<T>(MakeRawFetcher(channel));
   }
 
@@ -331,6 +341,15 @@
         << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
         << T::GetFullyQualifiedName() << "\" } not found in config.";
 
+    if (node() != nullptr) {
+      if (!configuration::ChannelIsSendableOnNode(channel, node())) {
+        LOG(FATAL) << "Channel { \"name\": \"" << channel_name
+                   << "\", \"type\": \"" << T::GetFullyQualifiedName()
+                   << "\" } is not able to be sent on this node.  Check your "
+                      "configuration.";
+      }
+    }
+
     return Sender<T>(MakeRawSender(channel));
   }
 
@@ -348,11 +367,13 @@
   // Use this to run code once the thread goes into "real-time-mode",
   virtual void OnRun(::std::function<void()> on_run) = 0;
 
-  // Sets the name of the event loop.  This is the application name.
-  virtual void set_name(const std::string_view name) = 0;
-  // Gets the name of the event loop.
+  // Gets the name of the event loop.  This is the application name.
   virtual const std::string_view name() const = 0;
 
+  // Returns the node that this event loop is running on.  Returns nullptr if we
+  // are running in single-node mode.
+  virtual const Node *node() const = 0;
+
   // Creates a timer that executes callback when the timer expires
   // Returns a TimerHandle for configuration of the timer
   virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
@@ -365,7 +386,7 @@
       const monotonic_clock::duration interval,
       const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
 
-  // TODO(austin): OnExit
+  // TODO(austin): OnExit for cleanup.
 
   // Threadsafe.
   bool is_running() const { return is_running_.load(); }
@@ -375,31 +396,37 @@
   virtual void SetRuntimeRealtimePriority(int priority) = 0;
   virtual int priority() const = 0;
 
-  // Fetches new messages from the provided channel (path, type).  Note: this
-  // channel must be a member of the exact configuration object this was built
-  // with.
+  // Fetches new messages from the provided channel (path, type).
+  //
+  // Note: this channel must be a member of the exact configuration object this
+  // was built with.
   virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
       const Channel *channel) = 0;
 
-  // Will watch channel (name, type) for new messages
+  // Watches channel (name, type) for new messages.
   virtual void MakeRawWatcher(
       const Channel *channel,
       std::function<void(const Context &context, const void *message)>
           watcher) = 0;
 
+  // Creates a raw sender for the provided channel.  This is used for
+  // reflection-based sending.
+  // Note: this does not check any node constraints.  Use at your own peril.
+  virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
+
   // Returns the context for the current callback.
   const Context &context() const { return context_; }
 
   // Returns the configuration that this event loop was built with.
   const Configuration *configuration() const { return configuration_; }
 
-  // Will send new messages from channel (path, type).
-  virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
-
   // Prevents the event loop from sending a timing report.
   void SkipTimingReport() { skip_timing_report_ = true; }
 
  protected:
+  // Sets the name of the event loop.  This is the application name.
+  virtual void set_name(const std::string_view name) = 0;
+
   void set_is_running(bool value) { is_running_.store(value); }
 
   // Validates that channel exists inside configuration_ and finds its index.
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index cf3b6df..c1256f3 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1145,5 +1145,100 @@
   EXPECT_TRUE(happened);
 }
 
+// Tests that not setting up nodes results in no node.
+TEST_P(AbstractEventLoopTest, NoNode) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  EXPECT_EQ(loop1->node(), nullptr);
+  EXPECT_EQ(loop2->node(), nullptr);
+}
+
+// Tests that setting up nodes results in node being set.
+TEST_P(AbstractEventLoopTest, Node) {
+  EnableNodes("me");
+
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  EXPECT_NE(loop1->node(), nullptr);
+  EXPECT_NE(loop2->node(), nullptr);
+}
+
+// Tests that watchers work with a node setup.
+TEST_P(AbstractEventLoopTest, NodeWatcher) {
+  EnableNodes("me");
+
+  auto loop1 = Make();
+  auto loop2 = Make();
+  loop1->MakeWatcher("/test", [](const TestMessage &) {});
+  loop2->MakeRawWatcher(configuration()->channels()->Get(1),
+                        [](const Context &, const void *) {});
+}
+
+// Tests that fetchers work with a node setup.
+TEST_P(AbstractEventLoopTest, NodeFetcher) {
+  EnableNodes("me");
+  auto loop1 = Make();
+
+  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+  auto raw_fetcher = loop1->MakeRawFetcher(configuration()->channels()->Get(1));
+}
+
+// Tests that senders work with a node setup.
+TEST_P(AbstractEventLoopTest, NodeSender) {
+  EnableNodes("me");
+  auto loop1 = Make();
+
+  aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+}
+
+// Tests that watchers fail when created on the wrong node.
+TEST_P(AbstractEventLoopDeathTest, NodeWatcher) {
+  EnableNodes("them");
+
+  auto loop1 = Make();
+  auto loop2 = Make();
+  EXPECT_DEATH({ loop1->MakeWatcher("/test", [](const TestMessage &) {}); },
+               "node");
+  EXPECT_DEATH(
+      {
+        loop2->MakeRawWatcher(configuration()->channels()->Get(1),
+                              [](const Context &, const void *) {});
+      },
+      "node");
+}
+
+// Tests that fetchers fail when created on the wrong node.
+TEST_P(AbstractEventLoopDeathTest, NodeFetcher) {
+  EnableNodes("them");
+  auto loop1 = Make();
+
+  EXPECT_DEATH({ auto fetcher = loop1->MakeFetcher<TestMessage>("/test"); },
+               "node");
+  EXPECT_DEATH(
+      {
+        auto raw_fetcher =
+            loop1->MakeRawFetcher(configuration()->channels()->Get(1));
+      },
+      "node");
+}
+
+// Tests that senders fail when created on the wrong node.
+TEST_P(AbstractEventLoopDeathTest, NodeSender) {
+  EnableNodes("them");
+  auto loop1 = Make();
+
+  EXPECT_DEATH(
+      {
+        aos::Sender<TestMessage> sender =
+            loop1->MakeSender<TestMessage>("/test");
+      },
+      "node");
+
+  // Note: Creating raw senders is always supported.  Right now, this lets us
+  // use them to create message_gateway.
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index f4cf2ec..ea5fd3d 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -54,10 +54,53 @@
   // Advances time by sleeping.  Can't be called from inside a loop.
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
+  void EnableNodes(std::string_view my_node) {
+    std::string json = std::string(R"config({
+  "channels": [
+    {
+      "name": "/aos",
+      "type": "aos.timing.Report",
+      "source_node": "me"
+    },
+    {
+      "name": "/test",
+      "type": "aos.TestMessage",
+      "source_node": "me"
+    },
+    {
+      "name": "/test1",
+      "type": "aos.TestMessage",
+      "source_node": "me"
+    },
+    {
+      "name": "/test2",
+      "type": "aos.TestMessage",
+      "source_node": "me"
+    }
+  ],
+  "nodes": [
+    {
+      "name": ")config") +
+                       std::string(my_node) + R"config(",
+      "hostname": "myhostname"
+    }
+  ]
+})config";
+
+    flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
+        JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
+
+    my_node_ = my_node;
+  }
+
+  std::string_view my_node() const { return my_node_; }
+
   const Configuration *configuration() { return &flatbuffer_.message(); }
 
  private:
   FlatbufferDetachedBuffer<Configuration> flatbuffer_;
+
+  std::string my_node_;
 };
 
 class AbstractEventLoopTestBase
@@ -79,6 +122,8 @@
     return factory_->MakePrimary(name);
   }
 
+  void EnableNodes(std::string_view my_node) { factory_->EnableNodes(my_node); }
+
   void Run() { return factory_->Run(); }
 
   void Exit() { return factory_->Exit(); }
@@ -87,6 +132,10 @@
     return factory_->SleepFor(duration);
   }
 
+  const Configuration *configuration() { return factory_->configuration(); }
+
+  std::string_view my_node() const { return factory_->my_node(); }
+
   // Ends the given event loop at the given time from now.
   void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
     auto end_timer = loop->AddTimer([this]() { this->Exit(); });
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index f652bb6..9e5b05d 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -36,6 +36,15 @@
       << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
       << T::GetFullyQualifiedName() << "\" } not found in config.";
 
+  if (node() != nullptr) {
+    if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+      LOG(FATAL) << "Channel { \"name\": \"" << channel_name
+                 << "\", \"type\": \"" << T::GetFullyQualifiedName()
+                 << "\" } is not able to be watched on this node.  Check your "
+                    "configuration.";
+    }
+  }
+
   return MakeRawWatcher(
       channel, [this, w](const Context &context, const void *message) {
         context_ = context;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index e549d57..e696f36 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -126,6 +126,8 @@
   void *data_;
 };
 
+namespace {
+
 // Returns the portion of the path after the last /.
 std::string_view Filename(std::string_view path) {
   auto last_slash_pos = path.find_last_of("/");
@@ -135,15 +137,23 @@
              : path.substr(last_slash_pos + 1, path.size());
 }
 
-ShmEventLoop::ShmEventLoop(const Configuration *configuration)
-    : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
+const Node *MaybeMyNode(const Configuration *configuration) {
+  if (!configuration->has_nodes()) {
+    return nullptr;
+  }
 
-namespace {
+  return configuration::GetMyNode(configuration);
+}
 
 namespace chrono = ::std::chrono;
 
 }  // namespace
 
+ShmEventLoop::ShmEventLoop(const Configuration *configuration)
+    : EventLoop(configuration),
+      name_(Filename(program_invocation_name)),
+      node_(MaybeMyNode(configuration)) {}
+
 namespace internal {
 
 class SimpleShmFetcher {
@@ -508,6 +518,16 @@
 
 ::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
     const Channel *channel) {
+
+  if (node() != nullptr) {
+    if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+      LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
+                 << "\", \"type\": \"" << channel->type()->string_view()
+                 << "\" } is not able to be fetched on this node.  Check your "
+                    "configuration.";
+    }
+  }
+
   return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
 }
 
@@ -523,6 +543,15 @@
     std::function<void(const Context &context, const void *message)> watcher) {
   Take(channel);
 
+  if (node() != nullptr) {
+    if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+      LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
+                 << "\", \"type\": \"" << channel->type()->string_view()
+                 << "\" } is not able to be watched on this node.  Check your "
+                    "configuration.";
+    }
+  }
+
   NewWatcher(::std::unique_ptr<WatcherState>(
       new internal::WatcherState(this, channel, std::move(watcher))));
 }
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 5063186..d10989e 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -67,6 +67,7 @@
     UpdateTimingReport();
   }
   const std::string_view name() const override { return name_; }
+  const Node *node() const override { return node_; }
 
   int priority() const override { return priority_; }
 
@@ -89,6 +90,7 @@
   std::vector<std::function<void()>> on_run_;
   int priority_ = 0;
   std::string name_;
+  const Node *const node_;
   std::vector<std::string> taken_;
 
   internal::EPoll epoll_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 1bbca2c..f2c730b 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -9,6 +9,7 @@
 #include "aos/events/test_message_generated.h"
 
 DECLARE_string(shm_base);
+DECLARE_string(override_hostname);
 
 namespace aos {
 namespace testing {
@@ -33,13 +34,21 @@
     unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v0").c_str());
   }
 
+  ~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
+
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
-    ::std::unique_ptr<EventLoop> loop(new ShmEventLoop(configuration()));
+    if (configuration()->has_nodes()) {
+      FLAGS_override_hostname = "myhostname";
+    }
+    ::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
     loop->set_name(name);
-    return loop;
+    return std::move(loop);
   }
 
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
+    if (configuration()->has_nodes()) {
+      FLAGS_override_hostname = "myhostname";
+    }
     ::std::unique_ptr<ShmEventLoop> loop =
         ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
     primary_event_loop_ = loop.get();
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 29c9edd..cd04c9f 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -304,11 +304,12 @@
       const Configuration *configuration,
       std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
           *raw_event_loops,
-      pid_t tid)
+      const Node *node, pid_t tid)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
         channels_(channels),
         raw_event_loops_(raw_event_loops),
+        node_(node),
         tid_(tid) {
     raw_event_loops_->push_back(std::make_pair(this, [this](bool value) {
       if (!has_setup_) {
@@ -378,6 +379,8 @@
     scheduler_->ScheduleOnRun(on_run);
   }
 
+  const Node *node() const override { return node_; }
+
   void set_name(const std::string_view name) override {
     name_ = std::string(name);
   }
@@ -428,6 +431,7 @@
 
   std::chrono::nanoseconds send_delay_;
 
+  const Node *const node_;
   const pid_t tid_;
 };
 
@@ -446,6 +450,16 @@
     std::function<void(const Context &channel, const void *message)> watcher) {
   ChannelIndex(channel);
   Take(channel);
+
+  if (node() != nullptr) {
+    if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+      LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
+                 << "\", \"type\": \"" << channel->type()->string_view()
+                 << "\" } is not able to be watched on this node.  Check your "
+                    "configuration.";
+    }
+  }
+
   std::unique_ptr<SimulatedWatcher> shm_watcher(
       new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
 
@@ -463,6 +477,16 @@
 std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
     const Channel *channel) {
   ChannelIndex(channel);
+
+  if (node() != nullptr) {
+    if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+      LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
+                 << "\", \"type\": \"" << channel->type()->string_view()
+                 << "\" } is not able to be fetched on this node.  Check your "
+                    "configuration.";
+    }
+  }
+
   return GetSimulatedChannel(channel)->MakeRawFetcher(this);
 }
 
@@ -676,7 +700,22 @@
 
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)) {}
+    : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
+  CHECK(!configuration_->has_nodes())
+      << ": Got a configuration with multiple nodes and no node was selected.";
+}
+
+SimulatedEventLoopFactory::SimulatedEventLoopFactory(
+    const Configuration *configuration, std::string_view node_name)
+    : configuration_(CHECK_NOTNULL(configuration)),
+      node_(configuration::GetNode(configuration, node_name)) {
+  CHECK(configuration_->has_nodes())
+      << ": Got a configuration with no nodes and node \"" << node_name
+      << "\" was selected.";
+  CHECK(node_ != nullptr) << ": Can't find node \"" << node_name
+                          << "\" in the configuration.";
+}
+
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
@@ -684,7 +723,7 @@
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, &channels_, configuration_, &raw_event_loops_, tid));
+      &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
   result->set_name(name);
   result->set_send_delay(send_delay_);
   return std::move(result);
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 7b2b9c7..740a4c2 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -52,6 +52,8 @@
   // This configuration must remain in scope for the lifetime of the factory and
   // all sub-objects.
   SimulatedEventLoopFactory(const Configuration *configuration);
+  SimulatedEventLoopFactory(const Configuration *configuration,
+                            std::string_view node_name);
   ~SimulatedEventLoopFactory();
 
   ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
@@ -68,6 +70,10 @@
   // Sets the simulated send delay for the factory.
   void set_send_delay(std::chrono::nanoseconds send_delay);
 
+  // Returns the node that this factory is running as, or nullptr if this is a
+  // single-node setup.
+  const Node *node() const { return node_; }
+
   monotonic_clock::time_point monotonic_now() const {
     return scheduler_.monotonic_now();
   }
@@ -76,7 +82,7 @@
   }
 
  private:
-  const Configuration *configuration_;
+  const Configuration *const configuration_;
   EventScheduler scheduler_;
   // Map from name, type to queue.
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
@@ -86,6 +92,8 @@
 
   std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
 
+  const Node *const node_;
+
   pid_t tid_ = 0;
 };
 
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 44be581..4328d6f 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -13,28 +13,40 @@
 
 class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
  public:
-  SimulatedEventLoopTestFactory() : event_loop_factory_(configuration()) {}
-
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
-    return event_loop_factory_.MakeEventLoop(name);
+    MaybeMake();
+    return event_loop_factory_->MakeEventLoop(name);
   }
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
-    return event_loop_factory_.MakeEventLoop(name);
+    MaybeMake();
+    return event_loop_factory_->MakeEventLoop(name);
   }
 
-  void Run() override { event_loop_factory_.Run(); }
-  void Exit() override { event_loop_factory_.Exit(); }
+  void Run() override { event_loop_factory_->Run(); }
+  void Exit() override { event_loop_factory_->Exit(); }
 
   // TODO(austin): Implement this.  It's used currently for a phased loop test.
   // I'm not sure how much that matters.
   void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
 
   void set_send_delay(std::chrono::nanoseconds send_delay) {
-    event_loop_factory_.set_send_delay(send_delay);
+    MaybeMake();
+    event_loop_factory_->set_send_delay(send_delay);
   }
 
  private:
-   SimulatedEventLoopFactory event_loop_factory_;
+  void MaybeMake() {
+    if (!event_loop_factory_) {
+      if (configuration()->has_nodes()) {
+        event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
+            configuration(), my_node());
+      } else {
+        event_loop_factory_ =
+            std::make_unique<SimulatedEventLoopFactory>(configuration());
+      }
+    }
+  }
+  std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
 };
 
 INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,