blob: 8d52292d9e74894ad89c7c1d3bf25770170eb19e [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
6#include "aos/events/ping_generated.h"
7#include "aos/events/pong_generated.h"
8#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080011
12namespace aos {
13namespace message_bridge {
14namespace testing {
15
16namespace chrono = std::chrono;
17
18// Test that we can send a ping message over sctp and receive it.
19TEST(MessageBridgeTest, PingPong) {
20 // This is rather annoying to set up. We need to start up a client and
21 // server, on the same node, but get them to think that they are on different
22 // nodes.
23 //
24 // We then get to wait until they are connected.
25 //
26 // After they are connected, we send a Ping message.
27 //
28 // On the other end, we receive a Pong message.
29 //
30 // But, we need the client to not post directly to "/test" like it would in a
31 // real system, otherwise we will re-send the ping message... So, use an
32 // application specific map to have the client post somewhere else.
33 //
34 // To top this all off, each of these needs to be done with a ShmEventLoop,
35 // which needs to run in a separate thread... And it is really hard to get
36 // everything started up reliably. So just be super generous on timeouts and
37 // hope for the best. We can be more generous in the future if we need to.
38 //
39 // We are faking the application names by passing in --application_name=foo
40 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
41 aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_server_config.json");
Austin Schuh5344c352020-04-12 17:04:26 -070043 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
Austin Schuhe84c3ed2019-12-14 15:29:48 -080044 aos::configuration::ReadConfig(
45 "aos/network/message_bridge_test_client_config.json");
46
47 FLAGS_application_name = "pi1_message_bridge_server";
48 // Force ourselves to be "raspberrypi" and allocate everything.
49 FLAGS_override_hostname = "raspberrypi";
Austin Schuh7bc59052020-02-16 23:48:33 -080050 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
51 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
52
53 FLAGS_application_name = "pi1_message_bridge_client";
54 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
55 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056
57 // And build the app which sends the pings.
58 FLAGS_application_name = "ping";
59 aos::ShmEventLoop ping_event_loop(&server_config.message());
60 aos::Sender<examples::Ping> ping_sender =
61 ping_event_loop.MakeSender<examples::Ping>("/test");
62
63 // Now do it for "raspberrypi2", the client.
64 FLAGS_application_name = "pi2_message_bridge_client";
65 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh5344c352020-04-12 17:04:26 -070066 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080067 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
68
69 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -070070 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080071 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080072
73 // And build the app which sends the pongs.
74 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -070075 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080076
Austin Schuh7bc59052020-02-16 23:48:33 -080077 // And build the app for testing.
78 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -070079 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080080
81 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
82 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
83
Austin Schuhe84c3ed2019-12-14 15:29:48 -080084 // Count the pongs.
85 int pong_count = 0;
86 pong_event_loop.MakeWatcher(
Austin Schuh7bc59052020-02-16 23:48:33 -080087 "/test2", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080088 ++pong_count;
89 LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080090 });
91
92 FLAGS_override_hostname = "";
93
Austin Schuhe84c3ed2019-12-14 15:29:48 -080094 // Wait until we are connected, then send.
95 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -080096 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -080097 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -070098 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -080099 [&ping_count, &pi2_client_event_loop, &ping_sender,
100 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800101 LOG(INFO) << FlatbufferToJson(&stats);
102
103 ASSERT_TRUE(stats.has_connections());
104 EXPECT_EQ(stats.connections()->size(), 1);
105
106 bool connected = false;
107 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800108 // Confirm that we are estimating the server time offset correctly. It
109 // should be about 0 since we are on the same machine here.
110 if (connection->has_monotonic_offset()) {
111 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
112 chrono::milliseconds(1));
113 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
114 chrono::milliseconds(-1));
115 ++pi1_server_statistics_count;
116 }
117
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800118 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800119 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800120 if (connection->state() == State::CONNECTED) {
121 connected = true;
122 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800123 }
124 }
125
126 if (connected) {
127 LOG(INFO) << "Connected! Sent ping.";
128 auto builder = ping_sender.MakeBuilder();
129 examples::Ping::Builder ping_builder =
130 builder.MakeBuilder<examples::Ping>();
131 ping_builder.add_value(ping_count + 971);
132 builder.Send(ping_builder.Finish());
133 ++ping_count;
134 }
135 });
136
Austin Schuh7bc59052020-02-16 23:48:33 -0800137 // Confirm both client and server statistics messages have decent offsets in
138 // them.
139 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700140 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800141 const ServerStatistics &stats) {
142 LOG(INFO) << FlatbufferToJson(&stats);
143 for (const ServerConnection *connection : *stats.connections()) {
144 if (connection->has_monotonic_offset()) {
145 ++pi2_server_statistics_count;
146 // Confirm that we are estimating the server time offset correctly. It
147 // should be about 0 since we are on the same machine here.
148 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
149 chrono::milliseconds(1));
150 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
151 chrono::milliseconds(-1));
152 }
153 }
154 });
155
156 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700157 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
158 const ClientStatistics &stats) {
159 LOG(INFO) << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800160
Austin Schuh5344c352020-04-12 17:04:26 -0700161 for (const ClientConnection *connection : *stats.connections()) {
162 if (connection->has_monotonic_offset()) {
163 ++pi1_client_statistics_count;
164 // It takes at least 10 microseconds to send a message between the
165 // client and server. The min (filtered) time shouldn't be over 10
166 // milliseconds on localhost. This might have to bump up if this is
167 // proving flaky.
168 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
169 chrono::milliseconds(10));
170 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
171 chrono::microseconds(10));
172 }
173 }
174 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800175
176 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700177 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800178 const ClientStatistics &stats) {
179 LOG(INFO) << FlatbufferToJson(&stats);
180
181 for (const ClientConnection *connection : *stats.connections()) {
182 if (connection->has_monotonic_offset()) {
183 ++pi2_client_statistics_count;
184 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
185 chrono::milliseconds(10));
186 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
187 chrono::microseconds(10));
188 }
189 }
190 });
191
Austin Schuh196a4452020-03-15 23:12:03 -0700192 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800193 EXPECT_TRUE(timestamp.has_offsets());
194 LOG(INFO) << FlatbufferToJson(&timestamp);
195 });
Austin Schuh196a4452020-03-15 23:12:03 -0700196 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800197 EXPECT_TRUE(timestamp.has_offsets());
198 LOG(INFO) << FlatbufferToJson(&timestamp);
199 });
200
201 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800202 aos::TimerHandler *quit = ping_event_loop.AddTimer(
203 [&ping_event_loop]() { ping_event_loop.Exit(); });
204 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800205 // Stop between timestamps, not exactly on them.
206 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800207 });
208
Austin Schuh7bc59052020-02-16 23:48:33 -0800209 // Start everything up. Pong is the only thing we don't know how to wait on,
210 // so start it first.
211 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
212
213 std::thread pi1_server_thread(
214 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
215 std::thread pi1_client_thread(
216 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
217 std::thread pi2_client_thread(
218 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
219 std::thread pi2_server_thread(
220 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800221
222 // And go!
223 ping_event_loop.Run();
224
225 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800226 pi1_server_event_loop.Exit();
227 pi1_client_event_loop.Exit();
228 pi2_client_event_loop.Exit();
229 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800230 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800231 pi1_server_thread.join();
232 pi1_client_thread.join();
233 pi2_client_thread.join();
234 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800235 pong_thread.join();
236
237 // Make sure we sent something.
238 EXPECT_GE(ping_count, 1);
239 // And got something back.
240 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800241
242 // Confirm that we are estimating a monotonic offset on the client.
243 ASSERT_TRUE(client_statistics_fetcher.Fetch());
244
245 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
246 EXPECT_EQ(client_statistics_fetcher->connections()
247 ->Get(0)
248 ->node()
249 ->name()
250 ->string_view(),
251 "pi1");
252
253 // Make sure the offset in one direction is less than a second.
254 EXPECT_GT(
255 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
256 EXPECT_LT(
257 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
258 1000000000);
259
260 EXPECT_GE(pi1_server_statistics_count, 2);
261 EXPECT_GE(pi2_server_statistics_count, 2);
262 EXPECT_GE(pi1_client_statistics_count, 2);
263 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800264}
265
Austin Schuh5344c352020-04-12 17:04:26 -0700266// Test that the client disconnecting triggers the server offsets on both sides
267// to clear.
268TEST(MessageBridgeTest, ClientRestart) {
269 // This is rather annoying to set up. We need to start up a client and
270 // server, on the same node, but get them to think that they are on different
271 // nodes.
272 //
273 // We need the client to not post directly to "/test" like it would in a
274 // real system, otherwise we will re-send the ping message... So, use an
275 // application specific map to have the client post somewhere else.
276 //
277 // To top this all off, each of these needs to be done with a ShmEventLoop,
278 // which needs to run in a separate thread... And it is really hard to get
279 // everything started up reliably. So just be super generous on timeouts and
280 // hope for the best. We can be more generous in the future if we need to.
281 //
282 // We are faking the application names by passing in --application_name=foo
283 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
284 aos::configuration::ReadConfig(
285 "aos/network/message_bridge_test_server_config.json");
286 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
287 aos::configuration::ReadConfig(
288 "aos/network/message_bridge_test_client_config.json");
289
290 FLAGS_application_name = "pi1_message_bridge_server";
291 // Force ourselves to be "raspberrypi" and allocate everything.
292 FLAGS_override_hostname = "raspberrypi";
293 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
294 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
295
296 FLAGS_application_name = "pi1_message_bridge_client";
297 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
298 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
299
300 // And build the app for testing.
301 FLAGS_application_name = "test1";
302 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
303 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
304 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
305
306 // Now do it for "raspberrypi2", the client.
307 FLAGS_override_hostname = "raspberrypi2";
308 FLAGS_application_name = "pi2_message_bridge_server";
309 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
310 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
311
312 // And build the app for testing.
313 FLAGS_application_name = "test2";
314 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
315 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
316 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
317
318 // Wait until we are connected, then send.
319 pi1_test_event_loop.MakeWatcher(
320 "/pi1/aos", [](const ServerStatistics &stats) {
321 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
322 });
323
324 pi2_test_event_loop.MakeWatcher(
325 "/pi2/aos", [](const ServerStatistics &stats) {
326 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
327 });
328
329 pi1_test_event_loop.MakeWatcher(
330 "/pi1/aos", [](const ClientStatistics &stats) {
331 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
332 });
333
334 pi2_test_event_loop.MakeWatcher(
335 "/pi2/aos", [](const ClientStatistics &stats) {
336 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
337 });
338
339 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
340 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
341 });
342 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
343 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
344 });
345
346 // Start everything up. Pong is the only thing we don't know how to wait on,
347 // so start it first.
348 std::thread pi1_test_thread(
349 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
350 std::thread pi2_test_thread(
351 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
352
353 std::thread pi1_server_thread(
354 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
355 std::thread pi1_client_thread(
356 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
357 std::thread pi2_server_thread(
358 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
359
360 {
361 FLAGS_application_name = "pi2_message_bridge_client";
362 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
363 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
364
365 // Run for 5 seconds to make sure we have time to estimate the offset.
366 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
367 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
368 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
369 // Stop between timestamps, not exactly on them.
370 quit->Setup(pi2_client_event_loop.monotonic_now() +
371 chrono::milliseconds(3050));
372 });
373
374 // And go!
375 pi2_client_event_loop.Run();
376
377 // Now confirm we are synchronized.
378 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
379 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
380
381 const ServerConnection *const pi1_connection =
382 pi1_server_statistics_fetcher->connections()->Get(0);
383 const ServerConnection *const pi2_connection =
384 pi2_server_statistics_fetcher->connections()->Get(0);
385
386 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
387 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
388 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
389 chrono::milliseconds(1));
390 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
391 chrono::milliseconds(-1));
392
393 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
394 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
395 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
396 chrono::milliseconds(1));
397 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
398 chrono::milliseconds(-1));
399 }
400
401 std::this_thread::sleep_for(std::chrono::seconds(2));
402
403 {
404 // Now confirm we are un-synchronized.
405 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
406 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
407 const ServerConnection *const pi1_connection =
408 pi1_server_statistics_fetcher->connections()->Get(0);
409 const ServerConnection *const pi2_connection =
410 pi2_server_statistics_fetcher->connections()->Get(0);
411
412 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
413 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
414 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
415 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
416 }
417
418 {
419 FLAGS_application_name = "pi2_message_bridge_client";
420 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
421 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
422
423 // Run for 5 seconds to make sure we have time to estimate the offset.
424 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
425 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
426 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
427 // Stop between timestamps, not exactly on them.
428 quit->Setup(pi2_client_event_loop.monotonic_now() +
429 chrono::milliseconds(3050));
430 });
431
432 // And go!
433 pi2_client_event_loop.Run();
434
435 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
436 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
437
438 // Now confirm we are synchronized again.
439 const ServerConnection *const pi1_connection =
440 pi1_server_statistics_fetcher->connections()->Get(0);
441 const ServerConnection *const pi2_connection =
442 pi2_server_statistics_fetcher->connections()->Get(0);
443
444 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
445 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
446 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
447 chrono::milliseconds(1));
448 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
449 chrono::milliseconds(-1));
450
451 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
452 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
453 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
454 chrono::milliseconds(1));
455 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
456 chrono::milliseconds(-1));
457 }
458
459 // Shut everyone else down
460 pi1_server_event_loop.Exit();
461 pi1_client_event_loop.Exit();
462 pi2_server_event_loop.Exit();
463 pi1_test_event_loop.Exit();
464 pi2_test_event_loop.Exit();
465 pi1_server_thread.join();
466 pi1_client_thread.join();
467 pi2_server_thread.join();
468 pi1_test_thread.join();
469 pi2_test_thread.join();
470}
471
472// Test that the server disconnecting triggers the server offsets on the other
473// side to clear, along with the other client.
474TEST(MessageBridgeTest, ServerRestart) {
475 // This is rather annoying to set up. We need to start up a client and
476 // server, on the same node, but get them to think that they are on different
477 // nodes.
478 //
479 // We need the client to not post directly to "/test" like it would in a
480 // real system, otherwise we will re-send the ping message... So, use an
481 // application specific map to have the client post somewhere else.
482 //
483 // To top this all off, each of these needs to be done with a ShmEventLoop,
484 // which needs to run in a separate thread... And it is really hard to get
485 // everything started up reliably. So just be super generous on timeouts and
486 // hope for the best. We can be more generous in the future if we need to.
487 //
488 // We are faking the application names by passing in --application_name=foo
489 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
490 aos::configuration::ReadConfig(
491 "aos/network/message_bridge_test_server_config.json");
492 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
493 aos::configuration::ReadConfig(
494 "aos/network/message_bridge_test_client_config.json");
495
496 FLAGS_application_name = "pi1_message_bridge_server";
497 // Force ourselves to be "raspberrypi" and allocate everything.
498 FLAGS_override_hostname = "raspberrypi";
499 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
500 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
501
502 FLAGS_application_name = "pi1_message_bridge_client";
503 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
504 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
505
506 // And build the app for testing.
507 FLAGS_application_name = "test1";
508 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
509 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
510 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
511 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
512 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
513
514 // Now do it for "raspberrypi2", the client.
515 FLAGS_override_hostname = "raspberrypi2";
516 FLAGS_application_name = "pi2_message_bridge_client";
517 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
518 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
519
520 // And build the app for testing.
521 FLAGS_application_name = "test2";
522 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
523 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
524 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
525
526 // Wait until we are connected, then send.
527 pi1_test_event_loop.MakeWatcher(
528 "/pi1/aos", [](const ServerStatistics &stats) {
529 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
530 });
531
532 // Confirm both client and server statistics messages have decent offsets in
533 // them.
534 pi2_test_event_loop.MakeWatcher(
535 "/pi2/aos", [](const ServerStatistics &stats) {
536 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
537 });
538
539 pi1_test_event_loop.MakeWatcher(
540 "/pi1/aos", [](const ClientStatistics &stats) {
541 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
542 });
543
544 pi2_test_event_loop.MakeWatcher(
545 "/pi2/aos", [](const ClientStatistics &stats) {
546 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
547 });
548
549 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
550 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
551 });
552 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
553 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
554 });
555
556 // Start everything up. Pong is the only thing we don't know how to wait on,
557 // so start it first.
558 std::thread pi1_test_thread(
559 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
560 std::thread pi2_test_thread(
561 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
562
563 std::thread pi1_server_thread(
564 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
565 std::thread pi1_client_thread(
566 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
567 std::thread pi2_client_thread(
568 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
569
570 {
571 FLAGS_application_name = "pi2_message_bridge_server";
572 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
573 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
574
575 // Run for 5 seconds to make sure we have time to estimate the offset.
576 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
577 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
578 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
579 // Stop between timestamps, not exactly on them.
580 quit->Setup(pi2_server_event_loop.monotonic_now() +
581 chrono::milliseconds(3050));
582 });
583
584 // And go!
585 pi2_server_event_loop.Run();
586
587 // Now confirm we are synchronized.
588 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
589 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
590
591 const ServerConnection *const pi1_connection =
592 pi1_server_statistics_fetcher->connections()->Get(0);
593 const ServerConnection *const pi2_connection =
594 pi2_server_statistics_fetcher->connections()->Get(0);
595
596 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
597 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
598 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
599 chrono::milliseconds(1));
600 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
601 chrono::milliseconds(-1));
602
603 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
604 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
605 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
606 chrono::milliseconds(1));
607 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
608 chrono::milliseconds(-1));
609 }
610
611 std::this_thread::sleep_for(std::chrono::seconds(2));
612
613 {
614 // And confirm we are unsynchronized.
615 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
616 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
617
618 const ServerConnection *const pi1_server_connection =
619 pi1_server_statistics_fetcher->connections()->Get(0);
620 const ClientConnection *const pi1_client_connection =
621 pi1_client_statistics_fetcher->connections()->Get(0);
622
623 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
624 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
625 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
626 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
627 }
628
629 {
630 FLAGS_application_name = "pi2_message_bridge_server";
631 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
632 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
633
634 // Run for 5 seconds to make sure we have time to estimate the offset.
635 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
636 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
637 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
638 // Stop between timestamps, not exactly on them.
639 quit->Setup(pi2_server_event_loop.monotonic_now() +
640 chrono::milliseconds(3050));
641 });
642
643 // And go!
644 pi2_server_event_loop.Run();
645
646 // And confirm we are synchronized again.
647 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
648 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
649
650 const ServerConnection *const pi1_connection =
651 pi1_server_statistics_fetcher->connections()->Get(0);
652 const ServerConnection *const pi2_connection =
653 pi2_server_statistics_fetcher->connections()->Get(0);
654
655 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
656 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
657 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
658 chrono::milliseconds(1));
659 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
660 chrono::milliseconds(-1));
661
662 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
663 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
664 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
665 chrono::milliseconds(1));
666 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
667 chrono::milliseconds(-1));
668 }
669
670 // Shut everyone else down
671 pi1_server_event_loop.Exit();
672 pi1_client_event_loop.Exit();
673 pi2_client_event_loop.Exit();
674 pi1_test_event_loop.Exit();
675 pi2_test_event_loop.Exit();
676 pi1_server_thread.join();
677 pi1_client_thread.join();
678 pi2_client_thread.join();
679 pi1_test_thread.join();
680 pi2_test_thread.join();
681}
682
// TODO(austin): The restart tests above confirm that the external (published)
// state does the right thing, but don't confirm that the internal state does.
// We either need to expose a way to check the internal state in a thread-safe
// way, or need a way to jump time for one node to do that.
687
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800688} // namespace testing
689} // namespace message_bridge
690} // namespace aos