Refactor common utilities in MessageBridgeTest

There's a lot of duplicated code starting and stopping the clients and
servers.  The end result is that the actual test gets lost too easily.

This is also a predecesor to using the timestamp timestamps in online
time estimation, though that effort is likely delayed for a while.

Change-Id: Iee2769b0d86fb8e0679b7fc361bc462196ba0bce
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 7fc5c1f..a9e918b 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -44,8 +44,251 @@
     util::UnlinkRecursive(ShmBase("pi2"));
   }
 
+  void OnPi1() {
+    DoSetShmBase("pi1");
+    FLAGS_override_hostname = "raspberrypi";
+  }
+
+  void OnPi2() {
+    DoSetShmBase("pi2");
+    FLAGS_override_hostname = "raspberrypi2";
+  }
+
+  void MakePi1Server() {
+    OnPi1();
+    FLAGS_application_name = "pi1_message_bridge_server";
+    pi1_server_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
+    pi1_server_event_loop->SetRuntimeRealtimePriority(1);
+    pi1_message_bridge_server =
+        std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
+  }
+
+  void RunPi1Server(chrono::nanoseconds duration) {
+    // Setup a shutdown callback.
+    aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
+        [this]() { pi1_server_event_loop->Exit(); });
+    pi1_server_event_loop->OnRun([this, quit, duration]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
+    });
+
+    pi1_server_event_loop->Run();
+  }
+
+  void StartPi1Server() {
+    pi1_server_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi1_message_bridge_server";
+      pi1_server_event_loop->Run();
+    });
+  }
+
+  void StopPi1Server() {
+    if (pi1_server_thread.joinable()) {
+      pi1_server_event_loop->Exit();
+      pi1_server_thread.join();
+      pi1_server_thread = std::thread();
+    }
+    pi1_message_bridge_server.reset();
+    pi1_server_event_loop.reset();
+  }
+
+  void MakePi1Client() {
+    OnPi1();
+    FLAGS_application_name = "pi1_message_bridge_client";
+    pi1_client_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
+    pi1_client_event_loop->SetRuntimeRealtimePriority(1);
+    pi1_message_bridge_client =
+        std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
+  }
+
+  void StartPi1Client() {
+    pi1_client_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi1_message_bridge_client";
+      pi1_client_event_loop->Run();
+    });
+  }
+
+  void StopPi1Client() {
+    pi1_client_event_loop->Exit();
+    pi1_client_thread.join();
+    pi1_client_thread = std::thread();
+    pi1_message_bridge_client.reset();
+    pi1_client_event_loop.reset();
+  }
+
+  void MakePi1Test() {
+    OnPi1();
+    FLAGS_application_name = "test1";
+    pi1_test_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
+
+    pi1_test_event_loop->MakeWatcher(
+        "/pi1/aos", [](const ServerStatistics &stats) {
+          VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
+        });
+
+    pi1_test_event_loop->MakeWatcher(
+        "/pi1/aos", [](const ClientStatistics &stats) {
+          VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
+        });
+
+    pi1_test_event_loop->MakeWatcher(
+        "/pi1/aos", [](const Timestamp &timestamp) {
+          VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
+        });
+  }
+
+  void StartPi1Test() {
+    pi1_test_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi1_test";
+      pi1_test_event_loop->Run();
+    });
+  }
+
+  void StopPi1Test() {
+    pi1_test_event_loop->Exit();
+    pi1_test_thread.join();
+  }
+
+  void MakePi2Server() {
+    OnPi2();
+    FLAGS_application_name = "pi2_message_bridge_server";
+    pi2_server_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
+    pi2_server_event_loop->SetRuntimeRealtimePriority(1);
+    pi2_message_bridge_server =
+        std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
+  }
+
+  void RunPi2Server(chrono::nanoseconds duration) {
+    // Setup a shutdown callback.
+    aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
+        [this]() { pi2_server_event_loop->Exit(); });
+    pi2_server_event_loop->OnRun([this, quit, duration]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
+    });
+
+    pi2_server_event_loop->Run();
+  }
+
+  void StartPi2Server() {
+    pi2_server_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi2_message_bridge_server";
+      pi2_server_event_loop->Run();
+    });
+  }
+
+  void StopPi2Server() {
+    if (pi2_server_thread.joinable()) {
+      pi2_server_event_loop->Exit();
+      pi2_server_thread.join();
+      pi2_server_thread = std::thread();
+    }
+    pi2_message_bridge_server.reset();
+    pi2_server_event_loop.reset();
+  }
+
+  void MakePi2Client() {
+    OnPi2();
+    FLAGS_application_name = "pi2_message_bridge_client";
+    pi2_client_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
+    pi2_client_event_loop->SetRuntimeRealtimePriority(1);
+    pi2_message_bridge_client =
+        std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
+  }
+
+  void RunPi2Client(chrono::nanoseconds duration) {
+    // Run for 5 seconds to make sure we have time to estimate the offset.
+    aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
+        [this]() { pi2_client_event_loop->Exit(); });
+    pi2_client_event_loop->OnRun([this, quit, duration]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
+    });
+
+    // And go!
+    pi2_client_event_loop->Run();
+  }
+
+  void StartPi2Client() {
+    pi2_client_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi2_message_bridge_client";
+      pi2_client_event_loop->Run();
+    });
+  }
+
+  void StopPi2Client() {
+    if (pi2_client_thread.joinable()) {
+      pi2_client_event_loop->Exit();
+      pi2_client_thread.join();
+      pi2_client_thread = std::thread();
+    }
+    pi2_message_bridge_client.reset();
+    pi2_client_event_loop.reset();
+  }
+
+  void MakePi2Test() {
+    OnPi2();
+    FLAGS_application_name = "test2";
+    pi2_test_event_loop =
+        std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
+
+    pi2_test_event_loop->MakeWatcher(
+        "/pi2/aos", [](const ServerStatistics &stats) {
+          VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
+        });
+
+    pi2_test_event_loop->MakeWatcher(
+        "/pi2/aos", [](const ClientStatistics &stats) {
+          VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
+        });
+
+    pi2_test_event_loop->MakeWatcher(
+        "/pi2/aos", [](const Timestamp &timestamp) {
+          VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
+        });
+  }
+
+  void StartPi2Test() {
+    pi2_test_thread = std::thread([this]() {
+      LOG(INFO) << "Started pi2_message_bridge_test";
+      pi2_test_event_loop->Run();
+    });
+  }
+
+  void StopPi2Test() {
+    pi2_test_event_loop->Exit();
+    pi2_test_thread.join();
+  }
+
   aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
   aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
+  std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
+  std::thread pi1_server_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
+  std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
+  std::thread pi1_client_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
+  std::thread pi1_test_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
+  std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
+  std::thread pi2_server_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
+  std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
+  std::thread pi2_client_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
+  std::thread pi2_test_thread;
 };
 
 // Test that we can send a ping message over sctp and receive it.
@@ -70,19 +313,11 @@
   // hope for the best.  We can be more generous in the future if we need to.
   //
   // We are faking the application names by passing in --application_name=foo
-  DoSetShmBase("pi1");
-  FLAGS_application_name = "pi1_message_bridge_server";
+  OnPi1();
   // Force ourselves to be "raspberrypi" and allocate everything.
-  FLAGS_override_hostname = "raspberrypi";
 
-  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-  pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
-
-  FLAGS_application_name = "pi1_message_bridge_client";
-  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
-  pi1_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+  MakePi1Server();
+  MakePi1Client();
 
   // And build the app which sends the pings.
   FLAGS_application_name = "ping";
@@ -102,18 +337,10 @@
       ping_event_loop.MakeFetcher<Timestamp>("/aos");
 
   // Now do it for "raspberrypi2", the client.
-  FLAGS_application_name = "pi2_message_bridge_client";
-  FLAGS_override_hostname = "raspberrypi2";
-  DoSetShmBase("pi2");
+  OnPi2();
 
-  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-  pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
-
-  FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-  pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+  MakePi2Client();
+  MakePi2Server();
 
   // And build the app which sends the pongs.
   FLAGS_application_name = "pong";
@@ -154,7 +381,7 @@
   int pi1_server_statistics_count = 0;
   ping_event_loop.MakeWatcher(
       "/pi1/aos",
-      [&ping_count, &pi2_client_event_loop, &ping_sender,
+      [this, &ping_count, &ping_sender,
        &pi1_server_statistics_count](const ServerStatistics &stats) {
         VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
 
@@ -174,7 +401,7 @@
           }
 
           if (connection->node()->name()->string_view() ==
-              pi2_client_event_loop.node()->name()->string_view()) {
+              pi2_client_event_loop->node()->name()->string_view()) {
             if (connection->state() == State::CONNECTED) {
               EXPECT_TRUE(connection->has_boot_uuid());
               connected = true;
@@ -367,28 +594,20 @@
   // so start it first.
   std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
 
-  std::thread pi1_server_thread(
-      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
-  std::thread pi1_client_thread(
-      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
-  std::thread pi2_client_thread(
-      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
-  std::thread pi2_server_thread(
-      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Client();
+  StartPi2Server();
 
   // And go!
   ping_event_loop.Run();
 
   // Shut everyone else down
-  pi1_server_event_loop.Exit();
-  pi1_client_event_loop.Exit();
-  pi2_client_event_loop.Exit();
-  pi2_server_event_loop.Exit();
+  StopPi1Server();
+  StopPi1Client();
+  StopPi2Client();
+  StopPi2Server();
   pong_event_loop.Exit();
-  pi1_server_thread.join();
-  pi1_client_thread.join();
-  pi2_client_thread.join();
-  pi2_server_thread.join();
   pong_thread.join();
 
   // Make sure we sent something.
@@ -441,98 +660,37 @@
   // hope for the best.  We can be more generous in the future if we need to.
   //
   // We are faking the application names by passing in --application_name=foo
-  FLAGS_application_name = "pi1_message_bridge_server";
-  // Force ourselves to be "raspberrypi" and allocate everything.
-  FLAGS_override_hostname = "raspberrypi";
-  DoSetShmBase("pi1");
-  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-  pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+  OnPi1();
 
-  FLAGS_application_name = "pi1_message_bridge_client";
-  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
-  pi1_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+  MakePi1Server();
+  MakePi1Client();
 
   // And build the app for testing.
-  FLAGS_application_name = "test1";
-  aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
+  MakePi1Test();
   aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
-      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+      pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
 
   // Now do it for "raspberrypi2", the client.
-  FLAGS_override_hostname = "raspberrypi2";
-  DoSetShmBase("pi2");
-  FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-  pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+  OnPi2();
+  MakePi2Server();
 
   // And build the app for testing.
-  FLAGS_application_name = "test2";
-  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  MakePi2Test();
   aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
-      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+      pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
 
   // Wait until we are connected, then send.
-  pi1_test_event_loop.MakeWatcher(
-      "/pi1/aos", [](const ServerStatistics &stats) {
-        VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
-      });
 
-  pi2_test_event_loop.MakeWatcher(
-      "/pi2/aos", [](const ServerStatistics &stats) {
-        VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi1_test_event_loop.MakeWatcher(
-      "/pi1/aos", [](const ClientStatistics &stats) {
-        VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi2_test_event_loop.MakeWatcher(
-      "/pi2/aos", [](const ClientStatistics &stats) {
-        VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
-    VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
-  });
-  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
-    VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
-  });
-
-  // Start everything up.  Pong is the only thing we don't know how to wait on,
-  // so start it first.
-  std::thread pi1_test_thread(
-      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
-  std::thread pi2_test_thread(
-      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
-
-  std::thread pi1_server_thread(
-      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
-  std::thread pi1_client_thread(
-      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
-  std::thread pi2_server_thread(
-      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  StartPi1Test();
+  StartPi2Test();
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Server();
 
   {
-    FLAGS_application_name = "pi2_message_bridge_client";
-    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-    pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+    MakePi2Client();
 
-    // Run for 5 seconds to make sure we have time to estimate the offset.
-    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
-        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
-    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_client_event_loop.monotonic_now() +
-                  chrono::milliseconds(3050));
-    });
-
-    // And go!
-    pi2_client_event_loop.Run();
+    RunPi2Client(chrono::milliseconds(3050));
 
     // Now confirm we are synchronized.
     EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
@@ -558,6 +716,8 @@
     EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
               chrono::milliseconds(-1));
     EXPECT_TRUE(pi2_connection->has_boot_uuid());
+
+    StopPi2Client();
   }
 
   std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -580,22 +740,9 @@
   }
 
   {
-    FLAGS_application_name = "pi2_message_bridge_client";
-    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-    pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
-
-    // Run for 5 seconds to make sure we have time to estimate the offset.
-    aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
-        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
-    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_client_event_loop.monotonic_now() +
-                  chrono::milliseconds(3050));
-    });
-
+    MakePi2Client();
     // And go!
-    pi2_client_event_loop.Run();
+    RunPi2Client(chrono::milliseconds(3050));
 
     EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
     EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
@@ -621,19 +768,16 @@
     EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
               chrono::milliseconds(-1));
     EXPECT_TRUE(pi2_connection->has_boot_uuid());
+
+    StopPi2Client();
   }
 
   // Shut everyone else down
-  pi1_server_event_loop.Exit();
-  pi1_client_event_loop.Exit();
-  pi2_server_event_loop.Exit();
-  pi1_test_event_loop.Exit();
-  pi2_test_event_loop.Exit();
-  pi1_server_thread.join();
-  pi1_client_thread.join();
-  pi2_server_thread.join();
-  pi1_test_thread.join();
-  pi2_test_thread.join();
+  StopPi1Server();
+  StopPi1Client();
+  StopPi2Server();
+  StopPi1Test();
+  StopPi2Test();
 }
 
 // Test that the server disconnecting triggers the server offsets on the other
@@ -653,102 +797,42 @@
   // hope for the best.  We can be more generous in the future if we need to.
   //
   // We are faking the application names by passing in --application_name=foo
-  FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
-  FLAGS_override_hostname = "raspberrypi";
-  DoSetShmBase("pi1");
-  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-  pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
-
-  FLAGS_application_name = "pi1_message_bridge_client";
-  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
-  pi1_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+  OnPi1();
+  MakePi1Server();
+  MakePi1Client();
 
   // And build the app for testing.
-  FLAGS_application_name = "test1";
-  aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
+  MakePi1Test();
   aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
-      pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+      pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
   aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
-      pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
+      pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
 
   // Now do it for "raspberrypi2", the client.
-  FLAGS_override_hostname = "raspberrypi2";
-  DoSetShmBase("pi2");
-  FLAGS_application_name = "pi2_message_bridge_client";
-  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-  pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+  OnPi2();
+  MakePi2Client();
 
   // And build the app for testing.
-  FLAGS_application_name = "test2";
-  aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+  MakePi2Test();
   aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
-      pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
-
-  // Wait until we are connected, then send.
-  pi1_test_event_loop.MakeWatcher(
-      "/pi1/aos", [](const ServerStatistics &stats) {
-        VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
-      });
-
-  // Confirm both client and server statistics messages have decent offsets in
-  // them.
-  pi2_test_event_loop.MakeWatcher(
-      "/pi2/aos", [](const ServerStatistics &stats) {
-        VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi1_test_event_loop.MakeWatcher(
-      "/pi1/aos", [](const ClientStatistics &stats) {
-        VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi2_test_event_loop.MakeWatcher(
-      "/pi2/aos", [](const ClientStatistics &stats) {
-        VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
-      });
-
-  pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
-    VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
-  });
-  pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
-    VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
-  });
+      pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
 
   // Start everything up.  Pong is the only thing we don't know how to wait on,
   // so start it first.
-  std::thread pi1_test_thread(
-      [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
-  std::thread pi2_test_thread(
-      [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+  StartPi1Test();
+  StartPi2Test();
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Client();
 
-  std::thread pi1_server_thread(
-      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
-  std::thread pi1_client_thread(
-      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
-  std::thread pi2_client_thread(
-      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+  // Confirm both client and server statistics messages have decent offsets in
+  // them.
 
   {
-    FLAGS_application_name = "pi2_message_bridge_server";
-    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-    pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+    MakePi2Server();
 
-    // Run for 5 seconds to make sure we have time to estimate the offset.
-    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
-        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
-    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_server_event_loop.monotonic_now() +
-                  chrono::milliseconds(3050));
-    });
-
-    // And go!
-    pi2_server_event_loop.Run();
+    RunPi2Server(chrono::milliseconds(3050));
 
     // Now confirm we are synchronized.
     EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
@@ -774,6 +858,8 @@
     EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
               chrono::milliseconds(-1));
     EXPECT_TRUE(pi2_connection->has_boot_uuid());
+
+    StopPi2Server();
   }
 
   std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -796,22 +882,9 @@
   }
 
   {
-    FLAGS_application_name = "pi2_message_bridge_server";
-    aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-    pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+    MakePi2Server();
 
-    // Run for 5 seconds to make sure we have time to estimate the offset.
-    aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
-        [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
-    pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_server_event_loop.monotonic_now() +
-                  chrono::milliseconds(3050));
-    });
-
-    // And go!
-    pi2_server_event_loop.Run();
+    RunPi2Server(chrono::milliseconds(3050));
 
     // And confirm we are synchronized again.
     EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
@@ -837,19 +910,16 @@
     EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
               chrono::milliseconds(-1));
     EXPECT_TRUE(pi2_connection->has_boot_uuid());
+
+    StopPi2Server();
   }
 
   // Shut everyone else down
-  pi1_server_event_loop.Exit();
-  pi1_client_event_loop.Exit();
-  pi2_client_event_loop.Exit();
-  pi1_test_event_loop.Exit();
-  pi2_test_event_loop.Exit();
-  pi1_server_thread.join();
-  pi1_client_thread.join();
-  pi2_client_thread.join();
-  pi1_test_thread.join();
-  pi2_test_thread.join();
+  StopPi1Server();
+  StopPi1Client();
+  StopPi2Client();
+  StopPi1Test();
+  StopPi2Test();
 }
 
 // TODO(austin): The above test confirms that the external state does the right
@@ -867,9 +937,7 @@
 // Tests that when a message is sent before the bridge starts up, but is
 // configured as reliable, we forward it.  Confirm this survives a client reset.
 TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
-  DoSetShmBase("pi1");
-  // Force ourselves to be "raspberrypi" and allocate everything.
-  FLAGS_override_hostname = "raspberrypi";
+  OnPi1();
 
   FLAGS_application_name = "sender";
   aos::ShmEventLoop send_event_loop(&pi1_config.message());
@@ -880,27 +948,16 @@
       send_event_loop.MakeSender<examples::Ping>("/unreliable");
   SendPing(&unreliable_ping_sender, 1);
 
-  FLAGS_application_name = "pi1_message_bridge_server";
-  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-  pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
-
-  FLAGS_application_name = "pi1_message_bridge_client";
-  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
-  pi1_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+  MakePi1Server();
+  MakePi1Client();
 
   FLAGS_application_name = "pi1_timestamp";
   aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
 
   // Now do it for "raspberrypi2", the client.
-  DoSetShmBase("pi2");
-  FLAGS_override_hostname = "raspberrypi2";
+  OnPi2();
 
-  FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-  pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+  MakePi2Server();
 
   aos::ShmEventLoop receive_event_loop(&pi2_config.message());
   aos::Fetcher<examples::Ping> ping_fetcher =
@@ -930,12 +987,9 @@
   EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
 
   // Spin up the persistant pieces.
-  std::thread pi1_server_thread(
-      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
-  std::thread pi1_client_thread(
-      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
-  std::thread pi2_server_thread(
-      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Server();
 
   // Event used to wait for the timestamp counting thread to start.
   aos::Event event;
@@ -949,22 +1003,9 @@
 
   {
     // Now, spin up a client for 2 seconds.
-    LOG(INFO) << "Starting first pi2 MessageBridgeClient";
-    FLAGS_application_name = "pi2_message_bridge_client";
-    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-    pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+    MakePi2Client();
 
-    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
-        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
-    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_client_event_loop.monotonic_now() +
-                  chrono::milliseconds(2050));
-    });
-
-    // And go!
-    pi2_client_event_loop.Run();
+    RunPi2Client(chrono::milliseconds(2050));
 
     // Confirm there is no detected duplicate packet.
     EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
@@ -976,27 +1017,15 @@
     EXPECT_TRUE(ping_fetcher.Fetch());
     EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
     EXPECT_EQ(ping_timestamp_count, 1);
-    LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
+
+    StopPi2Client();
   }
 
   {
-    // Now, spin up a second client for 2 seconds.
-    LOG(INFO) << "Starting second pi2 MessageBridgeClient";
-    FLAGS_application_name = "pi2_message_bridge_client";
-    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-    pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+    // Now, spin up a client for 2 seconds.
+    MakePi2Client();
 
-    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
-        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
-    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi2_client_event_loop.monotonic_now() +
-                  chrono::milliseconds(5050));
-    });
-
-    // And go!
-    pi2_client_event_loop.Run();
+    RunPi2Client(chrono::milliseconds(5050));
 
     // Confirm we detect the duplicate packet correctly.
     EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
@@ -1008,17 +1037,16 @@
     EXPECT_EQ(ping_timestamp_count, 1);
     EXPECT_FALSE(ping_fetcher.Fetch());
     EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+    StopPi2Client();
   }
 
   // Shut everyone else down
-  pi1_server_event_loop.Exit();
-  pi1_client_event_loop.Exit();
-  pi2_server_event_loop.Exit();
+  StopPi1Client();
+  StopPi2Server();
   pi1_remote_timestamp_event_loop.Exit();
   pi1_remote_timestamp_thread.join();
-  pi1_server_thread.join();
-  pi1_client_thread.join();
-  pi2_server_thread.join();
+  StopPi1Server();
 }
 
 // Tests that when a message is sent before the bridge starts up, but is
@@ -1026,18 +1054,10 @@
 // resets.
 TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
   // Now do it for "raspberrypi2", the client.
-  DoSetShmBase("pi2");
-  FLAGS_override_hostname = "raspberrypi2";
+  OnPi2();
 
-  FLAGS_application_name = "pi2_message_bridge_server";
-  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
-  pi2_server_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
-
-  FLAGS_application_name = "pi2_message_bridge_client";
-  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
-  pi2_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+  MakePi2Server();
+  MakePi2Client();
 
   aos::ShmEventLoop receive_event_loop(&pi2_config.message());
   aos::Fetcher<examples::Ping> ping_fetcher =
@@ -1047,9 +1067,8 @@
   aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
       receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
 
-  DoSetShmBase("pi1");
   // Force ourselves to be "raspberrypi" and allocate everything.
-  FLAGS_override_hostname = "raspberrypi";
+  OnPi1();
 
   FLAGS_application_name = "sender";
   aos::ShmEventLoop send_event_loop(&pi1_config.message());
@@ -1063,10 +1082,7 @@
     builder.Send(ping_builder.Finish());
   }
 
-  FLAGS_application_name = "pi1_message_bridge_client";
-  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
-  pi1_client_event_loop.SetRuntimeRealtimePriority(1);
-  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+  MakePi1Client();
 
   FLAGS_application_name = "pi1_timestamp";
   aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
@@ -1091,12 +1107,9 @@
   EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
 
   // Spin up the persistant pieces.
-  std::thread pi1_client_thread(
-      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
-  std::thread pi2_server_thread(
-      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
-  std::thread pi2_client_thread(
-      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+  StartPi1Client();
+  StartPi2Server();
+  StartPi2Client();
 
   // Event used to wait for the timestamp counting thread to start.
   aos::Event event;
@@ -1110,21 +1123,9 @@
 
   {
     // Now, spin up a server for 2 seconds.
-    FLAGS_application_name = "pi1_message_bridge_server";
-    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-    pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+    MakePi1Server();
 
-    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
-        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
-    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi1_server_event_loop.monotonic_now() +
-                  chrono::milliseconds(2050));
-    });
-
-    // And go!
-    pi1_server_event_loop.Run();
+    RunPi1Server(chrono::milliseconds(2050));
 
     // Confirm there is no detected duplicate packet.
     EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
@@ -1137,25 +1138,15 @@
     EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
     EXPECT_EQ(ping_timestamp_count, 1);
     LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+
+    StopPi1Server();
   }
 
   {
     // Now, spin up a second server for 2 seconds.
-    FLAGS_application_name = "pi1_message_bridge_server";
-    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
-    pi1_server_event_loop.SetRuntimeRealtimePriority(1);
-    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+    MakePi1Server();
 
-    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
-        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
-    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Setup(pi1_server_event_loop.monotonic_now() +
-                  chrono::milliseconds(2050));
-    });
-
-    // And go!
-    pi1_server_event_loop.Run();
+    RunPi1Server(chrono::milliseconds(2050));
 
     // Confirm we detect the duplicate packet correctly.
     EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
@@ -1167,18 +1158,16 @@
     EXPECT_EQ(ping_timestamp_count, 1);
     EXPECT_FALSE(ping_fetcher.Fetch());
     EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
-    LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+
+    StopPi1Server();
   }
 
   // Shut everyone else down
-  pi1_client_event_loop.Exit();
-  pi2_server_event_loop.Exit();
-  pi2_client_event_loop.Exit();
+  StopPi1Client();
+  StopPi2Server();
+  StopPi2Client();
   pi1_remote_timestamp_event_loop.Exit();
   pi1_remote_timestamp_thread.join();
-  pi1_client_thread.join();
-  pi2_server_thread.join();
-  pi2_client_thread.join();
 }
 
 }  // namespace testing