blob: e2dddc4ae31e526f873e8ad820c804ab85fcc31e [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080011#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080012#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013
14namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070015void SetShmBase(const std::string_view base);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace message_bridge {
18namespace testing {
19
20namespace chrono = std::chrono;
21
Austin Schuhe991fe22020-11-18 16:53:39 -080022std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070023 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
24 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080025 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070026 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080027 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 }
29}
30
Austin Schuhe991fe22020-11-18 16:53:39 -080031void DoSetShmBase(const std::string_view node) {
32 aos::SetShmBase(ShmBase(node));
33}
34
// Parameters to run all the tests with.
struct Param {
  // The config file to use. The test fixture prepends "aos/network/" to this
  // name before reading it.
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  bool shared;
};
44
45class MessageBridgeParameterizedTest
46 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080047 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080048 MessageBridgeParameterizedTest()
49 : config(aos::configuration::ReadConfig(
50 absl::StrCat("aos/network/", GetParam().config))) {
Austin Schuh0de30f32020-12-06 12:44:28 -080051 util::UnlinkRecursive(ShmBase("pi1"));
52 util::UnlinkRecursive(ShmBase("pi2"));
53 }
Austin Schuhe991fe22020-11-18 16:53:39 -080054
Austin Schuh36a2c3e2021-02-18 22:28:38 -080055 bool shared() const { return GetParam().shared; }
56
Austin Schuh0a2f12f2021-01-08 22:48:29 -080057 void OnPi1() {
58 DoSetShmBase("pi1");
59 FLAGS_override_hostname = "raspberrypi";
60 }
61
62 void OnPi2() {
63 DoSetShmBase("pi2");
64 FLAGS_override_hostname = "raspberrypi2";
65 }
66
67 void MakePi1Server() {
68 OnPi1();
69 FLAGS_application_name = "pi1_message_bridge_server";
70 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080071 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080072 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
73 pi1_message_bridge_server =
74 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
75 }
76
77 void RunPi1Server(chrono::nanoseconds duration) {
78 // Setup a shutdown callback.
79 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
80 [this]() { pi1_server_event_loop->Exit(); });
81 pi1_server_event_loop->OnRun([this, quit, duration]() {
82 // Stop between timestamps, not exactly on them.
83 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
84 });
85
86 pi1_server_event_loop->Run();
87 }
88
89 void StartPi1Server() {
90 pi1_server_thread = std::thread([this]() {
91 LOG(INFO) << "Started pi1_message_bridge_server";
92 pi1_server_event_loop->Run();
93 });
94 }
95
96 void StopPi1Server() {
97 if (pi1_server_thread.joinable()) {
98 pi1_server_event_loop->Exit();
99 pi1_server_thread.join();
100 pi1_server_thread = std::thread();
101 }
102 pi1_message_bridge_server.reset();
103 pi1_server_event_loop.reset();
104 }
105
106 void MakePi1Client() {
107 OnPi1();
108 FLAGS_application_name = "pi1_message_bridge_client";
109 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800110 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800111 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
112 pi1_message_bridge_client =
113 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
114 }
115
116 void StartPi1Client() {
117 pi1_client_thread = std::thread([this]() {
118 LOG(INFO) << "Started pi1_message_bridge_client";
119 pi1_client_event_loop->Run();
120 });
121 }
122
123 void StopPi1Client() {
124 pi1_client_event_loop->Exit();
125 pi1_client_thread.join();
126 pi1_client_thread = std::thread();
127 pi1_message_bridge_client.reset();
128 pi1_client_event_loop.reset();
129 }
130
131 void MakePi1Test() {
132 OnPi1();
133 FLAGS_application_name = "test1";
134 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800135 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800136
137 pi1_test_event_loop->MakeWatcher(
138 "/pi1/aos", [](const ServerStatistics &stats) {
139 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
140 });
141
142 pi1_test_event_loop->MakeWatcher(
143 "/pi1/aos", [](const ClientStatistics &stats) {
144 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
145 });
146
147 pi1_test_event_loop->MakeWatcher(
148 "/pi1/aos", [](const Timestamp &timestamp) {
149 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
150 });
151 }
152
153 void StartPi1Test() {
154 pi1_test_thread = std::thread([this]() {
155 LOG(INFO) << "Started pi1_test";
156 pi1_test_event_loop->Run();
157 });
158 }
159
160 void StopPi1Test() {
161 pi1_test_event_loop->Exit();
162 pi1_test_thread.join();
163 }
164
165 void MakePi2Server() {
166 OnPi2();
167 FLAGS_application_name = "pi2_message_bridge_server";
168 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800169 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800170 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
171 pi2_message_bridge_server =
172 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
173 }
174
175 void RunPi2Server(chrono::nanoseconds duration) {
176 // Setup a shutdown callback.
177 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
178 [this]() { pi2_server_event_loop->Exit(); });
179 pi2_server_event_loop->OnRun([this, quit, duration]() {
180 // Stop between timestamps, not exactly on them.
181 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
182 });
183
184 pi2_server_event_loop->Run();
185 }
186
187 void StartPi2Server() {
188 pi2_server_thread = std::thread([this]() {
189 LOG(INFO) << "Started pi2_message_bridge_server";
190 pi2_server_event_loop->Run();
191 });
192 }
193
194 void StopPi2Server() {
195 if (pi2_server_thread.joinable()) {
196 pi2_server_event_loop->Exit();
197 pi2_server_thread.join();
198 pi2_server_thread = std::thread();
199 }
200 pi2_message_bridge_server.reset();
201 pi2_server_event_loop.reset();
202 }
203
204 void MakePi2Client() {
205 OnPi2();
206 FLAGS_application_name = "pi2_message_bridge_client";
207 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800208 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800209 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
210 pi2_message_bridge_client =
211 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
212 }
213
214 void RunPi2Client(chrono::nanoseconds duration) {
215 // Run for 5 seconds to make sure we have time to estimate the offset.
216 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
217 [this]() { pi2_client_event_loop->Exit(); });
218 pi2_client_event_loop->OnRun([this, quit, duration]() {
219 // Stop between timestamps, not exactly on them.
220 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
221 });
222
223 // And go!
224 pi2_client_event_loop->Run();
225 }
226
227 void StartPi2Client() {
228 pi2_client_thread = std::thread([this]() {
229 LOG(INFO) << "Started pi2_message_bridge_client";
230 pi2_client_event_loop->Run();
231 });
232 }
233
234 void StopPi2Client() {
235 if (pi2_client_thread.joinable()) {
236 pi2_client_event_loop->Exit();
237 pi2_client_thread.join();
238 pi2_client_thread = std::thread();
239 }
240 pi2_message_bridge_client.reset();
241 pi2_client_event_loop.reset();
242 }
243
244 void MakePi2Test() {
245 OnPi2();
246 FLAGS_application_name = "test2";
247 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800248 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800249
250 pi2_test_event_loop->MakeWatcher(
251 "/pi2/aos", [](const ServerStatistics &stats) {
252 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
253 });
254
255 pi2_test_event_loop->MakeWatcher(
256 "/pi2/aos", [](const ClientStatistics &stats) {
257 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
258 });
259
260 pi2_test_event_loop->MakeWatcher(
261 "/pi2/aos", [](const Timestamp &timestamp) {
262 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
263 });
264 }
265
266 void StartPi2Test() {
267 pi2_test_thread = std::thread([this]() {
268 LOG(INFO) << "Started pi2_message_bridge_test";
269 pi2_test_event_loop->Run();
270 });
271 }
272
273 void StopPi2Test() {
274 pi2_test_event_loop->Exit();
275 pi2_test_thread.join();
276 }
277
Austin Schuhf466ab52021-02-16 22:00:38 -0800278 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800279
280 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
281 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
282 std::thread pi1_server_thread;
283
284 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
285 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
286 std::thread pi1_client_thread;
287
288 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
289 std::thread pi1_test_thread;
290
291 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
292 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
293 std::thread pi2_server_thread;
294
295 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
296 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
297 std::thread pi2_client_thread;
298
299 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
300 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800301};
302
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800303// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800304TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800305 // This is rather annoying to set up. We need to start up a client and
306 // server, on the same node, but get them to think that they are on different
307 // nodes.
308 //
309 // We then get to wait until they are connected.
310 //
311 // After they are connected, we send a Ping message.
312 //
313 // On the other end, we receive a Pong message.
314 //
315 // But, we need the client to not post directly to "/test" like it would in a
316 // real system, otherwise we will re-send the ping message... So, use an
317 // application specific map to have the client post somewhere else.
318 //
319 // To top this all off, each of these needs to be done with a ShmEventLoop,
320 // which needs to run in a separate thread... And it is really hard to get
321 // everything started up reliably. So just be super generous on timeouts and
322 // hope for the best. We can be more generous in the future if we need to.
323 //
324 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800325 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800326 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700327
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800328 MakePi1Server();
329 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800330
331 // And build the app which sends the pings.
332 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800333 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800334 aos::Sender<examples::Ping> ping_sender =
335 ping_event_loop.MakeSender<examples::Ping>("/test");
336
Austin Schuhf466ab52021-02-16 22:00:38 -0800337 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800338 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
339 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800340 shared() ? "/pi1/aos/remote_timestamps/pi2"
341 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342
343 // Fetchers for confirming the remote timestamps made it.
344 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
345 ping_event_loop.MakeFetcher<examples::Ping>("/test");
346 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
347 ping_event_loop.MakeFetcher<Timestamp>("/aos");
348
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800349 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800350 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700351
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800352 MakePi2Client();
353 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800354
355 // And build the app which sends the pongs.
356 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800357 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800358
Austin Schuh7bc59052020-02-16 23:48:33 -0800359 // And build the app for testing.
360 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800361 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800362
363 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
364 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800365 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
366 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800367 shared() ? "/pi2/aos/remote_timestamps/pi1"
368 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
369 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700370
371 // Event loop for fetching data delivered to pi2 from pi1 to match up
372 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800373 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700374 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
375 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
376 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
377 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
378 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
379 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800380
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381 // Count the pongs.
382 int pong_count = 0;
383 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700384 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800385 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800386 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387 });
388
389 FLAGS_override_hostname = "";
390
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800391 // Wait until we are connected, then send.
392 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800393 int pi1_server_statistics_count = 0;
Austin Schuh61e973f2021-02-21 21:43:56 -0800394 ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
395 &pi1_server_statistics_count](
396 const ServerStatistics &stats) {
397 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800398
Austin Schuh61e973f2021-02-21 21:43:56 -0800399 ASSERT_TRUE(stats.has_connections());
400 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800401
Austin Schuh61e973f2021-02-21 21:43:56 -0800402 bool connected = false;
403 for (const ServerConnection *connection : *stats.connections()) {
404 // Confirm that we are estimating the server time offset correctly. It
405 // should be about 0 since we are on the same machine here.
406 if (connection->has_monotonic_offset()) {
407 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
408 chrono::milliseconds(1));
409 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
410 chrono::milliseconds(-1));
411 ++pi1_server_statistics_count;
412 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800413
Austin Schuh61e973f2021-02-21 21:43:56 -0800414 if (connection->node()->name()->string_view() ==
415 pi2_client_event_loop->node()->name()->string_view()) {
416 if (connection->state() == State::CONNECTED) {
417 EXPECT_TRUE(connection->has_boot_uuid());
418 connected = true;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800419 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800420 }
421 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800422
Austin Schuh61e973f2021-02-21 21:43:56 -0800423 if (connected) {
424 VLOG(1) << "Connected! Sent ping.";
425 auto builder = ping_sender.MakeBuilder();
426 examples::Ping::Builder ping_builder =
427 builder.MakeBuilder<examples::Ping>();
428 ping_builder.add_value(ping_count + 971);
429 builder.Send(ping_builder.Finish());
430 ++ping_count;
431 }
432 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800433
Austin Schuh7bc59052020-02-16 23:48:33 -0800434 // Confirm both client and server statistics messages have decent offsets in
435 // them.
436 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700437 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800438 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800439 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800440 for (const ServerConnection *connection : *stats.connections()) {
441 if (connection->has_monotonic_offset()) {
442 ++pi2_server_statistics_count;
443 // Confirm that we are estimating the server time offset correctly. It
444 // should be about 0 since we are on the same machine here.
445 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
446 chrono::milliseconds(1));
447 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
448 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800449 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800450 }
451 }
452 });
453
454 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700455 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
456 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800457 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800458
Austin Schuh5344c352020-04-12 17:04:26 -0700459 for (const ClientConnection *connection : *stats.connections()) {
460 if (connection->has_monotonic_offset()) {
461 ++pi1_client_statistics_count;
462 // It takes at least 10 microseconds to send a message between the
463 // client and server. The min (filtered) time shouldn't be over 10
464 // milliseconds on localhost. This might have to bump up if this is
465 // proving flaky.
466 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800467 chrono::milliseconds(10))
468 << " " << connection->monotonic_offset()
469 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700470 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
Austin Schuh3edddcc2020-12-29 13:32:02 -0800471 chrono::microseconds(10))
472 << " " << connection->monotonic_offset()
473 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
Austin Schuh5344c352020-04-12 17:04:26 -0700474 }
475 }
476 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800477
478 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700479 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800480 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800481 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800482
483 for (const ClientConnection *connection : *stats.connections()) {
484 if (connection->has_monotonic_offset()) {
485 ++pi2_client_statistics_count;
486 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
487 chrono::milliseconds(10));
488 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
489 chrono::microseconds(10));
490 }
491 }
492 });
493
Austin Schuh196a4452020-03-15 23:12:03 -0700494 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800495 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800496 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800497 });
Austin Schuh196a4452020-03-15 23:12:03 -0700498 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800499 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800500 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800501 });
502
503 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800504 aos::TimerHandler *quit = ping_event_loop.AddTimer(
505 [&ping_event_loop]() { ping_event_loop.Exit(); });
506 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800507 // Stop between timestamps, not exactly on them.
508 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800509 });
510
Austin Schuh2f8fd752020-09-01 22:38:28 -0700511 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
512 // channel.
513 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
514 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
515 const size_t ping_timestamp_channel =
516 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
517 ping_on_pi2_fetcher.channel());
518
519 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
520 VLOG(1) << "Channel "
521 << configuration::ChannelIndex(ping_event_loop.configuration(),
522 channel)
523 << " " << configuration::CleanedChannelToString(channel);
524 }
525
526 // For each remote timestamp we get back, confirm that it is either a ping
527 // message, or a timestamp we sent out. Also confirm that the timestamps are
528 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800529 for (std::pair<int, std::string> channel :
530 shared()
531 ? std::vector<std::pair<
532 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
533 : std::vector<std::pair<int, std::string>>{
534 {pi1_timestamp_channel,
535 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
536 "aos-message_bridge-Timestamp"},
537 {ping_timestamp_channel,
538 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
539 ping_event_loop.MakeWatcher(
540 channel.second,
541 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
542 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
543 &pi1_on_pi1_timestamp_fetcher,
544 channel_index = channel.first](const RemoteMessage &header) {
545 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
546 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700547
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800548 EXPECT_TRUE(header.has_boot_uuid());
549 if (channel_index != -1) {
550 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700551 }
552
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800553 const aos::monotonic_clock::time_point header_monotonic_sent_time(
554 chrono::nanoseconds(header.monotonic_sent_time()));
555 const aos::realtime_clock::time_point header_realtime_sent_time(
556 chrono::nanoseconds(header.realtime_sent_time()));
557 const aos::monotonic_clock::time_point header_monotonic_remote_time(
558 chrono::nanoseconds(header.monotonic_remote_time()));
559 const aos::realtime_clock::time_point header_realtime_remote_time(
560 chrono::nanoseconds(header.realtime_remote_time()));
561
562 const Context *pi1_context = nullptr;
563 const Context *pi2_context = nullptr;
564
565 if (header.channel_index() == pi1_timestamp_channel) {
566 // Find the forwarded message.
567 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
568 header_monotonic_sent_time) {
569 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
570 }
571
572 // And the source message.
573 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
574 header_monotonic_remote_time) {
575 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
576 }
577
578 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
579 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
580 } else if (header.channel_index() == ping_timestamp_channel) {
581 // Find the forwarded message.
582 while (ping_on_pi2_fetcher.context().monotonic_event_time <
583 header_monotonic_sent_time) {
584 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
585 }
586
587 // And the source message.
588 while (ping_on_pi1_fetcher.context().monotonic_event_time <
589 header_monotonic_remote_time) {
590 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
591 }
592
593 pi1_context = &ping_on_pi1_fetcher.context();
594 pi2_context = &ping_on_pi2_fetcher.context();
595 } else {
596 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700597 }
598
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800599 // Confirm the forwarded message has matching timestamps to the
600 // timestamps we got back.
601 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
602 EXPECT_EQ(pi2_context->monotonic_event_time,
603 header_monotonic_sent_time);
604 EXPECT_EQ(pi2_context->realtime_event_time,
605 header_realtime_sent_time);
606 EXPECT_EQ(pi2_context->realtime_remote_time,
607 header_realtime_remote_time);
608 EXPECT_EQ(pi2_context->monotonic_remote_time,
609 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700610
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800611 // Confirm the forwarded message also matches the source message.
612 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
613 EXPECT_EQ(pi1_context->monotonic_event_time,
614 header_monotonic_remote_time);
615 EXPECT_EQ(pi1_context->realtime_event_time,
616 header_realtime_remote_time);
617 });
618 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700619
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800620 // Start everything up. Pong is the only thing we don't know how to wait
621 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800622 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
623
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800624 StartPi1Server();
625 StartPi1Client();
626 StartPi2Client();
627 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800628
629 // And go!
630 ping_event_loop.Run();
631
632 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800633 StopPi1Server();
634 StopPi1Client();
635 StopPi2Client();
636 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800637 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800638 pong_thread.join();
639
640 // Make sure we sent something.
641 EXPECT_GE(ping_count, 1);
642 // And got something back.
643 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800644
645 // Confirm that we are estimating a monotonic offset on the client.
646 ASSERT_TRUE(client_statistics_fetcher.Fetch());
647
648 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
649 EXPECT_EQ(client_statistics_fetcher->connections()
650 ->Get(0)
651 ->node()
652 ->name()
653 ->string_view(),
654 "pi1");
655
656 // Make sure the offset in one direction is less than a second.
657 EXPECT_GT(
658 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
659 EXPECT_LT(
660 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
661 1000000000);
662
663 EXPECT_GE(pi1_server_statistics_count, 2);
664 EXPECT_GE(pi2_server_statistics_count, 2);
665 EXPECT_GE(pi1_client_statistics_count, 2);
666 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700667
668 // Confirm we got timestamps back!
669 EXPECT_TRUE(message_header_fetcher1.Fetch());
670 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800671}
672
Austin Schuh5344c352020-04-12 17:04:26 -0700673// Test that the client disconnecting triggers the server offsets on both sides
674// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800675TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700676 // This is rather annoying to set up. We need to start up a client and
677 // server, on the same node, but get them to think that they are on different
678 // nodes.
679 //
680 // We need the client to not post directly to "/test" like it would in a
681 // real system, otherwise we will re-send the ping message... So, use an
682 // application specific map to have the client post somewhere else.
683 //
684 // To top this all off, each of these needs to be done with a ShmEventLoop,
685 // which needs to run in a separate thread... And it is really hard to get
686 // everything started up reliably. So just be super generous on timeouts and
687 // hope for the best. We can be more generous in the future if we need to.
688 //
689 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800690 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700691
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800692 MakePi1Server();
693 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700694
695 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800696 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700697 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800698 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700699
700 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800701 OnPi2();
702 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700703
704 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800705 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700706 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800707 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700708
709 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700710
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800711 StartPi1Test();
712 StartPi2Test();
713 StartPi1Server();
714 StartPi1Client();
715 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700716
717 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800718 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700719
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800720 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700721
722 // Now confirm we are synchronized.
723 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
724 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
725
726 const ServerConnection *const pi1_connection =
727 pi1_server_statistics_fetcher->connections()->Get(0);
728 const ServerConnection *const pi2_connection =
729 pi2_server_statistics_fetcher->connections()->Get(0);
730
731 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
732 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
733 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
734 chrono::milliseconds(1));
735 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
736 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800737 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700738
739 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
740 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
741 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
742 chrono::milliseconds(1));
743 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
744 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800745 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800746
747 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700748 }
749
750 std::this_thread::sleep_for(std::chrono::seconds(2));
751
752 {
753 // Now confirm we are un-synchronized.
754 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
755 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
756 const ServerConnection *const pi1_connection =
757 pi1_server_statistics_fetcher->connections()->Get(0);
758 const ServerConnection *const pi2_connection =
759 pi2_server_statistics_fetcher->connections()->Get(0);
760
761 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
762 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800763 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700764 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
765 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800766 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700767 }
768
769 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800770 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700771 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800772 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700773
774 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
775 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
776
777 // Now confirm we are synchronized again.
778 const ServerConnection *const pi1_connection =
779 pi1_server_statistics_fetcher->connections()->Get(0);
780 const ServerConnection *const pi2_connection =
781 pi2_server_statistics_fetcher->connections()->Get(0);
782
783 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
784 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
785 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
786 chrono::milliseconds(1));
787 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
788 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800789 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700790
791 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
792 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
793 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
794 chrono::milliseconds(1));
795 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
796 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800797 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800798
799 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700800 }
801
802 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800803 StopPi1Server();
804 StopPi1Client();
805 StopPi2Server();
806 StopPi1Test();
807 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700808}
809
810// Test that the server disconnecting triggers the server offsets on the other
811// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800812TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700813 // This is rather annoying to set up. We need to start up a client and
814 // server, on the same node, but get them to think that they are on different
815 // nodes.
816 //
817 // We need the client to not post directly to "/test" like it would in a
818 // real system, otherwise we will re-send the ping message... So, use an
819 // application specific map to have the client post somewhere else.
820 //
821 // To top this all off, each of these needs to be done with a ShmEventLoop,
822 // which needs to run in a separate thread... And it is really hard to get
823 // everything started up reliably. So just be super generous on timeouts and
824 // hope for the best. We can be more generous in the future if we need to.
825 //
826 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700827 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800828 OnPi1();
829 MakePi1Server();
830 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700831
832 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800833 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700834 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800835 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700836 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800837 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700838
839 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800840 OnPi2();
841 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700842
843 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800844 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700845 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800846 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700847
848 // Start everything up. Pong is the only thing we don't know how to wait on,
849 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800850 StartPi1Test();
851 StartPi2Test();
852 StartPi1Server();
853 StartPi1Client();
854 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700855
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800856 // Confirm both client and server statistics messages have decent offsets in
857 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700858
859 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800860 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700861
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800862 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700863
864 // Now confirm we are synchronized.
865 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
866 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
867
868 const ServerConnection *const pi1_connection =
869 pi1_server_statistics_fetcher->connections()->Get(0);
870 const ServerConnection *const pi2_connection =
871 pi2_server_statistics_fetcher->connections()->Get(0);
872
873 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
874 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
875 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
876 chrono::milliseconds(1));
877 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
878 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800879 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700880
881 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
882 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
883 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
884 chrono::milliseconds(1));
885 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
886 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800887 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800888
889 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700890 }
891
892 std::this_thread::sleep_for(std::chrono::seconds(2));
893
894 {
895 // And confirm we are unsynchronized.
896 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
897 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
898
899 const ServerConnection *const pi1_server_connection =
900 pi1_server_statistics_fetcher->connections()->Get(0);
901 const ClientConnection *const pi1_client_connection =
902 pi1_client_statistics_fetcher->connections()->Get(0);
903
904 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
905 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800906 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700907 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
908 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
909 }
910
911 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800912 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700913
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800914 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700915
916 // And confirm we are synchronized again.
917 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
918 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
919
920 const ServerConnection *const pi1_connection =
921 pi1_server_statistics_fetcher->connections()->Get(0);
922 const ServerConnection *const pi2_connection =
923 pi2_server_statistics_fetcher->connections()->Get(0);
924
925 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
926 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
927 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
928 chrono::milliseconds(1));
929 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
930 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800931 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700932
933 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
934 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
935 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
936 chrono::milliseconds(1));
937 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
938 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800939 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800940
941 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700942 }
943
944 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800945 StopPi1Server();
946 StopPi1Client();
947 StopPi2Client();
948 StopPi1Test();
949 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700950}
951
Austin Schuh4889b182020-11-18 19:11:56 -0800952// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700953// thing, but doesn't confirm that the internal state does. We either need to
954// expose a way to check the state in a thread-safe way, or need a way to jump
955// time for one node to do that.
956
Austin Schuh4889b182020-11-18 19:11:56 -0800957void SendPing(aos::Sender<examples::Ping> *sender, int value) {
958 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
959 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
960 ping_builder.add_value(value);
961 builder.Send(ping_builder.Finish());
962}
963
964// Tests that when a message is sent before the bridge starts up, but is
965// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800966TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800967 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800968
969 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800970 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800971 aos::Sender<examples::Ping> ping_sender =
972 send_event_loop.MakeSender<examples::Ping>("/test");
973 SendPing(&ping_sender, 1);
974 aos::Sender<examples::Ping> unreliable_ping_sender =
975 send_event_loop.MakeSender<examples::Ping>("/unreliable");
976 SendPing(&unreliable_ping_sender, 1);
977
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800978 MakePi1Server();
979 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800980
981 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800982 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800983
984 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800985 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800986
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800987 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800988
Austin Schuhf466ab52021-02-16 22:00:38 -0800989 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800990 aos::Fetcher<examples::Ping> ping_fetcher =
991 receive_event_loop.MakeFetcher<examples::Ping>("/test");
992 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
993 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
994 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
995 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
996
997 const size_t ping_channel_index = configuration::ChannelIndex(
998 receive_event_loop.configuration(), ping_fetcher.channel());
999
1000 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001001 const std::string channel_name =
1002 shared() ? "/pi1/aos/remote_timestamps/pi2"
1003 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001004 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001005 channel_name, [this, channel_name, ping_channel_index,
1006 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001007 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001008 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001009 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001010 if (shared() && header.channel_index() != ping_channel_index) {
1011 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001012 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001013 CHECK_EQ(header.channel_index(), ping_channel_index);
1014 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001015 });
1016
1017 // Before everything starts up, confirm there is no message.
1018 EXPECT_FALSE(ping_fetcher.Fetch());
1019 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1020
1021 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001022 StartPi1Server();
1023 StartPi1Client();
1024 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001025
1026 // Event used to wait for the timestamp counting thread to start.
1027 aos::Event event;
1028 std::thread pi1_remote_timestamp_thread(
1029 [&pi1_remote_timestamp_event_loop, &event]() {
1030 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1031 pi1_remote_timestamp_event_loop.Run();
1032 });
1033
1034 event.Wait();
1035
1036 {
1037 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001038 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001039
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001040 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001041
1042 // Confirm there is no detected duplicate packet.
1043 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1044 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1045 ->Get(0)
1046 ->duplicate_packets(),
1047 0u);
1048
1049 EXPECT_TRUE(ping_fetcher.Fetch());
1050 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1051 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001052
1053 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001054 }
1055
1056 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001057 // Now, spin up a client for 2 seconds.
1058 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001059
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001060 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001061
1062 // Confirm we detect the duplicate packet correctly.
1063 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1064 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1065 ->Get(0)
1066 ->duplicate_packets(),
1067 1u);
1068
1069 EXPECT_EQ(ping_timestamp_count, 1);
1070 EXPECT_FALSE(ping_fetcher.Fetch());
1071 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001072
1073 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001074 }
1075
1076 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001077 StopPi1Client();
1078 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001079 pi1_remote_timestamp_event_loop.Exit();
1080 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001081 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001082}
1083
1084// Tests that when a message is sent before the bridge starts up, but is
1085// configured as reliable, we forward it. Confirm this works across server
1086// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001087TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001088 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001089 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001090
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001091 MakePi2Server();
1092 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001093
Austin Schuhf466ab52021-02-16 22:00:38 -08001094 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001095 aos::Fetcher<examples::Ping> ping_fetcher =
1096 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1097 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1098 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1099 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1100 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1101
Austin Schuh4889b182020-11-18 19:11:56 -08001102 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001103 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001104
1105 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001106 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001107 aos::Sender<examples::Ping> ping_sender =
1108 send_event_loop.MakeSender<examples::Ping>("/test");
1109 {
1110 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1111 examples::Ping::Builder ping_builder =
1112 builder.MakeBuilder<examples::Ping>();
1113 ping_builder.add_value(1);
1114 builder.Send(ping_builder.Finish());
1115 }
1116
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001117 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001118
1119 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001120 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001121
1122 const size_t ping_channel_index = configuration::ChannelIndex(
1123 receive_event_loop.configuration(), ping_fetcher.channel());
1124
1125 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001126 const std::string channel_name =
1127 shared() ? "/pi1/aos/remote_timestamps/pi2"
1128 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001129 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001130 channel_name, [this, channel_name, ping_channel_index,
1131 &ping_timestamp_count](const RemoteMessage &header) {
1132 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001133 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001134 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001135 if (shared() && header.channel_index() != ping_channel_index) {
1136 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001137 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001138 CHECK_EQ(header.channel_index(), ping_channel_index);
1139 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001140 });
1141
1142 // Before everything starts up, confirm there is no message.
1143 EXPECT_FALSE(ping_fetcher.Fetch());
1144 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1145
1146 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001147 StartPi1Client();
1148 StartPi2Server();
1149 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001150
1151 // Event used to wait for the timestamp counting thread to start.
1152 aos::Event event;
1153 std::thread pi1_remote_timestamp_thread(
1154 [&pi1_remote_timestamp_event_loop, &event]() {
1155 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1156 pi1_remote_timestamp_event_loop.Run();
1157 });
1158
1159 event.Wait();
1160
1161 {
1162 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001163 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001164
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001165 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001166
1167 // Confirm there is no detected duplicate packet.
1168 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1169 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1170 ->Get(0)
1171 ->duplicate_packets(),
1172 0u);
1173
1174 EXPECT_TRUE(ping_fetcher.Fetch());
1175 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1176 EXPECT_EQ(ping_timestamp_count, 1);
1177 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001178
1179 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001180 }
1181
1182 {
1183 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001184 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001185
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001186 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001187
1188 // Confirm we detect the duplicate packet correctly.
1189 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1190 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1191 ->Get(0)
1192 ->duplicate_packets(),
1193 1u);
1194
1195 EXPECT_EQ(ping_timestamp_count, 1);
1196 EXPECT_FALSE(ping_fetcher.Fetch());
1197 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001198
1199 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001200 }
1201
1202 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001203 StopPi1Client();
1204 StopPi2Server();
1205 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001206 pi1_remote_timestamp_event_loop.Exit();
1207 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001208}
1209
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001210INSTANTIATE_TEST_CASE_P(
1211 MessageBridgeTests, MessageBridgeParameterizedTest,
1212 ::testing::Values(
1213 Param{"message_bridge_test_combined_timestamps_common_config.json",
1214 true},
1215 Param{"message_bridge_test_common_config.json", false}));
1216
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001217} // namespace testing
1218} // namespace message_bridge
1219} // namespace aos