Support disconnecting nodes.
We have a case where the message bridge client failed to connect on only one
of the nodes, which resulted in an unreadable log file. Add the ability
for the simulated event loop to recreate this situation.
Change-Id: If471f1690fad45d905c6299324f514ab86355805
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 80ba88c..8621851 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -1061,6 +1061,20 @@
return std::move(result);
}
+void NodeEventLoopFactory::Disconnect(const Node *other) {
+  // NOTE(review): bridge_ is presumably only created for multi-node
+  // configurations; fail with a clear message instead of dereferencing null.
+  CHECK(factory_->bridge_ != nullptr)
+      << ": No message bridge; is this a multi-node configuration?";
+  factory_->bridge_->Disconnect(node_, other);
+}
+
+void NodeEventLoopFactory::Connect(const Node *other) {
+  CHECK(factory_->bridge_ != nullptr)
+      << ": No message bridge; is this a multi-node configuration?";
+  factory_->bridge_->Connect(node_, other);
+}
+
void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
raw_event_loops_) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 40deb43..8494593 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -110,6 +110,8 @@
void DisableStatistics();
private:
+ friend class NodeEventLoopFactory; // Connect()/Disconnect() use bridge_.
+
const Configuration *const configuration_;
EventSchedulerScheduler scheduler_scheduler_;
// List of event loops to manage running and not running for.
@@ -185,6 +187,13 @@
// worked out better. Don't use this for anything real yet.
void Reboot() { boot_uuid_ = UUID::Random(); }
+ // Stops forwarding (dropping, not queueing) messages from this node to
+ // `other`, and reports the link disconnected in this node's ServerStatistics
+ // and `other`'s ClientStatistics. Traffic from `other` is unaffected.
+ void Disconnect(const Node *other);
+ // Resumes forwarding messages to `other` and reports the link as connected.
+ void Connect(const Node *other);
+
private:
friend class SimulatedEventLoopFactory;
NodeEventLoopFactory(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 3768348..6aedae7 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -891,6 +891,294 @@
EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
}
+// True iff every connection listed in `server_statistics` is CONNECTED.
+bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
+  bool all_connected = true;
+  for (const auto *connection : *server_statistics->connections()) {
+    all_connected = all_connected &&
+                    connection->state() == message_bridge::State::CONNECTED;
+  }
+  return all_connected;
+}
+
+// True iff `target` is present and down, and every other connection is up.
+bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
+                     std::string_view target) {
+  bool found_target = false;
+  for (const message_bridge::ServerConnection *connection :
+       *server_statistics->connections()) {
+    const bool is_target = connection->node()->name()->string_view() == target;
+    const bool connected =
+        connection->state() == message_bridge::State::CONNECTED;
+    // The target must be down; every other connection must be up.
+    if (is_target == connected) return false;
+    found_target = found_target || is_target;
+  }
+  // A missing target (e.g. a typo'd node name) must not pass vacuously.
+  return found_target;
+}
+
+// True iff every connection listed in `client_statistics` is CONNECTED.
+bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
+  bool all_connected = true;
+  for (const auto *connection : *client_statistics->connections()) {
+    all_connected = all_connected &&
+                    connection->state() == message_bridge::State::CONNECTED;
+  }
+  return all_connected;
+}
+
+// True iff `target` is present and down, and every other connection is up.
+bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
+                     std::string_view target) {
+  bool found_target = false;
+  for (const message_bridge::ClientConnection *connection :
+       *client_statistics->connections()) {
+    const bool is_target = connection->node()->name()->string_view() == target;
+    const bool connected =
+        connection->state() == message_bridge::State::CONNECTED;
+    // The target must be down; every other connection must be up.
+    if (is_target == connected) return false;
+    found_target = found_target || is_target;
+  }
+  // A missing target (e.g. a typo'd node name) must not pass vacuously.
+  return found_target;
+}
+
+// Tests that Disconnect() stops pi1 -> pi3 traffic and Connect() restores it.
+TEST(SimulatedEventLoopTest, MultinodeDisconnect) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  MessageCounter<examples::Pong> pi2_pong_counter(
+      pi2_pong_counter_event_loop.get(), "/test");
+
+  std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+  MessageCounter<examples::Pong> pi1_pong_counter(
+      pi1_pong_counter_event_loop.get(), "/test");
+
+  // Count timestamps.
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi3/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+  // Count remote timestamps
+  MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
+      pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+  MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
+      pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
+      pi1_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
+      pi1_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
+      pi2_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
+      pi2_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
+  aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
+      pi3_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
+  aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
+      pi3_pong_counter_event_loop
+          ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
+
+  MessageCounter<message_bridge::ClientStatistics>
+      pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 201u);
+  EXPECT_EQ(pi2_pong_counter.count(), 201u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 221u);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 221u);
+
+  ASSERT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 401u);
+  EXPECT_EQ(pi2_pong_counter.count(), 401u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 441u);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 441u);
+
+  ASSERT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+
+  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(2));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 601u);
+  EXPECT_EQ(pi2_pong_counter.count(), 601u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 661u);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 661u);
+
+  ASSERT_TRUE(pi1_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi1_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi2_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_server_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
+  ASSERT_TRUE(pi3_client_statistics_fetcher.Fetch());
+  EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
+      << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
+}
+
// Tests that the time offset having a slope doesn't break the world.
// SimulatedMessageBridge has enough self consistency CHECK statements to
// confirm, and we can can also check a message in each direction to make sure
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 45bc6e7..0b79504 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -63,6 +63,14 @@
if (sent_) {
break;
}
+ // Link is down: mark this message handled and count it as dropped.
+ if (server_connection_->state() != State::CONNECTED) {
+ sent_ = true;
+ server_connection_->mutate_dropped_packets(
+ server_connection_->dropped_packets() + 1);
+ continue;
+ }
+
if (fetcher_->context().monotonic_event_time +
send_node_factory_->network_delay() +
send_node_factory_->send_delay() >
@@ -109,6 +117,15 @@
private:
// Acutally sends the message, and reschedules.
void Send() {
+    // The link may have gone down after this message was scheduled; drop it
+    // and count it, the same way Schedule() does for a disconnected link.
+    if (server_connection_->state() != State::CONNECTED) {
+      sent_ = true;
+      server_connection_->mutate_dropped_packets(
+          server_connection_->dropped_packets() + 1);
+      Schedule();
+      return;
+    }
// Fill out the send times.
sender_->Send(fetcher_->context().data, fetcher_->context().size,
fetcher_->context().monotonic_event_time,
@@ -379,6 +392,46 @@
}
}
+void SimulatedMessageBridge::Disconnect(const Node *source,
+                                        const Node *destination) {
+  SetState(source, destination, message_bridge::State::DISCONNECTED);
+}
+
+void SimulatedMessageBridge::Connect(const Node *source,
+                                     const Node *destination) {
+  SetState(source, destination, message_bridge::State::CONNECTED);
+}
+
+// Applies `state` to the source -> destination link on both ends: the
+// ServerConnection in `source`'s ServerStatistics and the ClientConnection in
+// `destination`'s ClientStatistics.  Returns early if either is missing; if
+// only the client side is missing, the server side has already been updated.
+void SimulatedMessageBridge::SetState(const Node *source,
+                                      const Node *destination,
+                                      message_bridge::State state) {
+  auto source_state = event_loop_map_.find(source);
+  CHECK(source_state != event_loop_map_.end())
+      << ": Source node is not known to the message bridge";
+
+  // Forwarding is gated on this server-side state.
+  ServerConnection *server_connection =
+      source_state->second.server_status.FindServerConnection(destination);
+  if (server_connection == nullptr) {
+    return;
+  }
+  server_connection->mutate_state(state);
+
+  auto destination_state = event_loop_map_.find(destination);
+  CHECK(destination_state != event_loop_map_.end())
+      << ": Destination node is not known to the message bridge";
+  ClientConnection *client_connection =
+      destination_state->second.client_status.GetClientConnection(source);
+  if (client_connection == nullptr) {
+    return;
+  }
+  client_connection->mutate_state(state);
+}
+
void SimulatedMessageBridge::DisableStatistics() {
for (std::pair<const Node *const, State> &state : event_loop_map_) {
state.second.server_status.DisableStatistics();
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5130c65..978acf9 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -26,6 +26,19 @@
// for things like the logger.
void DisableForwarding(const Channel *channel);
+  // Stops forwarding messages from `source` to `destination` (they are
+  // dropped, not queued) and marks the link DISCONNECTED in `source`'s
+  // ServerStatistics and `destination`'s ClientStatistics.  No-op if `source`
+  // has no server connection to `destination`.
+  void Disconnect(const Node *source, const Node *destination);
+  // Resumes forwarding from `source` to `destination` and marks the link
+  // CONNECTED again on both ends.
+  void Connect(const Node *source, const Node *destination);
+  // Applies `state` to the source -> destination link on both ends.  Backs
+  // Connect() and Disconnect().
+  void SetState(const Node *source, const Node *destination,
+                message_bridge::State state);
+
// Disables generating and sending the messages which message_gateway sends.
// The messages are the ClientStatistics, ServerStatistics and Timestamp
// messages.