Add simulation support for split RemoteMessage channels
In order to handle simulation sending messages as fast as possible, and
messages sent infrequently with timestamps delivered, we want to split
each channel's RemoteMessage timestamps into a separate channel.
Teach SimulatedNetworkBridge how to use it, and add tests for both the
old and new method.
Change-Id: Ie59f322f3d4dce3219303216707c46e24f9f9ff3
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 9533cb5..a1a4c6e 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -151,20 +151,27 @@
deps = [":config"],
)
-aos_config(
- name = "multinode_pingpong_config",
- src = "multinode_pingpong.json",
- flatbuffers = [
- ":ping_fbs",
- ":pong_fbs",
- "//aos/network:message_bridge_client_fbs",
- "//aos/network:remote_message_fbs",
- "//aos/network:timestamp_fbs",
- "//aos/network:message_bridge_server_fbs",
- ],
- target_compatible_with = ["@platforms//os:linux"],
- deps = [":config"],
-)
+[
+ aos_config(
+ name = config + "_config",
+ src = config + ".json",
+ flatbuffers = [
+ ":ping_fbs",
+ ":pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:timestamp_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [":config"],
+ )
+ for config in [
+ "multinode_pingpong_test_split",
+ "multinode_pingpong",
+ "multinode_pingpong_test_combined",
+ ]
+]
cc_library(
name = "pong_lib",
@@ -293,7 +300,10 @@
cc_test(
name = "simulated_event_loop_test",
srcs = ["simulated_event_loop_test.cc"],
- data = [":multinode_pingpong_config"],
+ data = [
+ ":multinode_pingpong_test_combined_config",
+ ":multinode_pingpong_test_split_config",
+ ],
shard_count = 4,
target_compatible_with = ["@platforms//os:linux"],
deps = [
@@ -346,6 +356,7 @@
"//aos/network:message_bridge_client_status",
"//aos/network:message_bridge_server_status",
"//aos/network:remote_message_fbs",
+ "//aos/network:timestamp_channel",
"//aos/util:phased_loop",
"@com_google_absl//absl/container:btree",
],
diff --git a/aos/events/multinode_pingpong_test_combined.json b/aos/events/multinode_pingpong_test_combined.json
new file mode 100644
index 0000000..54da1c1
--- /dev/null
+++ b/aos/events/multinode_pingpong_test_combined.json
@@ -0,0 +1,335 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi3",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ },
+ {
+ "name": "pi3",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi3",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi3",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi3",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Pong",
+ "source_node": "pi3",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/reliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 0
+ }
+ ]
+ },
+ {
+ "name": "/unreliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/pi3/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3",
+ "port": 9971
+ }
+ ],
+ "applications": [
+ {
+ "name": "ping2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ },
+ {
+ "name": "ping3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ }
+ ]
+}
diff --git a/aos/events/multinode_pingpong_test_split.json b/aos/events/multinode_pingpong_test_split.json
new file mode 100644
index 0000000..a59f588
--- /dev/null
+++ b/aos/events/multinode_pingpong_test_split.json
@@ -0,0 +1,353 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi3",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ },
+ {
+ "name": "pi3",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi3",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1/test/aos-examples-Pong",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi3",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi3",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Pong",
+ "source_node": "pi3",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/reliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 0
+ }
+ ]
+ },
+ {
+ "name": "/unreliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/pi3/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3",
+ "port": 9971
+ }
+ ],
+ "applications": [
+ {
+ "name": "ping2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ },
+ {
+ "name": "ping3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ }
+ ]
+}
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 7b7c861..43833fd 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -77,6 +77,64 @@
INSTANTIATE_TEST_CASE_P(SimulatedEventLoopCommonDeathTest,
AbstractEventLoopDeathTest, CommonParameters());
+// Parameters to run all the tests with.
+struct Param {
+ // The config file to use.
+ std::string config;
+ // If true, the RemoteMessage channel should be shared between all the remote
+ // channels. If false, there will be 1 RemoteMessage channel per remote
+ // channel.
+ bool shared;
+};
+
+class RemoteMessageSimulatedEventLoopTest
+ : public ::testing::TestWithParam<struct Param> {
+ public:
+ RemoteMessageSimulatedEventLoopTest()
+ : config(aos::configuration::ReadConfig(
+ absl::StrCat(ConfigPrefix(), "events/", GetParam().config))) {
+ LOG(INFO) << "Config " << GetParam().config;
+ }
+
+ bool shared() const { return GetParam().shared; }
+
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
+ if (shared()) {
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop, "/aos/remote_timestamps/pi2"));
+ } else {
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop,
+ "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
+ }
+ return counters;
+ }
+
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
+ if (shared()) {
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop, "/aos/remote_timestamps/pi1"));
+ } else {
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
+ counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
+ event_loop,
+ "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
+ }
+ return counters;
+ }
+
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config;
+};
+
// Test that creating an event and running the scheduler runs the event.
TEST(EventSchedulerTest, ScheduleEvent) {
int counter = 0;
@@ -352,12 +410,20 @@
0.0);
}
+size_t CountAll(
+ const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ &counters) {
+ size_t count = 0u;
+ for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
+ counters) {
+ count += counter->count();
+ }
+ return count;
+}
+
// Tests that ping and pong work when on 2 different nodes, and the message
// gateway messages are sent out as expected.
-TEST(SimulatedEventLoopTest, MultinodePingPong) {
- aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -412,10 +478,12 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
- pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
- pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi2_on_pi1 =
+ MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi1_on_pi2 =
+ MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
// Wait to let timestamp estimation start up before looking for the results.
simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
@@ -561,87 +629,104 @@
std::unique_ptr<EventLoop> pi1_remote_timestamp =
simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
- // For each remote timestamp we get back, confirm that it is either a ping
- // message, or a timestamp we sent out. Also confirm that the timestamps are
- // correct.
- pi1_remote_timestamp->MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
- &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory,
- pi2](const RemoteMessage &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
- EXPECT_TRUE(header.has_boot_uuid());
- EXPECT_EQ(header.boot_uuid()->string_view(),
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
- ->boot_uuid()
- .string_view());
+ for (std::pair<int, std::string> channel :
+ shared()
+ ? std::vector<std::pair<
+ int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
+ : std::vector<std::pair<int, std::string>>{
+ {pi1_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp"},
+ {ping_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
+ // For each remote timestamp we get back, confirm that it is either a ping
+ // message, or a timestamp we sent out. Also confirm that the timestamps
+ // are correct.
+ pi1_remote_timestamp->MakeWatcher(
+ channel.second,
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
+ channel_index = channel.first](const RemoteMessage &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+ EXPECT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view());
- const aos::monotonic_clock::time_point header_monotonic_sent_time(
- chrono::nanoseconds(header.monotonic_sent_time()));
- const aos::realtime_clock::time_point header_realtime_sent_time(
- chrono::nanoseconds(header.realtime_sent_time()));
- const aos::monotonic_clock::time_point header_monotonic_remote_time(
- chrono::nanoseconds(header.monotonic_remote_time()));
- const aos::realtime_clock::time_point header_realtime_remote_time(
- chrono::nanoseconds(header.realtime_remote_time()));
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
- const Context *pi1_context = nullptr;
- const Context *pi2_context = nullptr;
-
- if (header.channel_index() == pi1_timestamp_channel) {
- // Find the forwarded message.
- while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ if (channel_index != -1) {
+ ASSERT_EQ(channel_index, header.channel_index());
}
- // And the source message.
- while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
}
- pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
- pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
- } else if (header.channel_index() == ping_timestamp_channel) {
- // Find the forwarded message.
- while (ping_on_pi2_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
- }
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->remote_queue_index,
+ header.remote_queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time,
+ header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
- // And the source message.
- while (ping_on_pi1_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
- }
-
- pi1_context = &ping_on_pi1_fetcher.context();
- pi2_context = &ping_on_pi2_fetcher.context();
- } else {
- LOG(FATAL) << "Unknown channel";
- }
-
- // Confirm the forwarded message has matching timestamps to the
- // timestamps we got back.
- EXPECT_EQ(pi2_context->queue_index, header.queue_index());
- EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
- EXPECT_EQ(pi2_context->monotonic_event_time,
- header_monotonic_sent_time);
- EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
- EXPECT_EQ(pi2_context->realtime_remote_time,
- header_realtime_remote_time);
- EXPECT_EQ(pi2_context->monotonic_remote_time,
- header_monotonic_remote_time);
-
- // Confirm the forwarded message also matches the source message.
- EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
- EXPECT_EQ(pi1_context->monotonic_event_time,
- header_monotonic_remote_time);
- EXPECT_EQ(pi1_context->realtime_event_time,
- header_realtime_remote_time);
- });
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+ }
simulated_event_loop_factory.RunFor(chrono::seconds(10) -
chrono::milliseconds(500) +
@@ -667,16 +752,17 @@
EXPECT_EQ(pi3_client_statistics_count, 95);
// Also confirm that remote timestamps are being forwarded correctly.
- EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1101);
- EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1101);
+ EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
+ EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
}
// Tests that an offset between nodes can be recovered and shows up in
// ServerStatistics correctly.
TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+ aos::configuration::ReadConfig(
+ ConfigPrefix() +
+ "events/multinode_pingpong_test_combined_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
ASSERT_EQ(pi1_index, 0u);
@@ -788,10 +874,7 @@
}
// Test that disabling statistics actually disables them.
-TEST(SimulatedEventLoopTest, MultinodeWithoutStatistics) {
- aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -839,10 +922,12 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
- pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
- pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi2_on_pi1 =
+ MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi1_on_pi2 =
+ MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
MessageCounter<message_bridge::ServerStatistics>
pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
@@ -887,8 +972,8 @@
EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
// Also confirm that remote timestamps are being forwarded correctly.
- EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1001);
- EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
+ EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
+ EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
}
bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
@@ -946,10 +1031,7 @@
}
// Test that disconnecting nodes actually disconnects them.
-TEST(SimulatedEventLoopTest, MultinodeDisconnect) {
- aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -996,10 +1078,12 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
- pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
- pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi2_on_pi1 =
+ MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
+ std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
+ remote_timestamps_pi1_on_pi2 =
+ MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
MessageCounter<message_bridge::ServerStatistics>
pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
@@ -1064,8 +1148,8 @@
EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
// Also confirm that remote timestamps are being forwarded correctly.
- EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 221);
- EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 221);
+ EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
+ EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
@@ -1110,8 +1194,8 @@
EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
// Also confirm that remote timestamps are being forwarded correctly.
- EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 441);
- EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 441);
+ EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
+ EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
@@ -1156,8 +1240,8 @@
EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
// Also confirm that remote timestamps are being forwarded correctly.
- EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 661);
- EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 661);
+ EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
+ EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
@@ -1185,8 +1269,9 @@
// it gets delivered as expected.
TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+ aos::configuration::ReadConfig(
+ ConfigPrefix() +
+ "events/multinode_pingpong_test_combined_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
ASSERT_EQ(pi1_index, 0u);
@@ -1302,10 +1387,7 @@
}
// Tests that reliable (and unreliable) ping messages get forwarded as expected.
-TEST(SimulatedEventLoopTest, MultinodeStartupTesting) {
- aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
@@ -1342,7 +1424,8 @@
int reliable_timestamp_count = 0;
pi1_remote_timestamp->MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
[reliable_channel_index, &reliable_timestamp_count,
&simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
&pi1_remote_timestamp](const RemoteMessage &header) {
@@ -1386,10 +1469,7 @@
// Tests that rebooting a node changes the ServerStatistics message and the
// RemoteTimestamp message.
-TEST(SimulatedEventLoopTest, BootUUIDTest) {
- aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(ConfigPrefix() +
- "events/multinode_pingpong_config.json");
+TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
@@ -1412,7 +1492,8 @@
int timestamp_count = 0;
pi1_remote_timestamp->MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
[×tamp_count, &expected_boot_uuid](const RemoteMessage &header) {
EXPECT_TRUE(header.has_boot_uuid());
EXPECT_EQ(header.boot_uuid()->string_view(), expected_boot_uuid);
@@ -1463,5 +1544,11 @@
EXPECT_GE(pi1_server_statistics_count, 1u);
}
+INSTANTIATE_TEST_CASE_P(
+ All, RemoteMessageSimulatedEventLoopTest,
+ ::testing::Values(
+ Param{"multinode_pingpong_test_combined_config.json", true},
+ Param{"multinode_pingpong_test_split_config.json", false}));
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 93a90ed..8b29b23 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -403,8 +403,8 @@
configuration::ChannelIndex(
source_event_loop->second.event_loop->configuration(), channel),
delivery_time_is_logged
- ? &source_event_loop->second
- .timestamp_loggers[destination_node_index]
+ ? source_event_loop->second.timestamp_loggers.SenderForChannel(
+ channel, connection)
: nullptr));
}
@@ -512,9 +512,9 @@
SimulatedMessageBridge::State::State(
std::unique_ptr<aos::EventLoop> &&new_event_loop)
: event_loop(std::move(new_event_loop)),
+ timestamp_loggers(event_loop.get()),
server_status(event_loop.get()),
client_status(event_loop.get()) {
- timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
// Find all nodes which log timestamps back to us (from us).
for (const Channel *channel : *event_loop->configuration()->channels()) {
@@ -533,18 +533,7 @@
continue;
}
- // (And only construct the sender if it hasn't been constructed)
- const Node *other_node = configuration::GetNode(
- event_loop->configuration(), connection->name()->string_view());
- const size_t other_node_index = configuration::GetNodeIndex(
- event_loop->configuration(), other_node);
-
- if (!timestamp_loggers[other_node_index]) {
- timestamp_loggers[other_node_index] =
- event_loop->MakeSender<RemoteMessage>(
- absl::StrCat("/aos/remote_timestamps/",
- connection->name()->string_view()));
- }
+ timestamp_loggers.SenderForChannel(channel, connection);
}
}
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 1145461..c1ff698 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -6,6 +6,7 @@
#include "aos/network/message_bridge_client_status.h"
#include "aos/network/message_bridge_server_status.h"
#include "aos/network/remote_message_generated.h"
+#include "aos/network/timestamp_channel.h"
namespace aos {
namespace message_bridge {
@@ -47,10 +48,9 @@
State(const State &state) = delete;
std::unique_ptr<aos::EventLoop> event_loop;
+ ChannelTimestampSender timestamp_loggers;
MessageBridgeServerStatus server_status;
MessageBridgeClientStatus client_status;
-
- std::vector<aos::Sender<RemoteMessage>> timestamp_loggers;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.