blob: a9e918b12ce76556df17eff3e21ae0c46e7ab648 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuh4889b182020-11-18 19:11:56 -08007#include "aos/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/events/ping_generated.h"
9#include "aos/events/pong_generated.h"
10#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
15namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070016void SetShmBase(const std::string_view base);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace message_bridge {
19namespace testing {
20
21namespace chrono = std::chrono;
22
Austin Schuhe991fe22020-11-18 16:53:39 -080023std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070024 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
25 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080026 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080028 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 }
30}
31
Austin Schuhe991fe22020-11-18 16:53:39 -080032void DoSetShmBase(const std::string_view node) {
33 aos::SetShmBase(ShmBase(node));
34}
35
36class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080037 public:
38 MessageBridgeTest()
39 : pi1_config(aos::configuration::ReadConfig(
40 "aos/network/message_bridge_test_server_config.json")),
41 pi2_config(aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_client_config.json")) {
43 util::UnlinkRecursive(ShmBase("pi1"));
44 util::UnlinkRecursive(ShmBase("pi2"));
45 }
Austin Schuhe991fe22020-11-18 16:53:39 -080046
Austin Schuh0a2f12f2021-01-08 22:48:29 -080047 void OnPi1() {
48 DoSetShmBase("pi1");
49 FLAGS_override_hostname = "raspberrypi";
50 }
51
52 void OnPi2() {
53 DoSetShmBase("pi2");
54 FLAGS_override_hostname = "raspberrypi2";
55 }
56
57 void MakePi1Server() {
58 OnPi1();
59 FLAGS_application_name = "pi1_message_bridge_server";
60 pi1_server_event_loop =
61 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
62 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
63 pi1_message_bridge_server =
64 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
65 }
66
67 void RunPi1Server(chrono::nanoseconds duration) {
68 // Setup a shutdown callback.
69 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
70 [this]() { pi1_server_event_loop->Exit(); });
71 pi1_server_event_loop->OnRun([this, quit, duration]() {
72 // Stop between timestamps, not exactly on them.
73 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
74 });
75
76 pi1_server_event_loop->Run();
77 }
78
79 void StartPi1Server() {
80 pi1_server_thread = std::thread([this]() {
81 LOG(INFO) << "Started pi1_message_bridge_server";
82 pi1_server_event_loop->Run();
83 });
84 }
85
86 void StopPi1Server() {
87 if (pi1_server_thread.joinable()) {
88 pi1_server_event_loop->Exit();
89 pi1_server_thread.join();
90 pi1_server_thread = std::thread();
91 }
92 pi1_message_bridge_server.reset();
93 pi1_server_event_loop.reset();
94 }
95
96 void MakePi1Client() {
97 OnPi1();
98 FLAGS_application_name = "pi1_message_bridge_client";
99 pi1_client_event_loop =
100 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
101 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
102 pi1_message_bridge_client =
103 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
104 }
105
106 void StartPi1Client() {
107 pi1_client_thread = std::thread([this]() {
108 LOG(INFO) << "Started pi1_message_bridge_client";
109 pi1_client_event_loop->Run();
110 });
111 }
112
113 void StopPi1Client() {
114 pi1_client_event_loop->Exit();
115 pi1_client_thread.join();
116 pi1_client_thread = std::thread();
117 pi1_message_bridge_client.reset();
118 pi1_client_event_loop.reset();
119 }
120
121 void MakePi1Test() {
122 OnPi1();
123 FLAGS_application_name = "test1";
124 pi1_test_event_loop =
125 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
126
127 pi1_test_event_loop->MakeWatcher(
128 "/pi1/aos", [](const ServerStatistics &stats) {
129 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
130 });
131
132 pi1_test_event_loop->MakeWatcher(
133 "/pi1/aos", [](const ClientStatistics &stats) {
134 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
135 });
136
137 pi1_test_event_loop->MakeWatcher(
138 "/pi1/aos", [](const Timestamp &timestamp) {
139 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
140 });
141 }
142
143 void StartPi1Test() {
144 pi1_test_thread = std::thread([this]() {
145 LOG(INFO) << "Started pi1_test";
146 pi1_test_event_loop->Run();
147 });
148 }
149
150 void StopPi1Test() {
151 pi1_test_event_loop->Exit();
152 pi1_test_thread.join();
153 }
154
155 void MakePi2Server() {
156 OnPi2();
157 FLAGS_application_name = "pi2_message_bridge_server";
158 pi2_server_event_loop =
159 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
160 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
161 pi2_message_bridge_server =
162 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
163 }
164
165 void RunPi2Server(chrono::nanoseconds duration) {
166 // Setup a shutdown callback.
167 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
168 [this]() { pi2_server_event_loop->Exit(); });
169 pi2_server_event_loop->OnRun([this, quit, duration]() {
170 // Stop between timestamps, not exactly on them.
171 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
172 });
173
174 pi2_server_event_loop->Run();
175 }
176
177 void StartPi2Server() {
178 pi2_server_thread = std::thread([this]() {
179 LOG(INFO) << "Started pi2_message_bridge_server";
180 pi2_server_event_loop->Run();
181 });
182 }
183
184 void StopPi2Server() {
185 if (pi2_server_thread.joinable()) {
186 pi2_server_event_loop->Exit();
187 pi2_server_thread.join();
188 pi2_server_thread = std::thread();
189 }
190 pi2_message_bridge_server.reset();
191 pi2_server_event_loop.reset();
192 }
193
194 void MakePi2Client() {
195 OnPi2();
196 FLAGS_application_name = "pi2_message_bridge_client";
197 pi2_client_event_loop =
198 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
199 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
200 pi2_message_bridge_client =
201 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
202 }
203
204 void RunPi2Client(chrono::nanoseconds duration) {
205 // Run for 5 seconds to make sure we have time to estimate the offset.
206 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
207 [this]() { pi2_client_event_loop->Exit(); });
208 pi2_client_event_loop->OnRun([this, quit, duration]() {
209 // Stop between timestamps, not exactly on them.
210 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
211 });
212
213 // And go!
214 pi2_client_event_loop->Run();
215 }
216
217 void StartPi2Client() {
218 pi2_client_thread = std::thread([this]() {
219 LOG(INFO) << "Started pi2_message_bridge_client";
220 pi2_client_event_loop->Run();
221 });
222 }
223
224 void StopPi2Client() {
225 if (pi2_client_thread.joinable()) {
226 pi2_client_event_loop->Exit();
227 pi2_client_thread.join();
228 pi2_client_thread = std::thread();
229 }
230 pi2_message_bridge_client.reset();
231 pi2_client_event_loop.reset();
232 }
233
234 void MakePi2Test() {
235 OnPi2();
236 FLAGS_application_name = "test2";
237 pi2_test_event_loop =
238 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
239
240 pi2_test_event_loop->MakeWatcher(
241 "/pi2/aos", [](const ServerStatistics &stats) {
242 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
243 });
244
245 pi2_test_event_loop->MakeWatcher(
246 "/pi2/aos", [](const ClientStatistics &stats) {
247 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
248 });
249
250 pi2_test_event_loop->MakeWatcher(
251 "/pi2/aos", [](const Timestamp &timestamp) {
252 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
253 });
254 }
255
256 void StartPi2Test() {
257 pi2_test_thread = std::thread([this]() {
258 LOG(INFO) << "Started pi2_message_bridge_test";
259 pi2_test_event_loop->Run();
260 });
261 }
262
263 void StopPi2Test() {
264 pi2_test_event_loop->Exit();
265 pi2_test_thread.join();
266 }
267
Austin Schuh0de30f32020-12-06 12:44:28 -0800268 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
269 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800270
271 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
272 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
273 std::thread pi1_server_thread;
274
275 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
276 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
277 std::thread pi1_client_thread;
278
279 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
280 std::thread pi1_test_thread;
281
282 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
283 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
284 std::thread pi2_server_thread;
285
286 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
287 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
288 std::thread pi2_client_thread;
289
290 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
291 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800292};
293
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800294// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -0800295TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800296 // This is rather annoying to set up. We need to start up a client and
297 // server, on the same node, but get them to think that they are on different
298 // nodes.
299 //
300 // We then get to wait until they are connected.
301 //
302 // After they are connected, we send a Ping message.
303 //
304 // On the other end, we receive a Pong message.
305 //
306 // But, we need the client to not post directly to "/test" like it would in a
307 // real system, otherwise we will re-send the ping message... So, use an
308 // application specific map to have the client post somewhere else.
309 //
310 // To top this all off, each of these needs to be done with a ShmEventLoop,
311 // which needs to run in a separate thread... And it is really hard to get
312 // everything started up reliably. So just be super generous on timeouts and
313 // hope for the best. We can be more generous in the future if we need to.
314 //
315 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800316 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800317 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700318
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800319 MakePi1Server();
320 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800321
322 // And build the app which sends the pings.
323 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -0800324 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800325 aos::Sender<examples::Ping> ping_sender =
326 ping_event_loop.MakeSender<examples::Ping>("/test");
327
Austin Schuhe991fe22020-11-18 16:53:39 -0800328 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800329 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
330 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700331 "/pi1/aos/remote_timestamps/pi2");
332
333 // Fetchers for confirming the remote timestamps made it.
334 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
335 ping_event_loop.MakeFetcher<examples::Ping>("/test");
336 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
337 ping_event_loop.MakeFetcher<Timestamp>("/aos");
338
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800339 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800340 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700341
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800342 MakePi2Client();
343 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800344
345 // And build the app which sends the pongs.
346 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700347 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800348
Austin Schuh7bc59052020-02-16 23:48:33 -0800349 // And build the app for testing.
350 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700351 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800352
353 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
354 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800355 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
356 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700357 "/pi2/aos/remote_timestamps/pi1");
358
359 // Event loop for fetching data delivered to pi2 from pi1 to match up
360 // messages.
361 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
362 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
363 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
364 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
365 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
366 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
367 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800368
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800369 // Count the pongs.
370 int pong_count = 0;
371 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700372 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800373 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800374 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800375 });
376
377 FLAGS_override_hostname = "";
378
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800379 // Wait until we are connected, then send.
380 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800381 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800382 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700383 "/pi1/aos",
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800384 [this, &ping_count, &ping_sender,
Austin Schuh7bc59052020-02-16 23:48:33 -0800385 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800386 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387
388 ASSERT_TRUE(stats.has_connections());
389 EXPECT_EQ(stats.connections()->size(), 1);
390
391 bool connected = false;
392 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800393 // Confirm that we are estimating the server time offset correctly. It
394 // should be about 0 since we are on the same machine here.
395 if (connection->has_monotonic_offset()) {
396 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
397 chrono::milliseconds(1));
398 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
399 chrono::milliseconds(-1));
400 ++pi1_server_statistics_count;
401 }
402
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800403 if (connection->node()->name()->string_view() ==
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800404 pi2_client_event_loop->node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800405 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800406 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800407 connected = true;
408 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800409 }
410 }
411
412 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800413 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800414 auto builder = ping_sender.MakeBuilder();
415 examples::Ping::Builder ping_builder =
416 builder.MakeBuilder<examples::Ping>();
417 ping_builder.add_value(ping_count + 971);
418 builder.Send(ping_builder.Finish());
419 ++ping_count;
420 }
421 });
422
Austin Schuh7bc59052020-02-16 23:48:33 -0800423 // Confirm both client and server statistics messages have decent offsets in
424 // them.
425 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700426 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800427 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800428 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800429 for (const ServerConnection *connection : *stats.connections()) {
430 if (connection->has_monotonic_offset()) {
431 ++pi2_server_statistics_count;
432 // Confirm that we are estimating the server time offset correctly. It
433 // should be about 0 since we are on the same machine here.
434 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
435 chrono::milliseconds(1));
436 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
437 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800438 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800439 }
440 }
441 });
442
443 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700444 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
445 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800446 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800447
Austin Schuh5344c352020-04-12 17:04:26 -0700448 for (const ClientConnection *connection : *stats.connections()) {
449 if (connection->has_monotonic_offset()) {
450 ++pi1_client_statistics_count;
451 // It takes at least 10 microseconds to send a message between the
452 // client and server. The min (filtered) time shouldn't be over 10
453 // milliseconds on localhost. This might have to bump up if this is
454 // proving flaky.
455 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800456 chrono::milliseconds(10))
457 << " " << connection->monotonic_offset()
458 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700459 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800460 chrono::microseconds(10))
461 << " " << connection->monotonic_offset()
462 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700463 }
464 }
465 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800466
467 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700468 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800469 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800470 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800471
472 for (const ClientConnection *connection : *stats.connections()) {
473 if (connection->has_monotonic_offset()) {
474 ++pi2_client_statistics_count;
475 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
476 chrono::milliseconds(10));
477 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
478 chrono::microseconds(10));
479 }
480 }
481 });
482
Austin Schuh196a4452020-03-15 23:12:03 -0700483 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800484 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800485 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800486 });
Austin Schuh196a4452020-03-15 23:12:03 -0700487 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800488 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800489 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800490 });
491
492 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800493 aos::TimerHandler *quit = ping_event_loop.AddTimer(
494 [&ping_event_loop]() { ping_event_loop.Exit(); });
495 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800496 // Stop between timestamps, not exactly on them.
497 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800498 });
499
Austin Schuh2f8fd752020-09-01 22:38:28 -0700500 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
501 // channel.
502 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
503 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
504 const size_t ping_timestamp_channel =
505 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
506 ping_on_pi2_fetcher.channel());
507
508 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
509 VLOG(1) << "Channel "
510 << configuration::ChannelIndex(ping_event_loop.configuration(),
511 channel)
512 << " " << configuration::CleanedChannelToString(channel);
513 }
514
515 // For each remote timestamp we get back, confirm that it is either a ping
516 // message, or a timestamp we sent out. Also confirm that the timestamps are
517 // correct.
518 ping_event_loop.MakeWatcher(
519 "/pi1/aos/remote_timestamps/pi2",
520 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
521 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800522 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
523 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
524 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700525
Austin Schuh20ac95d2020-12-05 17:24:19 -0800526 EXPECT_TRUE(header.has_boot_uuid());
527
Austin Schuh2f8fd752020-09-01 22:38:28 -0700528 const aos::monotonic_clock::time_point header_monotonic_sent_time(
529 chrono::nanoseconds(header.monotonic_sent_time()));
530 const aos::realtime_clock::time_point header_realtime_sent_time(
531 chrono::nanoseconds(header.realtime_sent_time()));
532 const aos::monotonic_clock::time_point header_monotonic_remote_time(
533 chrono::nanoseconds(header.monotonic_remote_time()));
534 const aos::realtime_clock::time_point header_realtime_remote_time(
535 chrono::nanoseconds(header.realtime_remote_time()));
536
537 const Context *pi1_context = nullptr;
538 const Context *pi2_context = nullptr;
539
540 if (header.channel_index() == pi1_timestamp_channel) {
541 // Find the forwarded message.
542 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
543 header_monotonic_sent_time) {
544 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
545 }
546
547 // And the source message.
548 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
549 header_monotonic_remote_time) {
550 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
551 }
552
553 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
554 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
555 } else if (header.channel_index() == ping_timestamp_channel) {
556 // Find the forwarded message.
557 while (ping_on_pi2_fetcher.context().monotonic_event_time <
558 header_monotonic_sent_time) {
559 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
560 }
561
562 // And the source message.
563 while (ping_on_pi1_fetcher.context().monotonic_event_time <
564 header_monotonic_remote_time) {
565 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
566 }
567
568 pi1_context = &ping_on_pi1_fetcher.context();
569 pi2_context = &ping_on_pi2_fetcher.context();
570 } else {
571 LOG(FATAL) << "Unknown channel";
572 }
573
574 // Confirm the forwarded message has matching timestamps to the
575 // timestamps we got back.
576 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
577 EXPECT_EQ(pi2_context->monotonic_event_time,
578 header_monotonic_sent_time);
579 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
580 EXPECT_EQ(pi2_context->realtime_remote_time,
581 header_realtime_remote_time);
582 EXPECT_EQ(pi2_context->monotonic_remote_time,
583 header_monotonic_remote_time);
584
585 // Confirm the forwarded message also matches the source message.
586 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
587 EXPECT_EQ(pi1_context->monotonic_event_time,
588 header_monotonic_remote_time);
589 EXPECT_EQ(pi1_context->realtime_event_time,
590 header_realtime_remote_time);
591 });
592
Austin Schuh7bc59052020-02-16 23:48:33 -0800593 // Start everything up. Pong is the only thing we don't know how to wait on,
594 // so start it first.
595 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
596
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800597 StartPi1Server();
598 StartPi1Client();
599 StartPi2Client();
600 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800601
602 // And go!
603 ping_event_loop.Run();
604
605 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800606 StopPi1Server();
607 StopPi1Client();
608 StopPi2Client();
609 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800610 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800611 pong_thread.join();
612
613 // Make sure we sent something.
614 EXPECT_GE(ping_count, 1);
615 // And got something back.
616 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800617
618 // Confirm that we are estimating a monotonic offset on the client.
619 ASSERT_TRUE(client_statistics_fetcher.Fetch());
620
621 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
622 EXPECT_EQ(client_statistics_fetcher->connections()
623 ->Get(0)
624 ->node()
625 ->name()
626 ->string_view(),
627 "pi1");
628
629 // Make sure the offset in one direction is less than a second.
630 EXPECT_GT(
631 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
632 EXPECT_LT(
633 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
634 1000000000);
635
636 EXPECT_GE(pi1_server_statistics_count, 2);
637 EXPECT_GE(pi2_server_statistics_count, 2);
638 EXPECT_GE(pi1_client_statistics_count, 2);
639 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700640
641 // Confirm we got timestamps back!
642 EXPECT_TRUE(message_header_fetcher1.Fetch());
643 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800644}
645
Austin Schuh5344c352020-04-12 17:04:26 -0700646// Test that the client disconnecting triggers the server offsets on both sides
647// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800648TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700649 // This is rather annoying to set up. We need to start up a client and
650 // server, on the same node, but get them to think that they are on different
651 // nodes.
652 //
653 // We need the client to not post directly to "/test" like it would in a
654 // real system, otherwise we will re-send the ping message... So, use an
655 // application specific map to have the client post somewhere else.
656 //
657 // To top this all off, each of these needs to be done with a ShmEventLoop,
658 // which needs to run in a separate thread... And it is really hard to get
659 // everything started up reliably. So just be super generous on timeouts and
660 // hope for the best. We can be more generous in the future if we need to.
661 //
662 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800663 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700664
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800665 MakePi1Server();
666 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700667
668 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800669 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700670 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800671 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700672
673 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800674 OnPi2();
675 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700676
677 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800678 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700679 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800680 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700681
682 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700683
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800684 StartPi1Test();
685 StartPi2Test();
686 StartPi1Server();
687 StartPi1Client();
688 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700689
690 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800691 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700692
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800693 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700694
695 // Now confirm we are synchronized.
696 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
697 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
698
699 const ServerConnection *const pi1_connection =
700 pi1_server_statistics_fetcher->connections()->Get(0);
701 const ServerConnection *const pi2_connection =
702 pi2_server_statistics_fetcher->connections()->Get(0);
703
704 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
705 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
706 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
707 chrono::milliseconds(1));
708 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
709 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800710 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700711
712 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
713 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
714 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
715 chrono::milliseconds(1));
716 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
717 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800718 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800719
720 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700721 }
722
723 std::this_thread::sleep_for(std::chrono::seconds(2));
724
725 {
726 // Now confirm we are un-synchronized.
727 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
728 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
729 const ServerConnection *const pi1_connection =
730 pi1_server_statistics_fetcher->connections()->Get(0);
731 const ServerConnection *const pi2_connection =
732 pi2_server_statistics_fetcher->connections()->Get(0);
733
734 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
735 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800736 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700737 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
738 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800739 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700740 }
741
742 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800743 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700744 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800745 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700746
747 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
748 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
749
750 // Now confirm we are synchronized again.
751 const ServerConnection *const pi1_connection =
752 pi1_server_statistics_fetcher->connections()->Get(0);
753 const ServerConnection *const pi2_connection =
754 pi2_server_statistics_fetcher->connections()->Get(0);
755
756 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
757 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
758 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
759 chrono::milliseconds(1));
760 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
761 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800762 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700763
764 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
765 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
766 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
767 chrono::milliseconds(1));
768 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
769 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800770 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800771
772 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700773 }
774
775 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800776 StopPi1Server();
777 StopPi1Client();
778 StopPi2Server();
779 StopPi1Test();
780 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700781}
782
783// Test that the server disconnecting triggers the server offsets on the other
784// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800785TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700786 // This is rather annoying to set up. We need to start up a client and
787 // server, on the same node, but get them to think that they are on different
788 // nodes.
789 //
790 // We need the client to not post directly to "/test" like it would in a
791 // real system, otherwise we will re-send the ping message... So, use an
792 // application specific map to have the client post somewhere else.
793 //
794 // To top this all off, each of these needs to be done with a ShmEventLoop,
795 // which needs to run in a separate thread... And it is really hard to get
796 // everything started up reliably. So just be super generous on timeouts and
797 // hope for the best. We can be more generous in the future if we need to.
798 //
799 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700800 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800801 OnPi1();
802 MakePi1Server();
803 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700804
805 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800806 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700807 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800808 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700809 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800810 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700811
812 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800813 OnPi2();
814 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700815
816 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800817 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700818 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800819 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700820
821 // Start everything up. Pong is the only thing we don't know how to wait on,
822 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800823 StartPi1Test();
824 StartPi2Test();
825 StartPi1Server();
826 StartPi1Client();
827 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700828
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800829 // Confirm both client and server statistics messages have decent offsets in
830 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700831
832 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800833 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700834
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800835 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700836
837 // Now confirm we are synchronized.
838 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
839 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
840
841 const ServerConnection *const pi1_connection =
842 pi1_server_statistics_fetcher->connections()->Get(0);
843 const ServerConnection *const pi2_connection =
844 pi2_server_statistics_fetcher->connections()->Get(0);
845
846 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
847 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
848 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
849 chrono::milliseconds(1));
850 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
851 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800852 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700853
854 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
855 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
856 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
857 chrono::milliseconds(1));
858 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
859 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800860 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800861
862 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700863 }
864
865 std::this_thread::sleep_for(std::chrono::seconds(2));
866
867 {
868 // And confirm we are unsynchronized.
869 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
870 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
871
872 const ServerConnection *const pi1_server_connection =
873 pi1_server_statistics_fetcher->connections()->Get(0);
874 const ClientConnection *const pi1_client_connection =
875 pi1_client_statistics_fetcher->connections()->Get(0);
876
877 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
878 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800879 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700880 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
881 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
882 }
883
884 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800885 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700886
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800887 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700888
889 // And confirm we are synchronized again.
890 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
891 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
892
893 const ServerConnection *const pi1_connection =
894 pi1_server_statistics_fetcher->connections()->Get(0);
895 const ServerConnection *const pi2_connection =
896 pi2_server_statistics_fetcher->connections()->Get(0);
897
898 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
899 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
900 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
901 chrono::milliseconds(1));
902 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
903 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800904 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700905
906 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
907 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
908 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
909 chrono::milliseconds(1));
910 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
911 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800912 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800913
914 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700915 }
916
917 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800918 StopPi1Server();
919 StopPi1Client();
920 StopPi2Client();
921 StopPi1Test();
922 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700923}
924
Austin Schuh4889b182020-11-18 19:11:56 -0800925// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700926// thing, but doesn't confirm that the internal state does. We either need to
927// expose a way to check the state in a thread-safe way, or need a way to jump
928// time for one node to do that.
929
Austin Schuh4889b182020-11-18 19:11:56 -0800930void SendPing(aos::Sender<examples::Ping> *sender, int value) {
931 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
932 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
933 ping_builder.add_value(value);
934 builder.Send(ping_builder.Finish());
935}
936
937// Tests that when a message is sent before the bridge starts up, but is
938// configured as reliable, we forward it. Confirm this survives a client reset.
939TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800940 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800941
942 FLAGS_application_name = "sender";
943 aos::ShmEventLoop send_event_loop(&pi1_config.message());
944 aos::Sender<examples::Ping> ping_sender =
945 send_event_loop.MakeSender<examples::Ping>("/test");
946 SendPing(&ping_sender, 1);
947 aos::Sender<examples::Ping> unreliable_ping_sender =
948 send_event_loop.MakeSender<examples::Ping>("/unreliable");
949 SendPing(&unreliable_ping_sender, 1);
950
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800951 MakePi1Server();
952 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800953
954 FLAGS_application_name = "pi1_timestamp";
955 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
956
957 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800958 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800959
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800960 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800961
962 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
963 aos::Fetcher<examples::Ping> ping_fetcher =
964 receive_event_loop.MakeFetcher<examples::Ping>("/test");
965 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
966 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
967 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
968 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
969
970 const size_t ping_channel_index = configuration::ChannelIndex(
971 receive_event_loop.configuration(), ping_fetcher.channel());
972
973 std::atomic<int> ping_timestamp_count{0};
974 pi1_remote_timestamp_event_loop.MakeWatcher(
975 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800976 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
977 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
978 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800979 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800980 if (header.channel_index() == ping_channel_index) {
981 ++ping_timestamp_count;
982 }
983 });
984
985 // Before everything starts up, confirm there is no message.
986 EXPECT_FALSE(ping_fetcher.Fetch());
987 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
988
989 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800990 StartPi1Server();
991 StartPi1Client();
992 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800993
994 // Event used to wait for the timestamp counting thread to start.
995 aos::Event event;
996 std::thread pi1_remote_timestamp_thread(
997 [&pi1_remote_timestamp_event_loop, &event]() {
998 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
999 pi1_remote_timestamp_event_loop.Run();
1000 });
1001
1002 event.Wait();
1003
1004 {
1005 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001006 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001007
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001008 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001009
1010 // Confirm there is no detected duplicate packet.
1011 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1012 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1013 ->Get(0)
1014 ->duplicate_packets(),
1015 0u);
1016
1017 EXPECT_TRUE(ping_fetcher.Fetch());
1018 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1019 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001020
1021 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001022 }
1023
1024 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001025 // Now, spin up a client for 2 seconds.
1026 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001027
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001028 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001029
1030 // Confirm we detect the duplicate packet correctly.
1031 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1032 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1033 ->Get(0)
1034 ->duplicate_packets(),
1035 1u);
1036
1037 EXPECT_EQ(ping_timestamp_count, 1);
1038 EXPECT_FALSE(ping_fetcher.Fetch());
1039 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001040
1041 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001042 }
1043
1044 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001045 StopPi1Client();
1046 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001047 pi1_remote_timestamp_event_loop.Exit();
1048 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001049 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001050}
1051
1052// Tests that when a message is sent before the bridge starts up, but is
1053// configured as reliable, we forward it. Confirm this works across server
1054// resets.
1055TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1056 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001057 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001058
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001059 MakePi2Server();
1060 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001061
1062 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1063 aos::Fetcher<examples::Ping> ping_fetcher =
1064 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1065 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1066 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1067 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1068 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1069
Austin Schuh4889b182020-11-18 19:11:56 -08001070 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001071 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001072
1073 FLAGS_application_name = "sender";
1074 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1075 aos::Sender<examples::Ping> ping_sender =
1076 send_event_loop.MakeSender<examples::Ping>("/test");
1077 {
1078 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1079 examples::Ping::Builder ping_builder =
1080 builder.MakeBuilder<examples::Ping>();
1081 ping_builder.add_value(1);
1082 builder.Send(ping_builder.Finish());
1083 }
1084
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001085 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001086
1087 FLAGS_application_name = "pi1_timestamp";
1088 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1089
1090 const size_t ping_channel_index = configuration::ChannelIndex(
1091 receive_event_loop.configuration(), ping_fetcher.channel());
1092
1093 std::atomic<int> ping_timestamp_count{0};
1094 pi1_remote_timestamp_event_loop.MakeWatcher(
1095 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001096 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1097 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1098 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001099 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001100 if (header.channel_index() == ping_channel_index) {
1101 ++ping_timestamp_count;
1102 }
1103 });
1104
1105 // Before everything starts up, confirm there is no message.
1106 EXPECT_FALSE(ping_fetcher.Fetch());
1107 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1108
1109 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001110 StartPi1Client();
1111 StartPi2Server();
1112 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001113
1114 // Event used to wait for the timestamp counting thread to start.
1115 aos::Event event;
1116 std::thread pi1_remote_timestamp_thread(
1117 [&pi1_remote_timestamp_event_loop, &event]() {
1118 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1119 pi1_remote_timestamp_event_loop.Run();
1120 });
1121
1122 event.Wait();
1123
1124 {
1125 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001126 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001127
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001128 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001129
1130 // Confirm there is no detected duplicate packet.
1131 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1132 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1133 ->Get(0)
1134 ->duplicate_packets(),
1135 0u);
1136
1137 EXPECT_TRUE(ping_fetcher.Fetch());
1138 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1139 EXPECT_EQ(ping_timestamp_count, 1);
1140 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001141
1142 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001143 }
1144
1145 {
1146 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001147 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001148
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001149 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001150
1151 // Confirm we detect the duplicate packet correctly.
1152 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1153 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1154 ->Get(0)
1155 ->duplicate_packets(),
1156 1u);
1157
1158 EXPECT_EQ(ping_timestamp_count, 1);
1159 EXPECT_FALSE(ping_fetcher.Fetch());
1160 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001161
1162 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001163 }
1164
1165 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001166 StopPi1Client();
1167 StopPi2Server();
1168 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001169 pi1_remote_timestamp_event_loop.Exit();
1170 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001171}
1172
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001173} // namespace testing
1174} // namespace message_bridge
1175} // namespace aos