Add a simulated message bridge

This gives us multi-node simulations!

Change-Id: I10fa955653766a26e4d11471a6dae5b47ea5cd1c
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a183cae..fb0f356 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,8 +243,11 @@
 cc_test(
     name = "simulated_event_loop_test",
     srcs = ["simulated_event_loop_test.cc"],
+    data = ["multinode_pingpong_config.json"],
     deps = [
         ":event_loop_param_test",
+        ":ping_lib",
+        ":pong_lib",
         ":simulated_event_loop",
         "//aos/testing:googletest",
     ],
@@ -267,10 +270,12 @@
     srcs = [
         "event_scheduler.cc",
         "simulated_event_loop.cc",
+        "simulated_network_bridge.cc",
     ],
     hdrs = [
         "event_scheduler.h",
         "simulated_event_loop.h",
+        "simulated_network_bridge.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index 4f24d78..89e80ae 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -55,7 +55,7 @@
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
   void EnableNodes(std::string_view my_node) {
-    std::string json = std::string(R"config({
+    std::string json = R"config({
   "channels": [
     {
       "name": "/aos",
@@ -80,9 +80,12 @@
   ],
   "nodes": [
     {
-      "name": ")config") +
-                       std::string(my_node) + R"config(",
+      "name": "me",
       "hostname": "myhostname"
+    },
+    {
+      "name": "them",
+      "hostname": "themhostname"
     }
   ]
 })config";
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2d9604f..c766fe6 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -65,7 +65,7 @@
 
   // Make an eventloop for retrieving stats
   std::unique_ptr<aos::EventLoop> stats_event_loop =
-      log_reader_factory.MakeEventLoop("logstats");
+      log_reader_factory.MakeEventLoop("logstats", reader.node());
   stats_event_loop->SkipTimingReport();
 
   // Read channel info and store in vector
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d33f496..2edc3fc 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -38,7 +38,8 @@
 
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
     loop->set_name(name);
@@ -47,7 +48,8 @@
 
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop =
         ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 0d8a488..108a485 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "absl/container/btree_map.h"
+#include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
 
@@ -744,6 +745,10 @@
     node_factories_.emplace_back(
         new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
   }
+
+  if (configuration::MultiNode(configuration)) {
+    bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
+  }
 }
 
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 019172b..8cff0d7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -24,6 +24,9 @@
 class SimulatedChannel;
 
 class NodeEventLoopFactory;
+namespace message_bridge {
+class SimulatedMessageBridge;
+}
 
 // There are 2 concepts needed to support multi-node simulations.
 //  1) The node.  This is implemented with NodeEventLoopFactory.
@@ -110,6 +113,8 @@
   std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
 
   std::vector<const Node *> nodes_;
+
+  std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
 };
 
 // This class holds all the state required to be a single node.
@@ -132,6 +137,14 @@
   inline monotonic_clock::time_point monotonic_now() const;
   inline realtime_clock::time_point realtime_now() const;
 
+  // Returns the simulated network delay for messages forwarded between nodes.
+  std::chrono::nanoseconds network_delay() const {
+    return factory_->network_delay();
+  }
+  // Returns the simulated send delay for all messages sent within a single
+  // node.
+  std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+
   // Converts a time to the distributed clock for scheduling and cross-node time
   // measurement.
   inline distributed_clock::time_point ToDistributedClock(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index b373aa6..de5cbae 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,8 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
 #include "gtest/gtest.h"
 
@@ -230,5 +232,45 @@
             0.0);
 }
 
+// Tests that ping and pong work when on 2 different nodes.
+TEST(SimulatedEventLoopTest, MultinodePingPong) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          "aos/events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  int pi2_pong_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+  int pi1_pong_count = 0;
+  pi1_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(10) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_count, 1001);
+  EXPECT_EQ(pi2_pong_count, 1001);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node.  We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+  RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+                    aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *send_event_loop,
+                    std::unique_ptr<aos::RawFetcher> fetcher,
+                    std::unique_ptr<aos::RawSender> sender)
+      : fetch_node_factory_(fetch_node_factory),
+        send_node_factory_(send_node_factory),
+        send_event_loop_(send_event_loop),
+        fetcher_(std::move(fetcher)),
+        sender_(std::move(sender)) {
+    timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+    Schedule();
+  }
+
+  // Kicks us to re-fetch and schedule the timer.
+  void Schedule() {
+    if (fetcher_->context().data == nullptr || sent_) {
+      sent_ = !fetcher_->FetchNext();
+    }
+
+    if (fetcher_->context().data == nullptr) {
+      return;
+    }
+
+    if (sent_) {
+      return;
+    }
+
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past...";
+
+    timer_->Setup(monotonic_delivered_time);
+  }
+
+ private:
+  // Acutally sends the message, and reschedules.
+  void Send() {
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Message to be sent at the wrong time.";
+
+    // And also fill out the send times as well.
+    sender_->Send(fetcher_->context().data, fetcher_->context().size,
+                  fetcher_->context().monotonic_event_time,
+                  fetcher_->context().realtime_event_time,
+                  fetcher_->context().queue_index);
+
+    sent_ = true;
+    Schedule();
+  }
+
+  // Converts from time on the sending node to time on the receiving node.
+  monotonic_clock::time_point DeliveredTime(const Context &context) const {
+    const distributed_clock::time_point distributed_sent_time =
+        fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+
+    return aos::monotonic_clock::epoch() +
+           (distributed_sent_time - send_node_factory_->ToDistributedClock(
+                                        aos::monotonic_clock::epoch())) +
+           send_node_factory_->network_delay() +
+           send_node_factory_->send_delay();
+  }
+
+  // Factories used for time conversion.
+  aos::NodeEventLoopFactory *fetch_node_factory_;
+  aos::NodeEventLoopFactory *send_node_factory_;
+
+  // Event loop which sending is scheduled on.
+  aos::EventLoop *send_event_loop_;
+  // Timer used to send.
+  aos::TimerHandler *timer_;
+  // Fetcher used to receive messages.
+  std::unique_ptr<aos::RawFetcher> fetcher_;
+  // Sender to send them back out.
+  std::unique_ptr<aos::RawSender> sender_;
+  // True if we have sent the message in the fetcher.
+  bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+    SimulatedEventLoopFactory *simulated_event_loop_factory) {
+  CHECK(
+      configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+  // Pre-build up event loops for every node.  They are pretty cheap anyways.
+  for (const Node *node : simulated_event_loop_factory->nodes()) {
+    CHECK(event_loop_map_
+              .insert({node, simulated_event_loop_factory->MakeEventLoop(
+                                 "message_bridge", node)})
+              .second);
+  }
+
+  for (const Channel *channel :
+       *simulated_event_loop_factory->configuration()->channels()) {
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+
+    // Find the sending node.
+    const Node *node =
+        configuration::GetNode(simulated_event_loop_factory->configuration(),
+                               channel->source_node()->string_view());
+    auto source_event_loop = event_loop_map_.find(node);
+    CHECK(source_event_loop != event_loop_map_.end());
+
+    std::unique_ptr<DelayersVector> delayers =
+        std::make_unique<DelayersVector>();
+
+    // And then build up a RawMessageDelayer for each destination.
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *destination_node =
+          configuration::GetNode(simulated_event_loop_factory->configuration(),
+                                 connection->name()->string_view());
+      auto destination_event_loop = event_loop_map_.find(destination_node);
+      CHECK(destination_event_loop != event_loop_map_.end());
+
+      delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+          simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+          simulated_event_loop_factory->GetNodeEventLoopFactory(
+              destination_node),
+          destination_event_loop->second.get(),
+          source_event_loop->second->MakeRawFetcher(channel),
+          destination_event_loop->second->MakeRawSender(channel)));
+    }
+
+    // And register every delayer to be poked when a new message shows up.
+    source_event_loop->second->MakeRawWatcher(
+        channel,
+        [captured_delayers = delayers.get()](const Context &, const void *) {
+          for (std::unique_ptr<RawMessageDelayer> &delayer :
+               *captured_delayers) {
+            delayer->Schedule();
+          }
+        });
+    delayers_list_.emplace_back(std::move(delayers));
+  }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() {}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
new file mode 100644
index 0000000..5d613ab
--- /dev/null
+++ b/aos/events/simulated_network_bridge.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+class RawMessageDelayer;
+
+// This class moves messages between nodes.  It is implemented as a separate
+// class because it would have been even harder to manage forwarding in the
+// SimulatedEventLoopFactory.
+class SimulatedMessageBridge {
+ public:
+  // Constructs the bridge.
+  SimulatedMessageBridge(
+      SimulatedEventLoopFactory *simulated_event_loop_factory);
+  ~SimulatedMessageBridge();
+
+ private:
+  // Map of nodes to event loops.  This is a member variable so that the
+  // lifetime of the event loops matches the lifetime of the bridge.
+  std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+
+
+  // List of delayers used to resend the messages.
+  using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
+  std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_