Initial message_bridge client and server

These will forward data, and track what made it across and what didn't
when configured correctly.  This should be off if nothing is requested
to be logged remotely.

It implements ttl, reconnects, and has a basic smoke test.

We still need to handle forwarding data for logging.

Change-Id: I7daebe8cef54029a5733b7f81ee6b68367c80d82
diff --git a/aos/config.bzl b/aos/config.bzl
index 3f78422..0766329 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -2,7 +2,7 @@
 
 AosConfigInfo = provider(fields = ["transitive_flatbuffers", "transitive_src"])
 
-def aos_config(name, src, flatbuffers, deps = [], visibility = None):
+def aos_config(name, src, flatbuffers = [], deps = [], visibility = None):
     _aos_config(
         name = name,
         src = src,
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 46f07ac..a40c47c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -7,6 +7,8 @@
 #include <arpa/inet.h>
 #include <ifaddrs.h>
 #include <unistd.h>
+
+#include <set>
 #include <string_view>
 
 #include "absl/container/btree_set.h"
@@ -730,5 +732,57 @@
              << static_cast<int>(connection->timestamp_logger());
 }
 
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+                                              const Node *my_node) {
+  std::set<std::string_view> result_set;
+
+  for (const Channel *channel : *config->channels()) {
+    if (channel->has_destination_nodes()) {
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->name()->string_view() ==
+            my_node->name()->string_view()) {
+          result_set.insert(channel->source_node()->string_view());
+        }
+      }
+    }
+  }
+
+  std::vector<std::string_view> result;
+  for (const std::string_view source : result_set) {
+    VLOG(1) << "Found a source node of " << source;
+    result.emplace_back(source);
+  }
+  return result;
+}
+
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+                                                   const Node *my_node) {
+  std::vector<std::string_view> result;
+
+  for (const Channel *channel : *config->channels()) {
+    if (channel->has_source_node() && channel->source_node()->string_view() ==
+                                          my_node->name()->string_view()) {
+      if (!channel->has_destination_nodes()) continue;
+
+      if (channel->source_node()->string_view() !=
+          my_node->name()->string_view()) {
+        continue;
+      }
+
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (std::find(result.begin(), result.end(),
+                      connection->name()->string_view()) == result.end()) {
+          result.emplace_back(connection->name()->string_view());
+        }
+      }
+    }
+  }
+
+  for (const std::string_view destination : result) {
+    VLOG(1) << "Found a destination node of " << destination;
+  }
+  return result;
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 7bf8203..55b64c9 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -80,6 +80,14 @@
 // Prints a channel to json, but without the schema.
 std::string CleanedChannelToString(const Channel *channel);
 
+// Returns the node names that this node should be forwarding to.
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+                                                   const Node *my_node);
+
+// Returns the node names that this node should be receiving messages from.
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+                                              const Node *my_node);
+
 // TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
 
 }  // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index fab6745..b8fc5b6 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -7,6 +7,7 @@
 #include "flatbuffers/reflection.h"
 #include "glog/logging.h"
 #include "gtest/gtest.h"
+#include "gmock/gmock.h"
 
 namespace aos {
 namespace configuration {
@@ -567,6 +568,37 @@
       &logged_on_both_channel.message(), &baz_node.message(),
       &baz_node.message()));
 }
+
+// Tests that we can deduce source nodes from a multinode config.
+TEST_F(ConfigurationTest, SourceNodeNames) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1_multinode.json");
+
+  // This is a bit simplistic in that it doesn't test deduplication, but it does
+  // exercise a lot of the logic.
+  EXPECT_THAT(
+      SourceNodeNames(&config.message(), config.message().nodes()->Get(0)),
+      ::testing::ElementsAreArray({"pi2"}));
+  EXPECT_THAT(
+      SourceNodeNames(&config.message(), config.message().nodes()->Get(1)),
+      ::testing::ElementsAreArray({"pi1"}));
+}
+
+// Tests that we can deduce destination nodes from a multinode config.
+TEST_F(ConfigurationTest, DestinationNodeNames) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1_multinode.json");
+
+  // This is a bit simplistic in that it doesn't test deduplication, but it does
+  // exercise a lot of the logic.
+  EXPECT_THAT(
+      DestinationNodeNames(&config.message(), config.message().nodes()->Get(0)),
+      ::testing::ElementsAreArray({"pi2"}));
+  EXPECT_THAT(
+      DestinationNodeNames(&config.message(), config.message().nodes()->Get(1)),
+      ::testing::ElementsAreArray({"pi1"}));
+}
+
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e40ab99..79200bd 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -121,6 +121,8 @@
     flatbuffers = [
         ":ping_fbs",
         ":pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
     ],
     deps = [":config"],
 )
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 97d5783..b2be972 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,4 +1,5 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
 
 flatbuffer_cc_library(
     name = "logger_fbs",
@@ -84,11 +85,21 @@
     ],
 )
 
+aos_config(
+    name = "multinode_pingpong_config",
+    src = "multinode_pingpong.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+    ],
+    deps = ["//aos/events:config"],
+)
+
 cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
     data = [
-        "//aos/events:multinode_pingpong_config.json",
+        ":multinode_pingpong_config.json",
         "//aos/events:pingpong_config.json",
     ],
     deps = [
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 91190e7..70c520d 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -144,7 +144,7 @@
  public:
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
-            "aos/events/multinode_pingpong_config.json")),
+            "aos/events/logging/multinode_pingpong_config.json")),
         event_loop_factory_(&config_.message(), "pi1"),
         ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
         ping_(ping_event_loop_.get()) {}
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
new file mode 100644
index 0000000..f85a4e1
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong.json
@@ -0,0 +1,93 @@
+{
+  "channels": [
+    /* Logged on pi1 locally */
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    /* Forwarded to pi2.
+     * Doesn't matter where timestamps are logged for the test.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1",
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    /* Forwarded back to pi1.
+     * The message is logged both on the sending node and the receiving node
+     * (to make it easier to look at the results for now).
+     *
+     * The timestamps are logged on the receiving node.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index f0e532e..c0c5087 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -2,6 +2,54 @@
   "channels": [
     {
       "name": "/aos/pi1",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
       "type": "aos.timing.Report",
       "source_node": "pi1",
       "frequency": 50,
@@ -25,6 +73,14 @@
       "max_size": 2048
     },
     {
+      "name": "/aos/roborio",
+      "type": "aos.timing.Report",
+      "source_node": "roborio",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
       "name": "/test",
       "type": "aos.examples.Ping",
       "source_node": "pi1",
@@ -32,8 +88,8 @@
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -41,14 +97,12 @@
       "name": "/test",
       "type": "aos.examples.Pong",
       "source_node": "pi2",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -60,8 +114,8 @@
         {
           "name": "pi3",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -69,14 +123,12 @@
       "name": "/test2",
       "type": "aos.examples.Pong",
       "source_node": "pi3",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     }
@@ -111,6 +163,96 @@
       "rename": {
         "name": "/aos/pi3"
       }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
     }
   ],
   "nodes": [
@@ -128,6 +270,11 @@
       "name": "pi3",
       "hostname": "raspberrypi3",
       "port": 9971
+    },
+    {
+      "name": "roborio",
+      "hostname": "roboRIO-6971-FRC",
+      "port": 9971
     }
   ],
   "applications": [
@@ -156,6 +303,32 @@
           }
         }
       ]
+    },
+    {
+      "name": "ping3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
+    },
+    {
+      "name": "pong3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
     }
   ]
 }
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 3ced933..bdee4f2 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -21,10 +21,26 @@
 #include "aos/util/phased_loop.h"
 #include "glog/logging.h"
 
+namespace {
+
+// Returns the portion of the path after the last /.  This very much assumes
+// that the application name is null terminated.
+const char *Filename(const char *path) {
+  const std::string_view path_string_view = path;
+  auto last_slash_pos = path_string_view.find_last_of("/");
+
+  return last_slash_pos == std::string_view::npos ? path
+                                                  : path + last_slash_pos + 1;
+}
+
+}  // namespace
+
 DEFINE_string(shm_base, "/dev/shm/aos",
               "Directory to place queue backing mmaped files in.");
 DEFINE_uint32(permissions, 0770,
               "Permissions to make shared memory files and folders.");
+DEFINE_string(application_name, Filename(program_invocation_name),
+              "The application name");
 
 namespace aos {
 
@@ -135,15 +151,6 @@
 
 namespace {
 
-// Returns the portion of the path after the last /.
-std::string_view Filename(std::string_view path) {
-  auto last_slash_pos = path.find_last_of("/");
-
-  return last_slash_pos == std::string_view::npos
-             ? path
-             : path.substr(last_slash_pos + 1, path.size());
-}
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -158,7 +165,7 @@
 
 ShmEventLoop::ShmEventLoop(const Configuration *configuration)
     : EventLoop(configuration),
-      name_(Filename(program_invocation_name)),
+      name_(FLAGS_application_name),
       node_(MaybeMyNode(configuration)) {
   if (configuration->has_nodes()) {
     CHECK(node_ != nullptr) << ": Couldn't find node in config.";
@@ -802,15 +809,16 @@
   }
 
   SignalHandler::global()->Unregister(this);
+
+  // Trigger any remaining senders or fetchers to be cleared before destroying
+  // the event loop so the book keeping matches.  Do this in the thread that
+  // created the timing reporter.
+  timing_report_sender_.reset();
 }
 
 void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 ShmEventLoop::~ShmEventLoop() {
-  // Trigger any remaining senders or fetchers to be cleared before destroying
-  // the event loop so the book keeping matches.
-  timing_report_sender_.reset();
-
   // Force everything with a registered fd with epoll to be destroyed now.
   timers_.clear();
   phased_loops_.clear();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index d10989e..bb4ad77 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -71,6 +71,8 @@
 
   int priority() const override { return priority_; }
 
+  internal::EPoll *epoll() { return &epoll_; }
+
  private:
   friend class internal::WatcherState;
   friend class internal::TimerHandlerState;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4e4520c..4388687 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
 
 namespace aos {
 
@@ -31,15 +32,19 @@
 
 // This class is a fixed memory allocator which holds the data for a flatbuffer
 // in an array.
-template <size_t S>
 class FixedAllocator : public FixedAllocatorBase {
  public:
+  FixedAllocator(size_t size) : buffer_(size, 0) {}
+
   uint8_t *data() override { return &buffer_[0]; }
   const uint8_t *data() const override { return &buffer_[0]; }
   size_t size() const override { return buffer_.size(); }
 
+  // Releases the data in the buffer.
+  std::vector<uint8_t> release() { return std::move(buffer_); }
+
  private:
-  std::array<uint8_t, S> buffer_;
+  std::vector<uint8_t> buffer_;
 };
 
 // This class adapts a preallocated memory region to an Allocator.
@@ -82,46 +87,6 @@
   virtual size_t size() const = 0;
 };
 
-// Array backed flatbuffer.
-template <typename T>
-class FlatbufferArray : public Flatbuffer<T> {
- public:
-  // Builds a Flatbuffer by copying the data from the other flatbuffer.
-  FlatbufferArray(const Flatbuffer<T> &other) {
-    CHECK_LE(other.size(), data_.size());
-
-    memcpy(data_.data(), other.data(), other.size());
-    size_ = other.size();
-  }
-
-  // Coppies the data from the other flatbuffer.
-  FlatbufferArray &operator=(const Flatbuffer<T> &other) {
-    CHECK_LE(other.size(), data_.size());
-
-    memcpy(data_.data(), other.data(), other.size());
-    size_ = other.size();
-    return *this;
-  }
-
-  virtual ~FlatbufferArray() override {}
-
-  // Creates a builder wrapping the underlying data.
-  flatbuffers::FlatBufferBuilder FlatBufferBuilder() {
-    data_.deallocate(data_.data(), data_.size());
-    flatbuffers::FlatBufferBuilder fbb(data_.size(), &data_);
-    fbb.ForceDefaults(1);
-    return fbb;
-  }
-
-  const uint8_t *data() const override { return data_.data(); }
-  uint8_t *data() override { return data_.data(); }
-  size_t size() const override { return size_; }
-
- private:
-  FixedAllocator<8 * 1024> data_;
-  size_t size_ = data_.size();
-};
-
 // String backed flatbuffer.
 template <typename T>
 class FlatbufferString : public Flatbuffer<T> {
@@ -228,6 +193,48 @@
   flatbuffers::DetachedBuffer buffer_;
 };
 
+// This object associates the message type with the memory storing the
+// flatbuffer.  This only stores root tables.
+//
+// From a usage point of view, pointers to the data are very different than
+// pointers to the tables.
+template <typename T>
+class SizePrefixedFlatbufferDetachedBuffer final : public Flatbuffer<T> {
+ public:
+  // Builds a Flatbuffer by taking ownership of the buffer.
+  SizePrefixedFlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
+      : buffer_(::std::move(buffer)) {
+    CHECK_GE(buffer_.size(), sizeof(flatbuffers::uoffset_t));
+  }
+
+  // Builds a flatbuffer by taking ownership of the buffer from the other
+  // flatbuffer.
+  SizePrefixedFlatbufferDetachedBuffer(
+      SizePrefixedFlatbufferDetachedBuffer &&fb)
+      : buffer_(::std::move(fb.buffer_)) {}
+  SizePrefixedFlatbufferDetachedBuffer &operator=(
+      SizePrefixedFlatbufferDetachedBuffer &&fb) {
+    ::std::swap(buffer_, fb.buffer_);
+    return *this;
+  }
+
+  virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
+
+  // Returns references to the buffer, and the data.
+  const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
+  const uint8_t *data() const override {
+    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+  }
+  uint8_t *data() override {
+    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+  }
+  size_t size() const override {
+    return buffer_.size() - sizeof(flatbuffers::uoffset_t);
+  }
+
+ private:
+  flatbuffers::DetachedBuffer buffer_;
+};
 // TODO(austin): Need a way to get our hands on the max size.  Can start with
 // "large" for now.
 
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5d96183..9d3b3c4 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,5 +1,36 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
+
 package(default_visibility = ["//visibility:public"])
 
+flatbuffer_cc_library(
+    name = "connect_fbs",
+    srcs = ["connect.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "message_bridge_client_fbs",
+    srcs = ["message_bridge_client.fbs"],
+    gen_reflections = 1,
+    includes = [
+        ":message_bridge_server_fbs_includes",
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "message_bridge_server_fbs",
+    srcs = ["message_bridge_server.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
 cc_library(
     name = "team_number",
     srcs = [
@@ -25,3 +56,182 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_library(
+    name = "sctp_lib",
+    srcs = [
+        "sctp_lib.cc",
+    ],
+    hdrs = [
+        "sctp_lib.h",
+    ],
+    copts = [
+        # The casts required to read datastructures from sockets trip -Wcast-align.
+        "-Wno-cast-align",
+    ],
+    deps = [
+        "//aos:unique_malloc_ptr",
+        "//third_party/lksctp-tools:sctp",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
+    name = "sctp_server",
+    srcs = [
+        "sctp_server.cc",
+    ],
+    hdrs = [
+        "sctp_server.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":sctp_lib",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_protocol",
+    hdrs = [
+        "message_bridge_protocol.h",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_server_lib",
+    srcs = [
+        "message_bridge_server_lib.cc",
+    ],
+    hdrs = [
+        "message_bridge_server_lib.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":connect_fbs",
+        ":message_bridge_protocol",
+        ":message_bridge_server_fbs",
+        ":sctp_lib",
+        ":sctp_server",
+        "//aos:unique_malloc_ptr",
+        "//aos/events:shm_event_loop",
+        "//aos/events/logging:logger",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_binary(
+    name = "message_bridge_server",
+    srcs = [
+        "message_bridge_server.cc",
+    ],
+    deps = [
+        ":message_bridge_server_lib",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+    ],
+)
+
+cc_library(
+    name = "sctp_client",
+    srcs = [
+        "sctp_client.cc",
+    ],
+    hdrs = [
+        "sctp_client.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":sctp_lib",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_client_lib",
+    srcs = [
+        "message_bridge_client_lib.cc",
+    ],
+    hdrs = [
+        "message_bridge_client_lib.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":connect_fbs",
+        ":message_bridge_client_fbs",
+        ":message_bridge_protocol",
+        ":message_bridge_server_fbs",
+        ":sctp_client",
+        "//aos/events:shm_event_loop",
+        "//aos/events/logging:logger",
+    ],
+)
+
+cc_binary(
+    name = "message_bridge_client",
+    srcs = [
+        "message_bridge_client.cc",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":message_bridge_client_lib",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+    ],
+)
+
+aos_config(
+    name = "message_bridge_test_common_config",
+    src = "message_bridge_test_common.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+    ],
+    deps = ["//aos/events:config"],
+)
+
+aos_config(
+    name = "message_bridge_test_server_config",
+    src = "message_bridge_test_server.json",
+    deps = [":message_bridge_test_common_config"],
+)
+
+aos_config(
+    name = "message_bridge_test_client_config",
+    src = "message_bridge_test_client.json",
+    deps = [":message_bridge_test_common_config"],
+)
+
+cc_test(
+    name = "message_bridge_test",
+    srcs = [
+        "message_bridge_test.cc",
+    ],
+    data = [
+        ":message_bridge_test_client_config.json",
+        ":message_bridge_test_server_config.json",
+    ],
+    deps = [
+        ":message_bridge_client_lib",
+        ":message_bridge_server_lib",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/events:shm_event_loop",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/network/connect.fbs b/aos/network/connect.fbs
new file mode 100644
index 0000000..32893b8
--- /dev/null
+++ b/aos/network/connect.fbs
@@ -0,0 +1,13 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// This is the message sent to initiate a connection to a message_bridge.
+// It communicates the channels that need to be forwarded back.
+table Connect {
+  // The node making the request.
+  node:aos.Node;
+
+  // The channels that we want transfered to this client.
+  channels_to_transfer:[Channel];
+}
diff --git a/aos/network/message_bridge_client.cc b/aos/network/message_bridge_client.cc
new file mode 100644
index 0000000..c44eee0
--- /dev/null
+++ b/aos/network/message_bridge_client.cc
@@ -0,0 +1,36 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  aos::ShmEventLoop event_loop(&config.message());
+
+  MessageBridgeClient app(&event_loop);
+
+  // TODO(austin): Save messages into a vector to be logged.  One file per
+  // channel?  Need to sort out ordering.
+  //
+  // TODO(austin): Low priority, "reliable" logging channel.
+
+  event_loop.Run();
+
+  return EXIT_SUCCESS;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+
+  return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
new file mode 100644
index 0000000..58b653a
--- /dev/null
+++ b/aos/network/message_bridge_client.fbs
@@ -0,0 +1,24 @@
+include "aos/network/message_bridge_server.fbs";
+
+namespace aos.message_bridge;
+
+// Statistics from a single client connection to a server.
+table ClientConnection {
+  // The node that we are connected to.
+  node:Node;
+
+  // Health of this connection.  Connected or not?
+  state:State;
+
+  // Number of packets received on all channels.
+  received_packets:uint;
+
+  // TODO(austin): Per channel counts?
+}
+
+// Statistics for all clients.
+table ClientStatistics {
+  connections:[ClientConnection];
+}
+
+root_type ClientStatistics;
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
new file mode 100644
index 0000000..3652db2
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.cc
@@ -0,0 +1,361 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include <chrono>
+#include <string_view>
+
+#include "aos/events/logging/logger.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/sctp_client.h"
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+// This application receives messages from another node and re-publishes them on
+// this node.
+//
+// To simulate packet loss for testing, run:
+//   tc qdisc add dev eth0 root netem loss random 10
+// To restore it, run:
+//   tc qdisc del dev eth0 root netem
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+    const Configuration *config, const Node *my_node,
+    std::string_view remote_name) {
+  CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+
+  flatbuffers::FlatBufferBuilder fbb;
+
+  flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
+  const std::string_view node_name = my_node->name()->string_view();
+
+  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+  for (const Channel *channel : *config->channels()) {
+    if (channel->has_destination_nodes()) {
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->name()->string_view() == node_name &&
+            channel->source_node()->string_view() == remote_name) {
+          channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+        }
+      }
+    }
+  }
+
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+      channels_offset = fbb.CreateVector(channel_offsets);
+
+  Connect::Builder connect_builder(fbb);
+  connect_builder.add_channels_to_transfer(channels_offset);
+  connect_builder.add_node(node_offset);
+  fbb.Finish(connect_builder.Finish());
+
+  return fbb.Release();
+}
+
+std::vector<int> StreamToChannel(const Configuration *config,
+                                 const Node *my_node, const Node *other_node) {
+  std::vector<int> stream_to_channel;
+  int channel_index = 0;
+  for (const Channel *channel : *config->channels()) {
+    if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+      const Connection *connection =
+          configuration::ConnectionToNode(channel, my_node);
+      if (connection != nullptr) {
+        stream_to_channel.emplace_back(channel_index);
+      }
+    }
+    ++channel_index;
+  }
+
+  return stream_to_channel;
+}
+
+std::vector<bool> StreamReplyWithTimestamp(const Configuration *config,
+                                           const Node *my_node,
+                                           const Node *other_node) {
+  std::vector<bool> stream_reply_with_timestamp;
+  int channel_index = 0;
+  for (const Channel *channel : *config->channels()) {
+    if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+      const Connection *connection =
+          configuration::ConnectionToNode(channel, my_node);
+      if (connection != nullptr) {
+        // We want to reply with a timestamp if the other node is logging the
+        // timestamp (and it therefore needs the timestamp), or if we are
+        // logging the message and it needs to know if we received it so it can
+        // log (in the future) it through different mechanisms on failure.
+        stream_reply_with_timestamp.emplace_back(
+            configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                                other_node) ||
+            configuration::ChannelMessageIsLoggedOnNode(channel, my_node));
+      }
+    }
+    ++channel_index;
+  }
+
+  return stream_reply_with_timestamp;
+}
+
+aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+MakeMessageHeaderReply() {
+  flatbuffers::FlatBufferBuilder fbb;
+  logger::MessageHeader::Builder message_header_builder(fbb);
+  message_header_builder.add_channel_index(0);
+  message_header_builder.add_monotonic_sent_time(0);
+  message_header_builder.add_monotonic_remote_time(0);
+  message_header_builder.add_realtime_remote_time(0);
+  message_header_builder.add_remote_queue_index(0);
+  fbb.Finish(message_header_builder.Finish());
+
+  return fbb.Release();
+}
+
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+    const std::vector<std::string_view> &source_node_names,
+    const Configuration *configuration) {
+  flatbuffers::FlatBufferBuilder fbb;
+
+  std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<Node> node_offset =
+        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    ClientConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    // TODO(austin): Track dropped packets.
+    connection_builder.add_received_packets(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ClientStatistics::Builder client_statistics_builder(fbb);
+  client_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(client_statistics_builder.Finish());
+
+  return fbb.Release();
+}
+
+}  // namespace
+
+SctpClientConnection::SctpClientConnection(
+    aos::ShmEventLoop *const event_loop, std::string_view remote_name,
+    const Node *my_node, std::string_view local_host,
+    std::vector<std::unique_ptr<aos::RawSender>> *channels,
+    ClientConnection *connection)
+    : event_loop_(event_loop),
+      connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
+                                          remote_name)),
+      message_reception_reply_(MakeMessageHeaderReply()),
+      remote_node_(CHECK_NOTNULL(
+          configuration::GetNode(event_loop->configuration(), remote_name))),
+      client_(remote_node_->hostname()->string_view(), remote_node_->port(),
+              connect_message_.message().channels_to_transfer()->size() +
+                  kControlStreams(),
+              local_host, 0),
+      channels_(channels),
+      stream_to_channel_(
+          StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
+      stream_reply_with_timestamp_(StreamReplyWithTimestamp(
+          event_loop->configuration(), my_node, remote_node_)),
+      connection_(connection) {
+  VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
+          << ": " << FlatbufferToJson(connect_message_);
+
+  connect_timer_ = event_loop_->AddTimer([this]() { SendConnect(); });
+  event_loop_->OnRun(
+      [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
+
+  event_loop_->epoll()->OnReadable(client_.fd(),
+                                   [this]() { MessageReceived(); });
+}
+
+void SctpClientConnection::MessageReceived() {
+  // Dispatch the message to the correct receiver.
+  aos::unique_c_ptr<Message> message = client_.Read();
+
+  if (message->message_type == Message::kNotification) {
+    const union sctp_notification *snp =
+        (const union sctp_notification *)message->data();
+
+    switch (snp->sn_header.sn_type) {
+      case SCTP_ASSOC_CHANGE: {
+        const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+        switch (sac->sac_state) {
+          case SCTP_COMM_UP:
+            NodeConnected(sac->sac_assoc_id);
+
+            VLOG(1) << "Received up from " << message->PeerAddress() << " on "
+                    << sac->sac_assoc_id;
+            break;
+          case SCTP_COMM_LOST:
+          case SCTP_SHUTDOWN_COMP:
+          case SCTP_CANT_STR_ASSOC: {
+            NodeDisconnected();
+          } break;
+          case SCTP_RESTART:
+            LOG(FATAL) << "Never seen this before.";
+            break;
+        }
+      } break;
+    }
+
+    if (VLOG_IS_ON(1)) {
+      PrintNotification(message.get());
+    }
+  } else if (message->message_type == Message::kMessage) {
+    HandleData(message.get());
+  }
+}
+
+void SctpClientConnection::SendConnect() {
+  // Try to send the connect message.  If that fails, retry.
+  if (!client_.Send(kConnectStream(),
+                    std::string_view(
+                        reinterpret_cast<const char *>(connect_message_.data()),
+                        connect_message_.size()),
+                    0)) {
+    NodeDisconnected();
+  }
+}
+
+void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
+  connect_timer_->Disable();
+
+  // We want to tell the kernel to schedule the packets on this new stream with
+  // the priority scheduler.  This only needs to be done once per stream.
+  client_.SetPriorityScheduler(assoc_id);
+
+  remote_assoc_id_ = assoc_id;
+  connection_->mutate_state(State::CONNECTED);
+}
+
+void SctpClientConnection::NodeDisconnected() {
+  connect_timer_->Setup(
+      event_loop_->monotonic_now() + chrono::milliseconds(100),
+      chrono::milliseconds(100));
+  remote_assoc_id_ = 0;
+  connection_->mutate_state(State::DISCONNECTED);
+}
+
+void SctpClientConnection::HandleData(const Message *message) {
+  const logger::MessageHeader *message_header =
+      flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+
+  connection_->mutate_received_packets(connection_->received_packets() + 1);
+
+  const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+
+  // Publish the message.
+  RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
+  sender->Send(message_header->data()->data(), message_header->data()->size(),
+               aos::monotonic_clock::time_point(
+                   chrono::nanoseconds(message_header->monotonic_sent_time())),
+               aos::realtime_clock::time_point(
+                   chrono::nanoseconds(message_header->realtime_sent_time())),
+               message_header->queue_index());
+
+  if (stream_reply_with_timestamp_[stream]) {
+    // TODO(austin): Send back less if we are only acking.  Maybe only a
+    // stream id?  Nothing if we are only forwarding?
+
+    // Now fill out the message received reply.  This uses a MessageHeader
+    // container so it can be directly logged.
+    message_reception_reply_.mutable_message()->mutate_channel_index(
+        message_header->channel_index());
+    message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
+        message_header->monotonic_sent_time());
+
+    // And capture the relevant data needed to generate the forwarding
+    // MessageHeader.
+    message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
+        sender->monotonic_sent_time().time_since_epoch().count());
+    message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
+        sender->realtime_sent_time().time_since_epoch().count());
+    message_reception_reply_.mutable_message()->mutate_remote_queue_index(
+        sender->sent_queue_index());
+
+    // Unique ID is channel_index and monotonic clock.
+    // TODO(austin): Depending on if we are the logger node or not, we need to
+    // guarentee that this ack gets received too...  Same path as the logger.
+    client_.Send(kTimestampStream(),
+                 std::string_view(reinterpret_cast<const char *>(
+                                      message_reception_reply_.data()),
+                                  message_reception_reply_.size()),
+                 0);
+  }
+
+  VLOG(1) << "Received data of length " << message->size << " from "
+          << message->PeerAddress();
+
+  if (VLOG_IS_ON(1)) {
+    client_.LogSctpStatus(message->header.rcvinfo.rcv_assoc_id);
+  }
+
+  VLOG(2) << "\tSNDRCV (stream=" << message->header.rcvinfo.rcv_sid
+          << " ssn=" << message->header.rcvinfo.rcv_ssn
+          << " tsn=" << message->header.rcvinfo.rcv_tsn << " flags=0x"
+          << std::hex << message->header.rcvinfo.rcv_flags << std::dec
+          << " ppid=" << message->header.rcvinfo.rcv_ppid
+          << " cumtsn=" << message->header.rcvinfo.rcv_cumtsn << ")";
+}
+
+MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+      source_node_names_(configuration::SourceNodeNames(
+          event_loop->configuration(), event_loop->node())),
+      statistics_(MakeClientStatistics(source_node_names_,
+                                       event_loop->configuration())) {
+  std::string_view node_name = event_loop->node()->name()->string_view();
+
+  // Find all the channels which are supposed to be delivered to us.
+  channels_.resize(event_loop_->configuration()->channels()->size());
+  int channel_index = 0;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    if (channel->has_destination_nodes()) {
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->name()->string_view() == node_name) {
+          // Give the config a chance to remap us.  This helps with testing on a
+          // single node.
+          const Channel *mapped_channel = configuration::GetChannel(
+              event_loop_->configuration(), channel->name()->string_view(),
+              channel->type()->string_view(), event_loop_->name(),
+              event_loop_->node());
+          channels_[channel_index] = event_loop_->MakeRawSender(mapped_channel);
+          break;
+        }
+      }
+    }
+    ++channel_index;
+  }
+
+  // Now, for each source node, build a connection.
+  int node_index = 0;
+  for (const std::string_view source_node : source_node_names_) {
+    // Open an unspecified connection (:: in ipv6 terminology)
+    connections_.emplace_back(new SctpClientConnection(
+        event_loop, source_node, event_loop->node(), "::", &channels_,
+        statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+            node_index)));
+    ++node_index;
+  }
+
+  // And kick it all off.
+  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+                             chrono::seconds(1));
+  });
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
new file mode 100644
index 0000000..77bf2b7
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.h
@@ -0,0 +1,116 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+
+#include <string_view>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/sctp_client.h"
+#include "aos/network/sctp_lib.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// This class encapsulates all the state required to connect to a server and
+// transmit messages.
+class SctpClientConnection {
+ public:
+  SctpClientConnection(aos::ShmEventLoop *const event_loop,
+                       std::string_view remote_name, const Node *my_node,
+                       std::string_view local_host,
+                       std::vector<std::unique_ptr<aos::RawSender>> *channels,
+                       ClientConnection *connection);
+
+  ~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
+
+ private:
+  // Reads a message from the socket.  Could be a notification.
+  void MessageReceived();
+
+  // Sends a connection request message.
+  void SendConnect();
+
+  // Called when the server connection succeeds.
+  void NodeConnected(sctp_assoc_t assoc_id);
+  // Called when the server connection disconnects.
+  void NodeDisconnected();
+  void HandleData(const Message *message);
+
+  // Event loop to register the server on.
+  aos::ShmEventLoop *const event_loop_;
+
+  // Message to send on connect.
+  const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
+      connect_message_;
+
+  // Starting point for the message reception reply (including timestamps).
+  aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+      message_reception_reply_;
+
+  // Node we are sending to.
+  const aos::Node *const remote_node_;
+
+  // SCTP client.  There is a client per connection so we don't have to deal
+  // with association ids nearly as badly.
+  SctpClient client_;
+
+  // Channels to send received messages on.
+  std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+  // Stream number -> channel lookup.
+  std::vector<int> stream_to_channel_;
+  // Bitmask signaling if we should be replying back with delivery times.
+  std::vector<bool> stream_reply_with_timestamp_;
+
+  // Timer which fires to handle reconnections.
+  aos::TimerHandler *connect_timer_;
+
+  // ClientConnection statistics message to modify.  This will be published
+  // periodicially.
+  ClientConnection *connection_;
+
+  // id of the server once known.  This is only valid if connection_ says
+  // connected.
+  sctp_assoc_t remote_assoc_id_ = 0;
+};
+
+// This encapsulates the state required to talk to *all* the servers from this
+// node.
+class MessageBridgeClient {
+ public:
+  MessageBridgeClient(aos::ShmEventLoop *event_loop);
+
+  ~MessageBridgeClient() {}
+
+ private:
+  // Sends out the statistics that are continually updated by the
+  // SctpClientConnections.
+  void SendStatistics() { sender_.Send(statistics_); }
+
+  // Event loop to schedule everything on.
+  aos::ShmEventLoop *event_loop_;
+  // Sender to publish statistics on.
+  aos::Sender<ClientStatistics> sender_;
+  aos::TimerHandler *statistics_timer_;
+
+  // Nodes to receive data from.
+  const std::vector<std::string_view> source_node_names_;
+
+  // Data to publish.
+  FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+
+  // Channels to send data over.
+  std::vector<std::unique_ptr<aos::RawSender>> channels_;
+
+  // List of connections.  These correspond to the nodes in source_node_names_
+  std::vector<std::unique_ptr<SctpClientConnection>> connections_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
new file mode 100644
index 0000000..1136188
--- /dev/null
+++ b/aos/network/message_bridge_protocol.h
@@ -0,0 +1,31 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+
+namespace aos {
+namespace message_bridge {
+
+// The protocol between the message_bridge_client and server is pretty simple.
+// The overarching design philosophy is that the server sends data to the
+// client, and the client (optionally) sends timestamps back.
+//
+// 1) A connection is established by the client sending the server a Connect
+//    flatbuffer on stream 0.
+// 2) The server then replies with the data, as it is available, on streams 2 +
+//    channel_id in the Connect message.
+// 3) The client (optionally) replies on stream 1 with MessageHeader flatbuffers
+//    with the timestamps that the messages were received.
+//
+// Most of the complexity from there is handling multiple clients and servers
+// and persuading SCTP to do what we want.
+
+// Number of streams reserved for control messages.
+constexpr size_t kControlStreams() { return 2; }
+// The stream on which Connect messages are sent.
+constexpr size_t kConnectStream() { return 0; }
+// The stream on which timestamp replies are sent.
+constexpr size_t kTimestampStream() { return 1; }
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
diff --git a/aos/network/message_bridge_server.cc b/aos/network/message_bridge_server.cc
new file mode 100644
index 0000000..fa5e7c1
--- /dev/null
+++ b/aos/network/message_bridge_server.cc
@@ -0,0 +1,35 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  aos::ShmEventLoop event_loop(&config.message());
+
+  MessageBridgeServer app(&event_loop);
+
+  // TODO(austin): Track which messages didn't make it in time and need to be
+  // logged locally and forwarded.
+
+  event_loop.Run();
+
+  return EXIT_SUCCESS;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+
+  return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
new file mode 100644
index 0000000..75a6a15
--- /dev/null
+++ b/aos/network/message_bridge_server.fbs
@@ -0,0 +1,33 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// State of the connection.
+enum State: ubyte {
+  CONNECTED,
+  DISCONNECTED,
+}
+
+// Statistics from a single connection to a client from this server.
+table ServerConnection {
+  // The node that we are connected to.
+  node:Node;
+
+  // Health of this connection.  Connected or not?
+  state:State;
+
+  // Number of packets that have been dropped (if known).
+  dropped_packets:uint;
+
+  // Number of packets received on all channels.
+  sent_packets:uint;
+
+  // TODO(austin): Per channel counts?
+}
+
+// Statistics for all connections to all the clients.
+table ServerStatistics {
+  connections:[ServerConnection];
+}
+
+root_type ServerStatistics;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
new file mode 100644
index 0000000..38958ba
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.cc
@@ -0,0 +1,367 @@
+#include "aos/network/message_bridge_server_lib.h"
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime, and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+    const std::vector<std::string_view> &source_node_names,
+    const Configuration *configuration) {
+  flatbuffers::FlatBufferBuilder fbb;
+
+  std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<Node> node_offset =
+        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    ServerConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    connection_builder.add_dropped_packets(0);
+    connection_builder.add_sent_packets(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ServerStatistics::Builder server_statistics_builder(fbb);
+  server_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(server_statistics_builder.Finish());
+
+  return fbb.Release();
+}
+
+// Finds the statistics for the provided node name.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+                                       std::string_view node_name) {
+  ServerConnection *matching_server_connection = nullptr;
+  for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
+    ServerConnection *server_connection =
+        statistics->mutable_connections()->GetMutableObject(i);
+    if (server_connection->node()->name()->string_view() == node_name) {
+      matching_server_connection = server_connection;
+      break;
+    }
+  }
+
+  CHECK(matching_server_connection != nullptr) << ": Unknown client";
+
+  return matching_server_connection;
+}
+
+}  // namespace
+
+bool ChannelState::Matches(const Channel *other_channel) {
+  // Confirm the normal tuple, plus make sure that the other side isn't going to
+  // send more data over than we expect with a mismatching size.
+  return (
+      channel_->name()->string_view() == other_channel->name()->string_view() &&
+      channel_->type()->string_view() == other_channel->type()->string_view() &&
+      channel_->max_size() == other_channel->max_size());
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+  // TODO(austin): I don't like allocating this buffer when we are just freeing
+  // it at the end of the function.
+  flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+  VLOG(1) << "Found " << peers_.size() << " peers on channel "
+          << channel_->name()->string_view() << " size " << context.size;
+
+  // TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
+  // Only useful when not logging.
+  fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
+                                             logger::LogType::kLogMessage));
+
+  // TODO(austin): Track which connections need to be reliable and handle
+  // resending properly.
+  size_t sent_count = 0;
+  bool logged_remotely = false;
+  for (Peer &peer : peers_) {
+    logged_remotely = logged_remotely || peer.logged_remotely;
+
+    if (peer.sac_assoc_id != 0) {
+      server->Send(std::string_view(
+                       reinterpret_cast<const char *>(fbb.GetBufferPointer()),
+                       fbb.GetSize()),
+                   peer.sac_assoc_id, peer.stream,
+                   peer.connection->time_to_live() / 1000000);
+      peer.server_connection_statistics->mutate_sent_packets(
+          peer.server_connection_statistics->sent_packets() + 1);
+      if (peer.logged_remotely) {
+        ++sent_count;
+      }
+    } else {
+      peer.server_connection_statistics->mutate_dropped_packets(
+          peer.server_connection_statistics->dropped_packets() + 1);
+    }
+  }
+
+  if (logged_remotely) {
+    if (sent_count == 0) {
+      VLOG(1) << "No clients, rejecting";
+      HandleFailure(fbb.Release());
+    } else {
+      sent_messages_.emplace_back(fbb.Release());
+    }
+  } else {
+    VLOG(1) << "Not bothering to track this message since nobody cares.";
+  }
+
+  // TODO(austin): Limit the size of this queue.  Flush messages to disk
+  // which are too old.  We really care about messages which didn't make it
+  // to a logger...  Which is a new concept or two.
+
+  // Need to handle logging and disk in another thread.  Need other thread
+  // since we sometimes want to skip disk, and synchronization is easier.
+  // This thread then spins on the queue until empty, then polls at 1-10 hz.
+
+  // TODO(austin): ~10 MB chunks on disk and push them over the logging
+  // channel?  Threadsafe disk backed queue object which can handle restarts
+  // and flushes.  Whee.
+}
+
+void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
+                                  uint16_t /*ssn*/,
+                                  absl::Span<const uint8_t> data) {
+  const logger::MessageHeader *message_header =
+      flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+  while (sent_messages_.size() > 0u) {
+    if (sent_messages_.begin()->message().monotonic_sent_time() ==
+        message_header->monotonic_sent_time()) {
+      sent_messages_.pop_front();
+      continue;
+    }
+
+    if (sent_messages_.begin()->message().monotonic_sent_time() <
+        message_header->monotonic_sent_time()) {
+      VLOG(1) << "Delivery looks wrong, rejecting";
+      HandleFailure(std::move(sent_messages_.front()));
+      sent_messages_.pop_front();
+      continue;
+    }
+
+    break;
+  }
+}
+
+void ChannelState::HandleFailure(
+    SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+  // TODO(austin): Put it in the log queue.
+  LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+
+  // Note: this may be really out of order when we avoid the queue...  We
+  // have the ones we know didn't make it immediately, and the ones which
+  // time out eventually.  Need to sort that out.
+}
+
+void ChannelState::AddPeer(const Connection *connection,
+                           ServerConnection *server_connection_statistics,
+                           bool logged_remotely) {
+  peers_.emplace_back(0, 0, connection, server_connection_statistics,
+                      logged_remotely);
+}
+
+void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+  for (ChannelState::Peer &peer : peers_) {
+    if (peer.sac_assoc_id == assoc_id) {
+      // TODO(austin): This will not handle multiple clients from
+      // a single node.  But that should be rare.
+      peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
+      peer.sac_assoc_id = 0;
+      peer.stream = 0;
+      break;
+    }
+  }
+}
+
+void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+                                 int stream, SctpServer *server) {
+  for (ChannelState::Peer &peer : peers_) {
+    if (peer.connection->name()->string_view() == node->name()->string_view()) {
+      peer.sac_assoc_id = assoc_id;
+      peer.stream = stream;
+      peer.server_connection_statistics->mutate_state(State::CONNECTED);
+      server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+
+      break;
+    }
+  }
+}
+
+MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+      statistics_(MakeServerStatistics(
+          configuration::DestinationNodeNames(event_loop->configuration(),
+                                              event_loop->node()),
+          event_loop->configuration())),
+      server_("::", event_loop->node()->port()) {
+  CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
+
+  // TODO(austin): Time sync.  sctp gives us filtered round trip time, not
+  // target time.
+
+  // TODO(austin): Logging synchronization.
+  //
+  // TODO(austin): How do we handle parameter channels?  The oldest value
+  // needs to be sent regardless on connection (though probably only if it has
+  // changed).
+  event_loop_->epoll()->OnReadable(server_.fd(),
+                                   [this]() { MessageReceived(); });
+
+  LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
+
+  int channel_index = 0;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    CHECK(channel->has_source_node());
+    if (channel->source_node()->string_view() ==
+            event_loop_->node()->name()->string_view() &&
+        channel->has_destination_nodes()) {
+      std::unique_ptr<ChannelState> state(
+          new ChannelState{channel, channel_index});
+
+      for (const Connection *connection : *channel->destination_nodes()) {
+        const Node *other_node = configuration::GetNode(
+            event_loop_->configuration(), connection->name()->string_view());
+        state->AddPeer(
+            connection,
+            FindServerConnection(statistics_.mutable_message(),
+                                 connection->name()->string_view()),
+            configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+      }
+
+      // Call SendData for every message.
+      ChannelState *state_ptr = state.get();
+      event_loop_->MakeRawWatcher(
+          channel,
+          [this, state_ptr](const Context &context, const void * /*message*/) {
+            state_ptr->SendData(&server_, context);
+          });
+      channels_.emplace_back(std::move(state));
+    } else {
+      channels_.emplace_back(nullptr);
+    }
+    ++channel_index;
+  }
+
+  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+                             chrono::seconds(1));
+  });
+}
+
+void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
+  server_.SetPriorityScheduler(assoc_id);
+}
+
+void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
+  // Find any matching peers and remove them.
+  for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+    if (channel_state.get() == nullptr) {
+      continue;
+    }
+
+    channel_state->NodeDisconnected(assoc_id);
+  }
+}
+
+void MessageBridgeServer::MessageReceived() {
+  aos::unique_c_ptr<Message> message = server_.Read();
+
+  if (message->message_type == Message::kNotification) {
+    const union sctp_notification *snp =
+        (const union sctp_notification *)message->data();
+
+    switch (snp->sn_header.sn_type) {
+      case SCTP_ASSOC_CHANGE: {
+        const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+        switch (sac->sac_state) {
+          case SCTP_COMM_UP:
+            NodeConnected(sac->sac_assoc_id);
+            VLOG(1) << "Peer connected";
+            break;
+          case SCTP_COMM_LOST:
+          case SCTP_SHUTDOWN_COMP:
+          case SCTP_CANT_STR_ASSOC:
+            NodeDisconnected(sac->sac_assoc_id);
+            VLOG(1) << "Disconnect";
+            break;
+          case SCTP_RESTART:
+            LOG(FATAL) << "Never seen this before.";
+            break;
+        }
+      } break;
+    }
+
+    if (VLOG_IS_ON(1)) {
+      PrintNotification(message.get());
+    }
+  } else if (message->message_type == Message::kMessage) {
+    HandleData(message.get());
+  }
+}
+
+void MessageBridgeServer::HandleData(const Message *message) {
+  VLOG(1) << "Received data of length " << message->size;
+
+  if (message->header.rcvinfo.rcv_sid == kConnectStream()) {
+    // Control channel!
+    const Connect *connect = flatbuffers::GetRoot<Connect>(message->data());
+    VLOG(1) << FlatbufferToJson(connect);
+
+    // Account for the control channel and delivery times channel.
+    size_t channel_index = kControlStreams();
+    for (const Channel *channel : *connect->channels_to_transfer()) {
+      bool matched = false;
+      for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+        if (channel_state.get() == nullptr) {
+          continue;
+        }
+        if (channel_state->Matches(channel)) {
+          channel_state->NodeConnected(connect->node(),
+                                       message->header.rcvinfo.rcv_assoc_id,
+                                       channel_index, &server_);
+
+          matched = true;
+          break;
+        }
+      }
+      if (!matched) {
+        LOG(ERROR) << "Remote tried registering for unknown channel "
+                   << FlatbufferToJson(channel);
+      } else {
+        ++channel_index;
+      }
+    }
+  } else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
+    // Message delivery
+    const logger::MessageHeader *message_header =
+        flatbuffers::GetRoot<logger::MessageHeader>(message->data());
+
+    channels_[message_header->channel_index()]->HandleDelivery(
+        message->header.rcvinfo.rcv_assoc_id, message->header.rcvinfo.rcv_ssn,
+        absl::Span<const uint8_t>(message->data(), message->size));
+  }
+
+  if (VLOG_IS_ON(1)) {
+    message->LogRcvInfo();
+  }
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
new file mode 100644
index 0000000..f202fc9
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.h
@@ -0,0 +1,130 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+
+#include <deque>
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// Class to encapsulate all the state per channel.  This is the dispatcher for a
+// new message from the event loop.
+class ChannelState {
+  public:
+   ChannelState(const Channel *channel, int channel_index)
+       : channel_index_(channel_index), channel_(channel) {}
+
+   // Class to encapsulate all the state per client on a channel.  A client may
+   // be subscribed to multiple channels.
+   struct Peer {
+     Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
+          const Connection *new_connection,
+          ServerConnection *new_server_connection_statistics,
+          bool new_logged_remotely)
+         : sac_assoc_id(new_sac_assoc_id),
+           stream(new_stream),
+           connection(new_connection),
+           server_connection_statistics(new_server_connection_statistics),
+           logged_remotely(new_logged_remotely) {}
+
+     // Valid if != 0.
+     sctp_assoc_t sac_assoc_id = 0;
+
+     size_t stream;
+     const aos::Connection *connection;
+     ServerConnection *server_connection_statistics;
+
+     // If true, this message will be logged on a receiving node.  We need to
+     // keep it around to log it locally if that fails.
+     bool logged_remotely = false;
+  };
+
+  // Needs to be called when a node (might have) disconnected.
+  void NodeDisconnected(sctp_assoc_t assoc_id);
+  void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+                     SctpServer *server);
+
+  // Adds a new peer.
+  void AddPeer(const Connection *connection,
+               ServerConnection *server_connection_statistics,
+               bool logged_remotely);
+
+  // Returns true if this channel has the same name and type as the other
+  // channel.
+  bool Matches(const Channel *other_channel);
+
+  // Sends the data in context using the provided server.
+  void SendData(SctpServer *server, const Context &context);
+
+  // Handles reception of delivery times.
+  void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
+                      absl::Span<const uint8_t> data);
+
+  // Handles (by consuming) failure to deliver a message.
+  void HandleFailure(
+      SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+
+ private:
+  const int channel_index_;
+  const Channel *const channel_;
+
+  std::vector<Peer> peers_;
+
+  std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
+      sent_messages_;
+};
+
+// This encapsulates the state required to talk to *all* the clients from this
+// node.  It handles the session and dispatches data to the ChannelState.
+class MessageBridgeServer {
+ public:
+  MessageBridgeServer(aos::ShmEventLoop *event_loop);
+
+  ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+
+ private:
+  // Reads a message from the socket.  Could be a notification.
+  void MessageReceived();
+
+  // Called when the server connection succeeds.
+  void NodeConnected(sctp_assoc_t assoc_id);
+  // Called when the server connection disconnects.
+  void NodeDisconnected(sctp_assoc_t assoc_id);
+
+  // Called when data (either a connection request or delivery timestamps) is
+  // received.
+  void HandleData(const Message *message);
+
+  // Sends out the statistics that are continually updated by the
+  // ChannelState's.
+  void SendStatistics() { sender_.Send(statistics_); }
+
+  // Event loop to schedule everything on.
+  aos::ShmEventLoop *event_loop_;
+
+  // Statistics, timer, and associated sender.
+  aos::Sender<ServerStatistics> sender_;
+  aos::TimerHandler *statistics_timer_;
+  FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+
+  SctpServer server_;
+
+  // List of channels.  The entries that aren't sent from this node are left
+  // null.
+  std::vector<std::unique_ptr<ChannelState>> channels_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
new file mode 100644
index 0000000..383c1c4
--- /dev/null
+++ b/aos/network/message_bridge_test.cc
@@ -0,0 +1,154 @@
+#include "gtest/gtest.h"
+
+#include <chrono>
+#include <thread>
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_server_lib.h"
+
+DECLARE_string(override_hostname);
+DECLARE_string(application_name);
+
+namespace aos {
+namespace message_bridge {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Test that we can send a ping message over sctp and receive it.
+TEST(MessageBridgeTest, PingPong) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We then get to wait until they are connected.
+  //
+  // After they are connected, we send a Ping message.
+  //
+  // On the other end, we receive a Pong message.
+  //
+  // But, we need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop server_event_loop(&server_config.message());
+  MessageBridgeServer message_bridge_server(&server_event_loop);
+
+  // And build the app which sends the pings.
+  FLAGS_application_name = "ping";
+  aos::ShmEventLoop ping_event_loop(&server_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      ping_event_loop.MakeSender<examples::Ping>("/test");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_application_name = "pi2_message_bridge_client";
+  FLAGS_override_hostname = "raspberrypi2";
+  aos::ShmEventLoop client_event_loop(&client_config.message());
+  MessageBridgeClient message_bridge_client(&client_event_loop);
+
+  // And build the app which sends the pongs.
+  FLAGS_application_name = "pong";
+  aos::ShmEventLoop pong_event_loop(&client_config.message());
+
+  // Count the pongs.
+  int pong_count = 0;
+  pong_event_loop.MakeWatcher(
+      "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+        ++pong_count;
+        LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
+        if (pong_count >= 2) {
+          LOG(INFO) << "That's enough bailing early.";
+          // And Exit is async safe, so thread safe is easy.
+          ping_event_loop.Exit();
+        }
+      });
+
+  FLAGS_override_hostname = "";
+
+  // Start everything up.  Pong is the only thing we don't know how to wait on,
+  // so start it first.
+  std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+  std::thread server_thread(
+      [&server_event_loop]() { server_event_loop.Run(); });
+  std::thread client_thread(
+      [&client_event_loop]() { client_event_loop.Run(); });
+
+  // Wait until we are connected, then send.
+  int ping_count = 0;
+  ping_event_loop.MakeWatcher(
+      "/aos/pi1", [&ping_count, &client_event_loop,
+                   &ping_sender](const ServerStatistics &stats) {
+        LOG(INFO) << FlatbufferToJson(&stats);
+
+        ASSERT_TRUE(stats.has_connections());
+        EXPECT_EQ(stats.connections()->size(), 1);
+
+        bool connected = false;
+        for (const ServerConnection *connection : *stats.connections()) {
+          if (connection->node()->name()->string_view() ==
+              client_event_loop.node()->name()->string_view()) {
+            if (connection->state() == State::CONNECTED) {
+              connected = true;
+            }
+            break;
+          }
+        }
+
+        if (connected) {
+          LOG(INFO) << "Connected!  Sent ping.";
+          auto builder = ping_sender.MakeBuilder();
+          examples::Ping::Builder ping_builder =
+              builder.MakeBuilder<examples::Ping>();
+          ping_builder.add_value(ping_count + 971);
+          builder.Send(ping_builder.Finish());
+          ++ping_count;
+        }
+      });
+
+  // Time ourselves out after a while if Pong doesn't do it for us.
+  aos::TimerHandler *quit = ping_event_loop.AddTimer(
+      [&ping_event_loop]() { ping_event_loop.Exit(); });
+  ping_event_loop.OnRun([quit, &ping_event_loop]() {
+    quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+  });
+
+
+  // And go!
+  ping_event_loop.Run();
+
+  // Shut everyone else down
+  server_event_loop.Exit();
+  client_event_loop.Exit();
+  pong_event_loop.Exit();
+  server_thread.join();
+  client_thread.join();
+  pong_thread.join();
+
+  // Make sure we sent something.
+  EXPECT_GE(ping_count, 1);
+  // And got something back.
+  EXPECT_GE(pong_count, 1);
+}
+
+}  // namespace testing
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
new file mode 100644
index 0000000..65d28d2
--- /dev/null
+++ b/aos/network/message_bridge_test_client.json
@@ -0,0 +1,17 @@
+{
+  "imports": [
+    "message_bridge_test_common.json"
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "localhost",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
new file mode 100644
index 0000000..d0085c2
--- /dev/null
+++ b/aos/network/message_bridge_test_common.json
@@ -0,0 +1,113 @@
+{
+  "channels": [
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/test2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    }
+  ],
+  "applications": [
+    {
+      "name": "pi2_message_bridge_client",
+      "maps": [
+        {
+          "match": {
+            "name": "/test",
+            "type": "aos.examples.Ping"
+          },
+          "rename": {
+            "name": "/test2"
+          }
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    }
+  ]
+}
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
new file mode 100644
index 0000000..eea92e7
--- /dev/null
+++ b/aos/network/message_bridge_test_server.json
@@ -0,0 +1,17 @@
+{
+  "imports": [
+    "message_bridge_test_common.json"
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "localhost",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
new file mode 100644
index 0000000..9e7f6c1
--- /dev/null
+++ b/aos/network/sctp_client.cc
@@ -0,0 +1,144 @@
+#include "aos/network/sctp_client.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netinet/sctp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpClient::SctpClient(std::string_view remote_host, int remote_port,
+                       int streams, std::string_view local_host, int local_port)
+    : sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
+      sockaddr_local_(ResolveSocket(local_host, local_port)),
+      fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+  LOG(INFO) << "socket(" << Family(sockaddr_local_)
+            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+  PCHECK(fd_ != -1);
+
+  {
+    // Allow the kernel to deliver messages from different streams in any order.
+    int full_interleaving = 2;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+                      &full_interleaving, sizeof(full_interleaving)) == 0);
+  }
+
+  {
+    struct sctp_initmsg initmsg;
+    memset(&initmsg, 0, sizeof(struct sctp_initmsg));
+    initmsg.sinit_num_ostreams = streams;
+    initmsg.sinit_max_instreams = streams;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+                      sizeof(struct sctp_initmsg)) == 0);
+  }
+
+  {
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+           0);
+  }
+  {
+    // Servers send promptly.  Clients don't.
+    // TODO(austin): Revisit this assumption when we have time sync.
+    int on = 0;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+  }
+
+  {
+    // TODO(austin): This is the old style registration...  But, the sctp
+    // stack out in the wild for linux is old and primitive.
+    struct sctp_event_subscribe subscribe;
+    memset(&subscribe, 0, sizeof(subscribe));
+    subscribe.sctp_data_io_event = 1;
+    subscribe.sctp_association_event = 1;
+    PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+                      sizeof(subscribe)) == 0);
+  }
+
+  PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+              sockaddr_local_.ss_family == AF_INET6
+                  ? sizeof(struct sockaddr_in6)
+                  : sizeof(struct sockaddr_in)) == 0);
+  VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+}
+
+aos::unique_c_ptr<Message> SctpClient::Read() {
+  return ReadSctpMessage(fd_, max_size_);
+}
+
+bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
+  struct iovec iov;
+  iov.iov_base = const_cast<char *>(data.data());
+  iov.iov_len = data.size();
+
+  struct msghdr outmsg;
+  // Target to send to.
+  outmsg.msg_name = &sockaddr_remote_;
+  outmsg.msg_namelen = sizeof(struct sockaddr_storage);
+  VLOG(1) << "Sending to " << Address(sockaddr_remote_);
+
+  // Data to send.
+  outmsg.msg_iov = &iov;
+  outmsg.msg_iovlen = 1;
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = sizeof(outcmsg);
+  outmsg.msg_flags = 0;
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  outmsg.msg_controllen = cmsg->cmsg_len;
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = rand();
+  sinfo->sinfo_stream = stream;
+  sinfo->sinfo_context = 19;
+  sinfo->sinfo_flags = 0;
+  sinfo->sinfo_timetolive = time_to_live;
+
+  // And send.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno != EPIPE && errno != EAGAIN) {
+      PCHECK(size == static_cast<ssize_t>(data.size()));
+    } else {
+      return false;
+    }
+  } else {
+    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+  }
+
+  VLOG(1) << "Sent " << data.size();
+  return true;
+}
+
+void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
+  message_bridge::LogSctpStatus(fd(), assoc_id);
+}
+
+void SctpClient::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+    struct sctp_assoc_value scheduler;
+    memset(&scheduler, 0, sizeof(scheduler));
+    scheduler.assoc_id = assoc_id;
+    scheduler.assoc_value = SCTP_SS_PRIO;
+    if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+                   sizeof(scheduler)) != 0) {
+      PLOG(WARNING) << "Failed to set scheduler";
+    }
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
new file mode 100644
index 0000000..926e59b
--- /dev/null
+++ b/aos/network/sctp_client.h
@@ -0,0 +1,57 @@
+#ifndef AOS_NETWORK_SCTP_CLIENT_H_
+#define AOS_NETWORK_SCTP_CLIENT_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to encapsulate everything needed to be a SCTP client.
+class SctpClient {
+ public:
+  SctpClient(std::string_view remote_host, int remote_port, int streams,
+             std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+  ~SctpClient() {
+    LOG(INFO) << "close(" << fd_ << ")";
+    PCHECK(close(fd_) == 0);
+  }
+
+  // Receives the next packet from the remote.
+  aos::unique_c_ptr<Message> Read();
+
+  // Sends a block of data on a stream with a TTL.
+  bool Send(int stream, std::string_view data, int time_to_live);
+
+  int fd() { return fd_; }
+
+  // Enables the priority scheduler.  This is a SCTP feature which lets us
+  // configure the priority per stream so that higher priority packets don't get
+  // backed up behind lower priority packets in the networking queues.
+  void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+  // Remote to send to.
+  struct sockaddr_storage sockaddr_remote() const {
+    return sockaddr_remote_;
+  }
+
+  void LogSctpStatus(sctp_assoc_t assoc_id);
+
+ private:
+  struct sockaddr_storage sockaddr_remote_;
+  struct sockaddr_storage sockaddr_local_;
+  int fd_;
+
+  size_t max_size_ = 1000;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  //  AOS_NETWORK_SCTP_CLIENT_H_
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
new file mode 100644
index 0000000..1ed816b
--- /dev/null
+++ b/aos/network/sctp_lib.cc
@@ -0,0 +1,229 @@
+#include "aos/network/sctp_lib.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/sctp.h>
+
+#include <string_view>
+
+DEFINE_string(interface, "", "ipv6 interface");
+
+namespace aos {
+namespace message_bridge {
+
+namespace {
+const char *sac_state_tbl[] = {"COMMUNICATION_UP", "COMMUNICATION_LOST",
+                               "RESTART", "SHUTDOWN_COMPLETE",
+                               "CANT_START_ASSOCICATION"};
+
+typedef union {
+  struct sctp_initmsg init;
+  struct sctp_sndrcvinfo sndrcvinfo;
+} _sctp_cmsg_data_t;
+
+}  // namespace
+
+struct sockaddr_storage ResolveSocket(std::string_view host, int port) {
+  struct sockaddr_storage result;
+  struct addrinfo *addrinfo_result;
+  struct sockaddr_in *t_addr = (struct sockaddr_in *)&result;
+  struct sockaddr_in6 *t_addr6 = (struct sockaddr_in6 *)&result;
+
+  PCHECK(getaddrinfo(std::string(host).c_str(), 0, NULL, &addrinfo_result) ==
+         0);
+
+  switch (addrinfo_result->ai_family) {
+    case AF_INET:
+      memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+      t_addr->sin_family = addrinfo_result->ai_family;
+      t_addr->sin_port = htons(port);
+
+      break;
+    case AF_INET6:
+      memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+      t_addr6->sin6_family = addrinfo_result->ai_family;
+      t_addr6->sin6_port = htons(port);
+
+      if (FLAGS_interface.size() > 0) {
+        t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
+      }
+
+      break;
+  }
+
+  // Now print it back out nicely.
+  char host_string[NI_MAXHOST];
+  char service_string[NI_MAXSERV];
+
+  int error = getnameinfo((struct sockaddr *)&result,
+                          addrinfo_result->ai_addrlen, host_string, NI_MAXHOST,
+                          service_string, NI_MAXSERV, NI_NUMERICHOST);
+
+  if (error) {
+    LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
+  }
+
+  LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
+            << ", family=" << addrinfo_result->ai_family;
+
+  freeaddrinfo(addrinfo_result);
+
+  return result;
+}
+
+std::string_view Family(const struct sockaddr_storage &sockaddr) {
+  if (sockaddr.ss_family == AF_INET) {
+    return "AF_INET";
+  } else if (sockaddr.ss_family == AF_INET6) {
+    return "AF_INET6";
+  } else {
+    return "unknown";
+  }
+}
+std::string Address(const struct sockaddr_storage &sockaddr) {
+  char addrbuf[INET6_ADDRSTRLEN];
+  if (sockaddr.ss_family == AF_INET) {
+    const struct sockaddr_in *sin = (const struct sockaddr_in *)&sockaddr;
+    return std::string(
+        inet_ntop(AF_INET, &sin->sin_addr, addrbuf, INET6_ADDRSTRLEN));
+  } else {
+    const struct sockaddr_in6 *sin6 = (const struct sockaddr_in6 *)&sockaddr;
+    return std::string(
+        inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, INET6_ADDRSTRLEN));
+  }
+}
+
+void PrintNotification(const Message *msg) {
+  const union sctp_notification *snp =
+      (const union sctp_notification *)msg->data();
+
+  LOG(INFO) << "Notification:";
+
+  switch (snp->sn_header.sn_type) {
+    case SCTP_ASSOC_CHANGE: {
+      const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+      LOG(INFO) << "SCTP_ASSOC_CHANGE(" << sac_state_tbl[sac->sac_state] << ")";
+      VLOG(1) << "    (assoc_change: state=" << sac->sac_state
+              << ", error=" << sac->sac_error
+              << ", instr=" << sac->sac_inbound_streams
+              << " outstr=" << sac->sac_outbound_streams
+              << ", assoc=" << sac->sac_assoc_id << ")";
+    } break;
+    case SCTP_PEER_ADDR_CHANGE: {
+      const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
+      LOG(INFO) << " SlCTP_PEER_ADDR_CHANGE";
+      VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
+              << " state=" << spc->spc_state << ", error=" << spc->spc_error
+              << ")";
+    } break;
+    case SCTP_SEND_FAILED: {
+      const struct sctp_send_failed *ssf = &snp->sn_send_failed;
+      LOG(INFO) << " SCTP_SEND_FAILED";
+      VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
+              << " err=" << ssf->ssf_error << ")";
+    } break;
+    case SCTP_REMOTE_ERROR: {
+      const struct sctp_remote_error *sre = &snp->sn_remote_error;
+      LOG(INFO) << " SCTP_REMOTE_ERROR";
+      VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
+    } break;
+    case SCTP_SHUTDOWN_EVENT: {
+      LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
+    } break;
+    default:
+      LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
+      break;
+  }
+}
+
+std::string GetHostname() {
+  char buf[256];
+  buf[sizeof(buf) - 1] = '\0';
+  PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
+  return buf;
+}
+
+std::string Message::PeerAddress() const { return Address(sin); }
+
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
+  struct sctp_status status;
+  memset(&status, 0, sizeof(status));
+  status.sstat_assoc_id = assoc_id;
+
+  socklen_t size = sizeof(status);
+  PCHECK(getsockopt(fd, SOL_SCTP, SCTP_STATUS,
+                    reinterpret_cast<void *>(&status), &size) == 0);
+
+  LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
+            << " sstat_state:" << status.sstat_state
+            << " sstat_rwnd:" << status.sstat_rwnd
+            << " sstat_unackdata:" << status.sstat_unackdata
+            << " sstat_penddata:" << status.sstat_penddata
+            << " sstat_instrms:" << status.sstat_instrms
+            << " sstat_outstrms:" << status.sstat_outstrms
+            << " sstat_fragmentation_point:" << status.sstat_fragmentation_point
+            << " sstat_primary.spinfo_srtt:" << status.sstat_primary.spinfo_srtt
+            << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
+}
+
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size) {
+  char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
+  struct iovec iov;
+  struct msghdr inmessage;
+
+  memset(&inmessage, 0, sizeof(struct msghdr));
+
+  aos::unique_c_ptr<Message> result(
+      reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size)));
+
+  iov.iov_len = max_size;
+  iov.iov_base = result->mutable_data();
+
+  inmessage.msg_iov = &iov;
+  inmessage.msg_iovlen = 1;
+
+  inmessage.msg_control = incmsg;
+  inmessage.msg_controllen = sizeof(incmsg);
+
+  inmessage.msg_namelen = sizeof(struct sockaddr_storage);
+  inmessage.msg_name = &result->sin;
+
+  ssize_t size;
+  PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+
+  result->size = size;
+
+  if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
+    result->message_type = Message::kNotification;
+  } else {
+    result->message_type = Message::kMessage;
+  }
+
+  for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != NULL;
+       scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
+    switch (scmsg->cmsg_type) {
+      case SCTP_RCVINFO: {
+        struct sctp_rcvinfo *data = (struct sctp_rcvinfo *)CMSG_DATA(scmsg);
+        result->header.rcvinfo = *data;
+      } break;
+      default:
+        LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
+        break;
+    }
+  }
+
+  return result;
+}
+
+void Message::LogRcvInfo() const {
+  LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
+            << " ssn=" << header.rcvinfo.rcv_ssn
+            << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
+            << header.rcvinfo.rcv_flags << std::dec
+            << " ppid=" << header.rcvinfo.rcv_ppid
+            << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
new file mode 100644
index 0000000..0f90f87
--- /dev/null
+++ b/aos/network/sctp_lib.h
@@ -0,0 +1,78 @@
+#ifndef AOS_NETWORK_SCTP_LIB_H_
+#define AOS_NETWORK_SCTP_LIB_H_
+
+#include <arpa/inet.h>
+#include <netinet/sctp.h>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Resolves a socket and returns the address.  This can be either an ipv4 or
+// ipv6 address.
+struct sockaddr_storage ResolveSocket(std::string_view host, int port);
+
+// Returns a formatted version of the address.
+std::string Address(const struct sockaddr_storage &sockaddr);
+// Returns a formatted version of the address family.
+std::string_view Family(const struct sockaddr_storage &sockaddr);
+
+// Message received.
+// This message is malloced bigger than needed and the extra space after it is
+// the data.
+struct Message {
+  // Struct to let us force data to be well aligned.
+  struct OveralignedChar {
+    uint8_t data alignas(32);
+  };
+
+  // Headers.
+  struct {
+    struct sctp_rcvinfo rcvinfo;
+  } header;
+
+  // Address of the sender.
+  struct sockaddr_storage sin;
+
+  // Data type. Is it a block of data, or is it a struct sctp_notification?
+  enum MessageType { kMessage, kNotification } message_type;
+
+  size_t size = 0u;
+  uint8_t *mutable_data() {
+    return reinterpret_cast<uint8_t *>(&actual_data[0].data);
+  }
+  const uint8_t *data() const {
+    return reinterpret_cast<const uint8_t *>(&actual_data[0].data);
+  }
+
+  // Returns a human readable peer IP address.
+  std::string PeerAddress() const;
+
+  // Prints out the RcvInfo structure.
+  void LogRcvInfo() const;
+
+  // The start of the data.
+  OveralignedChar actual_data[];
+};
+
+void PrintNotification(const Message *msg);
+
+std::string GetHostname();
+
+// Gets and logs the contents of the sctp_status message.
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
+
+// Read and allocate a message.
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size);
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_SCTP_LIB_H_
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
new file mode 100644
index 0000000..70d5b28
--- /dev/null
+++ b/aos/network/sctp_server.cc
@@ -0,0 +1,143 @@
+#include "aos/network/sctp_server.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpServer::SctpServer(std::string_view local_host, int local_port)
+    : sockaddr_local_(ResolveSocket(local_host, local_port)),
+      fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+  LOG(INFO) << "socket(" << Family(sockaddr_local_)
+            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+  PCHECK(fd_ != -1);
+
+  {
+    struct sctp_event_subscribe subscribe;
+    memset(&subscribe, 0, sizeof(subscribe));
+    subscribe.sctp_data_io_event = 1;
+    subscribe.sctp_association_event = 1;
+    subscribe.sctp_send_failure_event = 1;
+    subscribe.sctp_partial_delivery_event = 1;
+
+    PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+                      sizeof(subscribe)) == 0);
+  }
+  {
+    // Enable recvinfo when a packet arrives.
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+           0);
+  }
+  {
+    // Allow one packet on the wire to have multiple source packets.
+    int full_interleaving = 2;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+                      &full_interleaving, sizeof(full_interleaving)) == 0);
+  }
+  {
+    // Turn off the NAGLE algorithm.
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+  }
+
+  // And go!
+  PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+              sockaddr_local_.ss_family == AF_INET6
+                  ? sizeof(struct sockaddr_in6)
+                  : sizeof(struct sockaddr_in)) == 0);
+  LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+
+  PCHECK(listen(fd_, 100) == 0);
+
+  PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size_,
+                    sizeof(max_size_)) == 0);
+}
+
+aos::unique_c_ptr<Message> SctpServer::Read() {
+  return ReadSctpMessage(fd_, max_size_);
+}
+
+void SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
+                      int stream, int timetolive) {
+  struct iovec iov;
+  iov.iov_base = const_cast<char *>(data.data());
+  iov.iov_len = data.size();
+
+  // Use the assoc_id for the destination instead of the msg_name.
+  struct msghdr outmsg;
+  outmsg.msg_namelen = 0;
+
+  // Data to send.
+  outmsg.msg_iov = &iov;
+  outmsg.msg_iovlen = 1;
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+  outmsg.msg_flags = 0;
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = ++ppid_;
+  sinfo->sinfo_stream = stream;
+  sinfo->sinfo_flags = 0;
+  sinfo->sinfo_assoc_id = snd_assoc_id;
+  sinfo->sinfo_timetolive = timetolive;
+
+  // And send.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno != EPIPE) {
+      PCHECK(size == static_cast<ssize_t>(data.size()));
+    }
+  } else {
+    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+  }
+}
+
+void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+  struct sctp_assoc_value scheduler;
+  memset(&scheduler, 0, sizeof(scheduler));
+  scheduler.assoc_id = assoc_id;
+  scheduler.assoc_value = SCTP_SS_PRIO;
+  if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+                 sizeof(scheduler)) != 0) {
+    PLOG(WARNING) << "Failed to set scheduler";
+  }
+}
+
+void SctpServer::SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+                                   uint16_t priority) {
+  struct sctp_stream_value sctp_priority;
+  memset(&sctp_priority, 0, sizeof(sctp_priority));
+  sctp_priority.assoc_id = assoc_id;
+  sctp_priority.stream_id = stream_id;
+  sctp_priority.stream_value = priority;
+  if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER_VALUE,
+                 &sctp_priority, sizeof(sctp_priority)) != 0) {
+    PLOG(WARNING) << "Failed to set scheduler";
+  }
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
new file mode 100644
index 0000000..a3086d9
--- /dev/null
+++ b/aos/network/sctp_server.h
@@ -0,0 +1,63 @@
+#ifndef AOS_NETWORK_SCTP_SERVER_H_
+#define AOS_NETWORK_SCTP_SERVER_H_
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <memory>
+#include <sys/socket.h>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+class SctpServer {
+ public:
+  SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+  ~SctpServer() {
+    LOG(INFO) << "close(" << fd_ << ")";
+    PCHECK(close(fd_) == 0);
+  }
+
+  // Receives the next packet from the remote.
+  aos::unique_c_ptr<Message> Read();
+
+  // Sends a block of data to a client on a stream with a TTL.
+  void Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
+            int timetolive);
+
+  int fd() { return fd_; }
+
+  // Enables the priority scheduler.  This is a SCTP feature which lets us
+  // configure the priority per stream so that higher priority packets don't get
+  // backed up behind lower priority packets in the networking queues.
+  void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+  // Sets the priority of a specific stream.
+  void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+                         uint16_t priority);
+
+ private:
+  struct sockaddr_storage sockaddr_local_;
+  int fd_;
+
+  // TODO(austin): Configure this.
+  size_t max_size_ = 1000;
+
+  int ppid_ = 1;
+};
+
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_SCTP_SERVER_H_
diff --git a/aos/testdata/config1_multinode.json b/aos/testdata/config1_multinode.json
index 32bce5c..f12d927 100644
--- a/aos/testdata/config1_multinode.json
+++ b/aos/testdata/config1_multinode.json
@@ -4,12 +4,22 @@
       "name": "/foo",
       "type": ".aos.bar",
       "max_size": 5,
-      "source_node": "pi2"
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
     },
     {
       "name": "/foo2",
       "type": ".aos.bar",
-      "source_node": "pi1"
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        }
+      ]
     }
   ],
   "applications": [
diff --git a/aos/testdata/expected_multinode.json b/aos/testdata/expected_multinode.json
index 46d052e..0946007 100644
--- a/aos/testdata/expected_multinode.json
+++ b/aos/testdata/expected_multinode.json
@@ -4,12 +4,22 @@
    "name": "/foo",
    "type": ".aos.bar",
    "max_size": 5,
-   "source_node": "pi2"
+   "source_node": "pi2",
+   "destination_nodes": [
+    {
+     "name": "pi1"
+    }
+   ]
   },
   {
    "name": "/foo2",
    "type": ".aos.bar",
-   "source_node": "pi1"
+   "source_node": "pi1",
+   "destination_nodes": [
+    {
+     "name": "pi2"
+    }
+   ]
   },
   {
    "name": "/foo3",