Make SkipTimingReport and DisableStatistics actually work
New event loops were not respecting the previous SkipTimingReports on
the factory. Close that hole.
DisableStatistics was sending 1 statistics out at the beginning too.
Close that hole too.
I want to add a test that requires the following behavior to stay
working. Let's test that this behavior continues to work as expected as
well in regards to the MessageBridgeServer and MessageBridgeClient and
Timestamp and RemoteMessage messages.
Change-Id: Ic1c51a14f9f76527bf1e8771f5247ba0aa4e94b9
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 11e10a6..121e164 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -1360,8 +1360,38 @@
bridge_->DisableStatistics();
}
+void SimulatedEventLoopFactory::EnableStatistics() {
+ CHECK(bridge_) << ": Can't enable statistics without a message bridge.";
+ bridge_->EnableStatistics();
+}
+
void SimulatedEventLoopFactory::SkipTimingReport() {
CHECK(bridge_) << ": Can't skip timing reports without a message bridge.";
+
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ node->SkipTimingReport();
+ }
+ }
+}
+
+void NodeEventLoopFactory::SkipTimingReport() {
+ for (SimulatedEventLoop *event_loop : event_loops_) {
+ event_loop->SkipTimingReport();
+ }
+ skip_timing_report_ = true;
+}
+
+void NodeEventLoopFactory::EnableStatistics() {
+ CHECK(factory_->bridge_)
+ << ": Can't enable statistics without a message bridge.";
+ factory_->bridge_->EnableStatistics(node_);
+}
+
+void NodeEventLoopFactory::DisableStatistics() {
+ CHECK(factory_->bridge_)
+ << ": Can't disable statistics without a message bridge.";
+ factory_->bridge_->DisableStatistics(node_);
}
::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
@@ -1376,6 +1406,9 @@
node_, tid));
result->set_name(name);
result->set_send_delay(factory_->send_delay());
+ if (skip_timing_report_) {
+ result->SkipTimingReport();
+ }
VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
<< monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index b25f260..bd589d6 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -120,6 +120,8 @@
// Disables the messages sent by the simulated message gateway.
void DisableStatistics();
+ // Enables the messages sent by the simulated message gateway.
+ void EnableStatistics();
// Calls SkipTimingReport() on all EventLoops used as part of the
// infrastructure. This may improve the performance of long-simulated-duration
@@ -238,11 +240,19 @@
// Resumes forwarding messages.
void Connect(const Node *other);
+ // Disables the messages sent by the simulated message gateway.
+ void DisableStatistics();
+ // Enables the messages sent by the simulated message gateway.
+ void EnableStatistics();
+
private:
friend class SimulatedEventLoopFactory;
NodeEventLoopFactory(EventSchedulerScheduler *scheduler_scheduler,
SimulatedEventLoopFactory *factory, const Node *node);
+ // Skips timing reports on all event loops on this node.
+ void SkipTimingReport();
+
// Helpers to restart.
void ScheduleStartup();
void Startup();
@@ -255,6 +265,8 @@
const Node *const node_;
+ bool skip_timing_report_ = false;
+
std::vector<SimulatedEventLoop *> event_loops_;
std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 6b88ffe..75e3f85 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1935,5 +1935,192 @@
EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
}
+class SimulatedEventLoopDisconnectTest : public ::testing::Test {
+ public:
+ SimulatedEventLoopDisconnectTest()
+ : config(aos::configuration::ReadConfig(ArtifactPath(
+ "aos/events/multinode_pingpong_test_split_config.json"))),
+ time(configuration::NodesCount(&config.message())),
+ factory(&config.message()) {
+ factory.SetTimeConverter(&time);
+ }
+
+ void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
+ const monotonic_clock::time_point allowable_message_time,
+ std::set<const aos::Node *> empty_nodes) {
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+ std::unique_ptr<aos::EventLoop> pi1_event_loop =
+ pi1->MakeEventLoop("fetcher");
+ std::unique_ptr<aos::EventLoop> pi2_event_loop =
+ pi2->MakeEventLoop("fetcher");
+ for (const aos::Channel *channel : *factory.configuration()->channels()) {
+ if (configuration::ChannelIsReadableOnNode(channel,
+ pi1_event_loop->node())) {
+ std::unique_ptr<aos::RawFetcher> fetcher =
+ pi1_event_loop->MakeRawFetcher(channel);
+ if (statistics_channels.find(channel) == statistics_channels.end() ||
+ empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
+ EXPECT_FALSE(fetcher->Fetch() &&
+ fetcher->context().monotonic_event_time >
+ allowable_message_time)
+ << ": Found recent message on channel "
+ << configuration::CleanedChannelToString(channel) << " and time "
+ << fetcher->context().monotonic_event_time << " > "
+ << allowable_message_time << " on pi1";
+ } else {
+ EXPECT_TRUE(fetcher->Fetch() &&
+ fetcher->context().monotonic_event_time >=
+ allowable_message_time)
+ << ": Didn't find recent message on channel "
+ << configuration::CleanedChannelToString(channel) << " on pi1";
+ }
+ }
+ if (configuration::ChannelIsReadableOnNode(channel,
+ pi2_event_loop->node())) {
+ std::unique_ptr<aos::RawFetcher> fetcher =
+ pi2_event_loop->MakeRawFetcher(channel);
+ if (statistics_channels.find(channel) == statistics_channels.end() ||
+ empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
+ EXPECT_FALSE(fetcher->Fetch() &&
+ fetcher->context().monotonic_event_time >
+ allowable_message_time)
+ << ": Found message on channel "
+ << configuration::CleanedChannelToString(channel) << " and time "
+ << fetcher->context().monotonic_event_time << " > "
+ << allowable_message_time << " on pi2";
+ } else {
+ EXPECT_TRUE(fetcher->Fetch() &&
+ fetcher->context().monotonic_event_time >=
+ allowable_message_time)
+ << ": Didn't find message on channel "
+ << configuration::CleanedChannelToString(channel) << " on pi2";
+ }
+ }
+ }
+ }
+
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config;
+
+ message_bridge::TestingTimeConverter time;
+ SimulatedEventLoopFactory factory;
+};
+
+// Tests that if we have message bridge client/server disabled, and timing
+// reports disabled, no messages are sent. Also tests that we can disconnect a
+// node and disable statistics on it and it actually fully disconnects.
+TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
+ time.StartEqual();
+ factory.SkipTimingReport();
+ factory.DisableStatistics();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ std::unique_ptr<aos::EventLoop> pi1_event_loop =
+ pi1->MakeEventLoop("fetcher");
+ std::unique_ptr<aos::EventLoop> pi2_event_loop =
+ pi2->MakeEventLoop("fetcher");
+
+ factory.RunFor(chrono::milliseconds(100000));
+
+ // Confirm no messages are sent if we've configured them all off.
+ VerifyChannels({}, monotonic_clock::min_time, {});
+
+ // Now, confirm that all the message_bridge channels come back when we
+ // re-enable.
+ factory.EnableStatistics();
+
+ factory.RunFor(chrono::milliseconds(10050));
+
+ // Build up the list of all the messages we expect when we come back.
+ {
+ std::set<const aos::Channel *> statistics_channels;
+ for (const std::pair<std::string_view, const Node *> pi :
+ std::vector<std::pair<std::string_view, const Node *>>{
+ {"/pi1/aos", pi1->node()},
+ {"/pi2/aos", pi1->node()},
+ {"/pi3/aos", pi1->node()}}) {
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
+ pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ServerStatistics", "", pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ClientStatistics", "", pi.second));
+ }
+
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(),
+ "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", "", pi1->node()));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(),
+ "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", "", pi2->node()));
+ VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
+ }
+
+ // Now test that we can disable the messages for a single node
+ pi2->DisableStatistics();
+ const aos::monotonic_clock::time_point statistics_disable_time =
+ pi2->monotonic_now();
+ factory.RunFor(chrono::milliseconds(10000));
+
+ // We should see a much smaller set of messages, but should still see messages
+ // forwarded, mainly the timestamp message.
+ {
+ std::set<const aos::Channel *> statistics_channels;
+ for (const std::pair<std::string_view, const Node *> pi :
+ std::vector<std::pair<std::string_view, const Node *>>{
+ {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
+ pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ServerStatistics", "", pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ClientStatistics", "", pi.second));
+ }
+
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(),
+ "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", "", pi1->node()));
+ VerifyChannels(statistics_channels, statistics_disable_time, {});
+ }
+
+ // Now, fully disconnect the node. This will completely quiet down pi2.
+ pi1->Disconnect(pi2->node());
+ pi2->Disconnect(pi1->node());
+
+ const aos::monotonic_clock::time_point disconnect_disable_time =
+ pi2->monotonic_now();
+ factory.RunFor(chrono::milliseconds(10000));
+
+ {
+ std::set<const aos::Channel *> statistics_channels;
+ for (const std::pair<std::string_view, const Node *> pi :
+ std::vector<std::pair<std::string_view, const Node *>>{
+ {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
+ pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ServerStatistics", "", pi.second));
+ statistics_channels.insert(configuration::GetChannel(
+ factory.configuration(), pi.first,
+ "aos.message_bridge.ClientStatistics", "", pi.second));
+ }
+
+ VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
+ }
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 339341f..b87182a 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -565,6 +565,24 @@
}
}
+void SimulatedMessageBridge::DisableStatistics(const Node *node) {
+ auto it = event_loop_map_.find(node);
+ CHECK(it != event_loop_map_.end());
+ it->second.DisableStatistics();
+}
+
+void SimulatedMessageBridge::EnableStatistics() {
+ for (std::pair<const Node *const, State> &state : event_loop_map_) {
+ state.second.EnableStatistics();
+ }
+}
+
+void SimulatedMessageBridge::EnableStatistics(const Node *node) {
+ auto it = event_loop_map_.find(node);
+ CHECK(it != event_loop_map_.end());
+ it->second.EnableStatistics();
+}
+
void SimulatedMessageBridge::State::SetEventLoop(
std::unique_ptr<aos::EventLoop> loop) {
if (!loop) {
@@ -604,6 +622,9 @@
timestamp_loggers = ChannelTimestampSender(event_loop.get());
server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
+ if (disable_statistics_) {
+ server_status->DisableStatistics();
+ }
{
size_t node_index = 0;
@@ -624,9 +645,6 @@
server_status->SetBootUUID(i, boot_uuids_[i]);
}
}
- if (disable_statistics_) {
- server_status->DisableStatistics();
- }
if (fn_) {
server_status->set_send_data(fn_);
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index c901e7e..231bba1 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -36,6 +36,9 @@
// The messages are the ClientStatistics, ServerStatistics and Timestamp
// messages.
void DisableStatistics();
+ void DisableStatistics(const Node *node);
+ void EnableStatistics();
+ void EnableStatistics(const Node *node);
private:
struct DelayersVector {
@@ -62,6 +65,16 @@
}
}
+ void EnableStatistics() {
+ disable_statistics_ = false;
+ if (server_status) {
+ server_status->EnableStatistics();
+ }
+ if (client_status) {
+ client_status->EnableStatistics();
+ }
+ }
+
void AddSourceDelayer(RawMessageDelayer *delayer) {
source_delayers_.emplace_back(delayer);
}
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index 5236f82..faade70 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -63,9 +63,8 @@
statistics_timer_->set_name("statistics");
event_loop_->OnRun([this]() {
if (send_) {
- statistics_timer_->Setup(
- event_loop_->monotonic_now() + chrono::milliseconds(100),
- chrono::milliseconds(100));
+ statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
+ kStatisticsPeriod);
}
});
}
@@ -171,9 +170,14 @@
void MessageBridgeClientStatus::DisableStatistics() {
statistics_timer_->Disable();
- // TODO(austin): Re-arm when re-enabled.
send_ = false;
}
+void MessageBridgeClientStatus::EnableStatistics() {
+ send_ = true;
+ statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
+ kStatisticsPeriod);
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 9b8e07c..d38244a 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -15,6 +15,10 @@
// statistics periodically.
class MessageBridgeClientStatus {
public:
+ // The period in milliseconds at which the client statistics message is
+ // published.
+ static constexpr std::chrono::milliseconds kStatisticsPeriod{100};
+
MessageBridgeClientStatus(aos::EventLoop *event_loop);
MessageBridgeClientStatus(const MessageBridgeClientStatus &) = delete;
@@ -45,6 +49,8 @@
// Disables sending out any statistics messages.
void DisableStatistics();
+ // Enables sending out any statistics messages.
+ void EnableStatistics();
private:
// Sends out the statistics that are continually updated by the
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index a662990..f7553e4 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -374,5 +374,11 @@
statistics_timer_->Disable();
}
+void MessageBridgeServerStatus::EnableStatistics() {
+ send_ = true;
+ statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+ kPingPeriod);
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index e9a3322..c17e00a 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -71,6 +71,8 @@
// Disables sending out any statistics messages.
void DisableStatistics();
+ // Enables sending out any statistics messages.
+ void EnableStatistics();
private:
static constexpr std::chrono::nanoseconds kStatisticsPeriod =