Add support for both split and combined timestamp channels
In order to handle simulation sending messages as fast as possible, and
messages sent infrequently with timestamps delivered, we want to split
each channel's RemoteMessage timestamps into a separate channel.
Start by creating a class that reliably sorts out which one is which, and
by using and testing it in message_bridge.
Change-Id: I5d279836146a01f5d403f597e59d330440fe702c
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index c8c24cf..1f926c8 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -32,15 +32,28 @@
aos::SetShmBase(ShmBase(node));
}
-class MessageBridgeTest : public ::testing::Test {
+// Parameters that each MessageBridgeParameterizedTest test case is run with.
+struct Param {
+ // Name of the config file to load; it is read from under aos/network/.
+ std::string config;
+ // If true, the RemoteMessage channel should be shared between all the remote
+ // channels. If false, there will be 1 RemoteMessage channel per remote
+ // channel.
+ bool shared;
+};
+
+class MessageBridgeParameterizedTest
+ : public ::testing::TestWithParam<struct Param> {
public:
- MessageBridgeTest(std::string_view path =
- "aos/network/message_bridge_test_common_config.json")
- : config(aos::configuration::ReadConfig(path)) {
+ MessageBridgeParameterizedTest()
+ : config(aos::configuration::ReadConfig(
+ absl::StrCat("aos/network/", GetParam().config))) {
util::UnlinkRecursive(ShmBase("pi1"));
util::UnlinkRecursive(ShmBase("pi2"));
}
+ bool shared() const { return GetParam().shared; }
+
void OnPi1() {
DoSetShmBase("pi1");
FLAGS_override_hostname = "raspberrypi";
@@ -288,7 +301,7 @@
};
// Test that we can send a ping message over sctp and receive it.
-TEST_F(MessageBridgeTest, PingPong) {
+TEST_P(MessageBridgeParameterizedTest, PingPong) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -324,7 +337,8 @@
aos::ShmEventLoop pi1_test_event_loop(&config.message());
aos::Fetcher<RemoteMessage> message_header_fetcher1 =
pi1_test_event_loop.MakeFetcher<RemoteMessage>(
- "/pi1/aos/remote_timestamps/pi2");
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
// Fetchers for confirming the remote timestamps made it.
aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
@@ -350,7 +364,9 @@
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
aos::Fetcher<RemoteMessage> message_header_fetcher2 =
test_event_loop.MakeFetcher<RemoteMessage>(
- "/pi2/aos/remote_timestamps/pi1");
+ shared() ? "/pi2/aos/remote_timestamps/pi1"
+ : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
+ "aos-message_bridge-Timestamp");
// Event loop for fetching data delivered to pi2 from pi1 to match up
// messages.
@@ -511,83 +527,99 @@
// For each remote timestamp we get back, confirm that it is either a ping
// message, or a timestamp we sent out. Also confirm that the timestamps are
// correct.
- ping_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
- &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
- << aos::FlatbufferToJson(&header);
+ for (std::pair<int, std::string> channel :
+ shared()
+ ? std::vector<std::pair<
+ int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
+ : std::vector<std::pair<int, std::string>>{
+ {pi1_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp"},
+ {ping_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
+ ping_event_loop.MakeWatcher(
+ channel.second,
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher,
+ channel_index = channel.first](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
- EXPECT_TRUE(header.has_boot_uuid());
-
- const aos::monotonic_clock::time_point header_monotonic_sent_time(
- chrono::nanoseconds(header.monotonic_sent_time()));
- const aos::realtime_clock::time_point header_realtime_sent_time(
- chrono::nanoseconds(header.realtime_sent_time()));
- const aos::monotonic_clock::time_point header_monotonic_remote_time(
- chrono::nanoseconds(header.monotonic_remote_time()));
- const aos::realtime_clock::time_point header_realtime_remote_time(
- chrono::nanoseconds(header.realtime_remote_time()));
-
- const Context *pi1_context = nullptr;
- const Context *pi2_context = nullptr;
-
- if (header.channel_index() == pi1_timestamp_channel) {
- // Find the forwarded message.
- while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ EXPECT_TRUE(header.has_boot_uuid());
+ if (channel_index != -1) {
+ ASSERT_EQ(channel_index, header.channel_index());
}
- // And the source message.
- while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
}
- pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
- pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
- } else if (header.channel_index() == ping_timestamp_channel) {
- // Find the forwarded message.
- while (ping_on_pi2_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
- }
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time,
+ header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
- // And the source message.
- while (ping_on_pi1_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
- }
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+ }
- pi1_context = &ping_on_pi1_fetcher.context();
- pi2_context = &ping_on_pi2_fetcher.context();
- } else {
- LOG(FATAL) << "Unknown channel";
- }
-
- // Confirm the forwarded message has matching timestamps to the
- // timestamps we got back.
- EXPECT_EQ(pi2_context->queue_index, header.queue_index());
- EXPECT_EQ(pi2_context->monotonic_event_time,
- header_monotonic_sent_time);
- EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
- EXPECT_EQ(pi2_context->realtime_remote_time,
- header_realtime_remote_time);
- EXPECT_EQ(pi2_context->monotonic_remote_time,
- header_monotonic_remote_time);
-
- // Confirm the forwarded message also matches the source message.
- EXPECT_EQ(pi1_context->queue_index, header.queue_index());
- EXPECT_EQ(pi1_context->monotonic_event_time,
- header_monotonic_remote_time);
- EXPECT_EQ(pi1_context->realtime_event_time,
- header_realtime_remote_time);
- });
-
- // Start everything up. Pong is the only thing we don't know how to wait on,
- // so start it first.
+ // Start everything up. Pong is the only thing we don't know how to wait
+ // on, so start it first.
std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
StartPi1Server();
@@ -641,7 +673,7 @@
// Test that the client disconnecting triggers the server offsets on both sides
// to clear.
-TEST_F(MessageBridgeTest, ClientRestart) {
+TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -778,7 +810,7 @@
// Test that the server disconnecting triggers the server offsets on the other
// side to clear, along with the other client.
-TEST_F(MessageBridgeTest, ServerRestart) {
+TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -932,7 +964,7 @@
// Tests that when a message is sent before the bridge starts up, but is
// configured as reliable, we forward it. Confirm this survives a client reset.
-TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
OnPi1();
FLAGS_application_name = "sender";
@@ -967,15 +999,20 @@
receive_event_loop.configuration(), ping_fetcher.channel());
std::atomic<int> ping_timestamp_count{0};
+ const std::string channel_name =
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
pi1_remote_timestamp_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ channel_name, [this, channel_name, ping_channel_index,
+ &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) <<channel_name << " RemoteMessage "
<< aos::FlatbufferToJson(&header);
EXPECT_TRUE(header.has_boot_uuid());
- if (header.channel_index() == ping_channel_index) {
- ++ping_timestamp_count;
+ if (shared() && header.channel_index() != ping_channel_index) {
+ return;
}
+ CHECK_EQ(header.channel_index(), ping_channel_index);
+ ++ping_timestamp_count;
});
// Before everything starts up, confirm there is no message.
@@ -1048,7 +1085,7 @@
// Tests that when a message is sent before the bridge starts up, but is
// configured as reliable, we forward it. Confirm this works across server
// resets.
-TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
// Now do it for "raspberrypi2", the client.
OnPi2();
@@ -1087,15 +1124,20 @@
receive_event_loop.configuration(), ping_fetcher.channel());
std::atomic<int> ping_timestamp_count{0};
+ const std::string channel_name =
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
pi1_remote_timestamp_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ channel_name, [this, channel_name, ping_channel_index,
+ &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << channel_name << " RemoteMessage "
<< aos::FlatbufferToJson(&header);
EXPECT_TRUE(header.has_boot_uuid());
- if (header.channel_index() == ping_channel_index) {
- ++ping_timestamp_count;
+ if (shared() && header.channel_index() != ping_channel_index) {
+ return;
}
+ CHECK_EQ(header.channel_index(), ping_channel_index);
+ ++ping_timestamp_count;
});
// Before everything starts up, confirm there is no message.
@@ -1166,6 +1208,13 @@
pi1_remote_timestamp_thread.join();
}
+INSTANTIATE_TEST_CASE_P(
+ MessageBridgeTests, MessageBridgeParameterizedTest,
+ ::testing::Values(
+ Param{"message_bridge_test_combined_timestamps_common_config.json",
+ true},
+ Param{"message_bridge_test_common_config.json", false}));
+
} // namespace testing
} // namespace message_bridge
} // namespace aos