Support disconnecting nodes.
We have a case where message bridge client failed to connect on only one
of the nodes. This resulted in an unreadable log file. Add the ability
for simulated event loop to recreate this situation.
Change-Id: If471f1690fad45d905c6299324f514ab86355805
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 3768348..6aedae7 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -891,6 +891,294 @@
EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
}
+bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics->connections()) {
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
+ std::string_view target) {
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics->connections()) {
+ if (connection->node()->name()->string_view() == target) {
+ if (connection->state() == message_bridge::State::CONNECTED) {
+ return false;
+ }
+ } else {
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
+ for (const message_bridge::ClientConnection *connection :
+ *client_statistics->connections()) {
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
+ std::string_view target) {
+ for (const message_bridge::ClientConnection *connection :
+ *client_statistics->connections()) {
+ if (connection->node()->name()->string_view() == target) {
+ if (connection->state() == message_bridge::State::CONNECTED) {
+ return false;
+ }
+ } else {
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+// Test that disconnecting nodes actually disconnects them.
+TEST(SimulatedEventLoopTest, MultinodeDisconnect) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+ const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+ MessageCounter<examples::Pong> pi2_pong_counter(
+ pi2_pong_counter_event_loop.get(), "/test");
+
+ std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+ std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+ MessageCounter<examples::Pong> pi1_pong_counter(
+ pi1_pong_counter_event_loop.get(), "/test");
+
+ // Count timestamps.
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi3/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+ // Count remote timestamps
+ MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
+ pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+ MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
+ pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
+ MessageCounter<message_bridge::ServerStatistics>
+ pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
+ "/pi1/aos");
+ aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
+ pi1_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
+ aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
+ pi1_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
+
+ MessageCounter<message_bridge::ServerStatistics>
+ pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
+ "/pi2/aos");
+ aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
+ pi2_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
+ aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
+ pi2_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
+
+ MessageCounter<message_bridge::ServerStatistics>
+ pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
+ "/pi3/aos");
+ aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
+ pi3_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
+ aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
+ pi3_pong_counter_event_loop
+ ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
+
+ MessageCounter<message_bridge::ClientStatistics>
+ pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
+ "/pi1/aos");
+ MessageCounter<message_bridge::ClientStatistics>
+ pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
+ "/pi2/aos");
+ MessageCounter<message_bridge::ClientStatistics>
+ pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
+ "/pi3/aos");
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(2) +
+ chrono::milliseconds(5));
+
+ EXPECT_EQ(pi1_pong_counter.count(), 201u);
+ EXPECT_EQ(pi2_pong_counter.count(), 201u);
+
+ EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
+
+ EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
+ EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
+ EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+
+ EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
+ EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
+ EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 221);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 221);
+
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+ EXPECT_EQ(pi1_pong_counter.count(), 401u);
+ EXPECT_EQ(pi2_pong_counter.count(), 401u);
+
+ EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+ EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
+
+ EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
+ EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
+ EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+
+ EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
+ EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
+ EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 441);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 441);
+
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
+ << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
+ << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+ EXPECT_EQ(pi1_pong_counter.count(), 601u);
+ EXPECT_EQ(pi2_pong_counter.count(), 601u);
+
+ EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
+ EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
+ EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
+ EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
+ EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
+ EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
+ EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
+
+ EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
+ EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
+ EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+
+ EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
+ EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
+ EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 661);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 661);
+
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+ EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+ EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+ << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+}
+
// Tests that the time offset having a slope doesn't break the world.
// SimulatedMessageBridge has enough self consistency CHECK statements to
// confirm, and we can can also check a message in each direction to make sure