Handle node reboots
Reset all the filter states when the node disappears. We don't handle
that very well, and don't need to.
Change-Id: Ia92b42f38a030a945120b6c404acae76abe4a0af
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 27bed82..96003f7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -42,7 +42,7 @@
aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_server_config.json");
- aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_client_config.json");
@@ -65,20 +65,20 @@
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
- aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
FLAGS_application_name = "pi2_message_bridge_server";
- aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
// And build the app which sends the pongs.
FLAGS_application_name = "pong";
- aos::ShmEventLoop pong_event_loop(&client_config.message());
+ aos::ShmEventLoop pong_event_loop(&pi2_config.message());
// And build the app for testing.
FLAGS_application_name = "test";
- aos::ShmEventLoop test_event_loop(&client_config.message());
+ aos::ShmEventLoop test_event_loop(&pi2_config.message());
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
@@ -156,24 +156,24 @@
});
int pi1_client_statistics_count = 0;
- ping_event_loop.MakeWatcher(
- "/pi1/aos", [&pi1_client_statistics_count](const ClientStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
+ const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
- for (const ClientConnection *connection : *stats.connections()) {
- if (connection->has_monotonic_offset()) {
- ++pi1_client_statistics_count;
- // It takes at least 10 microseconds to send a message between the
- // client and server. The min (filtered) time shouldn't be over 10
- // milliseconds on localhost. This might have to bump up if this is
- // proving flaky.
- EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
- chrono::milliseconds(10));
- EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
- chrono::microseconds(10));
- }
- }
- });
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi1_client_statistics_count;
+ // It takes at least 10 microseconds to send a message between the
+ // client and server. The min (filtered) time shouldn't be over 10
+ // milliseconds on localhost. This might have to bump up if this is
+ // proving flaky.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
int pi2_client_statistics_count = 0;
pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
@@ -263,11 +263,430 @@
EXPECT_GE(pi2_server_statistics_count, 2);
EXPECT_GE(pi1_client_statistics_count, 2);
EXPECT_GE(pi2_client_statistics_count, 2);
-
- // TODO(austin): Need 2 servers going so we can do the round trip offset
- // estimation.
}
+// Test that the client disconnecting triggers the server offsets on both sides
+// to clear.
+TEST(MessageBridgeTest, ClientRestart) {
+  // This is rather annoying to set up. We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread... And it is really hard to get
+  // everything started up reliably. So just be super generous on timeouts and
+  // hope for the best. We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log the statistics and timestamp messages on both nodes for debugging.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up. The logging-only test loops go first, followed by
+  // the bridge loops which stay up for the whole test.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for just over 3 seconds so we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // Now confirm we are un-synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_connection->has_monotonic_offset());
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi2_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    // Run for just over 3 seconds so we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    // Now confirm we are synchronized again.
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// Test that the server disconnecting triggers the server offsets on the other
+// side to clear, along with the other client.
+TEST(MessageBridgeTest, ServerRestart) {
+  // This is rather annoying to set up. We need to start up a client and
+  // server, on the same node, but get them to think that they are on different
+  // nodes.
+  //
+  // We need the client to not post directly to "/test" like it would in a
+  // real system, otherwise we will re-send the ping message... So, use an
+  // application specific map to have the client post somewhere else.
+  //
+  // To top this all off, each of these needs to be done with a ShmEventLoop,
+  // which needs to run in a separate thread... And it is really hard to get
+  // everything started up reliably. So just be super generous on timeouts and
+  // hope for the best. We can be more generous in the future if we need to.
+  //
+  // We are faking the application names by passing in --application_name=foo
+  aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_server_config.json");
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+      aos::configuration::ReadConfig(
+          "aos/network/message_bridge_test_client_config.json");
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+  aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test1";
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+  aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
+      pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
+
+  // Now do it for "raspberrypi2", the client.
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  // And build the app for testing.
+  FLAGS_application_name = "test2";
+  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+  // Log the statistics and timestamp messages on both nodes for debugging.
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  // These watchers only log; the offsets themselves are checked further down
+  // through the fetchers.
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop.MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+  });
+
+  // Start everything up. The logging-only test loops go first, followed by
+  // the bridge loops which stay up for the whole test.
+  std::thread pi1_test_thread(
+      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+  std::thread pi2_test_thread(
+      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for just over 3 seconds so we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // Now confirm we are synchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+
+  {
+    // And confirm we are unsynchronized.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_server_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ClientConnection *const pi1_client_connection =
+        pi1_client_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
+    EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
+    EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
+    EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
+  }
+
+  {
+    FLAGS_application_name = "pi2_message_bridge_server";
+    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+    // Run for just over 3 seconds so we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(3050));
+    });
+
+    // And go!
+    pi2_server_event_loop.Run();
+
+    // And confirm we are synchronized again.
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+    const ServerConnection *const pi1_connection =
+        pi1_server_statistics_fetcher->connections()->Get(0);
+    const ServerConnection *const pi2_connection =
+        pi2_server_statistics_fetcher->connections()->Get(0);
+
+    EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+
+    EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+    EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+    EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(1));
+    EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+              chrono::milliseconds(-1));
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_test_event_loop.Exit();
+  pi2_test_event_loop.Exit();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_client_thread.join();
+  pi1_test_thread.join();
+  pi2_test_thread.join();
+}
+
+// TODO(austin): This test confirms that the external state does the right
+// thing, but doesn't confirm that the internal state does. We either need to
+// expose a way to check the state in a thread-safe way, or need a way to jump
+// time for one node to do that.
+
} // namespace testing
} // namespace message_bridge
} // namespace aos