Merge changes Icd0187f2,I81a0da10

* changes:
  Add drivetrain replay application
  Enable renaming logged locations
diff --git a/aos/config.bzl b/aos/config.bzl
index 3f78422..0766329 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -2,7 +2,7 @@
 
 AosConfigInfo = provider(fields = ["transitive_flatbuffers", "transitive_src"])
 
-def aos_config(name, src, flatbuffers, deps = [], visibility = None):
+def aos_config(name, src, flatbuffers = [], deps = [], visibility = None):
     _aos_config(
         name = name,
         src = src,
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 46f07ac..a40c47c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -7,6 +7,8 @@
 #include <arpa/inet.h>
 #include <ifaddrs.h>
 #include <unistd.h>
+
+#include <set>
 #include <string_view>
 
 #include "absl/container/btree_set.h"
@@ -730,5 +732,57 @@
              << static_cast<int>(connection->timestamp_logger());
 }
 
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+                                              const Node *my_node) {
+  // A std::set both deduplicates the names and hands them back sorted.
+  std::set<std::string_view> sources;
+  for (const Channel *channel : *config->channels()) {
+    if (!channel->has_destination_nodes()) continue;
+    for (const Connection *connection : *channel->destination_nodes()) {
+      if (connection->name()->string_view() !=
+          my_node->name()->string_view()) {
+        continue;
+      }
+      sources.insert(channel->source_node()->string_view());
+    }
+  }
+
+  std::vector<std::string_view> result;
+  for (const std::string_view source : sources) {
+    VLOG(1) << "Found a source node of " << source;
+    result.emplace_back(source);
+  }
+  return result;
+}
+
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+                                                   const Node *my_node) {
+  // Collects, in first-seen order and without duplicates, the name of every
+  // node listed as a destination of a channel whose source is my_node.
+  std::vector<std::string_view> result;
+
+  for (const Channel *channel : *config->channels()) {
+    // Only channels sent from my_node contribute destinations.
+    if (!channel->has_source_node() ||
+        channel->source_node()->string_view() !=
+            my_node->name()->string_view()) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) continue;
+
+    for (const Connection *connection : *channel->destination_nodes()) {
+      // Linear scan keeps insertion order while dropping repeats.
+      const std::string_view name = connection->name()->string_view();
+      if (std::find(result.begin(), result.end(), name) == result.end()) {
+        result.emplace_back(name);
+      }
+    }
+  }
+
+  for (const std::string_view destination : result) {
+    VLOG(1) << "Found a destination node of " << destination;
+  }
+  return result;
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index a078dea..ef30fce 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -90,6 +90,14 @@
 // Prints a channel to json, but without the schema.
 std::string CleanedChannelToString(const Channel *channel);
 
+// Returns the unique node names this node forwards to, in first-seen order.
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+                                                   const Node *my_node);
+
+// Returns the unique node names this node receives from, sorted by name.
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+                                              const Node *my_node);
+
 // TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
 
 }  // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index fab6745..b8fc5b6 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -7,6 +7,7 @@
 #include "flatbuffers/reflection.h"
 #include "glog/logging.h"
 #include "gtest/gtest.h"
+#include "gmock/gmock.h"
 
 namespace aos {
 namespace configuration {
@@ -567,6 +568,37 @@
       &logged_on_both_channel.message(), &baz_node.message(),
       &baz_node.message()));
 }
+
+// Tests that we can deduce source nodes from a multinode config.
+TEST_F(ConfigurationTest, SourceNodeNames) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1_multinode.json");
+
+  // Each node is expected to have exactly one source here, so deduplication
+  // is not covered; this only exercises the basic lookup.
+  EXPECT_THAT(
+      SourceNodeNames(&config.message(), config.message().nodes()->Get(0)),
+      ::testing::ElementsAreArray({"pi2"}));
+  EXPECT_THAT(
+      SourceNodeNames(&config.message(), config.message().nodes()->Get(1)),
+      ::testing::ElementsAreArray({"pi1"}));
+}
+
+// Tests that we can deduce destination nodes from a multinode config.
+TEST_F(ConfigurationTest, DestinationNodeNames) {
+  FlatbufferDetachedBuffer<Configuration> config =
+      ReadConfig("aos/testdata/config1_multinode.json");
+
+  // Each node is expected to have exactly one destination here, so
+  // deduplication is not covered; this only exercises the basic lookup.
+  EXPECT_THAT(
+      DestinationNodeNames(&config.message(), config.message().nodes()->Get(0)),
+      ::testing::ElementsAreArray({"pi2"}));
+  EXPECT_THAT(
+      DestinationNodeNames(&config.message(), config.message().nodes()->Get(1)),
+      ::testing::ElementsAreArray({"pi1"}));
+}
+
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e40ab99..79200bd 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -121,6 +121,8 @@
     flatbuffers = [
         ":ping_fbs",
         ":pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
     ],
     deps = [":config"],
 )
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 97d5783..b2be972 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,4 +1,5 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
 
 flatbuffer_cc_library(
     name = "logger_fbs",
@@ -84,11 +85,21 @@
     ],
 )
 
+aos_config(
+    name = "multinode_pingpong_config",
+    src = "multinode_pingpong.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+    ],
+    deps = ["//aos/events:config"],
+)
+
 cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
     data = [
-        "//aos/events:multinode_pingpong_config.json",
+        ":multinode_pingpong_config.json",
         "//aos/events:pingpong_config.json",
     ],
     deps = [
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index e1808b1..98f6a74 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -144,7 +144,7 @@
  public:
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
-            "aos/events/multinode_pingpong_config.json")),
+            "aos/events/logging/multinode_pingpong_config.json")),
         event_loop_factory_(&config_.message(), "pi1"),
         ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
         ping_(ping_event_loop_.get()) {}
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
new file mode 100644
index 0000000..f85a4e1
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong.json
@@ -0,0 +1,93 @@
+{
+  "channels": [
+    /* Logged on pi1 locally */
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    /* Forwarded to pi2.
+     * Doesn't matter where timestamps are logged for the test.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1",
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    /* Forwarded back to pi1.
+     * The message is logged both on the sending node and the receiving node
+     * (to make it easier to look at the results for now).
+     *
+     * The timestamps are logged on the receiving node.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index f0e532e..c0c5087 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -2,6 +2,54 @@
   "channels": [
     {
       "name": "/aos/pi1",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
       "type": "aos.timing.Report",
       "source_node": "pi1",
       "frequency": 50,
@@ -25,6 +73,14 @@
       "max_size": 2048
     },
     {
+      "name": "/aos/roborio",
+      "type": "aos.timing.Report",
+      "source_node": "roborio",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
       "name": "/test",
       "type": "aos.examples.Ping",
       "source_node": "pi1",
@@ -32,8 +88,8 @@
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -41,14 +97,12 @@
       "name": "/test",
       "type": "aos.examples.Pong",
       "source_node": "pi2",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -60,8 +114,8 @@
         {
           "name": "pi3",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -69,14 +123,12 @@
       "name": "/test2",
       "type": "aos.examples.Pong",
       "source_node": "pi3",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     }
@@ -111,6 +163,96 @@
       "rename": {
         "name": "/aos/pi3"
       }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
     }
   ],
   "nodes": [
@@ -128,6 +270,11 @@
       "name": "pi3",
       "hostname": "raspberrypi3",
       "port": 9971
+    },
+    {
+      "name": "roborio",
+      "hostname": "roboRIO-6971-FRC",
+      "port": 9971
     }
   ],
   "applications": [
@@ -156,6 +303,32 @@
           }
         }
       ]
+    },
+    {
+      "name": "ping3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
+    },
+    {
+      "name": "pong3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
     }
   ]
 }
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 3ced933..bdee4f2 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -21,10 +21,26 @@
 #include "aos/util/phased_loop.h"
 #include "glog/logging.h"
 
+namespace {
+
+// Returns the portion of the path after the last /.  This very much assumes
+// that the application name is null terminated.
+const char *Filename(const char *path) {
+  const std::string_view view(path);
+  const auto slash = view.rfind('/');
+  if (slash == std::string_view::npos) return path;
+  // Skip the slash itself; the result still points into the caller's string.
+  return path + slash + 1;
+}
+
+}  // namespace
+
 DEFINE_string(shm_base, "/dev/shm/aos",
               "Directory to place queue backing mmaped files in.");
 DEFINE_uint32(permissions, 0770,
               "Permissions to make shared memory files and folders.");
+DEFINE_string(application_name, Filename(program_invocation_name),
+              "The application name");
 
 namespace aos {
 
@@ -135,15 +151,6 @@
 
 namespace {
 
-// Returns the portion of the path after the last /.
-std::string_view Filename(std::string_view path) {
-  auto last_slash_pos = path.find_last_of("/");
-
-  return last_slash_pos == std::string_view::npos
-             ? path
-             : path.substr(last_slash_pos + 1, path.size());
-}
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -158,7 +165,7 @@
 
 ShmEventLoop::ShmEventLoop(const Configuration *configuration)
     : EventLoop(configuration),
-      name_(Filename(program_invocation_name)),
+      name_(FLAGS_application_name),
       node_(MaybeMyNode(configuration)) {
   if (configuration->has_nodes()) {
     CHECK(node_ != nullptr) << ": Couldn't find node in config.";
@@ -802,15 +809,16 @@
   }
 
   SignalHandler::global()->Unregister(this);
+
+  // Trigger any remaining senders or fetchers to be cleared before destroying
+  // the event loop so the book keeping matches.  Do this in the thread that
+  // created the timing reporter.
+  timing_report_sender_.reset();
 }
 
 void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 ShmEventLoop::~ShmEventLoop() {
-  // Trigger any remaining senders or fetchers to be cleared before destroying
-  // the event loop so the book keeping matches.
-  timing_report_sender_.reset();
-
   // Force everything with a registered fd with epoll to be destroyed now.
   timers_.clear();
   phased_loops_.clear();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index d10989e..bb4ad77 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -71,6 +71,8 @@
 
   int priority() const override { return priority_; }
 
+  internal::EPoll *epoll() { return &epoll_; }  // Owned by this loop.
+
  private:
   friend class internal::WatcherState;
   friend class internal::TimerHandlerState;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4e4520c..4388687 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
 
 namespace aos {
 
@@ -31,15 +32,19 @@
 
 // This class is a fixed memory allocator which holds the data for a flatbuffer
 // in an array.
-template <size_t S>
 class FixedAllocator : public FixedAllocatorBase {
  public:
+  FixedAllocator(size_t size) : buffer_(size, 0) {}
+
   uint8_t *data() override { return &buffer_[0]; }
   const uint8_t *data() const override { return &buffer_[0]; }
   size_t size() const override { return buffer_.size(); }
 
+  // Moves the buffer out to the caller.  data() must not be used afterwards.
+  std::vector<uint8_t> release() { return std::move(buffer_); }
+
  private:
-  std::array<uint8_t, S> buffer_;
+  std::vector<uint8_t> buffer_;
 };
 
 // This class adapts a preallocated memory region to an Allocator.
@@ -82,46 +87,6 @@
   virtual size_t size() const = 0;
 };
 
-// Array backed flatbuffer.
-template <typename T>
-class FlatbufferArray : public Flatbuffer<T> {
- public:
-  // Builds a Flatbuffer by copying the data from the other flatbuffer.
-  FlatbufferArray(const Flatbuffer<T> &other) {
-    CHECK_LE(other.size(), data_.size());
-
-    memcpy(data_.data(), other.data(), other.size());
-    size_ = other.size();
-  }
-
-  // Coppies the data from the other flatbuffer.
-  FlatbufferArray &operator=(const Flatbuffer<T> &other) {
-    CHECK_LE(other.size(), data_.size());
-
-    memcpy(data_.data(), other.data(), other.size());
-    size_ = other.size();
-    return *this;
-  }
-
-  virtual ~FlatbufferArray() override {}
-
-  // Creates a builder wrapping the underlying data.
-  flatbuffers::FlatBufferBuilder FlatBufferBuilder() {
-    data_.deallocate(data_.data(), data_.size());
-    flatbuffers::FlatBufferBuilder fbb(data_.size(), &data_);
-    fbb.ForceDefaults(1);
-    return fbb;
-  }
-
-  const uint8_t *data() const override { return data_.data(); }
-  uint8_t *data() override { return data_.data(); }
-  size_t size() const override { return size_; }
-
- private:
-  FixedAllocator<8 * 1024> data_;
-  size_t size_ = data_.size();
-};
-
 // String backed flatbuffer.
 template <typename T>
 class FlatbufferString : public Flatbuffer<T> {
@@ -228,6 +193,48 @@
   flatbuffers::DetachedBuffer buffer_;
 };
 
+// This object associates the message type with the memory storing the
+// flatbuffer.  This only stores root tables.
+//
+// From a usage point of view, pointers to the data are very different than
+// pointers to the tables.
+template <typename T>
+class SizePrefixedFlatbufferDetachedBuffer final : public Flatbuffer<T> {
+ public:
+  // Takes ownership of a buffer which starts with a uoffset_t size prefix.
+  SizePrefixedFlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
+      : buffer_(std::move(buffer)) {
+    CHECK_GE(buffer_.size(), sizeof(flatbuffers::uoffset_t));
+  }
+
+  // Move only: construction steals the storage, assignment swaps it.
+  SizePrefixedFlatbufferDetachedBuffer(
+      SizePrefixedFlatbufferDetachedBuffer &&other)
+      : buffer_(std::move(other.buffer_)) {}
+  SizePrefixedFlatbufferDetachedBuffer &operator=(
+      SizePrefixedFlatbufferDetachedBuffer &&other) {
+    std::swap(buffer_, other.buffer_);
+    return *this;
+  }
+
+  ~SizePrefixedFlatbufferDetachedBuffer() override = default;
+
+  // The whole buffer, size prefix included.
+  const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
+  // The flatbuffer itself, i.e. everything after the size prefix.
+  const uint8_t *data() const override {
+    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+  }
+  uint8_t *data() override {
+    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+  }
+  size_t size() const override {
+    return buffer_.size() - sizeof(flatbuffers::uoffset_t);
+  }
+
+ private:
+  flatbuffers::DetachedBuffer buffer_;
+};
 // TODO(austin): Need a way to get our hands on the max size.  Can start with
 // "large" for now.
 
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5d96183..9d3b3c4 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,5 +1,36 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
+
 package(default_visibility = ["//visibility:public"])
 
+flatbuffer_cc_library(
+    name = "connect_fbs",
+    srcs = ["connect.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "message_bridge_client_fbs",
+    srcs = ["message_bridge_client.fbs"],
+    gen_reflections = 1,
+    includes = [
+        ":message_bridge_server_fbs_includes",
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "message_bridge_server_fbs",
+    srcs = ["message_bridge_server.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
 cc_library(
     name = "team_number",
     srcs = [
@@ -25,3 +56,182 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_library(
+    name = "sctp_lib",
+    srcs = [
+        "sctp_lib.cc",
+    ],
+    hdrs = [
+        "sctp_lib.h",
+    ],
+    copts = [
+        # The casts required to read datastructures from sockets trip -Wcast-align.
+        "-Wno-cast-align",
+    ],
+    deps = [
+        "//aos:unique_malloc_ptr",
+        "//third_party/lksctp-tools:sctp",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
+    name = "sctp_server",
+    srcs = [
+        "sctp_server.cc",
+    ],
+    hdrs = [
+        "sctp_server.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":sctp_lib",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_protocol",
+    hdrs = [
+        "message_bridge_protocol.h",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_server_lib",
+    srcs = [
+        "message_bridge_server_lib.cc",
+    ],
+    hdrs = [
+        "message_bridge_server_lib.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":connect_fbs",
+        ":message_bridge_protocol",
+        ":message_bridge_server_fbs",
+        ":sctp_lib",
+        ":sctp_server",
+        "//aos:unique_malloc_ptr",
+        "//aos/events:shm_event_loop",
+        "//aos/events/logging:logger",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_binary(
+    name = "message_bridge_server",
+    srcs = [
+        "message_bridge_server.cc",
+    ],
+    deps = [
+        ":message_bridge_server_lib",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+    ],
+)
+
+cc_library(
+    name = "sctp_client",
+    srcs = [
+        "sctp_client.cc",
+    ],
+    hdrs = [
+        "sctp_client.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":sctp_lib",
+        "//third_party/lksctp-tools:sctp",
+    ],
+)
+
+cc_library(
+    name = "message_bridge_client_lib",
+    srcs = [
+        "message_bridge_client_lib.cc",
+    ],
+    hdrs = [
+        "message_bridge_client_lib.h",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":connect_fbs",
+        ":message_bridge_client_fbs",
+        ":message_bridge_protocol",
+        ":message_bridge_server_fbs",
+        ":sctp_client",
+        "//aos/events:shm_event_loop",
+        "//aos/events/logging:logger",
+    ],
+)
+
+cc_binary(
+    name = "message_bridge_client",
+    srcs = [
+        "message_bridge_client.cc",
+    ],
+    copts = [
+        "-Wno-cast-align",
+    ],
+    deps = [
+        ":message_bridge_client_lib",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+    ],
+)
+
+aos_config(
+    name = "message_bridge_test_common_config",
+    src = "message_bridge_test_common.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+    ],
+    deps = ["//aos/events:config"],
+)
+
+aos_config(
+    name = "message_bridge_test_server_config",
+    src = "message_bridge_test_server.json",
+    deps = [":message_bridge_test_common_config"],
+)
+
+aos_config(
+    name = "message_bridge_test_client_config",
+    src = "message_bridge_test_client.json",
+    deps = [":message_bridge_test_common_config"],
+)
+
+cc_test(
+    name = "message_bridge_test",
+    srcs = [
+        "message_bridge_test.cc",
+    ],
+    data = [
+        ":message_bridge_test_client_config.json",
+        ":message_bridge_test_server_config.json",
+    ],
+    deps = [
+        ":message_bridge_client_lib",
+        ":message_bridge_server_lib",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/events:shm_event_loop",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/network/connect.fbs b/aos/network/connect.fbs
new file mode 100644
index 0000000..32893b8
--- /dev/null
+++ b/aos/network/connect.fbs
@@ -0,0 +1,13 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// This is the message sent to initiate a connection to a message_bridge.
+// It communicates the channels that need to be forwarded back.
+table Connect {
+  // The node making the request.
+  node:aos.Node;
+
+  // The channels that we want transferred to this client.
+  channels_to_transfer:[Channel];
+}
diff --git a/aos/network/message_bridge_client.cc b/aos/network/message_bridge_client.cc
new file mode 100644
index 0000000..c44eee0
--- /dev/null
+++ b/aos/network/message_bridge_client.cc
@@ -0,0 +1,36 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+  // Read the config named by --config and build the event loop on top of it.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config(
+      aos::configuration::ReadConfig(FLAGS_config));
+  aos::ShmEventLoop event_loop(&config.message());
+
+  // TODO(austin): Save messages into a vector to be logged.  One file per
+  // channel?  Need to sort out ordering.
+  //
+  // TODO(austin): Low priority, "reliable" logging channel.
+  MessageBridgeClient client(&event_loop);
+
+  // Run the bridge; the process reports success once Run() returns.
+  event_loop.Run();
+
+  return EXIT_SUCCESS;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  // Presumably parses the gflags (e.g. --config) which Main() then reads.
+  aos::InitGoogle(&argc, &argv);
+  return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
new file mode 100644
index 0000000..58b653a
--- /dev/null
+++ b/aos/network/message_bridge_client.fbs
@@ -0,0 +1,24 @@
+include "aos/network/message_bridge_server.fbs";
+
+namespace aos.message_bridge;
+
+// Statistics from a single client connection to a server.
+table ClientConnection {
+  // The node that we are connected to.
+  node:Node;
+
+  // Health of this connection.  Connected or not?
+  state:State;
+
+  // Number of packets received on all channels.
+  received_packets:uint;
+
+  // TODO(austin): Per channel counts?
+}
+
+// Statistics for all clients.
+table ClientStatistics {
+  connections:[ClientConnection];
+}
+
+root_type ClientStatistics;
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
new file mode 100644
index 0000000..3652db2
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.cc
@@ -0,0 +1,361 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include <chrono>
+#include <string_view>
+
+#include "aos/events/logging/logger.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/sctp_client.h"
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+// This application receives messages from another node and re-publishes them on
+// this node.
+//
+// To simulate packet loss for testing, run:
+//   tc qdisc add dev eth0 root netem loss random 10
+// To restore it, run:
+//   tc qdisc del dev eth0 root netem
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+// Builds the Connect message sent to remote_name.  It carries my_node plus
+// every channel sourced by remote_name which lists my_node as a destination.
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+    const Configuration *config, const Node *my_node,
+    std::string_view remote_name) {
+  CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+  flatbuffers::FlatBufferBuilder fbb;
+  const flatbuffers::Offset<Node> my_node_offset =
+      CopyFlatBuffer<Node>(my_node, &fbb);
+  const std::string_view my_name = my_node->name()->string_view();
+
+  std::vector<flatbuffers::Offset<Channel>> to_transfer;
+  for (const Channel *channel : *config->channels()) {
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    for (const Connection *destination : *channel->destination_nodes()) {
+      if (destination->name()->string_view() == my_name &&
+          channel->source_node()->string_view() == remote_name) {
+        to_transfer.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+      }
+    }
+  }
+  const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+      channels_offset = fbb.CreateVector(to_transfer);
+
+  Connect::Builder connect_builder(fbb);
+  connect_builder.add_channels_to_transfer(channels_offset);
+  connect_builder.add_node(my_node_offset);
+  fbb.Finish(connect_builder.Finish());
+  return fbb.Release();
+}
+
+// Returns, for each data stream from other_node to my_node (in stream order),
+// the index of the matching channel in config->channels().
+std::vector<int> StreamToChannel(const Configuration *config,
+                                 const Node *my_node, const Node *other_node) {
+  std::vector<int> channel_for_stream;
+  const int channel_count = static_cast<int>(config->channels()->size());
+  for (int index = 0; index < channel_count; ++index) {
+    const Channel *channel = config->channels()->Get(index);
+    // A channel gets a stream when other_node can send on it and it has a
+    // connection to my_node.
+    if (configuration::ChannelIsSendableOnNode(channel, other_node) &&
+        configuration::ConnectionToNode(channel, my_node) != nullptr) {
+      channel_for_stream.emplace_back(index);
+    }
+  }
+  return channel_for_stream;
+}
+
+// Returns, for each data stream from other_node to my_node (in the same order
+// as StreamToChannel), whether a timestamp reply should be sent for it.
+std::vector<bool> StreamReplyWithTimestamp(const Configuration *config,
+                                           const Node *my_node,
+                                           const Node *other_node) {
+  std::vector<bool> stream_reply_with_timestamp;
+  for (const Channel *channel : *config->channels()) {
+    if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+      const Connection *connection =
+          configuration::ConnectionToNode(channel, my_node);
+      if (connection != nullptr) {
+        // We want to reply with a timestamp if the other node is logging the
+        // timestamp (and it therefore needs the timestamp), or if we are
+        // logging the message and it needs to know if we received it so it can
+        // log (in the future) it through different mechanisms on failure.
+        stream_reply_with_timestamp.emplace_back(
+            configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                                other_node) ||
+            configuration::ChannelMessageIsLoggedOnNode(channel, my_node));
+      }
+    }
+  }
+
+  return stream_reply_with_timestamp;
+}
+
+aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+MakeMessageHeaderReply() {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);  // Store the zeros so HandleData can mutate them.
+  logger::MessageHeader::Builder message_header_builder(fbb);
+  message_header_builder.add_channel_index(0);
+  message_header_builder.add_monotonic_sent_time(0);
+  message_header_builder.add_monotonic_remote_time(0);
+  message_header_builder.add_realtime_remote_time(0);
+  message_header_builder.add_remote_queue_index(0);
+  fbb.Finish(message_header_builder.Finish());
+  return fbb.Release();
+}
+
+// Builds the initial statistics: one DISCONNECTED entry per source node.
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+    const std::vector<std::string_view> &source_node_names,
+    const Configuration *configuration) {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);  // Store the 0 counts so they can be mutated later.
+  std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<Node> node_offset =
+        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    ClientConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    // TODO(austin): Track dropped packets.
+    connection_builder.add_received_packets(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ClientStatistics::Builder client_statistics_builder(fbb);
+  client_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(client_statistics_builder.Finish());
+  return fbb.Release();
+}
+
+}  // namespace
+
+SctpClientConnection::SctpClientConnection(
+    aos::ShmEventLoop *const event_loop, std::string_view remote_name,
+    const Node *my_node, std::string_view local_host,
+    std::vector<std::unique_ptr<aos::RawSender>> *channels,
+    ClientConnection *connection)
+    : event_loop_(event_loop),
+      connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
+                                          remote_name)),
+      message_reception_reply_(MakeMessageHeaderReply()),
+      remote_node_(CHECK_NOTNULL(
+          configuration::GetNode(event_loop->configuration(), remote_name))),
+      client_(remote_node_->hostname()->string_view(), remote_node_->port(),
+              connect_message_.message().channels_to_transfer()->size() +
+                  kControlStreams(),
+              local_host, 0),
+      channels_(channels),  // Not owned.
+      stream_to_channel_(
+          StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
+      stream_reply_with_timestamp_(StreamReplyWithTimestamp(
+          event_loop->configuration(), my_node, remote_node_)),
+      connection_(connection) {  // Not owned; statistics entry to update.
+  VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
+          << ": " << FlatbufferToJson(connect_message_);
+  // Send the first Connect as soon as the event loop starts running.
+  connect_timer_ = event_loop_->AddTimer([this]() { SendConnect(); });
+  event_loop_->OnRun(
+      [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
+  // Route everything readable on the socket to MessageReceived().
+  event_loop_->epoll()->OnReadable(client_.fd(),
+                                   [this]() { MessageReceived(); });
+}
+
+void SctpClientConnection::MessageReceived() {
+  // Read whatever the socket has for us: either data or a notification.
+  aos::unique_c_ptr<Message> message = client_.Read();
+
+  if (message->message_type == Message::kMessage) {
+    HandleData(message.get());
+    return;
+  }
+  if (message->message_type != Message::kNotification) {
+    return;
+  }
+  // Only association changes affect the connection state.
+  const union sctp_notification *notification =
+      reinterpret_cast<const union sctp_notification *>(message->data());
+  if (notification->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
+    const struct sctp_assoc_change *change = &notification->sn_assoc_change;
+    switch (change->sac_state) {
+      case SCTP_COMM_UP:
+        NodeConnected(change->sac_assoc_id);
+        VLOG(1) << "Received up from " << message->PeerAddress() << " on "
+                << change->sac_assoc_id;
+        break;
+      case SCTP_COMM_LOST:
+      case SCTP_SHUTDOWN_COMP:
+      case SCTP_CANT_STR_ASSOC:
+        NodeDisconnected();
+        break;
+      case SCTP_RESTART:
+        LOG(FATAL) << "Never seen this before.";
+        break;
+    }
+  }
+
+  if (VLOG_IS_ON(1)) {
+    PrintNotification(message.get());
+  }
+}
+
+void SctpClientConnection::SendConnect() {
+  const std::string_view connect_bytes(
+      reinterpret_cast<const char *>(connect_message_.data()),
+      connect_message_.size());
+  // A failed send is handled like a disconnect, which schedules the connect
+  // timer to call this again.
+  if (!client_.Send(kConnectStream(), connect_bytes, 0)) {
+    NodeDisconnected();
+  }
+}
+
+void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
+  // The association is up, so stop resending the Connect message.
+  connect_timer_->Disable();
+  remote_assoc_id_ = assoc_id;
+
+  // Tell the kernel to schedule the packets on this association with the
+  // priority scheduler.
+  client_.SetPriorityScheduler(assoc_id);
+  connection_->mutate_state(State::CONNECTED);
+}
+
+void SctpClientConnection::NodeDisconnected() {
+  constexpr chrono::milliseconds kRetryPeriod(100);  // Connect retry interval.
+  connect_timer_->Setup(event_loop_->monotonic_now() + kRetryPeriod,
+                        kRetryPeriod);
+  remote_assoc_id_ = 0;
+  connection_->mutate_state(State::DISCONNECTED);
+}
+
+void SctpClientConnection::HandleData(const Message *message) {
+  const logger::MessageHeader *message_header =
+      flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+
+  connection_->mutate_received_packets(connection_->received_packets() + 1);
+
+  // The stream id comes off the wire; never index with it unchecked.
+  const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+  if (stream < 0 || static_cast<size_t>(stream) >= stream_to_channel_.size()) {
+    LOG(WARNING) << "Ignoring data on unexpected stream " << stream;
+    return;
+  }
+
+  // Publish the message.
+  RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
+  sender->Send(message_header->data()->data(), message_header->data()->size(),
+               aos::monotonic_clock::time_point(
+                   chrono::nanoseconds(message_header->monotonic_sent_time())),
+               aos::realtime_clock::time_point(
+                   chrono::nanoseconds(message_header->realtime_sent_time())),
+               message_header->queue_index());
+
+  if (stream_reply_with_timestamp_[stream]) {
+    // TODO(austin): Send back less if we are only acking.  Maybe only a
+    // stream id?  Nothing if we are only forwarding?
+    // Now fill out the message received reply.  This uses a MessageHeader
+    // container so it can be directly logged.
+    auto *reply = message_reception_reply_.mutable_message();
+    reply->mutate_channel_index(message_header->channel_index());
+    reply->mutate_monotonic_sent_time(message_header->monotonic_sent_time());
+    // And capture the relevant data needed to generate the forwarding
+    // MessageHeader.
+    reply->mutate_monotonic_remote_time(
+        sender->monotonic_sent_time().time_since_epoch().count());
+    reply->mutate_realtime_remote_time(
+        sender->realtime_sent_time().time_since_epoch().count());
+    reply->mutate_remote_queue_index(sender->sent_queue_index());
+
+    // Unique ID is channel_index and monotonic clock.
+    // TODO(austin): Depending on if we are the logger node or not, we need to
+    // guarantee that this ack gets received too...  Same path as the logger.
+    client_.Send(kTimestampStream(),
+                 std::string_view(reinterpret_cast<const char *>(
+                                      message_reception_reply_.data()),
+                                  message_reception_reply_.size()),
+                 0);
+  }
+
+  VLOG(1) << "Received data of length " << message->size << " from "
+          << message->PeerAddress();
+  if (VLOG_IS_ON(1)) {
+    client_.LogSctpStatus(message->header.rcvinfo.rcv_assoc_id);
+  }
+
+  VLOG(2) << "\tSNDRCV (stream=" << message->header.rcvinfo.rcv_sid
+          << " ssn=" << message->header.rcvinfo.rcv_ssn
+          << " tsn=" << message->header.rcvinfo.rcv_tsn << " flags=0x"
+          << std::hex << message->header.rcvinfo.rcv_flags << std::dec
+          << " ppid=" << message->header.rcvinfo.rcv_ppid
+          << " cumtsn=" << message->header.rcvinfo.rcv_cumtsn << ")";
+}
+
+MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+      source_node_names_(configuration::SourceNodeNames(
+          event_loop->configuration(), event_loop->node())),
+      statistics_(MakeClientStatistics(source_node_names_,
+                                       event_loop->configuration())) {
+  std::string_view node_name = event_loop->node()->name()->string_view();
+
+  // Find all the channels which are supposed to be delivered to us.
+  channels_.resize(event_loop_->configuration()->channels()->size());
+  int channel_index = 0;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    if (channel->has_destination_nodes()) {
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->name()->string_view() == node_name) {
+          // Give the config a chance to remap us.  This helps with testing on a
+          // single node.  The lookup must succeed; a missing channel is fatal.
+          channels_[channel_index] = event_loop_->MakeRawSender(
+              CHECK_NOTNULL(configuration::GetChannel(
+                  event_loop_->configuration(), channel->name()->string_view(),
+                  channel->type()->string_view(), event_loop_->name(),
+                  event_loop_->node())));
+          break;
+        }
+      }
+    }
+    ++channel_index;
+  }
+
+  // Now, for each source node, build a connection.
+  int node_index = 0;
+  for (const std::string_view source_node : source_node_names_) {
+    // Open an unspecified connection (:: in ipv6 terminology)
+    connections_.emplace_back(new SctpClientConnection(
+        event_loop, source_node, event_loop->node(), "::", &channels_,
+        statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+            node_index)));
+    ++node_index;
+  }
+
+  // And kick it all off.
+  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+                             chrono::seconds(1));
+  });
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
new file mode 100644
index 0000000..77bf2b7
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.h
@@ -0,0 +1,116 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+
+#include <string_view>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/sctp_client.h"
+#include "aos/network/sctp_lib.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// This class encapsulates all the state required to connect to a single server
+// node, receive the messages it sends, and reply with timestamps.
+class SctpClientConnection {
+ public:
+  SctpClientConnection(aos::ShmEventLoop *const event_loop,
+                       std::string_view remote_name, const Node *my_node,
+                       std::string_view local_host,
+                       std::vector<std::unique_ptr<aos::RawSender>> *channels,
+                       ClientConnection *connection);
+
+  ~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
+
+ private:
+  // Reads a message from the socket.  Could be a notification.
+  void MessageReceived();
+
+  // Sends a connection request message.
+  void SendConnect();
+
+  // Called when the server connection succeeds.
+  void NodeConnected(sctp_assoc_t assoc_id);
+  // Called when the server connection disconnects.
+  void NodeDisconnected();
+  void HandleData(const Message *message);  // Republishes a received message.
+
+  // Event loop to register the client socket and timers on.
+  aos::ShmEventLoop *const event_loop_;
+
+  // Message to send on connect.
+  const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
+      connect_message_;
+
+  // Starting point for the message reception reply (including timestamps).
+  aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+      message_reception_reply_;
+
+  // Node we are connecting to (the server which sends us data).
+  const aos::Node *const remote_node_;
+
+  // SCTP client.  There is a client per connection so we don't have to deal
+  // with association ids nearly as badly.
+  SctpClient client_;
+
+  // Channels to send received messages on.  Not owned.
+  std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+  // Stream number (after the control streams) -> channel index lookup.
+  std::vector<int> stream_to_channel_;
+  // Per-stream flags signaling if we should reply back with delivery times.
+  std::vector<bool> stream_reply_with_timestamp_;
+
+  // Timer which fires to handle reconnections.
+  aos::TimerHandler *connect_timer_;
+
+  // ClientConnection statistics message to modify.  This will be published
+  // periodically.
+  ClientConnection *connection_;
+
+  // id of the server once known.  This is only valid if connection_ says
+  // connected.
+  sctp_assoc_t remote_assoc_id_ = 0;
+};
+
+// This encapsulates the state required to talk to *all* the servers from this
+// node.  It owns one SctpClientConnection per source node.
+class MessageBridgeClient {
+ public:
+  MessageBridgeClient(aos::ShmEventLoop *event_loop);
+
+  ~MessageBridgeClient() {}
+
+ private:
+  // Sends out the statistics that are continually updated by the
+  // SctpClientConnections.
+  void SendStatistics() { sender_.Send(statistics_); }
+
+  // Event loop to schedule everything on.
+  aos::ShmEventLoop *event_loop_;
+  // Sender to publish statistics on.
+  aos::Sender<ClientStatistics> sender_;
+  aos::TimerHandler *statistics_timer_;  // Timer which calls SendStatistics().
+
+  // Nodes to receive data from.  statistics_ is constructed from this list.
+  const std::vector<std::string_view> source_node_names_;
+
+  // Data to publish.  The connections hold pointers into this buffer.
+  FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+
+  // Senders indexed by channel index; only filled in for channels sent to us.
+  std::vector<std::unique_ptr<aos::RawSender>> channels_;
+
+  // List of connections.  These correspond to the nodes in source_node_names_
+  std::vector<std::unique_ptr<SctpClientConnection>> connections_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
new file mode 100644
index 0000000..1136188
--- /dev/null
+++ b/aos/network/message_bridge_protocol.h
@@ -0,0 +1,31 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+
+namespace aos {
+namespace message_bridge {
+
+// The protocol between the message_bridge_client and server is pretty simple.
+// The overarching design philosophy is that the server sends data to the
+// client, and the client (optionally) sends timestamps back.
+//
+// 1) A connection is established by the client sending the server a Connect
+//    flatbuffer on stream 0.
+// 2) The server then replies with the data, as it is available, on streams 2 +
+//    channel_id in the Connect message.
+// 3) The client (optionally) replies on stream 1 with MessageHeader flatbuffers
+//    with the timestamps that the messages were received.
+//
+// Most of the complexity from there is handling multiple clients and servers
+// and persuading SCTP to do what we want.
+
+// Number of streams reserved for control messages; data streams follow them.
+constexpr size_t kControlStreams() { return 2; }
+// The stream on which the client sends its Connect message.
+constexpr size_t kConnectStream() { return 0; }
+// The stream on which the client sends its (optional) timestamp replies.
+constexpr size_t kTimestampStream() { return 1; }
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
diff --git a/aos/network/message_bridge_server.cc b/aos/network/message_bridge_server.cc
new file mode 100644
index 0000000..fa5e7c1
--- /dev/null
+++ b/aos/network/message_bridge_server.cc
@@ -0,0 +1,35 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+  // Load the configuration named by --config.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> loaded_config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  // The event loop is given a pointer into loaded_config, so declare it after.
+  aos::ShmEventLoop shm_event_loop(&loaded_config.message());
+  MessageBridgeServer bridge_server(&shm_event_loop);
+
+  // TODO(austin): Track which messages didn't make it in time and need to be
+  // logged locally and forwarded.
+
+  shm_event_loop.Run();
+  return EXIT_SUCCESS;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+  // Hand control to the bridge; its return value becomes the exit status.
+  return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
new file mode 100644
index 0000000..75a6a15
--- /dev/null
+++ b/aos/network/message_bridge_server.fbs
@@ -0,0 +1,33 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// State of a connection.  Used by both the client and server statistics.
+enum State: ubyte {
+  CONNECTED,
+  DISCONNECTED,
+}
+
+// Statistics from a single connection to a client from this server.
+table ServerConnection {
+  // The node that we are connected to.
+  node:Node;
+
+  // Health of this connection.  Connected or not?
+  state:State;
+
+  // Number of packets that have been dropped (if known).
+  dropped_packets:uint;
+
+  // Number of packets sent on all channels.
+  sent_packets:uint;
+
+  // TODO(austin): Per channel counts?
+}
+
+// Statistics published by the server: one entry per destination (client) node.
+table ServerStatistics {
+  connections:[ServerConnection];
+}
+
+root_type ServerStatistics;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
new file mode 100644
index 0000000..38958ba
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.cc
@@ -0,0 +1,367 @@
+#include "aos/network/message_bridge_server_lib.h"
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime (with mutate_*()), and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+    const std::vector<std::string_view> &source_node_names,
+    const Configuration *configuration) {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);  // Store the 0 counts so they can be mutated later.
+  std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<Node> node_offset =
+        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    ServerConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    connection_builder.add_dropped_packets(0);
+    connection_builder.add_sent_packets(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ServerStatistics::Builder server_statistics_builder(fbb);
+  server_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(server_statistics_builder.Finish());
+
+  return fbb.Release();
+}
+
+// Returns the mutable statistics entry whose node is named node_name.  It is a
+// fatal error if there is no such entry.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+                                       std::string_view node_name) {
+  ServerConnection *matching_server_connection = nullptr;
+  auto *connections = statistics->mutable_connections();
+  for (size_t index = 0;
+       matching_server_connection == nullptr && index < connections->size();
+       ++index) {
+    ServerConnection *candidate = connections->GetMutableObject(index);
+    if (candidate->node()->name()->string_view() == node_name) {
+      matching_server_connection = candidate;
+    }
+  }
+  CHECK(matching_server_connection != nullptr) << ": Unknown client";
+  return matching_server_connection;
+}
+
+}  // namespace
+
+bool ChannelState::Matches(const Channel *other_channel) {
+  // Name and type must match, and max_size too so we never get oversized data.
+  if (channel_->name()->string_view() != other_channel->name()->string_view() ||
+      channel_->type()->string_view() != other_channel->type()->string_view()) {
+    return false;
+  }
+  return channel_->max_size() == other_channel->max_size();
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+  // TODO(austin): I don't like allocating this buffer when we are just freeing
+  // it at the end of the function.
+  flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+  VLOG(1) << "Found " << peers_.size() << " peers on channel "
+          << channel_->name()->string_view() << " size " << context.size;
+
+  // TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
+  // Only useful when not logging.
+  fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
+                                             logger::LogType::kLogMessage));
+
+  // TODO(austin): Track which connections need to be reliable and handle
+  // resending properly.
+  size_t sent_count = 0;  // Sends made to peers which log remotely.
+  bool logged_remotely = false;  // True if any peer logs remotely.
+  for (Peer &peer : peers_) {
+    logged_remotely = logged_remotely || peer.logged_remotely;
+
+    if (peer.sac_assoc_id != 0) {  // An id of 0 means not connected.
+      server->Send(std::string_view(
+                       reinterpret_cast<const char *>(fbb.GetBufferPointer()),
+                       fbb.GetSize()),
+                   peer.sac_assoc_id, peer.stream,
+                   peer.connection->time_to_live() / 1000000);
+      peer.server_connection_statistics->mutate_sent_packets(
+          peer.server_connection_statistics->sent_packets() + 1);
+      if (peer.logged_remotely) {
+        ++sent_count;
+      }
+    } else {  // Not connected, so count the message as dropped.
+      peer.server_connection_statistics->mutate_dropped_packets(
+          peer.server_connection_statistics->dropped_packets() + 1);
+    }
+  }
+
+  if (logged_remotely) {
+    if (sent_count == 0) {
+      VLOG(1) << "No clients, rejecting";
+      HandleFailure(fbb.Release());
+    } else {
+      sent_messages_.emplace_back(fbb.Release());  // Awaits HandleDelivery.
+    }
+  } else {
+    VLOG(1) << "Not bothering to track this message since nobody cares.";
+  }
+
+  // TODO(austin): Limit the size of this queue.  Flush messages to disk
+  // which are too old.  We really care about messages which didn't make it
+  // to a logger...  Which is a new concept or two.
+
+  // Need to handle logging and disk in another thread.  Need other thread
+  // since we sometimes want to skip disk, and synchronization is easier.
+  // This thread then spins on the queue until empty, then polls at 1-10 hz.
+
+  // TODO(austin): ~10 MB chunks on disk and push them over the logging
+  // channel?  Threadsafe disk backed queue object which can handle restarts
+  // and flushes.  Whee.
+}
+
+void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
+                                  uint16_t /*ssn*/,
+                                  absl::Span<const uint8_t> data) {
+  // data is parsed as a (not size prefixed) MessageHeader.
+  const auto acked_time =
+      flatbuffers::GetRoot<logger::MessageHeader>(data.data())
+          ->monotonic_sent_time();
+
+  // Drop the acknowledged message along with everything queued before it.
+  // Messages older than the acknowledged one are reported as failures.
+  while (sent_messages_.size() > 0u) {
+    const auto oldest_time =
+        sent_messages_.begin()->message().monotonic_sent_time();
+    if (oldest_time > acked_time) {
+      break;
+    }
+    if (oldest_time < acked_time) {
+      VLOG(1) << "Delivery looks wrong, rejecting";
+      HandleFailure(std::move(sent_messages_.front()));
+    }
+    sent_messages_.pop_front();
+  }
+}
+
+void ChannelState::HandleFailure(
+    SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+  // TODO(austin): Put it in the log queue.  For now, it is only printed.
+  LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+
+  // Note: this may be really out of order when we avoid the queue...  We
+  // have the ones we know didn't make it immediately, and the ones which
+  // time out eventually.  Need to sort that out.
+}
+
+void ChannelState::AddPeer(const Connection *connection,
+                           ServerConnection *server_connection_statistics,
+                           bool logged_remotely) {  // Peers start unconnected.
+  peers_.emplace_back(0, 0, connection, server_connection_statistics,
+                      logged_remotely);
+}
+
+void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+  for (ChannelState::Peer &peer : peers_) {
+    if (peer.sac_assoc_id != assoc_id) {
+      continue;
+    }
+    // TODO(austin): Handle multiple clients from a single node (rare).
+    peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
+    peer.sac_assoc_id = 0;
+    peer.stream = 0;
+    break;
+  }
+}
+
+void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+                                 int stream, SctpServer *server) {
+  for (ChannelState::Peer &peer : peers_) {
+    if (peer.connection->name()->string_view() != node->name()->string_view()) {
+      continue;
+    }
+    peer.sac_assoc_id = assoc_id;
+    peer.stream = stream;
+    peer.server_connection_statistics->mutate_state(State::CONNECTED);
+    server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+    break;  // Only the first peer matching the node is updated.
+  }
+}
+
+MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+      statistics_(MakeServerStatistics(
+          configuration::DestinationNodeNames(event_loop->configuration(),
+                                              event_loop->node()),
+          event_loop->configuration())),
+      server_("::", event_loop->node()->port()) {
+  CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
+  // NOTE(review): the initializers above already dereference node().
+  // TODO(austin): Time sync.  sctp gives us filtered round trip time, not
+  // target time.
+
+  // TODO(austin): Logging synchronization.
+  //
+  // TODO(austin): How do we handle parameter channels?  The oldest value
+  // needs to be sent regardless on connection (though probably only if it has
+  // changed).
+  event_loop_->epoll()->OnReadable(server_.fd(),
+                                   [this]() { MessageReceived(); });
+
+  LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
+
+  int channel_index = 0;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    CHECK(channel->has_source_node());
+    if (channel->source_node()->string_view() ==
+            event_loop_->node()->name()->string_view() &&
+        channel->has_destination_nodes()) {
+      std::unique_ptr<ChannelState> state(
+          new ChannelState{channel, channel_index});
+
+      for (const Connection *connection : *channel->destination_nodes()) {
+        const Node *other_node = configuration::GetNode(
+            event_loop_->configuration(), connection->name()->string_view());
+        state->AddPeer(
+            connection,
+            FindServerConnection(statistics_.mutable_message(),
+                                 connection->name()->string_view()),
+            configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+      }
+
+      // Call SendData for every message.
+      ChannelState *state_ptr = state.get();
+      event_loop_->MakeRawWatcher(
+          channel,
+          [this, state_ptr](const Context &context, const void * /*message*/) {
+            state_ptr->SendData(&server_, context);
+          });
+      channels_.emplace_back(std::move(state));
+    } else {
+      channels_.emplace_back(nullptr);  // Keep channels_ indexed by channel.
+    }
+    ++channel_index;
+  }
+
+  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+                             chrono::seconds(1));
+  });
+}
+
+void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
+  server_.SetPriorityScheduler(assoc_id);
+}
+
+void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
+  // Forward the lost association to every channel sent from this node.
+  for (std::unique_ptr<ChannelState> &state : channels_) {
+    // Channels which aren't sent from this node have no state; skip them.
+    if (state != nullptr) {
+      // Let the channel update any of its peers using this association.
+      state->NodeDisconnected(assoc_id);
+    }
+  }
+}
+
+void MessageBridgeServer::MessageReceived() {
+  aos::unique_c_ptr<Message> message = server_.Read();
+  // Only association changes are acted on here; data goes to HandleData.
+  if (message->message_type == Message::kNotification) {
+    const union sctp_notification *snp =
+        (const union sctp_notification *)message->data();
+
+    switch (snp->sn_header.sn_type) {
+      case SCTP_ASSOC_CHANGE: {
+        const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+        switch (sac->sac_state) {
+          case SCTP_COMM_UP:
+            NodeConnected(sac->sac_assoc_id);
+            VLOG(1) << "Peer connected";
+            break;
+          case SCTP_COMM_LOST:
+          case SCTP_SHUTDOWN_COMP:
+          case SCTP_CANT_STR_ASSOC:
+            NodeDisconnected(sac->sac_assoc_id);
+            VLOG(1) << "Disconnect";
+            break;
+          case SCTP_RESTART:
+            LOG(FATAL) << "Never seen this before.";
+            break;
+        }
+      } break;
+    }
+    // Dump the whole notification, but only when verbose logging is on.
+    if (VLOG_IS_ON(1)) {
+      PrintNotification(message.get());
+    }
+  } else if (message->message_type == Message::kMessage) {
+    HandleData(message.get());
+  }
+}
+
+void MessageBridgeServer::HandleData(const Message *message) {
+  VLOG(1) << "Received data of length " << message->size;
+  if (message->header.rcvinfo.rcv_sid == kConnectStream()) {
+    // Control channel!  The remote lists the channels it wants forwarded.
+    const Connect *connect = flatbuffers::GetRoot<Connect>(message->data());
+    VLOG(1) << FlatbufferToJson(connect);
+    // Account for the control channel and delivery times channel.
+    size_t channel_index = kControlStreams();
+    for (const Channel *channel : *connect->channels_to_transfer()) {
+      bool matched = false;
+      for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+        // Null entries are channels which aren't sent from this node.
+        if (channel_state != nullptr && channel_state->Matches(channel)) {
+          channel_state->NodeConnected(connect->node(),
+                                       message->header.rcvinfo.rcv_assoc_id,
+                                       channel_index, &server_);
+          matched = true;
+          break;
+        }
+      }
+      if (!matched) {
+        LOG(ERROR) << "Remote tried registering for unknown channel "
+                   << FlatbufferToJson(channel);
+      } else {
+        ++channel_index;
+      }
+    }
+  } else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
+    // Message delivery.  The index comes off the wire, so validate it rather
+    // than indexing out of bounds or through a null (not forwarded) entry.
+    const logger::MessageHeader *message_header =
+        flatbuffers::GetRoot<logger::MessageHeader>(message->data());
+    const size_t index = message_header->channel_index();
+    if (index >= channels_.size() || channels_[index] == nullptr) {
+      LOG(ERROR) << "Ignoring delivery for invalid channel index " << index;
+    } else {
+      channels_[index]->HandleDelivery(
+          message->header.rcvinfo.rcv_assoc_id, message->header.rcvinfo.rcv_ssn,
+          absl::Span<const uint8_t>(message->data(), message->size));
+    }
+  }
+
+  if (VLOG_IS_ON(1)) {
+    message->LogRcvInfo();
+  }
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
new file mode 100644
index 0000000..f202fc9
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.h
@@ -0,0 +1,130 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+
+#include <deque>
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// Class to encapsulate all the state per channel.  This is the dispatcher for a
+// new message from the event loop.
+class ChannelState {
+  public:
+   ChannelState(const Channel *channel, int channel_index)
+       : channel_index_(channel_index), channel_(channel) {}
+
+   // Class to encapsulate all the state per client on a channel.  A client may
+   // be subscribed to multiple channels.
+   struct Peer {
+     Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
+          const Connection *new_connection,
+          ServerConnection *new_server_connection_statistics,
+          bool new_logged_remotely)
+         : sac_assoc_id(new_sac_assoc_id),
+           stream(new_stream),
+           connection(new_connection),
+           server_connection_statistics(new_server_connection_statistics),
+           logged_remotely(new_logged_remotely) {}
+
+     // Valid if != 0.
+     sctp_assoc_t sac_assoc_id = 0;
+
+     size_t stream;
+     const aos::Connection *connection;
+     ServerConnection *server_connection_statistics;
+
+     // If true, this message will be logged on a receiving node.  We need to
+     // keep it around to log it locally if that fails.
+     bool logged_remotely = false;
+  };
+
+  // Needs to be called when a node (might have) disconnected.
+  void NodeDisconnected(sctp_assoc_t assoc_id);
+  void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+                     SctpServer *server);
+
+  // Adds a new peer.
+  void AddPeer(const Connection *connection,
+               ServerConnection *server_connection_statistics,
+               bool logged_remotely);
+
+  // Returns true if this channel has the same name and type as the other
+  // channel.
+  bool Matches(const Channel *other_channel);
+
+  // Sends the data in context using the provided server.
+  void SendData(SctpServer *server, const Context &context);
+
+  // Handles reception of delivery times.
+  void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
+                      absl::Span<const uint8_t> data);
+
+  // Handles (by consuming) failure to deliver a message.
+  void HandleFailure(
+      SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+
+ private:
+  const int channel_index_;
+  const Channel *const channel_;
+
+  std::vector<Peer> peers_;
+
+  std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
+      sent_messages_;
+};
+
+// This encapsulates the state required to talk to *all* the clients from this
+// node.  It handles the session and dispatches data to the ChannelState.
+class MessageBridgeServer {
+ public:
+  explicit MessageBridgeServer(aos::ShmEventLoop *event_loop);
+  // Stops epoll from watching the server socket.
+  ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+
+ private:
+  // Reads a message from the socket.  Could be a notification.
+  void MessageReceived();
+
+  // Called when the server connection succeeds.
+  void NodeConnected(sctp_assoc_t assoc_id);
+  // Called when the server connection disconnects.
+  void NodeDisconnected(sctp_assoc_t assoc_id);
+
+  // Called when data (either a connection request or delivery timestamps) is
+  // received.
+  void HandleData(const Message *message);
+
+  // Sends out the statistics that are continually updated by the
+  // ChannelState's.
+  void SendStatistics() { sender_.Send(statistics_); }
+
+  // Event loop to schedule everything on.
+  aos::ShmEventLoop *event_loop_;
+
+  // Statistics, timer, and associated sender.
+  aos::Sender<ServerStatistics> sender_;
+  aos::TimerHandler *statistics_timer_ = nullptr;
+  FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+  // The SCTP server whose socket is watched for incoming messages.
+  SctpServer server_;
+
+  // List of channels.  The entries that aren't sent from this node are left
+  // null.
+  std::vector<std::unique_ptr<ChannelState>> channels_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
new file mode 100644
index 0000000..383c1c4
--- /dev/null
+++ b/aos/network/message_bridge_test.cc
@@ -0,0 +1,154 @@
+#include "gtest/gtest.h"
+
+#include <chrono>
+#include <thread>
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_server_lib.h"
+
+DECLARE_string(override_hostname);
+DECLARE_string(application_name);
+
+namespace aos {
+namespace message_bridge {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Test that we can send a ping message over sctp and receive it.
+TEST(MessageBridgeTest, PingPong) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We then get to wait until they are connected.
+  //
+  // After they are connected, we send a Ping message.
+  //
+  // On the other end, we receive a Pong message.
+  //
+  // But, we need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop server_event_loop(&server_config.message());
+  MessageBridgeServer message_bridge_server(&server_event_loop);
+
+  // And build the app which sends the pings.
+  FLAGS_application_name = "ping";
+  aos::ShmEventLoop ping_event_loop(&server_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      ping_event_loop.MakeSender<examples::Ping>("/test");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_application_name = "pi2_message_bridge_client";
+  FLAGS_override_hostname = "raspberrypi2";
+  aos::ShmEventLoop client_event_loop(&client_config.message());
+  MessageBridgeClient message_bridge_client(&client_event_loop);
+
+  // And build the app which sends the pongs.
+  FLAGS_application_name = "pong";
+  aos::ShmEventLoop pong_event_loop(&client_config.message());
+
+  // Count the pongs.
+  int pong_count = 0;
+  pong_event_loop.MakeWatcher(
+      "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+        ++pong_count;
+        LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
+        if (pong_count >= 2) {
+          LOG(INFO) << "That's enough bailing early.";
+          // And Exit is async safe, so thread safe is easy.
+          ping_event_loop.Exit();
+        }
+      });
+
+  FLAGS_override_hostname = "";
+
+  // Start everything up.  Pong is the only thing we don't know how to wait on,
+  // so start it first.
+  std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+  std::thread server_thread(
+      [&server_event_loop]() { server_event_loop.Run(); });
+  std::thread client_thread(
+      [&client_event_loop]() { client_event_loop.Run(); });
+
+  // Wait until we are connected, then send.
+  int ping_count = 0;
+  ping_event_loop.MakeWatcher(
+      "/aos/pi1", [&ping_count, &client_event_loop,
+                   &ping_sender](const ServerStatistics &stats) {
+        LOG(INFO) << FlatbufferToJson(&stats);
+
+        ASSERT_TRUE(stats.has_connections());
+        EXPECT_EQ(stats.connections()->size(), 1);
+
+        bool connected = false;
+        for (const ServerConnection *connection : *stats.connections()) {
+          if (connection->node()->name()->string_view() ==
+              client_event_loop.node()->name()->string_view()) {
+            if (connection->state() == State::CONNECTED) {
+              connected = true;
+            }
+            break;
+          }
+        }
+
+        if (connected) {
+          LOG(INFO) << "Connected!  Sent ping.";
+          auto builder = ping_sender.MakeBuilder();
+          examples::Ping::Builder ping_builder =
+              builder.MakeBuilder<examples::Ping>();
+          ping_builder.add_value(ping_count + 971);
+          builder.Send(ping_builder.Finish());
+          ++ping_count;
+        }
+      });
+
+  // Time ourselves out after a while if Pong doesn't do it for us.
+  aos::TimerHandler *quit = ping_event_loop.AddTimer(
+      [&ping_event_loop]() { ping_event_loop.Exit(); });
+  ping_event_loop.OnRun([quit, &ping_event_loop]() {
+    quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+  });
+
+
+  // And go!
+  ping_event_loop.Run();
+
+  // Shut everyone else down
+  server_event_loop.Exit();
+  client_event_loop.Exit();
+  pong_event_loop.Exit();
+  server_thread.join();
+  client_thread.join();
+  pong_thread.join();
+
+  // Make sure we sent something.
+  EXPECT_GE(ping_count, 1);
+  // And got something back.
+  EXPECT_GE(pong_count, 1);
+}
+
+}  // namespace testing
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
new file mode 100644
index 0000000..65d28d2
--- /dev/null
+++ b/aos/network/message_bridge_test_client.json
@@ -0,0 +1,17 @@
+{
+  "imports": [
+    "message_bridge_test_common.json"
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "localhost",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
new file mode 100644
index 0000000..d0085c2
--- /dev/null
+++ b/aos/network/message_bridge_test_common.json
@@ -0,0 +1,113 @@
+{
+  "channels": [
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/test2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    }
+  ],
+  "applications": [
+    {
+      "name": "pi2_message_bridge_client",
+      "maps": [
+        {
+          "match": {
+            "name": "/test",
+            "type": "aos.examples.Ping"
+          },
+          "rename": {
+            "name": "/test2"
+          }
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    }
+  ]
+}
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
new file mode 100644
index 0000000..eea92e7
--- /dev/null
+++ b/aos/network/message_bridge_test_server.json
@@ -0,0 +1,17 @@
+{
+  "imports": [
+    "message_bridge_test_common.json"
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "localhost",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
new file mode 100644
index 0000000..9e7f6c1
--- /dev/null
+++ b/aos/network/sctp_client.cc
@@ -0,0 +1,144 @@
+#include "aos/network/sctp_client.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netinet/sctp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpClient::SctpClient(std::string_view remote_host, int remote_port,
+                       int streams, std::string_view local_host, int local_port)
+    : sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
+      sockaddr_local_(ResolveSocket(local_host, local_port)),
+      fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+  LOG(INFO) << "socket(" << Family(sockaddr_local_)
+            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+  PCHECK(fd_ != -1);
+
+  {
+    // Allow the kernel to deliver messages from different streams in any order.
+    int full_interleaving = 2;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+                      &full_interleaving, sizeof(full_interleaving)) == 0);
+  }
+
+  {
+    struct sctp_initmsg initmsg;
+    memset(&initmsg, 0, sizeof(struct sctp_initmsg));
+    initmsg.sinit_num_ostreams = streams;
+    initmsg.sinit_max_instreams = streams;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+                      sizeof(struct sctp_initmsg)) == 0);
+  }
+
+  {
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+           0);
+  }
+  {
+    // Servers send promptly.  Clients don't.
+    // TODO(austin): Revisit this assumption when we have time sync.
+    int on = 0;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+  }
+
+  {
+    // TODO(austin): This is the old style registration...  But, the sctp
+    // stack out in the wild for linux is old and primitive.
+    struct sctp_event_subscribe subscribe;
+    memset(&subscribe, 0, sizeof(subscribe));
+    subscribe.sctp_data_io_event = 1;
+    subscribe.sctp_association_event = 1;
+    PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+                      sizeof(subscribe)) == 0);
+  }
+
+  PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+              sockaddr_local_.ss_family == AF_INET6
+                  ? sizeof(struct sockaddr_in6)
+                  : sizeof(struct sockaddr_in)) == 0);
+  VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+}
+
+aos::unique_c_ptr<Message> SctpClient::Read() {
+  return ReadSctpMessage(fd_, max_size_);
+}
+
+bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
+  struct iovec iov;
+  iov.iov_base = const_cast<char *>(data.data());
+  iov.iov_len = data.size();
+
+  struct msghdr outmsg;
+  // Target to send to.
+  outmsg.msg_name = &sockaddr_remote_;
+  outmsg.msg_namelen = sizeof(struct sockaddr_storage);
+  VLOG(1) << "Sending to " << Address(sockaddr_remote_);
+
+  // Data to send.
+  outmsg.msg_iov = &iov;
+  outmsg.msg_iovlen = 1;
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = sizeof(outcmsg);
+  outmsg.msg_flags = 0;
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  outmsg.msg_controllen = cmsg->cmsg_len;
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = rand();
+  sinfo->sinfo_stream = stream;
+  sinfo->sinfo_context = 19;
+  sinfo->sinfo_flags = 0;
+  sinfo->sinfo_timetolive = time_to_live;
+
+  // And send.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno != EPIPE && errno != EAGAIN) {
+      PCHECK(size == static_cast<ssize_t>(data.size()));
+    } else {
+      return false;
+    }
+  } else {
+    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+  }
+
+  VLOG(1) << "Sent " << data.size();
+  return true;
+}
+
+void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
+  message_bridge::LogSctpStatus(fd(), assoc_id);
+}
+
+void SctpClient::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+  // Request the priority stream scheduler; failure is logged, not fatal.
+  struct sctp_assoc_value request;
+  memset(&request, 0, sizeof(request));
+  request.assoc_id = assoc_id;
+  request.assoc_value = SCTP_SS_PRIO;
+  const int rc = setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER,
+                            &request, sizeof(request));
+  if (rc != 0) PLOG(WARNING) << "Failed to set scheduler";
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
new file mode 100644
index 0000000..926e59b
--- /dev/null
+++ b/aos/network/sctp_client.h
@@ -0,0 +1,57 @@
+#ifndef AOS_NETWORK_SCTP_CLIENT_H_
+#define AOS_NETWORK_SCTP_CLIENT_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to encapsulate everything needed to be a SCTP client.
+class SctpClient {
+ public:
+  SctpClient(std::string_view remote_host, int remote_port, int streams,
+             std::string_view local_host = "0.0.0.0", int local_port = 9971);
+  // The destructor closes fd_, so a copy would close the same socket twice.
+  SctpClient(const SctpClient &) = delete;
+  SctpClient &operator=(const SctpClient &) = delete;
+
+  ~SctpClient() {
+    LOG(INFO) << "close(" << fd_ << ")";
+    PCHECK(close(fd_) == 0);
+  }
+
+  // Receives the next packet from the remote.
+  aos::unique_c_ptr<Message> Read();
+
+  // Sends a block of data on a stream with a TTL.
+  bool Send(int stream, std::string_view data, int time_to_live);
+
+  int fd() { return fd_; }
+
+  // Enables the priority scheduler.  This is a SCTP feature which lets us
+  // configure the priority per stream so that higher priority packets don't get
+  // backed up behind lower priority packets in the networking queues.
+  void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+  // Remote to send to.
+  struct sockaddr_storage sockaddr_remote() const { return sockaddr_remote_; }
+
+  void LogSctpStatus(sctp_assoc_t assoc_id);
+
+ private:
+  struct sockaddr_storage sockaddr_remote_;
+  struct sockaddr_storage sockaddr_local_;
+  int fd_;
+  size_t max_size_ = 1000;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  //  AOS_NETWORK_SCTP_CLIENT_H_
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
new file mode 100644
index 0000000..1ed816b
--- /dev/null
+++ b/aos/network/sctp_lib.cc
@@ -0,0 +1,229 @@
+#include "aos/network/sctp_lib.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/sctp.h>
+
+#include <string_view>
+
+DEFINE_string(interface, "", "ipv6 interface");
+
+namespace aos {
+namespace message_bridge {
+
+namespace {
+// Names logged for an SCTP_ASSOC_CHANGE, indexed by its sac_state value.
+const char *sac_state_tbl[] = {"COMMUNICATION_UP", "COMMUNICATION_LOST",
+                               "RESTART", "SHUTDOWN_COMPLETE",
+                               "CANT_START_ASSOCIATION"};
+
+typedef union {
+  struct sctp_initmsg init;
+  struct sctp_sndrcvinfo sndrcvinfo;
+} _sctp_cmsg_data_t;
+}  // namespace
+
+struct sockaddr_storage ResolveSocket(std::string_view host, int port) {
+  // Zero the whole storage so no indeterminate bytes are ever returned.
+  struct sockaddr_storage result;
+  memset(&result, 0, sizeof(result));
+  struct addrinfo *addrinfo_result = nullptr;
+  struct sockaddr_in *t_addr = (struct sockaddr_in *)&result;
+  struct sockaddr_in6 *t_addr6 = (struct sockaddr_in6 *)&result;
+
+  // getaddrinfo returns its own error codes (not errno); use gai_strerror.
+  const int lookup_error =
+      getaddrinfo(std::string(host).c_str(), 0, NULL, &addrinfo_result);
+  CHECK(lookup_error == 0)
+      << ": Failed to resolve " << host << ": " << gai_strerror(lookup_error);
+
+  // Only the first returned address is used.
+  switch (addrinfo_result->ai_family) {
+    case AF_INET:
+      memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+      t_addr->sin_family = addrinfo_result->ai_family;
+      t_addr->sin_port = htons(port);
+      break;
+    case AF_INET6:
+      memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+      t_addr6->sin6_family = addrinfo_result->ai_family;
+      t_addr6->sin6_port = htons(port);
+      // Scope the address to --interface when one was given.
+      if (FLAGS_interface.size() > 0) {
+        t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
+      }
+      break;
+  }
+
+  // Now print it back out nicely.  Start with empty strings so a failed
+  // reverse lookup logs nothing rather than uninitialized stack memory.
+  char host_string[NI_MAXHOST] = "";
+  char service_string[NI_MAXSERV] = "";
+  int error = getnameinfo((struct sockaddr *)&result,
+                          addrinfo_result->ai_addrlen, host_string, NI_MAXHOST,
+                          service_string, NI_MAXSERV, NI_NUMERICHOST);
+  if (error) {
+    LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
+  }
+  LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
+            << ", family=" << addrinfo_result->ai_family;
+  freeaddrinfo(addrinfo_result);
+  return result;
+}
+
+std::string_view Family(const struct sockaddr_storage &sockaddr) {
+  // Only the two IP address families are named; every other value is
+  // reported as "unknown".
+  const auto family = sockaddr.ss_family;
+
+  if (family == AF_INET) return "AF_INET";
+  if (family == AF_INET6) return "AF_INET6";
+  return "unknown";
+}
+std::string Address(const struct sockaddr_storage &sockaddr) {
+  char addrbuf[INET6_ADDRSTRLEN];
+  const char *text;  // Null if inet_ntop fails; std::string can't take that.
+  if (sockaddr.ss_family == AF_INET) {
+    const struct sockaddr_in *sin = (const struct sockaddr_in *)&sockaddr;
+    text = inet_ntop(AF_INET, &sin->sin_addr, addrbuf, sizeof(addrbuf));
+  } else {
+    const struct sockaddr_in6 *sin6 = (const struct sockaddr_in6 *)&sockaddr;
+    text = inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, sizeof(addrbuf));
+  }
+  return text == nullptr ? std::string() : std::string(text);
+}
+
+void PrintNotification(const Message *msg) {
+  const union sctp_notification *snp =
+      (const union sctp_notification *)msg->data();
+
+  LOG(INFO) << "Notification:";
+  // The type is logged at INFO; the per-field details need verbose logging.
+  switch (snp->sn_header.sn_type) {
+    case SCTP_ASSOC_CHANGE: {
+      const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+      LOG(INFO) << "SCTP_ASSOC_CHANGE(" << sac_state_tbl[sac->sac_state] << ")";
+      VLOG(1) << "    (assoc_change: state=" << sac->sac_state
+              << ", error=" << sac->sac_error
+              << ", instr=" << sac->sac_inbound_streams
+              << " outstr=" << sac->sac_outbound_streams
+              << ", assoc=" << sac->sac_assoc_id << ")";
+    } break;
+    case SCTP_PEER_ADDR_CHANGE: {
+      const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
+      LOG(INFO) << " SCTP_PEER_ADDR_CHANGE";
+      VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
+              << " state=" << spc->spc_state << ", error=" << spc->spc_error
+              << ")";
+    } break;
+    case SCTP_SEND_FAILED: {
+      const struct sctp_send_failed *ssf = &snp->sn_send_failed;
+      LOG(INFO) << " SCTP_SEND_FAILED";
+      VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
+              << " err=" << ssf->ssf_error << ")";
+    } break;
+    case SCTP_REMOTE_ERROR: {
+      const struct sctp_remote_error *sre = &snp->sn_remote_error;
+      LOG(INFO) << " SCTP_REMOTE_ERROR";
+      VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
+    } break;
+    case SCTP_SHUTDOWN_EVENT: {
+      LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
+    } break;
+    default:
+      LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
+      break;
+  }
+}
+
+std::string GetHostname() {
+  char buf[256];
+  buf[sizeof(buf) - 1] = '\0';
+  PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
+  return buf;
+}
+
+std::string Message::PeerAddress() const { return Address(sin); }
+
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
+  struct sctp_status status;
+  memset(&status, 0, sizeof(status));
+  status.sstat_assoc_id = assoc_id;
+
+  socklen_t size = sizeof(status);
+  PCHECK(getsockopt(fd, SOL_SCTP, SCTP_STATUS,
+                    reinterpret_cast<void *>(&status), &size) == 0);
+
+  LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
+            << " sstat_state:" << status.sstat_state
+            << " sstat_rwnd:" << status.sstat_rwnd
+            << " sstat_unackdata:" << status.sstat_unackdata
+            << " sstat_penddata:" << status.sstat_penddata
+            << " sstat_instrms:" << status.sstat_instrms
+            << " sstat_outstrms:" << status.sstat_outstrms
+            << " sstat_fragmentation_point:" << status.sstat_fragmentation_point
+            << " sstat_primary.spinfo_srtt:" << status.sstat_primary.spinfo_srtt
+            << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
+}
+
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size) {
+  char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
+  struct iovec iov;
+  struct msghdr inmessage;
+  memset(&inmessage, 0, sizeof(struct msghdr));
+
+  // calloc so the headers and peer address read as zero, not garbage, when
+  // recvmsg doesn't supply them (e.g. no SCTP_RCVINFO control message).
+  aos::unique_c_ptr<Message> result(
+      reinterpret_cast<Message *>(calloc(1, sizeof(Message) + max_size)));
+  CHECK(result.get() != nullptr) << ": Failed to allocate " << max_size;
+
+  iov.iov_len = max_size;
+  iov.iov_base = result->mutable_data();
+  inmessage.msg_iov = &iov;
+  inmessage.msg_iovlen = 1;
+  inmessage.msg_control = incmsg;
+  inmessage.msg_controllen = sizeof(incmsg);
+  inmessage.msg_namelen = sizeof(struct sockaddr_storage);
+  inmessage.msg_name = &result->sin;
+
+  // Any receive error, or a zero length read, is fatal.
+  ssize_t size;
+  PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+  result->size = size;
+
+  if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
+    result->message_type = Message::kNotification;
+  } else {
+    result->message_type = Message::kMessage;
+  }
+
+  // Copy the receive info out of the control messages, if it was provided.
+  for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != NULL;
+       scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
+    switch (scmsg->cmsg_type) {
+      case SCTP_RCVINFO: {
+        struct sctp_rcvinfo *data = (struct sctp_rcvinfo *)CMSG_DATA(scmsg);
+        result->header.rcvinfo = *data;
+      } break;
+      default:
+        LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
+        break;
+    }
+  }
+
+  return result;
+}
+
+void Message::LogRcvInfo() const {
+  LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
+            << " ssn=" << header.rcvinfo.rcv_ssn
+            << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
+            << header.rcvinfo.rcv_flags << std::dec
+            << " ppid=" << header.rcvinfo.rcv_ppid
+            << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
new file mode 100644
index 0000000..0f90f87
--- /dev/null
+++ b/aos/network/sctp_lib.h
@@ -0,0 +1,78 @@
+#ifndef AOS_NETWORK_SCTP_LIB_H_
+#define AOS_NETWORK_SCTP_LIB_H_
+
+#include <arpa/inet.h>
+#include <netinet/sctp.h>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Resolves a socket and returns the address.  This can be either an ipv4 or
+// ipv6 address.
+struct sockaddr_storage ResolveSocket(std::string_view host, int port);
+
+// Returns a formatted version of the address.
+std::string Address(const struct sockaddr_storage &sockaddr);
+// Returns a formatted version of the address family.
+std::string_view Family(const struct sockaddr_storage &sockaddr);
+
+// Message received.
+// This message is malloced bigger than needed and the extra space after it is
+// the data.
+struct Message {
+  // Struct to let us force data to be well aligned.
+  struct OveralignedChar {
+    uint8_t data alignas(32);
+  };
+
+  // Ancillary info; ReadSctpMessage() fills rcvinfo from SCTP_RCVINFO cmsgs.
+  struct {
+    struct sctp_rcvinfo rcvinfo;
+  } header;
+
+  // Address of the sender.
+  struct sockaddr_storage sin;
+
+  // Data type. Is it a block of data, or is it a struct sctp_notification?
+  enum MessageType { kMessage, kNotification } message_type;
+
+  size_t size = 0u;  // Number of bytes returned by recvmsg() for this message.
+  uint8_t *mutable_data() {
+    return reinterpret_cast<uint8_t *>(&actual_data[0].data);
+  }
+  const uint8_t *data() const {
+    return reinterpret_cast<const uint8_t *>(&actual_data[0].data);
+  }
+
+  // Returns a human readable peer IP address.
+  std::string PeerAddress() const;
+
+  // Prints out the RcvInfo structure.
+  void LogRcvInfo() const;
+
+  // The start of the data.  Flexible array member, so it must stay last.
+  OveralignedChar actual_data[];
+};
+
+void PrintNotification(const Message *msg);
+
+std::string GetHostname();
+
+// Gets and logs the contents of the sctp_status message.
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
+
+// Read and allocate a message.
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size);
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_SCTP_LIB_H_
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
new file mode 100644
index 0000000..70d5b28
--- /dev/null
+++ b/aos/network/sctp_server.cc
@@ -0,0 +1,143 @@
+#include "aos/network/sctp_server.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpServer::SctpServer(std::string_view local_host, int local_port)
+    : sockaddr_local_(ResolveSocket(local_host, local_port)),
+      fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+  LOG(INFO) << "socket(" << Family(sockaddr_local_)
+            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+  PCHECK(fd_ != -1);
+
+  {
+    // Subscribe to the SCTP events we want delivered on this socket.
+    struct sctp_event_subscribe subscribe;
+    memset(&subscribe, 0, sizeof(subscribe));
+    subscribe.sctp_data_io_event = 1;
+    subscribe.sctp_association_event = 1;
+    subscribe.sctp_send_failure_event = 1;
+    subscribe.sctp_partial_delivery_event = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_EVENTS, &subscribe,
+                      sizeof(subscribe)) == 0);
+  }
+  {
+    // Enable recvinfo when a packet arrives.
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+           0);
+  }
+  {
+    // Allow one packet on the wire to have multiple source packets.
+    int full_interleaving = 2;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+                      &full_interleaving, sizeof(full_interleaving)) == 0);
+  }
+  {
+    // Turn off the NAGLE algorithm.
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+  }
+
+  // And go!
+  PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+              sockaddr_local_.ss_family == AF_INET6
+                  ? sizeof(struct sockaddr_in6)
+                  : sizeof(struct sockaddr_in)) == 0);
+  LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+
+  PCHECK(listen(fd_, 100) == 0);
+
+  const int rcvbuf = static_cast<int>(max_size_);  // SO_RCVBUF takes an int.
+  PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == 0);
+}
+
+aos::unique_c_ptr<Message> SctpServer::Read() {
+  return ReadSctpMessage(fd_, max_size_);  // Reads at most max_size_ bytes.
+}
+
+void SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
+                      int stream, int timetolive) {
+  struct iovec iov;
+  iov.iov_base = const_cast<char *>(data.data());
+  iov.iov_len = data.size();
+
+  // Use the assoc_id for the destination instead of the msg_name.  Zero the
+  // whole header first so msg_name is never handed to the kernel uninitialized.
+  struct msghdr outmsg;
+  memset(&outmsg, 0, sizeof(outmsg));
+
+  // Data to send.
+  outmsg.msg_iov = &iov;
+  outmsg.msg_iovlen = 1;
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = ++ppid_;
+  sinfo->sinfo_stream = stream;
+  sinfo->sinfo_assoc_id = snd_assoc_id;
+  sinfo->sinfo_timetolive = timetolive;
+
+  // And send.  EPIPE is tolerated (the message is dropped); any other failure
+  // or a short write is fatal.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno != EPIPE) {
+      PCHECK(size == static_cast<ssize_t>(data.size()));
+    }
+  } else {
+    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+  }
+}
+
+void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+  // Select SCTP_SS_PRIO for this association; a failure is only logged.
+  struct sctp_assoc_value request;
+  memset(&request, 0, sizeof(request));
+  request.assoc_id = assoc_id;
+  request.assoc_value = SCTP_SS_PRIO;
+  const int status = setsockopt(fd_, IPPROTO_SCTP, SCTP_STREAM_SCHEDULER,
+                                &request, sizeof(request));
+  if (status != 0) PLOG(WARNING) << "Failed to set scheduler";
+}
+
+void SctpServer::SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+                                   uint16_t priority) {
+  struct sctp_stream_value sctp_priority;
+  memset(&sctp_priority, 0, sizeof(sctp_priority));
+  sctp_priority.assoc_id = assoc_id;
+  sctp_priority.stream_id = stream_id;
+  sctp_priority.stream_value = priority;
+  const int status = setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER_VALUE,
+                                &sctp_priority, sizeof(sctp_priority));
+  // Best effort: a failure is only logged as a warning.
+  if (status != 0) PLOG(WARNING) << "Failed to set stream priority";
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
new file mode 100644
index 0000000..a3086d9
--- /dev/null
+++ b/aos/network/sctp_server.h
@@ -0,0 +1,63 @@
+#ifndef AOS_NETWORK_SCTP_SERVER_H_
+#define AOS_NETWORK_SCTP_SERVER_H_
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <memory>
+#include <sys/socket.h>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+class SctpServer {
+ public:
+  SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
+  SctpServer(const SctpServer &) = delete;  // Owns fd_; must not be copied.
+  SctpServer &operator=(const SctpServer &) = delete;
+
+  ~SctpServer() {
+    LOG(INFO) << "close(" << fd_ << ")";
+    PCHECK(close(fd_) == 0);
+  }
+
+  // Receives the next packet from the remote.
+  aos::unique_c_ptr<Message> Read();
+
+  // Sends a block of data to a client on a stream with a TTL.
+  void Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
+            int timetolive);
+
+  int fd() const { return fd_; }
+
+  // Enables the priority scheduler.  This is a SCTP feature which lets us
+  // configure the priority per stream so that higher priority packets don't get
+  // backed up behind lower priority packets in the networking queues.
+  void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+  // Sets the priority of a specific stream.
+  void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+                         uint16_t priority);
+
+ private:
+  struct sockaddr_storage sockaddr_local_;
+  int fd_;
+  // TODO(austin): Configure this.
+  size_t max_size_ = 1000;
+
+  int ppid_ = 1;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_SCTP_SERVER_H_
diff --git a/aos/testdata/config1_multinode.json b/aos/testdata/config1_multinode.json
index 32bce5c..f12d927 100644
--- a/aos/testdata/config1_multinode.json
+++ b/aos/testdata/config1_multinode.json
@@ -4,12 +4,22 @@
       "name": "/foo",
       "type": ".aos.bar",
       "max_size": 5,
-      "source_node": "pi2"
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
     },
     {
       "name": "/foo2",
       "type": ".aos.bar",
-      "source_node": "pi1"
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        }
+      ]
     }
   ],
   "applications": [
diff --git a/aos/testdata/expected_multinode.json b/aos/testdata/expected_multinode.json
index 46d052e..0946007 100644
--- a/aos/testdata/expected_multinode.json
+++ b/aos/testdata/expected_multinode.json
@@ -4,12 +4,22 @@
    "name": "/foo",
    "type": ".aos.bar",
    "max_size": 5,
-   "source_node": "pi2"
+   "source_node": "pi2",
+   "destination_nodes": [
+    {
+     "name": "pi1"
+    }
+   ]
   },
   {
    "name": "/foo2",
    "type": ".aos.bar",
-   "source_node": "pi1"
+   "source_node": "pi1",
+   "destination_nodes": [
+    {
+     "name": "pi2"
+    }
+   ]
   },
   {
    "name": "/foo3",
diff --git a/compilers/linaro_linux_gcc.BUILD b/compilers/linaro_linux_gcc.BUILD
index db2e9bb..aea261d 100644
--- a/compilers/linaro_linux_gcc.BUILD
+++ b/compilers/linaro_linux_gcc.BUILD
@@ -63,7 +63,10 @@
         "libexec/**",
         "lib/gcc/arm-linux-gnueabihf/**",
         "include/**",
-    ]),
+    ], exclude=["arm-linux-gnueabihf/libc/usr/include/linux/sctp.h"]) +
+    [
+        "@org_frc971//third_party/linux:sctp",
+    ],
 )
 
 filegroup(
diff --git a/third_party/linux/BUILD b/third_party/linux/BUILD
new file mode 100644
index 0000000..007b913
--- /dev/null
+++ b/third_party/linux/BUILD
@@ -0,0 +1,10 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+    name = "sctp",
+    srcs = [
+        "sctp.h",
+    ],
+)
diff --git a/third_party/linux/sctp.h b/third_party/linux/sctp.h
new file mode 100644
index 0000000..bd82046
--- /dev/null
+++ b/third_party/linux/sctp.h
@@ -0,0 +1,1105 @@
+/* SPDX-License-Identifier: GPL-2.0+ WITH Linux-syscall-note */
+/* SCTP kernel implementation
+ * (C) Copyright IBM Corp. 2001, 2004
+ * Copyright (c) 1999-2000 Cisco, Inc.
+ * Copyright (c) 1999-2001 Motorola, Inc.
+ * Copyright (c) 2002 Intel Corp.
+ *
+ * This file is part of the SCTP kernel implementation
+ *
+ * This header represents the structures and constants needed to support
+ * the SCTP Extension to the Sockets API.
+ *
+ * This SCTP implementation is free software;
+ * you can redistribute it and/or modify it under the terms of
+ * the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This SCTP implementation is distributed in the hope that it
+ * will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ *                 ************************
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU CC; see the file COPYING.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Please send any bug reports or fixes you make to the
+ * email address(es):
+ *    lksctp developers <linux-sctp@vger.kernel.org>
+ *
+ * Or submit a bug report through the following website:
+ *    http://www.sf.net/projects/lksctp
+ *
+ * Written or modified by:
+ *    La Monte H.P. Yarroll    <piggy@acm.org>
+ *    R. Stewart               <randall@sctp.chicago.il.us>
+ *    K. Morneau               <kmorneau@cisco.com>
+ *    Q. Xie                   <qxie1@email.mot.com>
+ *    Karl Knutson             <karl@athena.chicago.il.us>
+ *    Jon Grimm                <jgrimm@us.ibm.com>
+ *    Daisy Chang              <daisyc@us.ibm.com>
+ *    Ryan Layer               <rmlayer@us.ibm.com>
+ *    Ardelle Fan              <ardelle.fan@intel.com>
+ *    Sridhar Samudrala        <sri@us.ibm.com>
+ *    Inaky Perez-Gonzalez     <inaky.gonzalez@intel.com>
+ *    Vlad Yasevich            <vladislav.yasevich@hp.com>
+ *
+ * Any bugs reported given to us we will try to fix... any fixes shared will
+ * be incorporated into the next SCTP release.
+ */
+
+#ifndef _SCTP_H
+#define _SCTP_H
+
+#include <linux/types.h>
+#include <linux/socket.h>
+
+typedef __s32 sctp_assoc_t;
+
+/* The following symbols come from the Sockets API Extensions for
+ * SCTP <draft-ietf-tsvwg-sctpsocket-07.txt>.
+ */
+#define SCTP_RTOINFO	0
+#define SCTP_ASSOCINFO  1
+#define SCTP_INITMSG	2
+#define SCTP_NODELAY	3		/* Get/set nodelay option. */
+#define SCTP_AUTOCLOSE	4
+#define SCTP_SET_PEER_PRIMARY_ADDR 5
+#define SCTP_PRIMARY_ADDR	6
+#define SCTP_ADAPTATION_LAYER	7
+#define SCTP_DISABLE_FRAGMENTS	8
+#define SCTP_PEER_ADDR_PARAMS	9
+#define SCTP_DEFAULT_SEND_PARAM	10
+#define SCTP_EVENTS	11
+#define SCTP_I_WANT_MAPPED_V4_ADDR 12	/* Turn on/off mapped v4 addresses  */
+#define SCTP_MAXSEG	13		/* Get/set maximum fragment. */
+#define SCTP_STATUS	14
+#define SCTP_GET_PEER_ADDR_INFO	15
+#define SCTP_DELAYED_ACK_TIME	16
+#define SCTP_DELAYED_ACK SCTP_DELAYED_ACK_TIME
+#define SCTP_DELAYED_SACK SCTP_DELAYED_ACK_TIME
+#define SCTP_CONTEXT	17
+#define SCTP_FRAGMENT_INTERLEAVE	18
+#define SCTP_PARTIAL_DELIVERY_POINT	19 /* Set/Get partial delivery point */
+#define SCTP_MAX_BURST	20		/* Set/Get max burst */
+#define SCTP_AUTH_CHUNK	21	/* Set only: add a chunk type to authenticate */
+#define SCTP_HMAC_IDENT	22
+#define SCTP_AUTH_KEY	23
+#define SCTP_AUTH_ACTIVE_KEY	24
+#define SCTP_AUTH_DELETE_KEY	25
+#define SCTP_PEER_AUTH_CHUNKS	26	/* Read only */
+#define SCTP_LOCAL_AUTH_CHUNKS	27	/* Read only */
+#define SCTP_GET_ASSOC_NUMBER	28	/* Read only */
+#define SCTP_GET_ASSOC_ID_LIST	29	/* Read only */
+#define SCTP_AUTO_ASCONF       30
+#define SCTP_PEER_ADDR_THLDS	31
+#define SCTP_RECVRCVINFO	32
+#define SCTP_RECVNXTINFO	33
+#define SCTP_DEFAULT_SNDINFO	34
+
+/* Internal Socket Options. Some of the sctp library functions are
+ * implemented using these socket options.
+ */
+#define SCTP_SOCKOPT_BINDX_ADD	100	/* BINDX requests for adding addrs */
+#define SCTP_SOCKOPT_BINDX_REM	101	/* BINDX requests for removing addrs. */
+#define SCTP_SOCKOPT_PEELOFF	102	/* peel off association. */
+/* Options 104-106 are deprecated and removed. Do not use this space */
+#define SCTP_SOCKOPT_CONNECTX_OLD	107	/* CONNECTX old requests. */
+#define SCTP_GET_PEER_ADDRS	108		/* Get all peer address. */
+#define SCTP_GET_LOCAL_ADDRS	109		/* Get all local address. */
+#define SCTP_SOCKOPT_CONNECTX	110		/* CONNECTX requests. */
+#define SCTP_SOCKOPT_CONNECTX3	111	/* CONNECTX requests (updated) */
+#define SCTP_GET_ASSOC_STATS	112	/* Read only */
+#define SCTP_PR_SUPPORTED	113
+#define SCTP_DEFAULT_PRINFO	114
+#define SCTP_PR_ASSOC_STATUS	115
+#define SCTP_PR_STREAM_STATUS	116
+#define SCTP_RECONFIG_SUPPORTED	117
+#define SCTP_ENABLE_STREAM_RESET	118
+#define SCTP_RESET_STREAMS	119
+#define SCTP_RESET_ASSOC	120
+#define SCTP_ADD_STREAMS	121
+#define SCTP_SOCKOPT_PEELOFF_FLAGS 122
+#define SCTP_STREAM_SCHEDULER	123
+#define SCTP_STREAM_SCHEDULER_VALUE	124
+
+/* PR-SCTP policies */
+#define SCTP_PR_SCTP_NONE	0x0000
+#define SCTP_PR_SCTP_TTL	0x0010
+#define SCTP_PR_SCTP_RTX	0x0020
+#define SCTP_PR_SCTP_PRIO	0x0030
+#define SCTP_PR_SCTP_MAX	SCTP_PR_SCTP_PRIO
+#define SCTP_PR_SCTP_MASK	0x0030
+
+#define __SCTP_PR_INDEX(x)	((x >> 4) - 1)
+#define SCTP_PR_INDEX(x)	__SCTP_PR_INDEX(SCTP_PR_SCTP_ ## x)
+
+#define SCTP_PR_POLICY(x)	((x) & SCTP_PR_SCTP_MASK)
+#define SCTP_PR_SET_POLICY(flags, x)	\
+	do {				\
+		flags &= ~SCTP_PR_SCTP_MASK;	\
+		flags |= x;		\
+	} while (0)
+
+#define SCTP_PR_TTL_ENABLED(x)	(SCTP_PR_POLICY(x) == SCTP_PR_SCTP_TTL)
+#define SCTP_PR_RTX_ENABLED(x)	(SCTP_PR_POLICY(x) == SCTP_PR_SCTP_RTX)
+#define SCTP_PR_PRIO_ENABLED(x)	(SCTP_PR_POLICY(x) == SCTP_PR_SCTP_PRIO)
+
+/* For enable stream reset */
+#define SCTP_ENABLE_RESET_STREAM_REQ	0x01
+#define SCTP_ENABLE_RESET_ASSOC_REQ	0x02
+#define SCTP_ENABLE_CHANGE_ASSOC_REQ	0x04
+#define SCTP_ENABLE_STRRESET_MASK	0x07
+
+#define SCTP_STREAM_RESET_INCOMING	0x01
+#define SCTP_STREAM_RESET_OUTGOING	0x02
+
+/* These are bit fields for msghdr->msg_flags.  See section 5.1.  */
+/* On user space Linux, these live in <bits/socket.h> as an enum.  */
+enum sctp_msg_flags {
+	MSG_NOTIFICATION = 0x8000,
+#define MSG_NOTIFICATION MSG_NOTIFICATION
+};
+
+/* 5.3.1 SCTP Initiation Structure (SCTP_INIT)
+ *
+ *   This cmsghdr structure provides information for initializing new
+ *   SCTP associations with sendmsg().  The SCTP_INITMSG socket option
+ *   uses this same data structure.  This structure is not used for
+ *   recvmsg().
+ *
+ *   cmsg_level    cmsg_type      cmsg_data[]
+ *   ------------  ------------   ----------------------
+ *   IPPROTO_SCTP  SCTP_INIT      struct sctp_initmsg
+ */
+struct sctp_initmsg {
+	__u16 sinit_num_ostreams;
+	__u16 sinit_max_instreams;
+	__u16 sinit_max_attempts;
+	__u16 sinit_max_init_timeo;
+};
+
+/* 5.3.2 SCTP Header Information Structure (SCTP_SNDRCV)
+ *
+ *   This cmsghdr structure specifies SCTP options for sendmsg() and
+ *   describes SCTP header information about a received message through
+ *   recvmsg().
+ *
+ *   cmsg_level    cmsg_type      cmsg_data[]
+ *   ------------  ------------   ----------------------
+ *   IPPROTO_SCTP  SCTP_SNDRCV    struct sctp_sndrcvinfo
+ */
+struct sctp_sndrcvinfo {
+	__u16 sinfo_stream;
+	__u16 sinfo_ssn;
+	__u16 sinfo_flags;
+	__u32 sinfo_ppid;
+	__u32 sinfo_context;
+	__u32 sinfo_timetolive;
+	__u32 sinfo_tsn;
+	__u32 sinfo_cumtsn;
+	sctp_assoc_t sinfo_assoc_id;
+};
+
+/* 5.3.4 SCTP Send Information Structure (SCTP_SNDINFO)
+ *
+ *   This cmsghdr structure specifies SCTP options for sendmsg().
+ *
+ *   cmsg_level    cmsg_type      cmsg_data[]
+ *   ------------  ------------   -------------------
+ *   IPPROTO_SCTP  SCTP_SNDINFO   struct sctp_sndinfo
+ */
+struct sctp_sndinfo {
+	__u16 snd_sid;
+	__u16 snd_flags;
+	__u32 snd_ppid;
+	__u32 snd_context;
+	sctp_assoc_t snd_assoc_id;
+};
+
+/* 5.3.5 SCTP Receive Information Structure (SCTP_RCVINFO)
+ *
+ *   This cmsghdr structure describes SCTP receive information
+ *   about a received message through recvmsg().
+ *
+ *   cmsg_level    cmsg_type      cmsg_data[]
+ *   ------------  ------------   -------------------
+ *   IPPROTO_SCTP  SCTP_RCVINFO   struct sctp_rcvinfo
+ */
+struct sctp_rcvinfo {
+	__u16 rcv_sid;
+	__u16 rcv_ssn;
+	__u16 rcv_flags;
+	__u32 rcv_ppid;
+	__u32 rcv_tsn;
+	__u32 rcv_cumtsn;
+	__u32 rcv_context;
+	sctp_assoc_t rcv_assoc_id;
+};
+
+/* 5.3.6 SCTP Next Receive Information Structure (SCTP_NXTINFO)
+ *
+ *   This cmsghdr structure describes SCTP receive information
+ *   of the next message that will be delivered through recvmsg()
+ *   if this information is already available when delivering
+ *   the current message.
+ *
+ *   cmsg_level    cmsg_type      cmsg_data[]
+ *   ------------  ------------   -------------------
+ *   IPPROTO_SCTP  SCTP_NXTINFO   struct sctp_nxtinfo
+ */
+struct sctp_nxtinfo {
+	__u16 nxt_sid;
+	__u16 nxt_flags;
+	__u32 nxt_ppid;
+	__u32 nxt_length;
+	sctp_assoc_t nxt_assoc_id;
+};
+
+/*
+ *  sinfo_flags: 16 bits (unsigned integer)
+ *
+ *   This field may contain any of the following flags and is composed of
+ *   a bitwise OR of these values.
+ */
+enum sctp_sinfo_flags {
+	SCTP_UNORDERED		= (1 << 0), /* Send/receive message unordered. */
+	SCTP_ADDR_OVER		= (1 << 1), /* Override the primary destination. */
+	SCTP_ABORT		= (1 << 2), /* Send an ABORT message to the peer. */
+	SCTP_SACK_IMMEDIATELY	= (1 << 3), /* SACK should be sent without delay. */
+	SCTP_NOTIFICATION	= MSG_NOTIFICATION, /* Next message is not user msg but notification. */
+	SCTP_EOF		= MSG_FIN,  /* Initiate graceful shutdown process. */
+};
+
+typedef union {
+	__u8   			raw;
+	struct sctp_initmsg	init;
+	struct sctp_sndrcvinfo	sndrcv;
+} sctp_cmsg_data_t;
+
+/* These are cmsg_types.  */
+typedef enum sctp_cmsg_type {
+	SCTP_INIT,		/* 5.2.1 SCTP Initiation Structure */
+#define SCTP_INIT	SCTP_INIT
+	SCTP_SNDRCV,		/* 5.2.2 SCTP Header Information Structure */
+#define SCTP_SNDRCV	SCTP_SNDRCV
+	SCTP_SNDINFO,		/* 5.3.4 SCTP Send Information Structure */
+#define SCTP_SNDINFO	SCTP_SNDINFO
+	SCTP_RCVINFO,		/* 5.3.5 SCTP Receive Information Structure */
+#define SCTP_RCVINFO	SCTP_RCVINFO
+	SCTP_NXTINFO,		/* 5.3.6 SCTP Next Receive Information Structure */
+#define SCTP_NXTINFO	SCTP_NXTINFO
+} sctp_cmsg_t;
+
+/*
+ * 5.3.1.1 SCTP_ASSOC_CHANGE
+ *
+ *   Communication notifications inform the ULP that an SCTP association
+ *   has either begun or ended. The identifier for a new association is
+ *   provided by this notification. The notification information has the
+ *   following format:
+ *
+ */
+struct sctp_assoc_change {
+	__u16 sac_type;
+	__u16 sac_flags;
+	__u32 sac_length;
+	__u16 sac_state;
+	__u16 sac_error;
+	__u16 sac_outbound_streams;
+	__u16 sac_inbound_streams;
+	sctp_assoc_t sac_assoc_id;
+	__u8 sac_info[0];
+};
+
+/*
+ *   sac_state: 16 bits (unsigned integer)
+ *
+ *   This field holds one of a number of values that communicate the
+ *   event that happened to the association.  They include:
+ *
+ *   Note:  The following state names deviate from the API draft as
+ *   the names clash too easily with other kernel symbols.
+ */
+enum sctp_sac_state {
+	SCTP_COMM_UP,
+	SCTP_COMM_LOST,
+	SCTP_RESTART,
+	SCTP_SHUTDOWN_COMP,
+	SCTP_CANT_STR_ASSOC,
+};
+
+/*
+ * 5.3.1.2 SCTP_PEER_ADDR_CHANGE
+ *
+ *   When a destination address on a multi-homed peer encounters a change
+ *   an interface details event is sent.  The information has the
+ *   following structure:
+ */
+struct sctp_paddr_change {
+	__u16 spc_type;
+	__u16 spc_flags;
+	__u32 spc_length;
+	struct sockaddr_storage spc_aaddr;
+	int spc_state;
+	int spc_error;
+	sctp_assoc_t spc_assoc_id;
+} __attribute__((packed, aligned(4)));
+
+/*
+ *    spc_state:  32 bits (signed integer)
+ *
+ *   This field holds one of a number of values that communicate the
+ *   event that happened to the address.  They include:
+ */
+enum sctp_spc_state {
+	SCTP_ADDR_AVAILABLE,
+	SCTP_ADDR_UNREACHABLE,
+	SCTP_ADDR_REMOVED,
+	SCTP_ADDR_ADDED,
+	SCTP_ADDR_MADE_PRIM,
+	SCTP_ADDR_CONFIRMED,
+};
+
+
+/*
+ * 5.3.1.3 SCTP_REMOTE_ERROR
+ *
+ *   A remote peer may send an Operational Error message to its peer.
+ *   This message indicates a variety of error conditions on an
+ *   association. The entire error TLV as it appears on the wire is
+ *   included in a SCTP_REMOTE_ERROR event.  Please refer to the SCTP
+ *   specification [SCTP] and any extensions for a list of possible
+ *   error formats. SCTP error TLVs have the format:
+ */
+struct sctp_remote_error {
+	__u16 sre_type;
+	__u16 sre_flags;
+	__u32 sre_length;
+	__be16 sre_error;
+	sctp_assoc_t sre_assoc_id;
+	__u8 sre_data[0];
+};
+
+
+/*
+ * 5.3.1.4 SCTP_SEND_FAILED
+ *
+ *   If SCTP cannot deliver a message it may return the message as a
+ *   notification.
+ */
+struct sctp_send_failed {
+	__u16 ssf_type;
+	__u16 ssf_flags;
+	__u32 ssf_length;
+	__u32 ssf_error;
+	struct sctp_sndrcvinfo ssf_info;
+	sctp_assoc_t ssf_assoc_id;
+	__u8 ssf_data[0];
+};
+
+/*
+ *   ssf_flags: 16 bits (unsigned integer)
+ *
+ *   The flag value will take one of the following values
+ *
+ *   SCTP_DATA_UNSENT  - Indicates that the data was never put on
+ *                       the wire.
+ *
+ *   SCTP_DATA_SENT    - Indicates that the data was put on the wire.
+ *                       Note that this does not necessarily mean that the
+ *                       data was (or was not) successfully delivered.
+ */
+enum sctp_ssf_flags {
+	SCTP_DATA_UNSENT,
+	SCTP_DATA_SENT,
+};
+
+/*
+ * 5.3.1.5 SCTP_SHUTDOWN_EVENT
+ *
+ *   When a peer sends a SHUTDOWN, SCTP delivers this notification to
+ *   inform the application that it should cease sending data.
+ */
+struct sctp_shutdown_event {
+	__u16 sse_type;
+	__u16 sse_flags;
+	__u32 sse_length;
+	sctp_assoc_t sse_assoc_id;
+};
+
+/*
+ * 5.3.1.6 SCTP_ADAPTATION_INDICATION
+ *
+ *   When a peer sends a Adaptation Layer Indication parameter , SCTP
+ *   delivers this notification to inform the application
+ *   that of the peers requested adaptation layer.
+ */
+struct sctp_adaptation_event {
+	__u16 sai_type;
+	__u16 sai_flags;
+	__u32 sai_length;
+	__u32 sai_adaptation_ind;
+	sctp_assoc_t sai_assoc_id;
+};
+
+/*
+ * 5.3.1.7 SCTP_PARTIAL_DELIVERY_EVENT
+ *
+ *   When a receiver is engaged in a partial delivery of a
+ *   message this notification will be used to indicate
+ *   various events.
+ */
+struct sctp_pdapi_event {
+	__u16 pdapi_type;
+	__u16 pdapi_flags;
+	__u32 pdapi_length;
+	__u32 pdapi_indication;
+	sctp_assoc_t pdapi_assoc_id;
+};
+
+enum { SCTP_PARTIAL_DELIVERY_ABORTED=0, };
+
+/*
+ * 5.3.1.8.  SCTP_AUTHENTICATION_EVENT
+ *
+ *  When a receiver is using authentication this message will provide
+ *  notifications regarding new keys being made active as well as errors.
+ */
+struct sctp_authkey_event {
+	__u16 auth_type;
+	__u16 auth_flags;
+	__u32 auth_length;
+	__u16 auth_keynumber;
+	__u16 auth_altkeynumber;
+	__u32 auth_indication;
+	sctp_assoc_t auth_assoc_id;
+};
+
+enum { SCTP_AUTH_NEWKEY = 0, };
+
+/*
+ * 6.1.9. SCTP_SENDER_DRY_EVENT
+ *
+ * When the SCTP stack has no more user data to send or retransmit, this
+ * notification is given to the user. Also, at the time when a user app
+ * subscribes to this event, if there is no data to be sent or
+ * retransmit, the stack will immediately send up this notification.
+ */
+struct sctp_sender_dry_event {
+	__u16 sender_dry_type;
+	__u16 sender_dry_flags;
+	__u32 sender_dry_length;
+	sctp_assoc_t sender_dry_assoc_id;
+};
+
+#define SCTP_STREAM_RESET_INCOMING_SSN	0x0001
+#define SCTP_STREAM_RESET_OUTGOING_SSN	0x0002
+#define SCTP_STREAM_RESET_DENIED	0x0004
+#define SCTP_STREAM_RESET_FAILED	0x0008
+struct sctp_stream_reset_event {
+	__u16 strreset_type;
+	__u16 strreset_flags;
+	__u32 strreset_length;
+	sctp_assoc_t strreset_assoc_id;
+	__u16 strreset_stream_list[];
+};
+
+#define SCTP_ASSOC_RESET_DENIED		0x0004
+#define SCTP_ASSOC_RESET_FAILED		0x0008
+struct sctp_assoc_reset_event {
+	__u16 assocreset_type;
+	__u16 assocreset_flags;
+	__u32 assocreset_length;
+	sctp_assoc_t assocreset_assoc_id;
+	__u32 assocreset_local_tsn;
+	__u32 assocreset_remote_tsn;
+};
+
+#define SCTP_ASSOC_CHANGE_DENIED	0x0004
+#define SCTP_ASSOC_CHANGE_FAILED	0x0008
+#define SCTP_STREAM_CHANGE_DENIED	SCTP_ASSOC_CHANGE_DENIED
+#define SCTP_STREAM_CHANGE_FAILED	SCTP_ASSOC_CHANGE_FAILED
+struct sctp_stream_change_event {
+	__u16 strchange_type;
+	__u16 strchange_flags;
+	__u32 strchange_length;
+	sctp_assoc_t strchange_assoc_id;
+	__u16 strchange_instrms;
+	__u16 strchange_outstrms;
+};
+
+/*
+ * Described in Section 7.3
+ *   Ancillary Data and Notification Interest Options
+ */
+struct sctp_event_subscribe {
+	__u8 sctp_data_io_event;
+	__u8 sctp_association_event;
+	__u8 sctp_address_event;
+	__u8 sctp_send_failure_event;
+	__u8 sctp_peer_error_event;
+	__u8 sctp_shutdown_event;
+	__u8 sctp_partial_delivery_event;
+	__u8 sctp_adaptation_layer_event;
+	__u8 sctp_authentication_event;
+	__u8 sctp_sender_dry_event;
+	__u8 sctp_stream_reset_event;
+	__u8 sctp_assoc_reset_event;
+	__u8 sctp_stream_change_event;
+};
+
+/*
+ * 5.3.1 SCTP Notification Structure
+ *
+ *   The notification structure is defined as the union of all
+ *   notification types.
+ *
+ */
+union sctp_notification {
+	struct {
+		__u16 sn_type;             /* Notification type. */
+		__u16 sn_flags;
+		__u32 sn_length;
+	} sn_header;
+	struct sctp_assoc_change sn_assoc_change;
+	struct sctp_paddr_change sn_paddr_change;
+	struct sctp_remote_error sn_remote_error;
+	struct sctp_send_failed sn_send_failed;
+	struct sctp_shutdown_event sn_shutdown_event;
+	struct sctp_adaptation_event sn_adaptation_event;
+	struct sctp_pdapi_event sn_pdapi_event;
+	struct sctp_authkey_event sn_authkey_event;
+	struct sctp_sender_dry_event sn_sender_dry_event;
+	struct sctp_stream_reset_event sn_strreset_event;
+	struct sctp_assoc_reset_event sn_assocreset_event;
+	struct sctp_stream_change_event sn_strchange_event;
+};
+
+/* Section 5.3.1
+ * All standard values for sn_type flags are greater than 2^15.
+ * Values from 2^15 and down are reserved.
+ */
+
+enum sctp_sn_type {
+	SCTP_SN_TYPE_BASE     = (1<<15),
+	SCTP_ASSOC_CHANGE,
+#define SCTP_ASSOC_CHANGE		SCTP_ASSOC_CHANGE
+	SCTP_PEER_ADDR_CHANGE,
+#define SCTP_PEER_ADDR_CHANGE		SCTP_PEER_ADDR_CHANGE
+	SCTP_SEND_FAILED,
+#define SCTP_SEND_FAILED		SCTP_SEND_FAILED
+	SCTP_REMOTE_ERROR,
+#define SCTP_REMOTE_ERROR		SCTP_REMOTE_ERROR
+	SCTP_SHUTDOWN_EVENT,
+#define SCTP_SHUTDOWN_EVENT		SCTP_SHUTDOWN_EVENT
+	SCTP_PARTIAL_DELIVERY_EVENT,
+#define SCTP_PARTIAL_DELIVERY_EVENT	SCTP_PARTIAL_DELIVERY_EVENT
+	SCTP_ADAPTATION_INDICATION,
+#define SCTP_ADAPTATION_INDICATION	SCTP_ADAPTATION_INDICATION
+	SCTP_AUTHENTICATION_EVENT,
+#define SCTP_AUTHENTICATION_INDICATION	SCTP_AUTHENTICATION_EVENT
+	SCTP_SENDER_DRY_EVENT,
+#define SCTP_SENDER_DRY_EVENT		SCTP_SENDER_DRY_EVENT
+	SCTP_STREAM_RESET_EVENT,
+#define SCTP_STREAM_RESET_EVENT		SCTP_STREAM_RESET_EVENT
+	SCTP_ASSOC_RESET_EVENT,
+#define SCTP_ASSOC_RESET_EVENT		SCTP_ASSOC_RESET_EVENT
+	SCTP_STREAM_CHANGE_EVENT,
+#define SCTP_STREAM_CHANGE_EVENT	SCTP_STREAM_CHANGE_EVENT
+};
+
+/* Notification error codes used to fill up the error fields in some
+ * notifications.
+ * SCTP_PEER_ADDR_CHANGE 	: spc_error
+ * SCTP_ASSOC_CHANGE		: sac_error
+ * These names should be potentially included in the draft 04 of the SCTP
+ * sockets API specification.
+ */
+typedef enum sctp_sn_error {
+	SCTP_FAILED_THRESHOLD,
+	SCTP_RECEIVED_SACK,
+	SCTP_HEARTBEAT_SUCCESS,
+	SCTP_RESPONSE_TO_USER_REQ,
+	SCTP_INTERNAL_ERROR,
+	SCTP_SHUTDOWN_GUARD_EXPIRES,
+	SCTP_PEER_FAULTY,
+} sctp_sn_error_t;
+
+/*
+ * 7.1.1 Retransmission Timeout Parameters (SCTP_RTOINFO)
+ *
+ *   The protocol parameters used to initialize and bound retransmission
+ *   timeout (RTO) are tunable.  See [SCTP] for more information on how
+ *   these parameters are used in RTO calculation.
+ */
+struct sctp_rtoinfo {
+	sctp_assoc_t	srto_assoc_id;
+	__u32		srto_initial;
+	__u32		srto_max;
+	__u32		srto_min;
+};
+
+/*
+ * 7.1.2 Association Parameters (SCTP_ASSOCINFO)
+ *
+ *   This option is used to both examine and set various association and
+ *   endpoint parameters.
+ */
+struct sctp_assocparams {
+	sctp_assoc_t	sasoc_assoc_id;
+	__u16		sasoc_asocmaxrxt;
+	__u16		sasoc_number_peer_destinations;
+	__u32		sasoc_peer_rwnd;
+	__u32		sasoc_local_rwnd;
+	__u32		sasoc_cookie_life;
+};
+
+/*
+ * 7.1.9 Set Peer Primary Address (SCTP_SET_PEER_PRIMARY_ADDR)
+ *
+ *  Requests that the peer mark the enclosed address as the association
+ *  primary. The enclosed address must be one of the association's
+ *  locally bound addresses. The following structure is used to make a
+ *   set primary request:
+ */
+struct sctp_setpeerprim {
+	sctp_assoc_t            sspp_assoc_id;
+	struct sockaddr_storage sspp_addr;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.10 Set Primary Address (SCTP_PRIMARY_ADDR)
+ *
+ *  Requests that the local SCTP stack use the enclosed peer address as
+ *  the association primary. The enclosed address must be one of the
+ *  association peer's addresses. The following structure is used to
+ *  make a set primary request:
+ */
+struct sctp_prim {
+	sctp_assoc_t            ssp_assoc_id;
+	struct sockaddr_storage ssp_addr;
+} __attribute__((packed, aligned(4)));
+
+/* For backward compatibility use, define the old name too */
+#define sctp_setprim	sctp_prim
+
+/*
+ * 7.1.11 Set Adaptation Layer Indicator (SCTP_ADAPTATION_LAYER)
+ *
+ * Requests that the local endpoint set the specified Adaptation Layer
+ * Indication parameter for all future INIT and INIT-ACK exchanges.
+ */
+struct sctp_setadaptation {
+	__u32	ssb_adaptation_ind;
+};
+
+/*
+ * 7.1.13 Peer Address Parameters  (SCTP_PEER_ADDR_PARAMS)
+ *
+ *   Applications can enable or disable heartbeats for any peer address
+ *   of an association, modify an address's heartbeat interval, force a
+ *   heartbeat to be sent immediately, and adjust the address's maximum
+ *   number of retransmissions sent before an address is considered
+ *   unreachable. The following structure is used to access and modify an
+ *   address's parameters:
+ */
+enum  sctp_spp_flags {
+	SPP_HB_ENABLE = 1<<0,		/*Enable heartbeats*/
+	SPP_HB_DISABLE = 1<<1,		/*Disable heartbeats*/
+	SPP_HB = SPP_HB_ENABLE | SPP_HB_DISABLE,	/*Mask of both heartbeat bits*/
+	SPP_HB_DEMAND = 1<<2,		/*Send heartbeat immediately*/
+	SPP_PMTUD_ENABLE = 1<<3,	/*Enable PMTU discovery*/
+	SPP_PMTUD_DISABLE = 1<<4,	/*Disable PMTU discovery*/
+	SPP_PMTUD = SPP_PMTUD_ENABLE | SPP_PMTUD_DISABLE,	/*Mask of both PMTUD bits*/
+	SPP_SACKDELAY_ENABLE = 1<<5,	/*Enable delayed SACK*/
+	SPP_SACKDELAY_DISABLE = 1<<6,	/*Disable delayed SACK*/
+	SPP_SACKDELAY = SPP_SACKDELAY_ENABLE | SPP_SACKDELAY_DISABLE,	/*Mask of both SACK delay bits*/
+	SPP_HB_TIME_IS_ZERO = 1<<7,	/* Set HB delay to 0 */
+};
+
+struct sctp_paddrparams {
+	sctp_assoc_t		spp_assoc_id;
+	struct sockaddr_storage	spp_address;
+	__u32			spp_hbinterval;
+	__u16			spp_pathmaxrxt;
+	__u32			spp_pathmtu;
+	__u32			spp_sackdelay;
+	__u32			spp_flags;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.18.  Add a chunk that must be authenticated (SCTP_AUTH_CHUNK)
+ *
+ * This set option adds a chunk type that the user is requesting to be
+ * received only in an authenticated way.  Changes to the list of chunks
+ * will only affect future associations on the socket.
+ */
+struct sctp_authchunk {
+	__u8		sauth_chunk;
+};
+
+/*
+ * 7.1.19.  Get or set the list of supported HMAC Identifiers (SCTP_HMAC_IDENT)
+ *
+ * This option gets or sets the list of HMAC algorithms that the local
+ * endpoint requires the peer to use.
+ */
+/* This here is only used by user space as is. It might not be a good idea
+ * to export/reveal the whole structure with reserved fields etc.
+ */
+enum {
+	SCTP_AUTH_HMAC_ID_SHA1 = 1,
+	SCTP_AUTH_HMAC_ID_SHA256 = 3,
+};
+
+struct sctp_hmacalgo {
+	__u32		shmac_num_idents;
+	__u16		shmac_idents[];
+};
+
+/* Sadly, user and kernel space have different names for
+ * this structure member, so this is to not break anything.
+ */
+#define shmac_number_of_idents	shmac_num_idents
+
+/*
+ * 7.1.20.  Set a shared key (SCTP_AUTH_KEY)
+ *
+ * This option will set a shared secret key which is used to build an
+ * association shared key.
+ */
+struct sctp_authkey {
+	sctp_assoc_t	sca_assoc_id;
+	__u16		sca_keynumber;
+	__u16		sca_keylength;
+	__u8		sca_key[];
+};
+
+/*
+ * 7.1.21.  Get or set the active shared key (SCTP_AUTH_ACTIVE_KEY)
+ *
+ * This option will get or set the active shared key to be used to build
+ * the association shared key.
+ */
+
+struct sctp_authkeyid {
+	sctp_assoc_t	scact_assoc_id;
+	__u16		scact_keynumber;
+};
+
+
+/*
+ * 7.1.23.  Get or set delayed ack timer (SCTP_DELAYED_SACK)
+ *
+ * This option will affect the way delayed acks are performed.  This
+ * option allows you to get or set the delayed ack time, in
+ * milliseconds.  It also allows changing the delayed ack frequency.
+ * Changing the frequency to 1 disables the delayed sack algorithm.  If
+ * the assoc_id is 0, then this sets or gets the endpoint's default
+ * values.  If the assoc_id field is non-zero, then the set or get
+ * affects the specified association for the one to many model (the
+ * assoc_id field is ignored by the one to one model).  Note that if
+ * sack_delay or sack_freq are 0 when setting this option, then the
+ * current values will remain unchanged.
+ */
+struct sctp_sack_info {
+	sctp_assoc_t	sack_assoc_id;	/* 0 selects the endpoint defaults */
+	uint32_t	sack_delay;	/* delayed ack time in ms; 0 keeps current value */
+	uint32_t	sack_freq;	/* delayed ack frequency; 1 disables, 0 keeps current value */
+};
+
+struct sctp_assoc_value {
+    sctp_assoc_t            assoc_id;
+    uint32_t                assoc_value;
+};
+
+struct sctp_stream_value {
+	sctp_assoc_t assoc_id;
+	uint16_t stream_id;
+	uint16_t stream_value;
+};
+
+/*
+ * 7.2.2 Peer Address Information
+ *
+ *   Applications can retrieve information about a specific peer address
+ *   of an association, including its reachability state, congestion
+ *   window, and retransmission timer values.  This information is
+ *   read-only. The following structure is used to access this
+ *   information:
+ */
+struct sctp_paddrinfo {
+	sctp_assoc_t		spinfo_assoc_id;
+	struct sockaddr_storage	spinfo_address;
+	__s32			spinfo_state;
+	__u32			spinfo_cwnd;
+	__u32			spinfo_srtt;
+	__u32			spinfo_rto;
+	__u32			spinfo_mtu;
+} __attribute__((packed, aligned(4)));
+
+/* Peer address's state. */
+/* UNKNOWN: Peer address passed by the upper layer in sendmsg or connect[x]
+ * calls.
+ * UNCONFIRMED: Peer address received in INIT/INIT-ACK address parameters.
+ *              Not yet confirmed by a heartbeat and not available for data
+ *		transfers.
+ * ACTIVE : Peer address confirmed, active and available for data transfers.
+ * INACTIVE: Peer address inactive and not available for data transfers.
+ */
+enum sctp_spinfo_state {
+	SCTP_INACTIVE,
+	SCTP_PF,
+	SCTP_ACTIVE,
+	SCTP_UNCONFIRMED,
+	SCTP_UNKNOWN = 0xffff  /* Value used for transport state unknown */
+};
+
+/*
+ * 7.2.1 Association Status (SCTP_STATUS)
+ *
+ *   Applications can retrieve current status information about an
+ *   association, including association state, peer receiver window size,
+ *   number of unacked data chunks, and number of data chunks pending
+ *   receipt.  This information is read-only.  The following structure is
+ *   used to access this information:
+ */
+struct sctp_status {
+	sctp_assoc_t		sstat_assoc_id;
+	__s32			sstat_state;
+	__u32			sstat_rwnd;
+	__u16			sstat_unackdata;
+	__u16			sstat_penddata;
+	__u16			sstat_instrms;
+	__u16			sstat_outstrms;
+	__u32			sstat_fragmentation_point;
+	struct sctp_paddrinfo	sstat_primary;
+};
+
+/*
+ * 7.2.3.  Get the list of chunks the peer requires to be authenticated
+ *         (SCTP_PEER_AUTH_CHUNKS)
+ *
+ * This option gets a list of chunks for a specified association that
+ * the peer requires to be received authenticated only.
+ */
+struct sctp_authchunks {
+	sctp_assoc_t	gauth_assoc_id;
+	__u32		gauth_number_of_chunks;
+	uint8_t		gauth_chunks[];
+};
+
+/* The broken spelling has been released already in lksctp-tools header,
+ * so don't break anyone, now that it's fixed.
+ */
+#define guth_number_of_chunks	gauth_number_of_chunks
+
+/* Association states.  */
+enum sctp_sstat_state {
+	SCTP_EMPTY                = 0,
+	SCTP_CLOSED               = 1,
+	SCTP_COOKIE_WAIT          = 2,
+	SCTP_COOKIE_ECHOED        = 3,
+	SCTP_ESTABLISHED          = 4,
+	SCTP_SHUTDOWN_PENDING     = 5,
+	SCTP_SHUTDOWN_SENT        = 6,
+	SCTP_SHUTDOWN_RECEIVED    = 7,
+	SCTP_SHUTDOWN_ACK_SENT    = 8,
+};
+
+/*
+ * 8.2.6. Get the Current Identifiers of Associations
+ *        (SCTP_GET_ASSOC_ID_LIST)
+ *
+ * This option gets the current list of SCTP association identifiers of
+ * the SCTP associations handled by a one-to-many style socket.
+ */
+struct sctp_assoc_ids {
+	__u32		gaids_number_of_ids;
+	sctp_assoc_t	gaids_assoc_id[];
+};
+
+/*
+ * 8.3, 8.5 get all peer/local addresses in an association.
+ * This parameter struct is used by SCTP_GET_PEER_ADDRS and
+ * SCTP_GET_LOCAL_ADDRS socket options used internally to implement
+ * sctp_getpaddrs() and sctp_getladdrs() API.
+ */
+struct sctp_getaddrs_old {
+	sctp_assoc_t            assoc_id;
+	int			addr_num;
+	struct sockaddr		*addrs;
+};
+
+struct sctp_getaddrs {
+	sctp_assoc_t		assoc_id; /*input*/
+	__u32			addr_num; /*output*/
+	__u8			addrs[0]; /*output, variable size*/
+};
+
+/* A socket user request obtained via SCTP_GET_ASSOC_STATS that retrieves
+ * association stats. All stats are counts except sas_maxrto and
+ * sas_obs_rto_ipaddr. maxrto is the max observed rto + transport since
+ * the last call. Will return 0 when RTO was not updated since last call
+ */
+struct sctp_assoc_stats {
+	sctp_assoc_t	sas_assoc_id;    /* Input */
+					 /* Transport of observed max RTO */
+	struct sockaddr_storage sas_obs_rto_ipaddr;
+	__u64		sas_maxrto;      /* Maximum Observed RTO for period */
+	__u64		sas_isacks;	 /* SACKs received */
+	__u64		sas_osacks;	 /* SACKs sent */
+	__u64		sas_opackets;	 /* Packets sent */
+	__u64		sas_ipackets;	 /* Packets received */
+	__u64		sas_rtxchunks;   /* Retransmitted Chunks */
+	__u64		sas_outofseqtsns;/* TSN received > next expected */
+	__u64		sas_idupchunks;  /* Dups received (ordered+unordered) */
+	__u64		sas_gapcnt;      /* Gap Acknowledgements Received */
+	__u64		sas_ouodchunks;  /* Unordered data chunks sent */
+	__u64		sas_iuodchunks;  /* Unordered data chunks received */
+	__u64		sas_oodchunks;	 /* Ordered data chunks sent */
+	__u64		sas_iodchunks;	 /* Ordered data chunks received */
+	__u64		sas_octrlchunks; /* Control chunks sent */
+	__u64		sas_ictrlchunks; /* Control chunks received */
+};
+
+/*
+ * 8.1 sctp_bindx()
+ *
+ * The flags parameter is formed from the bitwise OR of zero or more of the
+ * following currently defined flags:
+ */
+#define SCTP_BINDX_ADD_ADDR 0x01
+#define SCTP_BINDX_REM_ADDR 0x02
+
+/* This is the structure that is passed as an argument(optval) to
+ * getsockopt(SCTP_SOCKOPT_PEELOFF).
+ */
+typedef struct {
+	sctp_assoc_t associd;
+	int sd;
+} sctp_peeloff_arg_t;
+
+typedef struct {
+	sctp_peeloff_arg_t p_arg;
+	unsigned flags;
+} sctp_peeloff_flags_arg_t;
+
+/*
+ *  Peer Address Thresholds socket option
+ */
+struct sctp_paddrthlds {
+	sctp_assoc_t spt_assoc_id;
+	struct sockaddr_storage spt_address;
+	__u16 spt_pathmaxrxt;
+	__u16 spt_pathpfthld;
+};
+
+/*
+ * Socket Option for Getting the Association/Stream-Specific PR-SCTP Status
+ */
+struct sctp_prstatus {
+	sctp_assoc_t sprstat_assoc_id;
+	__u16 sprstat_sid;
+	__u16 sprstat_policy;
+	__u64 sprstat_abandoned_unsent;
+	__u64 sprstat_abandoned_sent;
+};
+
+struct sctp_default_prinfo {
+	sctp_assoc_t pr_assoc_id;
+	__u32 pr_value;
+	__u16 pr_policy;
+};
+
+struct sctp_info {
+	__u32	sctpi_tag;
+	__u32	sctpi_state;
+	__u32	sctpi_rwnd;
+	__u16	sctpi_unackdata;
+	__u16	sctpi_penddata;
+	__u16	sctpi_instrms;
+	__u16	sctpi_outstrms;
+	__u32	sctpi_fragmentation_point;
+	__u32	sctpi_inqueue;
+	__u32	sctpi_outqueue;
+	__u32	sctpi_overall_error;
+	__u32	sctpi_max_burst;
+	__u32	sctpi_maxseg;
+	__u32	sctpi_peer_rwnd;
+	__u32	sctpi_peer_tag;
+	__u8	sctpi_peer_capable;
+	__u8	sctpi_peer_sack;
+	__u16	__reserved1;
+
+	/* assoc status info */
+	__u64	sctpi_isacks;
+	__u64	sctpi_osacks;
+	__u64	sctpi_opackets;
+	__u64	sctpi_ipackets;
+	__u64	sctpi_rtxchunks;
+	__u64	sctpi_outofseqtsns;
+	__u64	sctpi_idupchunks;
+	__u64	sctpi_gapcnt;
+	__u64	sctpi_ouodchunks;
+	__u64	sctpi_iuodchunks;
+	__u64	sctpi_oodchunks;
+	__u64	sctpi_iodchunks;
+	__u64	sctpi_octrlchunks;
+	__u64	sctpi_ictrlchunks;
+
+	/* primary transport info */
+	struct sockaddr_storage	sctpi_p_address;
+	__s32	sctpi_p_state;
+	__u32	sctpi_p_cwnd;
+	__u32	sctpi_p_srtt;
+	__u32	sctpi_p_rto;
+	__u32	sctpi_p_hbinterval;
+	__u32	sctpi_p_pathmaxrxt;
+	__u32	sctpi_p_sackdelay;
+	__u32	sctpi_p_sackfreq;
+	__u32	sctpi_p_ssthresh;
+	__u32	sctpi_p_partial_bytes_acked;
+	__u32	sctpi_p_flight_size;
+	__u16	sctpi_p_error;
+	__u16	__reserved2;
+
+	/* sctp sock info */
+	__u32	sctpi_s_autoclose;
+	__u32	sctpi_s_adaptation_ind;
+	__u32	sctpi_s_pd_point;
+	__u8	sctpi_s_nodelay;
+	__u8	sctpi_s_disable_fragments;
+	__u8	sctpi_s_v4mapped;
+	__u8	sctpi_s_frag_interleave;
+	__u32	sctpi_s_type;
+	__u32	__reserved3;
+};
+
+struct sctp_reset_streams {
+	sctp_assoc_t srs_assoc_id;
+	uint16_t srs_flags;
+	uint16_t srs_number_streams;	/* 0 == ALL */
+	uint16_t srs_stream_list[];	/* list if srs_number_streams is not 0 */
+};
+
+struct sctp_add_streams {
+	sctp_assoc_t sas_assoc_id;
+	uint16_t sas_instrms;
+	uint16_t sas_outstrms;
+};
+
+/* SCTP Stream schedulers */
+enum sctp_sched_type {
+	SCTP_SS_FCFS,
+	SCTP_SS_DEFAULT = SCTP_SS_FCFS,
+	SCTP_SS_PRIO,
+	SCTP_SS_RR,
+	SCTP_SS_MAX = SCTP_SS_RR
+};
+
+#endif /* _SCTP_H */
diff --git a/third_party/lksctp-tools/BUILD b/third_party/lksctp-tools/BUILD
new file mode 100644
index 0000000..b127b77
--- /dev/null
+++ b/third_party/lksctp-tools/BUILD
@@ -0,0 +1,35 @@
+licenses(["notice"])
+
+genrule(
+    name = "sctp_copy",
+    srcs = [
+        "src/include/netinet/sctp.h.in",
+    ],
+    outs = [
+        "src/include/netinet/sctp.h",
+    ],
+    cmd = "cp $< $@",  # plain copy: sctp.h.in is used verbatim as sctp.h, no substitution step
+)
+
+cc_library(
+    name = "sctp",
+    srcs = [
+        "src/lib/addrs.c",
+        "src/lib/bindx.c",
+        "src/lib/opt_info.c",
+        "src/lib/peeloff.c",
+        "src/lib/recvmsg.c",
+        "src/lib/sendmsg.c",
+    ],
+    hdrs = [
+        "src/include/netinet/sctp.h",  # generated by :sctp_copy
+    ],
+    copts = [  # warning suppressions; copts apply to this target's sources only
+        "-Wno-cast-align",
+        "-Wno-cast-qual",
+    ],
+    includes = [
+        "src/include",  # lets dependents write #include <netinet/sctp.h>
+    ],
+    visibility = ["//visibility:public"],
+)
diff --git a/third_party/lksctp-tools/src/include/netinet/sctp.h.in b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
index 2009f1c..c1d4d2c 100644
--- a/third_party/lksctp-tools/src/include/netinet/sctp.h.in
+++ b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
@@ -60,15 +60,17 @@
 #define HAVE_SCTP_ADDIP
 #define HAVE_SCTP_CANSET_PRIMARY
 
-#undef HAVE_SCTP_STREAM_RESET_EVENT
-#undef HAVE_SCTP_STREAM_RECONFIG
-#undef HAVE_SCTP_PEELOFF_FLAGS
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ
-#undef HAVE_SCTP_SENDV
-#undef HAVE_SCTP_AUTH_NO_AUTH
-#undef HAVE_SCTP_SPP_IPV6_FLOWLABEL
-#undef HAVE_SCTP_SPP_DSCP
+#define HAVE_SCTP_STREAM_RESET_EVENT 1
+/* #undef HAVE_SCTP_ASSOC_RESET_EVENT */
+/* #undef HAVE_SCTP_STREAM_CHANGE_EVENT */
+#define HAVE_SCTP_STREAM_RECONFIG 1
+/* #undef HAVE_SCTP_PEELOFF_FLAGS */
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM 1
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ 1
+/* #undef HAVE_SCTP_SENDV */
+/* #undef HAVE_SCTP_AUTH_NO_AUTH */
+/* #undef HAVE_SCTP_SPP_IPV6_FLOWLABEL */
+/* #undef HAVE_SCTP_SPP_DSCP */
 
 int sctp_bindx(int sd, struct sockaddr *addrs, int addrcnt, int flags);
 
diff --git a/tools/cpp/CROSSTOOL b/tools/cpp/CROSSTOOL
index 39f29d8..20f47e3 100644
--- a/tools/cpp/CROSSTOOL
+++ b/tools/cpp/CROSSTOOL
@@ -932,6 +932,8 @@
   compiler_flag: "external/linaro_linux_gcc_repo/include/c++/7.4.1"
   compiler_flag: "-isystem"
   compiler_flag: "external/linaro_linux_gcc_repo/arm-linux-gnueabihf/libc/usr/include"
+  compiler_flag: "-isystem"
+  compiler_flag: "external/org_frc971/third_party"
   compiler_flag: "-D__STDC_FORMAT_MACROS"
   compiler_flag: "-D__STDC_CONSTANT_MACROS"
   compiler_flag: "-D__STDC_LIMIT_MACROS"