Merge "Run buildifier"
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a40c47c..c4f139c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -470,6 +470,18 @@
   }
 }
 
+size_t ChannelIndex(const Configuration *configuration,
+                    const Channel *channel) {
+  CHECK(configuration->channels() != nullptr) << ": No channels";
+
+  auto c = std::find(configuration->channels()->begin(),
+                     configuration->channels()->end(), channel);
+  CHECK(c != configuration->channels()->end())
+      << ": Channel pointer not found in configuration()->channels()";
+
+  return std::distance(configuration->channels()->begin(), c);
+}
+
 std::string CleanedChannelToString(const Channel *channel) {
   FlatbufferDetachedBuffer<Channel> cleaned_channel = CopyFlatBuffer(channel);
   cleaned_channel.mutable_message()->clear_schema();
@@ -591,6 +603,7 @@
   }
   return nullptr;
 }
+
 const Node *GetMyNode(const Configuration *config) {
   const std::string hostname = (FLAGS_override_hostname.size() > 0)
                                    ? FLAGS_override_hostname
@@ -604,6 +617,17 @@
   return nullptr;
 }
 
+const Node *GetNode(const Configuration *config, const Node *node) {
+  if (!MultiNode(config)) {
+    CHECK(node == nullptr) << ": Provided a node in a single node world.";
+    return nullptr;
+  } else {
+    CHECK(node != nullptr);
+    CHECK(node->has_name());
+    return GetNode(config, node->name()->string_view());
+  }
+}
+
 const Node *GetNode(const Configuration *config, std::string_view name) {
   CHECK(config->has_nodes())
       << ": Asking for a node from a single node configuration.";
@@ -616,6 +640,46 @@
   return nullptr;
 }
 
+const Node *GetNodeOrDie(const Configuration *config, const Node *node) {
+  if (!MultiNode(config)) {
+    CHECK(node == nullptr) << ": Provided a node in a single node world.";
+    return nullptr;
+  } else {
+    const Node *config_node = GetNode(config, node);
+    if (config_node == nullptr) {
+      LOG(FATAL) << "Couldn't find node matching " << FlatbufferToJson(node);
+    }
+    return config_node;
+  }
+}
+
+int GetNodeIndex(const Configuration *config, const Node *node) {
+  CHECK(config->has_nodes())
+      << ": Asking for a node from a single node configuration.";
+  int node_index = 0;
+  for (const Node *iterated_node : *config->nodes()) {
+    if (iterated_node == node) {
+      return node_index;
+    }
+    ++node_index;
+  }
+  LOG(FATAL) << "Node not found in the configuration.";
+}
+
+std::vector<const Node *> GetNodes(const Configuration *config) {
+  std::vector<const Node *> nodes;
+  if (configuration::MultiNode(config)) {
+    for (const Node *node : *config->nodes()) {
+      nodes.emplace_back(node);
+    }
+  } else {
+    nodes.emplace_back(nullptr);
+  }
+  return nodes;
+}
+
+bool MultiNode(const Configuration *config) { return config->has_nodes(); }
+
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
   if (node == nullptr) {
     return true;
diff --git a/aos/configuration.h b/aos/configuration.h
index ef30fce..a9fbfe3 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -60,15 +60,32 @@
                     channel->type()->string_view(), application_name, node);
 }
 
+// Returns the index of channel in config (matched by pointer), or dies.
+size_t ChannelIndex(const Configuration *config, const Channel *channel);
+
 // Returns the Node out of the config with the matching name, or nullptr if it
 // can't be found.
 const Node *GetNode(const Configuration *config, std::string_view name);
+const Node *GetNode(const Configuration *config, const Node *node);
+// Returns the matching node (dies if none), or nullptr if the provided node is
+// nullptr and we are in a single node world.
+const Node *GetNodeOrDie(const Configuration *config, const Node *node);
 // Returns the Node out of the configuration which matches our hostname.
 // CHECKs if it can't be found.
 const Node *GetMyNode(const Configuration *config);
 const Node *GetNodeFromHostname(const Configuration *config,
                                 std::string_view name);
 
+// Returns a vector of the nodes in the config.  (nullptr is considered the node
+// in a single node world.)
+std::vector<const Node *> GetNodes(const Configuration *config);
+
+// Returns the index of node in config.  Dies unless node points into config.
+int GetNodeIndex(const Configuration *config, const Node *node);
+
+// Returns true if we are running in a multinode configuration.
+bool MultiNode(const Configuration *config);
+
 // Returns true if the provided channel is sendable on the provided node.
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node);
 // Returns true if the provided channel is able to be watched or fetched on the
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index b8fc5b6..70515fd 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -39,6 +39,16 @@
       FlatbufferToJson(config, true));
 }
 
+// Tests that we can get back a ChannelIndex.
+TEST_F(ConfigurationTest, ChannelIndex) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1.json");
+
+  EXPECT_EQ(
+      ChannelIndex(&config.message(), config.message().channels()->Get(1u)),
+      1u);
+}
+
 // Tests that we can read and merge a multinode configuration.
 TEST_F(ConfigurationTest, ConfigMergeMultinode) {
   FlatbufferDetachedBuffer<Configuration> config =
@@ -599,6 +609,63 @@
       ::testing::ElementsAreArray({"pi1"}));
 }
 
+// Tests that we can pull out all the nodes.
+TEST_F(ConfigurationTest, GetNodes) {
+  {
+    FlatbufferDetachedBuffer<Configuration> config =
+        ReadConfig("aos/testdata/good_multinode.json");
+    const Node *pi1 = GetNode(&config.message(), "pi1");
+    const Node *pi2 = GetNode(&config.message(), "pi2");
+
+    EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(pi1, pi2));
+  }
+
+  {
+    FlatbufferDetachedBuffer<Configuration> config =
+        ReadConfig("aos/testdata/config1.json");
+    EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(nullptr));
+  }
+}
+
+// Tests that we can extract a node index from a config.
+TEST_F(ConfigurationTest, GetNodeIndex) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/good_multinode.json");
+  const Node *pi1 = GetNode(&config.message(), "pi1");
+  const Node *pi2 = GetNode(&config.message(), "pi2");
+
+  EXPECT_EQ(GetNodeIndex(&config.message(), pi1), 0);
+  EXPECT_EQ(GetNodeIndex(&config.message(), pi2), 1);
+}
+
+// Tests that GetNodeOrDie handles both single and multi-node worlds and returns
+// valid nodes.
+TEST_F(ConfigurationDeathTest, GetNodeOrDie) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/good_multinode.json");
+  FlatbufferDetachedBuffer<Configuration> config2 =
+      ReadConfig("aos/testdata/good_multinode.json");
+  {
+    // Simple case, nullptr -> nullptr
+    FlatbufferDetachedBuffer<Configuration> single_node_config =
+        ReadConfig("aos/testdata/config1.json");
+    EXPECT_EQ(nullptr, GetNodeOrDie(&single_node_config.message(), nullptr));
+
+    // Confirm that we die when a node is passed in.
+    EXPECT_DEATH(
+        {
+          GetNodeOrDie(&single_node_config.message(),
+                       config.message().nodes()->Get(0));
+        },
+        "Provided a node in a single node world.");
+  }
+
+  const Node *pi1 = GetNode(&config.message(), "pi1");
+  // Now try a lookup using a node from a different instance of the config.
+  EXPECT_EQ(pi1,
+            GetNodeOrDie(&config.message(), config2.message().nodes()->Get(0)));
+}
+
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a183cae..a4c24ce 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -58,6 +58,7 @@
         "//aos:configuration",
         "//aos:configuration_fbs",
         "//aos:flatbuffers",
+        "//aos/ipc_lib:data_alignment",
         "//aos/time",
         "//aos/util:phased_loop",
         "@com_github_google_flatbuffers//:flatbuffers",
@@ -243,8 +244,11 @@
 cc_test(
     name = "simulated_event_loop_test",
     srcs = ["simulated_event_loop_test.cc"],
+    data = ["multinode_pingpong_config.json"],
     deps = [
         ":event_loop_param_test",
+        ":ping_lib",
+        ":pong_lib",
         ":simulated_event_loop",
         "//aos/testing:googletest",
     ],
@@ -267,10 +271,12 @@
     srcs = [
         "event_scheduler.cc",
         "simulated_event_loop.cc",
+        "simulated_network_bridge.cc",
     ],
     hdrs = [
         "event_scheduler.h",
         "simulated_event_loop.h",
+        "simulated_network_bridge.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e368df2..c6d3755 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -71,14 +71,7 @@
 }
 
 int EventLoop::ChannelIndex(const Channel *channel) {
-  CHECK(configuration_->channels() != nullptr) << ": No channels";
-
-  auto c = std::find(configuration_->channels()->begin(),
-                     configuration_->channels()->end(), channel);
-  CHECK(c != configuration_->channels()->end())
-      << ": Channel pointer not found in configuration()->channels()";
-
-  return std::distance(configuration()->channels()->begin(), c);
+  return configuration::ChannelIndex(configuration_, channel);
 }
 
 void EventLoop::NewSender(RawSender *sender) {
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 4a12096..9618a32 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -11,6 +11,7 @@
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
 #include "aos/flatbuffers.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/time/time.h"
 #include "aos/util/phased_loop.h"
@@ -141,6 +142,13 @@
   // Returns the queue index that this was sent with.
   uint32_t sent_queue_index() const { return sent_queue_index_; }
 
+  // Returns the associated flatbuffers-style allocator. This must be
+  // deallocated before the message is sent.
+  PreallocatedAllocator *fbb_allocator() {
+    fbb_allocator_ = PreallocatedAllocator(data(), size());
+    return &fbb_allocator_;
+  }
+
  protected:
   EventLoop *event_loop() { return event_loop_; }
 
@@ -166,6 +174,8 @@
   const Channel *channel_;
 
   internal::RawSenderTiming timing_;
+
+  PreallocatedAllocator fbb_allocator_{nullptr, 0};
 };
 
 // Fetches the newest message from a channel.
@@ -177,12 +187,26 @@
 
   // Fetches the next message. Returns true if it fetched a new message.  This
   // method will only return messages sent after the Fetcher was created.
-  bool FetchNext() { return fetcher_->FetchNext(); }
+  bool FetchNext() {
+    const bool result = fetcher_->FetchNext();
+    if (result) {
+      CheckChannelDataAlignment(fetcher_->context().data,
+                                fetcher_->context().size);
+    }
+    return result;
+  }
 
   // Fetches the most recent message. Returns true if it fetched a new message.
   // This will return the latest message regardless of if it was sent before or
   // after the fetcher was created.
-  bool Fetch() { return fetcher_->Fetch(); }
+  bool Fetch() {
+    const bool result = fetcher_->Fetch();
+    if (result) {
+      CheckChannelDataAlignment(fetcher_->context().data,
+                                fetcher_->context().size);
+    }
+    return result;
+  }
 
   // Returns a pointer to the contained flatbuffer, or nullptr if there is no
   // available message.
@@ -222,10 +246,17 @@
   // builder.Send(t_builder.Finish());
   class Builder {
    public:
-    Builder(RawSender *sender, void *data, size_t size)
-        : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+    Builder(RawSender *sender, PreallocatedAllocator *allocator)
+        : fbb_(allocator->size(), allocator), sender_(sender) {
+      CheckChannelDataAlignment(allocator->data(), allocator->size());
       fbb_.ForceDefaults(1);
     }
+    Builder() {}
+    Builder(const Builder &) = delete;
+    Builder(Builder &&) = default;
+
+    Builder &operator=(const Builder &) = delete;
+    Builder &operator=(Builder &&) = default;
 
     flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
 
@@ -243,7 +274,6 @@
     void CheckSent() { fbb_.Finished(); }
 
    private:
-    PreallocatedAllocator alloc_;
     flatbuffers::FlatBufferBuilder fbb_;
     RawSender *sender_;
   };
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index ea5fd3d..89e80ae 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -55,7 +55,7 @@
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
   void EnableNodes(std::string_view my_node) {
-    std::string json = std::string(R"config({
+    std::string json = R"config({
   "channels": [
     {
       "name": "/aos",
@@ -80,9 +80,12 @@
   ],
   "nodes": [
     {
-      "name": ")config") +
-                       std::string(my_node) + R"config(",
+      "name": "me",
       "hostname": "myhostname"
+    },
+    {
+      "name": "them",
+      "hostname": "themhostname"
     }
   ]
 })config";
@@ -90,17 +93,17 @@
     flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
         JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
 
-    my_node_ = my_node;
+    my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
 
-  std::string_view my_node() const { return my_node_; }
+  const Node *my_node() const { return my_node_; }
 
   const Configuration *configuration() { return &flatbuffer_.message(); }
 
  private:
   FlatbufferDetachedBuffer<Configuration> flatbuffer_;
 
-  std::string my_node_;
+  const Node *my_node_ = nullptr;
 };
 
 class AbstractEventLoopTestBase
@@ -134,7 +137,7 @@
 
   const Configuration *configuration() { return factory_->configuration(); }
 
-  std::string_view my_node() const { return factory_->my_node(); }
+  const Node *my_node() const { return factory_->my_node(); }
 
   // Ends the given event loop at the given time from now.
   void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index c2c9884..dfb5a6d 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -23,7 +23,7 @@
 
 template <typename T>
 typename Sender<T>::Builder Sender<T>::MakeBuilder() {
-  return Builder(sender_.get(), sender_->data(), sender_->size());
+  return Builder(sender_.get(), sender_->fbb_allocator());
 }
 
 template <typename Watch>
@@ -193,6 +193,7 @@
   // context.
   void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
                       Context context) {
+    CheckChannelDataAlignment(context.data, context.size);
     const monotonic_clock::time_point monotonic_start_time = get_time();
     {
       const float start_latency =
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index b5a530e..57f20ae 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -8,7 +8,7 @@
 namespace aos {
 
 EventScheduler::Token EventScheduler::Schedule(
-    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+    distributed_clock::time_point time, ::std::function<void()> callback) {
   return events_list_.emplace(time, callback);
 }
 
@@ -16,9 +16,9 @@
   events_list_.erase(token);
 }
 
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
-  const ::aos::monotonic_clock::time_point end_time =
-      monotonic_now() + duration;
+void EventScheduler::RunFor(distributed_clock::duration duration) {
+  const distributed_clock::time_point end_time =
+      distributed_now() + duration;
   is_running_ = true;
   for (std::function<void()> &on_run : on_run_) {
     on_run();
@@ -26,7 +26,7 @@
   on_run_.clear();
   while (!events_list_.empty() && is_running_) {
     auto iter = events_list_.begin();
-    ::aos::monotonic_clock::time_point next_time = iter->first;
+    distributed_clock::time_point next_time = iter->first;
     if (next_time > end_time) {
       break;
     }
@@ -53,4 +53,11 @@
   }
 }
 
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now) {
+  // Print it the same way we print a monotonic time.  Literally.
+  stream << monotonic_clock::time_point(now.time_since_epoch());
+  return stream;
+}
+
 }  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 432f4ad..400a307 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -14,15 +14,43 @@
 
 namespace aos {
 
+// This clock is the basis for distributed time.  It is used to synchronize time
+// between multiple nodes.  This is a new type so conversions to and from the
+// monotonic and realtime clocks aren't implicit.
+class distributed_clock {
+ public:
+  typedef ::std::chrono::nanoseconds::rep rep;
+  typedef ::std::chrono::nanoseconds::period period;
+  typedef ::std::chrono::nanoseconds duration;
+  typedef ::std::chrono::time_point<distributed_clock> time_point;
+
+  // This clock is the base clock for the simulation and everything is synced to
+  // it.  It never jumps.
+  static constexpr bool is_steady = true;
+
+  // Returns the epoch (0).
+  static constexpr time_point epoch() { return time_point(zero()); }
+
+  static constexpr duration zero() { return duration(0); }
+
+  static constexpr time_point min_time{
+      time_point(duration(::std::numeric_limits<duration::rep>::min()))};
+  static constexpr time_point max_time{
+      time_point(duration(::std::numeric_limits<duration::rep>::max()))};
+};
+
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now);
+
 class EventScheduler {
  public:
   using ChannelType =
-      std::multimap<monotonic_clock::time_point, std::function<void()>>;
+      std::multimap<distributed_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
 
   // Schedule an event with a callback function
   // Returns an iterator to the event
-  Token Schedule(monotonic_clock::time_point time,
+  Token Schedule(distributed_clock::time_point time,
                  std::function<void()> callback);
 
   // Schedules a callback when the event scheduler starts.
@@ -38,30 +66,17 @@
   // Runs until exited.
   void Run();
   // Runs for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   void Exit() { is_running_ = false; }
 
   bool is_running() const { return is_running_; }
 
-  monotonic_clock::time_point monotonic_now() const { return now_; }
-
-  realtime_clock::time_point realtime_now() const {
-    return realtime_clock::time_point(monotonic_now().time_since_epoch() +
-                                      realtime_offset_);
-  }
-
-  // Sets realtime clock to realtime_now for a given monotonic clock.
-  void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
-                         realtime_clock::time_point realtime_now) {
-    realtime_offset_ =
-        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
-  }
+  distributed_clock::time_point distributed_now() const { return now_; }
 
  private:
   // Current execution time.
-  monotonic_clock::time_point now_ = monotonic_clock::epoch();
-  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+  distributed_clock::time_point now_ = distributed_clock::epoch();
 
   std::vector<std::function<void()>> on_run_;
 
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
   reader.Register();
 
   std::unique_ptr<aos::EventLoop> printer_event_loop =
-      reader.event_loop_factory()->MakeEventLoop("printer");
+      reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
   printer_event_loop->SkipTimingReport();
 
   bool found_channel = false;
@@ -66,7 +66,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             } else {
               std::cout << context.realtime_event_time << " ("
                         << context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             }
           });
       found_channel = true;
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2d9604f..c766fe6 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -65,7 +65,7 @@
 
   // Make an eventloop for retrieving stats
   std::unique_ptr<aos::EventLoop> stats_event_loop =
-      log_reader_factory.MakeEventLoop("logstats");
+      log_reader_factory.MakeEventLoop("logstats", reader.node());
   stats_event_loop->SkipTimingReport();
 
   // Read channel info and store in vector
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
 
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
-      std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+      std::make_unique<SimulatedEventLoopFactory>(configuration());
   Register(event_loop_factory_unique_ptr_.get());
 }
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+  node_event_loop_factory_ =
+      event_loop_factory_->GetNodeEventLoopFactory(node());
+  event_loop_unique_ptr_ =
+      event_loop_factory->MakeEventLoop("log_reader", node());
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (event_loop_factory_ != nullptr) {
-          event_loop_factory_->SetRealtimeOffset(
+        if (node_event_loop_factory_ != nullptr) {
+          node_event_loop_factory_->SetRealtimeOffset(
               monotonic_clock::time_point(chrono::nanoseconds(
                   channel_data.message().monotonic_sent_time())),
               realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
   event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
+  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
 }
 
 void LogReader::MakeRemappedConfig() {
+  CHECK(!event_loop_)
+      << ": Can't change the mapping after the events are scheduled.";
+
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
   std::vector<std::unique_ptr<RawSender>> channels_;
 
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+  NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
-        event_loop_factory_(&config_.message(), "pi1"),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop(
+            "ping", configuration::GetNode(event_loop_factory_.configuration(),
+                                           "pi1"))),
         ping_(ping_event_loop_.get()) {}
 
   // Config and factory.
@@ -233,8 +235,10 @@
   LOG(INFO) << "Logging data to " << logfile;
 
   {
+    const Node *pi1 =
+        configuration::GetNode(event_loop_factory_.configuration(), "pi1");
     std::unique_ptr<EventLoop> pong_event_loop =
-        event_loop_factory_.MakeEventLoop("pong");
+        event_loop_factory_.MakeEventLoop("pong", pi1);
 
     std::unique_ptr<aos::RawSender> pong_sender(
         pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
 
     DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger");
+        event_loop_factory_.MakeEventLoop("logger", pi1);
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
@@ -276,20 +280,23 @@
   // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
   // messages.  This won't work today yet until the log reading code gets
   // significantly better.
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
   reader.Register(&log_reader_factory);
 
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
   ASSERT_NE(reader.node(), nullptr);
   EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
 
   std::unique_ptr<EventLoop> test_event_loop =
-      log_reader_factory.MakeEventLoop("test");
+      log_reader_factory.MakeEventLoop("test", pi1);
 
   int ping_count = 10;
   int pong_count = 10;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index cc11520..c4656d9 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -18,6 +18,7 @@
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
 #include "aos/stl_mutex/stl_mutex.h"
+#include "aos/util/file.h"
 #include "aos/util/phased_loop.h"
 #include "glog/logging.h"
 
@@ -72,7 +73,7 @@
 
     size_ = ipc_lib::LocklessQueueMemorySize(config_);
 
-    MkdirP(path);
+    util::MkdirP(path, FLAGS_permissions);
 
     // There are 2 cases.  Either the file already exists, or it does not
     // already exist and we need to create it.  Start by trying to create it. If
@@ -124,23 +125,6 @@
   const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
 
  private:
-  void MkdirP(std::string_view path) {
-    auto last_slash_pos = path.find_last_of("/");
-
-    std::string folder(last_slash_pos == std::string_view::npos
-                           ? std::string_view("")
-                           : path.substr(0, last_slash_pos));
-    if (folder.empty()) return;
-    MkdirP(folder);
-    VLOG(1) << "Creating " << folder;
-    const int result = mkdir(folder.c_str(), FLAGS_permissions);
-    if (result == -1 && errno == EEXIST) {
-      VLOG(1) << "Already exists";
-      return;
-    }
-    PCHECK(result == 0) << ": Error creating " << folder;
-  }
-
   ipc_lib::LocklessQueueConfiguration config_;
 
   int fd_;
@@ -184,8 +168,8 @@
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
                         lockless_queue_memory_.config()),
-        data_storage_(static_cast<AlignedChar *>(aligned_alloc(
-                          alignof(AlignedChar), channel->max_size())),
+        data_storage_(static_cast<char *>(malloc(channel->max_size() +
+                                                 kChannelDataAlignment - 1)),
                       &free) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
@@ -217,7 +201,7 @@
         actual_queue_index_.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+        &context_.size, data_storage_start());
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
       context_.queue_index = actual_queue_index_.index();
       if (context_.remote_queue_index == 0xffffffffu) {
@@ -229,7 +213,7 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+      context_.data = data_storage_start() +
                       lockless_queue_.message_data_size() - context_.size;
       actual_queue_index_ = actual_queue_index_.Increment();
     }
@@ -267,7 +251,7 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+        &context_.size, data_storage_start());
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
       context_.queue_index = queue_index.index();
       if (context_.remote_queue_index == 0xffffffffu) {
@@ -279,7 +263,7 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+      context_.data = data_storage_start() +
                       lockless_queue_.message_data_size() - context_.size;
       actual_queue_index_ = queue_index.Increment();
     }
@@ -315,6 +299,10 @@
   void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
 
  private:
+  char *data_storage_start() {
+    return RoundChannelData(data_storage_.get(), channel_->max_size());
+  }
+
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueue lockless_queue_;
@@ -322,14 +310,7 @@
   ipc_lib::QueueIndex actual_queue_index_ =
       ipc_lib::LocklessQueue::empty_queue_index();
 
-  struct AlignedChar {
-    // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
-    // cache lines.
-    // V4L2 requires 64 byte alignment for USERPTR.
-    alignas(64) char data;
-  };
-
-  std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+  std::unique_ptr<char, decltype(&free)> data_storage_;
 
   Context context_;
 };
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d33f496..2edc3fc 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -38,7 +38,8 @@
 
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
     loop->set_name(name);
@@ -47,7 +48,8 @@
 
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop =
         ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..c857bb6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "absl/container/btree_map.h"
+#include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
 
@@ -13,19 +14,17 @@
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
 struct SimulatedMessage {
-  // Struct to let us force data to be well aligned.
-  struct OveralignedChar {
-    char data alignas(64);
-  };
-
   // Context for the data.
   Context context;
 
   // The data.
-  char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+  char *data(size_t buffer_size) {
+    return RoundChannelData(&actual_data[0], buffer_size);
+  }
 
-  // Then the data.
-  OveralignedChar actual_data[];
+  // Then the data, including padding on the end so we can align the buffer we
+  // actually return from data().
+  char actual_data[];
 };
 
 class SimulatedEventLoop;
@@ -36,6 +35,7 @@
  public:
   SimulatedWatcher(
       SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn);
 
@@ -59,6 +59,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedWatcher> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -102,10 +103,6 @@
 
   const Channel *channel() const { return channel_; }
 
-  ::aos::monotonic_clock::time_point monotonic_now() const {
-    return scheduler_->monotonic_now();
-  }
-
  private:
   const Channel *channel_;
 
@@ -126,9 +123,9 @@
 // This is a shared_ptr so we don't have to implement refcounting or copying.
 std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
   SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
-      malloc(sizeof(SimulatedMessage) + size));
+      malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
   message->context.size = size;
-  message->context.data = message->data();
+  message->context.data = message->data(size);
 
   return std::shared_ptr<SimulatedMessage>(message, free);
 }
@@ -145,7 +142,7 @@
     if (!message_) {
       message_ = MakeSimulatedMessage(simulated_channel_->max_size());
     }
-    return message_->data();
+    return message_->data(simulated_channel_->max_size());
   }
 
   size_t size() override { return simulated_channel_->max_size(); }
@@ -186,12 +183,14 @@
     message_ = MakeSimulatedMessage(simulated_channel_->max_size());
 
     // Now fill in the message.  size is already populated above, and
-    // queue_index will be populated in queue_.  Put this at the back of the
-    // data segment.
-    memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+    // queue_index will be populated in simulated_channel_.  Put this at the
+    // back of the data segment.
+    memcpy(message_->data(simulated_channel_->max_size()) +
+               simulated_channel_->max_size() - size,
+           msg, size);
 
-    return Send(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index);
+    return DoSend(size, monotonic_remote_time, realtime_remote_time,
+                  remote_queue_index);
   }
 
  private:
@@ -204,9 +203,11 @@
 
 class SimulatedFetcher : public RawFetcher {
  public:
-  explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
-      : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
-  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+  explicit SimulatedFetcher(EventLoop *event_loop,
+                            SimulatedChannel *simulated_channel)
+      : RawFetcher(event_loop, simulated_channel->channel()),
+        simulated_channel_(simulated_channel) {}
+  ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
     if (msgs_.size() == 0) {
@@ -222,8 +223,8 @@
     if (msgs_.size() == 0) {
       // TODO(austin): Can we just do this logic unconditionally?  It is a lot
       // simpler.  And call clear, obviously.
-      if (!msg_ && queue_->latest_message()) {
-        SetMsg(queue_->latest_message());
+      if (!msg_ && simulated_channel_->latest_message()) {
+        SetMsg(simulated_channel_->latest_message());
         return std::make_pair(true, event_loop()->monotonic_now());
       } else {
         return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +261,7 @@
     msgs_.emplace_back(buffer);
   }
 
-  SimulatedChannel *queue_;
+  SimulatedChannel *simulated_channel_;
   std::shared_ptr<SimulatedMessage> msg_;
 
   // Messages queued up but not in use.
@@ -270,6 +271,7 @@
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 NodeEventLoopFactory *node_event_loop_factory,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
   ~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +287,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 
   monotonic_clock::time_point base_;
@@ -294,6 +297,7 @@
 class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
  public:
   SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             NodeEventLoopFactory *node_event_loop_factory,
                              SimulatedEventLoop *simulated_event_loop,
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
@@ -309,6 +313,7 @@
   EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 };
 
@@ -316,6 +321,7 @@
  public:
   explicit SimulatedEventLoop(
       EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -324,6 +330,7 @@
       const Node *node, pid_t tid)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
+        node_event_loop_factory_(node_event_loop_factory),
         channels_(channels),
         raw_event_loops_(raw_event_loops),
         node_(node),
@@ -361,11 +368,11 @@
   }
 
   ::aos::monotonic_clock::time_point monotonic_now() override {
-    return scheduler_->monotonic_now();
+    return node_event_loop_factory_->monotonic_now();
   }
 
   ::aos::realtime_clock::time_point realtime_now() override {
-    return scheduler_->realtime_now();
+    return node_event_loop_factory_->realtime_now();
   }
 
   ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +386,17 @@
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
     CHECK(!is_running());
-    return NewTimer(::std::unique_ptr<TimerHandler>(
-        new SimulatedTimerHandler(scheduler_, this, callback)));
+    return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+        scheduler_, node_event_loop_factory_, this, callback)));
   }
 
   PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
                                    const monotonic_clock::duration interval,
                                    const monotonic_clock::duration offset =
                                        ::std::chrono::seconds(0)) override {
-    return NewPhasedLoop(
-        ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
-            scheduler_, this, callback, interval, offset)));
+    return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+        new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+                                       this, callback, interval, offset)));
   }
 
   void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +440,7 @@
   pid_t GetTid() override { return tid_; }
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *raw_event_loops_;
@@ -459,17 +467,13 @@
   }
 }
 
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
-  return send_delay_;
-}
-
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
   TakeWatcher(channel);
 
-  std::unique_ptr<SimulatedWatcher> shm_watcher(
-      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+      this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -511,12 +515,13 @@
 
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-    const Channel *channel,
+    NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +579,10 @@
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ =
-      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
-                           [this]() { simulated_event_loop_->HandleEvent(); });
+  token_ = scheduler_->Schedule(
+      node_event_loop_factory_->ToDistributedClock(
+          event_time + simulated_event_loop_->send_delay()),
+      [this]() { simulated_event_loop_->HandleEvent(); });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -600,8 +606,8 @@
 uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
   const uint32_t queue_index = next_queue_index_.index();
   message->context.queue_index = queue_index;
-  message->context.data =
-      message->data() + channel()->max_size() - message->context.size;
+  message->context.data = message->data(channel()->max_size()) +
+                          channel()->max_size() - message->context.size;
   next_queue_index_ = next_queue_index_.Increment();
 
   latest_message_ = message;
@@ -622,12 +628,13 @@
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void()> fn)
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +646,12 @@
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
     token_ = scheduler_->Schedule(
-        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(monotonic_now),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
     token_ = scheduler_->Schedule(
-        base, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   }
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +664,8 @@
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(
-        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base_),
+        [this]() { simulated_event_loop_->HandleEvent(); });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
   } else {
@@ -674,13 +684,15 @@
 }
 
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+    const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +714,80 @@
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
   token_ = scheduler_->Schedule(
-      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+      node_event_loop_factory_->ToDistributedClock(sleep_time),
+      [this]() { simulated_event_loop_->HandleEvent(); });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
 
+NodeEventLoopFactory::NodeEventLoopFactory(
+    EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+    const Node *node,
+    std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+        *raw_event_loops)
+    : scheduler_(scheduler),
+      factory_(factory),
+      node_(node),
+      raw_event_loops_(raw_event_loops) {}
+
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
-  CHECK(!configuration_->has_nodes())
-      << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, std::string_view node_name)
-    : SimulatedEventLoopFactory(
-          configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, const Node *node)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
-  if (node != nullptr) {
-    CHECK(configuration_->has_nodes())
-        << ": Got a configuration with no nodes and node \""
-        << node->name()->string_view() << "\" was selected.";
-    bool found = false;
-    for (const Node *node : *configuration_->nodes()) {
-      if (node == node_) {
-        found = true;
-        break;
-      }
+    : configuration_(CHECK_NOTNULL(configuration)) {
+  if (configuration::MultiNode(configuration_)) {
+    for (const Node *node : *configuration->nodes()) {
+      nodes_.emplace_back(node);
     }
-    CHECK(found) << ": node must be a pointer in the configuration.";
+  } else {
+    nodes_.emplace_back(nullptr);
+  }
+
+  for (const Node *node : nodes_) {
+    node_factories_.emplace_back(
+        new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
+  }
+
+  if (configuration::MultiNode(configuration)) {
+    bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
   }
 }
 
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+    const Node *node) {
+  auto result = std::find_if(
+      node_factories_.begin(), node_factories_.end(),
+      [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+        return node_factory->node() == node;
+      });
+
+  CHECK(result != node_factories_.end())
+      << ": Failed to find node " << FlatbufferToJson(node);
+
+  return result->get();
+}
+
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+    std::string_view name, const Node *node) {
+  if (node == nullptr) {
+    CHECK(!configuration::MultiNode(configuration()))
+        << ": Can't make a single node event loop in a multi-node world.";
+  } else {
+    CHECK(configuration::MultiNode(configuration()))
+        << ": Can't make a multi-node event loop in a single-node world.";
+  }
+  return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
     std::string_view name) {
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+      scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+      node_, tid));
   result->set_name(name);
-  result->set_send_delay(send_delay_);
+  result->set_send_delay(factory_->send_delay());
   return std::move(result);
 }
 
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..8cff0d7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,175 @@
 // Class for simulated fetchers.
 class SimulatedChannel;
 
+class NodeEventLoopFactory;
+namespace message_bridge {
+class SimulatedMessageBridge;
+}
+
+// There are 2 concepts needed to support multi-node simulations.
+//  1) The node.  This is implemented with NodeEventLoopFactory.
+//  2) The "robot" which runs multiple nodes.  This is implemented with
+//     SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory::MakeEventLoop takes an
+// optional Node argument if you want to make event loops without interacting
+// with the NodeEventLoopFactory object.
+//
+// The basic flow is as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory->MakeEventLoop("ping");
+//
+// The distributed_clock is used to be the base time.  NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
 class SimulatedEventLoopFactory {
  public:
   // Constructs a SimulatedEventLoopFactory with the provided configuration.
   // This configuration must remain in scope for the lifetime of the factory and
   // all sub-objects.
   SimulatedEventLoopFactory(const Configuration *configuration);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            std::string_view node_name);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            const Node *node);
   ~SimulatedEventLoopFactory();
 
-  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+  // Creates an event loop.  If running in a multi-node environment, node needs
+  // to point to the node to create this event loop on.
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+                                             const Node *node = nullptr);
+
+  // Returns the NodeEventLoopFactory for the provided node.  The returned
+  // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+  // lifetime identical to the factory.
+  NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
 
   // Starts executing the event loops unconditionally.
   void Run();
   // Executes the event loops for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   // Stops executing all event loops.  Meant to be called from within an event
   // loop handler.
   void Exit() { scheduler_.Exit(); }
 
-  // Sets the simulated send delay for the factory.
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  // Sets the simulated send delay for all messages sent within a single node.
   void set_send_delay(std::chrono::nanoseconds send_delay);
-  std::chrono::nanoseconds send_delay() const;
+  std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+  // Sets the simulated network delay for messages forwarded between nodes.
+  void set_network_delay(std::chrono::nanoseconds network_delay);
+  std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+  // Returns the clock used to synchronize the nodes.
+  distributed_clock::time_point distributed_now() const {
+    return scheduler_.distributed_now();
+  }
+
+  // Returns the configuration used for everything.
+  const Configuration *configuration() const { return configuration_; }
+
+ private:
+  const Configuration *const configuration_;
+  EventScheduler scheduler_;
+  // List of event loops to manage running and not running for.
+  // The function is a callback used to set and clear the running bool on each
+  // event loop.
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      raw_event_loops_;
+
+  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+  std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+  std::vector<const Node *> nodes_;
+
+  std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
   const Node *node() const { return node_; }
 
-  monotonic_clock::time_point monotonic_now() const {
-    return scheduler_.monotonic_now();
-  }
-  realtime_clock::time_point realtime_now() const {
-    return scheduler_.realtime_now();
-  }
-
   // Sets realtime clock to realtime_now for a given monotonic clock.
   void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
                          realtime_clock::time_point realtime_now) {
-    scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+    realtime_offset_ =
+        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
   }
 
- private:
-  const Configuration *const configuration_;
-  EventScheduler scheduler_;
-  // Map from name, type to queue.
-  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
-  // List of event loops to manage running and not running for.
-  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
-      raw_event_loops_;
+  // Returns the current time on both clocks.
+  inline monotonic_clock::time_point monotonic_now() const;
+  inline realtime_clock::time_point realtime_now() const;
 
-  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  // Returns the simulated network delay for messages forwarded between nodes.
+  std::chrono::nanoseconds network_delay() const {
+    return factory_->network_delay();
+  }
+  // Returns the simulated send delay for all messages sent within a single
+  // node.
+  std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+
+  // Converts a time to the distributed clock for scheduling and cross-node time
+  // measurement.
+  inline distributed_clock::time_point ToDistributedClock(
+      monotonic_clock::time_point time) const;
+
+ private:
+  friend class SimulatedEventLoopFactory;
+  NodeEventLoopFactory(
+      EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+      const Node *node,
+      std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+          *raw_event_loops);
+
+  EventScheduler *const scheduler_;
+  SimulatedEventLoopFactory *const factory_;
 
   const Node *const node_;
 
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      *const raw_event_loops_;
+
+  std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+  // Map from name, type to queue.
+  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+  // pid so we get unique timing reports.
   pid_t tid_ = 0;
 };
 
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+  return monotonic_clock::time_point(
+      factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+  return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+                                    realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+    monotonic_clock::time_point time) const {
+  return distributed_clock::time_point(time.time_since_epoch() -
+                                       monotonic_offset_);
+}
+
 }  // namespace aos
 
 #endif  // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 4328d6f..de5cbae 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,8 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
 #include "gtest/gtest.h"
 
@@ -15,11 +17,11 @@
  public:
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
 
   void Run() override { event_loop_factory_->Run(); }
@@ -38,8 +40,8 @@
   void MaybeMake() {
     if (!event_loop_factory_) {
       if (configuration()->has_nodes()) {
-        event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
-            configuration(), my_node());
+        event_loop_factory_ =
+            std::make_unique<SimulatedEventLoopFactory>(configuration());
       } else {
         event_loop_factory_ =
             std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -64,12 +66,13 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  scheduler.Schedule(::aos::monotonic_clock::now(),
-                      [&counter]() { counter += 1; });
+  scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                     [&counter]() { counter += 1; });
   scheduler.Run();
   EXPECT_EQ(counter, 1);
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 1);
@@ -80,8 +83,9 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 0);
@@ -100,8 +104,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
 }
 
@@ -125,8 +127,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
   EXPECT_EQ(counter, 10);
 }
@@ -232,5 +232,45 @@
             0.0);
 }
 
+// Tests that ping and pong work when on 2 different nodes.
+TEST(SimulatedEventLoopTest, MultinodePingPong) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          "aos/events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  int pi2_pong_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+  int pi1_pong_count = 0;
+  pi1_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(10) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_count, 1001);
+  EXPECT_EQ(pi2_pong_count, 1001);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node.  We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+  RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+                    aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *send_event_loop,
+                    std::unique_ptr<aos::RawFetcher> fetcher,
+                    std::unique_ptr<aos::RawSender> sender)
+      : fetch_node_factory_(fetch_node_factory),
+        send_node_factory_(send_node_factory),
+        send_event_loop_(send_event_loop),
+        fetcher_(std::move(fetcher)),
+        sender_(std::move(sender)) {
+    timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+    Schedule();
+  }
+
+  // Kicks us to re-fetch and schedule the timer.
+  void Schedule() {
+    if (fetcher_->context().data == nullptr || sent_) {
+      sent_ = !fetcher_->FetchNext();
+    }
+
+    if (fetcher_->context().data == nullptr) {
+      return;
+    }
+    // FetchNext() found nothing new; the fetcher's message was already sent.
+    if (sent_) {
+      return;
+    }
+
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past...";
+
+    timer_->Setup(monotonic_delivered_time);
+  }
+
+ private:
+  // Actually sends the message, and reschedules.
+  void Send() {
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Message to be sent at the wrong time.";
+
+    // And also fill out the send times as well.
+    sender_->Send(fetcher_->context().data, fetcher_->context().size,
+                  fetcher_->context().monotonic_event_time,
+                  fetcher_->context().realtime_event_time,
+                  fetcher_->context().queue_index);
+
+    sent_ = true;
+    Schedule();
+  }
+
+  // Converts from time on the sending node to time on the receiving node.
+  monotonic_clock::time_point DeliveredTime(const Context &context) const {
+    const distributed_clock::time_point distributed_sent_time =
+        fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+    // Re-express it relative to send_node_factory_'s epoch, plus both delays.
+    return aos::monotonic_clock::epoch() +
+           (distributed_sent_time - send_node_factory_->ToDistributedClock(
+                                        aos::monotonic_clock::epoch())) +
+           send_node_factory_->network_delay() +
+           send_node_factory_->send_delay();
+  }
+
+  // Factories used for time conversion.
+  aos::NodeEventLoopFactory *fetch_node_factory_;
+  aos::NodeEventLoopFactory *send_node_factory_;
+
+  // Event loop which sending is scheduled on.
+  aos::EventLoop *send_event_loop_;
+  // Timer used to send.
+  aos::TimerHandler *timer_;
+  // Fetcher used to receive messages.
+  std::unique_ptr<aos::RawFetcher> fetcher_;
+  // Sender to send them back out.
+  std::unique_ptr<aos::RawSender> sender_;
+  // True if we have sent the message in the fetcher.
+  bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+    SimulatedEventLoopFactory *simulated_event_loop_factory) {
+  CHECK(
+      configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+  // Pre-build up event loops for every node.  They are pretty cheap anyways.
+  for (const Node *node : simulated_event_loop_factory->nodes()) {
+    CHECK(event_loop_map_
+              .insert({node, simulated_event_loop_factory->MakeEventLoop(
+                                 "message_bridge", node)})
+              .second);
+  }
+
+  for (const Channel *channel :
+       *simulated_event_loop_factory->configuration()->channels()) {
+    if (!channel->has_destination_nodes()) {
+      continue;  // No destinations, so nothing to forward for this channel.
+    }
+
+    // Find the sending node.
+    const Node *node =
+        configuration::GetNode(simulated_event_loop_factory->configuration(),
+                               channel->source_node()->string_view());
+    auto source_event_loop = event_loop_map_.find(node);
+    CHECK(source_event_loop != event_loop_map_.end());
+    // Heap-allocated, presumably so the pointer captured below stays valid.
+    std::unique_ptr<DelayersVector> delayers =
+        std::make_unique<DelayersVector>();
+
+    // And then build up a RawMessageDelayer for each destination.
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *destination_node =
+          configuration::GetNode(simulated_event_loop_factory->configuration(),
+                                 connection->name()->string_view());
+      auto destination_event_loop = event_loop_map_.find(destination_node);
+      CHECK(destination_event_loop != event_loop_map_.end());
+
+      delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+          simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+          simulated_event_loop_factory->GetNodeEventLoopFactory(
+              destination_node),
+          destination_event_loop->second.get(),
+          source_event_loop->second->MakeRawFetcher(channel),
+          destination_event_loop->second->MakeRawSender(channel)));
+    }
+
+    // And register every delayer to be poked when a new message shows up.
+    source_event_loop->second->MakeRawWatcher(
+        channel,
+        [captured_delayers = delayers.get()](const Context &, const void *) {
+          for (std::unique_ptr<RawMessageDelayer> &delayer :
+               *captured_delayers) {
+            delayer->Schedule();
+          }
+        });
+    delayers_list_.emplace_back(std::move(delayers));  // Takes ownership.
+  }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() = default;
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
new file mode 100644
index 0000000..5d613ab
--- /dev/null
+++ b/aos/events/simulated_network_bridge.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+class RawMessageDelayer;
+
+// This class moves messages between nodes.  It is implemented as a separate
+// class because it would have been even harder to manage forwarding in the
+// SimulatedEventLoopFactory.
+class SimulatedMessageBridge {
+ public:
+  // Builds delayers for every forwarded channel; dies if not multi-node.
+  SimulatedMessageBridge(
+      SimulatedEventLoopFactory *simulated_event_loop_factory);
+  ~SimulatedMessageBridge();
+
+ private:
+  // Map of nodes to event loops.  This is a member variable so that the
+  // lifetime of the event loops matches the lifetime of the bridge.
+  std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+
+  // One DelayersVector per forwarded channel, holding one delayer for each
+  // of that channel's destination nodes.  Used to resend the messages.
+  using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
+  std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index c1c0db0..d9fcab6 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -12,6 +12,8 @@
 // This class is a base class for all sizes of array backed allocators.
 class FixedAllocatorBase : public flatbuffers::Allocator {
  public:
+  ~FixedAllocatorBase() override { CHECK(!is_allocated_); }
+
   // TODO(austin): Read the contract for these.
   uint8_t *allocate(size_t) override;
 
@@ -25,6 +27,7 @@
   virtual size_t size() const = 0;
 
   void Reset() { is_allocated_ = false; }
+  bool is_allocated() const { return is_allocated_; }
 
  private:
   bool is_allocated_ = false;
@@ -51,14 +54,32 @@
 class PreallocatedAllocator : public FixedAllocatorBase {
  public:
   PreallocatedAllocator(void *data, size_t size) : data_(data), size_(size) {}
-  uint8_t *data() override { return reinterpret_cast<uint8_t *>(data_); }
-  const uint8_t *data() const override {
-    return reinterpret_cast<const uint8_t *>(data_);
+  PreallocatedAllocator(const PreallocatedAllocator&) = delete;
+  PreallocatedAllocator(PreallocatedAllocator &&other)
+      : data_(other.data_), size_(other.size_) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
   }
-  size_t size() const override { return size_; }
+
+  PreallocatedAllocator &operator=(const PreallocatedAllocator &) = delete;
+  PreallocatedAllocator &operator=(PreallocatedAllocator &&other) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
+    data_ = other.data_;
+    size_ = other.size_;
+    return *this;
+  }
+
+  uint8_t *data() final {
+    return reinterpret_cast<uint8_t *>(CHECK_NOTNULL(data_));
+  }
+  const uint8_t *data() const final {
+    return reinterpret_cast<const uint8_t *>(CHECK_NOTNULL(data_));
+  }
+  size_t size() const final { return size_; }
 
  private:
-  void* data_;
+  void *data_;
   size_t size_;
 };
 
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 57adc6d..91b050c 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -204,6 +204,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
+        ":data_alignment",
         ":index",
         "//aos:realtime",
         "//aos/time",
@@ -259,3 +260,14 @@
         "//aos/testing:test_logging",
     ],
 )
+
+cc_library(
+    name = "data_alignment",
+    hdrs = [
+        "data_alignment.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
new file mode 100644
index 0000000..2f59b78
--- /dev/null
+++ b/aos/ipc_lib/data_alignment.h
@@ -0,0 +1,41 @@
+#ifndef AOS_IPC_LIB_DATA_ALIGNMENT_H_
+#define AOS_IPC_LIB_DATA_ALIGNMENT_H_
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// All data buffers sent over or received from a channel will guarantee this
+// alignment for their end. Flatbuffers aligns from the end, so this is what
+// matters.
+//
+// 64 is a reasonable choice for now:
+//   Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+//   cache lines.
+//   V4L2 requires 64 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 64;
+
+template <typename T>
+inline void CheckChannelDataAlignment(T *data, size_t size) {
+  CHECK_EQ((reinterpret_cast<uintptr_t>(data) + size) % kChannelDataAlignment,
+           0u)  // Dies unless the end (data + size) lands on the boundary.
+      << ": data pointer is not end aligned as it should be: " << data << " + "
+      << size;
+}
+
+// Returns the lowest address >= data at which a size-byte buffer ends on a
+// kChannelDataAlignment boundary.  The caller must have reserved up to
+// kChannelDataAlignment-1 bytes past data + size to absorb the shift.
+inline char *RoundChannelData(char *data, size_t size) {
+  const uintptr_t start = reinterpret_cast<uintptr_t>(data);
+  // How far past the previous aligned address the unshifted end sits.
+  const uintptr_t overshoot = (start + size) % kChannelDataAlignment;
+  // Bytes to slide forward so the end reaches the next aligned address.
+  const uintptr_t shift =
+      overshoot == 0 ? 0 : kChannelDataAlignment - overshoot;
+  return reinterpret_cast<char *>(start + shift);
+}
+
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_DATA_ALIGNMENT_H_
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 903150b..c323b8b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -241,7 +241,8 @@
 
 size_t LocklessQueueConfiguration::message_size() const {
   // Round up the message size so following data is aligned appropriately.
-  return LocklessQueueMemory::AlignmentRoundUp(message_data_size) +
+  return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
+                                               (kChannelDataAlignment - 1)) +
          sizeof(Message);
 }
 
@@ -549,7 +550,7 @@
   Message *message = memory_->GetMessage(scratch_index);
   message->header.queue_index.Invalidate();
 
-  return &message->data[0];
+  return message->data(memory_->message_data_size());
 }
 
 void LocklessQueue::Sender::Send(
@@ -788,7 +789,7 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  memcpy(data, &m->data[0], message_data_size());
+  memcpy(data, m->data(memory_->message_data_size()), message_data_size());
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
@@ -891,8 +892,9 @@
     ::std::cout << "      }" << ::std::endl;
     ::std::cout << "      data: {";
 
+    const char *const m_data = m->data(memory->message_data_size());
     for (size_t j = 0; j < m->header.length; ++j) {
-      char data = m->data[j];
+      char data = m_data[j];
       if (j != 0) {
         ::std::cout << " ";
       }
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 976f758..0384aa8 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -7,6 +7,7 @@
 #include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/ipc_lib/index.h"
 #include "aos/time/time.h"
 
@@ -76,7 +77,19 @@
     size_t length;
   } header;
 
-  char data[];
+  char *data(size_t message_size) { return RoundedData(message_size); }
+  const char *data(size_t message_size) const {
+    return RoundedData(message_size);
+  }
+
+ private:
+  // This returns a non-const pointer into a const object. Be very careful about
+  // const correctness in publicly accessible APIs using it.
+  char *RoundedData(size_t message_size) const {
+    return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
+  }
+
+  char data_pointer[];
 };
 
 struct LocklessQueueConfiguration {
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 0c0973c..cbe76a7 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -89,18 +89,24 @@
 
   // Getters for each of the 4 lists.
   Sender *GetSender(size_t sender_index) {
+    static_assert(alignof(Sender) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
                                       SizeOfMessages() + SizeOfWatchers() +
                                       sender_index * sizeof(Sender));
   }
 
   Watcher *GetWatcher(size_t watcher_index) {
+    static_assert(alignof(Watcher) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
                                        SizeOfMessages() +
                                        watcher_index * sizeof(Watcher));
   }
 
   AtomicIndex *GetQueue(uint32_t index) {
+    static_assert(alignof(AtomicIndex) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<AtomicIndex *>(&data[0] +
                                            sizeof(AtomicIndex) * index);
   }
@@ -109,6 +115,8 @@
   // sender list, since those are messages available to be filled in and sent.
   // This removes the need to find lost messages when a sender dies.
   Message *GetMessage(Index index) {
+    static_assert(alignof(Message) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }
diff --git a/aos/scoped/scoped_ptr.h b/aos/scoped/scoped_ptr.h
deleted file mode 100644
index 336e73b..0000000
--- a/aos/scoped/scoped_ptr.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#ifndef AOS_SCOPED_PTR_H_
-#define AOS_SCOPED_PTR_H_
-
-#include "aos/macros.h"
-
-namespace aos {
-
-// A simple scoped_ptr implementation that works under both linux and vxworks.
-template<typename T>
-class scoped_ptr {
- public:
-  typedef T element_type;
-  
-  explicit scoped_ptr(T *p = NULL) : p_(p) {}
-  ~scoped_ptr() {
-    delete p_;
-  }
-
-  T &operator*() const { return *p_; }
-  T *operator->() const { return p_; }
-  T *get() const { return p_; }
-
-  operator bool() const { return p_ != NULL; }
-
-  void swap(scoped_ptr<T> &other) {
-    T *temp = other.p_;
-    other.p_ = p_;
-    p_ = other.p_;
-  }
-  void reset(T *p = NULL) {
-    if (p_ != NULL) delete p_;
-    p_ = p;
-  }
-
- private:
-  T *p_;
-
-  DISALLOW_COPY_AND_ASSIGN(scoped_ptr<T>);
-};
-
-}  // namespace aos
-
-#endif  // AOS_SCOPED_PTR_H_
diff --git a/aos/util/file.cc b/aos/util/file.cc
index af1f85b..b334ded 100644
--- a/aos/util/file.cc
+++ b/aos/util/file.cc
@@ -1,6 +1,8 @@
 #include "aos/util/file.h"
 
 #include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 #include <unistd.h>
 
 #include <string_view>
@@ -46,5 +48,23 @@
   }
 }
 
+// Creates every directory leading up to the last '/' in path (the dirname),
+// like mkdir -p.  Dies via PCHECK if a directory cannot be created.
+void MkdirP(std::string_view path, mode_t mode) {
+  const auto last_slash_pos = path.find_last_of("/");
+  if (last_slash_pos == std::string_view::npos) return;
+  const std::string folder(path.substr(0, last_slash_pos));
+  if (folder.empty()) return;
+  MkdirP(folder, mode);
+  const int result = mkdir(folder.c_str(), mode);
+  if (result == -1 && errno == EEXIST) {
+    VLOG(2) << folder << " already exists";
+    return;
+  }
+  // Check first so that a failed mkdir is never logged as "Created".
+  PCHECK(result == 0) << ": Error creating " << folder;
+  VLOG(1) << "Created " << folder;
+}
+
 }  // namespace util
 }  // namespace aos
diff --git a/aos/util/file.h b/aos/util/file.h
index 3aebd87..d6724af 100644
--- a/aos/util/file.h
+++ b/aos/util/file.h
@@ -15,6 +15,8 @@
 void WriteStringToFileOrDie(const std::string_view filename,
                             const std::string_view contents);
 
+void MkdirP(std::string_view path, mode_t mode);
+
 }  // namespace util
 }  // namespace aos