Send *Statistics and Timestamp messages from SimulatedMessageBridge
This makes the simulation environment much closer to reality.
Change-Id: Ie0f44c17d9c9a750363335def1b008cef8c809b0
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 834ab5b..c74234e 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -510,8 +510,13 @@
std::string_view application_name, const Node *node) {
const std::string_view original_name = name;
std::string mutable_name;
- VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type
- << "\" }";
+ if (node != nullptr) {
+ VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type
+ << "\" } on " << aos::FlatbufferToJson(node);
+ } else {
+ VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type
+ << "\" }";
+ }
// First handle application specific maps. Only do this if we have a matching
// application name, and it has maps.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 01e256c..5346647 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -307,6 +307,8 @@
":event_loop",
":simple_channel",
"//aos/ipc_lib:index",
+ "//aos/network:message_bridge_client_status",
+ "//aos/network:message_bridge_server_status",
"//aos/util:phased_loop",
"@com_google_absl//absl/container:btree",
],
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index faf6361..57d0525 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -103,16 +103,60 @@
static const std::string kJson = R"config({
"channels": [
{
- "name": "/aos/me",
+ "name": "/me/aos",
"type": "aos.logging.LogMessageFbs",
"source_node": "me"
},
{
- "name": "/aos/them",
+ "name": "/them/aos",
"type": "aos.logging.LogMessageFbs",
"source_node": "them"
},
{
+ "name": "/me/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "me",
+ "destination_nodes": [
+ {
+ "name": "them"
+ }
+ ]
+ },
+ {
+ "name": "/them/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "them",
+ "destination_nodes": [
+ {
+ "name": "me"
+ }
+ ]
+ },
+ {
+ "name": "/me/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "me",
+ "frequency": 2
+ },
+ {
+ "name": "/them/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "them",
+ "frequency": 2
+ },
+ {
+ "name": "/me/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "me",
+ "frequency": 2
+ },
+ {
+ "name": "/them/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "them",
+ "frequency": 2
+ },
+ {
"name": "/aos",
"type": "aos.timing.Report",
"source_node": "me"
@@ -146,29 +190,28 @@
"maps": [
{
"match": {
- "name": "/aos",
- "type": "aos.logging.LogMessageFbs",
+ "name": "/aos*",
"source_node": "me"
},
"rename": {
- "name": "/aos/me"
+ "name": "/me/aos"
}
},
{
"match": {
- "name": "/aos",
- "type": "aos.logging.LogMessageFbs",
+ "name": "/aos*",
"source_node": "them"
},
"rename": {
- "name": "/aos/them"
+ "name": "/them/aos"
}
}
]
})config";
- flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
- JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+ flatbuffer_ = configuration::MergeConfiguration(
+ FlatbufferDetachedBuffer<Configuration>(
+ JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable())));
my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
}
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b5a4377..c3af9e2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -131,6 +131,9 @@
flatbuffers = [
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:timestamp_fbs",
],
deps = ["//aos/events:config"],
)
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index eff977b..abab5f6 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -593,6 +593,10 @@
event_loop_factory_->DisableForwarding(remapped_channel);
}
+
+ // If we are replaying a log, we don't want a bunch of redundant messages
+ // from both the real message bridge and simulated message bridge.
+ event_loop_factory_->DisableStatistics();
}
// While we are starting the system up, we might be relying on matching data
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index b6a30cb..7dbbb8e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -171,6 +171,7 @@
unlink(logfile.c_str());
LOG(INFO) << "Logging data to " << logfile;
+ ping_.set_quiet(true);
{
DetachedBufferWriter writer(logfile);
@@ -352,25 +353,25 @@
// Timing reports, pings
EXPECT_THAT(CountChannelsData(logfiles_[0]),
- ::testing::ElementsAre(::testing::Pair(1, 40),
- ::testing::Pair(4, 2001)));
+ ::testing::ElementsAre(::testing::Pair(4, 40),
+ ::testing::Pair(10, 2001)));
// Timestamps for pong
EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
- ::testing::ElementsAre(::testing::Pair(5, 2001)));
+ ::testing::ElementsAre(::testing::Pair(11, 2001)));
// Pong data.
EXPECT_THAT(CountChannelsData(logfiles_[1]),
- ::testing::ElementsAre(::testing::Pair(5, 2001)));
+ ::testing::ElementsAre(::testing::Pair(11, 2001)));
// No timestamps
EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), ::testing::ElementsAre());
// Timing reports and pongs.
EXPECT_THAT(CountChannelsData(logfiles_[2]),
- ::testing::ElementsAre(::testing::Pair(3, 40),
- ::testing::Pair(5, 2001)));
+ ::testing::ElementsAre(::testing::Pair(9, 40),
+ ::testing::Pair(11, 2001)));
// And ping timestamps.
EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
- ::testing::ElementsAre(::testing::Pair(4, 2001)));
+ ::testing::ElementsAre(::testing::Pair(10, 2001)));
}
LogReader reader({std::vector<std::string>{logfiles_[0]},
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index 1b1424e..a78b42c 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -33,6 +33,54 @@
"num_senders": 20,
"max_size": 2048
},
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
/* Forwarded to pi2 */
{
"name": "/test",
diff --git a/aos/events/ping_lib.cc b/aos/events/ping_lib.cc
index 9bc2446..7a0bfec 100644
--- a/aos/events/ping_lib.cc
+++ b/aos/events/ping_lib.cc
@@ -49,14 +49,14 @@
const chrono::nanoseconds round_trip_time =
monotonic_now - monotonic_send_time;
- if (last_pong_value_ + 1 != pong.value()) {
+ if (last_pong_value_ + 1 != pong.value() && (!quiet_ || VLOG_IS_ON(1))) {
LOG(WARNING) << "Pong message lost";
}
if (pong.value() == count_) {
VLOG(1) << "Elapsed time " << round_trip_time.count() << " ns "
<< FlatbufferToJson(&pong);
- } else {
+ } else if (!quiet_ || VLOG_IS_ON(1)) {
LOG(WARNING) << "Missmatched pong message, got " << FlatbufferToJson(&pong)
<< " expected " << count_;
}
diff --git a/aos/events/ping_lib.h b/aos/events/ping_lib.h
index e14c3f2..6107494 100644
--- a/aos/events/ping_lib.h
+++ b/aos/events/ping_lib.h
@@ -14,6 +14,8 @@
public:
Ping(EventLoop *event_loop);
+ void set_quiet(bool quiet) { quiet_ = quiet; }
+
private:
// Sends out the ping message with an incrementing count.
void SendPing();
@@ -29,6 +31,8 @@
int count_ = 0;
// Last pong value received so we can detect missed pongs.
int last_pong_value_ = 0;
+
+ bool quiet_ = false;
};
} // namespace aos
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c339ce0..b2e54bb 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -986,7 +986,13 @@
}
void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
+ CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
bridge_->DisableForwarding(channel);
}
+void SimulatedEventLoopFactory::DisableStatistics() {
+ CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+ bridge_->DisableStatistics();
+}
+
} // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de19b11..bc2721e 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -105,6 +105,9 @@
// for things like the logger.
void DisableForwarding(const Channel *channel);
+ // Disables the messages sent by the simulated message gateway.
+ void DisableStatistics();
+
private:
const Configuration *const configuration_;
EventSchedulerScheduler scheduler_scheduler_;
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 78c0d44..44ba3e5 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -6,6 +6,9 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_generated.h"
#include "gtest/gtest.h"
namespace aos {
@@ -250,13 +253,28 @@
0.0);
}
-// Tests that ping and pong work when on 2 different nodes.
+template <typename T>
+class MessageCounter {
+ public:
+ MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+ event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+ }
+
+ size_t count() const { return count_; }
+
+ private:
+ size_t count_ = 0;
+};
+
+// Tests that ping and pong work when on 2 different nodes, and the message
+// gateway messages are sent out as expected.
TEST(SimulatedEventLoopTest, MultinodePingPong) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(
"aos/events/multinode_pingpong_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+ const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
@@ -270,24 +288,364 @@
std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+ MessageCounter<examples::Pong> pi2_pong_counter(
+ pi2_pong_counter_event_loop.get(), "/test");
- int pi2_pong_count = 0;
- pi2_pong_counter_event_loop->MakeWatcher(
- "/test",
- [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+ std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
- int pi1_pong_count = 0;
+ MessageCounter<examples::Pong> pi1_pong_counter(
+ pi1_pong_counter_event_loop.get(), "/test");
+
+ // Count timestamps.
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi3/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+ // Wait to let timestamp estimation start up before looking for the results.
+ simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+
+ int pi1_server_statistics_count = 0;
pi1_pong_counter_event_loop->MakeWatcher(
- "/test",
- [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+ "/pi1/aos", [&pi1_server_statistics_count](
+ const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 2u);
+ for (const message_bridge::ServerConnection *connection :
+ *stats.connections()) {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ if (connection->node()->name()->string_view() == "pi2") {
+ EXPECT_GT(connection->sent_packets(), 50);
+ } else if (connection->node()->name()->string_view() == "pi3") {
+ EXPECT_GE(connection->sent_packets(), 5);
+ } else {
+ LOG(FATAL) << "Unknown connection";
+ }
+
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 0);
+ }
+ ++pi1_server_statistics_count;
+ });
+
+ int pi2_server_statistics_count = 0;
+ pi2_pong_counter_event_loop->MakeWatcher(
+ "/pi2/aos", [&pi2_server_statistics_count](
+ const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ServerConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_GT(connection->sent_packets(), 50);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 0);
+ ++pi2_server_statistics_count;
+ });
+
+ int pi3_server_statistics_count = 0;
+ pi3_pong_counter_event_loop->MakeWatcher(
+ "/pi3/aos", [&pi3_server_statistics_count](
+ const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ServerConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_GE(connection->sent_packets(), 5);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 0);
+ ++pi3_server_statistics_count;
+ });
+
+ int pi1_client_statistics_count = 0;
+ pi1_pong_counter_event_loop->MakeWatcher(
+ "/pi1/aos", [&pi1_client_statistics_count](
+ const message_bridge::ClientStatistics &stats) {
+ VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 2u);
+
+ for (const message_bridge::ClientConnection *connection :
+ *stats.connections()) {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ if (connection->node()->name()->string_view() == "pi2") {
+ EXPECT_GT(connection->received_packets(), 50);
+ } else if (connection->node()->name()->string_view() == "pi3") {
+ EXPECT_GE(connection->received_packets(), 5);
+ } else {
+ LOG(FATAL) << "Unknown connection";
+ }
+
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 150000);
+ }
+ ++pi1_client_statistics_count;
+ });
+
+ int pi2_client_statistics_count = 0;
+ pi2_pong_counter_event_loop->MakeWatcher(
+ "/pi2/aos", [&pi2_client_statistics_count](
+ const message_bridge::ClientStatistics &stats) {
+ VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ClientConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_GT(connection->received_packets(), 50);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 150000);
+ ++pi2_client_statistics_count;
+ });
+
+ int pi3_client_statistics_count = 0;
+ pi3_pong_counter_event_loop->MakeWatcher(
+ "/pi3/aos", [&pi3_client_statistics_count](
+ const message_bridge::ClientStatistics &stats) {
+ VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ClientConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_GE(connection->received_packets(), 5);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 150000);
+ ++pi3_client_statistics_count;
+ });
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(10) -
+ chrono::milliseconds(500) +
+ chrono::milliseconds(5));
+
+ EXPECT_EQ(pi1_pong_counter.count(), 1001);
+ EXPECT_EQ(pi2_pong_counter.count(), 1001);
+
+ EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
+ EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
+
+ EXPECT_EQ(pi1_server_statistics_count, 9);
+ EXPECT_EQ(pi2_server_statistics_count, 9);
+ EXPECT_EQ(pi3_server_statistics_count, 9);
+
+ EXPECT_EQ(pi1_client_statistics_count, 95);
+ EXPECT_EQ(pi2_client_statistics_count, 95);
+ EXPECT_EQ(pi3_client_statistics_count, 95);
+}
+
+// Tests that an offset between nodes can be recovered and shows up in
+// ServerStatistics correctly.
+TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ "aos/events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+ const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+ NodeEventLoopFactory *pi2_factory =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+
+ constexpr chrono::milliseconds kOffset{1501};
+ pi2_factory->SetDistributedOffset(kOffset, 1.0);
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+ std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+ std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+ // Wait to let timestamp estimation start up before looking for the results.
+ simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+
+ // Confirm the offsets are being recovered correctly.
+ int pi1_server_statistics_count = 0;
+ pi1_pong_counter_event_loop->MakeWatcher(
+ "/pi1/aos", [&pi1_server_statistics_count,
+ kOffset](const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 2u);
+ for (const message_bridge::ServerConnection *connection :
+ *stats.connections()) {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ if (connection->node()->name()->string_view() == "pi2") {
+ EXPECT_EQ(connection->monotonic_offset(),
+ chrono::nanoseconds(kOffset).count());
+ } else if (connection->node()->name()->string_view() == "pi3") {
+ EXPECT_EQ(connection->monotonic_offset(), 0);
+ } else {
+ LOG(FATAL) << "Unknown connection";
+ }
+
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ }
+ ++pi1_server_statistics_count;
+ });
+
+ int pi2_server_statistics_count = 0;
+ pi2_pong_counter_event_loop->MakeWatcher(
+ "/pi2/aos", [&pi2_server_statistics_count,
+ kOffset](const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ServerConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(),
+ -chrono::nanoseconds(kOffset).count());
+ ++pi2_server_statistics_count;
+ });
+
+ int pi3_server_statistics_count = 0;
+ pi3_pong_counter_event_loop->MakeWatcher(
+ "/pi3/aos", [&pi3_server_statistics_count](
+ const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
+ EXPECT_EQ(stats.connections()->size(), 1u);
+
+ const message_bridge::ServerConnection *connection =
+ stats.connections()->Get(0);
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_TRUE(connection->has_monotonic_offset());
+ EXPECT_EQ(connection->monotonic_offset(), 0);
+ ++pi3_server_statistics_count;
+ });
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(10) -
+ chrono::milliseconds(500) +
+ chrono::milliseconds(5));
+
+ EXPECT_EQ(pi1_server_statistics_count, 9);
+ EXPECT_EQ(pi2_server_statistics_count, 9);
+ EXPECT_EQ(pi3_server_statistics_count, 9);
+}
+
+// Test that disabling statistics actually disables them.
+TEST(SimulatedEventLoopTest, MultinodeWithoutStatistics) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ "aos/events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+ const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+ simulated_event_loop_factory.DisableStatistics();
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+ MessageCounter<examples::Pong> pi2_pong_counter(
+ pi2_pong_counter_event_loop.get(), "/test");
+
+ std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+ std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+ MessageCounter<examples::Pong> pi1_pong_counter(
+ pi1_pong_counter_event_loop.get(), "/test");
+
+ // Count timestamps.
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi1/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+ pi2_pong_counter_event_loop.get(), "/pi2/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+ pi1_pong_counter_event_loop.get(), "/pi3/aos");
+ MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+ pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+ MessageCounter<message_bridge::ServerStatistics>
+ pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
+ "/pi1/aos");
+ MessageCounter<message_bridge::ServerStatistics>
+ pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
+ "/pi2/aos");
+ MessageCounter<message_bridge::ServerStatistics>
+ pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
+ "/pi3/aos");
+
+ MessageCounter<message_bridge::ClientStatistics>
+ pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
+ "/pi1/aos");
+ MessageCounter<message_bridge::ClientStatistics>
+ pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
+ "/pi2/aos");
+ MessageCounter<message_bridge::ClientStatistics>
+ pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
+ "/pi3/aos");
simulated_event_loop_factory.RunFor(chrono::seconds(10) +
chrono::milliseconds(5));
- EXPECT_EQ(pi1_pong_count, 1001);
- EXPECT_EQ(pi2_pong_count, 1001);
+ EXPECT_EQ(pi1_pong_counter.count(), 1001u);
+ EXPECT_EQ(pi2_pong_counter.count(), 1001u);
+
+ EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
+ EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
+
+ EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
+ EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
+ EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
+
+ EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
+ EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
+ EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
}
} // namespace testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0ab366a..0f6e8bc 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -17,12 +17,18 @@
aos::NodeEventLoopFactory *send_node_factory,
aos::EventLoop *send_event_loop,
std::unique_ptr<aos::RawFetcher> fetcher,
- std::unique_ptr<aos::RawSender> sender)
+ std::unique_ptr<aos::RawSender> sender,
+ ServerConnection *server_connection, int client_index,
+ MessageBridgeClientStatus *client_status)
: fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
send_event_loop_(send_event_loop),
fetcher_(std::move(fetcher)),
- sender_(std::move(sender)) {
+ sender_(std::move(sender)),
+ server_connection_(server_connection),
+ client_status_(client_status),
+ client_index_(client_index),
+ client_connection_(client_status_->GetClientConnection(client_index)) {
timer_ = send_event_loop_->AddTimer([this]() { Send(); });
Schedule();
@@ -55,6 +61,8 @@
<< fetcher_->context().monotonic_event_time << " now is "
<< fetch_node_factory_->monotonic_now();
sent_ = true;
+ server_connection_->mutate_dropped_packets(
+ server_connection_->dropped_packets() + 1);
}
if (fetcher_->context().data == nullptr) {
@@ -72,25 +80,27 @@
CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
<< ": Trying to deliver message in the past...";
+ server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
+ 1);
timer_->Setup(monotonic_delivered_time);
}
private:
// Acutally sends the message, and reschedules.
void Send() {
- // Compute the time to publish this message.
- const monotonic_clock::time_point monotonic_delivered_time =
- DeliveredTime(fetcher_->context());
-
- CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
- << ": Message to be sent at the wrong time.";
-
- // And also fill out the send times as well.
+ // Fill out the send times.
sender_->Send(fetcher_->context().data, fetcher_->context().size,
fetcher_->context().monotonic_event_time,
fetcher_->context().realtime_event_time,
fetcher_->context().queue_index);
+ // And simulate message_bridge's offset recovery.
+ client_status_->SampleFilter(client_index_,
+ fetcher_->context().monotonic_event_time,
+ sender_->monotonic_sent_time());
+
+ client_connection_->mutate_received_packets(
+ client_connection_->received_packets() + 1);
sent_ = true;
Schedule();
}
@@ -121,6 +131,11 @@
std::unique_ptr<aos::RawSender> sender_;
// True if we have sent the message in the fetcher.
bool sent_ = false;
+
+ ServerConnection *server_connection_ = nullptr;
+ MessageBridgeClientStatus *client_status_ = nullptr;
+ int client_index_;
+ ClientConnection *client_connection_ = nullptr;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -130,14 +145,35 @@
// Pre-build up event loops for every node. They are pretty cheap anyways.
for (const Node *node : simulated_event_loop_factory->nodes()) {
- auto it = event_loop_map_.insert(
- {node,
- simulated_event_loop_factory->MakeEventLoop("message_bridge", node)});
+ auto it = event_loop_map_.emplace(std::make_pair(
+ node,
+ simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
CHECK(it.second);
- it.first->second->SkipTimingReport();
- it.first->second->SkipAosLog();
+ it.first->second.event_loop->SkipTimingReport();
+ it.first->second.event_loop->SkipAosLog();
+
+ for (ServerConnection *connection :
+ it.first->second.server_status.server_connection()) {
+ if (connection == nullptr) continue;
+
+ connection->mutate_state(message_bridge::State::CONNECTED);
+ }
+
+ for (size_t i = 0;
+ i < it.first->second.client_status.mutable_client_statistics()
+ ->mutable_connections()
+ ->size();
+ ++i) {
+ ClientConnection *connection =
+ it.first->second.client_status.mutable_client_statistics()
+ ->mutable_connections()
+ ->GetMutableObject(i);
+ if (connection == nullptr) continue;
+
+ connection->mutate_state(message_bridge::State::CONNECTED);
+ }
}
for (const Channel *channel :
@@ -164,23 +200,48 @@
auto destination_event_loop = event_loop_map_.find(destination_node);
CHECK(destination_event_loop != event_loop_map_.end());
+ ServerConnection *server_connection =
+ source_event_loop->second.server_status.FindServerConnection(
+ connection->name()->string_view());
+
+ int client_index =
+ destination_event_loop->second.client_status.FindClientIndex(
+ channel->source_node()->string_view());
+
delayers->emplace_back(std::make_unique<RawMessageDelayer>(
simulated_event_loop_factory->GetNodeEventLoopFactory(node),
simulated_event_loop_factory->GetNodeEventLoopFactory(
destination_node),
- destination_event_loop->second.get(),
- source_event_loop->second->MakeRawFetcher(channel),
- destination_event_loop->second->MakeRawSender(channel)));
+ destination_event_loop->second.event_loop.get(),
+ source_event_loop->second.event_loop->MakeRawFetcher(channel),
+ destination_event_loop->second.event_loop->MakeRawSender(channel),
+ server_connection, client_index,
+ &destination_event_loop->second.client_status));
}
- // And register every delayer to be poked when a new message shows up.
- source_event_loop->second->MakeRawNoArgWatcher(
- channel, [captured_delayers = delayers.get()](const Context &) {
- for (std::unique_ptr<RawMessageDelayer> &delayer :
- *captured_delayers) {
- delayer->Schedule();
- }
- });
+ const Channel *const timestamp_channel = configuration::GetChannel(
+ simulated_event_loop_factory->configuration(), "/aos",
+ Timestamp::GetFullyQualifiedName(),
+ source_event_loop->second.event_loop->name(), node);
+
+ if (channel == timestamp_channel) {
+ source_event_loop->second.server_status.set_send_data(
+ [captured_delayers = delayers.get()](const Context &) {
+ for (std::unique_ptr<RawMessageDelayer> &delayer :
+ *captured_delayers) {
+ delayer->Schedule();
+ }
+ });
+ } else {
+ // And register every delayer to be poked when a new message shows up.
+ source_event_loop->second.event_loop->MakeRawNoArgWatcher(
+ channel, [captured_delayers = delayers.get()](const Context &) {
+ for (std::unique_ptr<RawMessageDelayer> &delayer :
+ *captured_delayers) {
+ delayer->Schedule();
+ }
+ });
+ }
delayers_list_.emplace_back(std::move(delayers));
}
}
@@ -204,5 +265,12 @@
}
}
+void SimulatedMessageBridge::DisableStatistics() {
+ for (std::pair<const Node *const, State> &state : event_loop_map_) {
+ state.second.server_status.DisableStatistics();
+ state.second.client_status.DisableStatistics();
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 7aeef64..5784bb3 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -3,6 +3,8 @@
#include "aos/events/event_loop.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_client_status.h"
+#include "aos/network/message_bridge_server_status.h"
namespace aos {
namespace message_bridge {
@@ -23,10 +25,27 @@
// for things like the logger.
void DisableForwarding(const Channel *channel);
+ // Disables generating and sending the messages which message_gateway sends.
+ // The messages are the ClientStatistics, ServerStatistics and Timestamp
+ // messages.
+ void DisableStatistics();
+
private:
+ struct State {
+ State(std::unique_ptr<aos::EventLoop> &&new_event_loop)
+ : event_loop(std::move(new_event_loop)),
+ server_status(event_loop.get()),
+ client_status(event_loop.get()) {}
+
+ State(const State &state) = delete;
+
+ std::unique_ptr<aos::EventLoop> event_loop;
+ MessageBridgeServerStatus server_status;
+ MessageBridgeClientStatus client_status;
+ };
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
- std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+ std::map<const Node *, State> event_loop_map_;
// List of delayers used to resend the messages.
using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index 72b8ee1..e7107ea 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -67,6 +67,9 @@
}
void MessageBridgeClientStatus::SendStatistics() {
+ if (!send_) {
+ return;
+ }
// Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
// There doesn't exist a good way to drop fields otherwise.
aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
@@ -150,5 +153,7 @@
filter->Sample(monotonic_delivered_time, offset);
}
+void MessageBridgeClientStatus::DisableStatistics() { send_ = false; }
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 0653c9d..53bc2dc 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -42,6 +42,9 @@
// Clears out the filter state.
void SampleReset(int client_index) { filters_[client_index].Reset(); }
+ // Disables sending out any statistics messages.
+ void DisableStatistics();
+
private:
// Sends out the statistics that are continually updated by the
// SctpClientConnections.
@@ -62,6 +65,9 @@
std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
std::vector<TimestampFilter> filters_;
+
+ // If true, send out the messages.
+ bool send_ = true;
};
} // namespace message_bridge
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 21badef..d356bca 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -135,6 +135,7 @@
}
void MessageBridgeServerStatus::SendStatistics() {
+ if (!send_) return;
aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
server_connection_offsets_.clear();
@@ -142,6 +143,10 @@
// Copy the statistics over, but only add monotonic_offset if it is valid.
for (const ServerConnection *connection :
*statistics_.message().connections()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(),
+ connection->node()->name()->string_view());
+
flatbuffers::Offset<flatbuffers::String> node_name_offset =
builder.fbb()->CreateString(connection->node()->name()->string_view());
Node::Builder node_builder = builder.MakeBuilder<Node>();
@@ -157,7 +162,7 @@
server_connection_builder.add_sent_packets(connection->sent_packets());
// TODO(austin): If it gets stale, drop it too.
- if (connection->monotonic_offset() != 0) {
+ if (!filters_[node_index].MissingSamples()) {
server_connection_builder.add_monotonic_offset(
connection->monotonic_offset());
}
@@ -307,6 +312,7 @@
// Send it out over shm, and using that timestamp, then send it out over sctp.
// This avoid some context switches.
+ if (!send_) return;
timestamp_sender_.Send(timestamp_copy);
Context context;
@@ -323,5 +329,9 @@
}
}
+void MessageBridgeServerStatus::DisableStatistics() {
+ send_ = false;
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index adb747d..814fccb 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -28,12 +28,29 @@
std::function<void(const Context &)> send_data =
std::function<void(const Context &)>());
+ MessageBridgeServerStatus(const MessageBridgeServerStatus &) = delete;
+ MessageBridgeServerStatus(MessageBridgeServerStatus &&) = delete;
+ MessageBridgeServerStatus &operator=(const MessageBridgeServerStatus &) =
+ delete;
+ MessageBridgeServerStatus &operator=(MessageBridgeServerStatus &&) = delete;
+
+ void set_send_data(std::function<void(const Context &)> send_data) {
+ send_data_ = send_data;
+ }
+
// Resets the filter and clears the entry from the server statistics.
void ResetFilter(int node_index);
// Returns the ServerConnection message which is updated by the server.
ServerConnection *FindServerConnection(std::string_view node_name);
+ std::vector<ServerConnection *> server_connection() {
+ return server_connection_;
+ }
+
+ // Disables sending out any statistics messages.
+ void DisableStatistics();
+
private:
static constexpr std::chrono::nanoseconds kStatisticsPeriod =
std::chrono::seconds(1);
@@ -73,6 +90,8 @@
aos::monotonic_clock::min_time;
std::function<void(const Context &)> send_data_;
+
+ bool send_ = true;
};