Merge "Add relative encoder zeroing strategy"
diff --git a/WORKSPACE b/WORKSPACE
index 4a1b88f..ab7a70f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -561,6 +561,27 @@
     url = "http://www.frc971.org/Build-Dependencies/emscripten-llvm-e" + emscripten_version + ".tar.gz",
 )
 
+new_http_archive(
+    name = "webrtc_x64",  # WebRTC tarball for linux-x64 (per the url below).
+    build_file = "@//debian:webrtc.BUILD",
+    sha256 = "bd212b2a112a043d08d27f49027091788fa01c7c2ac5f072d096c17d9dbd976f",
+    url = "http://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-x64.tar.gz",
+)
+
+new_http_archive(
+    name = "webrtc_arm",  # WebRTC tarball for linux-arm (per the url below).
+    build_file = "@//debian:webrtc.BUILD",
+    sha256 = "c34badaf313877cd03a0dfd6b71de024d806a7652550a7f1cd7dea523a7c813d",
+    url = "http://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-arm.tar.gz",
+)
+
+new_http_archive(
+    name = "webrtc_rio",  # NOTE(review): url revision (30376-4c4735b) differs from x64/arm (30326-1a68679) -- confirm.
+    build_file = "@//debian:webrtc.BUILD",
+    sha256 = "d86d3b030099b35ae5ea31c807fb4d0b0352598e79f1ea84877e5504e185faa8",
+    url = "http://www.frc971.org/Build-Dependencies/webrtc-30376-4c4735b-linux-rio.tar.gz",
+)
+
 # Fetch our Bazel dependencies that aren't distributed on npm
 load("@build_bazel_rules_typescript//:package.bzl", "rules_typescript_dependencies")
 
diff --git a/aos/config_flattener.cc b/aos/config_flattener.cc
index 6fb4af3..71cda32 100644
--- a/aos/config_flattener.cc
+++ b/aos/config_flattener.cc
@@ -27,8 +27,7 @@
       &configuration::MergeConfiguration(config, schemas).message(), true);
 
   // TODO(austin): Figure out how to squash the schemas onto 1 line so it is
-  // easier to read?  Or figure out how to split them into a second file which
-  // gets included.
+  // easier to read?
   VLOG(1) << "Flattened config is " << merged_config;
   util::WriteStringToFileOrDie(argv[1], merged_config);
   return 0;
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a40c47c..c4f139c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -470,6 +470,18 @@
   }
 }
 
+size_t ChannelIndex(const Configuration *configuration,
+                    const Channel *channel) {
+  // Lookup is by pointer identity, so channel must live inside configuration.
+  CHECK(configuration->channels() != nullptr) << ": No channels";
+  const auto first = configuration->channels()->begin();
+  const auto c = std::find(first, configuration->channels()->end(), channel);
+  CHECK(c != configuration->channels()->end())
+      << ": Channel pointer not found in configuration()->channels()";
+
+  // The offset from the first channel is the index.
+  return std::distance(first, c);
+}
+
 std::string CleanedChannelToString(const Channel *channel) {
   FlatbufferDetachedBuffer<Channel> cleaned_channel = CopyFlatBuffer(channel);
   cleaned_channel.mutable_message()->clear_schema();
@@ -591,6 +603,7 @@
   }
   return nullptr;
 }
+
 const Node *GetMyNode(const Configuration *config) {
   const std::string hostname = (FLAGS_override_hostname.size() > 0)
                                    ? FLAGS_override_hostname
@@ -604,6 +617,17 @@
   return nullptr;
 }
 
+const Node *GetNode(const Configuration *config, const Node *node) {
+  if (!MultiNode(config)) {
+    CHECK(node == nullptr) << ": Provided a node in a single node world.";
+    return nullptr;
+  }
+  // Multi-node: resolve by name, so a Node from another config copy works.
+  CHECK(node != nullptr);
+  CHECK(node->has_name());
+  return GetNode(config, node->name()->string_view());
+}
+
 const Node *GetNode(const Configuration *config, std::string_view name) {
   CHECK(config->has_nodes())
       << ": Asking for a node from a single node configuration.";
@@ -616,6 +640,46 @@
   return nullptr;
 }
 
+const Node *GetNodeOrDie(const Configuration *config, const Node *node) {
+  if (!MultiNode(config)) {
+    CHECK(node == nullptr) << ": Provided a node in a single node world.";
+    return nullptr;
+  }
+  // Multi-node: find the equivalent node inside config, or die trying.
+  const Node *const config_node = GetNode(config, node);
+  if (config_node == nullptr) {
+    LOG(FATAL) << "Couldn't find node matching " << FlatbufferToJson(node);
+  }
+  return config_node;
+}
+
+int GetNodeIndex(const Configuration *config, const Node *node) {
+  // Matching is by pointer, so node must point into config's own node list.
+  CHECK(config->has_nodes())
+      << ": Asking for a node from a single node configuration.";
+  const auto first = config->nodes()->begin();
+  const auto last = config->nodes()->end();
+  const auto found = std::find(first, last, node);
+  if (found == last) {
+    LOG(FATAL) << "Node not found in the configuration.";
+  }
+  return static_cast<int>(std::distance(first, found));
+}
+
+std::vector<const Node *> GetNodes(const Configuration *config) {
+  // A single node world is represented by exactly one nullptr entry.
+  if (!configuration::MultiNode(config)) {
+    return std::vector<const Node *>(1, nullptr);
+  }
+  std::vector<const Node *> result;
+  for (const Node *node : *config->nodes()) {
+    result.push_back(node);
+  }
+  return result;
+}
+
+bool MultiNode(const Configuration *config) { return config->has_nodes(); }
+
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
   if (node == nullptr) {
     return true;
diff --git a/aos/configuration.h b/aos/configuration.h
index ef30fce..a9fbfe3 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -60,15 +60,32 @@
                     channel->type()->string_view(), application_name, node);
 }
 
+// Returns the channel index (or dies) of channel in the provided config.
+size_t ChannelIndex(const Configuration *config, const Channel *channel);
+
 // Returns the Node out of the config with the matching name, or nullptr if it
 // can't be found.
 const Node *GetNode(const Configuration *config, std::string_view name);
+const Node *GetNode(const Configuration *config, const Node *node);
+// Returns a matching node, or nullptr if the provided node is nullptr and we
+// are in a single node world.
+const Node *GetNodeOrDie(const Configuration *config, const Node *node);
 // Returns the Node out of the configuration which matches our hostname.
 // CHECKs if it can't be found.
 const Node *GetMyNode(const Configuration *config);
 const Node *GetNodeFromHostname(const Configuration *config,
                                 std::string_view name);
 
+// Returns a vector of the nodes in the config.  (nullptr is considered the node
+// in a single node world.)
+std::vector<const Node *> GetNodes(const Configuration *config);
+
+// Returns the node index for a node.  Note: node needs to exist inside config.
+int GetNodeIndex(const Configuration *config, const Node *node);
+
+// Returns true if we are running in a multinode configuration.
+bool MultiNode(const Configuration *config);
+
 // Returns true if the provided channel is sendable on the provided node.
 bool ChannelIsSendableOnNode(const Channel *channel, const Node *node);
 // Returns true if the provided channel is able to be watched or fetched on the
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index b8fc5b6..70515fd 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -39,6 +39,16 @@
       FlatbufferToJson(config, true));
 }
 
+// Tests that we can get back a ChannelIndex.
+TEST_F(ConfigurationTest, ChannelIndex) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1.json");
+
+  EXPECT_EQ(
+      ChannelIndex(&config.message(), config.message().channels()->Get(1u)),
+      1u);
+}
+
 // Tests that we can read and merge a multinode configuration.
 TEST_F(ConfigurationTest, ConfigMergeMultinode) {
   FlatbufferDetachedBuffer<Configuration> config =
@@ -599,6 +609,63 @@
       ::testing::ElementsAreArray({"pi1"}));
 }
 
+// Tests that we can pull out all the nodes.
+TEST_F(ConfigurationTest, GetNodes) {
+  {
+    FlatbufferDetachedBuffer<Configuration> config =
+        ReadConfig("aos/testdata/good_multinode.json");
+    const Node *pi1 = GetNode(&config.message(), "pi1");
+    const Node *pi2 = GetNode(&config.message(), "pi2");
+
+    EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(pi1, pi2));
+  }
+
+  {
+    FlatbufferDetachedBuffer<Configuration> config =
+        ReadConfig("aos/testdata/config1.json");
+    EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(nullptr));
+  }
+}
+
+// Tests that we can extract a node index from a config.
+TEST_F(ConfigurationTest, GetNodeIndex) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/good_multinode.json");
+  const Node *pi1 = GetNode(&config.message(), "pi1");
+  const Node *pi2 = GetNode(&config.message(), "pi2");
+
+  EXPECT_EQ(GetNodeIndex(&config.message(), pi1), 0);
+  EXPECT_EQ(GetNodeIndex(&config.message(), pi2), 1);
+}
+
+// Tests that GetNodeOrDie handles both single and multi-node worlds and returns
+// valid nodes.
+TEST_F(ConfigurationDeathTest, GetNodeOrDie) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/good_multinode.json");
+  FlatbufferDetachedBuffer<Configuration> config2 =
+      ReadConfig("aos/testdata/good_multinode.json");
+  {
+    // Simple case, nullptr -> nullptr
+    FlatbufferDetachedBuffer<Configuration> single_node_config =
+        ReadConfig("aos/testdata/config1.json");
+    EXPECT_EQ(nullptr, GetNodeOrDie(&single_node_config.message(), nullptr));
+
+    // Confirm that we die when a node is passed in.
+    EXPECT_DEATH(
+        {
+          GetNodeOrDie(&single_node_config.message(),
+                       config.message().nodes()->Get(0));
+        },
+        "Provided a node in a single node world.");
+  }
+
+  const Node *pi1 = GetNode(&config.message(), "pi1");
+  // Now try a lookup using a node from a different instance of the config.
+  EXPECT_EQ(pi1,
+            GetNodeOrDie(&config.message(), config2.message().nodes()->Get(0)));
+}
+
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a183cae..9cfdf10 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -41,6 +41,16 @@
     ],
 )
 
+cc_test(
+    name = "epoll_test",
+    srcs = ["epoll_test.cc"],
+    deps = [
+        ":epoll",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
 cc_library(
     name = "event_loop",
     srcs = [
@@ -58,6 +68,7 @@
         "//aos:configuration",
         "//aos:configuration_fbs",
         "//aos:flatbuffers",
+        "//aos/ipc_lib:data_alignment",
         "//aos/time",
         "//aos/util:phased_loop",
         "@com_github_google_flatbuffers//:flatbuffers",
@@ -214,6 +225,7 @@
         "//aos/ipc_lib:signalfd",
         "//aos/stl_mutex",
         "//aos/util:phased_loop",
+        "@com_google_absl//absl/base",
     ],
 )
 
@@ -243,8 +255,11 @@
 cc_test(
     name = "simulated_event_loop_test",
     srcs = ["simulated_event_loop_test.cc"],
+    data = ["multinode_pingpong_config.json"],
     deps = [
         ":event_loop_param_test",
+        ":ping_lib",
+        ":pong_lib",
         ":simulated_event_loop",
         "//aos/testing:googletest",
     ],
@@ -267,10 +282,12 @@
     srcs = [
         "event_scheduler.cc",
         "simulated_event_loop.cc",
+        "simulated_network_bridge.cc",
     ],
     hdrs = [
         "event_scheduler.h",
         "simulated_event_loop.h",
+        "simulated_network_bridge.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index e19cf59..b9e7edb 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -71,6 +71,7 @@
 }
 
 void EPoll::Run() {
+  run_ = true;
   while (true) {
     // Pull a single event out.  Infinite timeout if we are supposed to be
     // running, and 0 length timeout otherwise.  This lets us flush the event
@@ -90,34 +91,53 @@
         return;
       }
     }
-    EventData *event_data = static_cast<struct EventData *>(event.data.ptr);
-    if (event.events & (EPOLLIN | EPOLLPRI)) {
+    EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
+    if (event.events & kInEvents) {
+      CHECK(event_data->in_fn)
+          << ": No handler registered for input events on " << event_data->fd;
       event_data->in_fn();
     }
+    if (event.events & kOutEvents) {
+      CHECK(event_data->out_fn)
+          << ": No handler registered for output events on " << event_data->fd;
+      event_data->out_fn();
+    }
   }
 }
 
 void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
 
 void EPoll::OnReadable(int fd, ::std::function<void()> function) {
-  ::std::unique_ptr<EventData> event_data(
-      new EventData{fd, ::std::move(function)});
+  EventData *event_data = GetEventData(fd);
+  if (event_data == nullptr) {
+    fns_.emplace_back(std::make_unique<EventData>(fd));
+    event_data = fns_.back().get();
+  } else {
+    CHECK(!event_data->in_fn) << ": Duplicate in functions for " << fd;
+  }
+  event_data->in_fn = ::std::move(function);
+  DoEpollCtl(event_data, event_data->events | kInEvents);
+}
 
-  struct epoll_event event;
-  event.events = EPOLLIN | EPOLLPRI;
-  event.data.ptr = event_data.get();
-  PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == 0)
-      << ": Failed to add fd " << fd;
-  fns_.push_back(::std::move(event_data));
+void EPoll::OnWriteable(int fd, ::std::function<void()> function) {
+  EventData *event_data = GetEventData(fd);
+  if (event_data == nullptr) {
+    fns_.emplace_back(std::make_unique<EventData>(fd));
+    event_data = fns_.back().get();
+  } else {
+    CHECK(!event_data->out_fn) << ": Duplicate out functions for " << fd;
+  }
+  event_data->out_fn = ::std::move(function);
+  DoEpollCtl(event_data, event_data->events | kOutEvents);
 }
 
 // Removes fd from the event loop.
 void EPoll::DeleteFd(int fd) {
   auto element = fns_.begin();
-  PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) == 0);
   while (fns_.end() != element) {
     if (element->get()->fd == fd) {
       fns_.erase(element);
+      PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) == 0);
       return;
     }
     ++element;
@@ -125,5 +145,50 @@
   LOG(FATAL) << "fd " << fd << " not found";
 }
 
+void EPoll::EnableEvents(int fd, uint32_t events) {
+  EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
+  DoEpollCtl(event_data, event_data->events | events);
+}
+
+void EPoll::DisableEvents(int fd, uint32_t events) {
+  EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
+  DoEpollCtl(event_data, event_data->events & ~events);
+}
+
+EPoll::EventData *EPoll::GetEventData(int fd) {
+  // Linear scan of fns_; nullptr means there is no entry for this fd.
+  for (const std::unique_ptr<EventData> &candidate : fns_) {
+    if (candidate->fd == fd) {
+      return candidate.get();
+    }
+  }
+  return nullptr;
+}
+
+void EPoll::DoEpollCtl(EventData *event_data, const uint32_t new_events) {
+  // The previous mask tells us whether this fd is currently in the epoll set:
+  // this function only ever adds it with a non-zero mask and removes it at 0.
+  const uint32_t old_events = event_data->events;
+  const bool was_added = old_events != 0;
+  event_data->events = new_events;
+  if (new_events == 0) {
+    // NOTE(review): this drops the fd from the epoll set, but DeleteFd()
+    // still issues EPOLL_CTL_DEL unconditionally -- confirm that cannot
+    // fail (ENOENT) for an fd whose events were all disabled beforehand.
+    if (was_added) {
+      PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, event_data->fd, nullptr) == 0);
+    }
+    return;
+  }
+
+  // An fd not currently in the set needs an ADD; otherwise MOD updates it.
+  const int operation = was_added ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+  struct epoll_event event;
+  event.events = event_data->events;
+  event.data.ptr = event_data;
+  PCHECK(epoll_ctl(epoll_fd_, operation, event_data->fd, &event) == 0)
+      << ": Failed to " << operation << " epoll fd: " << event_data->fd;
+}
+
 }  // namespace internal
 }  // namespace aos
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index 7bc135c..51a20d0 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -63,25 +63,57 @@
   void Quit();
 
   // Registers a function to be called if the fd becomes readable.
-  // There should only be 1 function registered for each fd.
+  // Only one function may be registered for readability on each fd.
   void OnReadable(int fd, ::std::function<void()> function);
 
+  // Registers a function to be called if the fd becomes writeable.
+  // Only one function may be registered for writability on each fd.
+  void OnWriteable(int fd, ::std::function<void()> function);
+
   // Removes fd from the event loop.
   // All Fds must be cleaned up before this class is destroyed.
   void DeleteFd(int fd);
 
+  // Enables calling the existing function registered for fd when it becomes
+  // writeable.
+  void EnableWriteable(int fd) { EnableEvents(fd, kOutEvents); }
+
+  // Disables calling the existing function registered for fd when it becomes
+  // writeable.
+  void DisableWriteable(int fd) { DisableEvents(fd, kOutEvents); }
+
  private:
+  // Structure whose pointer should be returned by epoll.  Makes looking up the
+  // function fast and easy.
+  struct EventData {
+    explicit EventData(int fd_in) : fd(fd_in) {}
+    // Pointers to these are persistent identifiers, so no copying or moving.
+    EventData(const EventData &) = delete;
+    EventData &operator=(const EventData &) = delete;
+    const int fd;
+    // Event mask currently requested from epoll; 0 means nothing is requested.
+    uint32_t events = 0;
+    // Handlers for kInEvents and kOutEvents respectively; empty until set.
+    ::std::function<void()> in_fn, out_fn;
+  };
+
+  void EnableEvents(int fd, uint32_t events);
+  void DisableEvents(int fd, uint32_t events);
+
+  EventData *GetEventData(int fd);
+
+  void DoEpollCtl(EventData *event_data, uint32_t new_events);
+
+  // TODO(Brian): Figure out a nicer way to handle EPOLLPRI than lumping it in
+  // with input.
+  static constexpr uint32_t kInEvents = EPOLLIN | EPOLLPRI;
+  static constexpr uint32_t kOutEvents = EPOLLOUT;
+
   ::std::atomic<bool> run_{true};
 
   // Main epoll fd.
   int epoll_fd_;
 
-  // Structure whose pointer should be returned by epoll.  Makes looking up the
-  // function fast and easy.
-  struct EventData {
-    int fd;
-    ::std::function<void()> in_fn;
-  };
   ::std::vector<::std::unique_ptr<EventData>> fns_;
 
   // Pipe pair for handling quit.
diff --git a/aos/events/epoll_test.cc b/aos/events/epoll_test.cc
new file mode 100644
index 0000000..4e6bbbc
--- /dev/null
+++ b/aos/events/epoll_test.cc
@@ -0,0 +1,172 @@
+#include "aos/events/epoll.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "gtest/gtest.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace internal {
+namespace testing {
+
+// Owns both ends of a non-blocking pipe (closing them on destruction), with
+// helpers which CHECK that each read/write moves exactly the requested bytes.
+class Pipe {
+ public:
+  Pipe() { PCHECK(pipe2(fds_, O_NONBLOCK) == 0); }
+  ~Pipe() {
+    PCHECK(close(fds_[0]) == 0);
+    PCHECK(close(fds_[1]) == 0);
+  }
+
+  int read_fd() { return fds_[0]; }
+  int write_fd() { return fds_[1]; }
+
+  void Write(const std::string &data) {
+    CHECK_EQ(write(write_fd(), data.data(), data.size()),
+             static_cast<ssize_t>(data.size()));
+  }
+
+  std::string Read(size_t size) {
+    // Dies unless exactly size bytes are immediately available to read.
+    std::string result(size, '\0');
+    CHECK_EQ(read(read_fd(), result.data(), size), static_cast<ssize_t>(size));
+    return result;
+  }
+
+ private:
+  int fds_[2];
+};
+
+class EPollTest : public ::testing::Test {
+ public:
+  // Runs epoll_ until a timerfd armed for now + duration calls Quit().
+  void RunFor(std::chrono::nanoseconds duration) {
+    TimerFd timerfd;
+    bool did_quit = false;
+    epoll_.OnReadable(timerfd.fd(), [this, &timerfd, &did_quit]() {
+      CHECK(!did_quit);
+      epoll_.Quit();
+      did_quit = true;
+      timerfd.Read();
+    });
+    timerfd.SetTime(monotonic_clock::now() + duration,
+                    monotonic_clock::duration::zero());
+    epoll_.Run();
+    CHECK(did_quit);
+    epoll_.DeleteFd(timerfd.fd());
+  }
+
+  // Tests should not rely on the ordering of events closer together than this,
+  // and should not need to wait longer than this for events to be in order.
+  static constexpr std::chrono::nanoseconds tick_duration() {
+    return std::chrono::milliseconds(50);
+  }
+  EPoll epoll_;
+};
+
+// Test that the basics of OnReadable work.
+TEST_F(EPollTest, BasicReadable) {
+  Pipe pipe;
+  bool got_data = false;
+  epoll_.OnReadable(pipe.read_fd(), [&]() {
+    ASSERT_FALSE(got_data);
+    ASSERT_EQ("some", pipe.Read(4));
+    got_data = true;
+  });
+  RunFor(tick_duration());
+  EXPECT_FALSE(got_data);
+
+  pipe.Write("some");
+  RunFor(tick_duration());
+  EXPECT_TRUE(got_data);
+
+  epoll_.DeleteFd(pipe.read_fd());
+}
+
+// Test that the basics of OnWriteable work.
+TEST_F(EPollTest, BasicWriteable) {
+  Pipe pipe;
+  int number_writes = 0;
+  epoll_.OnWriteable(pipe.write_fd(), [&]() {
+    pipe.Write(" ");
+    ++number_writes;
+  });
+
+  // First, fill up the pipe's write buffer.
+  RunFor(tick_duration());
+  EXPECT_GT(number_writes, 0);
+
+  // Now, if we try again, we shouldn't do anything.
+  const int bytes_in_pipe = number_writes;
+  number_writes = 0;
+  RunFor(tick_duration());
+  EXPECT_EQ(number_writes, 0);
+
+  // Empty the pipe, then fill it up again.
+  for (int i = 0; i < bytes_in_pipe; ++i) {
+    ASSERT_EQ(" ", pipe.Read(1));
+  }
+  number_writes = 0;
+  RunFor(tick_duration());
+  EXPECT_EQ(number_writes, bytes_in_pipe);
+
+  epoll_.DeleteFd(pipe.write_fd());
+}
+
+TEST(EPollDeathTest, InvalidFd) {
+  EPoll epoll;
+  Pipe pipe;
+  epoll.OnReadable(pipe.read_fd(), []() {});
+  EXPECT_DEATH(epoll.OnReadable(pipe.read_fd(), []() {}),
+               "Duplicate in functions");
+  epoll.OnWriteable(pipe.read_fd(), []() {});
+  EXPECT_DEATH(epoll.OnWriteable(pipe.read_fd(), []() {}),
+               "Duplicate out functions");
+
+  epoll.DeleteFd(pipe.read_fd());
+  EXPECT_DEATH(epoll.DeleteFd(pipe.read_fd()), "fd [0-9]+ not found");
+  EXPECT_DEATH(epoll.DeleteFd(pipe.write_fd()), "fd [0-9]+ not found");
+}
+
+// Tests that enabling/disabling a writeable FD works.
+TEST_F(EPollTest, WriteableEnableDisable) {
+  Pipe pipe;
+  int number_writes = 0;
+  epoll_.OnWriteable(pipe.write_fd(), [&]() {
+    pipe.Write(" ");
+    ++number_writes;
+  });
+
+  // First, fill up the pipe's write buffer.
+  RunFor(tick_duration());
+  EXPECT_GT(number_writes, 0);
+
+  // Empty the pipe.
+  const int bytes_in_pipe = number_writes;
+  for (int i = 0; i < bytes_in_pipe; ++i) {
+    ASSERT_EQ(" ", pipe.Read(1));
+  }
+
+  // If we disable writeable checking, then nothing should happen.
+  epoll_.DisableWriteable(pipe.write_fd());
+  number_writes = 0;
+  RunFor(tick_duration());
+  EXPECT_EQ(number_writes, 0);
+
+  // Disabling it again should be a NOP.
+  epoll_.DisableWriteable(pipe.write_fd());
+
+  // And then when we re-enable, it should fill the pipe up again.
+  epoll_.EnableWriteable(pipe.write_fd());
+  number_writes = 0;
+  RunFor(tick_duration());
+  EXPECT_EQ(number_writes, bytes_in_pipe);
+
+  epoll_.DeleteFd(pipe.write_fd());
+}
+
+}  // namespace testing
+}  // namespace internal
+}  // namespace aos
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e368df2..76b83c9 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -71,14 +71,17 @@
 }
 
 int EventLoop::ChannelIndex(const Channel *channel) {
-  CHECK(configuration_->channels() != nullptr) << ": No channels";
+  return configuration::ChannelIndex(configuration_, channel);
+}
 
-  auto c = std::find(configuration_->channels()->begin(),
-                     configuration_->channels()->end(), channel);
-  CHECK(c != configuration_->channels()->end())
-      << ": Channel pointer not found in configuration()->channels()";
-
-  return std::distance(configuration()->channels()->begin(), c);
+WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
+  const int channel_index = ChannelIndex(channel);
+  for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
+    if (watcher->channel_index() == channel_index) {
+      return watcher.get();
+    }
+  }
+  LOG(FATAL) << "No watcher found for channel";
 }
 
 void EventLoop::NewSender(RawSender *sender) {
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 4a12096..3fe46ec 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -1,4 +1,5 @@
 #ifndef AOS_EVENTS_EVENT_LOOP_H_
+
 #define AOS_EVENTS_EVENT_LOOP_H_
 
 #include <atomic>
@@ -11,6 +12,7 @@
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
 #include "aos/flatbuffers.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/time/time.h"
 #include "aos/util/phased_loop.h"
@@ -141,6 +143,13 @@
   // Returns the queue index that this was sent with.
   uint32_t sent_queue_index() const { return sent_queue_index_; }
 
+  // Rebuilds the flatbuffers-style allocator over data()/size() on every call
+  // and returns it.  It must be deallocated before the message is sent.
+  PreallocatedAllocator *fbb_allocator() {
+    fbb_allocator_ = PreallocatedAllocator(data(), size());
+    return &fbb_allocator_;
+  }
+
  protected:
   EventLoop *event_loop() { return event_loop_; }
 
@@ -166,6 +175,8 @@
   const Channel *channel_;
 
   internal::RawSenderTiming timing_;
+
+  PreallocatedAllocator fbb_allocator_{nullptr, 0};
 };
 
 // Fetches the newest message from a channel.
@@ -177,12 +188,26 @@
 
   // Fetches the next message. Returns true if it fetched a new message.  This
   // method will only return messages sent after the Fetcher was created.
-  bool FetchNext() { return fetcher_->FetchNext(); }
+  bool FetchNext() {
+    if (!fetcher_->FetchNext()) {
+      return false;  // Nothing new was fetched, so there is nothing to check.
+    }
+    CheckChannelDataAlignment(fetcher_->context().data,
+                              fetcher_->context().size);
+    return true;
+  }
 
   // Fetches the most recent message. Returns true if it fetched a new message.
   // This will return the latest message regardless of if it was sent before or
   // after the fetcher was created.
-  bool Fetch() { return fetcher_->Fetch(); }
+  bool Fetch() {
+    if (!fetcher_->Fetch()) {
+      return false;  // Nothing new was fetched, so there is nothing to check.
+    }
+    CheckChannelDataAlignment(fetcher_->context().data,
+                              fetcher_->context().size);
+    return true;
+  }
 
   // Returns a pointer to the contained flatbuffer, or nullptr if there is no
   // available message.
@@ -222,10 +247,17 @@
   // builder.Send(t_builder.Finish());
   class Builder {
    public:
-    Builder(RawSender *sender, void *data, size_t size)
-        : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+    Builder(RawSender *sender, PreallocatedAllocator *allocator)
+        : fbb_(allocator->size(), allocator), sender_(sender) {
+      CheckChannelDataAlignment(allocator->data(), allocator->size());
       fbb_.ForceDefaults(1);
     }
+    // A default-constructed Builder has no sender until it is move-assigned.
+    Builder() : sender_(nullptr) {}
+    Builder(const Builder &) = delete;
+    Builder(Builder &&) = default;
+    Builder &operator=(const Builder &) = delete;
+    Builder &operator=(Builder &&) = default;
 
     flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
 
@@ -243,7 +275,6 @@
     void CheckSent() { fbb_.Finished(); }
 
    private:
-    PreallocatedAllocator alloc_;
     flatbuffers::FlatBufferBuilder fbb_;
     RawSender *sender_;
   };
@@ -482,6 +513,16 @@
   // Validates that channel exists inside configuration_ and finds its index.
   int ChannelIndex(const Channel *channel);
 
+  // Returns the state for the watcher on the corresponding channel. This
+  // watcher must exist before calling this.
+  WatcherState *GetWatcherState(const Channel *channel);
+
+  // Returns a Sender's protected RawSender
+  template <typename T>
+  static RawSender *GetRawSender(aos::Sender<T> *sender) {
+    return sender->sender_.get();
+  }
+
   // Context available for watchers, timers, and phased loops.
   Context context_;
 
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index ea5fd3d..89e80ae 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -55,7 +55,7 @@
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
   void EnableNodes(std::string_view my_node) {
-    std::string json = std::string(R"config({
+    std::string json = R"config({
   "channels": [
     {
       "name": "/aos",
@@ -80,9 +80,12 @@
   ],
   "nodes": [
     {
-      "name": ")config") +
-                       std::string(my_node) + R"config(",
+      "name": "me",
       "hostname": "myhostname"
+    },
+    {
+      "name": "them",
+      "hostname": "themhostname"
     }
   ]
 })config";
@@ -90,17 +93,17 @@
     flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
         JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
 
-    my_node_ = my_node;
+    my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
 
-  std::string_view my_node() const { return my_node_; }
+  const Node *my_node() const { return my_node_; }
 
   const Configuration *configuration() { return &flatbuffer_.message(); }
 
  private:
   FlatbufferDetachedBuffer<Configuration> flatbuffer_;
 
-  std::string my_node_;
+  const Node *my_node_ = nullptr;
 };
 
 class AbstractEventLoopTestBase
@@ -134,7 +137,7 @@
 
   const Configuration *configuration() { return factory_->configuration(); }
 
-  std::string_view my_node() const { return factory_->my_node(); }
+  const Node *my_node() const { return factory_->my_node(); }
 
   // Ends the given event loop at the given time from now.
   void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index c2c9884..dfb5a6d 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -23,7 +23,7 @@
 
 template <typename T>
 typename Sender<T>::Builder Sender<T>::MakeBuilder() {
-  return Builder(sender_.get(), sender_->data(), sender_->size());
+  return Builder(sender_.get(), sender_->fbb_allocator());
 }
 
 template <typename Watch>
@@ -193,6 +193,7 @@
   // context.
   void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
                       Context context) {
+    CheckChannelDataAlignment(context.data, context.size);
     const monotonic_clock::time_point monotonic_start_time = get_time();
     {
       const float start_latency =
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index b5a530e..57f20ae 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -8,7 +8,7 @@
 namespace aos {
 
 EventScheduler::Token EventScheduler::Schedule(
-    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+    distributed_clock::time_point time, ::std::function<void()> callback) {
   return events_list_.emplace(time, callback);
 }
 
@@ -16,9 +16,9 @@
   events_list_.erase(token);
 }
 
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
-  const ::aos::monotonic_clock::time_point end_time =
-      monotonic_now() + duration;
+void EventScheduler::RunFor(distributed_clock::duration duration) {
+  const distributed_clock::time_point end_time =
+      distributed_now() + duration;
   is_running_ = true;
   for (std::function<void()> &on_run : on_run_) {
     on_run();
@@ -26,7 +26,7 @@
   on_run_.clear();
   while (!events_list_.empty() && is_running_) {
     auto iter = events_list_.begin();
-    ::aos::monotonic_clock::time_point next_time = iter->first;
+    distributed_clock::time_point next_time = iter->first;
     if (next_time > end_time) {
       break;
     }
@@ -53,4 +53,11 @@
   }
 }
 
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now) {
+  // Reuse the monotonic clock's formatting by reinterpreting the same offset.
+  const monotonic_clock::time_point as_monotonic(now.time_since_epoch());
+  return stream << as_monotonic;
+}
+
 }  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 432f4ad..400a307 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -14,15 +14,43 @@
 
 namespace aos {
 
+// The reference clock that distributed time is built on; it is what keeps
+// multiple nodes in sync.  It is deliberately its own type so that nothing
+// converts implicitly to or from the monotonic and realtime clocks.
+class distributed_clock {
+ public:
+  using duration = ::std::chrono::nanoseconds;
+  using rep = duration::rep;
+  using period = duration::period;
+  using time_point = ::std::chrono::time_point<distributed_clock>;
+
+  // Everything in the simulation is synchronized against this clock, so it is
+  // the base clock and it never jumps.
+  static constexpr bool is_steady = true;
+
+  static constexpr duration zero() { return duration(0); }
+
+  // Returns the epoch (0).
+  static constexpr time_point epoch() { return time_point(zero()); }
+
+  static constexpr time_point min_time{
+      time_point(duration(::std::numeric_limits<rep>::min()))};
+  static constexpr time_point max_time{
+      time_point(duration(::std::numeric_limits<rep>::max()))};
+};
+
+std::ostream &operator<<(std::ostream &stream,
+                         const aos::distributed_clock::time_point &now);
+
 class EventScheduler {
  public:
   using ChannelType =
-      std::multimap<monotonic_clock::time_point, std::function<void()>>;
+      std::multimap<distributed_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
 
   // Schedule an event with a callback function
   // Returns an iterator to the event
-  Token Schedule(monotonic_clock::time_point time,
+  Token Schedule(distributed_clock::time_point time,
                  std::function<void()> callback);
 
   // Schedules a callback when the event scheduler starts.
@@ -38,30 +66,17 @@
   // Runs until exited.
   void Run();
   // Runs for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   void Exit() { is_running_ = false; }
 
   bool is_running() const { return is_running_; }
 
-  monotonic_clock::time_point monotonic_now() const { return now_; }
-
-  realtime_clock::time_point realtime_now() const {
-    return realtime_clock::time_point(monotonic_now().time_since_epoch() +
-                                      realtime_offset_);
-  }
-
-  // Sets realtime clock to realtime_now for a given monotonic clock.
-  void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
-                         realtime_clock::time_point realtime_now) {
-    realtime_offset_ =
-        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
-  }
+  distributed_clock::time_point distributed_now() const { return now_; }
 
  private:
   // Current execution time.
-  monotonic_clock::time_point now_ = monotonic_clock::epoch();
-  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+  distributed_clock::time_point now_ = distributed_clock::epoch();
 
   std::vector<std::function<void()>> on_run_;
 
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b2be972..efbbcc6 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -68,6 +68,7 @@
         "@com_github_google_glog//:glog",
     ],
 )
+
 cc_binary(
     name = "logger_main",
     srcs = [
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
   reader.Register();
 
   std::unique_ptr<aos::EventLoop> printer_event_loop =
-      reader.event_loop_factory()->MakeEventLoop("printer");
+      reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
   printer_event_loop->SkipTimingReport();
 
   bool found_channel = false;
@@ -66,7 +66,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             } else {
               std::cout << context.realtime_event_time << " ("
                         << context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
                         << aos::FlatbufferToJson(
                                channel->schema(),
                                static_cast<const uint8_t *>(message))
-                        << '\n';
+                        << std::endl;
             }
           });
       found_channel = true;
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2d9604f..c766fe6 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -65,7 +65,7 @@
 
   // Make an eventloop for retrieving stats
   std::unique_ptr<aos::EventLoop> stats_event_loop =
-      log_reader_factory.MakeEventLoop("logstats");
+      log_reader_factory.MakeEventLoop("logstats", reader.node());
   stats_event_loop->SkipTimingReport();
 
   // Read channel info and store in vector
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
 
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
-      std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+      std::make_unique<SimulatedEventLoopFactory>(configuration());
   Register(event_loop_factory_unique_ptr_.get());
 }
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+  node_event_loop_factory_ =
+      event_loop_factory_->GetNodeEventLoopFactory(node());
+  event_loop_unique_ptr_ =
+      event_loop_factory->MakeEventLoop("log_reader", node());
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (event_loop_factory_ != nullptr) {
-          event_loop_factory_->SetRealtimeOffset(
+        if (node_event_loop_factory_ != nullptr) {
+          node_event_loop_factory_->SetRealtimeOffset(
               monotonic_clock::time_point(chrono::nanoseconds(
                   channel_data.message().monotonic_sent_time())),
               realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
   event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
+  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
 }
 
 void LogReader::MakeRemappedConfig() {
+  CHECK(!event_loop_)
+      << ": Can't change the mapping after the events are scheduled.";
+
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
   std::vector<std::unique_ptr<RawSender>> channels_;
 
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+  NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
-        event_loop_factory_(&config_.message(), "pi1"),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop(
+            "ping", configuration::GetNode(event_loop_factory_.configuration(),
+                                           "pi1"))),
         ping_(ping_event_loop_.get()) {}
 
   // Config and factory.
@@ -233,8 +235,10 @@
   LOG(INFO) << "Logging data to " << logfile;
 
   {
+    const Node *pi1 =
+        configuration::GetNode(event_loop_factory_.configuration(), "pi1");
     std::unique_ptr<EventLoop> pong_event_loop =
-        event_loop_factory_.MakeEventLoop("pong");
+        event_loop_factory_.MakeEventLoop("pong", pi1);
 
     std::unique_ptr<aos::RawSender> pong_sender(
         pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
 
     DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger");
+        event_loop_factory_.MakeEventLoop("logger", pi1);
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
@@ -276,20 +280,23 @@
   // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
   // messages.  This won't work today yet until the log reading code gets
   // significantly better.
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
   reader.Register(&log_reader_factory);
 
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
   ASSERT_NE(reader.node(), nullptr);
   EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
 
   std::unique_ptr<EventLoop> test_event_loop =
-      log_reader_factory.MakeEventLoop("test");
+      log_reader_factory.MakeEventLoop("test", pi1);
 
   int ping_count = 10;
   int pong_count = 10;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index cc11520..b4578cb 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -18,6 +18,7 @@
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
 #include "aos/stl_mutex/stl_mutex.h"
+#include "aos/util/file.h"
 #include "aos/util/phased_loop.h"
 #include "glog/logging.h"
 
@@ -72,7 +73,7 @@
 
     size_ = ipc_lib::LocklessQueueMemorySize(config_);
 
-    MkdirP(path);
+    util::MkdirP(path, FLAGS_permissions);
 
     // There are 2 cases.  Either the file already exists, or it does not
     // already exist and we need to create it.  Start by trying to create it. If
@@ -123,24 +124,11 @@
 
   const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
 
- private:
-  void MkdirP(std::string_view path) {
-    auto last_slash_pos = path.find_last_of("/");
-
-    std::string folder(last_slash_pos == std::string_view::npos
-                           ? std::string_view("")
-                           : path.substr(0, last_slash_pos));
-    if (folder.empty()) return;
-    MkdirP(folder);
-    VLOG(1) << "Creating " << folder;
-    const int result = mkdir(folder.c_str(), FLAGS_permissions);
-    if (result == -1 && errno == EEXIST) {
-      VLOG(1) << "Already exists";
-      return;
-    }
-    PCHECK(result == 0) << ": Error creating " << folder;
+  absl::Span<char> GetSharedMemory() const {
+    return absl::Span<char>(static_cast<char *>(data_), size_);
   }
 
+ private:
   ipc_lib::LocklessQueueConfiguration config_;
 
   int fd_;
@@ -184,8 +172,8 @@
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
                         lockless_queue_memory_.config()),
-        data_storage_(static_cast<AlignedChar *>(aligned_alloc(
-                          alignof(AlignedChar), channel->max_size())),
+        data_storage_(static_cast<char *>(malloc(channel->max_size() +
+                                                 kChannelDataAlignment - 1)),
                       &free) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
@@ -217,7 +205,7 @@
         actual_queue_index_.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+        &context_.size, data_storage_start());
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
       context_.queue_index = actual_queue_index_.index();
       if (context_.remote_queue_index == 0xffffffffu) {
@@ -229,7 +217,7 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+      context_.data = data_storage_start() +
                       lockless_queue_.message_data_size() - context_.size;
       actual_queue_index_ = actual_queue_index_.Increment();
     }
@@ -267,7 +255,7 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+        &context_.size, data_storage_start());
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
       context_.queue_index = queue_index.index();
       if (context_.remote_queue_index == 0xffffffffu) {
@@ -279,7 +267,7 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+      context_.data = data_storage_start() +
                       lockless_queue_.message_data_size() - context_.size;
       actual_queue_index_ = queue_index.Increment();
     }
@@ -314,7 +302,15 @@
 
   void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
 
+  absl::Span<char> GetSharedMemory() const {
+    return lockless_queue_memory_.GetSharedMemory();
+  }
+
  private:
+  char *data_storage_start() {
+    return RoundChannelData(data_storage_.get(), channel_->max_size());
+  }
+
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueue lockless_queue_;
@@ -322,14 +318,7 @@
   ipc_lib::QueueIndex actual_queue_index_ =
       ipc_lib::LocklessQueue::empty_queue_index();
 
-  struct AlignedChar {
-    // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
-    // cache lines.
-    // V4L2 requires 64 byte alignment for USERPTR.
-    alignas(64) char data;
-  };
-
-  std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+  std::unique_ptr<char, decltype(&free)> data_storage_;
 
   Context context_;
 };
@@ -402,6 +391,10 @@
     return true;
   }
 
+  absl::Span<char> GetSharedMemory() const {
+    return lockless_queue_memory_.GetSharedMemory();
+  }
+
  private:
   MMapedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueue lockless_queue_;
@@ -456,6 +449,10 @@
 
   void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
 
+  absl::Span<char> GetSharedMemory() const {
+    return simple_shm_fetcher_.GetSharedMemory();
+  }
+
  private:
   bool has_new_data_ = false;
 
@@ -708,7 +705,8 @@
     ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
     std::unique_lock<stl_mutex> locker(mutex_);
 
-    event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
+    event_loops_.erase(
+        std::find(event_loops_.begin(), event_loops_.end(), event_loop));
 
     if (event_loops_.size() == 0u) {
       // The last caller restores the original signal handlers.
@@ -836,6 +834,17 @@
   UpdateTimingReport();
 }
 
+absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
+  // Downcast to the concrete watcher type, which exposes the mapping.
+  return static_cast<internal::WatcherState *>(GetWatcherState(channel))
+      ->GetSharedMemory();
+}
+
+absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
+    const aos::RawSender *sender) const {
+  return static_cast<const internal::ShmSender *>(sender)->GetSharedMemory();
+}
+
 pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
 
 }  // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 52bb338..fa870b8 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -3,6 +3,8 @@
 
 #include <vector>
 
+#include "absl/types/span.h"
+
 #include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/event_loop_generated.h"
@@ -68,8 +70,20 @@
 
   int priority() const override { return priority_; }
 
+  // Returns the epoll loop used to run the event loop.
   internal::EPoll *epoll() { return &epoll_; }
 
+  // Returns the local mapping of the shared memory used by the watcher on the
+  // specified channel. A watcher must be created on this channel before calling
+  // this.
+  absl::Span<char> GetWatcherSharedMemory(const Channel *channel);
+
+  // Returns the local mapping of the shared memory used by the provided Sender.
+  template <typename T>
+  absl::Span<char> GetSenderSharedMemory(aos::Sender<T> *sender) const {
+    return GetShmSenderSharedMemory(GetRawSender(sender));
+  }
+
  private:
   friend class internal::WatcherState;
   friend class internal::TimerHandlerState;
@@ -82,6 +96,9 @@
   // Returns the TID of the event loop.
   pid_t GetTid() override;
 
+  // Returns the shared memory mapping of |sender|, which must be a ShmSender.
+  absl::Span<char> GetShmSenderSharedMemory(const aos::RawSender *sender) const;
+
   std::vector<std::function<void()>> on_run_;
   int priority_ = 0;
   std::string name_;
@@ -90,7 +107,6 @@
   internal::EPoll epoll_;
 };
 
-
 }  // namespace aos
 
 #endif  // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d33f496..984f978 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -38,7 +38,8 @@
 
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
     loop->set_name(name);
@@ -47,7 +48,8 @@
 
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     if (configuration()->has_nodes()) {
-      FLAGS_override_hostname = "myhostname";
+      FLAGS_override_hostname =
+          std::string(my_node()->hostname()->string_view());
     }
     ::std::unique_ptr<ShmEventLoop> loop =
         ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
@@ -181,6 +183,34 @@
   EXPECT_EQ(times.size(), 2u);
 }
 
+// Tests GetWatcherSharedMemory both before and after a watcher exists.
+TEST(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
+  ShmEventLoopTestFactory factory;
+  auto generic_loop1 = factory.MakePrimary("primary");
+  ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
+  const auto channel = configuration::GetChannel(
+      loop1->configuration(), "/test", TestMessage::GetFullyQualifiedName(),
+      loop1->name(), loop1->node());
+
+  // First verify it dies when the channel has no watcher yet.
+  EXPECT_DEATH(loop1->GetWatcherSharedMemory(channel),
+               "No watcher found for channel");
+
+  // Then create a watcher and verify the returned span is non-empty.
+  loop1->MakeWatcher("/test", [](const TestMessage &) {});
+  EXPECT_FALSE(loop1->GetWatcherSharedMemory(channel).empty());
+}
+
+TEST(ShmEventLoopTest, GetSenderSharedMemory) {
+  ShmEventLoopTestFactory factory;
+  auto generic_loop1 = factory.MakePrimary("primary");
+  ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
+
+  // Check that GetSenderSharedMemory returns a non-empty memory span.
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+  EXPECT_FALSE(loop1->GetSenderSharedMemory(&sender).empty());
+}
+
 // TODO(austin): Test that missing a deadline with a timer recovers as expected.
 
 }  // namespace testing
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..c857bb6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "absl/container/btree_map.h"
+#include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
 
@@ -13,19 +14,17 @@
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
 struct SimulatedMessage {
-  // Struct to let us force data to be well aligned.
-  struct OveralignedChar {
-    char data alignas(64);
-  };
-
   // Context for the data.
   Context context;
 
   // The data.
-  char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+  char *data(size_t buffer_size) {
+    return RoundChannelData(&actual_data[0], buffer_size);
+  }
 
-  // Then the data.
-  OveralignedChar actual_data[];
+  // Then the data, including padding on the end so we can align the buffer we
+  // actually return from data().
+  char actual_data[];
 };
 
 class SimulatedEventLoop;
@@ -36,6 +35,7 @@
  public:
   SimulatedWatcher(
       SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn);
 
@@ -59,6 +59,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedWatcher> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -102,10 +103,6 @@
 
   const Channel *channel() const { return channel_; }
 
-  ::aos::monotonic_clock::time_point monotonic_now() const {
-    return scheduler_->monotonic_now();
-  }
-
  private:
   const Channel *channel_;
 
@@ -126,9 +123,9 @@
 // This is a shared_ptr so we don't have to implement refcounting or copying.
 std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
   SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
-      malloc(sizeof(SimulatedMessage) + size));
+      malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
   message->context.size = size;
-  message->context.data = message->data();
+  message->context.data = message->data(size);
 
   return std::shared_ptr<SimulatedMessage>(message, free);
 }
@@ -145,7 +142,7 @@
     if (!message_) {
       message_ = MakeSimulatedMessage(simulated_channel_->max_size());
     }
-    return message_->data();
+    return message_->data(simulated_channel_->max_size());
   }
 
   size_t size() override { return simulated_channel_->max_size(); }
@@ -186,12 +183,14 @@
     message_ = MakeSimulatedMessage(simulated_channel_->max_size());
 
     // Now fill in the message.  size is already populated above, and
-    // queue_index will be populated in queue_.  Put this at the back of the
-    // data segment.
-    memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+    // queue_index will be populated in simulated_channel_.  Put this at the
+    // back of the data segment.
+    memcpy(message_->data(simulated_channel_->max_size()) +
+               simulated_channel_->max_size() - size,
+           msg, size);
 
-    return Send(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index);
+    return DoSend(size, monotonic_remote_time, realtime_remote_time,
+                  remote_queue_index);
   }
 
  private:
@@ -204,9 +203,11 @@
 
 class SimulatedFetcher : public RawFetcher {
  public:
-  explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
-      : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
-  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+  explicit SimulatedFetcher(EventLoop *event_loop,
+                            SimulatedChannel *simulated_channel)
+      : RawFetcher(event_loop, simulated_channel->channel()),
+        simulated_channel_(simulated_channel) {}
+  ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
     if (msgs_.size() == 0) {
@@ -222,8 +223,8 @@
     if (msgs_.size() == 0) {
       // TODO(austin): Can we just do this logic unconditionally?  It is a lot
       // simpler.  And call clear, obviously.
-      if (!msg_ && queue_->latest_message()) {
-        SetMsg(queue_->latest_message());
+      if (!msg_ && simulated_channel_->latest_message()) {
+        SetMsg(simulated_channel_->latest_message());
         return std::make_pair(true, event_loop()->monotonic_now());
       } else {
         return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +261,7 @@
     msgs_.emplace_back(buffer);
   }
 
-  SimulatedChannel *queue_;
+  SimulatedChannel *simulated_channel_;
   std::shared_ptr<SimulatedMessage> msg_;
 
   // Messages queued up but not in use.
@@ -270,6 +271,7 @@
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 NodeEventLoopFactory *node_event_loop_factory,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
   ~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +287,7 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 
   monotonic_clock::time_point base_;
@@ -294,6 +297,7 @@
 class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
  public:
   SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             NodeEventLoopFactory *node_event_loop_factory,
                              SimulatedEventLoop *simulated_event_loop,
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
@@ -309,6 +313,7 @@
   EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 };
 
@@ -316,6 +321,7 @@
  public:
   explicit SimulatedEventLoop(
       EventScheduler *scheduler,
+      NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -324,6 +330,7 @@
       const Node *node, pid_t tid)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
+        node_event_loop_factory_(node_event_loop_factory),
         channels_(channels),
         raw_event_loops_(raw_event_loops),
         node_(node),
@@ -361,11 +368,11 @@
   }
 
   ::aos::monotonic_clock::time_point monotonic_now() override {
-    return scheduler_->monotonic_now();
+    return node_event_loop_factory_->monotonic_now();
   }
 
   ::aos::realtime_clock::time_point realtime_now() override {
-    return scheduler_->realtime_now();
+    return node_event_loop_factory_->realtime_now();
   }
 
   ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +386,17 @@
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
     CHECK(!is_running());
-    return NewTimer(::std::unique_ptr<TimerHandler>(
-        new SimulatedTimerHandler(scheduler_, this, callback)));
+    return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+        scheduler_, node_event_loop_factory_, this, callback)));
   }
 
   PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
                                    const monotonic_clock::duration interval,
                                    const monotonic_clock::duration offset =
                                        ::std::chrono::seconds(0)) override {
-    return NewPhasedLoop(
-        ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
-            scheduler_, this, callback, interval, offset)));
+    return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+        new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+                                       this, callback, interval, offset)));
   }
 
   void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +440,7 @@
   pid_t GetTid() override { return tid_; }
 
   EventScheduler *scheduler_;
+  NodeEventLoopFactory *node_event_loop_factory_;
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *raw_event_loops_;
@@ -459,17 +467,13 @@
   }
 }
 
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
-  return send_delay_;
-}
-
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
   TakeWatcher(channel);
 
-  std::unique_ptr<SimulatedWatcher> shm_watcher(
-      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+      this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -511,12 +515,13 @@
 
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-    const Channel *channel,
+    NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +579,10 @@
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ =
-      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
-                           [this]() { simulated_event_loop_->HandleEvent(); });
+  token_ = scheduler_->Schedule(
+      node_event_loop_factory_->ToDistributedClock(
+          event_time + simulated_event_loop_->send_delay()),
+      [this]() { simulated_event_loop_->HandleEvent(); });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -600,8 +606,8 @@
 uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
   const uint32_t queue_index = next_queue_index_.index();
   message->context.queue_index = queue_index;
-  message->context.data =
-      message->data() + channel()->max_size() - message->context.size;
+  message->context.data = message->data(channel()->max_size()) +
+                          channel()->max_size() - message->context.size;
   next_queue_index_ = next_queue_index_.Increment();
 
   latest_message_ = message;
@@ -622,12 +628,13 @@
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void()> fn)
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +646,12 @@
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
     token_ = scheduler_->Schedule(
-        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(monotonic_now),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
     token_ = scheduler_->Schedule(
-        base, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base),
+        [this]() { simulated_event_loop_->HandleEvent(); });
   }
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +664,8 @@
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(
-        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+        node_event_loop_factory_->ToDistributedClock(base_),
+        [this]() { simulated_event_loop_->HandleEvent(); });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
   } else {
@@ -674,13 +684,15 @@
 }
 
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
-    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
-    ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+    const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
+      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +714,80 @@
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
   token_ = scheduler_->Schedule(
-      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+      node_event_loop_factory_->ToDistributedClock(sleep_time),
+      [this]() { simulated_event_loop_->HandleEvent(); });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
 
+NodeEventLoopFactory::NodeEventLoopFactory(
+    EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+    const Node *node,
+    std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+        *raw_event_loops)
+    : scheduler_(scheduler),
+      factory_(factory),
+      node_(node),
+      raw_event_loops_(raw_event_loops) {}
+
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
-  CHECK(!configuration_->has_nodes())
-      << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, std::string_view node_name)
-    : SimulatedEventLoopFactory(
-          configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
-    const Configuration *configuration, const Node *node)
-    : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
-  if (node != nullptr) {
-    CHECK(configuration_->has_nodes())
-        << ": Got a configuration with no nodes and node \""
-        << node->name()->string_view() << "\" was selected.";
-    bool found = false;
-    for (const Node *node : *configuration_->nodes()) {
-      if (node == node_) {
-        found = true;
-        break;
-      }
+    : configuration_(CHECK_NOTNULL(configuration)) {
+  if (configuration::MultiNode(configuration_)) {
+    for (const Node *node : *configuration->nodes()) {
+      nodes_.emplace_back(node);
     }
-    CHECK(found) << ": node must be a pointer in the configuration.";
+  } else {
+    nodes_.emplace_back(nullptr);
+  }
+
+  for (const Node *node : nodes_) {
+    node_factories_.emplace_back(
+        new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
+  }
+
+  if (configuration::MultiNode(configuration)) {
+    bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
   }
 }
 
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+    const Node *node) {
+  auto result = std::find_if(
+      node_factories_.begin(), node_factories_.end(),
+      [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+        return node_factory->node() == node;
+      });
+
+  CHECK(result != node_factories_.end())
+      << ": Failed to find node " << FlatbufferToJson(node);
+
+  return result->get();
+}
+
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+    std::string_view name, const Node *node) {
+  if (node == nullptr) {
+    CHECK(!configuration::MultiNode(configuration()))
+        << ": Can't make a single node event loop in a multi-node world.";
+  } else {
+    CHECK(configuration::MultiNode(configuration()))
+        << ": Can't make a multi-node event loop in a single-node world.";
+  }
+  return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
     std::string_view name) {
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+      scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+      node_, tid));
   result->set_name(name);
-  result->set_send_delay(send_delay_);
+  result->set_send_delay(factory_->send_delay());
   return std::move(result);
 }
 
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..8cff0d7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,175 @@
 // Class for simulated fetchers.
 class SimulatedChannel;
 
+class NodeEventLoopFactory;
+namespace message_bridge {
+class SimulatedMessageBridge;
+}
+
+// There are 2 concepts needed to support multi-node simulations.
+//  1) The node.  This is implemented with NodeEventLoopFactory.
+//  2) The "robot" which runs multiple nodes.  This is implemented with
+//     SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory::MakeEventLoop takes an
+// optional Node argument if you want to make event loops without interacting
+// with the NodeEventLoopFactory object.
+//
+// The basic flow goes something like as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory->MakeEventLoop("ping");
+//
+// The distributed_clock is used as the base time.  NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
 class SimulatedEventLoopFactory {
  public:
   // Constructs a SimulatedEventLoopFactory with the provided configuration.
   // This configuration must remain in scope for the lifetime of the factory and
   // all sub-objects.
   SimulatedEventLoopFactory(const Configuration *configuration);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            std::string_view node_name);
-  SimulatedEventLoopFactory(const Configuration *configuration,
-                            const Node *node);
   ~SimulatedEventLoopFactory();
 
-  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+  // Creates an event loop.  If running in a multi-node environment, node needs
+  // to point to the node to create this event loop on.
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+                                             const Node *node = nullptr);
+
+  // Returns the NodeEventLoopFactory for the provided node.  The returned
+  // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+  // lifetime identical to the factory.
+  NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
 
   // Starts executing the event loops unconditionally.
   void Run();
   // Executes the event loops for a duration.
-  void RunFor(monotonic_clock::duration duration);
+  void RunFor(distributed_clock::duration duration);
 
   // Stops executing all event loops.  Meant to be called from within an event
   // loop handler.
   void Exit() { scheduler_.Exit(); }
 
-  // Sets the simulated send delay for the factory.
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  // Sets the simulated send delay for all messages sent within a single node.
   void set_send_delay(std::chrono::nanoseconds send_delay);
-  std::chrono::nanoseconds send_delay() const;
+  std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+  // Sets the simulated network delay for messages forwarded between nodes.
+  void set_network_delay(std::chrono::nanoseconds network_delay);
+  std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+  // Returns the clock used to synchronize the nodes.
+  distributed_clock::time_point distributed_now() const {
+    return scheduler_.distributed_now();
+  }
+
+  // Returns the configuration used for everything.
+  const Configuration *configuration() const { return configuration_; }
+
+ private:
+  const Configuration *const configuration_;
+  EventScheduler scheduler_;
+  // List of event loops to manage running and not running for.
+  // The function is a callback used to set and clear the running bool on each
+  // event loop.
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      raw_event_loops_;
+
+  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+  std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+  std::vector<const Node *> nodes_;
+
+  std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+  ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
   const Node *node() const { return node_; }
 
-  monotonic_clock::time_point monotonic_now() const {
-    return scheduler_.monotonic_now();
-  }
-  realtime_clock::time_point realtime_now() const {
-    return scheduler_.realtime_now();
-  }
-
   // Sets realtime clock to realtime_now for a given monotonic clock.
   void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
                          realtime_clock::time_point realtime_now) {
-    scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+    realtime_offset_ =
+        realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
   }
 
- private:
-  const Configuration *const configuration_;
-  EventScheduler scheduler_;
-  // Map from name, type to queue.
-  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
-  // List of event loops to manage running and not running for.
-  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
-      raw_event_loops_;
+  // Returns the current time on both clocks.
+  inline monotonic_clock::time_point monotonic_now() const;
+  inline realtime_clock::time_point realtime_now() const;
 
-  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+  // Returns the simulated network delay for messages forwarded between nodes.
+  std::chrono::nanoseconds network_delay() const {
+    return factory_->network_delay();
+  }
+  // Returns the simulated send delay for all messages sent within a single
+  // node.
+  std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+
+  // Converts a time to the distributed clock for scheduling and cross-node time
+  // measurement.
+  inline distributed_clock::time_point ToDistributedClock(
+      monotonic_clock::time_point time) const;
+
+ private:
+  friend class SimulatedEventLoopFactory;
+  NodeEventLoopFactory(
+      EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+      const Node *node,
+      std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+          *raw_event_loops);
+
+  EventScheduler *const scheduler_;
+  SimulatedEventLoopFactory *const factory_;
 
   const Node *const node_;
 
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      *const raw_event_loops_;
+
+  std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+  std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+  // Map from name, type to queue.
+  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+  // pid so we get unique timing reports.
   pid_t tid_ = 0;
 };
 
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+  return monotonic_clock::time_point(
+      factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+  return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+                                    realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+    monotonic_clock::time_point time) const {
+  return distributed_clock::time_point(time.time_since_epoch() -
+                                       monotonic_offset_);
+}
+
 }  // namespace aos
 
 #endif  // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 4328d6f..de5cbae 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,8 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
 #include "gtest/gtest.h"
 
@@ -15,11 +17,11 @@
  public:
   ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
     MaybeMake();
-    return event_loop_factory_->MakeEventLoop(name);
+    return event_loop_factory_->MakeEventLoop(name, my_node());
   }
 
   void Run() override { event_loop_factory_->Run(); }
@@ -38,8 +40,8 @@
   void MaybeMake() {
     if (!event_loop_factory_) {
       if (configuration()->has_nodes()) {
-        event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
-            configuration(), my_node());
+        event_loop_factory_ =
+            std::make_unique<SimulatedEventLoopFactory>(configuration());
       } else {
         event_loop_factory_ =
             std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -64,12 +66,13 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  scheduler.Schedule(::aos::monotonic_clock::now(),
-                      [&counter]() { counter += 1; });
+  scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                     [&counter]() { counter += 1; });
   scheduler.Run();
   EXPECT_EQ(counter, 1);
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 1);
@@ -80,8 +83,9 @@
   int counter = 0;
   EventScheduler scheduler;
 
-  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
-                                   [&counter]() { counter += 1; });
+  auto token =
+      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+                         [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
   scheduler.Run();
   EXPECT_EQ(counter, 0);
@@ -100,8 +104,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
 }
 
@@ -125,8 +127,6 @@
   simulated_event_loop_factory.RunFor(chrono::seconds(1));
 
   EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
-            simulated_event_loop_factory.monotonic_now());
-  EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
             event_loop->monotonic_now());
   EXPECT_EQ(counter, 10);
 }
@@ -232,5 +232,45 @@
             0.0);
 }
 
+// Tests that ping and pong work when on 2 different nodes.
+TEST(SimulatedEventLoopTest, MultinodePingPong) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          "aos/events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  int pi2_pong_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+  int pi1_pong_count = 0;
+  pi1_pong_counter_event_loop->MakeWatcher(
+      "/test",
+      [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(10) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_count, 1001);
+  EXPECT_EQ(pi2_pong_count, 1001);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node.  We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+  RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+                    aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *send_event_loop,
+                    std::unique_ptr<aos::RawFetcher> fetcher,
+                    std::unique_ptr<aos::RawSender> sender)
+      : fetch_node_factory_(fetch_node_factory),
+        send_node_factory_(send_node_factory),
+        send_event_loop_(send_event_loop),
+        fetcher_(std::move(fetcher)),
+        sender_(std::move(sender)) {
+    timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+    Schedule();
+  }
+
+  // Kicks us to re-fetch and schedule the timer.
+  void Schedule() {
+    if (fetcher_->context().data == nullptr || sent_) {
+      sent_ = !fetcher_->FetchNext();
+    }
+
+    if (fetcher_->context().data == nullptr) {
+      return;
+    }
+
+    if (sent_) {
+      return;
+    }
+
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past...";
+
+    timer_->Setup(monotonic_delivered_time);
+  }
+
+ private:
+  // Actually sends the message, and reschedules.
+  void Send() {
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Message to be sent at the wrong time.";
+
+    // And also fill out the send times as well.
+    sender_->Send(fetcher_->context().data, fetcher_->context().size,
+                  fetcher_->context().monotonic_event_time,
+                  fetcher_->context().realtime_event_time,
+                  fetcher_->context().queue_index);
+
+    sent_ = true;
+    Schedule();
+  }
+
+  // Converts from time on the sending node to time on the receiving node.
+  monotonic_clock::time_point DeliveredTime(const Context &context) const {
+    const distributed_clock::time_point distributed_sent_time =
+        fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+
+    return aos::monotonic_clock::epoch() +
+           (distributed_sent_time - send_node_factory_->ToDistributedClock(
+                                        aos::monotonic_clock::epoch())) +
+           send_node_factory_->network_delay() +
+           send_node_factory_->send_delay();
+  }
+
+  // Factories used for time conversion.
+  aos::NodeEventLoopFactory *fetch_node_factory_;
+  aos::NodeEventLoopFactory *send_node_factory_;
+
+  // Event loop which sending is scheduled on.
+  aos::EventLoop *send_event_loop_;
+  // Timer used to send.
+  aos::TimerHandler *timer_;
+  // Fetcher used to receive messages.
+  std::unique_ptr<aos::RawFetcher> fetcher_;
+  // Sender to send them back out.
+  std::unique_ptr<aos::RawSender> sender_;
+  // True if we have sent the message in the fetcher.
+  bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+    SimulatedEventLoopFactory *simulated_event_loop_factory) {
+  CHECK(
+      configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+  // Pre-build up event loops for every node.  They are pretty cheap anyways.
+  for (const Node *node : simulated_event_loop_factory->nodes()) {
+    CHECK(event_loop_map_
+              .insert({node, simulated_event_loop_factory->MakeEventLoop(
+                                 "message_bridge", node)})
+              .second);
+  }
+
+  for (const Channel *channel :
+       *simulated_event_loop_factory->configuration()->channels()) {
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+
+    // Find the sending node.
+    const Node *node =
+        configuration::GetNode(simulated_event_loop_factory->configuration(),
+                               channel->source_node()->string_view());
+    auto source_event_loop = event_loop_map_.find(node);
+    CHECK(source_event_loop != event_loop_map_.end());
+
+    std::unique_ptr<DelayersVector> delayers =
+        std::make_unique<DelayersVector>();
+
+    // And then build up a RawMessageDelayer for each destination.
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *destination_node =
+          configuration::GetNode(simulated_event_loop_factory->configuration(),
+                                 connection->name()->string_view());
+      auto destination_event_loop = event_loop_map_.find(destination_node);
+      CHECK(destination_event_loop != event_loop_map_.end());
+
+      delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+          simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+          simulated_event_loop_factory->GetNodeEventLoopFactory(
+              destination_node),
+          destination_event_loop->second.get(),
+          source_event_loop->second->MakeRawFetcher(channel),
+          destination_event_loop->second->MakeRawSender(channel)));
+    }
+
+    // And register every delayer to be poked when a new message shows up.
+    source_event_loop->second->MakeRawWatcher(
+        channel,
+        [captured_delayers = delayers.get()](const Context &, const void *) {
+          for (std::unique_ptr<RawMessageDelayer> &delayer :
+               *captured_delayers) {
+            delayer->Schedule();
+          }
+        });
+    delayers_list_.emplace_back(std::move(delayers));
+  }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() {}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
new file mode 100644
index 0000000..5d613ab
--- /dev/null
+++ b/aos/events/simulated_network_bridge.h
@@ -0,0 +1,43 @@
+#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Delays and re-sends messages for a single (channel, destination node) pair.
+// Defined in simulated_network_bridge.cc.
+class RawMessageDelayer;
+
+// This class moves messages between nodes.  It is implemented as a separate
+// class because it would have been even harder to manage forwarding in the
+// SimulatedEventLoopFactory.
+class SimulatedMessageBridge {
+ public:
+  // Constructs the bridge.  simulated_event_loop_factory must have a
+  // multi-node configuration; event loops named "message_bridge" are created
+  // on every node to forward channels which have destination nodes.
+  SimulatedMessageBridge(
+      SimulatedEventLoopFactory *simulated_event_loop_factory);
+  ~SimulatedMessageBridge();
+
+ private:
+  // Map of nodes to event loops.  This is a member variable so that the
+  // lifetime of the event loops matches the lifetime of the bridge.
+  std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+
+  // List of delayers used to resend the messages.
+  using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
+  std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index c1c0db0..d9fcab6 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -12,6 +12,8 @@
 // This class is a base class for all sizes of array backed allocators.
 class FixedAllocatorBase : public flatbuffers::Allocator {
  public:
+  ~FixedAllocatorBase() override { CHECK(!is_allocated_); }
+
   // TODO(austin): Read the contract for these.
   uint8_t *allocate(size_t) override;
 
@@ -25,6 +27,7 @@
   virtual size_t size() const = 0;
 
   void Reset() { is_allocated_ = false; }
+  bool is_allocated() const { return is_allocated_; }
 
  private:
   bool is_allocated_ = false;
@@ -51,14 +54,32 @@
 class PreallocatedAllocator : public FixedAllocatorBase {
  public:
   PreallocatedAllocator(void *data, size_t size) : data_(data), size_(size) {}
-  uint8_t *data() override { return reinterpret_cast<uint8_t *>(data_); }
-  const uint8_t *data() const override {
-    return reinterpret_cast<const uint8_t *>(data_);
+  PreallocatedAllocator(const PreallocatedAllocator&) = delete;
+  PreallocatedAllocator(PreallocatedAllocator &&other)
+      : data_(other.data_), size_(other.size_) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
   }
-  size_t size() const override { return size_; }
+
+  PreallocatedAllocator &operator=(const PreallocatedAllocator &) = delete;
+  PreallocatedAllocator &operator=(PreallocatedAllocator &&other) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
+    data_ = other.data_;
+    size_ = other.size_;
+    return *this;
+  }
+
+  uint8_t *data() final {
+    return reinterpret_cast<uint8_t *>(CHECK_NOTNULL(data_));
+  }
+  const uint8_t *data() const final {
+    return reinterpret_cast<const uint8_t *>(CHECK_NOTNULL(data_));
+  }
+  size_t size() const final { return size_; }
 
  private:
-  void* data_;
+  void *data_;
   size_t size_;
 };
 
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 57adc6d..91b050c 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -204,6 +204,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
+        ":data_alignment",
         ":index",
         "//aos:realtime",
         "//aos/time",
@@ -259,3 +260,14 @@
         "//aos/testing:test_logging",
     ],
 )
+
+cc_library(
+    name = "data_alignment",
+    hdrs = [
+        "data_alignment.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
new file mode 100644
index 0000000..2f59b78
--- /dev/null
+++ b/aos/ipc_lib/data_alignment.h
@@ -0,0 +1,52 @@
+#ifndef AOS_IPC_LIB_DATA_ALIGNMENT_H_
+#define AOS_IPC_LIB_DATA_ALIGNMENT_H_
+
+#include <cstddef>
+#include <cstdint>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// All data buffers sent over or received from a channel will guarantee this
+// alignment for their end. Flatbuffers aligns from the end, so this is what
+// matters.
+//
+// 64 is a reasonable choice for now:
+//   Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+//   cache lines.
+//   V4L2 requires 64 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 64;
+
+// CHECKs that the end of the buffer, data + size, is a multiple of
+// kChannelDataAlignment.  The failure message prints data as an address; it is
+// cast to void* so char-typed pointers are not streamed as C strings.
+template <typename T>
+inline void CheckChannelDataAlignment(T *data, size_t size) {
+  CHECK_EQ((reinterpret_cast<uintptr_t>(data) + size) % kChannelDataAlignment,
+           0u)
+      << ": data pointer is not end aligned as it should be: "
+      << static_cast<const void *>(data) << " + " << size;
+}
+
+// Aligns the beginning of a channel data buffer. There must be
+// kChannelDataAlignment-1 extra bytes beyond the end to potentially use after
+// aligning it.
+//
+// Returns the lowest address >= data such that the returned pointer + size is
+// a multiple of kChannelDataAlignment.
+inline char *RoundChannelData(char *data, size_t size) {
+  const uintptr_t data_value = reinterpret_cast<uintptr_t>(data);
+  const uintptr_t data_end = data_value + size;
+  // Round the end of the buffer up to the next multiple of the alignment...
+  const uintptr_t data_end_max = data_end + (kChannelDataAlignment - 1);
+  const uintptr_t rounded_data_end =
+      data_end_max - (data_end_max % kChannelDataAlignment);
+  // ...and start the buffer size bytes before that aligned end.
+  const uintptr_t rounded_data = rounded_data_end - size;
+  return reinterpret_cast<char *>(rounded_data);
+}
+
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_DATA_ALIGNMENT_H_
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 903150b..c323b8b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -241,7 +241,8 @@
 
 size_t LocklessQueueConfiguration::message_size() const {
   // Round up the message size so following data is aligned appropriately.
-  return LocklessQueueMemory::AlignmentRoundUp(message_data_size) +
+  return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
+                                               (kChannelDataAlignment - 1)) +
          sizeof(Message);
 }
 
@@ -549,7 +550,7 @@
   Message *message = memory_->GetMessage(scratch_index);
   message->header.queue_index.Invalidate();
 
-  return &message->data[0];
+  return message->data(memory_->message_data_size());
 }
 
 void LocklessQueue::Sender::Send(
@@ -788,7 +789,7 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  memcpy(data, &m->data[0], message_data_size());
+  memcpy(data, m->data(memory_->message_data_size()), message_data_size());
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
@@ -891,8 +892,9 @@
     ::std::cout << "      }" << ::std::endl;
     ::std::cout << "      data: {";
 
+    const char *const m_data = m->data(memory->message_data_size());
     for (size_t j = 0; j < m->header.length; ++j) {
-      char data = m->data[j];
+      char data = m_data[j];
       if (j != 0) {
         ::std::cout << " ";
       }
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 976f758..0384aa8 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -7,6 +7,7 @@
 #include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/ipc_lib/index.h"
 #include "aos/time/time.h"
 
@@ -76,7 +77,19 @@
     size_t length;
   } header;
 
-  char data[];
+  char *data(size_t message_size) { return RoundedData(message_size); }
+  const char *data(size_t message_size) const {
+    return RoundedData(message_size);
+  }
+
+ private:
+  // This returns a non-const pointer into a const object. Be very careful about
+  // const correctness in publicly accessible APIs using it.
+  char *RoundedData(size_t message_size) const {
+    return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
+  }
+
+  char data_pointer[];
 };
 
 struct LocklessQueueConfiguration {
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 0c0973c..cbe76a7 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -89,18 +89,24 @@
 
   // Getters for each of the 4 lists.
   Sender *GetSender(size_t sender_index) {
+    static_assert(alignof(Sender) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
                                       SizeOfMessages() + SizeOfWatchers() +
                                       sender_index * sizeof(Sender));
   }
 
   Watcher *GetWatcher(size_t watcher_index) {
+    static_assert(alignof(Watcher) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
                                        SizeOfMessages() +
                                        watcher_index * sizeof(Watcher));
   }
 
   AtomicIndex *GetQueue(uint32_t index) {
+    static_assert(alignof(AtomicIndex) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<AtomicIndex *>(&data[0] +
                                            sizeof(AtomicIndex) * index);
   }
@@ -109,6 +115,8 @@
   // sender list, since those are messages available to be filled in and sent.
   // This removes the need to find lost messages when a sender dies.
   Message *GetMessage(Index index) {
+    static_assert(alignof(Message) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }
diff --git a/aos/scoped/scoped_ptr.h b/aos/scoped/scoped_ptr.h
deleted file mode 100644
index 336e73b..0000000
--- a/aos/scoped/scoped_ptr.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#ifndef AOS_SCOPED_PTR_H_
-#define AOS_SCOPED_PTR_H_
-
-#include "aos/macros.h"
-
-namespace aos {
-
-// A simple scoped_ptr implementation that works under both linux and vxworks.
-template<typename T>
-class scoped_ptr {
- public:
-  typedef T element_type;
-  
-  explicit scoped_ptr(T *p = NULL) : p_(p) {}
-  ~scoped_ptr() {
-    delete p_;
-  }
-
-  T &operator*() const { return *p_; }
-  T *operator->() const { return p_; }
-  T *get() const { return p_; }
-
-  operator bool() const { return p_ != NULL; }
-
-  void swap(scoped_ptr<T> &other) {
-    T *temp = other.p_;
-    other.p_ = p_;
-    p_ = other.p_;
-  }
-  void reset(T *p = NULL) {
-    if (p_ != NULL) delete p_;
-    p_ = p;
-  }
-
- private:
-  T *p_;
-
-  DISALLOW_COPY_AND_ASSIGN(scoped_ptr<T>);
-};
-
-}  // namespace aos
-
-#endif  // AOS_SCOPED_PTR_H_
diff --git a/aos/util/file.cc b/aos/util/file.cc
index af1f85b..b334ded 100644
--- a/aos/util/file.cc
+++ b/aos/util/file.cc
@@ -1,6 +1,8 @@
 #include "aos/util/file.h"
 
 #include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 #include <unistd.h>
 
 #include <string_view>
@@ -46,5 +48,23 @@
   }
 }
 
+// mkdir -p for everything before the last '/' in path; EEXIST is not fatal.
+void MkdirP(std::string_view path, mode_t mode) {
+  const auto last_slash_pos = path.find_last_of("/");
+  if (last_slash_pos == std::string_view::npos) return;
+  const std::string folder(path.substr(0, last_slash_pos));
+  if (folder.empty()) return;
+  MkdirP(folder, mode);  // Parents first.
+  const int result = mkdir(folder.c_str(), mode);
+  if (result == -1 && errno == EEXIST) {
+    VLOG(2) << folder << " already exists";
+    return;
+  }
+  // Check before logging: VLOG could clobber errno, and a failed mkdir must
+  // not be reported as "Created".
+  PCHECK(result == 0) << ": Error creating " << folder;
+  VLOG(1) << "Created " << folder;
+}
+
 }  // namespace util
 }  // namespace aos
diff --git a/aos/util/file.h b/aos/util/file.h
index 3aebd87..d6724af 100644
--- a/aos/util/file.h
+++ b/aos/util/file.h
@@ -15,6 +15,8 @@
 void WriteStringToFileOrDie(const std::string_view filename,
                             const std::string_view contents);
 
+void MkdirP(std::string_view path, mode_t mode);
+
 }  // namespace util
 }  // namespace aos
 
diff --git a/debian/matplotlib.bzl b/debian/matplotlib.bzl
index be520f3..7032745 100644
--- a/debian/matplotlib.bzl
+++ b/debian/matplotlib.bzl
@@ -100,146 +100,146 @@
 }
 
 def build_matplotlib(version, tkinter_py_version = None, copy_shared_files = True):
-  """Creates a py_library rule for matplotlib for the given python version.
+    """Creates a py_library rule for matplotlib for the given python version.
 
-  See debian/matplotlib.BUILD for the usage.
+    See debian/matplotlib.BUILD for the usage.
 
-  All the rules generated by this will be suffixed by version. Only one
-  instance of this macro should set copy_shared_files, which generate the
-  files that are shared between python versions.
+    All the rules generated by this will be suffixed by version. Only one
+    instance of this macro should set copy_shared_files, which generate the
+    files that are shared between python versions.
 
-  tkinter_py_version is used because for the Python3 instance, some files
-  are in folders named python3 and some are in folders named python3.5...
+    tkinter_py_version is used because for the Python3 instance, some files
+    are in folders named python3 and some are in folders named python3.5...
 
-  version numbers should both be strings.
-  """
-  if tkinter_py_version == None:
-    tkinter_py_version = version
+    version numbers should both be strings.
+    """
+    if tkinter_py_version == None:
+        tkinter_py_version = version
 
-  native.genrule(
-      name = "patch_init" + version,
-      srcs = [
-          "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
-          "@//debian:matplotlib_patches",
-      ],
-      outs = [version + "/matplotlib/__init__.py"],
-      cmd = " && ".join([
-          "cp $(location usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py) $@",
-          "readonly PATCH=\"$$(readlink -f $(location @patch))\"",
-          "readonly FILE=\"$$(readlink -f $(location @//debian:matplotlib_patches))\"",
-          "(cd $(@D) && \"$${PATCH}\" -p1 < \"$${FILE}\") > /dev/null",
-      ]),
-      tools = [
-          "@patch",
-      ],
-  )
-
-  _src_files = native.glob(
-      include = ["usr/lib/python" + version + "/dist-packages/**/*.py"],
-      exclude = [
-          "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
-      ],
-  )
-
-  _data_files = native.glob([
-      "usr/share/matplotlib/mpl-data/**",
-      "usr/share/tcltk/**",
-  ])
-
-  _src_copied = ["/".join([version] + f.split("/")[4:]) for f in _src_files]
-
-  _builtin_so_files = native.glob([
-      "usr/lib/python" + version + "/dist-packages/**/*x86_64-linux-gnu.so",
-      "usr/lib/python" + tkinter_py_version + "/lib-dynload/*.so",
-  ])
-
-  _system_so_files = native.glob([
-      "usr/lib/**/*.so*",
-      "lib/x86_64-linux-gnu/**/*.so*",
-  ])
-
-  _builtin_so_copied = ["/".join([version] + f.split("/")[4:]) for f in _builtin_so_files]
-
-  rpath_prefix = "rpathed" + version + "/"
-
-  _system_so_copied = [rpath_prefix + f for f in _system_so_files]
-
-  _builtin_rpaths = [":".join([
-      "\\$$ORIGIN/%s" % rel,
-      "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
-      "\\$$ORIGIN/%s/%s/usr/lib" % (rel, rpath_prefix),
-      "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
-  ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _builtin_so_copied]]
-
-  _system_rpaths = [":".join([
-      "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
-      "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
-  ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _system_so_copied]]
-
-  native.genrule(
-      name = "run_patchelf_builtin" + version,
-      srcs = _builtin_so_files,
-      outs = _builtin_so_copied,
-      cmd = "\n".join(
-          [
-              "cp $(location %s) $(location %s)" % (src, dest)
-              for src, dest in zip(_builtin_so_files, _builtin_so_copied)
-          ] +
-          ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_builtin_rpaths, _builtin_so_copied)],
-      ),
-      tools = [
-          "@patchelf",
-      ],
-  )
-
-  native.genrule(
-      name = "run_patchelf_system" + version,
-      srcs = _system_so_files,
-      outs = _system_so_copied,
-      cmd = "\n".join(
-          [
-              "cp $(location %s) $(location %s)" % (src, dest)
-              for src, dest in zip(_system_so_files, _system_so_copied)
-          ] +
-          ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_system_rpaths, _system_so_copied)],
-      ),
-      tools = [
-          "@patchelf",
-      ],
-  )
-
-  native.genrule(
-      name = "copy_files" + version,
-      srcs = _src_files,
-      outs = _src_copied,
-      cmd = " && ".join(["cp $(location %s) $(location %s)" % (src, dest) for src, dest in zip(
-          _src_files,
-          _src_copied,
-      )]),
-  )
-
-  if copy_shared_files:
     native.genrule(
-        name = "create_rc" + version,
-        outs = ["usr/share/matplotlib/mpl-data/matplotlibrc"],
-        cmd = "\n".join([
-            "cat > $@ << END",
-            # This is necessary to make matplotlib actually plot things to the
-            # screen by default.
-            "backend      : TkAgg",
-            "END",
+        name = "patch_init" + version,
+        srcs = [
+            "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
+            "@//debian:matplotlib_patches",
+        ],
+        outs = [version + "/matplotlib/__init__.py"],
+        cmd = " && ".join([
+            "cp $(location usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py) $@",
+            "readonly PATCH=\"$$(readlink -f $(location @patch))\"",
+            "readonly FILE=\"$$(readlink -f $(location @//debian:matplotlib_patches))\"",
+            "(cd $(@D) && \"$${PATCH}\" -p1 < \"$${FILE}\") > /dev/null",
         ]),
+        tools = [
+            "@patch",
+        ],
     )
 
-  native.py_library(
-      name = "matplotlib" + version,
-      srcs = _src_copied + [
-          version + "/matplotlib/__init__.py",
-      ],
-      data = _data_files + _builtin_so_copied + _system_so_copied + [
-          ":usr/share/matplotlib/mpl-data/matplotlibrc",
-      ] + native.glob(["etc/**"]),
-      imports = ["usr/lib/python" + version + "/dist-packages", version, "."],
-      restricted_to = ["@//tools:k8"],
-      visibility = ["//visibility:public"],
-  )
+    _src_files = native.glob(
+        include = ["usr/lib/python" + version + "/dist-packages/**/*.py"],
+        exclude = [
+            "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
+        ],
+    )
+
+    _data_files = native.glob([
+        "usr/share/matplotlib/mpl-data/**",
+        "usr/share/tcltk/**",
+    ])
+
+    _src_copied = ["/".join([version] + f.split("/")[4:]) for f in _src_files]
+
+    _builtin_so_files = native.glob([
+        "usr/lib/python" + version + "/dist-packages/**/*x86_64-linux-gnu.so",
+        "usr/lib/python" + tkinter_py_version + "/lib-dynload/*.so",
+    ])
+
+    _system_so_files = native.glob([
+        "usr/lib/**/*.so*",
+        "lib/x86_64-linux-gnu/**/*.so*",
+    ])
+
+    _builtin_so_copied = ["/".join([version] + f.split("/")[4:]) for f in _builtin_so_files]
+
+    rpath_prefix = "rpathed" + version + "/"
+
+    _system_so_copied = [rpath_prefix + f for f in _system_so_files]
+
+    _builtin_rpaths = [":".join([
+        "\\$$ORIGIN/%s" % rel,
+        "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+        "\\$$ORIGIN/%s/%s/usr/lib" % (rel, rpath_prefix),
+        "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+    ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _builtin_so_copied]]
+
+    _system_rpaths = [":".join([
+        "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+        "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+    ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _system_so_copied]]
+
+    native.genrule(
+        name = "run_patchelf_builtin" + version,
+        srcs = _builtin_so_files,
+        outs = _builtin_so_copied,
+        cmd = "\n".join(
+            [
+                "cp $(location %s) $(location %s)" % (src, dest)
+                for src, dest in zip(_builtin_so_files, _builtin_so_copied)
+            ] +
+            ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_builtin_rpaths, _builtin_so_copied)],
+        ),
+        tools = [
+            "@patchelf",
+        ],
+    )
+
+    native.genrule(
+        name = "run_patchelf_system" + version,
+        srcs = _system_so_files,
+        outs = _system_so_copied,
+        cmd = "\n".join(
+            [
+                "cp $(location %s) $(location %s)" % (src, dest)
+                for src, dest in zip(_system_so_files, _system_so_copied)
+            ] +
+            ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_system_rpaths, _system_so_copied)],
+        ),
+        tools = [
+            "@patchelf",
+        ],
+    )
+
+    native.genrule(
+        name = "copy_files" + version,
+        srcs = _src_files,
+        outs = _src_copied,
+        cmd = " && ".join(["cp $(location %s) $(location %s)" % (src, dest) for src, dest in zip(
+            _src_files,
+            _src_copied,
+        )]),
+    )
+
+    if copy_shared_files:
+        native.genrule(
+            name = "create_rc" + version,
+            outs = ["usr/share/matplotlib/mpl-data/matplotlibrc"],
+            cmd = "\n".join([
+                "cat > $@ << END",
+                # This is necessary to make matplotlib actually plot things to the
+                # screen by default.
+                "backend      : TkAgg",
+                "END",
+            ]),
+        )
+
+    native.py_library(
+        name = "matplotlib" + version,
+        srcs = _src_copied + [
+            version + "/matplotlib/__init__.py",
+        ],
+        data = _data_files + _builtin_so_copied + _system_so_copied + [
+            ":usr/share/matplotlib/mpl-data/matplotlibrc",
+        ] + native.glob(["etc/**"]),
+        imports = ["usr/lib/python" + version + "/dist-packages", version, "."],
+        restricted_to = ["@//tools:k8"],
+        visibility = ["//visibility:public"],
+    )
diff --git a/debian/packages.bzl b/debian/packages.bzl
index ba1f840..3043452 100644
--- a/debian/packages.bzl
+++ b/debian/packages.bzl
@@ -55,14 +55,14 @@
     )
 
 def _convert_deb_to_target(deb):
-  """Converts a debian package filename to a valid bazel target name."""
-  target = deb
-  target = target.replace('-', '_')
-  target = target.replace('.', '_')
-  target = target.replace(':', '_')
-  target = target.replace('+', 'x')
-  target = target.replace('~', '_')
-  return "deb_%s_repo" % target
+    """Converts a debian package filename to a valid bazel target name."""
+    target = deb
+    target = target.replace("-", "_")
+    target = target.replace(".", "_")
+    target = target.replace(":", "_")
+    target = target.replace("+", "x")
+    target = target.replace("~", "_")
+    return "deb_%s_repo" % target
 
 def generate_repositories_for_debs(files, base_url = "http://www.frc971.org/Build-Dependencies"):
     """A WORKSPACE helper to add all the deb packages in the dictionary as a repo.
diff --git a/debian/webrtc.BUILD b/debian/webrtc.BUILD
new file mode 100644
index 0000000..e90e268
--- /dev/null
+++ b/debian/webrtc.BUILD
@@ -0,0 +1,18 @@
+load("@//tools/build_rules:select.bzl", "cpu_select")
+
+cc_library(
+    name = "webrtc",
+    srcs = cpu_select({
+        "arm": ["lib/arm/Release/libwebrtc_full.a"],
+        "else": ["lib/x64/Release/libwebrtc_full.a"],
+    }),
+    hdrs = glob(["include/**/*.h"]),
+    includes = ["include"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_google_absl//absl/algorithm:container",
+        "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/types:optional",
+        "@com_google_absl//absl/types:variant",
+    ],
+)
diff --git a/third_party/BUILD b/third_party/BUILD
index a4e236f..13f015e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -1,3 +1,5 @@
+load("@//tools/build_rules:select.bzl", "cpu_select")
+
 cc_library(
     name = "wpilib",
     linkstatic = True,
@@ -86,3 +88,15 @@
         "//conditions:default": [],
     }),
 )
+
+cc_library(
+    name = "webrtc",
+    visibility = ["//visibility:public"],
+    deps = cpu_select({
+        "amd64": ["@webrtc_x64//:webrtc"],
+        "armhf": ["@webrtc_arm//:webrtc"],
+        "cortex-m": ["@webrtc_arm//:webrtc"],
+        "roborio": ["@webrtc_rio//:webrtc"],
+    }),
+)
+