Make message_bridge_test more reliable

The test had a number of startup and shutdown ordering problems when run
many times.  Use aos::Event to sequence those more reliably.

Change-Id: I6f2a3fe4e17e44b8cf776555638cefcbdbe6aa9b
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index ccb907b..9106a86 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -40,6 +40,45 @@
   aos::SetShmBase(ShmBase(node));
 }
 
+// Class to manage starting and stopping a thread with an event loop in it.  The
+// constructor blocks until the loop's OnRun() callback fires, so the event loop
+// is guaranteed to be running before the constructor returns.
+class ThreadedEventLoopRunner {
+ public:
+  explicit ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
+      : event_loop_(event_loop), my_thread_([this]() {
+          LOG(INFO) << "Started " << event_loop_->name();
+          event_loop_->OnRun([this]() { event_.Set(); });
+          event_loop_->Run();
+        }) {
+    event_.Wait();
+  }
+
+  // The thread's lambdas capture `this`, so the runner must stay at a fixed
+  // address: copying and moving are disabled.
+  ThreadedEventLoopRunner(const ThreadedEventLoopRunner &) = delete;
+  ThreadedEventLoopRunner &operator=(const ThreadedEventLoopRunner &) = delete;
+
+  ~ThreadedEventLoopRunner() { Exit(); }
+
+  // Exits the event loop and joins the thread.  Safe to call more than once;
+  // later calls are no-ops because a joined thread is no longer joinable.
+  void Exit() {
+    if (my_thread_.joinable()) {
+      event_loop_->Exit();
+      my_thread_.join();
+    }
+  }
+
+ private:
+  // Set by the OnRun() callback; the constructor waits on it.
+  aos::Event event_;
+  aos::ShmEventLoop *event_loop_;
+  // Declared last so event_ and event_loop_ are initialized before the thread
+  // that uses them starts.
+  std::thread my_thread_;
+};
+
 // Parameters to run all the tests with.
 struct Param {
   // The config file to use.
@@ -102,18 +131,12 @@
   }
 
   void StartPi1Server() {
-    pi1_server_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi1_message_bridge_server";
-      pi1_server_event_loop->Run();
-    });
+    pi1_server_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
   }
 
   void StopPi1Server() {
-    if (pi1_server_thread.joinable()) {
-      pi1_server_event_loop->Exit();
-      pi1_server_thread.join();
-      pi1_server_thread = std::thread();
-    }
+    pi1_server_thread.reset();  // Stops the loop and joins the thread.
     pi1_message_bridge_server.reset();
     pi1_server_event_loop.reset();
   }
@@ -129,16 +152,12 @@
   }
 
   void StartPi1Client() {
-    pi1_client_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi1_message_bridge_client";
-      pi1_client_event_loop->Run();
-    });
+    pi1_client_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
   }
 
   void StopPi1Client() {
-    pi1_client_event_loop->Exit();
-    pi1_client_thread.join();
-    pi1_client_thread = std::thread();
+    pi1_client_thread.reset();  // Stops the loop and joins the thread.
     pi1_message_bridge_client.reset();
     pi1_client_event_loop.reset();
   }
@@ -172,16 +191,11 @@
   }
 
   void StartPi1Test() {
-    pi1_test_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi1_test";
-      pi1_test_event_loop->Run();
-    });
+    pi1_test_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
   }
 
-  void StopPi1Test() {
-    pi1_test_event_loop->Exit();
-    pi1_test_thread.join();
-  }
+  void StopPi1Test() { pi1_test_thread.reset(); }
 
   void MakePi2Server() {
     OnPi2();
@@ -206,18 +220,12 @@
   }
 
   void StartPi2Server() {
-    pi2_server_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi2_message_bridge_server";
-      pi2_server_event_loop->Run();
-    });
+    pi2_server_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
   }
 
   void StopPi2Server() {
-    if (pi2_server_thread.joinable()) {
-      pi2_server_event_loop->Exit();
-      pi2_server_thread.join();
-      pi2_server_thread = std::thread();
-    }
+    pi2_server_thread.reset();  // Stops the loop and joins the thread.
     pi2_message_bridge_server.reset();
     pi2_server_event_loop.reset();
   }
@@ -246,18 +254,12 @@
   }
 
   void StartPi2Client() {
-    pi2_client_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi2_message_bridge_client";
-      pi2_client_event_loop->Run();
-    });
+    pi2_client_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
   }
 
   void StopPi2Client() {
-    if (pi2_client_thread.joinable()) {
-      pi2_client_event_loop->Exit();
-      pi2_client_thread.join();
-      pi2_client_thread = std::thread();
-    }
+    pi2_client_thread.reset();  // Stops the loop and joins the thread.
     pi2_message_bridge_client.reset();
     pi2_client_event_loop.reset();
   }
@@ -291,16 +293,11 @@
   }
 
   void StartPi2Test() {
-    pi2_test_thread = std::thread([this]() {
-      LOG(INFO) << "Started pi2_message_bridge_test";
-      pi2_test_event_loop->Run();
-    });
+    pi2_test_thread =
+        std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
   }
 
-  void StopPi2Test() {
-    pi2_test_event_loop->Exit();
-    pi2_test_thread.join();
-  }
+  void StopPi2Test() { pi2_test_thread.reset(); }
 
   aos::FlatbufferDetachedBuffer<aos::Configuration> config;
   std::string config_sha256;
@@ -310,25 +307,25 @@
 
   std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
   std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
-  std::thread pi1_server_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
 
   std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
   std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
-  std::thread pi1_client_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
 
   std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
-  std::thread pi1_test_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
 
   std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
   std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
-  std::thread pi2_server_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
 
   std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
   std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
-  std::thread pi2_client_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
 
   std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
-  std::thread pi2_test_thread;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
 };
 
 // Test that we can send a ping message over sctp and receive it.
@@ -498,7 +495,14 @@
                        chrono::nanoseconds(connection->connected_since_time())),
                    monotonic_clock::now());
       } else {
-        EXPECT_FALSE(connection->has_connection_count());
+        // Once connected, the connection count should persist.  NOTE(review):
+        // verify pi2_server_statistics_count is only bumped while connected.
+        if (pi2_server_statistics_count > 0) {
+          EXPECT_TRUE(connection->has_connection_count());
+          EXPECT_EQ(connection->connection_count(), 1u);
+        } else {
+          EXPECT_FALSE(connection->has_connection_count());
+        }
         EXPECT_FALSE(connection->has_connected_since_time());
       }
     }
@@ -575,7 +579,14 @@
             }
             ++pi2_connected_client_statistics_count;
           } else {
-            EXPECT_FALSE(connection->has_connection_count());
+            if (pi2_connected_client_statistics_count == 0) {
+              EXPECT_FALSE(connection->has_connection_count())
+                  << aos::FlatbufferToJson(&stats);
+            } else {
+              EXPECT_TRUE(connection->has_connection_count())
+                  << aos::FlatbufferToJson(&stats);
+              EXPECT_EQ(connection->connection_count(), 1u);
+            }
             EXPECT_FALSE(connection->has_connected_since_time());
           }
         }
@@ -590,15 +601,6 @@
     VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
   });
 
-  // Run for 5 seconds to make sure we have time to estimate the offset.
-  aos::TimerHandler *quit = ping_event_loop.AddTimer(
-      [&ping_event_loop]() { ping_event_loop.Exit(); });
-  ping_event_loop.OnRun([quit, &ping_event_loop]() {
-    // Stop between timestamps, not exactly on them.
-    quit->Schedule(ping_event_loop.monotonic_now() +
-                   chrono::milliseconds(5050));
-  });
-
   // Find the channel index for both the /pi1/aos Timestamp channel and Ping
   // channel.
   const size_t pi1_timestamp_channel = configuration::ChannelIndex(
@@ -710,7 +712,8 @@
 
-  // Start everything up.  Pong is the only thing we don't know how to wait
-  // on, so start it first.
-  std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+  // Start everything up.  Each ThreadedEventLoopRunner, including the ones
+  // created by the Start*() helpers, blocks until its event loop is running.
+  ThreadedEventLoopRunner pong_thread(&pong_event_loop);
+  ThreadedEventLoopRunner ping_thread(&ping_event_loop);
 
   StartPi1Server();
   StartPi1Client();
@@ -718,20 +721,8 @@
   StartPi2Server();
 
   // And go!
-  ping_event_loop.Run();
-
-  // Shut everyone else down
-  StopPi1Server();
-  StopPi1Client();
-  StopPi2Client();
-  StopPi2Server();
-  pong_event_loop.Exit();
-  pong_thread.join();
-
-  // Make sure we sent something.
-  EXPECT_GE(ping_count, 1);
-  // And got something back.
-  EXPECT_GE(pong_count, 1);
+  // Run for 5s plus 50ms (to stop between timestamps) to estimate the offset.
+  std::this_thread::sleep_for(chrono::milliseconds(5050));
 
   // Confirm that we are estimating a monotonic offset on the client.
   ASSERT_TRUE(client_statistics_fetcher.Fetch());
@@ -753,6 +744,19 @@
       1000000000)
       << aos::FlatbufferToJson(client_statistics_fetcher.get());
 
+  // Shut everyone else down before confirming everything actually ran.
+  ping_thread.Exit();
+  pong_thread.Exit();
+  StopPi1Server();
+  StopPi1Client();
+  StopPi2Client();
+  StopPi2Server();
+
+  // Make sure we sent something.
+  EXPECT_GE(ping_count, 1);
+  // And got something back.
+  EXPECT_GE(pong_count, 1);
+
   EXPECT_GE(pi1_server_statistics_count, 2);
   EXPECT_GE(pi2_server_statistics_count, 2);
   EXPECT_GE(pi1_client_statistics_count, 2);
@@ -1152,14 +1156,9 @@
   StartPi2Server();
 
-  // Event used to wait for the timestamp counting thread to start.
-  aos::Event event;
-  std::thread pi1_remote_timestamp_thread(
-      [&pi1_remote_timestamp_event_loop, &event]() {
-        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
-        pi1_remote_timestamp_event_loop.Run();
-      });
-
-  event.Wait();
+  // Start the timestamp counting thread and wait for its event loop to run.
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+      std::make_unique<ThreadedEventLoopRunner>(
+          &pi1_remote_timestamp_event_loop);
 
   {
     // Now, spin up a client for 2 seconds.
@@ -1214,8 +1213,7 @@
   // Shut everyone else down
   StopPi1Client();
   StopPi2Server();
-  pi1_remote_timestamp_event_loop.Exit();
-  pi1_remote_timestamp_thread.join();
+  pi1_remote_timestamp_thread.reset();  // Stops the loop and joins the thread.
   StopPi1Server();
 }
 
@@ -1286,15 +1284,9 @@
   StartPi2Server();
   StartPi2Client();
 
-  // Event used to wait for the timestamp counting thread to start.
-  aos::Event event;
-  std::thread pi1_remote_timestamp_thread(
-      [&pi1_remote_timestamp_event_loop, &event]() {
-        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
-        pi1_remote_timestamp_event_loop.Run();
-      });
-
-  event.Wait();
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+      std::make_unique<ThreadedEventLoopRunner>(
+          &pi1_remote_timestamp_event_loop);
 
   {
     // Now, spin up a server for 2 seconds.
@@ -1351,8 +1343,7 @@
   StopPi1Client();
   StopPi2Server();
   StopPi2Client();
-  pi1_remote_timestamp_event_loop.Exit();
-  pi1_remote_timestamp_thread.join();
+  pi1_remote_timestamp_thread.reset();  // Stops the loop and joins the thread.
 }
 
 // Test that a client which connects with too big a message gets disconnected