Add a simulated message bridge

This gives us multi-node simulations!

Change-Id: I10fa955653766a26e4d11471a6dae5b47ea5cd1c
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node.  We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+  RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+                    aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *send_event_loop,
+                    std::unique_ptr<aos::RawFetcher> fetcher,
+                    std::unique_ptr<aos::RawSender> sender)
+      : fetch_node_factory_(fetch_node_factory),
+        send_node_factory_(send_node_factory),
+        send_event_loop_(send_event_loop),
+        fetcher_(std::move(fetcher)),
+        sender_(std::move(sender)) {
+    timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+    Schedule();
+  }
+
+  // Kicks us to re-fetch and schedule the timer.
+  void Schedule() {
+    if (fetcher_->context().data == nullptr || sent_) {
+      sent_ = !fetcher_->FetchNext();
+    }
+
+    if (fetcher_->context().data == nullptr) {
+      return;
+    }
+
+    if (sent_) {
+      return;
+    }
+
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past...";
+
+    timer_->Setup(monotonic_delivered_time);
+  }
+
+ private:
+  // Acutally sends the message, and reschedules.
+  void Send() {
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Message to be sent at the wrong time.";
+
+    // And also fill out the send times as well.
+    sender_->Send(fetcher_->context().data, fetcher_->context().size,
+                  fetcher_->context().monotonic_event_time,
+                  fetcher_->context().realtime_event_time,
+                  fetcher_->context().queue_index);
+
+    sent_ = true;
+    Schedule();
+  }
+
+  // Converts from time on the sending node to time on the receiving node.
+  monotonic_clock::time_point DeliveredTime(const Context &context) const {
+    const distributed_clock::time_point distributed_sent_time =
+        fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+
+    return aos::monotonic_clock::epoch() +
+           (distributed_sent_time - send_node_factory_->ToDistributedClock(
+                                        aos::monotonic_clock::epoch())) +
+           send_node_factory_->network_delay() +
+           send_node_factory_->send_delay();
+  }
+
+  // Factories used for time conversion.
+  aos::NodeEventLoopFactory *fetch_node_factory_;
+  aos::NodeEventLoopFactory *send_node_factory_;
+
+  // Event loop which sending is scheduled on.
+  aos::EventLoop *send_event_loop_;
+  // Timer used to send.
+  aos::TimerHandler *timer_;
+  // Fetcher used to receive messages.
+  std::unique_ptr<aos::RawFetcher> fetcher_;
+  // Sender to send them back out.
+  std::unique_ptr<aos::RawSender> sender_;
+  // True if we have sent the message in the fetcher.
+  bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+    SimulatedEventLoopFactory *simulated_event_loop_factory) {
+  CHECK(
+      configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+  // Pre-build up event loops for every node.  They are pretty cheap anyways.
+  for (const Node *node : simulated_event_loop_factory->nodes()) {
+    CHECK(event_loop_map_
+              .insert({node, simulated_event_loop_factory->MakeEventLoop(
+                                 "message_bridge", node)})
+              .second);
+  }
+
+  for (const Channel *channel :
+       *simulated_event_loop_factory->configuration()->channels()) {
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+
+    // Find the sending node.
+    const Node *node =
+        configuration::GetNode(simulated_event_loop_factory->configuration(),
+                               channel->source_node()->string_view());
+    auto source_event_loop = event_loop_map_.find(node);
+    CHECK(source_event_loop != event_loop_map_.end());
+
+    std::unique_ptr<DelayersVector> delayers =
+        std::make_unique<DelayersVector>();
+
+    // And then build up a RawMessageDelayer for each destination.
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *destination_node =
+          configuration::GetNode(simulated_event_loop_factory->configuration(),
+                                 connection->name()->string_view());
+      auto destination_event_loop = event_loop_map_.find(destination_node);
+      CHECK(destination_event_loop != event_loop_map_.end());
+
+      delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+          simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+          simulated_event_loop_factory->GetNodeEventLoopFactory(
+              destination_node),
+          destination_event_loop->second.get(),
+          source_event_loop->second->MakeRawFetcher(channel),
+          destination_event_loop->second->MakeRawSender(channel)));
+    }
+
+    // And register every delayer to be poked when a new message shows up.
+    source_event_loop->second->MakeRawWatcher(
+        channel,
+        [captured_delayers = delayers.get()](const Context &, const void *) {
+          for (std::unique_ptr<RawMessageDelayer> &delayer :
+               *captured_delayers) {
+            delayer->Schedule();
+          }
+        });
+    delayers_list_.emplace_back(std::move(delayers));
+  }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() {}
+
+}  // namespace message_bridge
+}  // namespace aos