blob: 96003f79d60928e96655ff6c056311f6a9fcd00a [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
6#include "aos/events/ping_generated.h"
7#include "aos/events/pong_generated.h"
8#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
10
11DECLARE_string(override_hostname);
12DECLARE_string(application_name);
13
14namespace aos {
15namespace message_bridge {
16namespace testing {
17
18namespace chrono = std::chrono;
19
20// Test that we can send a ping message over sctp and receive it.
21TEST(MessageBridgeTest, PingPong) {
22 // This is rather annoying to set up. We need to start up a client and
23 // server, on the same node, but get them to think that they are on different
24 // nodes.
25 //
26 // We then get to wait until they are connected.
27 //
28 // After they are connected, we send a Ping message.
29 //
30 // On the other end, we receive a Pong message.
31 //
32 // But, we need the client to not post directly to "/test" like it would in a
33 // real system, otherwise we will re-send the ping message... So, use an
34 // application specific map to have the client post somewhere else.
35 //
36 // To top this all off, each of these needs to be done with a ShmEventLoop,
37 // which needs to run in a separate thread... And it is really hard to get
38 // everything started up reliably. So just be super generous on timeouts and
39 // hope for the best. We can be more generous in the future if we need to.
40 //
41 // We are faking the application names by passing in --application_name=foo
42 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
43 aos::configuration::ReadConfig(
44 "aos/network/message_bridge_test_server_config.json");
Austin Schuh5344c352020-04-12 17:04:26 -070045 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
Austin Schuhe84c3ed2019-12-14 15:29:48 -080046 aos::configuration::ReadConfig(
47 "aos/network/message_bridge_test_client_config.json");
48
49 FLAGS_application_name = "pi1_message_bridge_server";
50 // Force ourselves to be "raspberrypi" and allocate everything.
51 FLAGS_override_hostname = "raspberrypi";
Austin Schuh7bc59052020-02-16 23:48:33 -080052 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
53 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
54
55 FLAGS_application_name = "pi1_message_bridge_client";
56 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
57 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080058
59 // And build the app which sends the pings.
60 FLAGS_application_name = "ping";
61 aos::ShmEventLoop ping_event_loop(&server_config.message());
62 aos::Sender<examples::Ping> ping_sender =
63 ping_event_loop.MakeSender<examples::Ping>("/test");
64
65 // Now do it for "raspberrypi2", the client.
66 FLAGS_application_name = "pi2_message_bridge_client";
67 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh5344c352020-04-12 17:04:26 -070068 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080069 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
70
71 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -070072 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080073 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074
75 // And build the app which sends the pongs.
76 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -070077 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080078
Austin Schuh7bc59052020-02-16 23:48:33 -080079 // And build the app for testing.
80 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -070081 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080082
83 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
84 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
85
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086 // Count the pongs.
87 int pong_count = 0;
88 pong_event_loop.MakeWatcher(
Austin Schuh7bc59052020-02-16 23:48:33 -080089 "/test2", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080090 ++pong_count;
91 LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080092 });
93
94 FLAGS_override_hostname = "";
95
Austin Schuhe84c3ed2019-12-14 15:29:48 -080096 // Wait until we are connected, then send.
97 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -080098 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -080099 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700100 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800101 [&ping_count, &pi2_client_event_loop, &ping_sender,
102 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800103 LOG(INFO) << FlatbufferToJson(&stats);
104
105 ASSERT_TRUE(stats.has_connections());
106 EXPECT_EQ(stats.connections()->size(), 1);
107
108 bool connected = false;
109 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800110 // Confirm that we are estimating the server time offset correctly. It
111 // should be about 0 since we are on the same machine here.
112 if (connection->has_monotonic_offset()) {
113 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
114 chrono::milliseconds(1));
115 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
116 chrono::milliseconds(-1));
117 ++pi1_server_statistics_count;
118 }
119
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800120 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800121 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800122 if (connection->state() == State::CONNECTED) {
123 connected = true;
124 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800125 }
126 }
127
128 if (connected) {
129 LOG(INFO) << "Connected! Sent ping.";
130 auto builder = ping_sender.MakeBuilder();
131 examples::Ping::Builder ping_builder =
132 builder.MakeBuilder<examples::Ping>();
133 ping_builder.add_value(ping_count + 971);
134 builder.Send(ping_builder.Finish());
135 ++ping_count;
136 }
137 });
138
Austin Schuh7bc59052020-02-16 23:48:33 -0800139 // Confirm both client and server statistics messages have decent offsets in
140 // them.
141 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700142 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800143 const ServerStatistics &stats) {
144 LOG(INFO) << FlatbufferToJson(&stats);
145 for (const ServerConnection *connection : *stats.connections()) {
146 if (connection->has_monotonic_offset()) {
147 ++pi2_server_statistics_count;
148 // Confirm that we are estimating the server time offset correctly. It
149 // should be about 0 since we are on the same machine here.
150 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
151 chrono::milliseconds(1));
152 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
153 chrono::milliseconds(-1));
154 }
155 }
156 });
157
158 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700159 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
160 const ClientStatistics &stats) {
161 LOG(INFO) << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800162
Austin Schuh5344c352020-04-12 17:04:26 -0700163 for (const ClientConnection *connection : *stats.connections()) {
164 if (connection->has_monotonic_offset()) {
165 ++pi1_client_statistics_count;
166 // It takes at least 10 microseconds to send a message between the
167 // client and server. The min (filtered) time shouldn't be over 10
168 // milliseconds on localhost. This might have to bump up if this is
169 // proving flaky.
170 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
171 chrono::milliseconds(10));
172 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
173 chrono::microseconds(10));
174 }
175 }
176 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800177
178 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700179 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800180 const ClientStatistics &stats) {
181 LOG(INFO) << FlatbufferToJson(&stats);
182
183 for (const ClientConnection *connection : *stats.connections()) {
184 if (connection->has_monotonic_offset()) {
185 ++pi2_client_statistics_count;
186 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
187 chrono::milliseconds(10));
188 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
189 chrono::microseconds(10));
190 }
191 }
192 });
193
Austin Schuh196a4452020-03-15 23:12:03 -0700194 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800195 EXPECT_TRUE(timestamp.has_offsets());
196 LOG(INFO) << FlatbufferToJson(&timestamp);
197 });
Austin Schuh196a4452020-03-15 23:12:03 -0700198 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800199 EXPECT_TRUE(timestamp.has_offsets());
200 LOG(INFO) << FlatbufferToJson(&timestamp);
201 });
202
203 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800204 aos::TimerHandler *quit = ping_event_loop.AddTimer(
205 [&ping_event_loop]() { ping_event_loop.Exit(); });
206 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800207 // Stop between timestamps, not exactly on them.
208 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800209 });
210
Austin Schuh7bc59052020-02-16 23:48:33 -0800211 // Start everything up. Pong is the only thing we don't know how to wait on,
212 // so start it first.
213 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
214
215 std::thread pi1_server_thread(
216 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
217 std::thread pi1_client_thread(
218 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
219 std::thread pi2_client_thread(
220 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
221 std::thread pi2_server_thread(
222 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800223
224 // And go!
225 ping_event_loop.Run();
226
227 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800228 pi1_server_event_loop.Exit();
229 pi1_client_event_loop.Exit();
230 pi2_client_event_loop.Exit();
231 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800232 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800233 pi1_server_thread.join();
234 pi1_client_thread.join();
235 pi2_client_thread.join();
236 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800237 pong_thread.join();
238
239 // Make sure we sent something.
240 EXPECT_GE(ping_count, 1);
241 // And got something back.
242 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800243
244 // Confirm that we are estimating a monotonic offset on the client.
245 ASSERT_TRUE(client_statistics_fetcher.Fetch());
246
247 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
248 EXPECT_EQ(client_statistics_fetcher->connections()
249 ->Get(0)
250 ->node()
251 ->name()
252 ->string_view(),
253 "pi1");
254
255 // Make sure the offset in one direction is less than a second.
256 EXPECT_GT(
257 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
258 EXPECT_LT(
259 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
260 1000000000);
261
262 EXPECT_GE(pi1_server_statistics_count, 2);
263 EXPECT_GE(pi2_server_statistics_count, 2);
264 EXPECT_GE(pi1_client_statistics_count, 2);
265 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800266}
267
Austin Schuh5344c352020-04-12 17:04:26 -0700268// Test that the client disconnecting triggers the server offsets on both sides
269// to clear.
270TEST(MessageBridgeTest, ClientRestart) {
271 // This is rather annoying to set up. We need to start up a client and
272 // server, on the same node, but get them to think that they are on different
273 // nodes.
274 //
275 // We need the client to not post directly to "/test" like it would in a
276 // real system, otherwise we will re-send the ping message... So, use an
277 // application specific map to have the client post somewhere else.
278 //
279 // To top this all off, each of these needs to be done with a ShmEventLoop,
280 // which needs to run in a separate thread... And it is really hard to get
281 // everything started up reliably. So just be super generous on timeouts and
282 // hope for the best. We can be more generous in the future if we need to.
283 //
284 // We are faking the application names by passing in --application_name=foo
285 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
286 aos::configuration::ReadConfig(
287 "aos/network/message_bridge_test_server_config.json");
288 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
289 aos::configuration::ReadConfig(
290 "aos/network/message_bridge_test_client_config.json");
291
292 FLAGS_application_name = "pi1_message_bridge_server";
293 // Force ourselves to be "raspberrypi" and allocate everything.
294 FLAGS_override_hostname = "raspberrypi";
295 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
296 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
297
298 FLAGS_application_name = "pi1_message_bridge_client";
299 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
300 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
301
302 // And build the app for testing.
303 FLAGS_application_name = "test1";
304 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
305 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
306 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
307
308 // Now do it for "raspberrypi2", the client.
309 FLAGS_override_hostname = "raspberrypi2";
310 FLAGS_application_name = "pi2_message_bridge_server";
311 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
312 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
313
314 // And build the app for testing.
315 FLAGS_application_name = "test2";
316 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
317 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
318 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
319
320 // Wait until we are connected, then send.
321 pi1_test_event_loop.MakeWatcher(
322 "/pi1/aos", [](const ServerStatistics &stats) {
323 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
324 });
325
326 pi2_test_event_loop.MakeWatcher(
327 "/pi2/aos", [](const ServerStatistics &stats) {
328 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
329 });
330
331 pi1_test_event_loop.MakeWatcher(
332 "/pi1/aos", [](const ClientStatistics &stats) {
333 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
334 });
335
336 pi2_test_event_loop.MakeWatcher(
337 "/pi2/aos", [](const ClientStatistics &stats) {
338 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
339 });
340
341 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
342 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
343 });
344 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
345 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
346 });
347
348 // Start everything up. Pong is the only thing we don't know how to wait on,
349 // so start it first.
350 std::thread pi1_test_thread(
351 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
352 std::thread pi2_test_thread(
353 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
354
355 std::thread pi1_server_thread(
356 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
357 std::thread pi1_client_thread(
358 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
359 std::thread pi2_server_thread(
360 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
361
362 {
363 FLAGS_application_name = "pi2_message_bridge_client";
364 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
365 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
366
367 // Run for 5 seconds to make sure we have time to estimate the offset.
368 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
369 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
370 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
371 // Stop between timestamps, not exactly on them.
372 quit->Setup(pi2_client_event_loop.monotonic_now() +
373 chrono::milliseconds(3050));
374 });
375
376 // And go!
377 pi2_client_event_loop.Run();
378
379 // Now confirm we are synchronized.
380 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
381 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
382
383 const ServerConnection *const pi1_connection =
384 pi1_server_statistics_fetcher->connections()->Get(0);
385 const ServerConnection *const pi2_connection =
386 pi2_server_statistics_fetcher->connections()->Get(0);
387
388 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
389 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
390 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
391 chrono::milliseconds(1));
392 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
393 chrono::milliseconds(-1));
394
395 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
396 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
397 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
398 chrono::milliseconds(1));
399 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
400 chrono::milliseconds(-1));
401 }
402
403 std::this_thread::sleep_for(std::chrono::seconds(2));
404
405 {
406 // Now confirm we are un-synchronized.
407 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
408 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
409 const ServerConnection *const pi1_connection =
410 pi1_server_statistics_fetcher->connections()->Get(0);
411 const ServerConnection *const pi2_connection =
412 pi2_server_statistics_fetcher->connections()->Get(0);
413
414 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
415 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
416 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
417 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
418 }
419
420 {
421 FLAGS_application_name = "pi2_message_bridge_client";
422 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
423 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
424
425 // Run for 5 seconds to make sure we have time to estimate the offset.
426 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
427 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
428 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
429 // Stop between timestamps, not exactly on them.
430 quit->Setup(pi2_client_event_loop.monotonic_now() +
431 chrono::milliseconds(3050));
432 });
433
434 // And go!
435 pi2_client_event_loop.Run();
436
437 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
438 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
439
440 // Now confirm we are synchronized again.
441 const ServerConnection *const pi1_connection =
442 pi1_server_statistics_fetcher->connections()->Get(0);
443 const ServerConnection *const pi2_connection =
444 pi2_server_statistics_fetcher->connections()->Get(0);
445
446 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
447 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
448 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
449 chrono::milliseconds(1));
450 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
451 chrono::milliseconds(-1));
452
453 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
454 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
455 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
456 chrono::milliseconds(1));
457 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
458 chrono::milliseconds(-1));
459 }
460
461 // Shut everyone else down
462 pi1_server_event_loop.Exit();
463 pi1_client_event_loop.Exit();
464 pi2_server_event_loop.Exit();
465 pi1_test_event_loop.Exit();
466 pi2_test_event_loop.Exit();
467 pi1_server_thread.join();
468 pi1_client_thread.join();
469 pi2_server_thread.join();
470 pi1_test_thread.join();
471 pi2_test_thread.join();
472}
473
474// Test that the server disconnecting triggers the server offsets on the other
475// side to clear, along with the other client.
476TEST(MessageBridgeTest, ServerRestart) {
477 // This is rather annoying to set up. We need to start up a client and
478 // server, on the same node, but get them to think that they are on different
479 // nodes.
480 //
481 // We need the client to not post directly to "/test" like it would in a
482 // real system, otherwise we will re-send the ping message... So, use an
483 // application specific map to have the client post somewhere else.
484 //
485 // To top this all off, each of these needs to be done with a ShmEventLoop,
486 // which needs to run in a separate thread... And it is really hard to get
487 // everything started up reliably. So just be super generous on timeouts and
488 // hope for the best. We can be more generous in the future if we need to.
489 //
490 // We are faking the application names by passing in --application_name=foo
491 aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
492 aos::configuration::ReadConfig(
493 "aos/network/message_bridge_test_server_config.json");
494 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
495 aos::configuration::ReadConfig(
496 "aos/network/message_bridge_test_client_config.json");
497
498 FLAGS_application_name = "pi1_message_bridge_server";
499 // Force ourselves to be "raspberrypi" and allocate everything.
500 FLAGS_override_hostname = "raspberrypi";
501 aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
502 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
503
504 FLAGS_application_name = "pi1_message_bridge_client";
505 aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
506 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
507
508 // And build the app for testing.
509 FLAGS_application_name = "test1";
510 aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
511 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
512 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
513 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
514 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
515
516 // Now do it for "raspberrypi2", the client.
517 FLAGS_override_hostname = "raspberrypi2";
518 FLAGS_application_name = "pi2_message_bridge_client";
519 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
520 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
521
522 // And build the app for testing.
523 FLAGS_application_name = "test2";
524 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
525 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
526 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
527
528 // Wait until we are connected, then send.
529 pi1_test_event_loop.MakeWatcher(
530 "/pi1/aos", [](const ServerStatistics &stats) {
531 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
532 });
533
534 // Confirm both client and server statistics messages have decent offsets in
535 // them.
536 pi2_test_event_loop.MakeWatcher(
537 "/pi2/aos", [](const ServerStatistics &stats) {
538 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
539 });
540
541 pi1_test_event_loop.MakeWatcher(
542 "/pi1/aos", [](const ClientStatistics &stats) {
543 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
544 });
545
546 pi2_test_event_loop.MakeWatcher(
547 "/pi2/aos", [](const ClientStatistics &stats) {
548 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
549 });
550
551 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
552 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
553 });
554 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
555 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
556 });
557
558 // Start everything up. Pong is the only thing we don't know how to wait on,
559 // so start it first.
560 std::thread pi1_test_thread(
561 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
562 std::thread pi2_test_thread(
563 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
564
565 std::thread pi1_server_thread(
566 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
567 std::thread pi1_client_thread(
568 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
569 std::thread pi2_client_thread(
570 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
571
572 {
573 FLAGS_application_name = "pi2_message_bridge_server";
574 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
575 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
576
577 // Run for 5 seconds to make sure we have time to estimate the offset.
578 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
579 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
580 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
581 // Stop between timestamps, not exactly on them.
582 quit->Setup(pi2_server_event_loop.monotonic_now() +
583 chrono::milliseconds(3050));
584 });
585
586 // And go!
587 pi2_server_event_loop.Run();
588
589 // Now confirm we are synchronized.
590 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
591 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
592
593 const ServerConnection *const pi1_connection =
594 pi1_server_statistics_fetcher->connections()->Get(0);
595 const ServerConnection *const pi2_connection =
596 pi2_server_statistics_fetcher->connections()->Get(0);
597
598 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
599 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
600 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
601 chrono::milliseconds(1));
602 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
603 chrono::milliseconds(-1));
604
605 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
606 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
607 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
608 chrono::milliseconds(1));
609 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
610 chrono::milliseconds(-1));
611 }
612
613 std::this_thread::sleep_for(std::chrono::seconds(2));
614
615 {
616 // And confirm we are unsynchronized.
617 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
618 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
619
620 const ServerConnection *const pi1_server_connection =
621 pi1_server_statistics_fetcher->connections()->Get(0);
622 const ClientConnection *const pi1_client_connection =
623 pi1_client_statistics_fetcher->connections()->Get(0);
624
625 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
626 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
627 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
628 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
629 }
630
631 {
632 FLAGS_application_name = "pi2_message_bridge_server";
633 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
634 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
635
636 // Run for 5 seconds to make sure we have time to estimate the offset.
637 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
638 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
639 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
640 // Stop between timestamps, not exactly on them.
641 quit->Setup(pi2_server_event_loop.monotonic_now() +
642 chrono::milliseconds(3050));
643 });
644
645 // And go!
646 pi2_server_event_loop.Run();
647
648 // And confirm we are synchronized again.
649 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
650 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
651
652 const ServerConnection *const pi1_connection =
653 pi1_server_statistics_fetcher->connections()->Get(0);
654 const ServerConnection *const pi2_connection =
655 pi2_server_statistics_fetcher->connections()->Get(0);
656
657 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
658 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
659 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
660 chrono::milliseconds(1));
661 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
662 chrono::milliseconds(-1));
663
664 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
665 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
666 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
667 chrono::milliseconds(1));
668 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
669 chrono::milliseconds(-1));
670 }
671
672 // Shut everyone else down
673 pi1_server_event_loop.Exit();
674 pi1_client_event_loop.Exit();
675 pi2_client_event_loop.Exit();
676 pi1_test_event_loop.Exit();
677 pi2_test_event_loop.Exit();
678 pi1_server_thread.join();
679 pi1_client_thread.join();
680 pi2_client_thread.join();
681 pi1_test_thread.join();
682 pi2_test_thread.join();
683}
684
685// TODO(austin): This test confirms that the external state does the right
686// thing, but doesn't confirm that the internal state does. We either need to
687// expose a way to check the state in a thread-safe way, or need a way to jump
688// time for one node to do that.
689
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800690} // namespace testing
691} // namespace message_bridge
692} // namespace aos