Support disconnecting nodes.

We have a case where message bridge client failed to connect on only one
of the nodes.  This resulted in an unreadable log file.  Add the ability
for simulated event loop to recreate this situation.

Change-Id: If471f1690fad45d905c6299324f514ab86355805
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 3768348..6aedae7 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -891,6 +891,294 @@
   EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
 }
 
+bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
+  for (const message_bridge::ServerConnection *connection :
+       *server_statistics->connections()) {
+    if (connection->state() != message_bridge::State::CONNECTED) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
+                     std::string_view target) {
+  for (const message_bridge::ServerConnection *connection :
+       *server_statistics->connections()) {
+    if (connection->node()->name()->string_view() == target) {
+      if (connection->state() == message_bridge::State::CONNECTED) {
+        return false;
+      }
+    } else {
+      if (connection->state() != message_bridge::State::CONNECTED) {
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
+bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
+  for (const message_bridge::ClientConnection *connection :
+       *client_statistics->connections()) {
+    if (connection->state() != message_bridge::State::CONNECTED) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
+                     std::string_view target) {
+  for (const message_bridge::ClientConnection *connection :
+       *client_statistics->connections()) {
+    if (connection->node()->name()->string_view() == target) {
+      if (connection->state() == message_bridge::State::CONNECTED) {
+        return false;
+      }
+    } else {
+      if (connection->state() != message_bridge::State::CONNECTED) {
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
+// Test that disconnecting nodes actually disconnects them.
+TEST(SimulatedEventLoopTest, MultinodeDisconnect) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  MessageCounter<examples::Pong> pi2_pong_counter(
+      pi2_pong_counter_event_loop.get(), "/test");
+
+  std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+  MessageCounter<examples::Pong> pi1_pong_counter(
+      pi1_pong_counter_event_loop.get(), "/test");
+
+  // Count timestamps.
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi3/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+  // Count remote timestamps
+  MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
+      pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+  MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
+      pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
+      pi1_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
+      pi2_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
+      pi3_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
+      pi3_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
+
+  MessageCounter<message_bridge::ClientStatistics>
+      pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 201u);
+  EXPECT_EQ(pi2_pong_counter.count(), 201u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 221);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 221);
+
+  EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 401u);
+  EXPECT_EQ(pi2_pong_counter.count(), 401u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 441);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 441);
+
+  EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 601u);
+  EXPECT_EQ(pi2_pong_counter.count(), 601u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 661);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 661);
+
+  EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+}
+
 // Tests that the time offset having a slope doesn't break the world.
 // SimulatedMessageBridge has enough self consistency CHECK statements to
 // confirm, and we can can also check a message in each direction to make sure