Handle node reboots

Reset all the filter states when the node disappears.  We don't handle
that very well, and don't need to.

Change-Id: Ia92b42f38a030a945120b6c404acae76abe4a0af
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 27bed82..96003f7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -42,7 +42,7 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_server_config.json");
-  aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_client_config.json");
 
@@ -65,20 +65,20 @@
   // Now do it for "raspberrypi2", the client.
   FLAGS_application_name = "pi2_message_bridge_client";
   FLAGS_override_hostname = "raspberrypi2";
-  aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
 
   FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
   MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
 
   // And build the app which sends the pongs.
   FLAGS_application_name = "pong";
-  aos::ShmEventLoop pong_event_loop(&client_config.message());
+  aos::ShmEventLoop pong_event_loop(&pi2_config.message());
 
   // And build the app for testing.
   FLAGS_application_name = "test";
-  aos::ShmEventLoop test_event_loop(&client_config.message());
+  aos::ShmEventLoop test_event_loop(&pi2_config.message());
 
   aos::Fetcher<ClientStatistics> client_statistics_fetcher =
       test_event_loop.MakeFetcher<ClientStatistics>("/aos");
@@ -156,24 +156,24 @@
   });
 
   int pi1_client_statistics_count = 0;
-  ping_event_loop.MakeWatcher(
-      "/pi1/aos", [&pi1_client_statistics_count](const ClientStatistics &stats) {
-        LOG(INFO) << FlatbufferToJson(&stats);
+  ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
+                                              const ClientStatistics &stats) {
+    LOG(INFO) << FlatbufferToJson(&stats);
 
-        for (const ClientConnection *connection : *stats.connections()) {
-          if (connection->has_monotonic_offset()) {
-            ++pi1_client_statistics_count;
-            // It takes at least 10 microseconds to send a message between the
-            // client and server.  The min (filtered) time shouldn't be over 10
-            // milliseconds on localhost.  This might have to bump up if this is
-            // proving flaky.
-            EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::milliseconds(10));
-            EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::microseconds(10));
-          }
-        }
-      });
+    for (const ClientConnection *connection : *stats.connections()) {
+      if (connection->has_monotonic_offset()) {
+        ++pi1_client_statistics_count;
+        // It takes at least 10 microseconds to send a message between the
+        // client and server.  The min (filtered) time shouldn't be over 10
+        // milliseconds on localhost.  This might have to bump up if this is
+        // proving flaky.
+        EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(10));
+        EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::microseconds(10));
+      }
+    }
+  });
 
   int pi2_client_statistics_count = 0;
   pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
@@ -263,11 +263,430 @@
   EXPECT_GE(pi2_server_statistics_count, 2);
   EXPECT_GE(pi1_client_statistics_count, 2);
   EXPECT_GE(pi2_client_statistics_count, 2);
-
-  // TODO(austin): Need 2 servers going so we can do the round trip offset
-  // estimation.
 }
 
+// Test that the client disconnecting triggers the server offsets on both sides
+// to clear.
+TEST(MessageBridgeTest, ClientRestart) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log the statistics and timestamp messages from both nodes as they arrive.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up.  The pi2 client gets run (and restarted) on this
+  // thread further down, so start everything else in the background first.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // Now confirm we are un-synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_connection->has_monotonic_offset());
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi2_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    // Now confirm we are synchronized again.
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// Test that the server disconnecting triggers the server offsets on the other
+// side to clear, along with the other client.
+TEST(MessageBridgeTest, ServerRestart) {
+  // This is rather annoying to set up.  We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread...  And it is really hard to get
+  // everything started up reliably.  So just be super generous on timeouts and
+  // hope for the best.  We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+  aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log the statistics and timestamp messages from both nodes as they arrive.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  // These watchers only log; the offsets themselves are checked below with
+  // the statistics fetchers.
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up.  The pi2 server gets run (and restarted) on this
+  // thread further down, so start everything else in the background first.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // And confirm we are unsynchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_server_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ClientConnection *const pi1_client_connection =
+        pi1_client_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
+    EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for ~3 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // And confirm we are synchronized again.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_client_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// TODO(austin): This test confirms that the external state does the right
+// thing, but doesn't confirm that the internal state does.  We either need to
+// expose a way to check the state in a thread-safe way, or need a way to jump
+// time for one node to do that.
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos