blob: c8c24cf48ff5cf012f67b06a383c03c8da75f1a7 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080011#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080012#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013
14namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070015void SetShmBase(const std::string_view base);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace message_bridge {
18namespace testing {
19
20namespace chrono = std::chrono;
21
Austin Schuhe991fe22020-11-18 16:53:39 -080022std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070023 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
24 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080025 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070026 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080027 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 }
29}
30
Austin Schuhe991fe22020-11-18 16:53:39 -080031void DoSetShmBase(const std::string_view node) {
32 aos::SetShmBase(ShmBase(node));
33}
34
35class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080036 public:
Austin Schuhf466ab52021-02-16 22:00:38 -080037 MessageBridgeTest(std::string_view path =
38 "aos/network/message_bridge_test_common_config.json")
39 : config(aos::configuration::ReadConfig(path)) {
Austin Schuh0de30f32020-12-06 12:44:28 -080040 util::UnlinkRecursive(ShmBase("pi1"));
41 util::UnlinkRecursive(ShmBase("pi2"));
42 }
Austin Schuhe991fe22020-11-18 16:53:39 -080043
Austin Schuh0a2f12f2021-01-08 22:48:29 -080044 void OnPi1() {
45 DoSetShmBase("pi1");
46 FLAGS_override_hostname = "raspberrypi";
47 }
48
49 void OnPi2() {
50 DoSetShmBase("pi2");
51 FLAGS_override_hostname = "raspberrypi2";
52 }
53
54 void MakePi1Server() {
55 OnPi1();
56 FLAGS_application_name = "pi1_message_bridge_server";
57 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080058 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080059 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
60 pi1_message_bridge_server =
61 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
62 }
63
64 void RunPi1Server(chrono::nanoseconds duration) {
65 // Setup a shutdown callback.
66 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
67 [this]() { pi1_server_event_loop->Exit(); });
68 pi1_server_event_loop->OnRun([this, quit, duration]() {
69 // Stop between timestamps, not exactly on them.
70 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
71 });
72
73 pi1_server_event_loop->Run();
74 }
75
76 void StartPi1Server() {
77 pi1_server_thread = std::thread([this]() {
78 LOG(INFO) << "Started pi1_message_bridge_server";
79 pi1_server_event_loop->Run();
80 });
81 }
82
83 void StopPi1Server() {
84 if (pi1_server_thread.joinable()) {
85 pi1_server_event_loop->Exit();
86 pi1_server_thread.join();
87 pi1_server_thread = std::thread();
88 }
89 pi1_message_bridge_server.reset();
90 pi1_server_event_loop.reset();
91 }
92
93 void MakePi1Client() {
94 OnPi1();
95 FLAGS_application_name = "pi1_message_bridge_client";
96 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080097 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080098 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
99 pi1_message_bridge_client =
100 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
101 }
102
103 void StartPi1Client() {
104 pi1_client_thread = std::thread([this]() {
105 LOG(INFO) << "Started pi1_message_bridge_client";
106 pi1_client_event_loop->Run();
107 });
108 }
109
110 void StopPi1Client() {
111 pi1_client_event_loop->Exit();
112 pi1_client_thread.join();
113 pi1_client_thread = std::thread();
114 pi1_message_bridge_client.reset();
115 pi1_client_event_loop.reset();
116 }
117
118 void MakePi1Test() {
119 OnPi1();
120 FLAGS_application_name = "test1";
121 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800122 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800123
124 pi1_test_event_loop->MakeWatcher(
125 "/pi1/aos", [](const ServerStatistics &stats) {
126 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
127 });
128
129 pi1_test_event_loop->MakeWatcher(
130 "/pi1/aos", [](const ClientStatistics &stats) {
131 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
132 });
133
134 pi1_test_event_loop->MakeWatcher(
135 "/pi1/aos", [](const Timestamp &timestamp) {
136 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
137 });
138 }
139
140 void StartPi1Test() {
141 pi1_test_thread = std::thread([this]() {
142 LOG(INFO) << "Started pi1_test";
143 pi1_test_event_loop->Run();
144 });
145 }
146
147 void StopPi1Test() {
148 pi1_test_event_loop->Exit();
149 pi1_test_thread.join();
150 }
151
152 void MakePi2Server() {
153 OnPi2();
154 FLAGS_application_name = "pi2_message_bridge_server";
155 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800156 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800157 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
158 pi2_message_bridge_server =
159 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
160 }
161
162 void RunPi2Server(chrono::nanoseconds duration) {
163 // Setup a shutdown callback.
164 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
165 [this]() { pi2_server_event_loop->Exit(); });
166 pi2_server_event_loop->OnRun([this, quit, duration]() {
167 // Stop between timestamps, not exactly on them.
168 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
169 });
170
171 pi2_server_event_loop->Run();
172 }
173
174 void StartPi2Server() {
175 pi2_server_thread = std::thread([this]() {
176 LOG(INFO) << "Started pi2_message_bridge_server";
177 pi2_server_event_loop->Run();
178 });
179 }
180
181 void StopPi2Server() {
182 if (pi2_server_thread.joinable()) {
183 pi2_server_event_loop->Exit();
184 pi2_server_thread.join();
185 pi2_server_thread = std::thread();
186 }
187 pi2_message_bridge_server.reset();
188 pi2_server_event_loop.reset();
189 }
190
191 void MakePi2Client() {
192 OnPi2();
193 FLAGS_application_name = "pi2_message_bridge_client";
194 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800195 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800196 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
197 pi2_message_bridge_client =
198 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
199 }
200
201 void RunPi2Client(chrono::nanoseconds duration) {
202 // Run for 5 seconds to make sure we have time to estimate the offset.
203 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
204 [this]() { pi2_client_event_loop->Exit(); });
205 pi2_client_event_loop->OnRun([this, quit, duration]() {
206 // Stop between timestamps, not exactly on them.
207 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
208 });
209
210 // And go!
211 pi2_client_event_loop->Run();
212 }
213
214 void StartPi2Client() {
215 pi2_client_thread = std::thread([this]() {
216 LOG(INFO) << "Started pi2_message_bridge_client";
217 pi2_client_event_loop->Run();
218 });
219 }
220
221 void StopPi2Client() {
222 if (pi2_client_thread.joinable()) {
223 pi2_client_event_loop->Exit();
224 pi2_client_thread.join();
225 pi2_client_thread = std::thread();
226 }
227 pi2_message_bridge_client.reset();
228 pi2_client_event_loop.reset();
229 }
230
231 void MakePi2Test() {
232 OnPi2();
233 FLAGS_application_name = "test2";
234 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800235 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800236
237 pi2_test_event_loop->MakeWatcher(
238 "/pi2/aos", [](const ServerStatistics &stats) {
239 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
240 });
241
242 pi2_test_event_loop->MakeWatcher(
243 "/pi2/aos", [](const ClientStatistics &stats) {
244 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
245 });
246
247 pi2_test_event_loop->MakeWatcher(
248 "/pi2/aos", [](const Timestamp &timestamp) {
249 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
250 });
251 }
252
253 void StartPi2Test() {
254 pi2_test_thread = std::thread([this]() {
255 LOG(INFO) << "Started pi2_message_bridge_test";
256 pi2_test_event_loop->Run();
257 });
258 }
259
260 void StopPi2Test() {
261 pi2_test_event_loop->Exit();
262 pi2_test_thread.join();
263 }
264
Austin Schuhf466ab52021-02-16 22:00:38 -0800265 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800266
267 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
268 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
269 std::thread pi1_server_thread;
270
271 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
272 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
273 std::thread pi1_client_thread;
274
275 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
276 std::thread pi1_test_thread;
277
278 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
279 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
280 std::thread pi2_server_thread;
281
282 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
283 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
284 std::thread pi2_client_thread;
285
286 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
287 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800288};
289
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800290// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -0800291TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800292 // This is rather annoying to set up. We need to start up a client and
293 // server, on the same node, but get them to think that they are on different
294 // nodes.
295 //
296 // We then get to wait until they are connected.
297 //
298 // After they are connected, we send a Ping message.
299 //
300 // On the other end, we receive a Pong message.
301 //
302 // But, we need the client to not post directly to "/test" like it would in a
303 // real system, otherwise we will re-send the ping message... So, use an
304 // application specific map to have the client post somewhere else.
305 //
306 // To top this all off, each of these needs to be done with a ShmEventLoop,
307 // which needs to run in a separate thread... And it is really hard to get
308 // everything started up reliably. So just be super generous on timeouts and
309 // hope for the best. We can be more generous in the future if we need to.
310 //
311 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800312 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800313 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700314
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800315 MakePi1Server();
316 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800317
318 // And build the app which sends the pings.
319 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800320 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800321 aos::Sender<examples::Ping> ping_sender =
322 ping_event_loop.MakeSender<examples::Ping>("/test");
323
Austin Schuhf466ab52021-02-16 22:00:38 -0800324 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800325 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
326 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700327 "/pi1/aos/remote_timestamps/pi2");
328
329 // Fetchers for confirming the remote timestamps made it.
330 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
331 ping_event_loop.MakeFetcher<examples::Ping>("/test");
332 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
333 ping_event_loop.MakeFetcher<Timestamp>("/aos");
334
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800335 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800336 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700337
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800338 MakePi2Client();
339 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800340
341 // And build the app which sends the pongs.
342 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800343 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800344
Austin Schuh7bc59052020-02-16 23:48:33 -0800345 // And build the app for testing.
346 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800347 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800348
349 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
350 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800351 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
352 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700353 "/pi2/aos/remote_timestamps/pi1");
354
355 // Event loop for fetching data delivered to pi2 from pi1 to match up
356 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800357 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700358 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
359 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
360 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
361 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
362 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
363 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800364
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800365 // Count the pongs.
366 int pong_count = 0;
367 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700368 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800369 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800370 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800371 });
372
373 FLAGS_override_hostname = "";
374
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800375 // Wait until we are connected, then send.
376 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800377 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800378 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700379 "/pi1/aos",
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800380 [this, &ping_count, &ping_sender,
Austin Schuh7bc59052020-02-16 23:48:33 -0800381 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800382 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800383
384 ASSERT_TRUE(stats.has_connections());
385 EXPECT_EQ(stats.connections()->size(), 1);
386
387 bool connected = false;
388 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800389 // Confirm that we are estimating the server time offset correctly. It
390 // should be about 0 since we are on the same machine here.
391 if (connection->has_monotonic_offset()) {
392 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
393 chrono::milliseconds(1));
394 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
395 chrono::milliseconds(-1));
396 ++pi1_server_statistics_count;
397 }
398
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800399 if (connection->node()->name()->string_view() ==
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800400 pi2_client_event_loop->node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800401 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800402 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800403 connected = true;
404 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800405 }
406 }
407
408 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800409 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800410 auto builder = ping_sender.MakeBuilder();
411 examples::Ping::Builder ping_builder =
412 builder.MakeBuilder<examples::Ping>();
413 ping_builder.add_value(ping_count + 971);
414 builder.Send(ping_builder.Finish());
415 ++ping_count;
416 }
417 });
418
Austin Schuh7bc59052020-02-16 23:48:33 -0800419 // Confirm both client and server statistics messages have decent offsets in
420 // them.
421 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700422 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800423 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800424 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800425 for (const ServerConnection *connection : *stats.connections()) {
426 if (connection->has_monotonic_offset()) {
427 ++pi2_server_statistics_count;
428 // Confirm that we are estimating the server time offset correctly. It
429 // should be about 0 since we are on the same machine here.
430 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
431 chrono::milliseconds(1));
432 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
433 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800434 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800435 }
436 }
437 });
438
439 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700440 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
441 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800442 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800443
Austin Schuh5344c352020-04-12 17:04:26 -0700444 for (const ClientConnection *connection : *stats.connections()) {
445 if (connection->has_monotonic_offset()) {
446 ++pi1_client_statistics_count;
447 // It takes at least 10 microseconds to send a message between the
448 // client and server. The min (filtered) time shouldn't be over 10
449 // milliseconds on localhost. This might have to bump up if this is
450 // proving flaky.
451 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800452 chrono::milliseconds(10))
453 << " " << connection->monotonic_offset()
454 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700455 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800456 chrono::microseconds(10))
457 << " " << connection->monotonic_offset()
458 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700459 }
460 }
461 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800462
463 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700464 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800465 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800466 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800467
468 for (const ClientConnection *connection : *stats.connections()) {
469 if (connection->has_monotonic_offset()) {
470 ++pi2_client_statistics_count;
471 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
472 chrono::milliseconds(10));
473 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
474 chrono::microseconds(10));
475 }
476 }
477 });
478
Austin Schuh196a4452020-03-15 23:12:03 -0700479 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800480 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800481 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800482 });
Austin Schuh196a4452020-03-15 23:12:03 -0700483 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800484 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800485 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800486 });
487
488 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800489 aos::TimerHandler *quit = ping_event_loop.AddTimer(
490 [&ping_event_loop]() { ping_event_loop.Exit(); });
491 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800492 // Stop between timestamps, not exactly on them.
493 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800494 });
495
Austin Schuh2f8fd752020-09-01 22:38:28 -0700496 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
497 // channel.
498 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
499 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
500 const size_t ping_timestamp_channel =
501 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
502 ping_on_pi2_fetcher.channel());
503
504 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
505 VLOG(1) << "Channel "
506 << configuration::ChannelIndex(ping_event_loop.configuration(),
507 channel)
508 << " " << configuration::CleanedChannelToString(channel);
509 }
510
511 // For each remote timestamp we get back, confirm that it is either a ping
512 // message, or a timestamp we sent out. Also confirm that the timestamps are
513 // correct.
514 ping_event_loop.MakeWatcher(
515 "/pi1/aos/remote_timestamps/pi2",
516 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
517 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800518 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
519 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
520 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700521
Austin Schuh20ac95d2020-12-05 17:24:19 -0800522 EXPECT_TRUE(header.has_boot_uuid());
523
Austin Schuh2f8fd752020-09-01 22:38:28 -0700524 const aos::monotonic_clock::time_point header_monotonic_sent_time(
525 chrono::nanoseconds(header.monotonic_sent_time()));
526 const aos::realtime_clock::time_point header_realtime_sent_time(
527 chrono::nanoseconds(header.realtime_sent_time()));
528 const aos::monotonic_clock::time_point header_monotonic_remote_time(
529 chrono::nanoseconds(header.monotonic_remote_time()));
530 const aos::realtime_clock::time_point header_realtime_remote_time(
531 chrono::nanoseconds(header.realtime_remote_time()));
532
533 const Context *pi1_context = nullptr;
534 const Context *pi2_context = nullptr;
535
536 if (header.channel_index() == pi1_timestamp_channel) {
537 // Find the forwarded message.
538 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
539 header_monotonic_sent_time) {
540 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
541 }
542
543 // And the source message.
544 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
545 header_monotonic_remote_time) {
546 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
547 }
548
549 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
550 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
551 } else if (header.channel_index() == ping_timestamp_channel) {
552 // Find the forwarded message.
553 while (ping_on_pi2_fetcher.context().monotonic_event_time <
554 header_monotonic_sent_time) {
555 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
556 }
557
558 // And the source message.
559 while (ping_on_pi1_fetcher.context().monotonic_event_time <
560 header_monotonic_remote_time) {
561 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
562 }
563
564 pi1_context = &ping_on_pi1_fetcher.context();
565 pi2_context = &ping_on_pi2_fetcher.context();
566 } else {
567 LOG(FATAL) << "Unknown channel";
568 }
569
570 // Confirm the forwarded message has matching timestamps to the
571 // timestamps we got back.
572 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
573 EXPECT_EQ(pi2_context->monotonic_event_time,
574 header_monotonic_sent_time);
575 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
576 EXPECT_EQ(pi2_context->realtime_remote_time,
577 header_realtime_remote_time);
578 EXPECT_EQ(pi2_context->monotonic_remote_time,
579 header_monotonic_remote_time);
580
581 // Confirm the forwarded message also matches the source message.
582 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
583 EXPECT_EQ(pi1_context->monotonic_event_time,
584 header_monotonic_remote_time);
585 EXPECT_EQ(pi1_context->realtime_event_time,
586 header_realtime_remote_time);
587 });
588
Austin Schuh7bc59052020-02-16 23:48:33 -0800589 // Start everything up. Pong is the only thing we don't know how to wait on,
590 // so start it first.
591 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
592
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800593 StartPi1Server();
594 StartPi1Client();
595 StartPi2Client();
596 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800597
598 // And go!
599 ping_event_loop.Run();
600
601 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800602 StopPi1Server();
603 StopPi1Client();
604 StopPi2Client();
605 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800606 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800607 pong_thread.join();
608
609 // Make sure we sent something.
610 EXPECT_GE(ping_count, 1);
611 // And got something back.
612 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800613
614 // Confirm that we are estimating a monotonic offset on the client.
615 ASSERT_TRUE(client_statistics_fetcher.Fetch());
616
617 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
618 EXPECT_EQ(client_statistics_fetcher->connections()
619 ->Get(0)
620 ->node()
621 ->name()
622 ->string_view(),
623 "pi1");
624
625 // Make sure the offset in one direction is less than a second.
626 EXPECT_GT(
627 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
628 EXPECT_LT(
629 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
630 1000000000);
631
632 EXPECT_GE(pi1_server_statistics_count, 2);
633 EXPECT_GE(pi2_server_statistics_count, 2);
634 EXPECT_GE(pi1_client_statistics_count, 2);
635 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700636
637 // Confirm we got timestamps back!
638 EXPECT_TRUE(message_header_fetcher1.Fetch());
639 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800640}
641
Austin Schuh5344c352020-04-12 17:04:26 -0700642// Test that the client disconnecting triggers the server offsets on both sides
643// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800644TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700645 // This is rather annoying to set up. We need to start up a client and
646 // server, on the same node, but get them to think that they are on different
647 // nodes.
648 //
649 // We need the client to not post directly to "/test" like it would in a
650 // real system, otherwise we will re-send the ping message... So, use an
651 // application specific map to have the client post somewhere else.
652 //
653 // To top this all off, each of these needs to be done with a ShmEventLoop,
654 // which needs to run in a separate thread... And it is really hard to get
655 // everything started up reliably. So just be super generous on timeouts and
656 // hope for the best. We can be more generous in the future if we need to.
657 //
658 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800659 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700660
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800661 MakePi1Server();
662 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700663
664 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800665 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700666 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800667 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700668
669 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800670 OnPi2();
671 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700672
673 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800674 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700675 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800676 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700677
678 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700679
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800680 StartPi1Test();
681 StartPi2Test();
682 StartPi1Server();
683 StartPi1Client();
684 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700685
686 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800687 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700688
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800689 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700690
691 // Now confirm we are synchronized.
692 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
693 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
694
695 const ServerConnection *const pi1_connection =
696 pi1_server_statistics_fetcher->connections()->Get(0);
697 const ServerConnection *const pi2_connection =
698 pi2_server_statistics_fetcher->connections()->Get(0);
699
700 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
701 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
702 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
703 chrono::milliseconds(1));
704 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
705 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800706 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700707
708 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
709 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
710 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
711 chrono::milliseconds(1));
712 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
713 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800714 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800715
716 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700717 }
718
719 std::this_thread::sleep_for(std::chrono::seconds(2));
720
721 {
722 // Now confirm we are un-synchronized.
723 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
724 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
725 const ServerConnection *const pi1_connection =
726 pi1_server_statistics_fetcher->connections()->Get(0);
727 const ServerConnection *const pi2_connection =
728 pi2_server_statistics_fetcher->connections()->Get(0);
729
730 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
731 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800732 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700733 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
734 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800735 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700736 }
737
738 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800739 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700740 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800741 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700742
743 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
744 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
745
746 // Now confirm we are synchronized again.
747 const ServerConnection *const pi1_connection =
748 pi1_server_statistics_fetcher->connections()->Get(0);
749 const ServerConnection *const pi2_connection =
750 pi2_server_statistics_fetcher->connections()->Get(0);
751
752 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
753 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
754 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
755 chrono::milliseconds(1));
756 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
757 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800758 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700759
760 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
761 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
762 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
763 chrono::milliseconds(1));
764 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
765 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800766 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800767
768 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700769 }
770
771 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800772 StopPi1Server();
773 StopPi1Client();
774 StopPi2Server();
775 StopPi1Test();
776 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700777}
778
779// Test that the server disconnecting triggers the server offsets on the other
780// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800781TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700782 // This is rather annoying to set up. We need to start up a client and
783 // server, on the same node, but get them to think that they are on different
784 // nodes.
785 //
786 // We need the client to not post directly to "/test" like it would in a
787 // real system, otherwise we will re-send the ping message... So, use an
788 // application specific map to have the client post somewhere else.
789 //
790 // To top this all off, each of these needs to be done with a ShmEventLoop,
791 // which needs to run in a separate thread... And it is really hard to get
792 // everything started up reliably. So just be super generous on timeouts and
793 // hope for the best. We can be more generous in the future if we need to.
794 //
795 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700796 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800797 OnPi1();
798 MakePi1Server();
799 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700800
801 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800802 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700803 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800804 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700805 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800806 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700807
808 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800809 OnPi2();
810 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700811
812 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800813 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700814 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800815 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700816
817 // Start everything up. Pong is the only thing we don't know how to wait on,
818 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800819 StartPi1Test();
820 StartPi2Test();
821 StartPi1Server();
822 StartPi1Client();
823 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700824
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800825 // Confirm both client and server statistics messages have decent offsets in
826 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700827
828 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800829 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700830
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800831 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700832
833 // Now confirm we are synchronized.
834 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
835 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
836
837 const ServerConnection *const pi1_connection =
838 pi1_server_statistics_fetcher->connections()->Get(0);
839 const ServerConnection *const pi2_connection =
840 pi2_server_statistics_fetcher->connections()->Get(0);
841
842 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
843 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
844 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
845 chrono::milliseconds(1));
846 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
847 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800848 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700849
850 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
851 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
852 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
853 chrono::milliseconds(1));
854 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
855 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800856 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800857
858 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700859 }
860
861 std::this_thread::sleep_for(std::chrono::seconds(2));
862
863 {
864 // And confirm we are unsynchronized.
865 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
866 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
867
868 const ServerConnection *const pi1_server_connection =
869 pi1_server_statistics_fetcher->connections()->Get(0);
870 const ClientConnection *const pi1_client_connection =
871 pi1_client_statistics_fetcher->connections()->Get(0);
872
873 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
874 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800875 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700876 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
877 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
878 }
879
880 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800881 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700882
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800883 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700884
885 // And confirm we are synchronized again.
886 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
887 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
888
889 const ServerConnection *const pi1_connection =
890 pi1_server_statistics_fetcher->connections()->Get(0);
891 const ServerConnection *const pi2_connection =
892 pi2_server_statistics_fetcher->connections()->Get(0);
893
894 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
895 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
896 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
897 chrono::milliseconds(1));
898 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
899 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800900 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700901
902 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
903 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
904 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
905 chrono::milliseconds(1));
906 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
907 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800908 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800909
910 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700911 }
912
913 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800914 StopPi1Server();
915 StopPi1Client();
916 StopPi2Client();
917 StopPi1Test();
918 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700919}
920
Austin Schuh4889b182020-11-18 19:11:56 -0800921// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700922// thing, but doesn't confirm that the internal state does. We either need to
923// expose a way to check the state in a thread-safe way, or need a way to jump
924// time for one node to do that.
925
Austin Schuh4889b182020-11-18 19:11:56 -0800926void SendPing(aos::Sender<examples::Ping> *sender, int value) {
927 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
928 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
929 ping_builder.add_value(value);
930 builder.Send(ping_builder.Finish());
931}
932
933// Tests that when a message is sent before the bridge starts up, but is
934// configured as reliable, we forward it. Confirm this survives a client reset.
935TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800936 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800937
938 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800939 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800940 aos::Sender<examples::Ping> ping_sender =
941 send_event_loop.MakeSender<examples::Ping>("/test");
942 SendPing(&ping_sender, 1);
943 aos::Sender<examples::Ping> unreliable_ping_sender =
944 send_event_loop.MakeSender<examples::Ping>("/unreliable");
945 SendPing(&unreliable_ping_sender, 1);
946
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800947 MakePi1Server();
948 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800949
950 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800951 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800952
953 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800954 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800955
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800956 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800957
Austin Schuhf466ab52021-02-16 22:00:38 -0800958 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800959 aos::Fetcher<examples::Ping> ping_fetcher =
960 receive_event_loop.MakeFetcher<examples::Ping>("/test");
961 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
962 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
963 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
964 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
965
966 const size_t ping_channel_index = configuration::ChannelIndex(
967 receive_event_loop.configuration(), ping_fetcher.channel());
968
969 std::atomic<int> ping_timestamp_count{0};
970 pi1_remote_timestamp_event_loop.MakeWatcher(
971 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800972 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
973 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
974 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800975 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800976 if (header.channel_index() == ping_channel_index) {
977 ++ping_timestamp_count;
978 }
979 });
980
981 // Before everything starts up, confirm there is no message.
982 EXPECT_FALSE(ping_fetcher.Fetch());
983 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
984
985 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800986 StartPi1Server();
987 StartPi1Client();
988 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800989
990 // Event used to wait for the timestamp counting thread to start.
991 aos::Event event;
992 std::thread pi1_remote_timestamp_thread(
993 [&pi1_remote_timestamp_event_loop, &event]() {
994 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
995 pi1_remote_timestamp_event_loop.Run();
996 });
997
998 event.Wait();
999
1000 {
1001 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001002 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001003
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001004 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001005
1006 // Confirm there is no detected duplicate packet.
1007 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1008 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1009 ->Get(0)
1010 ->duplicate_packets(),
1011 0u);
1012
1013 EXPECT_TRUE(ping_fetcher.Fetch());
1014 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1015 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001016
1017 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001018 }
1019
1020 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001021 // Now, spin up a client for 2 seconds.
1022 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001023
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001024 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001025
1026 // Confirm we detect the duplicate packet correctly.
1027 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1028 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1029 ->Get(0)
1030 ->duplicate_packets(),
1031 1u);
1032
1033 EXPECT_EQ(ping_timestamp_count, 1);
1034 EXPECT_FALSE(ping_fetcher.Fetch());
1035 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001036
1037 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001038 }
1039
1040 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001041 StopPi1Client();
1042 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001043 pi1_remote_timestamp_event_loop.Exit();
1044 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001045 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001046}
1047
1048// Tests that when a message is sent before the bridge starts up, but is
1049// configured as reliable, we forward it. Confirm this works across server
1050// resets.
1051TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1052 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001053 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001054
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001055 MakePi2Server();
1056 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001057
Austin Schuhf466ab52021-02-16 22:00:38 -08001058 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001059 aos::Fetcher<examples::Ping> ping_fetcher =
1060 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1061 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1062 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1063 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1064 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1065
Austin Schuh4889b182020-11-18 19:11:56 -08001066 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001067 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001068
1069 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001070 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001071 aos::Sender<examples::Ping> ping_sender =
1072 send_event_loop.MakeSender<examples::Ping>("/test");
1073 {
1074 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1075 examples::Ping::Builder ping_builder =
1076 builder.MakeBuilder<examples::Ping>();
1077 ping_builder.add_value(1);
1078 builder.Send(ping_builder.Finish());
1079 }
1080
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001081 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001082
1083 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001084 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001085
1086 const size_t ping_channel_index = configuration::ChannelIndex(
1087 receive_event_loop.configuration(), ping_fetcher.channel());
1088
1089 std::atomic<int> ping_timestamp_count{0};
1090 pi1_remote_timestamp_event_loop.MakeWatcher(
1091 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001092 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1093 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1094 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001095 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001096 if (header.channel_index() == ping_channel_index) {
1097 ++ping_timestamp_count;
1098 }
1099 });
1100
1101 // Before everything starts up, confirm there is no message.
1102 EXPECT_FALSE(ping_fetcher.Fetch());
1103 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1104
1105 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001106 StartPi1Client();
1107 StartPi2Server();
1108 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001109
1110 // Event used to wait for the timestamp counting thread to start.
1111 aos::Event event;
1112 std::thread pi1_remote_timestamp_thread(
1113 [&pi1_remote_timestamp_event_loop, &event]() {
1114 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1115 pi1_remote_timestamp_event_loop.Run();
1116 });
1117
1118 event.Wait();
1119
1120 {
1121 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001122 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001123
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001124 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001125
1126 // Confirm there is no detected duplicate packet.
1127 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1128 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1129 ->Get(0)
1130 ->duplicate_packets(),
1131 0u);
1132
1133 EXPECT_TRUE(ping_fetcher.Fetch());
1134 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1135 EXPECT_EQ(ping_timestamp_count, 1);
1136 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001137
1138 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001139 }
1140
1141 {
1142 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001143 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001144
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001145 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001146
1147 // Confirm we detect the duplicate packet correctly.
1148 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1149 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1150 ->Get(0)
1151 ->duplicate_packets(),
1152 1u);
1153
1154 EXPECT_EQ(ping_timestamp_count, 1);
1155 EXPECT_FALSE(ping_fetcher.Fetch());
1156 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001157
1158 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001159 }
1160
1161 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001162 StopPi1Client();
1163 StopPi2Server();
1164 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001165 pi1_remote_timestamp_event_loop.Exit();
1166 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001167}
1168
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001169} // namespace testing
1170} // namespace message_bridge
1171} // namespace aos