Make message_bridge_test more reliable
We had a bunch of startup and shutdown ordering problems when the test was run many times.
Use Event to sequence those more reliably.
Change-Id: I6f2a3fe4e17e44b8cf776555638cefcbdbe6aa9b
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index ccb907b..9106a86 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -40,6 +40,35 @@
aos::SetShmBase(ShmBase(node));
}
+// Class to manage starting and stopping a thread with an event loop in it. The
+// thread is guaranteed to be running before the constructor exits.
+class ThreadedEventLoopRunner {
+ public:
+ ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop), my_thread_([this]() {
+ LOG(INFO) << "Started " << event_loop_->name();
+ event_loop_->OnRun([this]() { event_.Set(); });
+ event_loop_->Run();
+ }) {
+ event_.Wait();
+ }
+
+ ~ThreadedEventLoopRunner() { Exit(); }
+
+ void Exit() {
+ if (my_thread_.joinable()) {
+ event_loop_->Exit();
+ my_thread_.join();
+ my_thread_ = std::thread();
+ }
+ }
+
+ private:
+ aos::Event event_;
+ aos::ShmEventLoop *event_loop_;
+ std::thread my_thread_;
+};
+
// Parameters to run all the tests with.
struct Param {
// The config file to use.
@@ -102,18 +131,12 @@
}
void StartPi1Server() {
- pi1_server_thread = std::thread([this]() {
- LOG(INFO) << "Started pi1_message_bridge_server";
- pi1_server_event_loop->Run();
- });
+ pi1_server_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
}
void StopPi1Server() {
- if (pi1_server_thread.joinable()) {
- pi1_server_event_loop->Exit();
- pi1_server_thread.join();
- pi1_server_thread = std::thread();
- }
+ pi1_server_thread.reset();
pi1_message_bridge_server.reset();
pi1_server_event_loop.reset();
}
@@ -129,16 +152,12 @@
}
void StartPi1Client() {
- pi1_client_thread = std::thread([this]() {
- LOG(INFO) << "Started pi1_message_bridge_client";
- pi1_client_event_loop->Run();
- });
+ pi1_client_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
}
void StopPi1Client() {
- pi1_client_event_loop->Exit();
- pi1_client_thread.join();
- pi1_client_thread = std::thread();
+ pi1_client_thread.reset();
pi1_message_bridge_client.reset();
pi1_client_event_loop.reset();
}
@@ -172,16 +191,11 @@
}
void StartPi1Test() {
- pi1_test_thread = std::thread([this]() {
- LOG(INFO) << "Started pi1_test";
- pi1_test_event_loop->Run();
- });
+ pi1_test_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
}
- void StopPi1Test() {
- pi1_test_event_loop->Exit();
- pi1_test_thread.join();
- }
+ void StopPi1Test() { pi1_test_thread.reset(); }
void MakePi2Server() {
OnPi2();
@@ -206,18 +220,12 @@
}
void StartPi2Server() {
- pi2_server_thread = std::thread([this]() {
- LOG(INFO) << "Started pi2_message_bridge_server";
- pi2_server_event_loop->Run();
- });
+ pi2_server_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
}
void StopPi2Server() {
- if (pi2_server_thread.joinable()) {
- pi2_server_event_loop->Exit();
- pi2_server_thread.join();
- pi2_server_thread = std::thread();
- }
+ pi2_server_thread.reset();
pi2_message_bridge_server.reset();
pi2_server_event_loop.reset();
}
@@ -246,18 +254,12 @@
}
void StartPi2Client() {
- pi2_client_thread = std::thread([this]() {
- LOG(INFO) << "Started pi2_message_bridge_client";
- pi2_client_event_loop->Run();
- });
+ pi2_client_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
}
void StopPi2Client() {
- if (pi2_client_thread.joinable()) {
- pi2_client_event_loop->Exit();
- pi2_client_thread.join();
- pi2_client_thread = std::thread();
- }
+ pi2_client_thread.reset();
pi2_message_bridge_client.reset();
pi2_client_event_loop.reset();
}
@@ -291,16 +293,11 @@
}
void StartPi2Test() {
- pi2_test_thread = std::thread([this]() {
- LOG(INFO) << "Started pi2_message_bridge_test";
- pi2_test_event_loop->Run();
- });
+ pi2_test_thread =
+ std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
}
- void StopPi2Test() {
- pi2_test_event_loop->Exit();
- pi2_test_thread.join();
- }
+ void StopPi2Test() { pi2_test_thread.reset(); }
aos::FlatbufferDetachedBuffer<aos::Configuration> config;
std::string config_sha256;
@@ -310,25 +307,25 @@
std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
- std::thread pi1_server_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
- std::thread pi1_client_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
- std::thread pi1_test_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
- std::thread pi2_server_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
- std::thread pi2_client_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
- std::thread pi2_test_thread;
+ std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
};
// Test that we can send a ping message over sctp and receive it.
@@ -498,7 +495,14 @@
chrono::nanoseconds(connection->connected_since_time())),
monotonic_clock::now());
} else {
- EXPECT_FALSE(connection->has_connection_count());
+ // If we have been connected, we expect the connection count to stay
+ // around.
+ if (pi2_server_statistics_count > 0) {
+ EXPECT_TRUE(connection->has_connection_count());
+ EXPECT_EQ(connection->connection_count(), 1u);
+ } else {
+ EXPECT_FALSE(connection->has_connection_count());
+ }
EXPECT_FALSE(connection->has_connected_since_time());
}
}
@@ -575,7 +579,14 @@
}
++pi2_connected_client_statistics_count;
} else {
- EXPECT_FALSE(connection->has_connection_count());
+ if (pi2_connected_client_statistics_count == 0) {
+ EXPECT_FALSE(connection->has_connection_count())
+ << aos::FlatbufferToJson(&stats);
+ } else {
+ EXPECT_TRUE(connection->has_connection_count())
+ << aos::FlatbufferToJson(&stats);
+ EXPECT_EQ(connection->connection_count(), 1u);
+ }
EXPECT_FALSE(connection->has_connected_since_time());
}
}
@@ -590,15 +601,6 @@
VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
});
- // Run for 5 seconds to make sure we have time to estimate the offset.
- aos::TimerHandler *quit = ping_event_loop.AddTimer(
- [&ping_event_loop]() { ping_event_loop.Exit(); });
- ping_event_loop.OnRun([quit, &ping_event_loop]() {
- // Stop between timestamps, not exactly on them.
- quit->Schedule(ping_event_loop.monotonic_now() +
- chrono::milliseconds(5050));
- });
-
// Find the channel index for both the /pi1/aos Timestamp channel and Ping
// channel.
const size_t pi1_timestamp_channel = configuration::ChannelIndex(
@@ -710,7 +712,8 @@
// Start everything up. Pong is the only thing we don't know how to wait
// on, so start it first.
- std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+ ThreadedEventLoopRunner pong_thread(&pong_event_loop);
+ ThreadedEventLoopRunner ping_thread(&ping_event_loop);
StartPi1Server();
StartPi1Client();
@@ -718,20 +721,8 @@
StartPi2Server();
// And go!
- ping_event_loop.Run();
-
- // Shut everyone else down
- StopPi1Server();
- StopPi1Client();
- StopPi2Client();
- StopPi2Server();
- pong_event_loop.Exit();
- pong_thread.join();
-
- // Make sure we sent something.
- EXPECT_GE(ping_count, 1);
- // And got something back.
- EXPECT_GE(pong_count, 1);
+ // Run for 5 seconds to make sure we have time to estimate the offset.
+ std::this_thread::sleep_for(chrono::milliseconds(5050));
// Confirm that we are estimating a monotonic offset on the client.
ASSERT_TRUE(client_statistics_fetcher.Fetch());
@@ -753,6 +744,19 @@
1000000000)
<< aos::FlatbufferToJson(client_statistics_fetcher.get());
+ // Shut everyone else down before confirming everything actually ran.
+ ping_thread.Exit();
+ pong_thread.Exit();
+ StopPi1Server();
+ StopPi1Client();
+ StopPi2Client();
+ StopPi2Server();
+
+ // Make sure we sent something.
+ EXPECT_GE(ping_count, 1);
+ // And got something back.
+ EXPECT_GE(pong_count, 1);
+
EXPECT_GE(pi1_server_statistics_count, 2);
EXPECT_GE(pi2_server_statistics_count, 2);
EXPECT_GE(pi1_client_statistics_count, 2);
@@ -1152,14 +1156,9 @@
StartPi2Server();
// Event used to wait for the timestamp counting thread to start.
- aos::Event event;
- std::thread pi1_remote_timestamp_thread(
- [&pi1_remote_timestamp_event_loop, &event]() {
- pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
- pi1_remote_timestamp_event_loop.Run();
- });
-
- event.Wait();
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+ std::make_unique<ThreadedEventLoopRunner>(
+ &pi1_remote_timestamp_event_loop);
{
// Now, spin up a client for 2 seconds.
@@ -1214,8 +1213,7 @@
// Shut everyone else down
StopPi1Client();
StopPi2Server();
- pi1_remote_timestamp_event_loop.Exit();
- pi1_remote_timestamp_thread.join();
+ pi1_remote_timestamp_thread.reset();
StopPi1Server();
}
@@ -1286,15 +1284,9 @@
StartPi2Server();
StartPi2Client();
- // Event used to wait for the timestamp counting thread to start.
- aos::Event event;
- std::thread pi1_remote_timestamp_thread(
- [&pi1_remote_timestamp_event_loop, &event]() {
- pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
- pi1_remote_timestamp_event_loop.Run();
- });
-
- event.Wait();
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+ std::make_unique<ThreadedEventLoopRunner>(
+ &pi1_remote_timestamp_event_loop);
{
// Now, spin up a server for 2 seconds.
@@ -1351,8 +1343,7 @@
StopPi1Client();
StopPi2Server();
StopPi2Client();
- pi1_remote_timestamp_event_loop.Exit();
- pi1_remote_timestamp_thread.join();
+ pi1_remote_timestamp_thread.reset();
}
// Test that a client which connects with too big a message gets disconnected