blob: aa0a03424e60f8368f5f90a3b53f6357d5e6533c [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
9#include "aos/network/message_bridge_client_lib.h"
10#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070011#include "aos/network/team_number.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012
13namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070014void SetShmBase(const std::string_view base);
15
Austin Schuhe84c3ed2019-12-14 15:29:48 -080016namespace message_bridge {
17namespace testing {
18
19namespace chrono = std::chrono;
20
Austin Schuh2f8fd752020-09-01 22:38:28 -070021void DoSetShmBase(const std::string_view node) {
22 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
23 if (tmpdir_c_str != nullptr) {
24 aos::SetShmBase(absl::StrCat(tmpdir_c_str, "/", node));
25 } else {
26 aos::SetShmBase(absl::StrCat("/dev/shm/", node));
27 }
28}
29
Austin Schuhe84c3ed2019-12-14 15:29:48 -080030// Test that we can send a ping message over sctp and receive it.
31TEST(MessageBridgeTest, PingPong) {
32 // This is rather annoying to set up. We need to start up a client and
33 // server, on the same node, but get them to think that they are on different
34 // nodes.
35 //
36 // We then get to wait until they are connected.
37 //
38 // After they are connected, we send a Ping message.
39 //
40 // On the other end, we receive a Pong message.
41 //
42 // But, we need the client to not post directly to "/test" like it would in a
43 // real system, otherwise we will re-send the ping message... So, use an
44 // application specific map to have the client post somewhere else.
45 //
46 // To top this all off, each of these needs to be done with a ShmEventLoop,
47 // which needs to run in a separate thread... And it is really hard to get
48 // everything started up reliably. So just be super generous on timeouts and
49 // hope for the best. We can be more generous in the future if we need to.
50 //
51 // We are faking the application names by passing in --application_name=foo
52 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
53 aos::configuration::ReadConfig(
54 "aos/network/message_bridge_test_server_config.json");
Austin Schuh5344c352020-04-12 17:04:26 -070055 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056 aos::configuration::ReadConfig(
57 "aos/network/message_bridge_test_client_config.json");
58
Austin Schuh2f8fd752020-09-01 22:38:28 -070059 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080060 FLAGS_application_name = "pi1_message_bridge_server";
61 // Force ourselves to be "raspberrypi" and allocate everything.
62 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070063
Austin Schuh7bc59052020-02-16 23:48:33 -080064 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
65 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
66
67 FLAGS_application_name = "pi1_message_bridge_client";
68 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
69 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080070
71 // And build the app which sends the pings.
72 FLAGS_application_name = "ping";
73 aos::ShmEventLoop ping_event_loop(&server_config.message());
74 aos::Sender<examples::Ping> ping_sender =
75 ping_event_loop.MakeSender<examples::Ping>("/test");
76
Austin Schuh2f8fd752020-09-01 22:38:28 -070077 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
78 aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
79 pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
80 "/pi1/aos/remote_timestamps/pi2");
81
82 // Fetchers for confirming the remote timestamps made it.
83 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
84 ping_event_loop.MakeFetcher<examples::Ping>("/test");
85 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
86 ping_event_loop.MakeFetcher<Timestamp>("/aos");
87
Austin Schuhe84c3ed2019-12-14 15:29:48 -080088 // Now do it for "raspberrypi2", the client.
89 FLAGS_application_name = "pi2_message_bridge_client";
90 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -070091 DoSetShmBase("pi2");
92
Austin Schuh5344c352020-04-12 17:04:26 -070093 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080094 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
95
96 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -070097 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080098 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080099
100 // And build the app which sends the pongs.
101 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700102 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800103
Austin Schuh7bc59052020-02-16 23:48:33 -0800104 // And build the app for testing.
105 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700106 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800107
108 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
109 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700110 aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
111 test_event_loop.MakeFetcher<logger::MessageHeader>(
112 "/pi2/aos/remote_timestamps/pi1");
113
114 // Event loop for fetching data delivered to pi2 from pi1 to match up
115 // messages.
116 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
117 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
118 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
119 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
120 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
121 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
122 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800123
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800124 // Count the pongs.
125 int pong_count = 0;
126 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700127 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800128 ++pong_count;
129 LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800130 });
131
132 FLAGS_override_hostname = "";
133
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800134 // Wait until we are connected, then send.
135 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800136 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800137 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700138 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800139 [&ping_count, &pi2_client_event_loop, &ping_sender,
140 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800141 LOG(INFO) << FlatbufferToJson(&stats);
142
143 ASSERT_TRUE(stats.has_connections());
144 EXPECT_EQ(stats.connections()->size(), 1);
145
146 bool connected = false;
147 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800148 // Confirm that we are estimating the server time offset correctly. It
149 // should be about 0 since we are on the same machine here.
150 if (connection->has_monotonic_offset()) {
151 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
152 chrono::milliseconds(1));
153 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
154 chrono::milliseconds(-1));
155 ++pi1_server_statistics_count;
156 }
157
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800158 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800159 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800160 if (connection->state() == State::CONNECTED) {
161 connected = true;
162 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800163 }
164 }
165
166 if (connected) {
167 LOG(INFO) << "Connected! Sent ping.";
168 auto builder = ping_sender.MakeBuilder();
169 examples::Ping::Builder ping_builder =
170 builder.MakeBuilder<examples::Ping>();
171 ping_builder.add_value(ping_count + 971);
172 builder.Send(ping_builder.Finish());
173 ++ping_count;
174 }
175 });
176
Austin Schuh7bc59052020-02-16 23:48:33 -0800177 // Confirm both client and server statistics messages have decent offsets in
178 // them.
179 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700180 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800181 const ServerStatistics &stats) {
182 LOG(INFO) << FlatbufferToJson(&stats);
183 for (const ServerConnection *connection : *stats.connections()) {
184 if (connection->has_monotonic_offset()) {
185 ++pi2_server_statistics_count;
186 // Confirm that we are estimating the server time offset correctly. It
187 // should be about 0 since we are on the same machine here.
188 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
189 chrono::milliseconds(1));
190 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
191 chrono::milliseconds(-1));
192 }
193 }
194 });
195
196 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700197 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
198 const ClientStatistics &stats) {
199 LOG(INFO) << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800200
Austin Schuh5344c352020-04-12 17:04:26 -0700201 for (const ClientConnection *connection : *stats.connections()) {
202 if (connection->has_monotonic_offset()) {
203 ++pi1_client_statistics_count;
204 // It takes at least 10 microseconds to send a message between the
205 // client and server. The min (filtered) time shouldn't be over 10
206 // milliseconds on localhost. This might have to bump up if this is
207 // proving flaky.
208 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
209 chrono::milliseconds(10));
210 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
211 chrono::microseconds(10));
212 }
213 }
214 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800215
216 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700217 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800218 const ClientStatistics &stats) {
219 LOG(INFO) << FlatbufferToJson(&stats);
220
221 for (const ClientConnection *connection : *stats.connections()) {
222 if (connection->has_monotonic_offset()) {
223 ++pi2_client_statistics_count;
224 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::milliseconds(10));
226 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
227 chrono::microseconds(10));
228 }
229 }
230 });
231
Austin Schuh196a4452020-03-15 23:12:03 -0700232 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800233 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700234 LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800235 });
Austin Schuh196a4452020-03-15 23:12:03 -0700236 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800237 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700238 LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800239 });
240
241 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800242 aos::TimerHandler *quit = ping_event_loop.AddTimer(
243 [&ping_event_loop]() { ping_event_loop.Exit(); });
244 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800245 // Stop between timestamps, not exactly on them.
246 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800247 });
248
Austin Schuh2f8fd752020-09-01 22:38:28 -0700249 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
250 // channel.
251 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
252 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
253 const size_t ping_timestamp_channel =
254 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
255 ping_on_pi2_fetcher.channel());
256
257 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
258 VLOG(1) << "Channel "
259 << configuration::ChannelIndex(ping_event_loop.configuration(),
260 channel)
261 << " " << configuration::CleanedChannelToString(channel);
262 }
263
264 // For each remote timestamp we get back, confirm that it is either a ping
265 // message, or a timestamp we sent out. Also confirm that the timestamps are
266 // correct.
267 ping_event_loop.MakeWatcher(
268 "/pi1/aos/remote_timestamps/pi2",
269 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
270 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
271 &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
272 VLOG(1) << aos::FlatbufferToJson(&header);
273
274 const aos::monotonic_clock::time_point header_monotonic_sent_time(
275 chrono::nanoseconds(header.monotonic_sent_time()));
276 const aos::realtime_clock::time_point header_realtime_sent_time(
277 chrono::nanoseconds(header.realtime_sent_time()));
278 const aos::monotonic_clock::time_point header_monotonic_remote_time(
279 chrono::nanoseconds(header.monotonic_remote_time()));
280 const aos::realtime_clock::time_point header_realtime_remote_time(
281 chrono::nanoseconds(header.realtime_remote_time()));
282
283 const Context *pi1_context = nullptr;
284 const Context *pi2_context = nullptr;
285
286 if (header.channel_index() == pi1_timestamp_channel) {
287 // Find the forwarded message.
288 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
289 header_monotonic_sent_time) {
290 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
291 }
292
293 // And the source message.
294 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
295 header_monotonic_remote_time) {
296 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
297 }
298
299 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
300 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
301 } else if (header.channel_index() == ping_timestamp_channel) {
302 // Find the forwarded message.
303 while (ping_on_pi2_fetcher.context().monotonic_event_time <
304 header_monotonic_sent_time) {
305 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
306 }
307
308 // And the source message.
309 while (ping_on_pi1_fetcher.context().monotonic_event_time <
310 header_monotonic_remote_time) {
311 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
312 }
313
314 pi1_context = &ping_on_pi1_fetcher.context();
315 pi2_context = &ping_on_pi2_fetcher.context();
316 } else {
317 LOG(FATAL) << "Unknown channel";
318 }
319
320 // Confirm the forwarded message has matching timestamps to the
321 // timestamps we got back.
322 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
323 EXPECT_EQ(pi2_context->monotonic_event_time,
324 header_monotonic_sent_time);
325 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
326 EXPECT_EQ(pi2_context->realtime_remote_time,
327 header_realtime_remote_time);
328 EXPECT_EQ(pi2_context->monotonic_remote_time,
329 header_monotonic_remote_time);
330
331 // Confirm the forwarded message also matches the source message.
332 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
333 EXPECT_EQ(pi1_context->monotonic_event_time,
334 header_monotonic_remote_time);
335 EXPECT_EQ(pi1_context->realtime_event_time,
336 header_realtime_remote_time);
337 });
338
Austin Schuh7bc59052020-02-16 23:48:33 -0800339 // Start everything up. Pong is the only thing we don't know how to wait on,
340 // so start it first.
341 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
342
343 std::thread pi1_server_thread(
344 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
345 std::thread pi1_client_thread(
346 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
347 std::thread pi2_client_thread(
348 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
349 std::thread pi2_server_thread(
350 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800351
352 // And go!
353 ping_event_loop.Run();
354
355 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800356 pi1_server_event_loop.Exit();
357 pi1_client_event_loop.Exit();
358 pi2_client_event_loop.Exit();
359 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800360 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800361 pi1_server_thread.join();
362 pi1_client_thread.join();
363 pi2_client_thread.join();
364 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800365 pong_thread.join();
366
367 // Make sure we sent something.
368 EXPECT_GE(ping_count, 1);
369 // And got something back.
370 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800371
372 // Confirm that we are estimating a monotonic offset on the client.
373 ASSERT_TRUE(client_statistics_fetcher.Fetch());
374
375 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
376 EXPECT_EQ(client_statistics_fetcher->connections()
377 ->Get(0)
378 ->node()
379 ->name()
380 ->string_view(),
381 "pi1");
382
383 // Make sure the offset in one direction is less than a second.
384 EXPECT_GT(
385 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
386 EXPECT_LT(
387 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
388 1000000000);
389
390 EXPECT_GE(pi1_server_statistics_count, 2);
391 EXPECT_GE(pi2_server_statistics_count, 2);
392 EXPECT_GE(pi1_client_statistics_count, 2);
393 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700394
395 // Confirm we got timestamps back!
396 EXPECT_TRUE(message_header_fetcher1.Fetch());
397 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800398}
399
Austin Schuh5344c352020-04-12 17:04:26 -0700400// Test that the client disconnecting triggers the server offsets on both sides
401// to clear.
402TEST(MessageBridgeTest, ClientRestart) {
403 // This is rather annoying to set up. We need to start up a client and
404 // server, on the same node, but get them to think that they are on different
405 // nodes.
406 //
407 // We need the client to not post directly to "/test" like it would in a
408 // real system, otherwise we will re-send the ping message... So, use an
409 // application specific map to have the client post somewhere else.
410 //
411 // To top this all off, each of these needs to be done with a ShmEventLoop,
412 // which needs to run in a separate thread... And it is really hard to get
413 // everything started up reliably. So just be super generous on timeouts and
414 // hope for the best. We can be more generous in the future if we need to.
415 //
416 // We are faking the application names by passing in --application_name=foo
417 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
418 aos::configuration::ReadConfig(
419 "aos/network/message_bridge_test_server_config.json");
420 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
421 aos::configuration::ReadConfig(
422 "aos/network/message_bridge_test_client_config.json");
423
424 FLAGS_application_name = "pi1_message_bridge_server";
425 // Force ourselves to be "raspberrypi" and allocate everything.
426 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700427 DoSetShmBase("pi1");
Austin Schuh5344c352020-04-12 17:04:26 -0700428 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
429 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
430
431 FLAGS_application_name = "pi1_message_bridge_client";
432 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
433 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
434
435 // And build the app for testing.
436 FLAGS_application_name = "test1";
437 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
438 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
439 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
440
441 // Now do it for "raspberrypi2", the client.
442 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700443 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700444 FLAGS_application_name = "pi2_message_bridge_server";
445 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
446 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
447
448 // And build the app for testing.
449 FLAGS_application_name = "test2";
450 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
451 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
452 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
453
454 // Wait until we are connected, then send.
455 pi1_test_event_loop.MakeWatcher(
456 "/pi1/aos", [](const ServerStatistics &stats) {
457 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
458 });
459
460 pi2_test_event_loop.MakeWatcher(
461 "/pi2/aos", [](const ServerStatistics &stats) {
462 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
463 });
464
465 pi1_test_event_loop.MakeWatcher(
466 "/pi1/aos", [](const ClientStatistics &stats) {
467 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
468 });
469
470 pi2_test_event_loop.MakeWatcher(
471 "/pi2/aos", [](const ClientStatistics &stats) {
472 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
473 });
474
475 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
476 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
477 });
478 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
479 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
480 });
481
482 // Start everything up. Pong is the only thing we don't know how to wait on,
483 // so start it first.
484 std::thread pi1_test_thread(
485 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
486 std::thread pi2_test_thread(
487 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
488
489 std::thread pi1_server_thread(
490 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
491 std::thread pi1_client_thread(
492 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
493 std::thread pi2_server_thread(
494 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
495
496 {
497 FLAGS_application_name = "pi2_message_bridge_client";
498 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
499 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
500
501 // Run for 5 seconds to make sure we have time to estimate the offset.
502 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
503 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
504 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
505 // Stop between timestamps, not exactly on them.
506 quit->Setup(pi2_client_event_loop.monotonic_now() +
507 chrono::milliseconds(3050));
508 });
509
510 // And go!
511 pi2_client_event_loop.Run();
512
513 // Now confirm we are synchronized.
514 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
515 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
516
517 const ServerConnection *const pi1_connection =
518 pi1_server_statistics_fetcher->connections()->Get(0);
519 const ServerConnection *const pi2_connection =
520 pi2_server_statistics_fetcher->connections()->Get(0);
521
522 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
523 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
524 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
525 chrono::milliseconds(1));
526 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
527 chrono::milliseconds(-1));
528
529 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
530 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
531 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
532 chrono::milliseconds(1));
533 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
534 chrono::milliseconds(-1));
535 }
536
537 std::this_thread::sleep_for(std::chrono::seconds(2));
538
539 {
540 // Now confirm we are un-synchronized.
541 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
542 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
543 const ServerConnection *const pi1_connection =
544 pi1_server_statistics_fetcher->connections()->Get(0);
545 const ServerConnection *const pi2_connection =
546 pi2_server_statistics_fetcher->connections()->Get(0);
547
548 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
549 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
550 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
551 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
552 }
553
554 {
555 FLAGS_application_name = "pi2_message_bridge_client";
556 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
557 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
558
559 // Run for 5 seconds to make sure we have time to estimate the offset.
560 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
561 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
562 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
563 // Stop between timestamps, not exactly on them.
564 quit->Setup(pi2_client_event_loop.monotonic_now() +
565 chrono::milliseconds(3050));
566 });
567
568 // And go!
569 pi2_client_event_loop.Run();
570
571 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
572 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
573
574 // Now confirm we are synchronized again.
575 const ServerConnection *const pi1_connection =
576 pi1_server_statistics_fetcher->connections()->Get(0);
577 const ServerConnection *const pi2_connection =
578 pi2_server_statistics_fetcher->connections()->Get(0);
579
580 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
581 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
582 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
583 chrono::milliseconds(1));
584 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
585 chrono::milliseconds(-1));
586
587 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
588 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
589 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
590 chrono::milliseconds(1));
591 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
592 chrono::milliseconds(-1));
593 }
594
595 // Shut everyone else down
596 pi1_server_event_loop.Exit();
597 pi1_client_event_loop.Exit();
598 pi2_server_event_loop.Exit();
599 pi1_test_event_loop.Exit();
600 pi2_test_event_loop.Exit();
601 pi1_server_thread.join();
602 pi1_client_thread.join();
603 pi2_server_thread.join();
604 pi1_test_thread.join();
605 pi2_test_thread.join();
606}
607
608// Test that the server disconnecting triggers the server offsets on the other
609// side to clear, along with the other client.
610TEST(MessageBridgeTest, ServerRestart) {
611 // This is rather annoying to set up. We need to start up a client and
612 // server, on the same node, but get them to think that they are on different
613 // nodes.
614 //
615 // We need the client to not post directly to "/test" like it would in a
616 // real system, otherwise we will re-send the ping message... So, use an
617 // application specific map to have the client post somewhere else.
618 //
619 // To top this all off, each of these needs to be done with a ShmEventLoop,
620 // which needs to run in a separate thread... And it is really hard to get
621 // everything started up reliably. So just be super generous on timeouts and
622 // hope for the best. We can be more generous in the future if we need to.
623 //
624 // We are faking the application names by passing in --application_name=foo
625 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
626 aos::configuration::ReadConfig(
627 "aos/network/message_bridge_test_server_config.json");
628 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
629 aos::configuration::ReadConfig(
630 "aos/network/message_bridge_test_client_config.json");
631
632 FLAGS_application_name = "pi1_message_bridge_server";
633 // Force ourselves to be "raspberrypi" and allocate everything.
634 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700635 DoSetShmBase("pi1");
Austin Schuh5344c352020-04-12 17:04:26 -0700636 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
637 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
638
639 FLAGS_application_name = "pi1_message_bridge_client";
640 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
641 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
642
643 // And build the app for testing.
644 FLAGS_application_name = "test1";
645 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
646 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
647 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
648 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
649 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
650
651 // Now do it for "raspberrypi2", the client.
652 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700653 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700654 FLAGS_application_name = "pi2_message_bridge_client";
655 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
656 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
657
658 // And build the app for testing.
659 FLAGS_application_name = "test2";
660 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
661 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
662 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
663
664 // Wait until we are connected, then send.
665 pi1_test_event_loop.MakeWatcher(
666 "/pi1/aos", [](const ServerStatistics &stats) {
667 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
668 });
669
670 // Confirm both client and server statistics messages have decent offsets in
671 // them.
672 pi2_test_event_loop.MakeWatcher(
673 "/pi2/aos", [](const ServerStatistics &stats) {
674 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
675 });
676
677 pi1_test_event_loop.MakeWatcher(
678 "/pi1/aos", [](const ClientStatistics &stats) {
679 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
680 });
681
682 pi2_test_event_loop.MakeWatcher(
683 "/pi2/aos", [](const ClientStatistics &stats) {
684 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
685 });
686
687 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
688 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
689 });
690 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
691 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
692 });
693
694 // Start everything up. Pong is the only thing we don't know how to wait on,
695 // so start it first.
696 std::thread pi1_test_thread(
697 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
698 std::thread pi2_test_thread(
699 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
700
701 std::thread pi1_server_thread(
702 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
703 std::thread pi1_client_thread(
704 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
705 std::thread pi2_client_thread(
706 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
707
708 {
709 FLAGS_application_name = "pi2_message_bridge_server";
710 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
711 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
712
713 // Run for 5 seconds to make sure we have time to estimate the offset.
714 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
715 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
716 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
717 // Stop between timestamps, not exactly on them.
718 quit->Setup(pi2_server_event_loop.monotonic_now() +
719 chrono::milliseconds(3050));
720 });
721
722 // And go!
723 pi2_server_event_loop.Run();
724
725 // Now confirm we are synchronized.
726 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
727 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
728
729 const ServerConnection *const pi1_connection =
730 pi1_server_statistics_fetcher->connections()->Get(0);
731 const ServerConnection *const pi2_connection =
732 pi2_server_statistics_fetcher->connections()->Get(0);
733
734 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
735 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
736 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
737 chrono::milliseconds(1));
738 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
739 chrono::milliseconds(-1));
740
741 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
742 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
743 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
744 chrono::milliseconds(1));
745 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
746 chrono::milliseconds(-1));
747 }
748
749 std::this_thread::sleep_for(std::chrono::seconds(2));
750
751 {
752 // And confirm we are unsynchronized.
753 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
754 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
755
756 const ServerConnection *const pi1_server_connection =
757 pi1_server_statistics_fetcher->connections()->Get(0);
758 const ClientConnection *const pi1_client_connection =
759 pi1_client_statistics_fetcher->connections()->Get(0);
760
761 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
762 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
763 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
764 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
765 }
766
767 {
768 FLAGS_application_name = "pi2_message_bridge_server";
769 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
770 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
771
772 // Run for 5 seconds to make sure we have time to estimate the offset.
773 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
774 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
775 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
776 // Stop between timestamps, not exactly on them.
777 quit->Setup(pi2_server_event_loop.monotonic_now() +
778 chrono::milliseconds(3050));
779 });
780
781 // And go!
782 pi2_server_event_loop.Run();
783
784 // And confirm we are synchronized again.
785 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
786 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
787
788 const ServerConnection *const pi1_connection =
789 pi1_server_statistics_fetcher->connections()->Get(0);
790 const ServerConnection *const pi2_connection =
791 pi2_server_statistics_fetcher->connections()->Get(0);
792
793 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
794 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
795 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
796 chrono::milliseconds(1));
797 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
798 chrono::milliseconds(-1));
799
800 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
801 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
802 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
803 chrono::milliseconds(1));
804 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
805 chrono::milliseconds(-1));
806 }
807
808 // Shut everyone else down
809 pi1_server_event_loop.Exit();
810 pi1_client_event_loop.Exit();
811 pi2_client_event_loop.Exit();
812 pi1_test_event_loop.Exit();
813 pi2_test_event_loop.Exit();
814 pi1_server_thread.join();
815 pi1_client_thread.join();
816 pi2_client_thread.join();
817 pi1_test_thread.join();
818 pi2_test_thread.join();
819}
820
821// TODO(austin): This test confirms that the external state does the right
822// thing, but doesn't confirm that the internal state does. We either need to
823// expose a way to check the state in a thread-safe way, or need a way to jump
824// time for one node to do that.
825
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800826} // namespace testing
827} // namespace message_bridge
828} // namespace aos