blob: ca5bcb8f067d5fef2c404d4dd96523f8ba89c73b [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080011#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080012#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013
14namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070015void SetShmBase(const std::string_view base);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace message_bridge {
18namespace testing {
19
20namespace chrono = std::chrono;
21
Austin Schuhe991fe22020-11-18 16:53:39 -080022std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070023 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
24 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080025 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070026 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080027 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 }
29}
30
Austin Schuhe991fe22020-11-18 16:53:39 -080031void DoSetShmBase(const std::string_view node) {
32 aos::SetShmBase(ShmBase(node));
33}
34
35class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080036 public:
37 MessageBridgeTest()
38 : pi1_config(aos::configuration::ReadConfig(
39 "aos/network/message_bridge_test_server_config.json")),
40 pi2_config(aos::configuration::ReadConfig(
41 "aos/network/message_bridge_test_client_config.json")) {
42 util::UnlinkRecursive(ShmBase("pi1"));
43 util::UnlinkRecursive(ShmBase("pi2"));
44 }
Austin Schuhe991fe22020-11-18 16:53:39 -080045
Austin Schuh0a2f12f2021-01-08 22:48:29 -080046 void OnPi1() {
47 DoSetShmBase("pi1");
48 FLAGS_override_hostname = "raspberrypi";
49 }
50
51 void OnPi2() {
52 DoSetShmBase("pi2");
53 FLAGS_override_hostname = "raspberrypi2";
54 }
55
56 void MakePi1Server() {
57 OnPi1();
58 FLAGS_application_name = "pi1_message_bridge_server";
59 pi1_server_event_loop =
60 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
61 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
62 pi1_message_bridge_server =
63 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
64 }
65
66 void RunPi1Server(chrono::nanoseconds duration) {
67 // Setup a shutdown callback.
68 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
69 [this]() { pi1_server_event_loop->Exit(); });
70 pi1_server_event_loop->OnRun([this, quit, duration]() {
71 // Stop between timestamps, not exactly on them.
72 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
73 });
74
75 pi1_server_event_loop->Run();
76 }
77
78 void StartPi1Server() {
79 pi1_server_thread = std::thread([this]() {
80 LOG(INFO) << "Started pi1_message_bridge_server";
81 pi1_server_event_loop->Run();
82 });
83 }
84
85 void StopPi1Server() {
86 if (pi1_server_thread.joinable()) {
87 pi1_server_event_loop->Exit();
88 pi1_server_thread.join();
89 pi1_server_thread = std::thread();
90 }
91 pi1_message_bridge_server.reset();
92 pi1_server_event_loop.reset();
93 }
94
95 void MakePi1Client() {
96 OnPi1();
97 FLAGS_application_name = "pi1_message_bridge_client";
98 pi1_client_event_loop =
99 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
100 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
101 pi1_message_bridge_client =
102 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
103 }
104
105 void StartPi1Client() {
106 pi1_client_thread = std::thread([this]() {
107 LOG(INFO) << "Started pi1_message_bridge_client";
108 pi1_client_event_loop->Run();
109 });
110 }
111
112 void StopPi1Client() {
113 pi1_client_event_loop->Exit();
114 pi1_client_thread.join();
115 pi1_client_thread = std::thread();
116 pi1_message_bridge_client.reset();
117 pi1_client_event_loop.reset();
118 }
119
120 void MakePi1Test() {
121 OnPi1();
122 FLAGS_application_name = "test1";
123 pi1_test_event_loop =
124 std::make_unique<aos::ShmEventLoop>(&pi1_config.message());
125
126 pi1_test_event_loop->MakeWatcher(
127 "/pi1/aos", [](const ServerStatistics &stats) {
128 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
129 });
130
131 pi1_test_event_loop->MakeWatcher(
132 "/pi1/aos", [](const ClientStatistics &stats) {
133 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
134 });
135
136 pi1_test_event_loop->MakeWatcher(
137 "/pi1/aos", [](const Timestamp &timestamp) {
138 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
139 });
140 }
141
142 void StartPi1Test() {
143 pi1_test_thread = std::thread([this]() {
144 LOG(INFO) << "Started pi1_test";
145 pi1_test_event_loop->Run();
146 });
147 }
148
149 void StopPi1Test() {
150 pi1_test_event_loop->Exit();
151 pi1_test_thread.join();
152 }
153
154 void MakePi2Server() {
155 OnPi2();
156 FLAGS_application_name = "pi2_message_bridge_server";
157 pi2_server_event_loop =
158 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
159 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
160 pi2_message_bridge_server =
161 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
162 }
163
164 void RunPi2Server(chrono::nanoseconds duration) {
165 // Setup a shutdown callback.
166 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
167 [this]() { pi2_server_event_loop->Exit(); });
168 pi2_server_event_loop->OnRun([this, quit, duration]() {
169 // Stop between timestamps, not exactly on them.
170 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
171 });
172
173 pi2_server_event_loop->Run();
174 }
175
176 void StartPi2Server() {
177 pi2_server_thread = std::thread([this]() {
178 LOG(INFO) << "Started pi2_message_bridge_server";
179 pi2_server_event_loop->Run();
180 });
181 }
182
183 void StopPi2Server() {
184 if (pi2_server_thread.joinable()) {
185 pi2_server_event_loop->Exit();
186 pi2_server_thread.join();
187 pi2_server_thread = std::thread();
188 }
189 pi2_message_bridge_server.reset();
190 pi2_server_event_loop.reset();
191 }
192
193 void MakePi2Client() {
194 OnPi2();
195 FLAGS_application_name = "pi2_message_bridge_client";
196 pi2_client_event_loop =
197 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
198 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
199 pi2_message_bridge_client =
200 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
201 }
202
203 void RunPi2Client(chrono::nanoseconds duration) {
204 // Run for 5 seconds to make sure we have time to estimate the offset.
205 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
206 [this]() { pi2_client_event_loop->Exit(); });
207 pi2_client_event_loop->OnRun([this, quit, duration]() {
208 // Stop between timestamps, not exactly on them.
209 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
210 });
211
212 // And go!
213 pi2_client_event_loop->Run();
214 }
215
216 void StartPi2Client() {
217 pi2_client_thread = std::thread([this]() {
218 LOG(INFO) << "Started pi2_message_bridge_client";
219 pi2_client_event_loop->Run();
220 });
221 }
222
223 void StopPi2Client() {
224 if (pi2_client_thread.joinable()) {
225 pi2_client_event_loop->Exit();
226 pi2_client_thread.join();
227 pi2_client_thread = std::thread();
228 }
229 pi2_message_bridge_client.reset();
230 pi2_client_event_loop.reset();
231 }
232
233 void MakePi2Test() {
234 OnPi2();
235 FLAGS_application_name = "test2";
236 pi2_test_event_loop =
237 std::make_unique<aos::ShmEventLoop>(&pi2_config.message());
238
239 pi2_test_event_loop->MakeWatcher(
240 "/pi2/aos", [](const ServerStatistics &stats) {
241 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
242 });
243
244 pi2_test_event_loop->MakeWatcher(
245 "/pi2/aos", [](const ClientStatistics &stats) {
246 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
247 });
248
249 pi2_test_event_loop->MakeWatcher(
250 "/pi2/aos", [](const Timestamp &timestamp) {
251 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
252 });
253 }
254
255 void StartPi2Test() {
256 pi2_test_thread = std::thread([this]() {
257 LOG(INFO) << "Started pi2_message_bridge_test";
258 pi2_test_event_loop->Run();
259 });
260 }
261
262 void StopPi2Test() {
263 pi2_test_event_loop->Exit();
264 pi2_test_thread.join();
265 }
266
Austin Schuh0de30f32020-12-06 12:44:28 -0800267 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
268 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800269
270 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
271 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
272 std::thread pi1_server_thread;
273
274 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
275 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
276 std::thread pi1_client_thread;
277
278 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
279 std::thread pi1_test_thread;
280
281 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
282 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
283 std::thread pi2_server_thread;
284
285 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
286 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
287 std::thread pi2_client_thread;
288
289 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
290 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800291};
292
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800293// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -0800294TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800295 // This is rather annoying to set up. We need to start up a client and
296 // server, on the same node, but get them to think that they are on different
297 // nodes.
298 //
299 // We then get to wait until they are connected.
300 //
301 // After they are connected, we send a Ping message.
302 //
303 // On the other end, we receive a Pong message.
304 //
305 // But, we need the client to not post directly to "/test" like it would in a
306 // real system, otherwise we will re-send the ping message... So, use an
307 // application specific map to have the client post somewhere else.
308 //
309 // To top this all off, each of these needs to be done with a ShmEventLoop,
310 // which needs to run in a separate thread... And it is really hard to get
311 // everything started up reliably. So just be super generous on timeouts and
312 // hope for the best. We can be more generous in the future if we need to.
313 //
314 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800315 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800316 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700317
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800318 MakePi1Server();
319 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800320
321 // And build the app which sends the pings.
322 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -0800323 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800324 aos::Sender<examples::Ping> ping_sender =
325 ping_event_loop.MakeSender<examples::Ping>("/test");
326
Austin Schuhe991fe22020-11-18 16:53:39 -0800327 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800328 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
329 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700330 "/pi1/aos/remote_timestamps/pi2");
331
332 // Fetchers for confirming the remote timestamps made it.
333 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
334 ping_event_loop.MakeFetcher<examples::Ping>("/test");
335 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
336 ping_event_loop.MakeFetcher<Timestamp>("/aos");
337
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800338 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800339 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700340
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800341 MakePi2Client();
342 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800343
344 // And build the app which sends the pongs.
345 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700346 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800347
Austin Schuh7bc59052020-02-16 23:48:33 -0800348 // And build the app for testing.
349 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700350 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800351
352 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
353 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800354 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
355 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700356 "/pi2/aos/remote_timestamps/pi1");
357
358 // Event loop for fetching data delivered to pi2 from pi1 to match up
359 // messages.
360 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
361 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
362 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
363 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
364 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
365 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
366 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800367
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800368 // Count the pongs.
369 int pong_count = 0;
370 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700371 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800372 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800373 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800374 });
375
376 FLAGS_override_hostname = "";
377
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800378 // Wait until we are connected, then send.
379 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800380 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700382 "/pi1/aos",
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800383 [this, &ping_count, &ping_sender,
Austin Schuh7bc59052020-02-16 23:48:33 -0800384 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800385 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800386
387 ASSERT_TRUE(stats.has_connections());
388 EXPECT_EQ(stats.connections()->size(), 1);
389
390 bool connected = false;
391 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800392 // Confirm that we are estimating the server time offset correctly. It
393 // should be about 0 since we are on the same machine here.
394 if (connection->has_monotonic_offset()) {
395 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
396 chrono::milliseconds(1));
397 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
398 chrono::milliseconds(-1));
399 ++pi1_server_statistics_count;
400 }
401
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800402 if (connection->node()->name()->string_view() ==
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800403 pi2_client_event_loop->node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800404 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800405 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800406 connected = true;
407 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800408 }
409 }
410
411 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800412 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800413 auto builder = ping_sender.MakeBuilder();
414 examples::Ping::Builder ping_builder =
415 builder.MakeBuilder<examples::Ping>();
416 ping_builder.add_value(ping_count + 971);
417 builder.Send(ping_builder.Finish());
418 ++ping_count;
419 }
420 });
421
Austin Schuh7bc59052020-02-16 23:48:33 -0800422 // Confirm both client and server statistics messages have decent offsets in
423 // them.
424 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700425 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800426 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800427 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800428 for (const ServerConnection *connection : *stats.connections()) {
429 if (connection->has_monotonic_offset()) {
430 ++pi2_server_statistics_count;
431 // Confirm that we are estimating the server time offset correctly. It
432 // should be about 0 since we are on the same machine here.
433 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
434 chrono::milliseconds(1));
435 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
436 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800437 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800438 }
439 }
440 });
441
442 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700443 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
444 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800445 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800446
Austin Schuh5344c352020-04-12 17:04:26 -0700447 for (const ClientConnection *connection : *stats.connections()) {
448 if (connection->has_monotonic_offset()) {
449 ++pi1_client_statistics_count;
450 // It takes at least 10 microseconds to send a message between the
451 // client and server. The min (filtered) time shouldn't be over 10
452 // milliseconds on localhost. This might have to bump up if this is
453 // proving flaky.
454 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800455 chrono::milliseconds(10))
456 << " " << connection->monotonic_offset()
457 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700458 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800459 chrono::microseconds(10))
460 << " " << connection->monotonic_offset()
461 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700462 }
463 }
464 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800465
466 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700467 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800468 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800469 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800470
471 for (const ClientConnection *connection : *stats.connections()) {
472 if (connection->has_monotonic_offset()) {
473 ++pi2_client_statistics_count;
474 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
475 chrono::milliseconds(10));
476 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
477 chrono::microseconds(10));
478 }
479 }
480 });
481
Austin Schuh196a4452020-03-15 23:12:03 -0700482 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800483 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800484 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800485 });
Austin Schuh196a4452020-03-15 23:12:03 -0700486 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800487 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800488 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800489 });
490
491 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800492 aos::TimerHandler *quit = ping_event_loop.AddTimer(
493 [&ping_event_loop]() { ping_event_loop.Exit(); });
494 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800495 // Stop between timestamps, not exactly on them.
496 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800497 });
498
Austin Schuh2f8fd752020-09-01 22:38:28 -0700499 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
500 // channel.
501 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
502 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
503 const size_t ping_timestamp_channel =
504 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
505 ping_on_pi2_fetcher.channel());
506
507 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
508 VLOG(1) << "Channel "
509 << configuration::ChannelIndex(ping_event_loop.configuration(),
510 channel)
511 << " " << configuration::CleanedChannelToString(channel);
512 }
513
514 // For each remote timestamp we get back, confirm that it is either a ping
515 // message, or a timestamp we sent out. Also confirm that the timestamps are
516 // correct.
517 ping_event_loop.MakeWatcher(
518 "/pi1/aos/remote_timestamps/pi2",
519 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
520 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800521 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
522 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
523 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700524
Austin Schuh20ac95d2020-12-05 17:24:19 -0800525 EXPECT_TRUE(header.has_boot_uuid());
526
Austin Schuh2f8fd752020-09-01 22:38:28 -0700527 const aos::monotonic_clock::time_point header_monotonic_sent_time(
528 chrono::nanoseconds(header.monotonic_sent_time()));
529 const aos::realtime_clock::time_point header_realtime_sent_time(
530 chrono::nanoseconds(header.realtime_sent_time()));
531 const aos::monotonic_clock::time_point header_monotonic_remote_time(
532 chrono::nanoseconds(header.monotonic_remote_time()));
533 const aos::realtime_clock::time_point header_realtime_remote_time(
534 chrono::nanoseconds(header.realtime_remote_time()));
535
536 const Context *pi1_context = nullptr;
537 const Context *pi2_context = nullptr;
538
539 if (header.channel_index() == pi1_timestamp_channel) {
540 // Find the forwarded message.
541 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
542 header_monotonic_sent_time) {
543 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
544 }
545
546 // And the source message.
547 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
548 header_monotonic_remote_time) {
549 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
550 }
551
552 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
553 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
554 } else if (header.channel_index() == ping_timestamp_channel) {
555 // Find the forwarded message.
556 while (ping_on_pi2_fetcher.context().monotonic_event_time <
557 header_monotonic_sent_time) {
558 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
559 }
560
561 // And the source message.
562 while (ping_on_pi1_fetcher.context().monotonic_event_time <
563 header_monotonic_remote_time) {
564 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
565 }
566
567 pi1_context = &ping_on_pi1_fetcher.context();
568 pi2_context = &ping_on_pi2_fetcher.context();
569 } else {
570 LOG(FATAL) << "Unknown channel";
571 }
572
573 // Confirm the forwarded message has matching timestamps to the
574 // timestamps we got back.
575 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
576 EXPECT_EQ(pi2_context->monotonic_event_time,
577 header_monotonic_sent_time);
578 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
579 EXPECT_EQ(pi2_context->realtime_remote_time,
580 header_realtime_remote_time);
581 EXPECT_EQ(pi2_context->monotonic_remote_time,
582 header_monotonic_remote_time);
583
584 // Confirm the forwarded message also matches the source message.
585 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
586 EXPECT_EQ(pi1_context->monotonic_event_time,
587 header_monotonic_remote_time);
588 EXPECT_EQ(pi1_context->realtime_event_time,
589 header_realtime_remote_time);
590 });
591
Austin Schuh7bc59052020-02-16 23:48:33 -0800592 // Start everything up. Pong is the only thing we don't know how to wait on,
593 // so start it first.
594 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
595
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800596 StartPi1Server();
597 StartPi1Client();
598 StartPi2Client();
599 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800600
601 // And go!
602 ping_event_loop.Run();
603
604 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800605 StopPi1Server();
606 StopPi1Client();
607 StopPi2Client();
608 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800609 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800610 pong_thread.join();
611
612 // Make sure we sent something.
613 EXPECT_GE(ping_count, 1);
614 // And got something back.
615 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800616
617 // Confirm that we are estimating a monotonic offset on the client.
618 ASSERT_TRUE(client_statistics_fetcher.Fetch());
619
620 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
621 EXPECT_EQ(client_statistics_fetcher->connections()
622 ->Get(0)
623 ->node()
624 ->name()
625 ->string_view(),
626 "pi1");
627
628 // Make sure the offset in one direction is less than a second.
629 EXPECT_GT(
630 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
631 EXPECT_LT(
632 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
633 1000000000);
634
635 EXPECT_GE(pi1_server_statistics_count, 2);
636 EXPECT_GE(pi2_server_statistics_count, 2);
637 EXPECT_GE(pi1_client_statistics_count, 2);
638 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700639
640 // Confirm we got timestamps back!
641 EXPECT_TRUE(message_header_fetcher1.Fetch());
642 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800643}
644
Austin Schuh5344c352020-04-12 17:04:26 -0700645// Test that the client disconnecting triggers the server offsets on both sides
646// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800647TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700648 // This is rather annoying to set up. We need to start up a client and
649 // server, on the same node, but get them to think that they are on different
650 // nodes.
651 //
652 // We need the client to not post directly to "/test" like it would in a
653 // real system, otherwise we will re-send the ping message... So, use an
654 // application specific map to have the client post somewhere else.
655 //
656 // To top this all off, each of these needs to be done with a ShmEventLoop,
657 // which needs to run in a separate thread... And it is really hard to get
658 // everything started up reliably. So just be super generous on timeouts and
659 // hope for the best. We can be more generous in the future if we need to.
660 //
661 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800662 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700663
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800664 MakePi1Server();
665 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700666
667 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800668 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700669 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800670 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700671
672 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800673 OnPi2();
674 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700675
676 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800677 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700678 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800679 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700680
681 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700682
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800683 StartPi1Test();
684 StartPi2Test();
685 StartPi1Server();
686 StartPi1Client();
687 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700688
689 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800690 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700691
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800692 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700693
694 // Now confirm we are synchronized.
695 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
696 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
697
698 const ServerConnection *const pi1_connection =
699 pi1_server_statistics_fetcher->connections()->Get(0);
700 const ServerConnection *const pi2_connection =
701 pi2_server_statistics_fetcher->connections()->Get(0);
702
703 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
704 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
705 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
706 chrono::milliseconds(1));
707 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
708 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800709 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700710
711 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
712 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
713 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
714 chrono::milliseconds(1));
715 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
716 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800717 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800718
719 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700720 }
721
722 std::this_thread::sleep_for(std::chrono::seconds(2));
723
724 {
725 // Now confirm we are un-synchronized.
726 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
727 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
728 const ServerConnection *const pi1_connection =
729 pi1_server_statistics_fetcher->connections()->Get(0);
730 const ServerConnection *const pi2_connection =
731 pi2_server_statistics_fetcher->connections()->Get(0);
732
733 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
734 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800735 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700736 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
737 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800738 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700739 }
740
741 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800742 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700743 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800744 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700745
746 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
747 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
748
749 // Now confirm we are synchronized again.
750 const ServerConnection *const pi1_connection =
751 pi1_server_statistics_fetcher->connections()->Get(0);
752 const ServerConnection *const pi2_connection =
753 pi2_server_statistics_fetcher->connections()->Get(0);
754
755 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
756 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
757 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
758 chrono::milliseconds(1));
759 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
760 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800761 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700762
763 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
764 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
765 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
766 chrono::milliseconds(1));
767 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
768 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800769 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800770
771 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700772 }
773
774 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800775 StopPi1Server();
776 StopPi1Client();
777 StopPi2Server();
778 StopPi1Test();
779 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700780}
781
782// Test that the server disconnecting triggers the server offsets on the other
783// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800784TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700785 // This is rather annoying to set up. We need to start up a client and
786 // server, on the same node, but get them to think that they are on different
787 // nodes.
788 //
789 // We need the client to not post directly to "/test" like it would in a
790 // real system, otherwise we will re-send the ping message... So, use an
791 // application specific map to have the client post somewhere else.
792 //
793 // To top this all off, each of these needs to be done with a ShmEventLoop,
794 // which needs to run in a separate thread... And it is really hard to get
795 // everything started up reliably. So just be super generous on timeouts and
796 // hope for the best. We can be more generous in the future if we need to.
797 //
798 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700799 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800800 OnPi1();
801 MakePi1Server();
802 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700803
804 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800805 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700806 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800807 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700808 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800809 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700810
811 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800812 OnPi2();
813 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700814
815 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800816 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700817 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800818 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700819
820 // Start everything up. Pong is the only thing we don't know how to wait on,
821 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800822 StartPi1Test();
823 StartPi2Test();
824 StartPi1Server();
825 StartPi1Client();
826 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700827
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800828 // Confirm both client and server statistics messages have decent offsets in
829 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700830
831 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800832 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700833
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800834 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700835
836 // Now confirm we are synchronized.
837 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
838 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
839
840 const ServerConnection *const pi1_connection =
841 pi1_server_statistics_fetcher->connections()->Get(0);
842 const ServerConnection *const pi2_connection =
843 pi2_server_statistics_fetcher->connections()->Get(0);
844
845 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
846 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
847 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
848 chrono::milliseconds(1));
849 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
850 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800851 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700852
853 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
854 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
855 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
856 chrono::milliseconds(1));
857 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
858 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800859 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800860
861 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700862 }
863
864 std::this_thread::sleep_for(std::chrono::seconds(2));
865
866 {
867 // And confirm we are unsynchronized.
868 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
869 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
870
871 const ServerConnection *const pi1_server_connection =
872 pi1_server_statistics_fetcher->connections()->Get(0);
873 const ClientConnection *const pi1_client_connection =
874 pi1_client_statistics_fetcher->connections()->Get(0);
875
876 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
877 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800878 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700879 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
880 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
881 }
882
883 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800884 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700885
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800886 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700887
888 // And confirm we are synchronized again.
889 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
890 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
891
892 const ServerConnection *const pi1_connection =
893 pi1_server_statistics_fetcher->connections()->Get(0);
894 const ServerConnection *const pi2_connection =
895 pi2_server_statistics_fetcher->connections()->Get(0);
896
897 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
898 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
899 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
900 chrono::milliseconds(1));
901 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
902 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800903 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700904
905 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
906 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
907 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
908 chrono::milliseconds(1));
909 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
910 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800911 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800912
913 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700914 }
915
916 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800917 StopPi1Server();
918 StopPi1Client();
919 StopPi2Client();
920 StopPi1Test();
921 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700922}
923
// TODO(austin): The restart tests above confirm that the external state does
// the right thing, but don't confirm that the internal state does.  We either
// need to expose a way to check the state in a thread-safe way, or need a way
// to jump time for one node to do that.
928
Austin Schuh4889b182020-11-18 19:11:56 -0800929void SendPing(aos::Sender<examples::Ping> *sender, int value) {
930 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
931 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
932 ping_builder.add_value(value);
933 builder.Send(ping_builder.Finish());
934}
935
936// Tests that when a message is sent before the bridge starts up, but is
937// configured as reliable, we forward it. Confirm this survives a client reset.
938TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800939 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800940
941 FLAGS_application_name = "sender";
942 aos::ShmEventLoop send_event_loop(&pi1_config.message());
943 aos::Sender<examples::Ping> ping_sender =
944 send_event_loop.MakeSender<examples::Ping>("/test");
945 SendPing(&ping_sender, 1);
946 aos::Sender<examples::Ping> unreliable_ping_sender =
947 send_event_loop.MakeSender<examples::Ping>("/unreliable");
948 SendPing(&unreliable_ping_sender, 1);
949
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800950 MakePi1Server();
951 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800952
953 FLAGS_application_name = "pi1_timestamp";
954 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
955
956 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800957 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800958
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800959 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800960
961 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
962 aos::Fetcher<examples::Ping> ping_fetcher =
963 receive_event_loop.MakeFetcher<examples::Ping>("/test");
964 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
965 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
966 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
967 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
968
969 const size_t ping_channel_index = configuration::ChannelIndex(
970 receive_event_loop.configuration(), ping_fetcher.channel());
971
972 std::atomic<int> ping_timestamp_count{0};
973 pi1_remote_timestamp_event_loop.MakeWatcher(
974 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800975 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
976 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
977 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800978 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800979 if (header.channel_index() == ping_channel_index) {
980 ++ping_timestamp_count;
981 }
982 });
983
984 // Before everything starts up, confirm there is no message.
985 EXPECT_FALSE(ping_fetcher.Fetch());
986 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
987
988 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800989 StartPi1Server();
990 StartPi1Client();
991 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800992
993 // Event used to wait for the timestamp counting thread to start.
994 aos::Event event;
995 std::thread pi1_remote_timestamp_thread(
996 [&pi1_remote_timestamp_event_loop, &event]() {
997 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
998 pi1_remote_timestamp_event_loop.Run();
999 });
1000
1001 event.Wait();
1002
1003 {
1004 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001005 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001006
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001007 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001008
1009 // Confirm there is no detected duplicate packet.
1010 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1011 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1012 ->Get(0)
1013 ->duplicate_packets(),
1014 0u);
1015
1016 EXPECT_TRUE(ping_fetcher.Fetch());
1017 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1018 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001019
1020 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001021 }
1022
1023 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001024 // Now, spin up a client for 2 seconds.
1025 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001026
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001027 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001028
1029 // Confirm we detect the duplicate packet correctly.
1030 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1031 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1032 ->Get(0)
1033 ->duplicate_packets(),
1034 1u);
1035
1036 EXPECT_EQ(ping_timestamp_count, 1);
1037 EXPECT_FALSE(ping_fetcher.Fetch());
1038 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001039
1040 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001041 }
1042
1043 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001044 StopPi1Client();
1045 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001046 pi1_remote_timestamp_event_loop.Exit();
1047 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001048 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001049}
1050
1051// Tests that when a message is sent before the bridge starts up, but is
1052// configured as reliable, we forward it. Confirm this works across server
1053// resets.
1054TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1055 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001056 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001057
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001058 MakePi2Server();
1059 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001060
1061 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1062 aos::Fetcher<examples::Ping> ping_fetcher =
1063 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1064 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1065 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1066 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1067 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1068
Austin Schuh4889b182020-11-18 19:11:56 -08001069 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001070 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001071
1072 FLAGS_application_name = "sender";
1073 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1074 aos::Sender<examples::Ping> ping_sender =
1075 send_event_loop.MakeSender<examples::Ping>("/test");
1076 {
1077 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1078 examples::Ping::Builder ping_builder =
1079 builder.MakeBuilder<examples::Ping>();
1080 ping_builder.add_value(1);
1081 builder.Send(ping_builder.Finish());
1082 }
1083
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001084 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001085
1086 FLAGS_application_name = "pi1_timestamp";
1087 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1088
1089 const size_t ping_channel_index = configuration::ChannelIndex(
1090 receive_event_loop.configuration(), ping_fetcher.channel());
1091
1092 std::atomic<int> ping_timestamp_count{0};
1093 pi1_remote_timestamp_event_loop.MakeWatcher(
1094 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001095 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1096 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1097 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001098 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001099 if (header.channel_index() == ping_channel_index) {
1100 ++ping_timestamp_count;
1101 }
1102 });
1103
1104 // Before everything starts up, confirm there is no message.
1105 EXPECT_FALSE(ping_fetcher.Fetch());
1106 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1107
1108 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001109 StartPi1Client();
1110 StartPi2Server();
1111 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001112
1113 // Event used to wait for the timestamp counting thread to start.
1114 aos::Event event;
1115 std::thread pi1_remote_timestamp_thread(
1116 [&pi1_remote_timestamp_event_loop, &event]() {
1117 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1118 pi1_remote_timestamp_event_loop.Run();
1119 });
1120
1121 event.Wait();
1122
1123 {
1124 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001125 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001126
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001127 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001128
1129 // Confirm there is no detected duplicate packet.
1130 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1131 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1132 ->Get(0)
1133 ->duplicate_packets(),
1134 0u);
1135
1136 EXPECT_TRUE(ping_fetcher.Fetch());
1137 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1138 EXPECT_EQ(ping_timestamp_count, 1);
1139 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001140
1141 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001142 }
1143
1144 {
1145 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001146 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001147
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001148 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001149
1150 // Confirm we detect the duplicate packet correctly.
1151 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1152 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1153 ->Get(0)
1154 ->duplicate_packets(),
1155 1u);
1156
1157 EXPECT_EQ(ping_timestamp_count, 1);
1158 EXPECT_FALSE(ping_fetcher.Fetch());
1159 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001160
1161 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001162 }
1163
1164 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001165 StopPi1Client();
1166 StopPi2Server();
1167 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001168 pi1_remote_timestamp_event_loop.Exit();
1169 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001170}
1171
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001172} // namespace testing
1173} // namespace message_bridge
1174} // namespace aos