Extract server side timestamp filtering logic

This sets us up to reuse it as part of the simulated event loop message
gateway.  It is surprisingly isolated.

Change-Id: Id327044f925b4e8209e8029777ef2a1a8c8c7c15
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5b5e84b..c7e5a28 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -130,6 +130,26 @@
 )
 
 cc_library(
+    name = "message_bridge_server_status",
+    srcs = [
+        "message_bridge_server_status.cc",
+    ],
+    hdrs = [
+        "message_bridge_server_status.h",
+    ],
+    deps = [
+        ":message_bridge_client_fbs",
+        ":message_bridge_server_fbs",
+        ":timestamp_fbs",
+        ":timestamp_filter",
+        "//aos:flatbuffer_merge",
+        "//aos:flatbuffers",
+        "//aos/events:event_loop",
+        "//aos/time",
+    ],
+)
+
+cc_library(
     name = "message_bridge_server_lib",
     srcs = [
         "message_bridge_server_lib.cc",
@@ -145,6 +165,7 @@
         ":message_bridge_client_fbs",
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
+        ":message_bridge_server_status",
         ":sctp_lib",
         ":sctp_server",
         ":timestamp_fbs",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 8a575aa..2832339 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -12,61 +12,8 @@
 
 namespace aos {
 namespace message_bridge {
-namespace {
-
 namespace chrono = std::chrono;
 
-// Builds up the "empty" server statistics message to be pointed to by all the
-// connections, updated at runtime, and periodically sent.
-FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
-    const std::vector<std::string_view> &source_node_names,
-    const Configuration *configuration) {
-  flatbuffers::FlatBufferBuilder fbb;
-  fbb.ForceDefaults(true);
-
-  std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
-  for (const std::string_view node_name : source_node_names) {
-    flatbuffers::Offset<Node> node_offset =
-        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
-    ServerConnection::Builder connection_builder(fbb);
-    connection_builder.add_node(node_offset);
-    connection_builder.add_state(State::DISCONNECTED);
-    connection_builder.add_dropped_packets(0);
-    connection_builder.add_sent_packets(0);
-    connection_builder.add_monotonic_offset(0);
-    connection_offsets.emplace_back(connection_builder.Finish());
-  }
-  flatbuffers::Offset<
-      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
-      connections_offset = fbb.CreateVector(connection_offsets);
-
-  ServerStatistics::Builder server_statistics_builder(fbb);
-  server_statistics_builder.add_connections(connections_offset);
-  fbb.Finish(server_statistics_builder.Finish());
-
-  return fbb.Release();
-}
-
-// Finds the statistics for the provided node name.
-ServerConnection *FindServerConnection(ServerStatistics *statistics,
-                                       std::string_view node_name) {
-  ServerConnection *matching_server_connection = nullptr;
-  for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
-    ServerConnection *server_connection =
-        statistics->mutable_connections()->GetMutableObject(i);
-    if (server_connection->node()->name()->string_view() == node_name) {
-      matching_server_connection = server_connection;
-      break;
-    }
-  }
-
-  CHECK(matching_server_connection != nullptr) << ": Unknown client";
-
-  return matching_server_connection;
-}
-
-}  // namespace
-
 bool ChannelState::Matches(const Channel *other_channel) {
   // Confirm the normal tuple, plus make sure that the other side isn't going to
   // send more data over than we expect with a mismatching size.
@@ -214,24 +161,13 @@
 
 MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
     : event_loop_(event_loop),
-      sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
-      statistics_(MakeServerStatistics(
-          configuration::DestinationNodeNames(event_loop->configuration(),
-                                              event_loop->node()),
-          event_loop->configuration())),
-      timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
-      client_statistics_fetcher_(
-          event_loop_->MakeFetcher<ClientStatistics>("/aos")),
-      server_("::", event_loop->node()->port()) {
+      server_("::", event_loop->node()->port()),
+      server_status_(event_loop, [this](const Context &context) {
+        timestamp_state_->SendData(&server_, context);
+      }) {
   CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
 
-  server_connection_offsets_.reserve(
-      statistics_.message().connections()->size());
-
   int32_t max_size = 0;
-  timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
-  filters_.resize(event_loop->configuration()->nodes()->size());
-  server_connection_.resize(event_loop->configuration()->nodes()->size());
 
   // Seed up all the per-node connection state.
   // We are making the assumption here that every connection is bidirectional
@@ -252,25 +188,6 @@
     VLOG(1) << "Connection to " << destination_node_name << " has size "
             << connect_size;
     max_size = std::max(max_size, connect_size);
-    const Node *destination_node = configuration::GetNode(
-        event_loop->configuration(), destination_node_name);
-
-    const int node_index = configuration::GetNodeIndex(
-        event_loop->configuration(), destination_node);
-
-    // Now find the timestamp channel forwarded from the other node.
-    const Channel *const other_timestamp_channel =
-        configuration::GetChannel(event_loop_->configuration(), "/aos",
-                                  Timestamp::GetFullyQualifiedName(),
-                                  event_loop_->name(), destination_node);
-
-    timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
-        other_timestamp_channel->name()->string_view());
-
-    // And then find the server connection that we should be populating
-    // statistics into.
-    server_connection_[node_index] = FindServerConnection(
-        statistics_.mutable_message(), destination_node->name()->string_view());
   }
 
   // TODO(austin): Logging synchronization.
@@ -304,8 +221,8 @@
             connection,
             configuration::GetNodeIndex(event_loop_->configuration(),
                                         connection->name()->string_view()),
-            FindServerConnection(statistics_.mutable_message(),
-                                 connection->name()->string_view()),
+            server_status_.FindServerConnection(
+                connection->name()->string_view()),
             configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
       }
 
@@ -333,12 +250,6 @@
   // Buffer up the max size a bit so everything fits nicely.
   LOG(INFO) << "Max message size for all clients is " << max_size;
   server_.SetMaxSize(max_size + 100);
-
-  statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
-  event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
-                             kPingPeriod);
-  });
 }
 
 void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
@@ -364,15 +275,10 @@
                    ->Get(node_index)
                    ->name()
                    ->string_view();
-    ResetFilter(node_index);
+    server_status_.ResetFilter(node_index);
   }
 }
 
-void MessageBridgeServer::ResetFilter(int node_index) {
-  filters_[node_index].Reset();
-  server_connection_[node_index]->mutate_monotonic_offset(0);
-}
-
 void MessageBridgeServer::MessageReceived() {
   aos::unique_c_ptr<Message> message = server_.Read();
 
@@ -443,7 +349,7 @@
         ++channel_index;
       }
     }
-    ResetFilter(node_index);
+    server_status_.ResetFilter(node_index);
     VLOG(1) << "Resetting filters for " << node_index << " "
             << event_loop_->configuration()
                    ->nodes()
@@ -465,192 +371,5 @@
   }
 }
 
-void MessageBridgeServer::SendStatistics() {
-  aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
-
-  server_connection_offsets_.clear();
-
-  // Copy the statistics over, but only add monotonic_offset if it is valid.
-  for (const ServerConnection *connection :
-       *statistics_.message().connections()) {
-    flatbuffers::Offset<flatbuffers::String> node_name_offset =
-        builder.fbb()->CreateString(connection->node()->name()->string_view());
-    Node::Builder node_builder = builder.MakeBuilder<Node>();
-    node_builder.add_name(node_name_offset);
-    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
-    ServerConnection::Builder server_connection_builder =
-        builder.MakeBuilder<ServerConnection>();
-    server_connection_builder.add_node(node_offset);
-    server_connection_builder.add_state(connection->state());
-    server_connection_builder.add_dropped_packets(
-        connection->dropped_packets());
-    server_connection_builder.add_sent_packets(connection->sent_packets());
-
-    // TODO(austin): If it gets stale, drop it too.
-    if (connection->monotonic_offset() != 0) {
-      server_connection_builder.add_monotonic_offset(
-          connection->monotonic_offset());
-    }
-
-    server_connection_offsets_.emplace_back(server_connection_builder.Finish());
-  }
-
-  flatbuffers::Offset<
-      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
-      server_connections_offset =
-          builder.fbb()->CreateVector(server_connection_offsets_);
-
-  ServerStatistics::Builder server_statistics_builder =
-      builder.MakeBuilder<ServerStatistics>();
-  server_statistics_builder.add_connections(server_connections_offset);
-  builder.Send(server_statistics_builder.Finish());
-}
-
-void MessageBridgeServer::Tick() {
-  // Send statistics every kStatisticsPeriod. Use the context so we don't get
-  // caught up with the wakeup delay and jitter.
-  if (event_loop_->context().monotonic_event_time >=
-      last_statistics_send_time_ + kStatisticsPeriod) {
-    SendStatistics();
-    last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
-  }
-
-  // The message_bridge_client application measures and filters the offsets from
-  // all messages it receives.  It then sends this on in the ClientStatistics
-  // message.  Collect that up and forward it back over the Timestamp message so
-  // we have guarenteed traffic on the other node for timestamping.  This also
-  // moves the offsets back across the network so both directions can be
-  // observed.
-  client_statistics_fetcher_.Fetch();
-
-  // Build up the timestamp message.  Do it here so that we don't have invalid
-  // data in it.
-  FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
-  flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.fbb();
-
-  if (client_statistics_fetcher_.get()) {
-    // Build up the list of client offsets.
-    std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
-
-    // Iterate through the connections this node has made.
-    for (const ClientConnection *connection :
-         *client_statistics_fetcher_->connections()) {
-      const int node_index = configuration::GetNodeIndex(
-          event_loop_->configuration(),
-          connection->node()->name()->string_view());
-
-      // Filter out the ones which aren't connected.
-      // And the ones without monotonic offsets.
-      if (connection->state() != State::CONNECTED ||
-          !connection->has_monotonic_offset() ||
-          client_statistics_fetcher_.context().monotonic_event_time +
-                  kClientStatisticsStaleTimeout <
-              event_loop_->context().monotonic_event_time) {
-        VLOG(1) << "Disconnected, no offset, or client message too old for "
-                << connection->node()->name()->string_view();
-        ResetFilter(node_index);
-        continue;
-      }
-
-      timestamp_fetchers_[node_index].Fetch();
-
-      // Find the offset computed on their node for this client connection
-      // using their timestamp message.
-      bool has_their_offset = false;
-      std::chrono::nanoseconds their_offset = std::chrono::nanoseconds(0);
-      if (timestamp_fetchers_[node_index].get() != nullptr) {
-        for (const ClientOffset *client_offset :
-             *timestamp_fetchers_[node_index]->offsets()) {
-          if (client_offset->node()->name()->string_view() ==
-              event_loop_->node()->name()->string_view()) {
-            // Make sure it has an offset and the message isn't stale.
-            if (client_offset->has_monotonic_offset()) {
-              if (timestamp_fetchers_[node_index]
-                          .context()
-                          .monotonic_event_time +
-                      kTimestampStaleTimeout >
-                  event_loop_->context().monotonic_event_time) {
-                their_offset =
-                    std::chrono::nanoseconds(client_offset->monotonic_offset());
-                has_their_offset = true;
-              } else {
-                ResetFilter(node_index);
-                VLOG(1) << "Timestamp old, resetting.";
-              }
-            }
-            break;
-          }
-        }
-      }
-
-      if (has_their_offset &&
-          server_connection_[node_index]->state() == State::CONNECTED) {
-        // Update the filters.
-        if (filters_[node_index].MissingSamples()) {
-          // Update the offset the first time.  This should be representative.
-          filters_[node_index].set_base_offset(
-              std::chrono::nanoseconds(connection->monotonic_offset()));
-        }
-        // The message_bridge_clients are the ones running the first filter.  So
-        // set the values from that and let the averaging filter run from there.
-        filters_[node_index].FwdSet(
-            timestamp_fetchers_[node_index].context().monotonic_remote_time,
-            std::chrono::nanoseconds(connection->monotonic_offset()));
-        filters_[node_index].RevSet(
-            client_statistics_fetcher_.context().monotonic_event_time,
-            their_offset);
-
-        // Publish!
-        server_connection_[node_index]->mutate_monotonic_offset(
-            -filters_[node_index].offset().count());
-      }
-
-      // Now fill out the Timestamp message with the offset from the client.
-      flatbuffers::Offset<flatbuffers::String> node_name_offset =
-          fbb->CreateString(connection->node()->name()->string_view());
-
-      Node::Builder node_builder(*fbb);
-      node_builder.add_name(node_name_offset);
-      flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
-      ClientOffset::Builder client_offset_builder(*fbb);
-      client_offset_builder.add_node(node_offset);
-      client_offset_builder.add_monotonic_offset(
-          connection->monotonic_offset());
-      client_offsets.emplace_back(client_offset_builder.Finish());
-    }
-    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
-        offsets_offset = fbb->CreateVector(client_offsets);
-
-    Timestamp::Builder builder(*fbb);
-    builder.add_offsets(offsets_offset);
-    timestamp_copy.Finish(builder.Finish());
-  } else {
-    // Publish an empty timestamp if we have nothing.
-    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
-        offsets_offset =
-            fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
-    Timestamp::Builder builder(*fbb);
-    builder.add_offsets(offsets_offset);
-    timestamp_copy.Finish(builder.Finish());
-  }
-
-  // Send it out over shm, and using that timestamp, then send it out over sctp.
-  // This avoid some context switches.
-  timestamp_sender_.Send(timestamp_copy);
-
-  Context context;
-  context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
-  context.realtime_event_time = timestamp_sender_.realtime_sent_time();
-  context.queue_index = timestamp_sender_.sent_queue_index();
-  context.size = timestamp_copy.size();
-  context.data = timestamp_copy.data();
-
-  // Since we are building up the timestamp to send here, we need to trigger the
-  // SendData call ourselves.
-  timestamp_state_->SendData(&server_, context);
-}
-
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 8e8ec64..b6a4f05 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -10,6 +10,7 @@
 #include "aos/network/connect_generated.h"
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/message_bridge_server_status.h"
 #include "aos/network/sctp_server.h"
 #include "aos/network/timestamp_generated.h"
 #include "glog/logging.h"
@@ -94,12 +95,6 @@
 
   ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
 
-  // Time after which we consider the client statistics message stale, and reset
-  // the filter.
-  static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
-  // Time after which we consider the timestamp stale, and reset the filter.
-  static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
-
  private:
   // Reads a message from the socket.  Could be a notification.
   void MessageReceived();
@@ -109,53 +104,19 @@
   // Called when the server connection disconnects.
   void NodeDisconnected(sctp_assoc_t assoc_id);
 
-  // Resets the filter and clears the entry from the server statistics.
-  void ResetFilter(int node_index);
-
   // Called when data (either a connection request or delivery timestamps) is
   // received.
   void HandleData(const Message *message);
 
-  // Handle timestamps and statistics.
-  void Tick();
-
-  // Sends out the statistics that are continually updated by the
-  // ChannelState's.
-  void SendStatistics();
-
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
 
-  // Statistics, timer, and associated sender.
-  aos::Sender<ServerStatistics> sender_;
-  aos::TimerHandler *statistics_timer_;
-  FlatbufferDetachedBuffer<ServerStatistics> statistics_;
-  std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
-
-  // Sender for the timestamps that we are forwarding over the network.
-  aos::Sender<Timestamp> timestamp_sender_;
-  // ChannelState to send timestamps over the network with.
-  ChannelState *timestamp_state_ = nullptr;
-
-  // Fetcher to grab the measured offsets in the client.
-  aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
-  // All of these are indexed by the other node index.
-  // Fetcher to grab timestamps and therefore offsets from the other nodes.
-  std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
-  // Bidirectional filters for each connection.
-  std::vector<ClippedAverageFilter> filters_;
-  // ServerConnection to fill out the offsets for from each node.
-  std::vector<ServerConnection *> server_connection_;
-
   SctpServer server_;
 
-  static constexpr std::chrono::nanoseconds kStatisticsPeriod =
-      std::chrono::seconds(1);
-  static constexpr std::chrono::nanoseconds kPingPeriod =
-      std::chrono::milliseconds(100);
+  MessageBridgeServerStatus server_status_;
 
-  aos::monotonic_clock::time_point last_statistics_send_time_ =
-      aos::monotonic_clock::min_time;
+  // ChannelState to send timestamps over the network with.
+  ChannelState *timestamp_state_ = nullptr;
 
   // List of channels.  The entries that aren't sent from this node are left
   // null.
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
new file mode 100644
index 0000000..21badef
--- /dev/null
+++ b/aos/network/message_bridge_server_status.cc
@@ -0,0 +1,327 @@
+#include "aos/network/message_bridge_server_status.h"
+
+#include <chrono>
+#include <functional>
+
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/network/timestamp_generated.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime, and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+    const std::vector<std::string_view> &source_node_names,
+    const Configuration *configuration) {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+
+  std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<Node> node_offset =
+        CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+    ServerConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    connection_builder.add_dropped_packets(0);
+    connection_builder.add_sent_packets(0);
+    connection_builder.add_monotonic_offset(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ServerStatistics::Builder server_statistics_builder(fbb);
+  server_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(server_statistics_builder.Finish());
+
+  return fbb.Release();
+}
+
+// Finds the statistics for the provided node name.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+                                       std::string_view node_name) {
+  ServerConnection *matching_server_connection = nullptr;
+  for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
+    ServerConnection *server_connection =
+        statistics->mutable_connections()->GetMutableObject(i);
+    if (server_connection->node()->name()->string_view() == node_name) {
+      matching_server_connection = server_connection;
+      break;
+    }
+  }
+
+  CHECK(matching_server_connection != nullptr) << ": Unknown client";
+
+  return matching_server_connection;
+}
+
+}  // namespace
+
+MessageBridgeServerStatus::MessageBridgeServerStatus(
+    aos::EventLoop *event_loop, std::function<void(const Context &)> send_data)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+      statistics_(MakeServerStatistics(
+          configuration::DestinationNodeNames(event_loop_->configuration(),
+                                              event_loop_->node()),
+          event_loop->configuration())),
+      client_statistics_fetcher_(
+          event_loop_->MakeFetcher<ClientStatistics>("/aos")),
+      timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
+      send_data_(send_data) {
+  server_connection_offsets_.reserve(
+      statistics_.message().connections()->size());
+
+  filters_.resize(event_loop->configuration()->nodes()->size());
+  timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
+  server_connection_.resize(event_loop->configuration()->nodes()->size());
+
+  // Seed up all the per-node connection state.
+  // We are making the assumption here that every connection is bidirectional
+  // (data is being sent both ways).  This is pretty safe because we are
+  // forwarding timestamps between nodes.
+  for (std::string_view destination_node_name :
+       configuration::DestinationNodeNames(event_loop->configuration(),
+                                           event_loop->node())) {
+    const Node *destination_node = configuration::GetNode(
+        event_loop->configuration(), destination_node_name);
+
+    const int node_index = configuration::GetNodeIndex(
+        event_loop->configuration(), destination_node);
+
+    // Now find the timestamp channel forwarded from the other node.
+    const Channel *const other_timestamp_channel =
+        configuration::GetChannel(event_loop_->configuration(), "/aos",
+                                  Timestamp::GetFullyQualifiedName(),
+                                  event_loop_->name(), destination_node);
+
+    timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
+        other_timestamp_channel->name()->string_view());
+
+    // And then find the server connection that we should be populating
+    // statistics into.
+    server_connection_[node_index] =
+        FindServerConnection(destination_node->name()->string_view());
+  }
+
+  statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+                             kPingPeriod);
+  });
+}
+
+ServerConnection *MessageBridgeServerStatus::FindServerConnection(
+    std::string_view node_name) {
+  return message_bridge::FindServerConnection(statistics_.mutable_message(),
+                                              node_name);
+}
+
+void MessageBridgeServerStatus::ResetFilter(int node_index) {
+  filters_[node_index].Reset();
+  server_connection_[node_index]->mutate_monotonic_offset(0);
+}
+
+void MessageBridgeServerStatus::SendStatistics() {
+  aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
+
+  server_connection_offsets_.clear();
+
+  // Copy the statistics over, but only add monotonic_offset if it is valid.
+  for (const ServerConnection *connection :
+       *statistics_.message().connections()) {
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        builder.fbb()->CreateString(connection->node()->name()->string_view());
+    Node::Builder node_builder = builder.MakeBuilder<Node>();
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+    ServerConnection::Builder server_connection_builder =
+        builder.MakeBuilder<ServerConnection>();
+    server_connection_builder.add_node(node_offset);
+    server_connection_builder.add_state(connection->state());
+    server_connection_builder.add_dropped_packets(
+        connection->dropped_packets());
+    server_connection_builder.add_sent_packets(connection->sent_packets());
+
+    // TODO(austin): If it gets stale, drop it too.
+    if (connection->monotonic_offset() != 0) {
+      server_connection_builder.add_monotonic_offset(
+          connection->monotonic_offset());
+    }
+
+    server_connection_offsets_.emplace_back(server_connection_builder.Finish());
+  }
+
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+      server_connections_offset =
+          builder.fbb()->CreateVector(server_connection_offsets_);
+
+  ServerStatistics::Builder server_statistics_builder =
+      builder.MakeBuilder<ServerStatistics>();
+  server_statistics_builder.add_connections(server_connections_offset);
+  builder.Send(server_statistics_builder.Finish());
+}
+
+void MessageBridgeServerStatus::Tick() {
+  // Send statistics every kStatisticsPeriod. Use the context so we don't get
+  // caught up with the wakeup delay and jitter.
+  if (event_loop_->context().monotonic_event_time >=
+      last_statistics_send_time_ + kStatisticsPeriod) {
+    SendStatistics();
+    last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
+  }
+
+  // The message_bridge_client application measures and filters the offsets from
+  // all messages it receives.  It then sends this on in the ClientStatistics
+  // message.  Collect that up and forward it back over the Timestamp message so
+  // we have guarenteed traffic on the other node for timestamping.  This also
+  // moves the offsets back across the network so both directions can be
+  // observed.
+  client_statistics_fetcher_.Fetch();
+
+  // Build up the timestamp message.  Do it here so that we don't have invalid
+  // data in it.
+  FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
+  flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.fbb();
+
+  if (client_statistics_fetcher_.get()) {
+    // Build up the list of client offsets.
+    std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+
+    // Iterate through the connections this node has made.
+    for (const ClientConnection *connection :
+         *client_statistics_fetcher_->connections()) {
+      const int node_index = configuration::GetNodeIndex(
+          event_loop_->configuration(),
+          connection->node()->name()->string_view());
+
+      // Filter out the ones which aren't connected.
+      // And the ones without monotonic offsets.
+      if (connection->state() != State::CONNECTED ||
+          !connection->has_monotonic_offset() ||
+          client_statistics_fetcher_.context().monotonic_event_time +
+                  MessageBridgeServerStatus::kClientStatisticsStaleTimeout <
+              event_loop_->context().monotonic_event_time) {
+        VLOG(1) << "Disconnected, no offset, or client message too old for "
+                << connection->node()->name()->string_view();
+        ResetFilter(node_index);
+        continue;
+      }
+
+      timestamp_fetchers_[node_index].Fetch();
+
+      // Find the offset computed on their node for this client connection
+      // using their timestamp message.
+      bool has_their_offset = false;
+      chrono::nanoseconds their_offset = chrono::nanoseconds(0);
+      if (timestamp_fetchers_[node_index].get() != nullptr) {
+        for (const ClientOffset *client_offset :
+             *timestamp_fetchers_[node_index]->offsets()) {
+          if (client_offset->node()->name()->string_view() ==
+              event_loop_->node()->name()->string_view()) {
+            // Make sure it has an offset and the message isn't stale.
+            if (client_offset->has_monotonic_offset()) {
+              if (timestamp_fetchers_[node_index]
+                          .context()
+                          .monotonic_event_time +
+                      MessageBridgeServerStatus::kTimestampStaleTimeout >
+                  event_loop_->context().monotonic_event_time) {
+                their_offset =
+                    chrono::nanoseconds(client_offset->monotonic_offset());
+                has_their_offset = true;
+              } else {
+                ResetFilter(node_index);
+                VLOG(1) << "Timestamp old, resetting.";
+              }
+            }
+            break;
+          }
+        }
+      }
+
+      if (has_their_offset &&
+          server_connection_[node_index]->state() == State::CONNECTED) {
+        // Update the filters.
+        if (filters_[node_index].MissingSamples()) {
+          // Update the offset the first time.  This should be representative.
+          filters_[node_index].set_base_offset(
+              chrono::nanoseconds(connection->monotonic_offset()));
+        }
+        // The message_bridge_clients are the ones running the first filter.  So
+        // set the values from that and let the averaging filter run from there.
+        filters_[node_index].FwdSet(
+            timestamp_fetchers_[node_index].context().monotonic_remote_time,
+            chrono::nanoseconds(connection->monotonic_offset()));
+        filters_[node_index].RevSet(
+            client_statistics_fetcher_.context().monotonic_event_time,
+            their_offset);
+
+        // Publish!
+        server_connection_[node_index]->mutate_monotonic_offset(
+            -filters_[node_index].offset().count());
+      }
+
+      // Now fill out the Timestamp message with the offset from the client.
+      flatbuffers::Offset<flatbuffers::String> node_name_offset =
+          fbb->CreateString(connection->node()->name()->string_view());
+
+      Node::Builder node_builder(*fbb);
+      node_builder.add_name(node_name_offset);
+      flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+      ClientOffset::Builder client_offset_builder(*fbb);
+      client_offset_builder.add_node(node_offset);
+      client_offset_builder.add_monotonic_offset(
+          connection->monotonic_offset());
+      client_offsets.emplace_back(client_offset_builder.Finish());
+    }
+    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+        offsets_offset = fbb->CreateVector(client_offsets);
+
+    Timestamp::Builder builder(*fbb);
+    builder.add_offsets(offsets_offset);
+    timestamp_copy.Finish(builder.Finish());
+  } else {
+    // Publish an empty timestamp if we have nothing.
+    flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+        offsets_offset =
+            fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
+    Timestamp::Builder builder(*fbb);
+    builder.add_offsets(offsets_offset);
+    timestamp_copy.Finish(builder.Finish());
+  }
+
+  // Send it out over shm, and using that timestamp, then send it out over sctp.
+  // This avoid some context switches.
+  timestamp_sender_.Send(timestamp_copy);
+
+  Context context;
+  context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
+  context.realtime_event_time = timestamp_sender_.realtime_sent_time();
+  context.queue_index = timestamp_sender_.sent_queue_index();
+  context.size = timestamp_copy.size();
+  context.data = timestamp_copy.data();
+
+  // Since we are building up the timestamp to send here, we need to trigger the
+  // SendData call ourselves.
+  if (send_data_) {
+    send_data_(context);
+  }
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
new file mode 100644
index 0000000..adb747d
--- /dev/null
+++ b/aos/network/message_bridge_server_status.h
@@ -0,0 +1,82 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_
+
+#include <chrono>
+#include <functional>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/network/timestamp_generated.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class encapsulates the server side of sending server statistics and
+// managing timestamp offsets.
+class MessageBridgeServerStatus {
+ public:
+  // Time after which we consider the client statistics message stale, and reset
+  // the filter.
+  static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
+  // Time after which we consider the timestamp stale, and reset the filter.
+  static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
+
+  MessageBridgeServerStatus(aos::EventLoop *event_loop,
+                            std::function<void(const Context &)> send_data =
+                                std::function<void(const Context &)>());
+
+  // Resets the filter and clears the entry from the server statistics.
+  void ResetFilter(int node_index);
+
+  // Returns the ServerConnection message which is updated by the server.
+  ServerConnection *FindServerConnection(std::string_view node_name);
+
+ private:
+  static constexpr std::chrono::nanoseconds kStatisticsPeriod =
+      std::chrono::seconds(1);
+  static constexpr std::chrono::nanoseconds kPingPeriod =
+      std::chrono::milliseconds(100);
+
+  // Handle timestamps and statistics.
+  void Tick();
+
+  // Sends out the statistics that are continually updated by the
+  // ChannelState's.
+  void SendStatistics();
+
+  aos::EventLoop *event_loop_;
+
+  // Statistics, timer, and associated sender.
+  aos::Sender<ServerStatistics> sender_;
+  aos::TimerHandler *statistics_timer_;
+  FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+  std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
+
+  // Fetcher to grab the measured offsets in the client.
+  aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
+
+  // ServerConnection to fill out the offsets for from each node.
+  std::vector<ServerConnection *> server_connection_;
+  // All of these are indexed by the other node index.
+  // Fetcher to grab timestamps and therefore offsets from the other nodes.
+  std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
+  // Bidirectional filters for each connection.
+  std::vector<ClippedAverageFilter> filters_;
+
+  // Sender for the timestamps that we are forwarding over the network.
+  aos::Sender<Timestamp> timestamp_sender_;
+
+  aos::monotonic_clock::time_point last_statistics_send_time_ =
+      aos::monotonic_clock::min_time;
+
+  std::function<void(const Context &)> send_data_;
+};
+
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_