Merge changes I68d7b396,I94868562

* changes:
  Initial tuning of the control loops.
  Update python models to have accurate constants
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 1f7a3d7..a905728 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -141,6 +141,7 @@
         ":ping_fbs",
         ":pong_fbs",
         "//aos/network:message_bridge_client_fbs",
+        "//aos/network:timestamp_fbs",
         "//aos/network:message_bridge_server_fbs",
     ],
     deps = [":config"],
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 27df016..6047cbc 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -306,6 +306,16 @@
     return sender_ ? true : false;
   }
 
+  // Returns the time_points that the last message was sent at.
+  aos::monotonic_clock::time_point monotonic_sent_time() const {
+    return sender_->monotonic_sent_time();
+  }
+  aos::realtime_clock::time_point realtime_sent_time() const {
+    return sender_->realtime_sent_time();
+  }
+  // Returns the queue index that this was sent with.
+  uint32_t sent_queue_index() const { return sender_->sent_queue_index(); }
+
  private:
   friend class EventLoop;
   Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index c9bc53e..54cc658 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -26,6 +26,56 @@
     },
     {
       "name": "/aos/pi1",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi1",
+      "frequency": 10,
+      "num_senders": 2,
+      "max_size": 200,
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "time_to_live": 5000000
+        },
+        {
+          "name": "pi3",
+          "priority": 1,
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi2",
+      "frequency": 10,
+      "num_senders": 2,
+      "max_size": 200,
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi3",
+      "frequency": 10,
+      "num_senders": 2,
+      "max_size": 200,
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    {
+      "name": "/aos/pi1",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi1",
       "frequency": 2
@@ -141,7 +191,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
         "source_node": "pi1"
       },
       "rename": {
@@ -151,7 +200,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
         "source_node": "pi2"
       },
       "rename": {
@@ -161,97 +209,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
-        "source_node": "pi3"
-      },
-      "rename": {
-        "name": "/aos/pi3"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.timing.Report",
-        "source_node": "pi1"
-      },
-      "rename": {
-        "name": "/aos/pi1"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.timing.Report",
-        "source_node": "pi2"
-      },
-      "rename": {
-        "name": "/aos/pi2"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.timing.Report",
-        "source_node": "pi3"
-      },
-      "rename": {
-        "name": "/aos/pi3"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ServerStatistics",
-        "source_node": "pi1"
-      },
-      "rename": {
-        "name": "/aos/pi1"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ServerStatistics",
-        "source_node": "pi2"
-      },
-      "rename": {
-        "name": "/aos/pi2"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ServerStatistics",
-        "source_node": "pi3"
-      },
-      "rename": {
-        "name": "/aos/pi3"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ClientStatistics",
-        "source_node": "pi1"
-      },
-      "rename": {
-        "name": "/aos/pi1"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ClientStatistics",
-        "source_node": "pi2"
-      },
-      "rename": {
-        "name": "/aos/pi2"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ClientStatistics",
         "source_node": "pi3"
       },
       "rename": {
diff --git a/aos/network/BUILD b/aos/network/BUILD
index d915404..96d561f 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -14,6 +14,15 @@
 )
 
 flatbuffer_cc_library(
+    name = "timestamp_fbs",
+    srcs = ["timestamp.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+flatbuffer_cc_library(
     name = "message_bridge_client_fbs",
     srcs = ["message_bridge_client.fbs"],
     gen_reflections = 1,
@@ -95,9 +104,19 @@
 
 cc_library(
     name = "message_bridge_protocol",
+    srcs = [
+        "message_bridge_protocol.cc",
+    ],
     hdrs = [
         "message_bridge_protocol.h",
     ],
+    deps = [
+        ":connect_fbs",
+        "//aos:configuration",
+        "//aos:flatbuffer_merge",
+        "//aos:flatbuffers",
+        "@com_github_google_flatbuffers//:flatbuffers",
+    ],
 )
 
 cc_library(
@@ -113,10 +132,12 @@
     ],
     deps = [
         ":connect_fbs",
+        ":message_bridge_client_fbs",
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
         ":sctp_lib",
         ":sctp_server",
+        ":timestamp_fbs",
         "//aos:unique_malloc_ptr",
         "//aos/events:shm_event_loop",
         "//aos/events/logging:logger",
@@ -171,6 +192,7 @@
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
         ":sctp_client",
+        ":timestamp_fbs",
         "//aos/events:shm_event_loop",
         "//aos/events/logging:logger",
     ],
@@ -200,6 +222,7 @@
         "//aos/events:pong_fbs",
         "//aos/network:message_bridge_client_fbs",
         "//aos/network:message_bridge_server_fbs",
+        "//aos/network:timestamp_fbs",
     ],
     deps = ["//aos/events:config"],
 )
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
index 58b653a..df3f02f 100644
--- a/aos/network/message_bridge_client.fbs
+++ b/aos/network/message_bridge_client.fbs
@@ -13,6 +13,11 @@
   // Number of packets received on all channels.
   received_packets:uint;
 
+  // This is the measured monotonic offset for just the server -> client
+  // direction measured in nanoseconds.  Subtract this from our monotonic time
+  // to get their monotonic time.
+  monotonic_offset:int64;
+
   // TODO(austin): Per channel counts?
 }
 
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 93eccc9..263a757 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -9,6 +9,7 @@
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_protocol.h"
 #include "aos/network/sctp_client.h"
+#include "aos/network/timestamp_generated.h"
 #include "aos/unique_malloc_ptr.h"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -26,40 +27,6 @@
 namespace {
 namespace chrono = std::chrono;
 
-aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
-    const Configuration *config, const Node *my_node,
-    std::string_view remote_name) {
-  CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
-
-  flatbuffers::FlatBufferBuilder fbb;
-  fbb.ForceDefaults(1);
-
-  flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
-  const std::string_view node_name = my_node->name()->string_view();
-
-  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
-  for (const Channel *channel : *config->channels()) {
-    if (channel->has_destination_nodes()) {
-      for (const Connection *connection : *channel->destination_nodes()) {
-        if (connection->name()->string_view() == node_name &&
-            channel->source_node()->string_view() == remote_name) {
-          channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
-        }
-      }
-    }
-  }
-
-  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
-      channels_offset = fbb.CreateVector(channel_offsets);
-
-  Connect::Builder connect_builder(fbb);
-  connect_builder.add_channels_to_transfer(channels_offset);
-  connect_builder.add_node(node_offset);
-  fbb.Finish(connect_builder.Finish());
-
-  return fbb.Release();
-}
-
 std::vector<int> StreamToChannel(const Configuration *config,
                                  const Node *my_node, const Node *other_node) {
   std::vector<int> stream_to_channel;
@@ -120,20 +87,25 @@
 }
 
 FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
-    const std::vector<std::string_view> &source_node_names,
-    const Configuration *configuration) {
+    const std::vector<std::string_view> &source_node_names) {
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(true);
 
   std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
   for (const std::string_view node_name : source_node_names) {
-    flatbuffers::Offset<Node> node_offset =
-        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        fbb.CreateString(node_name);
+
+    Node::Builder node_builder(fbb);
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
     ClientConnection::Builder connection_builder(fbb);
     connection_builder.add_node(node_offset);
     connection_builder.add_state(State::DISCONNECTED);
     // TODO(austin): Track dropped packets.
     connection_builder.add_received_packets(0);
+    connection_builder.add_monotonic_offset(0);
     connection_offsets.emplace_back(connection_builder.Finish());
   }
   flatbuffers::Offset<
@@ -266,6 +238,26 @@
                    chrono::nanoseconds(message_header->realtime_sent_time())),
                message_header->queue_index());
 
+  const std::chrono::nanoseconds offset =
+      sender->monotonic_sent_time() -
+      aos::monotonic_clock::time_point(
+          chrono::nanoseconds(message_header->monotonic_sent_time()));
+
+  // If this is our first observation, use that to seed the base offset.  That
+  // gets us in the ballpark.
+  if (!filter_.has_sample()) {
+    filter_.set_base_offset(offset);
+  }
+
+  // We can now measure the latency!
+  filter_.Sample(sender->monotonic_sent_time(), offset);
+
+  connection_->mutate_monotonic_offset(
+      (chrono::duration_cast<chrono::nanoseconds>(
+           chrono::duration<double>(filter_.offset())) +
+       filter_.base_offset())
+          .count());
+
   if (stream_reply_with_timestamp_[stream]) {
     // TODO(austin): Send back less if we are only acking.  Maybe only a
     // stream id?  Nothing if we are only forwarding?
@@ -316,8 +308,9 @@
       sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
       source_node_names_(configuration::SourceNodeNames(
           event_loop->configuration(), event_loop->node())),
-      statistics_(MakeClientStatistics(source_node_names_,
-                                       event_loop->configuration())) {
+      statistics_(MakeClientStatistics(source_node_names_)) {
+  client_connection_offsets_.reserve(
+      statistics_.message().connections()->size());
   std::string_view node_name = event_loop->node()->name()->string_view();
 
   // Find all the channels which are supposed to be delivered to us.
@@ -355,10 +348,54 @@
   // And kick it all off.
   statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
   event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
-                             chrono::seconds(1));
+    statistics_timer_->Setup(
+        event_loop_->monotonic_now() + chrono::milliseconds(100),
+        chrono::milliseconds(100));
   });
 }
 
+void MessageBridgeClient::SendStatistics() {
+  // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
+  // There doesn't exist a good way to drop fields otherwise.
+  aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
+  client_connection_offsets_.clear();
+
+  for (const ClientConnection *connection :
+       *statistics_.message().connections()) {
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        builder.fbb()->CreateString(connection->node()->name()->string_view());
+    Node::Builder node_builder = builder.MakeBuilder<Node>();
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+    ClientConnection::Builder client_connection_builder =
+        builder.MakeBuilder<ClientConnection>();
+
+    client_connection_builder.add_node(node_offset);
+    client_connection_builder.add_state(connection->state());
+    client_connection_builder.add_received_packets(
+        connection->received_packets());
+
+    // Strip out the monotonic offset if it isn't populated.
+    if (connection->monotonic_offset() != 0) {
+      client_connection_builder.add_monotonic_offset(
+          connection->monotonic_offset());
+    }
+
+    client_connection_offsets_.emplace_back(client_connection_builder.Finish());
+  }
+
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+      client_connections_offset =
+          builder.fbb()->CreateVector(client_connection_offsets_);
+
+  ClientStatistics::Builder client_statistics_builder =
+      builder.MakeBuilder<ClientStatistics>();
+  client_statistics_builder.add_connections(client_connections_offset);
+
+  builder.Send(client_statistics_builder.Finish());
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 77bf2b7..2811553 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -7,6 +7,7 @@
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
+#include "aos/network/timestamp_filter.h"
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/sctp_client.h"
 #include "aos/network/sctp_lib.h"
@@ -76,6 +77,9 @@
   // id of the server once known.  This is only valid if connection_ says
   // connected.
   sctp_assoc_t remote_assoc_id_ = 0;
+
+  // Filter for the timestamp offset for this connection.
+  TimestampFilter filter_;
 };
 
 // This encapsulates the state required to talk to *all* the servers from this
@@ -89,7 +93,7 @@
  private:
   // Sends out the statistics that are continually updated by the
   // SctpClientConnections.
-  void SendStatistics() { sender_.Send(statistics_); }
+  void SendStatistics();
 
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
@@ -102,6 +106,9 @@
 
   // Data to publish.
   FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+  // Reserved memory for the client connection offsets to reduce heap
+  // allocations.
+  std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
 
   // Channels to send data over.
   std::vector<std::unique_ptr<aos::RawSender>> channels_;
diff --git a/aos/network/message_bridge_protocol.cc b/aos/network/message_bridge_protocol.cc
new file mode 100644
index 0000000..9663edf
--- /dev/null
+++ b/aos/network/message_bridge_protocol.cc
@@ -0,0 +1,49 @@
+#include "aos/network/message_bridge_protocol.h"
+
+#include <string_view>
+
+#include "aos/configuration.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/network/connect_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace message_bridge {
+
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+    const Configuration *config, const Node *my_node,
+    std::string_view remote_name) {
+  CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+
+  flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
+  const std::string_view node_name = my_node->name()->string_view();
+
+  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+  for (const Channel *channel : *config->channels()) {
+    if (channel->has_destination_nodes()) {
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->name()->string_view() == node_name &&
+            channel->source_node()->string_view() == remote_name) {
+          channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+        }
+      }
+    }
+  }
+
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+      channels_offset = fbb.CreateVector(channel_offsets);
+
+  Connect::Builder connect_builder(fbb);
+  connect_builder.add_channels_to_transfer(channels_offset);
+  connect_builder.add_node(node_offset);
+  fbb.Finish(connect_builder.Finish());
+
+  return fbb.Release();
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
index 1136188..7b6a248 100644
--- a/aos/network/message_bridge_protocol.h
+++ b/aos/network/message_bridge_protocol.h
@@ -1,6 +1,11 @@
 #ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
 #define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
 
+#include <string_view>
+
+#include "aos/configuration.h"
+#include "aos/network/connect_generated.h"
+
 namespace aos {
 namespace message_bridge {
 
@@ -25,6 +30,11 @@
 // The stream on which timestamp replies are sent.
 constexpr size_t kTimestampStream() { return 1; }
 
+// Builds up a subscription request for my_node to remote_name.
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+    const Configuration *config, const Node *my_node,
+    std::string_view remote_name);
+
 }  // namespace message_bridge
 }  // namespace aos
 
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index 75a6a15..8f06fdb 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -22,6 +22,11 @@
   // Number of packets received on all channels.
   sent_packets:uint;
 
+  // This is the measured monotonic offset for the connected node in
+  // nanoseconds.  Add this to our monotonic time to get their
+  // monotonic time.
+  monotonic_offset:int64;
+
   // TODO(austin): Per channel counts?
 }
 
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index c32829c..f54e2bf 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -33,6 +33,7 @@
     connection_builder.add_state(State::DISCONNECTED);
     connection_builder.add_dropped_packets(0);
     connection_builder.add_sent_packets(0);
+    connection_builder.add_monotonic_offset(0);
     connection_offsets.emplace_back(connection_builder.Finish());
   }
   flatbuffers::Offset<
@@ -211,11 +212,52 @@
           configuration::DestinationNodeNames(event_loop->configuration(),
                                               event_loop->node()),
           event_loop->configuration())),
+      timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
+      client_statistics_fetcher_(
+          event_loop_->MakeFetcher<ClientStatistics>("/aos")),
       server_("::", event_loop->node()->port()) {
   CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
 
-  // TODO(austin): Time sync.  sctp gives us filtered round trip time, not
-  // target time.
+  server_connection_offsets_.reserve(
+      statistics_.message().connections()->size());
+
+  int32_t max_size = 0;
+  timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
+  filters_.resize(event_loop->configuration()->nodes()->size());
+  server_connection_.resize(event_loop->configuration()->nodes()->size());
+
+  // Seed up all the per-node connection state.
+  for (std::string_view destination_node_name :
+       configuration::DestinationNodeNames(event_loop->configuration(),
+                                           event_loop->node())) {
+    // Find the largest connection message so we can size our buffers big enough
+    // to receive a connection message.
+    max_size = std::max(
+        max_size,
+        static_cast<int32_t>(MakeConnectMessage(event_loop->configuration(),
+                                                event_loop->node(),
+                                                destination_node_name)
+                                 .size()));
+    const Node *destination_node = configuration::GetNode(
+        event_loop->configuration(), destination_node_name);
+
+    const int node_index = configuration::GetNodeIndex(
+        event_loop->configuration(), destination_node);
+
+    // Now find the timestamp channel forwarded from the other node.
+    const Channel *const other_timestamp_channel =
+        configuration::GetChannel(event_loop_->configuration(), "/aos",
+                                  Timestamp::GetFullyQualifiedName(),
+                                  event_loop_->name(), destination_node);
+
+    timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
+        other_timestamp_channel->name()->string_view());
+
+    // And then find the server connection that we should be populating
+    // statistics into.
+    server_connection_[node_index] = FindServerConnection(
+        statistics_.mutable_message(), destination_node->name()->string_view());
+  }
 
   // TODO(austin): Logging synchronization.
   //
@@ -228,11 +270,16 @@
   LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
 
   int channel_index = 0;
+  const Channel *const timestamp_channel = configuration::GetChannel(
+      event_loop_->configuration(), "/aos", Timestamp::GetFullyQualifiedName(),
+      event_loop_->name(), event_loop_->node());
+
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     CHECK(channel->has_source_node());
-    if (channel->source_node()->string_view() ==
-            event_loop_->node()->name()->string_view() &&
+
+    if (configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) &&
         channel->has_destination_nodes()) {
+      max_size = std::max(channel->max_size(), max_size);
       std::unique_ptr<ChannelState> state(
           new ChannelState{channel, channel_index});
 
@@ -246,13 +293,20 @@
             configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
       }
 
-      // Call SendData for every message.
-      ChannelState *state_ptr = state.get();
-      event_loop_->MakeRawWatcher(
-          channel,
-          [this, state_ptr](const Context &context, const void * /*message*/) {
-            state_ptr->SendData(&server_, context);
-          });
+      // Don't subscribe to timestamps on the timestamp channel.  Those get
+      // handled specially.
+      if (channel != timestamp_channel) {
+        // Call SendData for every message.
+        ChannelState *state_ptr = state.get();
+        event_loop_->MakeRawWatcher(
+            channel, [this, state_ptr](const Context &context,
+                                       const void * /*message*/) {
+              state_ptr->SendData(&server_, context);
+            });
+      } else {
+        CHECK(timestamp_state_ == nullptr);
+        timestamp_state_ = state.get();
+      }
       channels_.emplace_back(std::move(state));
     } else {
       channels_.emplace_back(nullptr);
@@ -260,10 +314,13 @@
     ++channel_index;
   }
 
-  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  // Buffer up the max size a bit so everything fits nicely.
+  server_.SetMaxSize(max_size + 100);
+
+  statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
   event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
-                             chrono::seconds(1));
+    statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+                             kPingPeriod);
   });
 }
 
@@ -365,5 +422,173 @@
   }
 }
 
+void MessageBridgeServer::SendStatistics() {
+  aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
+
+  server_connection_offsets_.clear();
+
+  // Copy the statistics over, but only add monotonic_offset if it is valid.
+  for (const ServerConnection *connection :
+       *statistics_.message().connections()) {
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        builder.fbb()->CreateString(connection->node()->name()->string_view());
+    Node::Builder node_builder = builder.MakeBuilder<Node>();
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+    ServerConnection::Builder server_connection_builder =
+        builder.MakeBuilder<ServerConnection>();
+    server_connection_builder.add_node(node_offset);
+    server_connection_builder.add_state(connection->state());
+    server_connection_builder.add_dropped_packets(
+        connection->dropped_packets());
+    server_connection_builder.add_sent_packets(connection->sent_packets());
+
+    // TODO(austin): If it gets stale, drop it too.
+    if (connection->monotonic_offset() != 0) {
+      server_connection_builder.add_monotonic_offset(
+          connection->monotonic_offset());
+    }
+
+    server_connection_offsets_.emplace_back(server_connection_builder.Finish());
+  }
+
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+      server_connections_offset =
+          builder.fbb()->CreateVector(server_connection_offsets_);
+
+  ServerStatistics::Builder server_statistics_builder =
+      builder.MakeBuilder<ServerStatistics>();
+  server_statistics_builder.add_connections(server_connections_offset);
+  builder.Send(server_statistics_builder.Finish());
+}
+
+void MessageBridgeServer::Tick() {
+  // Send statistics every kStatisticsPeriod. Use the context so we don't get
+  // caught up with the wakeup delay and jitter.
+  if (event_loop_->context().monotonic_event_time >=
+      last_statistics_send_time_ + kStatisticsPeriod) {
+    SendStatistics();
+    last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
+  }
+
+  // The message_bridge_client application measures and filters the offsets from
+  // all messages it receives.  It then sends this on in the ClientStatistics
+  // message.  Collect that up and forward it back over the Timestamp message so
+  // we have guarenteed traffic on the other node for timestamping.  This also
+  // moves the offsets back across the network so both directions can be
+  // observed.
+  client_statistics_fetcher_.Fetch();
+
+  // Build up the timestamp message.  Do it here so that we don't have invalid
+  // data in it.
+  FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
+  flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.Builder();
+
+  if (client_statistics_fetcher_.get()) {
+    // Build up the list of client offsets.
+    std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+
+    // Iterate through the connections this node has made.
+    for (const ClientConnection *connection :
+         *client_statistics_fetcher_->connections()) {
+      // Filter out the ones which aren't connected.
+      if (connection->state() != State::CONNECTED) continue;
+      // And the ones without monotonic offsets.
+      if (!connection->has_monotonic_offset()) continue;
+
+      const int node_index = configuration::GetNodeIndex(
+          event_loop_->configuration(),
+          connection->node()->name()->string_view());
+
+      timestamp_fetchers_[node_index].Fetch();
+
+      // Find the offset computed on their node for this client connection
+      // using their timestamp message.
+      bool has_their_offset = false;
+      std::chrono::nanoseconds their_offset = std::chrono::nanoseconds(0);
+      if (timestamp_fetchers_[node_index].get() != nullptr) {
+        for (const ClientOffset *client_offset :
+             *timestamp_fetchers_[node_index]->offsets()) {
+          if (client_offset->node()->name()->string_view() ==
+              event_loop_->node()->name()->string_view()) {
+            if (client_offset->has_monotonic_offset()) {
+              their_offset =
+                  std::chrono::nanoseconds(client_offset->monotonic_offset());
+              has_their_offset = true;
+            }
+            break;
+          }
+        }
+      }
+
+      if (has_their_offset) {
+        // Update the filters.
+        if (filters_[node_index].MissingSamples()) {
+          // Update the offset the first time.  This should be representative.
+          filters_[node_index].set_base_offset(
+              std::chrono::nanoseconds(connection->monotonic_offset()));
+        }
+        // The message_bridge_clients are the ones running the first filter.  So
+        // set the values from that and let the averaging filter run from there.
+        filters_[node_index].FwdSet(
+            timestamp_fetchers_[node_index].context().monotonic_remote_time,
+            std::chrono::nanoseconds(connection->monotonic_offset()));
+        filters_[node_index].RevSet(
+            client_statistics_fetcher_.context().monotonic_event_time,
+            their_offset);
+
+        // Publish!
+        server_connection_[node_index]->mutate_monotonic_offset(
+            -filters_[node_index].offset().count());
+      }
+
+      // Now fill out the Timestamp message with the offset from the client.
+      flatbuffers::Offset<flatbuffers::String> node_name_offset =
+          fbb->CreateString(connection->node()->name()->string_view());
+
+      Node::Builder node_builder(*fbb);
+      node_builder.add_name(node_name_offset);
+      flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+      ClientOffset::Builder client_offset_builder(*fbb);
+      client_offset_builder.add_node(node_offset);
+      client_offset_builder.add_monotonic_offset(
+          connection->monotonic_offset());
+      client_offsets.emplace_back(client_offset_builder.Finish());
+    }
+    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+        offsets_offset = fbb->CreateVector(client_offsets);
+
+    Timestamp::Builder builder(*fbb);
+    builder.add_offsets(offsets_offset);
+    timestamp_copy.Finish(builder.Finish());
+  } else {
+    // Publish an empty timestamp if we have nothing.
+    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+        offsets_offset =
+            fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
+    Timestamp::Builder builder(*fbb);
+    builder.add_offsets(offsets_offset);
+    timestamp_copy.Finish(builder.Finish());
+  }
+
+  // Send it out over shm, and using that timestamp, then send it out over sctp.
+  // This avoid some context switches.
+  timestamp_sender_.Send(timestamp_copy);
+
+  Context context;
+  context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
+  context.realtime_event_time = timestamp_sender_.realtime_sent_time();
+  context.queue_index = timestamp_sender_.sent_queue_index();
+  context.size = timestamp_copy.size();
+  context.data = timestamp_copy.data();
+
+  // Since we are building up the timestamp to send here, we need to trigger the
+  // SendData call ourselves.
+  timestamp_state_->SendData(&server_, context);
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index f202fc9..d271c8c 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -8,8 +8,10 @@
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/sctp_server.h"
+#include "aos/network/timestamp_generated.h"
 #include "glog/logging.h"
 
 namespace aos {
@@ -105,9 +107,12 @@
   // received.
   void HandleData(const Message *message);
 
+  // Handle timestamps and statistics.
+  void Tick();
+
   // Sends out the statistics that are continually updated by the
   // ChannelState's.
-  void SendStatistics() { sender_.Send(statistics_); }
+  void SendStatistics();
 
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
@@ -116,9 +121,33 @@
   aos::Sender<ServerStatistics> sender_;
   aos::TimerHandler *statistics_timer_;
   FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+  std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
+
+  // Sender for the timestamps that we are forwarding over the network.
+  aos::Sender<Timestamp> timestamp_sender_;
+  // ChannelState to send timestamps over the network with.
+  ChannelState *timestamp_state_ = nullptr;
+
+  // Fetcher to grab the measured offsets in the client.
+  aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
+  // All of these are indexed by the other node index.
+  // Fetcher to grab timestamps and therefore offsets from the other nodes.
+  std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
+  // Bidirectional filters for each connection.
+  std::vector<ClippedAverageFilter> filters_;
+  // ServerConnection to fill out the offsets for from each node.
+  std::vector<ServerConnection *> server_connection_;
 
   SctpServer server_;
 
+  static constexpr std::chrono::nanoseconds kStatisticsPeriod =
+      std::chrono::seconds(1);
+  static constexpr std::chrono::nanoseconds kPingPeriod =
+      std::chrono::milliseconds(100);
+
+  aos::monotonic_clock::time_point last_statistics_send_time_ =
+      aos::monotonic_clock::min_time;
+
   // List of channels.  The entries that aren't sent from this node are left
   // null.
   std::vector<std::unique_ptr<ChannelState>> channels_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 383c1c4..c1dd1ca 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -49,8 +49,12 @@
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
-  aos::ShmEventLoop server_event_loop(&server_config.message());
-  MessageBridgeServer message_bridge_server(&server_event_loop);
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
 
   // And build the app which sends the pings.
   FLAGS_application_name = "ping";
@@ -61,42 +65,41 @@
   // Now do it for "raspberrypi2", the client.
   FLAGS_application_name = "pi2_message_bridge_client";
   FLAGS_override_hostname = "raspberrypi2";
-  aos::ShmEventLoop client_event_loop(&client_config.message());
-  MessageBridgeClient message_bridge_client(&client_event_loop);
+  aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
 
   // And build the app which sends the pongs.
   FLAGS_application_name = "pong";
   aos::ShmEventLoop pong_event_loop(&client_config.message());
 
+  // And build the app for testing.
+  FLAGS_application_name = "test";
+  aos::ShmEventLoop test_event_loop(&client_config.message());
+
+  aos::Fetcher<ClientStatistics> client_statistics_fetcher =
+      test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+
   // Count the pongs.
   int pong_count = 0;
   pong_event_loop.MakeWatcher(
-      "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+      "/test2", [&pong_count](const examples::Ping &ping) {
         ++pong_count;
         LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
-        if (pong_count >= 2) {
-          LOG(INFO) << "That's enough bailing early.";
-          // And Exit is async safe, so thread safe is easy.
-          ping_event_loop.Exit();
-        }
       });
 
   FLAGS_override_hostname = "";
 
-  // Start everything up.  Pong is the only thing we don't know how to wait on,
-  // so start it first.
-  std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
-
-  std::thread server_thread(
-      [&server_event_loop]() { server_event_loop.Run(); });
-  std::thread client_thread(
-      [&client_event_loop]() { client_event_loop.Run(); });
-
   // Wait until we are connected, then send.
   int ping_count = 0;
+  int pi1_server_statistics_count = 0;
   ping_event_loop.MakeWatcher(
-      "/aos/pi1", [&ping_count, &client_event_loop,
-                   &ping_sender](const ServerStatistics &stats) {
+      "/aos/pi1",
+      [&ping_count, &pi2_client_event_loop, &ping_sender,
+       &pi1_server_statistics_count](const ServerStatistics &stats) {
         LOG(INFO) << FlatbufferToJson(&stats);
 
         ASSERT_TRUE(stats.has_connections());
@@ -104,12 +107,21 @@
 
         bool connected = false;
         for (const ServerConnection *connection : *stats.connections()) {
+          // Confirm that we are estimating the server time offset correctly. It
+          // should be about 0 since we are on the same machine here.
+          if (connection->has_monotonic_offset()) {
+            EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                      chrono::milliseconds(1));
+            EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                      chrono::milliseconds(-1));
+            ++pi1_server_statistics_count;
+          }
+
           if (connection->node()->name()->string_view() ==
-              client_event_loop.node()->name()->string_view()) {
+              pi2_client_event_loop.node()->name()->string_view()) {
             if (connection->state() == State::CONNECTED) {
               connected = true;
             }
-            break;
           }
         }
 
@@ -124,29 +136,136 @@
         }
       });
 
-  // Time ourselves out after a while if Pong doesn't do it for us.
+  // Confirm both client and server statistics messages have decent offsets in
+  // them.
+  int pi2_server_statistics_count = 0;
+  pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_server_statistics_count](
+                                              const ServerStatistics &stats) {
+    LOG(INFO) << FlatbufferToJson(&stats);
+    for (const ServerConnection *connection : *stats.connections()) {
+      if (connection->has_monotonic_offset()) {
+        ++pi2_server_statistics_count;
+        // Confirm that we are estimating the server time offset correctly. It
+        // should be about 0 since we are on the same machine here.
+        EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(1));
+        EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(-1));
+      }
+    }
+  });
+
+  int pi1_client_statistics_count = 0;
+  ping_event_loop.MakeWatcher(
+      "/aos/pi1", [&pi1_client_statistics_count](const ClientStatistics &stats) {
+        LOG(INFO) << FlatbufferToJson(&stats);
+
+        for (const ClientConnection *connection : *stats.connections()) {
+          if (connection->has_monotonic_offset()) {
+            ++pi1_client_statistics_count;
+            // It takes at least 10 microseconds to send a message between the
+            // client and server.  The min (filtered) time shouldn't be over 10
+            // milliseconds on localhost.  This might have to bump up if this is
+            // proving flaky.
+            EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                      chrono::milliseconds(10));
+            EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                      chrono::microseconds(10));
+          }
+        }
+      });
+
+  int pi2_client_statistics_count = 0;
+  pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_client_statistics_count](
+                                              const ClientStatistics &stats) {
+    LOG(INFO) << FlatbufferToJson(&stats);
+
+    for (const ClientConnection *connection : *stats.connections()) {
+      if (connection->has_monotonic_offset()) {
+        ++pi2_client_statistics_count;
+        EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(10));
+        EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::microseconds(10));
+      }
+    }
+  });
+
+  ping_event_loop.MakeWatcher("/aos/pi1", [](const Timestamp &timestamp) {
+    EXPECT_TRUE(timestamp.has_offsets());
+    LOG(INFO) << FlatbufferToJson(&timestamp);
+  });
+  pong_event_loop.MakeWatcher("/aos/pi2", [](const Timestamp &timestamp) {
+    EXPECT_TRUE(timestamp.has_offsets());
+    LOG(INFO) << FlatbufferToJson(&timestamp);
+  });
+
+  // Run for 5 seconds to make sure we have time to estimate the offset.
   aos::TimerHandler *quit = ping_event_loop.AddTimer(
       [&ping_event_loop]() { ping_event_loop.Exit(); });
   ping_event_loop.OnRun([quit, &ping_event_loop]() {
-    quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+    // Stop between timestamps, not exactly on them.
+    quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
   });
 
+  // Start everything up.  Pong is the only thing we don't know how to wait on,
+  // so start it first.
+  std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
 
   // And go!
   ping_event_loop.Run();
 
   // Shut everyone else down
-  server_event_loop.Exit();
-  client_event_loop.Exit();
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
   pong_event_loop.Exit();
-  server_thread.join();
-  client_thread.join();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_client_thread.join();
+  pi2_server_thread.join();
   pong_thread.join();
 
   // Make sure we sent something.
   EXPECT_GE(ping_count, 1);
   // And got something back.
   EXPECT_GE(pong_count, 1);
+
+  // Confirm that we are estimating a monotonic offset on the client.
+  ASSERT_TRUE(client_statistics_fetcher.Fetch());
+
+  EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
+  EXPECT_EQ(client_statistics_fetcher->connections()
+                ->Get(0)
+                ->node()
+                ->name()
+                ->string_view(),
+            "pi1");
+
+  // Make sure the offset in one direction is less than a second.
+  EXPECT_GT(
+      client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
+  EXPECT_LT(
+      client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
+      1000000000);
+
+  EXPECT_GE(pi1_server_statistics_count, 2);
+  EXPECT_GE(pi2_server_statistics_count, 2);
+  EXPECT_GE(pi1_client_statistics_count, 2);
+  EXPECT_GE(pi2_client_statistics_count, 2);
+
+  // TODO(austin): Need 2 servers going so we can do the round trip offset
+  // estimation.
 }
 
 }  // namespace testing
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
index 65d28d2..d748225 100644
--- a/aos/network/message_bridge_test_client.json
+++ b/aos/network/message_bridge_test_client.json
@@ -11,7 +11,7 @@
     {
       "name": "pi2",
       "hostname": "raspberrypi2",
-      "port": 9971
+      "port": 9972
     }
   ]
 }
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a073869..1e5dfce 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -18,6 +18,46 @@
     },
     {
       "name": "/aos/pi1",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi1",
+      "frequency": 10,
+      "max_size": 200,
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1
+        }
+      ]
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi2",
+      "frequency": 10,
+      "max_size": 200,
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1
+        }
+      ]
+    },
+    {
+      "name": "/aos/pi1_forwarded",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi2",
+      "frequency": 10,
+      "max_size": 200
+    },
+    {
+      "name": "/aos/pi2_forwarded",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "pi1",
+      "frequency": 10,
+      "max_size": 200
+    },
+    {
+      "name": "/aos/pi1",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi1",
       "frequency": 2
@@ -90,22 +130,6 @@
       ]
     }
   ],
-  "applications": [
-    {
-      "name": "pi2_message_bridge_client",
-      "maps": [
-        {
-          "match": {
-            "name": "/test",
-            "type": "aos.examples.Ping"
-          },
-          "rename": {
-            "name": "/test2"
-          }
-        }
-      ]
-    }
-  ],
   "maps": [
     {
       "match": {
@@ -124,6 +148,34 @@
       "rename": {
         "name": "/aos/pi2"
       }
+    },
+    {
+      "match": {
+        "name": "/test",
+        "type": "aos.examples.Ping",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/test2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos/pi1",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi1_forwarded"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos/pi2",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi2_forwarded"
+      }
     }
   ]
 }
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
index eea92e7..35cedb6 100644
--- a/aos/network/message_bridge_test_server.json
+++ b/aos/network/message_bridge_test_server.json
@@ -11,7 +11,7 @@
     {
       "name": "pi2",
       "hostname": "localhost",
-      "port": 9971
+      "port": 9972
     }
   ]
 }
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 926e59b..5b6df3b 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -43,6 +43,8 @@
 
   void LogSctpStatus(sctp_assoc_t assoc_id);
 
+  void set_max_size(size_t max_size) { max_size_ = max_size; }
+
  private:
   struct sockaddr_storage sockaddr_remote_;
   struct sockaddr_storage sockaddr_local_;
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 1ed816b..5af994e 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -175,9 +175,9 @@
   memset(&inmessage, 0, sizeof(struct msghdr));
 
   aos::unique_c_ptr<Message> result(
-      reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size)));
+      reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size + 1)));
 
-  iov.iov_len = max_size;
+  iov.iov_len = max_size + 1;
   iov.iov_base = result->mutable_data();
 
   inmessage.msg_iov = &iov;
@@ -193,6 +193,7 @@
   PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
 
   result->size = size;
+  CHECK_LE(size, max_size) << ": Message overflowed buffer.";
 
   if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
     result->message_type = Message::kNotification;
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 70d5b28..5fd9f53 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -63,8 +63,7 @@
 
   PCHECK(listen(fd_, 100) == 0);
 
-  PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size_,
-                    sizeof(max_size_)) == 0);
+  SetMaxSize(1000);
 }
 
 aos::unique_c_ptr<Message> SctpServer::Read() {
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index a3086d9..b702aa8 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -46,11 +46,19 @@
   void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
                          uint16_t priority);
 
+  void SetMaxSize(size_t max_size) {
+    max_size_ = max_size;
+    // Have the kernel give us a factor of 10 more.  This lets us have more than
+    // one full sized packet in flight.
+    max_size = max_size * 10;
+    PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
+                      sizeof(max_size)) == 0);
+  }
+
  private:
   struct sockaddr_storage sockaddr_local_;
   int fd_;
 
-  // TODO(austin): Configure this.
   size_t max_size_ = 1000;
 
   int ppid_ = 1;
diff --git a/aos/network/timestamp.fbs b/aos/network/timestamp.fbs
new file mode 100644
index 0000000..299ecaf
--- /dev/null
+++ b/aos/network/timestamp.fbs
@@ -0,0 +1,15 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+table ClientOffset {
+  node:Node;
+
+  monotonic_offset:int64;
+}
+
+table Timestamp {
+  offsets:[ClientOffset];
+}
+
+root_type Timestamp;
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index b4280f6..6f2baca 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -24,6 +24,18 @@
 // much about precision when solving for the global offset.
 class TimestampFilter {
  public:
+  // Forces the offset and time to the provided sample without filtering.  Used
+  // for syncing with a remote filter calculation.
+  void Set(aos::monotonic_clock::time_point monotonic_now,
+           std::chrono::nanoseconds sample_ns) {
+    const double sample =
+        std::chrono::duration_cast<std::chrono::duration<double>>(sample_ns -
+                                                                  base_offset_)
+            .count();
+    offset_ = sample;
+    last_time_ = monotonic_now;
+  }
+
   // Updates with a new sample.  monotonic_now is the timestamp of the sample on
   // the destination node, and sample_ns is destination_time - source_time.
   void Sample(aos::monotonic_clock::time_point monotonic_now,
@@ -132,6 +144,13 @@
     }
   }
 
+  // Sets the forward sample without filtering.  See FwdSample for more details.
+  void FwdSet(aos::monotonic_clock::time_point monotonic_now,
+              std::chrono::nanoseconds sample_ns) {
+    fwd_.Set(monotonic_now, sample_ns);
+    Update(monotonic_now, &last_fwd_time_);
+  }
+
   // Adds a forward sample.  sample_ns = destination - source;  Forward samples
   // are from A -> B.
   void FwdSample(aos::monotonic_clock::time_point monotonic_now,
@@ -156,6 +175,13 @@
     }
   }
 
+  // Sets the forward sample without filtering.  See FwdSample for more details.
+  void RevSet(aos::monotonic_clock::time_point monotonic_now,
+              std::chrono::nanoseconds sample_ns) {
+    rev_.Set(monotonic_now, sample_ns);
+    Update(monotonic_now, &last_rev_time_);
+  }
+
   // Adds a reverse sample.  sample_ns = destination - source;  Reverse samples
   // are B -> A.
   void RevSample(aos::monotonic_clock::time_point monotonic_now,
@@ -214,6 +240,11 @@
     last_rev_time_ = aos::monotonic_clock::min_time;
   }
 
+  bool MissingSamples() {
+    return (last_fwd_time_ == aos::monotonic_clock::min_time) ||
+           (last_rev_time_ == aos::monotonic_clock::min_time);
+  }
+
  private:
   // Updates the offset estimate given the current time, and a pointer to the
   // variable holding the last time.
@@ -226,7 +257,6 @@
     const double hard_max = fwd_.offset();
     const double hard_min = -rev_.offset();
     const double average = (hard_max + hard_min) / 2.0;
-    LOG(INFO) << "max " << hard_max << " min " << hard_min;
     // We don't want to clip the offset to the hard min/max.  We really want to
     // keep it within a band around the middle.  ratio of 0.5 means stay within
     // +- 0.25 of the middle of the hard min and max.