Handle node reboots

Reset all the filter states when a node disappears.  The filters don't handle
a node rebooting very well, and with the reset they don't need to.

Change-Id: Ia92b42f38a030a945120b6c404acae76abe4a0af
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 6b85830..c12c675 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -258,6 +258,7 @@
         ":message_bridge_test_client_config.json",
         ":message_bridge_test_server_config.json",
     ],
+    shard_count = 3,
     deps = [
         ":message_bridge_client_lib",
         ":message_bridge_server_lib",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index efb3dd0..3963485 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -230,6 +230,7 @@
 
   remote_assoc_id_ = assoc_id;
   connection_->mutate_state(State::CONNECTED);
+  filter_.Reset();
 }
 
 void SctpClientConnection::NodeDisconnected() {
@@ -238,6 +239,8 @@
       chrono::milliseconds(100));
   remote_assoc_id_ = 0;
   connection_->mutate_state(State::DISCONNECTED);
+  connection_->mutate_monotonic_offset(0);
+  filter_.Reset();
 }
 
 void SctpClientConnection::HandleData(const Message *message) {
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 3607449..1b8d36f 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -177,14 +177,14 @@
   // time out eventually.  Need to sort that out.
 }
 
-void ChannelState::AddPeer(const Connection *connection,
-                           ServerConnection *server_connection_statistics,
-                           bool logged_remotely) {
-  peers_.emplace_back(0, 0, connection, server_connection_statistics,
+void ChannelState::AddPeer(
+    const Connection *connection, int node_index,
+    ServerConnection *server_connection_statistics, bool logged_remotely) {
+  peers_.emplace_back(connection, node_index, server_connection_statistics,
                       logged_remotely);
 }
 
-void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
   for (ChannelState::Peer &peer : peers_) {
     if (peer.sac_assoc_id == assoc_id) {
       // TODO(austin): This will not handle multiple clients from
@@ -192,12 +192,13 @@
       peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
       peer.sac_assoc_id = 0;
       peer.stream = 0;
-      break;
+      return peer.node_index;
     }
   }
+  return -1;
 }
 
-void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
                                  int stream, SctpServer *server) {
   for (ChannelState::Peer &peer : peers_) {
     if (peer.connection->name()->string_view() == node->name()->string_view()) {
@@ -205,10 +206,10 @@
       peer.stream = stream;
       peer.server_connection_statistics->mutate_state(State::CONNECTED);
       server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
-
-      break;
+      return peer.node_index;
     }
   }
+  return -1;
 }
 
 MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
@@ -301,6 +302,8 @@
             event_loop_->configuration(), connection->name()->string_view());
         state->AddPeer(
             connection,
+            configuration::GetNodeIndex(event_loop_->configuration(),
+                                        connection->name()->string_view()),
             FindServerConnection(statistics_.mutable_message(),
                                  connection->name()->string_view()),
             configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
@@ -344,13 +347,30 @@
 
 void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
   // Find any matching peers and remove them.
+  int node_index = -1;
   for (std::unique_ptr<ChannelState> &channel_state : channels_) {
     if (channel_state.get() == nullptr) {
       continue;
     }
 
-    channel_state->NodeDisconnected(assoc_id);
+    node_index = channel_state->NodeDisconnected(assoc_id);
+    CHECK_NE(node_index, -1);
   }
+
+  if (node_index != -1) {
+    VLOG(1) << "Resetting filters for " << node_index << " "
+            << event_loop_->configuration()
+                   ->nodes()
+                   ->Get(node_index)
+                   ->name()
+                   ->string_view();
+    ResetFilter(node_index);
+  }
+}
+
+void MessageBridgeServer::ResetFilter(int node_index) {
+  filters_[node_index].Reset();
+  server_connection_[node_index]->mutate_monotonic_offset(0);
 }
 
 void MessageBridgeServer::MessageReceived() {
@@ -399,6 +419,7 @@
 
     // Account for the control channel and delivery times channel.
     size_t channel_index = kControlStreams();
+    int node_index = -1;
     for (const Channel *channel : *connect->channels_to_transfer()) {
       bool matched = false;
       for (std::unique_ptr<ChannelState> &channel_state : channels_) {
@@ -406,9 +427,10 @@
           continue;
         }
         if (channel_state->Matches(channel)) {
-          channel_state->NodeConnected(connect->node(),
-                                       message->header.rcvinfo.rcv_assoc_id,
-                                       channel_index, &server_);
+          node_index = channel_state->NodeConnected(
+              connect->node(), message->header.rcvinfo.rcv_assoc_id,
+              channel_index, &server_);
+          CHECK_NE(node_index, -1);
 
           matched = true;
           break;
@@ -421,6 +443,13 @@
         ++channel_index;
       }
     }
+    ResetFilter(node_index);
+    VLOG(1) << "Resetting filters for " << node_index << " "
+              << event_loop_->configuration()
+                     ->nodes()
+                     ->Get(node_index)
+                     ->name()
+                     ->string_view();
   } else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
     // Message delivery
     const logger::MessageHeader *message_header =
@@ -507,15 +536,23 @@
     // Iterate through the connections this node has made.
     for (const ClientConnection *connection :
          *client_statistics_fetcher_->connections()) {
-      // Filter out the ones which aren't connected.
-      if (connection->state() != State::CONNECTED) continue;
-      // And the ones without monotonic offsets.
-      if (!connection->has_monotonic_offset()) continue;
-
       const int node_index = configuration::GetNodeIndex(
           event_loop_->configuration(),
           connection->node()->name()->string_view());
 
+      // Filter out the ones which aren't connected, the ones without monotonic
+      // offsets, and the ones with stale client statistics.
+      if (connection->state() != State::CONNECTED ||
+          !connection->has_monotonic_offset() ||
+          client_statistics_fetcher_.context().monotonic_event_time +
+                  kClientStatisticsStaleTimeout <
+              event_loop_->context().monotonic_event_time) {
+        VLOG(1) << "Disconnected, no offset, or client message too old for "
+                << connection->node()->name()->string_view();
+        ResetFilter(node_index);
+        continue;
+      }
+
       timestamp_fetchers_[node_index].Fetch();
 
       // Find the offset computed on their node for this client connection
@@ -527,17 +564,28 @@
              *timestamp_fetchers_[node_index]->offsets()) {
           if (client_offset->node()->name()->string_view() ==
               event_loop_->node()->name()->string_view()) {
+            // Make sure it has an offset and the message isn't stale.
             if (client_offset->has_monotonic_offset()) {
-              their_offset =
-                  std::chrono::nanoseconds(client_offset->monotonic_offset());
-              has_their_offset = true;
+              if (timestamp_fetchers_[node_index]
+                          .context()
+                          .monotonic_event_time +
+                      kTimestampStaleTimeout >
+                  event_loop_->context().monotonic_event_time) {
+                their_offset =
+                    std::chrono::nanoseconds(client_offset->monotonic_offset());
+                has_their_offset = true;
+              } else {
+                ResetFilter(node_index);
+                VLOG(1) << "Timestamp old, resetting.";
+              }
             }
             break;
           }
         }
       }
 
-      if (has_their_offset) {
+      if (has_their_offset &&
+          server_connection_[node_index]->state() == State::CONNECTED) {
         // Update the filters.
         if (filters_[node_index].MissingSamples()) {
           // Update the offset the first time.  This should be representative.
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index d271c8c..8e8ec64 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -22,42 +22,42 @@
 // Class to encapsulate all the state per channel.  This is the dispatcher for a
 // new message from the event loop.
 class ChannelState {
-  public:
-   ChannelState(const Channel *channel, int channel_index)
-       : channel_index_(channel_index), channel_(channel) {}
+ public:
+  ChannelState(const Channel *channel, int channel_index)
+      : channel_index_(channel_index), channel_(channel) {}
 
-   // Class to encapsulate all the state per client on a channel.  A client may
-   // be subscribed to multiple channels.
-   struct Peer {
-     Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
-          const Connection *new_connection,
-          ServerConnection *new_server_connection_statistics,
-          bool new_logged_remotely)
-         : sac_assoc_id(new_sac_assoc_id),
-           stream(new_stream),
-           connection(new_connection),
-           server_connection_statistics(new_server_connection_statistics),
-           logged_remotely(new_logged_remotely) {}
+  // Class to encapsulate all the state per client on a channel.  A client may
+  // be subscribed to multiple channels.
+  struct Peer {
+    Peer(const Connection *new_connection, int new_node_index,
+         ServerConnection *new_server_connection_statistics,
+         bool new_logged_remotely)
+        : connection(new_connection),
+          node_index(new_node_index),
+          server_connection_statistics(new_server_connection_statistics),
+          logged_remotely(new_logged_remotely) {}
 
-     // Valid if != 0.
-     sctp_assoc_t sac_assoc_id = 0;
+    // Valid if != 0.
+    sctp_assoc_t sac_assoc_id = 0;
 
-     size_t stream;
-     const aos::Connection *connection;
-     ServerConnection *server_connection_statistics;
+    size_t stream = 0;
+    const aos::Connection *connection;
+    const int node_index;
+    ServerConnection *server_connection_statistics;
 
-     // If true, this message will be logged on a receiving node.  We need to
-     // keep it around to log it locally if that fails.
-     bool logged_remotely = false;
+    // If true, this message will be logged on a receiving node.  We need to
+    // keep it around to log it locally if that fails.
+    bool logged_remotely = false;
   };
 
   // Needs to be called when a node (might have) disconnected.
-  void NodeDisconnected(sctp_assoc_t assoc_id);
-  void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
-                     SctpServer *server);
+  // Returns the node index which [dis]connected, or -1 if it didn't match.
+  int NodeDisconnected(sctp_assoc_t assoc_id);
+  int NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+                    SctpServer *server);
 
   // Adds a new peer.
-  void AddPeer(const Connection *connection,
+  void AddPeer(const Connection *connection, int node_index,
                ServerConnection *server_connection_statistics,
                bool logged_remotely);
 
@@ -94,6 +94,12 @@
 
   ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
 
+  // Time after which we consider the client statistics message stale, and reset
+  // the filter.
+  static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
+  // Time after which we consider the timestamp stale, and reset the filter.
+  static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
+
  private:
   // Reads a message from the socket.  Could be a notification.
   void MessageReceived();
@@ -103,6 +109,9 @@
   // Called when the server connection disconnects.
   void NodeDisconnected(sctp_assoc_t assoc_id);
 
+  // Resets the filter and clears the entry from the server statistics.
+  void ResetFilter(int node_index);
+
   // Called when data (either a connection request or delivery timestamps) is
   // received.
   void HandleData(const Message *message);
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 27bed82..96003f7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -42,7 +42,7 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_server_config.json");
-  aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_client_config.json");
 
@@ -65,20 +65,20 @@
   // Now do it for "raspberrypi2", the client.
   FLAGS_application_name = "pi2_message_bridge_client";
   FLAGS_override_hostname = "raspberrypi2";
-  aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
 
   FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
   MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
 
   // And build the app which sends the pongs.
   FLAGS_application_name = "pong";
-  aos::ShmEventLoop pong_event_loop(&client_config.message());
+  aos::ShmEventLoop pong_event_loop(&pi2_config.message());
 
   // And build the app for testing.
   FLAGS_application_name = "test";
-  aos::ShmEventLoop test_event_loop(&client_config.message());
+  aos::ShmEventLoop test_event_loop(&pi2_config.message());
 
   aos::Fetcher<ClientStatistics> client_statistics_fetcher =
       test_event_loop.MakeFetcher<ClientStatistics>("/aos");
@@ -156,24 +156,24 @@
   });
 
   int pi1_client_statistics_count = 0;
-  ping_event_loop.MakeWatcher(
-      "/pi1/aos", [&pi1_client_statistics_count](const ClientStatistics &stats) {
-        LOG(INFO) << FlatbufferToJson(&stats);
+  ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
+                                              const ClientStatistics &stats) {
+    LOG(INFO) << FlatbufferToJson(&stats);
 
-        for (const ClientConnection *connection : *stats.connections()) {
-          if (connection->has_monotonic_offset()) {
-            ++pi1_client_statistics_count;
-            // It takes at least 10 microseconds to send a message between the
-            // client and server.  The min (filtered) time shouldn't be over 10
-            // milliseconds on localhost.  This might have to bump up if this is
-            // proving flaky.
-            EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::milliseconds(10));
-            EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::microseconds(10));
-          }
-        }
-      });
+    for (const ClientConnection *connection : *stats.connections()) {
+      if (connection->has_monotonic_offset()) {
+        ++pi1_client_statistics_count;
+        // It takes at least 10 microseconds to send a message between the
+        // client and server.  The min (filtered) time shouldn't be over 10
+        // milliseconds on localhost.  This might have to bump up if this is
+        // proving flaky.
+        EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(10));
+        EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::microseconds(10));
+      }
+    }
+  });
 
   int pi2_client_statistics_count = 0;
   pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
@@ -263,11 +263,430 @@
   EXPECT_GE(pi2_server_statistics_count, 2);
   EXPECT_GE(pi1_client_statistics_count, 2);
   EXPECT_GE(pi2_client_statistics_count, 2);
-
-  // TODO(austin): Need 2 servers going so we can do the round trip offset
-  // estimation.
 }
 
+// Test that the client disconnecting triggers the server offsets on both sides
+// to clear.
+TEST(MessageBridgeTest, ClientRestart) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log every statistics and timestamp message as it arrives.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up.  The test event loops (which only log) go first,
+  // followed by the message bridge loops that stay up for the whole test.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // Now confirm we are un-synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_connection->has_monotonic_offset());
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi2_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    // Now confirm we are synchronized again.
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// Test that the server disconnecting triggers the server offsets on the other
+// side to clear, along with the other client.
+TEST(MessageBridgeTest, ServerRestart) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+  aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log every statistics and timestamp message as it arrives.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  // Log the remaining statistics and timestamp messages too.  The offsets are
+  // checked below using the fetchers.
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up.  The test event loops (which only log) go first,
+  // followed by the message bridge loops that stay up for the whole test.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // And confirm we are unsynchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_server_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ClientConnection *const pi1_client_connection =
+        pi1_client_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
+    EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // And confirm we are synchronized again.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_client_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// TODO(austin): This test confirms that the external state does the right
+// thing, but doesn't confirm that the internal state does.  We either need to
+// expose a way to check the state in a thread-safe way, or need a way to jump
+// time for one node to do that.
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 1092e4b..a8475b1 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -133,7 +133,7 @@
   "maps": [
     {
       "match": {
-        "name": "/aos",
+        "name": "/aos*",
         "source_node": "pi1"
       },
       "rename": {
@@ -142,7 +142,7 @@
     },
     {
       "match": {
-        "name": "/aos",
+        "name": "/aos*",
         "source_node": "pi2"
       },
       "rename": {
@@ -161,7 +161,7 @@
     },
     {
       "match": {
-        "name": "/pi1/aos",
+        "name": "/pi1/aos*",
         "source_node": "pi2"
       },
       "rename": {
@@ -170,7 +170,7 @@
     },
     {
       "match": {
-        "name": "/pi2/aos",
+        "name": "/pi2/aos*",
         "source_node": "pi1"
       },
       "rename": {
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6f2baca..88561b9 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -120,6 +120,13 @@
     return last_time_ != aos::monotonic_clock::min_time;
   }
 
+  // Zeroes both offsets and puts last_time_ back at the min_time sentinel.
+  void Reset() {
+    last_time_ = aos::monotonic_clock::min_time;
+    base_offset_ = std::chrono::nanoseconds::zero();
+    offset_ = 0.0;
+  }
+
  private:
   double offset_ = 0;
 
@@ -245,6 +252,19 @@
            (last_rev_time_ == aos::monotonic_clock::min_time);
   }
 
+  // Zeroes the offset state, returns the four first/last timestamps to
+  // min_time, and resets the fwd_ and rev_ sub-filters.
+  void Reset() {
+    const auto unset = aos::monotonic_clock::min_time;
+    first_fwd_time_ = unset;
+    first_rev_time_ = unset;
+    last_fwd_time_ = unset;
+    last_rev_time_ = unset;
+    offset_ = 0;
+    base_offset_ = std::chrono::nanoseconds::zero();
+    fwd_.Reset();
+    rev_.Reset();
+  }
+
  private:
   // Updates the offset estimate given the current time, and a pointer to the
   // variable holding the last time.
@@ -283,7 +303,14 @@
     *last_time = monotonic_now;
 
     if (sample_pointer_ != nullptr) {
-      *sample_pointer_ = offset_;
+      // TODO(austin): Probably shouldn't do the update if we don't have fwd and
+      // reverse samples.
+      if (!MissingSamples()) {
+        *sample_pointer_ = offset_;
+        VLOG(1) << "Updating sample to " << offset_;
+      } else {
+        LOG(WARNING) << "Don't have both samples.";
+      }
     }
   }