blob: 7fc5c1f11f5b787af5b7c0d1ed6b49c4e5af95e0 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuh4889b182020-11-18 19:11:56 -08007#include "aos/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/events/ping_generated.h"
9#include "aos/events/pong_generated.h"
10#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
15namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070016void SetShmBase(const std::string_view base);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace message_bridge {
19namespace testing {
20
21namespace chrono = std::chrono;
22
Austin Schuhe991fe22020-11-18 16:53:39 -080023std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070024 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
25 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080026 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080028 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 }
30}
31
Austin Schuhe991fe22020-11-18 16:53:39 -080032void DoSetShmBase(const std::string_view node) {
33 aos::SetShmBase(ShmBase(node));
34}
35
36class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080037 public:
38 MessageBridgeTest()
39 : pi1_config(aos::configuration::ReadConfig(
40 "aos/network/message_bridge_test_server_config.json")),
41 pi2_config(aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_client_config.json")) {
43 util::UnlinkRecursive(ShmBase("pi1"));
44 util::UnlinkRecursive(ShmBase("pi2"));
45 }
Austin Schuhe991fe22020-11-18 16:53:39 -080046
Austin Schuh0de30f32020-12-06 12:44:28 -080047 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
48 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
Austin Schuhe991fe22020-11-18 16:53:39 -080049};
50
Austin Schuhe84c3ed2019-12-14 15:29:48 -080051// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -080052TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053 // This is rather annoying to set up. We need to start up a client and
54 // server, on the same node, but get them to think that they are on different
55 // nodes.
56 //
57 // We then get to wait until they are connected.
58 //
59 // After they are connected, we send a Ping message.
60 //
61 // On the other end, we receive a Pong message.
62 //
63 // But, we need the client to not post directly to "/test" like it would in a
64 // real system, otherwise we will re-send the ping message... So, use an
65 // application specific map to have the client post somewhere else.
66 //
67 // To top this all off, each of these needs to be done with a ShmEventLoop,
68 // which needs to run in a separate thread... And it is really hard to get
69 // everything started up reliably. So just be super generous on timeouts and
70 // hope for the best. We can be more generous in the future if we need to.
71 //
72 // We are faking the application names by passing in --application_name=foo
Austin Schuh2f8fd752020-09-01 22:38:28 -070073 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074 FLAGS_application_name = "pi1_message_bridge_server";
75 // Force ourselves to be "raspberrypi" and allocate everything.
76 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070077
Austin Schuhe991fe22020-11-18 16:53:39 -080078 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -080079 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -080080 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
81
82 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -080083 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -080084 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -080085 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086
87 // And build the app which sends the pings.
88 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -080089 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080090 aos::Sender<examples::Ping> ping_sender =
91 ping_event_loop.MakeSender<examples::Ping>("/test");
92
Austin Schuhe991fe22020-11-18 16:53:39 -080093 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080094 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
95 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -070096 "/pi1/aos/remote_timestamps/pi2");
97
98 // Fetchers for confirming the remote timestamps made it.
99 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
100 ping_event_loop.MakeFetcher<examples::Ping>("/test");
101 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
102 ping_event_loop.MakeFetcher<Timestamp>("/aos");
103
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800104 // Now do it for "raspberrypi2", the client.
105 FLAGS_application_name = "pi2_message_bridge_client";
106 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700107 DoSetShmBase("pi2");
108
Austin Schuh5344c352020-04-12 17:04:26 -0700109 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800110 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800111 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
112
113 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -0700114 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800115 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800116 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
118 // And build the app which sends the pongs.
119 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700120 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800121
Austin Schuh7bc59052020-02-16 23:48:33 -0800122 // And build the app for testing.
123 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700124 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800125
126 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
127 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800128 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
129 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130 "/pi2/aos/remote_timestamps/pi1");
131
132 // Event loop for fetching data delivered to pi2 from pi1 to match up
133 // messages.
134 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
135 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
136 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
137 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
138 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
139 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
140 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800141
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800142 // Count the pongs.
143 int pong_count = 0;
144 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700145 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800146 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800147 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800148 });
149
150 FLAGS_override_hostname = "";
151
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800152 // Wait until we are connected, then send.
153 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800154 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800155 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700156 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800157 [&ping_count, &pi2_client_event_loop, &ping_sender,
158 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800159 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800160
161 ASSERT_TRUE(stats.has_connections());
162 EXPECT_EQ(stats.connections()->size(), 1);
163
164 bool connected = false;
165 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800166 // Confirm that we are estimating the server time offset correctly. It
167 // should be about 0 since we are on the same machine here.
168 if (connection->has_monotonic_offset()) {
169 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
170 chrono::milliseconds(1));
171 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
172 chrono::milliseconds(-1));
173 ++pi1_server_statistics_count;
174 }
175
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800176 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800177 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800178 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800179 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800180 connected = true;
181 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800182 }
183 }
184
185 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800186 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800187 auto builder = ping_sender.MakeBuilder();
188 examples::Ping::Builder ping_builder =
189 builder.MakeBuilder<examples::Ping>();
190 ping_builder.add_value(ping_count + 971);
191 builder.Send(ping_builder.Finish());
192 ++ping_count;
193 }
194 });
195
Austin Schuh7bc59052020-02-16 23:48:33 -0800196 // Confirm both client and server statistics messages have decent offsets in
197 // them.
198 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700199 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800200 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800201 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800202 for (const ServerConnection *connection : *stats.connections()) {
203 if (connection->has_monotonic_offset()) {
204 ++pi2_server_statistics_count;
205 // Confirm that we are estimating the server time offset correctly. It
206 // should be about 0 since we are on the same machine here.
207 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
208 chrono::milliseconds(1));
209 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
210 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800211 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800212 }
213 }
214 });
215
216 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700217 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
218 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800219 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800220
Austin Schuh5344c352020-04-12 17:04:26 -0700221 for (const ClientConnection *connection : *stats.connections()) {
222 if (connection->has_monotonic_offset()) {
223 ++pi1_client_statistics_count;
224 // It takes at least 10 microseconds to send a message between the
225 // client and server. The min (filtered) time shouldn't be over 10
226 // milliseconds on localhost. This might have to bump up if this is
227 // proving flaky.
228 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800229 chrono::milliseconds(10))
230 << " " << connection->monotonic_offset()
231 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700232 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800233 chrono::microseconds(10))
234 << " " << connection->monotonic_offset()
235 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700236 }
237 }
238 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800239
240 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700241 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800242 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800243 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800244
245 for (const ClientConnection *connection : *stats.connections()) {
246 if (connection->has_monotonic_offset()) {
247 ++pi2_client_statistics_count;
248 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
249 chrono::milliseconds(10));
250 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
251 chrono::microseconds(10));
252 }
253 }
254 });
255
Austin Schuh196a4452020-03-15 23:12:03 -0700256 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800257 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800258 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800259 });
Austin Schuh196a4452020-03-15 23:12:03 -0700260 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800261 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800262 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800263 });
264
265 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800266 aos::TimerHandler *quit = ping_event_loop.AddTimer(
267 [&ping_event_loop]() { ping_event_loop.Exit(); });
268 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800269 // Stop between timestamps, not exactly on them.
270 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800271 });
272
Austin Schuh2f8fd752020-09-01 22:38:28 -0700273 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
274 // channel.
275 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
276 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
277 const size_t ping_timestamp_channel =
278 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
279 ping_on_pi2_fetcher.channel());
280
281 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
282 VLOG(1) << "Channel "
283 << configuration::ChannelIndex(ping_event_loop.configuration(),
284 channel)
285 << " " << configuration::CleanedChannelToString(channel);
286 }
287
288 // For each remote timestamp we get back, confirm that it is either a ping
289 // message, or a timestamp we sent out. Also confirm that the timestamps are
290 // correct.
291 ping_event_loop.MakeWatcher(
292 "/pi1/aos/remote_timestamps/pi2",
293 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
294 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800295 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
296 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
297 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700298
Austin Schuh20ac95d2020-12-05 17:24:19 -0800299 EXPECT_TRUE(header.has_boot_uuid());
300
Austin Schuh2f8fd752020-09-01 22:38:28 -0700301 const aos::monotonic_clock::time_point header_monotonic_sent_time(
302 chrono::nanoseconds(header.monotonic_sent_time()));
303 const aos::realtime_clock::time_point header_realtime_sent_time(
304 chrono::nanoseconds(header.realtime_sent_time()));
305 const aos::monotonic_clock::time_point header_monotonic_remote_time(
306 chrono::nanoseconds(header.monotonic_remote_time()));
307 const aos::realtime_clock::time_point header_realtime_remote_time(
308 chrono::nanoseconds(header.realtime_remote_time()));
309
310 const Context *pi1_context = nullptr;
311 const Context *pi2_context = nullptr;
312
313 if (header.channel_index() == pi1_timestamp_channel) {
314 // Find the forwarded message.
315 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
316 header_monotonic_sent_time) {
317 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
318 }
319
320 // And the source message.
321 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
322 header_monotonic_remote_time) {
323 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
324 }
325
326 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
327 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
328 } else if (header.channel_index() == ping_timestamp_channel) {
329 // Find the forwarded message.
330 while (ping_on_pi2_fetcher.context().monotonic_event_time <
331 header_monotonic_sent_time) {
332 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
333 }
334
335 // And the source message.
336 while (ping_on_pi1_fetcher.context().monotonic_event_time <
337 header_monotonic_remote_time) {
338 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
339 }
340
341 pi1_context = &ping_on_pi1_fetcher.context();
342 pi2_context = &ping_on_pi2_fetcher.context();
343 } else {
344 LOG(FATAL) << "Unknown channel";
345 }
346
347 // Confirm the forwarded message has matching timestamps to the
348 // timestamps we got back.
349 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
350 EXPECT_EQ(pi2_context->monotonic_event_time,
351 header_monotonic_sent_time);
352 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
353 EXPECT_EQ(pi2_context->realtime_remote_time,
354 header_realtime_remote_time);
355 EXPECT_EQ(pi2_context->monotonic_remote_time,
356 header_monotonic_remote_time);
357
358 // Confirm the forwarded message also matches the source message.
359 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
360 EXPECT_EQ(pi1_context->monotonic_event_time,
361 header_monotonic_remote_time);
362 EXPECT_EQ(pi1_context->realtime_event_time,
363 header_realtime_remote_time);
364 });
365
Austin Schuh7bc59052020-02-16 23:48:33 -0800366 // Start everything up. Pong is the only thing we don't know how to wait on,
367 // so start it first.
368 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
369
370 std::thread pi1_server_thread(
371 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
372 std::thread pi1_client_thread(
373 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
374 std::thread pi2_client_thread(
375 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
376 std::thread pi2_server_thread(
377 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800378
379 // And go!
380 ping_event_loop.Run();
381
382 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800383 pi1_server_event_loop.Exit();
384 pi1_client_event_loop.Exit();
385 pi2_client_event_loop.Exit();
386 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800388 pi1_server_thread.join();
389 pi1_client_thread.join();
390 pi2_client_thread.join();
391 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800392 pong_thread.join();
393
394 // Make sure we sent something.
395 EXPECT_GE(ping_count, 1);
396 // And got something back.
397 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800398
399 // Confirm that we are estimating a monotonic offset on the client.
400 ASSERT_TRUE(client_statistics_fetcher.Fetch());
401
402 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
403 EXPECT_EQ(client_statistics_fetcher->connections()
404 ->Get(0)
405 ->node()
406 ->name()
407 ->string_view(),
408 "pi1");
409
410 // Make sure the offset in one direction is less than a second.
411 EXPECT_GT(
412 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
413 EXPECT_LT(
414 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
415 1000000000);
416
417 EXPECT_GE(pi1_server_statistics_count, 2);
418 EXPECT_GE(pi2_server_statistics_count, 2);
419 EXPECT_GE(pi1_client_statistics_count, 2);
420 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700421
422 // Confirm we got timestamps back!
423 EXPECT_TRUE(message_header_fetcher1.Fetch());
424 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800425}
426
Austin Schuh5344c352020-04-12 17:04:26 -0700427// Test that the client disconnecting triggers the server offsets on both sides
428// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800429TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700430 // This is rather annoying to set up. We need to start up a client and
431 // server, on the same node, but get them to think that they are on different
432 // nodes.
433 //
434 // We need the client to not post directly to "/test" like it would in a
435 // real system, otherwise we will re-send the ping message... So, use an
436 // application specific map to have the client post somewhere else.
437 //
438 // To top this all off, each of these needs to be done with a ShmEventLoop,
439 // which needs to run in a separate thread... And it is really hard to get
440 // everything started up reliably. So just be super generous on timeouts and
441 // hope for the best. We can be more generous in the future if we need to.
442 //
443 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700444 FLAGS_application_name = "pi1_message_bridge_server";
445 // Force ourselves to be "raspberrypi" and allocate everything.
446 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700447 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800448 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800449 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700450 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
451
452 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800453 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800454 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700455 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
456
457 // And build the app for testing.
458 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800459 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700460 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
461 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
462
463 // Now do it for "raspberrypi2", the client.
464 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700465 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700466 FLAGS_application_name = "pi2_message_bridge_server";
467 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800468 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700469 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
470
471 // And build the app for testing.
472 FLAGS_application_name = "test2";
473 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
474 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
475 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
476
477 // Wait until we are connected, then send.
478 pi1_test_event_loop.MakeWatcher(
479 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800480 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700481 });
482
483 pi2_test_event_loop.MakeWatcher(
484 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800485 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700486 });
487
488 pi1_test_event_loop.MakeWatcher(
489 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800490 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700491 });
492
493 pi2_test_event_loop.MakeWatcher(
494 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800495 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700496 });
497
498 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800499 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700500 });
501 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800502 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700503 });
504
505 // Start everything up. Pong is the only thing we don't know how to wait on,
506 // so start it first.
507 std::thread pi1_test_thread(
508 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
509 std::thread pi2_test_thread(
510 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
511
512 std::thread pi1_server_thread(
513 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
514 std::thread pi1_client_thread(
515 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
516 std::thread pi2_server_thread(
517 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
518
519 {
520 FLAGS_application_name = "pi2_message_bridge_client";
521 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800522 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700523 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
524
525 // Run for 5 seconds to make sure we have time to estimate the offset.
526 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
527 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
528 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
529 // Stop between timestamps, not exactly on them.
530 quit->Setup(pi2_client_event_loop.monotonic_now() +
531 chrono::milliseconds(3050));
532 });
533
534 // And go!
535 pi2_client_event_loop.Run();
536
537 // Now confirm we are synchronized.
538 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
539 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
540
541 const ServerConnection *const pi1_connection =
542 pi1_server_statistics_fetcher->connections()->Get(0);
543 const ServerConnection *const pi2_connection =
544 pi2_server_statistics_fetcher->connections()->Get(0);
545
546 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
547 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
548 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
549 chrono::milliseconds(1));
550 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
551 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800552 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700553
554 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
555 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
556 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
557 chrono::milliseconds(1));
558 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
559 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800560 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700561 }
562
563 std::this_thread::sleep_for(std::chrono::seconds(2));
564
565 {
566 // Now confirm we are un-synchronized.
567 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
568 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
569 const ServerConnection *const pi1_connection =
570 pi1_server_statistics_fetcher->connections()->Get(0);
571 const ServerConnection *const pi2_connection =
572 pi2_server_statistics_fetcher->connections()->Get(0);
573
574 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
575 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800576 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700577 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
578 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800579 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700580 }
581
582 {
583 FLAGS_application_name = "pi2_message_bridge_client";
584 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800585 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700586 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
587
588 // Run for 5 seconds to make sure we have time to estimate the offset.
589 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
590 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
591 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
592 // Stop between timestamps, not exactly on them.
593 quit->Setup(pi2_client_event_loop.monotonic_now() +
594 chrono::milliseconds(3050));
595 });
596
597 // And go!
598 pi2_client_event_loop.Run();
599
600 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
601 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
602
603 // Now confirm we are synchronized again.
604 const ServerConnection *const pi1_connection =
605 pi1_server_statistics_fetcher->connections()->Get(0);
606 const ServerConnection *const pi2_connection =
607 pi2_server_statistics_fetcher->connections()->Get(0);
608
609 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
610 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
611 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
612 chrono::milliseconds(1));
613 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
614 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800615 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700616
617 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
618 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
619 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
620 chrono::milliseconds(1));
621 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
622 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800623 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700624 }
625
626 // Shut everyone else down
627 pi1_server_event_loop.Exit();
628 pi1_client_event_loop.Exit();
629 pi2_server_event_loop.Exit();
630 pi1_test_event_loop.Exit();
631 pi2_test_event_loop.Exit();
632 pi1_server_thread.join();
633 pi1_client_thread.join();
634 pi2_server_thread.join();
635 pi1_test_thread.join();
636 pi2_test_thread.join();
637}
638
639// Test that the server disconnecting triggers the server offsets on the other
640// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800641TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700642 // This is rather annoying to set up. We need to start up a client and
643 // server, on the same node, but get them to think that they are on different
644 // nodes.
645 //
646 // We need the client to not post directly to "/test" like it would in a
647 // real system, otherwise we will re-send the ping message... So, use an
648 // application specific map to have the client post somewhere else.
649 //
650 // To top this all off, each of these needs to be done with a ShmEventLoop,
651 // which needs to run in a separate thread... And it is really hard to get
652 // everything started up reliably. So just be super generous on timeouts and
653 // hope for the best. We can be more generous in the future if we need to.
654 //
655 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700656 FLAGS_application_name = "pi1_message_bridge_server";
657 // Force ourselves to be "raspberrypi" and allocate everything.
658 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700659 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800660 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800661 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700662 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
663
664 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800665 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800666 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700667 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
668
669 // And build the app for testing.
670 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800671 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700672 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
673 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
674 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
675 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
676
677 // Now do it for "raspberrypi2", the client.
678 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700679 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700680 FLAGS_application_name = "pi2_message_bridge_client";
681 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800682 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700683 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
684
685 // And build the app for testing.
686 FLAGS_application_name = "test2";
687 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
688 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
689 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
690
691 // Wait until we are connected, then send.
692 pi1_test_event_loop.MakeWatcher(
693 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800694 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700695 });
696
697 // Confirm both client and server statistics messages have decent offsets in
698 // them.
699 pi2_test_event_loop.MakeWatcher(
700 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800701 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700702 });
703
704 pi1_test_event_loop.MakeWatcher(
705 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800706 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700707 });
708
709 pi2_test_event_loop.MakeWatcher(
710 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800711 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700712 });
713
714 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800715 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700716 });
717 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800718 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700719 });
720
721 // Start everything up. Pong is the only thing we don't know how to wait on,
722 // so start it first.
723 std::thread pi1_test_thread(
724 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
725 std::thread pi2_test_thread(
726 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
727
728 std::thread pi1_server_thread(
729 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
730 std::thread pi1_client_thread(
731 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
732 std::thread pi2_client_thread(
733 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
734
735 {
736 FLAGS_application_name = "pi2_message_bridge_server";
737 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800738 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700739 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
740
741 // Run for 5 seconds to make sure we have time to estimate the offset.
742 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
743 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
744 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
745 // Stop between timestamps, not exactly on them.
746 quit->Setup(pi2_server_event_loop.monotonic_now() +
747 chrono::milliseconds(3050));
748 });
749
750 // And go!
751 pi2_server_event_loop.Run();
752
753 // Now confirm we are synchronized.
754 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
755 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
756
757 const ServerConnection *const pi1_connection =
758 pi1_server_statistics_fetcher->connections()->Get(0);
759 const ServerConnection *const pi2_connection =
760 pi2_server_statistics_fetcher->connections()->Get(0);
761
762 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
763 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
764 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
765 chrono::milliseconds(1));
766 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
767 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800768 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700769
770 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
771 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
772 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
773 chrono::milliseconds(1));
774 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
775 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800776 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700777 }
778
779 std::this_thread::sleep_for(std::chrono::seconds(2));
780
781 {
782 // And confirm we are unsynchronized.
783 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
784 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
785
786 const ServerConnection *const pi1_server_connection =
787 pi1_server_statistics_fetcher->connections()->Get(0);
788 const ClientConnection *const pi1_client_connection =
789 pi1_client_statistics_fetcher->connections()->Get(0);
790
791 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
792 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800793 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700794 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
795 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
796 }
797
798 {
799 FLAGS_application_name = "pi2_message_bridge_server";
800 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800801 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700802 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
803
804 // Run for 5 seconds to make sure we have time to estimate the offset.
805 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
806 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
807 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
808 // Stop between timestamps, not exactly on them.
809 quit->Setup(pi2_server_event_loop.monotonic_now() +
810 chrono::milliseconds(3050));
811 });
812
813 // And go!
814 pi2_server_event_loop.Run();
815
816 // And confirm we are synchronized again.
817 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
818 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
819
820 const ServerConnection *const pi1_connection =
821 pi1_server_statistics_fetcher->connections()->Get(0);
822 const ServerConnection *const pi2_connection =
823 pi2_server_statistics_fetcher->connections()->Get(0);
824
825 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
826 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
827 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
828 chrono::milliseconds(1));
829 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
830 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800831 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700832
833 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
834 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
835 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
836 chrono::milliseconds(1));
837 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
838 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800839 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700840 }
841
842 // Shut everyone else down
843 pi1_server_event_loop.Exit();
844 pi1_client_event_loop.Exit();
845 pi2_client_event_loop.Exit();
846 pi1_test_event_loop.Exit();
847 pi2_test_event_loop.Exit();
848 pi1_server_thread.join();
849 pi1_client_thread.join();
850 pi2_client_thread.join();
851 pi1_test_thread.join();
852 pi2_test_thread.join();
853}
854
Austin Schuh4889b182020-11-18 19:11:56 -0800855// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700856// thing, but doesn't confirm that the internal state does. We either need to
857// expose a way to check the state in a thread-safe way, or need a way to jump
858// time for one node to do that.
859
Austin Schuh4889b182020-11-18 19:11:56 -0800860void SendPing(aos::Sender<examples::Ping> *sender, int value) {
861 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
862 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
863 ping_builder.add_value(value);
864 builder.Send(ping_builder.Finish());
865}
866
867// Tests that when a message is sent before the bridge starts up, but is
868// configured as reliable, we forward it. Confirm this survives a client reset.
869TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
870 DoSetShmBase("pi1");
871 // Force ourselves to be "raspberrypi" and allocate everything.
872 FLAGS_override_hostname = "raspberrypi";
873
874 FLAGS_application_name = "sender";
875 aos::ShmEventLoop send_event_loop(&pi1_config.message());
876 aos::Sender<examples::Ping> ping_sender =
877 send_event_loop.MakeSender<examples::Ping>("/test");
878 SendPing(&ping_sender, 1);
879 aos::Sender<examples::Ping> unreliable_ping_sender =
880 send_event_loop.MakeSender<examples::Ping>("/unreliable");
881 SendPing(&unreliable_ping_sender, 1);
882
883 FLAGS_application_name = "pi1_message_bridge_server";
884 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800885 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800886 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
887
888 FLAGS_application_name = "pi1_message_bridge_client";
889 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800890 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800891 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
892
893 FLAGS_application_name = "pi1_timestamp";
894 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
895
896 // Now do it for "raspberrypi2", the client.
897 DoSetShmBase("pi2");
898 FLAGS_override_hostname = "raspberrypi2";
899
900 FLAGS_application_name = "pi2_message_bridge_server";
901 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800902 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800903 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
904
905 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
906 aos::Fetcher<examples::Ping> ping_fetcher =
907 receive_event_loop.MakeFetcher<examples::Ping>("/test");
908 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
909 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
910 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
911 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
912
913 const size_t ping_channel_index = configuration::ChannelIndex(
914 receive_event_loop.configuration(), ping_fetcher.channel());
915
916 std::atomic<int> ping_timestamp_count{0};
917 pi1_remote_timestamp_event_loop.MakeWatcher(
918 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800919 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
920 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
921 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800922 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800923 if (header.channel_index() == ping_channel_index) {
924 ++ping_timestamp_count;
925 }
926 });
927
928 // Before everything starts up, confirm there is no message.
929 EXPECT_FALSE(ping_fetcher.Fetch());
930 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
931
932 // Spin up the persistant pieces.
933 std::thread pi1_server_thread(
934 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
935 std::thread pi1_client_thread(
936 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
937 std::thread pi2_server_thread(
938 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
939
940 // Event used to wait for the timestamp counting thread to start.
941 aos::Event event;
942 std::thread pi1_remote_timestamp_thread(
943 [&pi1_remote_timestamp_event_loop, &event]() {
944 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
945 pi1_remote_timestamp_event_loop.Run();
946 });
947
948 event.Wait();
949
950 {
951 // Now, spin up a client for 2 seconds.
952 LOG(INFO) << "Starting first pi2 MessageBridgeClient";
953 FLAGS_application_name = "pi2_message_bridge_client";
954 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800955 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800956 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
957
958 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
959 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
960 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
961 // Stop between timestamps, not exactly on them.
962 quit->Setup(pi2_client_event_loop.monotonic_now() +
963 chrono::milliseconds(2050));
964 });
965
966 // And go!
967 pi2_client_event_loop.Run();
968
969 // Confirm there is no detected duplicate packet.
970 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
971 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
972 ->Get(0)
973 ->duplicate_packets(),
974 0u);
975
976 EXPECT_TRUE(ping_fetcher.Fetch());
977 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
978 EXPECT_EQ(ping_timestamp_count, 1);
979 LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
980 }
981
982 {
983 // Now, spin up a second client for 2 seconds.
984 LOG(INFO) << "Starting second pi2 MessageBridgeClient";
985 FLAGS_application_name = "pi2_message_bridge_client";
986 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800987 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800988 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
989
990 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
991 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
992 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
993 // Stop between timestamps, not exactly on them.
994 quit->Setup(pi2_client_event_loop.monotonic_now() +
995 chrono::milliseconds(5050));
996 });
997
998 // And go!
999 pi2_client_event_loop.Run();
1000
1001 // Confirm we detect the duplicate packet correctly.
1002 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1003 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1004 ->Get(0)
1005 ->duplicate_packets(),
1006 1u);
1007
1008 EXPECT_EQ(ping_timestamp_count, 1);
1009 EXPECT_FALSE(ping_fetcher.Fetch());
1010 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1011 }
1012
1013 // Shut everyone else down
1014 pi1_server_event_loop.Exit();
1015 pi1_client_event_loop.Exit();
1016 pi2_server_event_loop.Exit();
1017 pi1_remote_timestamp_event_loop.Exit();
1018 pi1_remote_timestamp_thread.join();
1019 pi1_server_thread.join();
1020 pi1_client_thread.join();
1021 pi2_server_thread.join();
1022}
1023
1024// Tests that when a message is sent before the bridge starts up, but is
1025// configured as reliable, we forward it. Confirm this works across server
1026// resets.
1027TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1028 // Now do it for "raspberrypi2", the client.
1029 DoSetShmBase("pi2");
1030 FLAGS_override_hostname = "raspberrypi2";
1031
1032 FLAGS_application_name = "pi2_message_bridge_server";
1033 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001034 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001035 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
1036
1037 FLAGS_application_name = "pi2_message_bridge_client";
1038 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001039 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001040 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
1041
1042 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1043 aos::Fetcher<examples::Ping> ping_fetcher =
1044 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1045 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1046 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1047 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1048 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1049
1050 DoSetShmBase("pi1");
1051 // Force ourselves to be "raspberrypi" and allocate everything.
1052 FLAGS_override_hostname = "raspberrypi";
1053
1054 FLAGS_application_name = "sender";
1055 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1056 aos::Sender<examples::Ping> ping_sender =
1057 send_event_loop.MakeSender<examples::Ping>("/test");
1058 {
1059 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1060 examples::Ping::Builder ping_builder =
1061 builder.MakeBuilder<examples::Ping>();
1062 ping_builder.add_value(1);
1063 builder.Send(ping_builder.Finish());
1064 }
1065
1066 FLAGS_application_name = "pi1_message_bridge_client";
1067 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001068 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001069 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
1070
1071 FLAGS_application_name = "pi1_timestamp";
1072 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1073
1074 const size_t ping_channel_index = configuration::ChannelIndex(
1075 receive_event_loop.configuration(), ping_fetcher.channel());
1076
1077 std::atomic<int> ping_timestamp_count{0};
1078 pi1_remote_timestamp_event_loop.MakeWatcher(
1079 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001080 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1081 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1082 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001083 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001084 if (header.channel_index() == ping_channel_index) {
1085 ++ping_timestamp_count;
1086 }
1087 });
1088
1089 // Before everything starts up, confirm there is no message.
1090 EXPECT_FALSE(ping_fetcher.Fetch());
1091 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1092
1093 // Spin up the persistant pieces.
1094 std::thread pi1_client_thread(
1095 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
1096 std::thread pi2_server_thread(
1097 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
1098 std::thread pi2_client_thread(
1099 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
1100
1101 // Event used to wait for the timestamp counting thread to start.
1102 aos::Event event;
1103 std::thread pi1_remote_timestamp_thread(
1104 [&pi1_remote_timestamp_event_loop, &event]() {
1105 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1106 pi1_remote_timestamp_event_loop.Run();
1107 });
1108
1109 event.Wait();
1110
1111 {
1112 // Now, spin up a server for 2 seconds.
1113 FLAGS_application_name = "pi1_message_bridge_server";
1114 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001115 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001116 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1117
1118 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1119 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1120 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1121 // Stop between timestamps, not exactly on them.
1122 quit->Setup(pi1_server_event_loop.monotonic_now() +
1123 chrono::milliseconds(2050));
1124 });
1125
1126 // And go!
1127 pi1_server_event_loop.Run();
1128
1129 // Confirm there is no detected duplicate packet.
1130 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1131 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1132 ->Get(0)
1133 ->duplicate_packets(),
1134 0u);
1135
1136 EXPECT_TRUE(ping_fetcher.Fetch());
1137 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1138 EXPECT_EQ(ping_timestamp_count, 1);
1139 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1140 }
1141
1142 {
1143 // Now, spin up a second server for 2 seconds.
1144 FLAGS_application_name = "pi1_message_bridge_server";
1145 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001146 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001147 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1148
1149 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1150 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1151 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1152 // Stop between timestamps, not exactly on them.
1153 quit->Setup(pi1_server_event_loop.monotonic_now() +
1154 chrono::milliseconds(2050));
1155 });
1156
1157 // And go!
1158 pi1_server_event_loop.Run();
1159
1160 // Confirm we detect the duplicate packet correctly.
1161 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1162 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1163 ->Get(0)
1164 ->duplicate_packets(),
1165 1u);
1166
1167 EXPECT_EQ(ping_timestamp_count, 1);
1168 EXPECT_FALSE(ping_fetcher.Fetch());
1169 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1170 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1171 }
1172
1173 // Shut everyone else down
1174 pi1_client_event_loop.Exit();
1175 pi2_server_event_loop.Exit();
1176 pi2_client_event_loop.Exit();
1177 pi1_remote_timestamp_event_loop.Exit();
1178 pi1_remote_timestamp_thread.join();
1179 pi1_client_thread.join();
1180 pi2_server_thread.join();
1181 pi2_client_thread.join();
1182}
1183
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001184} // namespace testing
1185} // namespace message_bridge
1186} // namespace aos