blob: 1f926c8dd741ef77ebb27406684d4830ad0f4f53 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080011#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080012#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013
14namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070015void SetShmBase(const std::string_view base);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace message_bridge {
18namespace testing {
19
20namespace chrono = std::chrono;
21
Austin Schuhe991fe22020-11-18 16:53:39 -080022std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070023 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
24 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080025 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070026 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080027 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 }
29}
30
Austin Schuhe991fe22020-11-18 16:53:39 -080031void DoSetShmBase(const std::string_view node) {
32 aos::SetShmBase(ShmBase(node));
33}
34
Austin Schuh36a2c3e2021-02-18 22:28:38 -080035// Parameters to run all the tests with.
36struct Param {
37 // The config file to use.
38 std::string config;
39 // If true, the RemoteMessage channel should be shared between all the remote
40 // channels. If false, there will be 1 RemoteMessage channel per remote
41 // channel.
42 bool shared;
43};
44
45class MessageBridgeParameterizedTest
46 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080047 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080048 MessageBridgeParameterizedTest()
49 : config(aos::configuration::ReadConfig(
50 absl::StrCat("aos/network/", GetParam().config))) {
Austin Schuh0de30f32020-12-06 12:44:28 -080051 util::UnlinkRecursive(ShmBase("pi1"));
52 util::UnlinkRecursive(ShmBase("pi2"));
53 }
Austin Schuhe991fe22020-11-18 16:53:39 -080054
Austin Schuh36a2c3e2021-02-18 22:28:38 -080055 bool shared() const { return GetParam().shared; }
56
Austin Schuh0a2f12f2021-01-08 22:48:29 -080057 void OnPi1() {
58 DoSetShmBase("pi1");
59 FLAGS_override_hostname = "raspberrypi";
60 }
61
62 void OnPi2() {
63 DoSetShmBase("pi2");
64 FLAGS_override_hostname = "raspberrypi2";
65 }
66
67 void MakePi1Server() {
68 OnPi1();
69 FLAGS_application_name = "pi1_message_bridge_server";
70 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080071 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080072 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
73 pi1_message_bridge_server =
74 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
75 }
76
77 void RunPi1Server(chrono::nanoseconds duration) {
78 // Setup a shutdown callback.
79 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
80 [this]() { pi1_server_event_loop->Exit(); });
81 pi1_server_event_loop->OnRun([this, quit, duration]() {
82 // Stop between timestamps, not exactly on them.
83 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
84 });
85
86 pi1_server_event_loop->Run();
87 }
88
89 void StartPi1Server() {
90 pi1_server_thread = std::thread([this]() {
91 LOG(INFO) << "Started pi1_message_bridge_server";
92 pi1_server_event_loop->Run();
93 });
94 }
95
96 void StopPi1Server() {
97 if (pi1_server_thread.joinable()) {
98 pi1_server_event_loop->Exit();
99 pi1_server_thread.join();
100 pi1_server_thread = std::thread();
101 }
102 pi1_message_bridge_server.reset();
103 pi1_server_event_loop.reset();
104 }
105
106 void MakePi1Client() {
107 OnPi1();
108 FLAGS_application_name = "pi1_message_bridge_client";
109 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800110 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800111 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
112 pi1_message_bridge_client =
113 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
114 }
115
116 void StartPi1Client() {
117 pi1_client_thread = std::thread([this]() {
118 LOG(INFO) << "Started pi1_message_bridge_client";
119 pi1_client_event_loop->Run();
120 });
121 }
122
123 void StopPi1Client() {
124 pi1_client_event_loop->Exit();
125 pi1_client_thread.join();
126 pi1_client_thread = std::thread();
127 pi1_message_bridge_client.reset();
128 pi1_client_event_loop.reset();
129 }
130
131 void MakePi1Test() {
132 OnPi1();
133 FLAGS_application_name = "test1";
134 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800135 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800136
137 pi1_test_event_loop->MakeWatcher(
138 "/pi1/aos", [](const ServerStatistics &stats) {
139 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
140 });
141
142 pi1_test_event_loop->MakeWatcher(
143 "/pi1/aos", [](const ClientStatistics &stats) {
144 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
145 });
146
147 pi1_test_event_loop->MakeWatcher(
148 "/pi1/aos", [](const Timestamp &timestamp) {
149 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
150 });
151 }
152
153 void StartPi1Test() {
154 pi1_test_thread = std::thread([this]() {
155 LOG(INFO) << "Started pi1_test";
156 pi1_test_event_loop->Run();
157 });
158 }
159
160 void StopPi1Test() {
161 pi1_test_event_loop->Exit();
162 pi1_test_thread.join();
163 }
164
165 void MakePi2Server() {
166 OnPi2();
167 FLAGS_application_name = "pi2_message_bridge_server";
168 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800169 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800170 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
171 pi2_message_bridge_server =
172 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
173 }
174
175 void RunPi2Server(chrono::nanoseconds duration) {
176 // Setup a shutdown callback.
177 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
178 [this]() { pi2_server_event_loop->Exit(); });
179 pi2_server_event_loop->OnRun([this, quit, duration]() {
180 // Stop between timestamps, not exactly on them.
181 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
182 });
183
184 pi2_server_event_loop->Run();
185 }
186
187 void StartPi2Server() {
188 pi2_server_thread = std::thread([this]() {
189 LOG(INFO) << "Started pi2_message_bridge_server";
190 pi2_server_event_loop->Run();
191 });
192 }
193
194 void StopPi2Server() {
195 if (pi2_server_thread.joinable()) {
196 pi2_server_event_loop->Exit();
197 pi2_server_thread.join();
198 pi2_server_thread = std::thread();
199 }
200 pi2_message_bridge_server.reset();
201 pi2_server_event_loop.reset();
202 }
203
204 void MakePi2Client() {
205 OnPi2();
206 FLAGS_application_name = "pi2_message_bridge_client";
207 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800208 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800209 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
210 pi2_message_bridge_client =
211 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
212 }
213
214 void RunPi2Client(chrono::nanoseconds duration) {
215 // Run for 5 seconds to make sure we have time to estimate the offset.
216 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
217 [this]() { pi2_client_event_loop->Exit(); });
218 pi2_client_event_loop->OnRun([this, quit, duration]() {
219 // Stop between timestamps, not exactly on them.
220 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
221 });
222
223 // And go!
224 pi2_client_event_loop->Run();
225 }
226
227 void StartPi2Client() {
228 pi2_client_thread = std::thread([this]() {
229 LOG(INFO) << "Started pi2_message_bridge_client";
230 pi2_client_event_loop->Run();
231 });
232 }
233
234 void StopPi2Client() {
235 if (pi2_client_thread.joinable()) {
236 pi2_client_event_loop->Exit();
237 pi2_client_thread.join();
238 pi2_client_thread = std::thread();
239 }
240 pi2_message_bridge_client.reset();
241 pi2_client_event_loop.reset();
242 }
243
244 void MakePi2Test() {
245 OnPi2();
246 FLAGS_application_name = "test2";
247 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800248 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800249
250 pi2_test_event_loop->MakeWatcher(
251 "/pi2/aos", [](const ServerStatistics &stats) {
252 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
253 });
254
255 pi2_test_event_loop->MakeWatcher(
256 "/pi2/aos", [](const ClientStatistics &stats) {
257 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
258 });
259
260 pi2_test_event_loop->MakeWatcher(
261 "/pi2/aos", [](const Timestamp &timestamp) {
262 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
263 });
264 }
265
266 void StartPi2Test() {
267 pi2_test_thread = std::thread([this]() {
268 LOG(INFO) << "Started pi2_message_bridge_test";
269 pi2_test_event_loop->Run();
270 });
271 }
272
273 void StopPi2Test() {
274 pi2_test_event_loop->Exit();
275 pi2_test_thread.join();
276 }
277
Austin Schuhf466ab52021-02-16 22:00:38 -0800278 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800279
280 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
281 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
282 std::thread pi1_server_thread;
283
284 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
285 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
286 std::thread pi1_client_thread;
287
288 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
289 std::thread pi1_test_thread;
290
291 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
292 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
293 std::thread pi2_server_thread;
294
295 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
296 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
297 std::thread pi2_client_thread;
298
299 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
300 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800301};
302
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800303// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800304TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800305 // This is rather annoying to set up. We need to start up a client and
306 // server, on the same node, but get them to think that they are on different
307 // nodes.
308 //
309 // We then get to wait until they are connected.
310 //
311 // After they are connected, we send a Ping message.
312 //
313 // On the other end, we receive a Pong message.
314 //
315 // But, we need the client to not post directly to "/test" like it would in a
316 // real system, otherwise we will re-send the ping message... So, use an
317 // application specific map to have the client post somewhere else.
318 //
319 // To top this all off, each of these needs to be done with a ShmEventLoop,
320 // which needs to run in a separate thread... And it is really hard to get
321 // everything started up reliably. So just be super generous on timeouts and
322 // hope for the best. We can be more generous in the future if we need to.
323 //
324 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800325 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800326 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700327
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800328 MakePi1Server();
329 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800330
331 // And build the app which sends the pings.
332 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800333 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800334 aos::Sender<examples::Ping> ping_sender =
335 ping_event_loop.MakeSender<examples::Ping>("/test");
336
Austin Schuhf466ab52021-02-16 22:00:38 -0800337 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800338 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
339 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800340 shared() ? "/pi1/aos/remote_timestamps/pi2"
341 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342
343 // Fetchers for confirming the remote timestamps made it.
344 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
345 ping_event_loop.MakeFetcher<examples::Ping>("/test");
346 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
347 ping_event_loop.MakeFetcher<Timestamp>("/aos");
348
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800349 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800350 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700351
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800352 MakePi2Client();
353 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800354
355 // And build the app which sends the pongs.
356 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800357 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800358
Austin Schuh7bc59052020-02-16 23:48:33 -0800359 // And build the app for testing.
360 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800361 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800362
363 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
364 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800365 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
366 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800367 shared() ? "/pi2/aos/remote_timestamps/pi1"
368 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
369 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700370
371 // Event loop for fetching data delivered to pi2 from pi1 to match up
372 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800373 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700374 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
375 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
376 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
377 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
378 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
379 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800380
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381 // Count the pongs.
382 int pong_count = 0;
383 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700384 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800385 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800386 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387 });
388
389 FLAGS_override_hostname = "";
390
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800391 // Wait until we are connected, then send.
392 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800393 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800394 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700395 "/pi1/aos",
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800396 [this, &ping_count, &ping_sender,
Austin Schuh7bc59052020-02-16 23:48:33 -0800397 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800398 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800399
400 ASSERT_TRUE(stats.has_connections());
401 EXPECT_EQ(stats.connections()->size(), 1);
402
403 bool connected = false;
404 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800405 // Confirm that we are estimating the server time offset correctly. It
406 // should be about 0 since we are on the same machine here.
407 if (connection->has_monotonic_offset()) {
408 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
409 chrono::milliseconds(1));
410 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
411 chrono::milliseconds(-1));
412 ++pi1_server_statistics_count;
413 }
414
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800415 if (connection->node()->name()->string_view() ==
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800416 pi2_client_event_loop->node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800417 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800418 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800419 connected = true;
420 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800421 }
422 }
423
424 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800425 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800426 auto builder = ping_sender.MakeBuilder();
427 examples::Ping::Builder ping_builder =
428 builder.MakeBuilder<examples::Ping>();
429 ping_builder.add_value(ping_count + 971);
430 builder.Send(ping_builder.Finish());
431 ++ping_count;
432 }
433 });
434
Austin Schuh7bc59052020-02-16 23:48:33 -0800435 // Confirm both client and server statistics messages have decent offsets in
436 // them.
437 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700438 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800439 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800440 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800441 for (const ServerConnection *connection : *stats.connections()) {
442 if (connection->has_monotonic_offset()) {
443 ++pi2_server_statistics_count;
444 // Confirm that we are estimating the server time offset correctly. It
445 // should be about 0 since we are on the same machine here.
446 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
447 chrono::milliseconds(1));
448 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
449 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800450 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800451 }
452 }
453 });
454
455 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700456 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
457 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800458 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800459
Austin Schuh5344c352020-04-12 17:04:26 -0700460 for (const ClientConnection *connection : *stats.connections()) {
461 if (connection->has_monotonic_offset()) {
462 ++pi1_client_statistics_count;
463 // It takes at least 10 microseconds to send a message between the
464 // client and server. The min (filtered) time shouldn't be over 10
465 // milliseconds on localhost. This might have to bump up if this is
466 // proving flaky.
467 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800468 chrono::milliseconds(10))
469 << " " << connection->monotonic_offset()
470 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700471 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800472 chrono::microseconds(10))
473 << " " << connection->monotonic_offset()
474 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700475 }
476 }
477 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800478
479 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700480 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800481 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800482 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800483
484 for (const ClientConnection *connection : *stats.connections()) {
485 if (connection->has_monotonic_offset()) {
486 ++pi2_client_statistics_count;
487 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
488 chrono::milliseconds(10));
489 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
490 chrono::microseconds(10));
491 }
492 }
493 });
494
Austin Schuh196a4452020-03-15 23:12:03 -0700495 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800496 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800497 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800498 });
Austin Schuh196a4452020-03-15 23:12:03 -0700499 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800500 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800501 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800502 });
503
504 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800505 aos::TimerHandler *quit = ping_event_loop.AddTimer(
506 [&ping_event_loop]() { ping_event_loop.Exit(); });
507 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800508 // Stop between timestamps, not exactly on them.
509 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800510 });
511
Austin Schuh2f8fd752020-09-01 22:38:28 -0700512 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
513 // channel.
514 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
515 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
516 const size_t ping_timestamp_channel =
517 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
518 ping_on_pi2_fetcher.channel());
519
520 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
521 VLOG(1) << "Channel "
522 << configuration::ChannelIndex(ping_event_loop.configuration(),
523 channel)
524 << " " << configuration::CleanedChannelToString(channel);
525 }
526
527 // For each remote timestamp we get back, confirm that it is either a ping
528 // message, or a timestamp we sent out. Also confirm that the timestamps are
529 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800530 for (std::pair<int, std::string> channel :
531 shared()
532 ? std::vector<std::pair<
533 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
534 : std::vector<std::pair<int, std::string>>{
535 {pi1_timestamp_channel,
536 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
537 "aos-message_bridge-Timestamp"},
538 {ping_timestamp_channel,
539 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
540 ping_event_loop.MakeWatcher(
541 channel.second,
542 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
543 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
544 &pi1_on_pi1_timestamp_fetcher,
545 channel_index = channel.first](const RemoteMessage &header) {
546 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
547 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700548
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800549 EXPECT_TRUE(header.has_boot_uuid());
550 if (channel_index != -1) {
551 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700552 }
553
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800554 const aos::monotonic_clock::time_point header_monotonic_sent_time(
555 chrono::nanoseconds(header.monotonic_sent_time()));
556 const aos::realtime_clock::time_point header_realtime_sent_time(
557 chrono::nanoseconds(header.realtime_sent_time()));
558 const aos::monotonic_clock::time_point header_monotonic_remote_time(
559 chrono::nanoseconds(header.monotonic_remote_time()));
560 const aos::realtime_clock::time_point header_realtime_remote_time(
561 chrono::nanoseconds(header.realtime_remote_time()));
562
563 const Context *pi1_context = nullptr;
564 const Context *pi2_context = nullptr;
565
566 if (header.channel_index() == pi1_timestamp_channel) {
567 // Find the forwarded message.
568 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
569 header_monotonic_sent_time) {
570 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
571 }
572
573 // And the source message.
574 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
575 header_monotonic_remote_time) {
576 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
577 }
578
579 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
580 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
581 } else if (header.channel_index() == ping_timestamp_channel) {
582 // Find the forwarded message.
583 while (ping_on_pi2_fetcher.context().monotonic_event_time <
584 header_monotonic_sent_time) {
585 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
586 }
587
588 // And the source message.
589 while (ping_on_pi1_fetcher.context().monotonic_event_time <
590 header_monotonic_remote_time) {
591 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
592 }
593
594 pi1_context = &ping_on_pi1_fetcher.context();
595 pi2_context = &ping_on_pi2_fetcher.context();
596 } else {
597 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700598 }
599
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800600 // Confirm the forwarded message has matching timestamps to the
601 // timestamps we got back.
602 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
603 EXPECT_EQ(pi2_context->monotonic_event_time,
604 header_monotonic_sent_time);
605 EXPECT_EQ(pi2_context->realtime_event_time,
606 header_realtime_sent_time);
607 EXPECT_EQ(pi2_context->realtime_remote_time,
608 header_realtime_remote_time);
609 EXPECT_EQ(pi2_context->monotonic_remote_time,
610 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700611
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800612 // Confirm the forwarded message also matches the source message.
613 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
614 EXPECT_EQ(pi1_context->monotonic_event_time,
615 header_monotonic_remote_time);
616 EXPECT_EQ(pi1_context->realtime_event_time,
617 header_realtime_remote_time);
618 });
619 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700620
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800621 // Start everything up. Pong is the only thing we don't know how to wait
622 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800623 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
624
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800625 StartPi1Server();
626 StartPi1Client();
627 StartPi2Client();
628 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800629
630 // And go!
631 ping_event_loop.Run();
632
633 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800634 StopPi1Server();
635 StopPi1Client();
636 StopPi2Client();
637 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800638 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800639 pong_thread.join();
640
641 // Make sure we sent something.
642 EXPECT_GE(ping_count, 1);
643 // And got something back.
644 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800645
646 // Confirm that we are estimating a monotonic offset on the client.
647 ASSERT_TRUE(client_statistics_fetcher.Fetch());
648
649 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
650 EXPECT_EQ(client_statistics_fetcher->connections()
651 ->Get(0)
652 ->node()
653 ->name()
654 ->string_view(),
655 "pi1");
656
657 // Make sure the offset in one direction is less than a second.
658 EXPECT_GT(
659 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
660 EXPECT_LT(
661 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
662 1000000000);
663
664 EXPECT_GE(pi1_server_statistics_count, 2);
665 EXPECT_GE(pi2_server_statistics_count, 2);
666 EXPECT_GE(pi1_client_statistics_count, 2);
667 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700668
669 // Confirm we got timestamps back!
670 EXPECT_TRUE(message_header_fetcher1.Fetch());
671 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800672}
673
Austin Schuh5344c352020-04-12 17:04:26 -0700674// Test that the client disconnecting triggers the server offsets on both sides
675// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800676TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700677 // This is rather annoying to set up. We need to start up a client and
678 // server, on the same node, but get them to think that they are on different
679 // nodes.
680 //
681 // We need the client to not post directly to "/test" like it would in a
682 // real system, otherwise we will re-send the ping message... So, use an
683 // application specific map to have the client post somewhere else.
684 //
685 // To top this all off, each of these needs to be done with a ShmEventLoop,
686 // which needs to run in a separate thread... And it is really hard to get
687 // everything started up reliably. So just be super generous on timeouts and
688 // hope for the best. We can be more generous in the future if we need to.
689 //
690 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800691 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700692
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800693 MakePi1Server();
694 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700695
696 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800697 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700698 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800699 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700700
701 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800702 OnPi2();
703 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700704
705 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800706 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700707 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800708 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700709
710 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700711
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800712 StartPi1Test();
713 StartPi2Test();
714 StartPi1Server();
715 StartPi1Client();
716 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700717
718 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800719 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700720
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800721 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700722
723 // Now confirm we are synchronized.
724 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
725 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
726
727 const ServerConnection *const pi1_connection =
728 pi1_server_statistics_fetcher->connections()->Get(0);
729 const ServerConnection *const pi2_connection =
730 pi2_server_statistics_fetcher->connections()->Get(0);
731
732 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
733 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
734 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
735 chrono::milliseconds(1));
736 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
737 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800738 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700739
740 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
741 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
742 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
743 chrono::milliseconds(1));
744 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
745 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800746 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800747
748 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700749 }
750
751 std::this_thread::sleep_for(std::chrono::seconds(2));
752
753 {
754 // Now confirm we are un-synchronized.
755 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
756 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
757 const ServerConnection *const pi1_connection =
758 pi1_server_statistics_fetcher->connections()->Get(0);
759 const ServerConnection *const pi2_connection =
760 pi2_server_statistics_fetcher->connections()->Get(0);
761
762 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
763 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800764 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700765 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
766 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800767 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700768 }
769
770 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800771 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700772 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800773 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700774
775 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
776 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
777
778 // Now confirm we are synchronized again.
779 const ServerConnection *const pi1_connection =
780 pi1_server_statistics_fetcher->connections()->Get(0);
781 const ServerConnection *const pi2_connection =
782 pi2_server_statistics_fetcher->connections()->Get(0);
783
784 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
785 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
786 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
787 chrono::milliseconds(1));
788 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
789 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800790 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700791
792 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
793 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
794 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
795 chrono::milliseconds(1));
796 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
797 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800798 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800799
800 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700801 }
802
803 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800804 StopPi1Server();
805 StopPi1Client();
806 StopPi2Server();
807 StopPi1Test();
808 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700809}
810
811// Test that the server disconnecting triggers the server offsets on the other
812// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800813TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700814 // This is rather annoying to set up. We need to start up a client and
815 // server, on the same node, but get them to think that they are on different
816 // nodes.
817 //
818 // We need the client to not post directly to "/test" like it would in a
819 // real system, otherwise we will re-send the ping message... So, use an
820 // application specific map to have the client post somewhere else.
821 //
822 // To top this all off, each of these needs to be done with a ShmEventLoop,
823 // which needs to run in a separate thread... And it is really hard to get
824 // everything started up reliably. So just be super generous on timeouts and
825 // hope for the best. We can be more generous in the future if we need to.
826 //
827 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700828 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800829 OnPi1();
830 MakePi1Server();
831 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700832
833 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800834 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700835 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800836 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700837 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800838 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700839
840 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800841 OnPi2();
842 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700843
844 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800845 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700846 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800847 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700848
849 // Start everything up. Pong is the only thing we don't know how to wait on,
850 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800851 StartPi1Test();
852 StartPi2Test();
853 StartPi1Server();
854 StartPi1Client();
855 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700856
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800857 // Confirm both client and server statistics messages have decent offsets in
858 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700859
860 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800861 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700862
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800863 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700864
865 // Now confirm we are synchronized.
866 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
867 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
868
869 const ServerConnection *const pi1_connection =
870 pi1_server_statistics_fetcher->connections()->Get(0);
871 const ServerConnection *const pi2_connection =
872 pi2_server_statistics_fetcher->connections()->Get(0);
873
874 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
875 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
876 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
877 chrono::milliseconds(1));
878 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
879 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800880 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700881
882 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
883 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
884 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
885 chrono::milliseconds(1));
886 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
887 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800888 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800889
890 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700891 }
892
893 std::this_thread::sleep_for(std::chrono::seconds(2));
894
895 {
896 // And confirm we are unsynchronized.
897 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
898 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
899
900 const ServerConnection *const pi1_server_connection =
901 pi1_server_statistics_fetcher->connections()->Get(0);
902 const ClientConnection *const pi1_client_connection =
903 pi1_client_statistics_fetcher->connections()->Get(0);
904
905 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
906 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800907 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700908 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
909 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
910 }
911
912 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800913 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700914
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800915 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700916
917 // And confirm we are synchronized again.
918 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
919 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
920
921 const ServerConnection *const pi1_connection =
922 pi1_server_statistics_fetcher->connections()->Get(0);
923 const ServerConnection *const pi2_connection =
924 pi2_server_statistics_fetcher->connections()->Get(0);
925
926 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
927 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
928 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
929 chrono::milliseconds(1));
930 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
931 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800932 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700933
934 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
935 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
936 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
937 chrono::milliseconds(1));
938 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
939 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800940 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800941
942 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700943 }
944
945 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800946 StopPi1Server();
947 StopPi1Client();
948 StopPi2Client();
949 StopPi1Test();
950 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700951}
952
Austin Schuh4889b182020-11-18 19:11:56 -0800953// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700954// thing, but doesn't confirm that the internal state does. We either need to
955// expose a way to check the state in a thread-safe way, or need a way to jump
956// time for one node to do that.
957
Austin Schuh4889b182020-11-18 19:11:56 -0800958void SendPing(aos::Sender<examples::Ping> *sender, int value) {
959 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
960 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
961 ping_builder.add_value(value);
962 builder.Send(ping_builder.Finish());
963}
964
965// Tests that when a message is sent before the bridge starts up, but is
966// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800967TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800968 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800969
970 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800971 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800972 aos::Sender<examples::Ping> ping_sender =
973 send_event_loop.MakeSender<examples::Ping>("/test");
974 SendPing(&ping_sender, 1);
975 aos::Sender<examples::Ping> unreliable_ping_sender =
976 send_event_loop.MakeSender<examples::Ping>("/unreliable");
977 SendPing(&unreliable_ping_sender, 1);
978
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800979 MakePi1Server();
980 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800981
982 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800983 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800984
985 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800986 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800987
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800988 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800989
Austin Schuhf466ab52021-02-16 22:00:38 -0800990 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800991 aos::Fetcher<examples::Ping> ping_fetcher =
992 receive_event_loop.MakeFetcher<examples::Ping>("/test");
993 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
994 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
995 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
996 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
997
998 const size_t ping_channel_index = configuration::ChannelIndex(
999 receive_event_loop.configuration(), ping_fetcher.channel());
1000
1001 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001002 const std::string channel_name =
1003 shared() ? "/pi1/aos/remote_timestamps/pi2"
1004 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001005 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001006 channel_name, [this, channel_name, ping_channel_index,
1007 &ping_timestamp_count](const RemoteMessage &header) {
1008 VLOG(1) <<channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001009 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001010 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001011 if (shared() && header.channel_index() != ping_channel_index) {
1012 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001013 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001014 CHECK_EQ(header.channel_index(), ping_channel_index);
1015 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001016 });
1017
1018 // Before everything starts up, confirm there is no message.
1019 EXPECT_FALSE(ping_fetcher.Fetch());
1020 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1021
1022 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001023 StartPi1Server();
1024 StartPi1Client();
1025 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001026
1027 // Event used to wait for the timestamp counting thread to start.
1028 aos::Event event;
1029 std::thread pi1_remote_timestamp_thread(
1030 [&pi1_remote_timestamp_event_loop, &event]() {
1031 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1032 pi1_remote_timestamp_event_loop.Run();
1033 });
1034
1035 event.Wait();
1036
1037 {
1038 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001039 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001040
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001041 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001042
1043 // Confirm there is no detected duplicate packet.
1044 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1045 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1046 ->Get(0)
1047 ->duplicate_packets(),
1048 0u);
1049
1050 EXPECT_TRUE(ping_fetcher.Fetch());
1051 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1052 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001053
1054 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001055 }
1056
1057 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001058 // Now, spin up a client for 2 seconds.
1059 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001060
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001061 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001062
1063 // Confirm we detect the duplicate packet correctly.
1064 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1065 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1066 ->Get(0)
1067 ->duplicate_packets(),
1068 1u);
1069
1070 EXPECT_EQ(ping_timestamp_count, 1);
1071 EXPECT_FALSE(ping_fetcher.Fetch());
1072 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001073
1074 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001075 }
1076
1077 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001078 StopPi1Client();
1079 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001080 pi1_remote_timestamp_event_loop.Exit();
1081 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001082 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001083}
1084
1085// Tests that when a message is sent before the bridge starts up, but is
1086// configured as reliable, we forward it. Confirm this works across server
1087// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001088TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001089 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001090 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001091
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001092 MakePi2Server();
1093 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001094
Austin Schuhf466ab52021-02-16 22:00:38 -08001095 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001096 aos::Fetcher<examples::Ping> ping_fetcher =
1097 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1098 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1099 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1100 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1101 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1102
Austin Schuh4889b182020-11-18 19:11:56 -08001103 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001104 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001105
1106 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001107 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001108 aos::Sender<examples::Ping> ping_sender =
1109 send_event_loop.MakeSender<examples::Ping>("/test");
1110 {
1111 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1112 examples::Ping::Builder ping_builder =
1113 builder.MakeBuilder<examples::Ping>();
1114 ping_builder.add_value(1);
1115 builder.Send(ping_builder.Finish());
1116 }
1117
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001118 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001119
1120 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001121 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001122
1123 const size_t ping_channel_index = configuration::ChannelIndex(
1124 receive_event_loop.configuration(), ping_fetcher.channel());
1125
1126 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001127 const std::string channel_name =
1128 shared() ? "/pi1/aos/remote_timestamps/pi2"
1129 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001130 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001131 channel_name, [this, channel_name, ping_channel_index,
1132 &ping_timestamp_count](const RemoteMessage &header) {
1133 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001134 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001135 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001136 if (shared() && header.channel_index() != ping_channel_index) {
1137 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001138 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001139 CHECK_EQ(header.channel_index(), ping_channel_index);
1140 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001141 });
1142
1143 // Before everything starts up, confirm there is no message.
1144 EXPECT_FALSE(ping_fetcher.Fetch());
1145 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1146
1147 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001148 StartPi1Client();
1149 StartPi2Server();
1150 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001151
1152 // Event used to wait for the timestamp counting thread to start.
1153 aos::Event event;
1154 std::thread pi1_remote_timestamp_thread(
1155 [&pi1_remote_timestamp_event_loop, &event]() {
1156 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1157 pi1_remote_timestamp_event_loop.Run();
1158 });
1159
1160 event.Wait();
1161
1162 {
1163 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001164 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001165
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001166 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001167
1168 // Confirm there is no detected duplicate packet.
1169 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1170 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1171 ->Get(0)
1172 ->duplicate_packets(),
1173 0u);
1174
1175 EXPECT_TRUE(ping_fetcher.Fetch());
1176 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1177 EXPECT_EQ(ping_timestamp_count, 1);
1178 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001179
1180 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001181 }
1182
1183 {
1184 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001185 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001186
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001187 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001188
1189 // Confirm we detect the duplicate packet correctly.
1190 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1191 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1192 ->Get(0)
1193 ->duplicate_packets(),
1194 1u);
1195
1196 EXPECT_EQ(ping_timestamp_count, 1);
1197 EXPECT_FALSE(ping_fetcher.Fetch());
1198 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001199
1200 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001201 }
1202
1203 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001204 StopPi1Client();
1205 StopPi2Server();
1206 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001207 pi1_remote_timestamp_event_loop.Exit();
1208 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001209}
1210
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001211INSTANTIATE_TEST_CASE_P(
1212 MessageBridgeTests, MessageBridgeParameterizedTest,
1213 ::testing::Values(
1214 Param{"message_bridge_test_combined_timestamps_common_config.json",
1215 true},
1216 Param{"message_bridge_test_common_config.json", false}));
1217
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001218} // namespace testing
1219} // namespace message_bridge
1220} // namespace aos