blob: 153f1e5347cdba334680d6b434aabc8f6602a333 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070013#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070014#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080015#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080016
Austin Schuh8902fa52021-03-14 22:39:24 -070017DECLARE_string(boot_uuid);
18
Austin Schuhe84c3ed2019-12-14 15:29:48 -080019namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070020void SetShmBase(const std::string_view base);
21
Austin Schuhe84c3ed2019-12-14 15:29:48 -080022namespace message_bridge {
23namespace testing {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
26
Austin Schuhe84c3ed2019-12-14 15:29:48 -080027namespace chrono = std::chrono;
28
Austin Schuhe991fe22020-11-18 16:53:39 -080029std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070030 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
31 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080032 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080034 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070035 }
36}
37
Austin Schuhe991fe22020-11-18 16:53:39 -080038void DoSetShmBase(const std::string_view node) {
39 aos::SetShmBase(ShmBase(node));
40}
41
// Parameter set that every test in this file is instantiated with.
struct Param {
  // Name of the config file to load.
  std::string config;

  // True when a single RemoteMessage channel is shared between all the remote
  // channels; false when there is 1 RemoteMessage channel per remote channel.
  bool shared;
};
51
52class MessageBridgeParameterizedTest
53 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080054 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080055 MessageBridgeParameterizedTest()
56 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070057 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070058 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070059 pi1_boot_uuid_(UUID::Random()),
60 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080061 util::UnlinkRecursive(ShmBase("pi1"));
62 util::UnlinkRecursive(ShmBase("pi2"));
63 }
Austin Schuhe991fe22020-11-18 16:53:39 -080064
Austin Schuh36a2c3e2021-02-18 22:28:38 -080065 bool shared() const { return GetParam().shared; }
66
Austin Schuh0a2f12f2021-01-08 22:48:29 -080067 void OnPi1() {
68 DoSetShmBase("pi1");
69 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070070 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080071 }
72
73 void OnPi2() {
74 DoSetShmBase("pi2");
75 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070076 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080077 }
78
Austin Schuhb0e439d2023-05-15 10:55:40 -070079 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -080080 OnPi1();
81 FLAGS_application_name = "pi1_message_bridge_server";
82 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080083 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080084 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -070085 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
86 pi1_server_event_loop.get(), server_config_sha256.size() == 0
87 ? config_sha256
88 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080089 }
90
91 void RunPi1Server(chrono::nanoseconds duration) {
92 // Setup a shutdown callback.
93 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
94 [this]() { pi1_server_event_loop->Exit(); });
95 pi1_server_event_loop->OnRun([this, quit, duration]() {
96 // Stop between timestamps, not exactly on them.
97 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
98 });
99
100 pi1_server_event_loop->Run();
101 }
102
103 void StartPi1Server() {
104 pi1_server_thread = std::thread([this]() {
105 LOG(INFO) << "Started pi1_message_bridge_server";
106 pi1_server_event_loop->Run();
107 });
108 }
109
110 void StopPi1Server() {
111 if (pi1_server_thread.joinable()) {
112 pi1_server_event_loop->Exit();
113 pi1_server_thread.join();
114 pi1_server_thread = std::thread();
115 }
116 pi1_message_bridge_server.reset();
117 pi1_server_event_loop.reset();
118 }
119
120 void MakePi1Client() {
121 OnPi1();
122 FLAGS_application_name = "pi1_message_bridge_client";
123 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800124 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800125 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700126 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
127 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800128 }
129
130 void StartPi1Client() {
131 pi1_client_thread = std::thread([this]() {
132 LOG(INFO) << "Started pi1_message_bridge_client";
133 pi1_client_event_loop->Run();
134 });
135 }
136
137 void StopPi1Client() {
138 pi1_client_event_loop->Exit();
139 pi1_client_thread.join();
140 pi1_client_thread = std::thread();
141 pi1_message_bridge_client.reset();
142 pi1_client_event_loop.reset();
143 }
144
145 void MakePi1Test() {
146 OnPi1();
147 FLAGS_application_name = "test1";
148 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800149 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800150
151 pi1_test_event_loop->MakeWatcher(
152 "/pi1/aos", [](const ServerStatistics &stats) {
153 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
154 });
155
156 pi1_test_event_loop->MakeWatcher(
157 "/pi1/aos", [](const ClientStatistics &stats) {
158 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
159 });
160
161 pi1_test_event_loop->MakeWatcher(
162 "/pi1/aos", [](const Timestamp &timestamp) {
163 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
164 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700165 pi1_test_event_loop->MakeWatcher(
166 "/pi2/aos", [this](const Timestamp &timestamp) {
167 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700168 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700169 pi2_boot_uuid_);
170 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800171 }
172
173 void StartPi1Test() {
174 pi1_test_thread = std::thread([this]() {
175 LOG(INFO) << "Started pi1_test";
176 pi1_test_event_loop->Run();
177 });
178 }
179
180 void StopPi1Test() {
181 pi1_test_event_loop->Exit();
182 pi1_test_thread.join();
183 }
184
185 void MakePi2Server() {
186 OnPi2();
187 FLAGS_application_name = "pi2_message_bridge_server";
188 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800189 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800190 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700191 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
192 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800193 }
194
195 void RunPi2Server(chrono::nanoseconds duration) {
196 // Setup a shutdown callback.
197 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
198 [this]() { pi2_server_event_loop->Exit(); });
199 pi2_server_event_loop->OnRun([this, quit, duration]() {
200 // Stop between timestamps, not exactly on them.
201 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
202 });
203
204 pi2_server_event_loop->Run();
205 }
206
207 void StartPi2Server() {
208 pi2_server_thread = std::thread([this]() {
209 LOG(INFO) << "Started pi2_message_bridge_server";
210 pi2_server_event_loop->Run();
211 });
212 }
213
214 void StopPi2Server() {
215 if (pi2_server_thread.joinable()) {
216 pi2_server_event_loop->Exit();
217 pi2_server_thread.join();
218 pi2_server_thread = std::thread();
219 }
220 pi2_message_bridge_server.reset();
221 pi2_server_event_loop.reset();
222 }
223
224 void MakePi2Client() {
225 OnPi2();
226 FLAGS_application_name = "pi2_message_bridge_client";
227 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800228 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800229 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700230 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
231 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800232 }
233
234 void RunPi2Client(chrono::nanoseconds duration) {
235 // Run for 5 seconds to make sure we have time to estimate the offset.
236 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
237 [this]() { pi2_client_event_loop->Exit(); });
238 pi2_client_event_loop->OnRun([this, quit, duration]() {
239 // Stop between timestamps, not exactly on them.
240 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
241 });
242
243 // And go!
244 pi2_client_event_loop->Run();
245 }
246
247 void StartPi2Client() {
248 pi2_client_thread = std::thread([this]() {
249 LOG(INFO) << "Started pi2_message_bridge_client";
250 pi2_client_event_loop->Run();
251 });
252 }
253
254 void StopPi2Client() {
255 if (pi2_client_thread.joinable()) {
256 pi2_client_event_loop->Exit();
257 pi2_client_thread.join();
258 pi2_client_thread = std::thread();
259 }
260 pi2_message_bridge_client.reset();
261 pi2_client_event_loop.reset();
262 }
263
264 void MakePi2Test() {
265 OnPi2();
266 FLAGS_application_name = "test2";
267 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800268 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800269
270 pi2_test_event_loop->MakeWatcher(
271 "/pi2/aos", [](const ServerStatistics &stats) {
272 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
273 });
274
275 pi2_test_event_loop->MakeWatcher(
276 "/pi2/aos", [](const ClientStatistics &stats) {
277 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
278 });
279
280 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700281 "/pi1/aos", [this](const Timestamp &timestamp) {
282 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700283 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700284 pi1_boot_uuid_);
285 });
286 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800287 "/pi2/aos", [](const Timestamp &timestamp) {
288 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
289 });
290 }
291
292 void StartPi2Test() {
293 pi2_test_thread = std::thread([this]() {
294 LOG(INFO) << "Started pi2_message_bridge_test";
295 pi2_test_event_loop->Run();
296 });
297 }
298
299 void StopPi2Test() {
300 pi2_test_event_loop->Exit();
301 pi2_test_thread.join();
302 }
303
Austin Schuhf466ab52021-02-16 22:00:38 -0800304 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700305 std::string config_sha256;
306
Austin Schuh8902fa52021-03-14 22:39:24 -0700307 const UUID pi1_boot_uuid_;
308 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800309
310 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
311 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
312 std::thread pi1_server_thread;
313
314 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
315 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
316 std::thread pi1_client_thread;
317
318 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
319 std::thread pi1_test_thread;
320
321 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
322 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
323 std::thread pi2_server_thread;
324
325 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
326 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
327 std::thread pi2_client_thread;
328
329 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
330 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800331};
332
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800333// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800334TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800335 // This is rather annoying to set up. We need to start up a client and
336 // server, on the same node, but get them to think that they are on different
337 // nodes.
338 //
339 // We then get to wait until they are connected.
340 //
341 // After they are connected, we send a Ping message.
342 //
343 // On the other end, we receive a Pong message.
344 //
345 // But, we need the client to not post directly to "/test" like it would in a
346 // real system, otherwise we will re-send the ping message... So, use an
347 // application specific map to have the client post somewhere else.
348 //
349 // To top this all off, each of these needs to be done with a ShmEventLoop,
350 // which needs to run in a separate thread... And it is really hard to get
351 // everything started up reliably. So just be super generous on timeouts and
352 // hope for the best. We can be more generous in the future if we need to.
353 //
354 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800355 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800356 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700357
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800358 MakePi1Server();
359 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800360
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700361 const std::string long_data = std::string(10000, 'a');
362
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800363 // And build the app which sends the pings.
364 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800365 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800366 aos::Sender<examples::Ping> ping_sender =
367 ping_event_loop.MakeSender<examples::Ping>("/test");
368
Austin Schuhf466ab52021-02-16 22:00:38 -0800369 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800370 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
371 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800372 shared() ? "/pi1/aos/remote_timestamps/pi2"
373 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700374
375 // Fetchers for confirming the remote timestamps made it.
376 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
377 ping_event_loop.MakeFetcher<examples::Ping>("/test");
378 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
379 ping_event_loop.MakeFetcher<Timestamp>("/aos");
380
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800382 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700383
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800384 MakePi2Client();
385 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800386
387 // And build the app which sends the pongs.
388 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800389 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800390
Austin Schuh7bc59052020-02-16 23:48:33 -0800391 // And build the app for testing.
392 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800393 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800394
395 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
396 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800397 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
398 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800399 shared() ? "/pi2/aos/remote_timestamps/pi1"
400 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
401 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700402
403 // Event loop for fetching data delivered to pi2 from pi1 to match up
404 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800405 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700406 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
407 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
408 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
409 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
410 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
411 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800412
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800413 // Count the pongs.
414 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700415 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
416 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700417 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700418 ++pong_count;
419 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
420 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800421
422 FLAGS_override_hostname = "";
423
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800424 // Wait until we are connected, then send.
425 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800426 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700427 ping_event_loop.MakeWatcher(
428 "/pi1/aos",
429 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
430 &long_data](const ServerStatistics &stats) {
431 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800432
Philipp Schrader790cb542023-07-05 21:06:52 -0700433 ASSERT_TRUE(stats.has_connections());
434 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800435
Philipp Schrader790cb542023-07-05 21:06:52 -0700436 bool connected = false;
437 for (const ServerConnection *connection : *stats.connections()) {
438 // Confirm that we are estimating the server time offset correctly. It
439 // should be about 0 since we are on the same machine here.
440 if (connection->has_monotonic_offset()) {
441 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
442 chrono::milliseconds(1));
443 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
444 chrono::milliseconds(-1));
445 ++pi1_server_statistics_count;
446 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800447
Philipp Schrader790cb542023-07-05 21:06:52 -0700448 if (connection->node()->name()->string_view() ==
449 pi2_client_event_loop->node()->name()->string_view()) {
450 if (connection->state() == State::CONNECTED) {
451 EXPECT_TRUE(connection->has_boot_uuid());
452 EXPECT_EQ(connection->connection_count(), 1u);
453 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
454 connection->connected_since_time())),
455 monotonic_clock::now());
456 connected = true;
457 } else {
458 EXPECT_FALSE(connection->has_connection_count());
459 EXPECT_FALSE(connection->has_connected_since_time());
460 }
461 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800462 }
463
Philipp Schrader790cb542023-07-05 21:06:52 -0700464 if (connected) {
465 VLOG(1) << "Connected! Sent ping.";
466 auto builder = ping_sender.MakeBuilder();
467 builder.fbb()->CreateString(long_data);
468 examples::Ping::Builder ping_builder =
469 builder.MakeBuilder<examples::Ping>();
470 ping_builder.add_value(ping_count + 971);
471 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
472 ++ping_count;
473 }
474 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800475
Austin Schuh7bc59052020-02-16 23:48:33 -0800476 // Confirm both client and server statistics messages have decent offsets in
477 // them.
478 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700479 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800480 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800481 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800482 for (const ServerConnection *connection : *stats.connections()) {
483 if (connection->has_monotonic_offset()) {
484 ++pi2_server_statistics_count;
485 // Confirm that we are estimating the server time offset correctly. It
486 // should be about 0 since we are on the same machine here.
487 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
488 chrono::milliseconds(1));
489 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
490 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800491 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800492 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800493
494 if (connection->state() == State::CONNECTED) {
495 EXPECT_EQ(connection->connection_count(), 1u);
496 EXPECT_LT(monotonic_clock::time_point(
497 chrono::nanoseconds(connection->connected_since_time())),
498 monotonic_clock::now());
499 } else {
500 EXPECT_FALSE(connection->has_connection_count());
501 EXPECT_FALSE(connection->has_connected_since_time());
502 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800503 }
504 });
505
506 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800507 int pi1_connected_client_statistics_count = 0;
508 ping_event_loop.MakeWatcher(
509 "/pi1/aos",
510 [&pi1_client_statistics_count,
511 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
512 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800513
Austin Schuh367a7f42021-11-23 23:04:36 -0800514 for (const ClientConnection *connection : *stats.connections()) {
515 if (connection->has_monotonic_offset()) {
516 ++pi1_client_statistics_count;
517 // It takes at least 10 microseconds to send a message between the
518 // client and server. The min (filtered) time shouldn't be over 10
519 // milliseconds on localhost. This might have to bump up if this is
520 // proving flaky.
521 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
522 chrono::milliseconds(10))
523 << " " << connection->monotonic_offset()
524 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
525 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
526 chrono::microseconds(10))
527 << " " << connection->monotonic_offset()
528 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
529 }
530 if (connection->state() == State::CONNECTED) {
531 EXPECT_EQ(connection->connection_count(), 1u);
532 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
533 connection->connected_since_time())),
534 monotonic_clock::now());
535 // The first Connected message may not have a UUID in it since no
536 // data has flown. That's fine.
537 if (pi1_connected_client_statistics_count > 0) {
538 EXPECT_TRUE(connection->has_boot_uuid())
539 << ": " << aos::FlatbufferToJson(connection);
540 }
541 ++pi1_connected_client_statistics_count;
542 } else {
543 EXPECT_FALSE(connection->has_connection_count());
544 EXPECT_FALSE(connection->has_connected_since_time());
545 }
546 }
547 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800548
549 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800550 int pi2_connected_client_statistics_count = 0;
551 pong_event_loop.MakeWatcher(
552 "/pi2/aos",
553 [&pi2_client_statistics_count,
554 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
555 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800556
Austin Schuh367a7f42021-11-23 23:04:36 -0800557 for (const ClientConnection *connection : *stats.connections()) {
558 if (connection->has_monotonic_offset()) {
559 ++pi2_client_statistics_count;
560 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
561 chrono::milliseconds(10))
562 << ": got " << aos::FlatbufferToJson(connection);
563 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
564 chrono::microseconds(10))
565 << ": got " << aos::FlatbufferToJson(connection);
566 }
567 if (connection->state() == State::CONNECTED) {
568 EXPECT_EQ(connection->connection_count(), 1u);
569 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
570 connection->connected_since_time())),
571 monotonic_clock::now());
572 if (pi2_connected_client_statistics_count > 0) {
573 EXPECT_TRUE(connection->has_boot_uuid());
574 }
575 ++pi2_connected_client_statistics_count;
576 } else {
577 EXPECT_FALSE(connection->has_connection_count());
578 EXPECT_FALSE(connection->has_connected_since_time());
579 }
580 }
581 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800582
Austin Schuh196a4452020-03-15 23:12:03 -0700583 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800584 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800585 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800586 });
Austin Schuh196a4452020-03-15 23:12:03 -0700587 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800588 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800589 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800590 });
591
592 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800593 aos::TimerHandler *quit = ping_event_loop.AddTimer(
594 [&ping_event_loop]() { ping_event_loop.Exit(); });
595 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800596 // Stop between timestamps, not exactly on them.
597 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800598 });
599
Austin Schuh2f8fd752020-09-01 22:38:28 -0700600 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
601 // channel.
602 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
603 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
604 const size_t ping_timestamp_channel =
605 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
606 ping_on_pi2_fetcher.channel());
607
608 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
609 VLOG(1) << "Channel "
610 << configuration::ChannelIndex(ping_event_loop.configuration(),
611 channel)
612 << " " << configuration::CleanedChannelToString(channel);
613 }
614
615 // For each remote timestamp we get back, confirm that it is either a ping
616 // message, or a timestamp we sent out. Also confirm that the timestamps are
617 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800618 for (std::pair<int, std::string> channel :
619 shared()
620 ? std::vector<std::pair<
621 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
622 : std::vector<std::pair<int, std::string>>{
623 {pi1_timestamp_channel,
624 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
625 "aos-message_bridge-Timestamp"},
626 {ping_timestamp_channel,
627 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
628 ping_event_loop.MakeWatcher(
629 channel.second,
630 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
631 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
632 &pi1_on_pi1_timestamp_fetcher,
633 channel_index = channel.first](const RemoteMessage &header) {
634 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
635 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700636
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800637 EXPECT_TRUE(header.has_boot_uuid());
638 if (channel_index != -1) {
639 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700640 }
641
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800642 const aos::monotonic_clock::time_point header_monotonic_sent_time(
643 chrono::nanoseconds(header.monotonic_sent_time()));
644 const aos::realtime_clock::time_point header_realtime_sent_time(
645 chrono::nanoseconds(header.realtime_sent_time()));
646 const aos::monotonic_clock::time_point header_monotonic_remote_time(
647 chrono::nanoseconds(header.monotonic_remote_time()));
648 const aos::realtime_clock::time_point header_realtime_remote_time(
649 chrono::nanoseconds(header.realtime_remote_time()));
650
651 const Context *pi1_context = nullptr;
652 const Context *pi2_context = nullptr;
653
654 if (header.channel_index() == pi1_timestamp_channel) {
655 // Find the forwarded message.
656 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
657 header_monotonic_sent_time) {
658 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
659 }
660
661 // And the source message.
662 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
663 header_monotonic_remote_time) {
664 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
665 }
666
667 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
668 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
669 } else if (header.channel_index() == ping_timestamp_channel) {
670 // Find the forwarded message.
671 while (ping_on_pi2_fetcher.context().monotonic_event_time <
672 header_monotonic_sent_time) {
673 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
674 }
675
676 // And the source message.
677 while (ping_on_pi1_fetcher.context().monotonic_event_time <
678 header_monotonic_remote_time) {
679 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
680 }
681
682 pi1_context = &ping_on_pi1_fetcher.context();
683 pi2_context = &ping_on_pi2_fetcher.context();
684 } else {
685 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700686 }
687
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800688 // Confirm the forwarded message has matching timestamps to the
689 // timestamps we got back.
690 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
691 EXPECT_EQ(pi2_context->monotonic_event_time,
692 header_monotonic_sent_time);
693 EXPECT_EQ(pi2_context->realtime_event_time,
694 header_realtime_sent_time);
695 EXPECT_EQ(pi2_context->realtime_remote_time,
696 header_realtime_remote_time);
697 EXPECT_EQ(pi2_context->monotonic_remote_time,
698 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700699
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800700 // Confirm the forwarded message also matches the source message.
701 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
702 EXPECT_EQ(pi1_context->monotonic_event_time,
703 header_monotonic_remote_time);
704 EXPECT_EQ(pi1_context->realtime_event_time,
705 header_realtime_remote_time);
706 });
707 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700708
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800709 // Start everything up. Pong is the only thing we don't know how to wait
710 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800711 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
712
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800713 StartPi1Server();
714 StartPi1Client();
715 StartPi2Client();
716 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800717
718 // And go!
719 ping_event_loop.Run();
720
721 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800722 StopPi1Server();
723 StopPi1Client();
724 StopPi2Client();
725 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800726 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800727 pong_thread.join();
728
729 // Make sure we sent something.
730 EXPECT_GE(ping_count, 1);
731 // And got something back.
732 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800733
734 // Confirm that we are estimating a monotonic offset on the client.
735 ASSERT_TRUE(client_statistics_fetcher.Fetch());
736
737 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
738 EXPECT_EQ(client_statistics_fetcher->connections()
739 ->Get(0)
740 ->node()
741 ->name()
742 ->string_view(),
743 "pi1");
744
745 // Make sure the offset in one direction is less than a second.
746 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700747 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
748 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800749 EXPECT_LT(
750 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700751 1000000000)
752 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800753
754 EXPECT_GE(pi1_server_statistics_count, 2);
755 EXPECT_GE(pi2_server_statistics_count, 2);
756 EXPECT_GE(pi1_client_statistics_count, 2);
757 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700758
759 // Confirm we got timestamps back!
760 EXPECT_TRUE(message_header_fetcher1.Fetch());
761 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800762}
763
Austin Schuh5344c352020-04-12 17:04:26 -0700764// Test that the client disconnecting triggers the server offsets on both sides
765// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800766TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700767 // This is rather annoying to set up. We need to start up a client and
768 // server, on the same node, but get them to think that they are on different
769 // nodes.
770 //
771 // We need the client to not post directly to "/test" like it would in a
772 // real system, otherwise we will re-send the ping message... So, use an
773 // application specific map to have the client post somewhere else.
774 //
775 // To top this all off, each of these needs to be done with a ShmEventLoop,
776 // which needs to run in a separate thread... And it is really hard to get
777 // everything started up reliably. So just be super generous on timeouts and
778 // hope for the best. We can be more generous in the future if we need to.
779 //
780 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800781 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700782
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800783 MakePi1Server();
784 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700785
786 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800787 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700788 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800789 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700790
791 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800792 OnPi2();
793 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700794
795 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800796 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700797 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800798 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700799
800 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700801
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800802 StartPi1Test();
803 StartPi2Test();
804 StartPi1Server();
805 StartPi1Client();
806 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700807
808 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800809 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700810
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800811 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700812
813 // Now confirm we are synchronized.
814 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
815 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
816
817 const ServerConnection *const pi1_connection =
818 pi1_server_statistics_fetcher->connections()->Get(0);
819 const ServerConnection *const pi2_connection =
820 pi2_server_statistics_fetcher->connections()->Get(0);
821
822 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800823 EXPECT_EQ(pi1_connection->connection_count(), 1u);
824 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700825 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
826 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
827 chrono::milliseconds(1));
828 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
829 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800830 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700831
832 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800833 EXPECT_EQ(pi2_connection->connection_count(), 1u);
834 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700835 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
836 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
837 chrono::milliseconds(1));
838 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
839 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800840 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800841
842 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700843 }
844
Austin Schuhd0d894e2021-10-24 17:13:11 -0700845 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
846 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700847
848 {
849 // Now confirm we are un-synchronized.
850 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
851 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
852 const ServerConnection *const pi1_connection =
853 pi1_server_statistics_fetcher->connections()->Get(0);
854 const ServerConnection *const pi2_connection =
855 pi2_server_statistics_fetcher->connections()->Get(0);
856
857 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800858 EXPECT_EQ(pi1_connection->connection_count(), 1u);
859 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700860 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800861 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700862 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
863 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800864 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800865 EXPECT_EQ(pi2_connection->connection_count(), 1u);
866 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700867 }
868
869 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800870 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700871 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800872 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700873
874 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
875 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
876
877 // Now confirm we are synchronized again.
878 const ServerConnection *const pi1_connection =
879 pi1_server_statistics_fetcher->connections()->Get(0);
880 const ServerConnection *const pi2_connection =
881 pi2_server_statistics_fetcher->connections()->Get(0);
882
883 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800884 EXPECT_EQ(pi1_connection->connection_count(), 2u);
885 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700886 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
887 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800888 chrono::milliseconds(1))
889 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700890 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800891 chrono::milliseconds(-1))
892 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800893 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700894
895 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800896 EXPECT_EQ(pi2_connection->connection_count(), 1u);
897 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700898 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
899 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800900 chrono::milliseconds(1))
901 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700902 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800903 chrono::milliseconds(-1))
904 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800905 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800906
907 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700908 }
909
910 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800911 StopPi1Server();
912 StopPi1Client();
913 StopPi2Server();
914 StopPi1Test();
915 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700916}
917
918// Test that the server disconnecting triggers the server offsets on the other
919// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800920TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700921 // This is rather annoying to set up. We need to start up a client and
922 // server, on the same node, but get them to think that they are on different
923 // nodes.
924 //
925 // We need the client to not post directly to "/test" like it would in a
926 // real system, otherwise we will re-send the ping message... So, use an
927 // application specific map to have the client post somewhere else.
928 //
929 // To top this all off, each of these needs to be done with a ShmEventLoop,
930 // which needs to run in a separate thread... And it is really hard to get
931 // everything started up reliably. So just be super generous on timeouts and
932 // hope for the best. We can be more generous in the future if we need to.
933 //
934 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700935 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800936 OnPi1();
937 MakePi1Server();
938 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700939
940 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800941 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700942 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800943 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700944 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800945 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700946
947 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800948 OnPi2();
949 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700950
951 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800952 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700953 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800954 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700955
956 // Start everything up. Pong is the only thing we don't know how to wait on,
957 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800958 StartPi1Test();
959 StartPi2Test();
960 StartPi1Server();
961 StartPi1Client();
962 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700963
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800964 // Confirm both client and server statistics messages have decent offsets in
965 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700966
967 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800968 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700969
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800970 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700971
972 // Now confirm we are synchronized.
973 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
974 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
975
976 const ServerConnection *const pi1_connection =
977 pi1_server_statistics_fetcher->connections()->Get(0);
978 const ServerConnection *const pi2_connection =
979 pi2_server_statistics_fetcher->connections()->Get(0);
980
981 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
982 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
983 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
984 chrono::milliseconds(1));
985 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
986 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800987 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800988 EXPECT_TRUE(pi1_connection->has_connected_since_time());
989 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700990
991 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
992 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
993 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
994 chrono::milliseconds(1));
995 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
996 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800997 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800998 EXPECT_TRUE(pi2_connection->has_connected_since_time());
999 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001000
1001 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001002 }
1003
1004 std::this_thread::sleep_for(std::chrono::seconds(2));
1005
1006 {
1007 // And confirm we are unsynchronized.
1008 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1009 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1010
1011 const ServerConnection *const pi1_server_connection =
1012 pi1_server_statistics_fetcher->connections()->Get(0);
1013 const ClientConnection *const pi1_client_connection =
1014 pi1_client_statistics_fetcher->connections()->Get(0);
1015
1016 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1017 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001018 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1019 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1020
Austin Schuh20ac95d2020-12-05 17:24:19 -08001021 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001022 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1023 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001024 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1025 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1026 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001027 }
1028
1029 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001030 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001031
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001032 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001033
1034 // And confirm we are synchronized again.
1035 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1036 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001037 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001038
1039 const ServerConnection *const pi1_connection =
1040 pi1_server_statistics_fetcher->connections()->Get(0);
1041 const ServerConnection *const pi2_connection =
1042 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001043 const ClientConnection *const pi1_client_connection =
1044 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001045
1046 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1047 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1048 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1049 chrono::milliseconds(1));
1050 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1051 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001052 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001053
Austin Schuh367a7f42021-11-23 23:04:36 -08001054 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1055 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1056 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1057 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1058
Austin Schuh5344c352020-04-12 17:04:26 -07001059 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1060 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1061 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1062 chrono::milliseconds(1));
1063 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1064 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001065 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001066
1067 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001068 }
1069
1070 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001071 StopPi1Server();
1072 StopPi1Client();
1073 StopPi2Client();
1074 StopPi1Test();
1075 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001076}
1077
// TODO(austin): The above test confirms that the external state does the right
// thing, but doesn't confirm that the internal state does. We either need to
// expose a way to check the state in a thread-safe way, or need a way to jump
// time for one node to do that.
1082
Austin Schuh4889b182020-11-18 19:11:56 -08001083void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1084 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1085 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1086 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001087 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001088}
1089
1090// Tests that when a message is sent before the bridge starts up, but is
1091// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001092TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001093 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001094
1095 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001096 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001097 aos::Sender<examples::Ping> ping_sender =
1098 send_event_loop.MakeSender<examples::Ping>("/test");
1099 SendPing(&ping_sender, 1);
1100 aos::Sender<examples::Ping> unreliable_ping_sender =
1101 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1102 SendPing(&unreliable_ping_sender, 1);
1103
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001104 MakePi1Server();
1105 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001106
1107 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001108 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001109
1110 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001111 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001112
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001113 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001114
Austin Schuhf466ab52021-02-16 22:00:38 -08001115 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001116 aos::Fetcher<examples::Ping> ping_fetcher =
1117 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1118 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1119 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1120 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1121 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1122
1123 const size_t ping_channel_index = configuration::ChannelIndex(
1124 receive_event_loop.configuration(), ping_fetcher.channel());
1125
1126 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001127 const std::string channel_name =
1128 shared() ? "/pi1/aos/remote_timestamps/pi2"
1129 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001130 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001131 channel_name, [this, channel_name, ping_channel_index,
1132 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001133 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001134 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001135 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001136 if (shared() && header.channel_index() != ping_channel_index) {
1137 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001138 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001139 CHECK_EQ(header.channel_index(), ping_channel_index);
1140 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001141 });
1142
1143 // Before everything starts up, confirm there is no message.
1144 EXPECT_FALSE(ping_fetcher.Fetch());
1145 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1146
1147 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001148 StartPi1Server();
1149 StartPi1Client();
1150 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001151
1152 // Event used to wait for the timestamp counting thread to start.
1153 aos::Event event;
1154 std::thread pi1_remote_timestamp_thread(
1155 [&pi1_remote_timestamp_event_loop, &event]() {
1156 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1157 pi1_remote_timestamp_event_loop.Run();
1158 });
1159
1160 event.Wait();
1161
1162 {
1163 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001164 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001165
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001166 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001167
1168 // Confirm there is no detected duplicate packet.
1169 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1170 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1171 ->Get(0)
1172 ->duplicate_packets(),
1173 0u);
1174
Austin Schuhe61d4382021-03-31 21:33:02 -07001175 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1176 ->Get(0)
1177 ->partial_deliveries(),
1178 0u);
1179
Austin Schuh4889b182020-11-18 19:11:56 -08001180 EXPECT_TRUE(ping_fetcher.Fetch());
1181 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1182 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001183
1184 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001185 }
1186
1187 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001188 // Now, spin up a client for 2 seconds.
1189 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001190
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001191 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001192
1193 // Confirm we detect the duplicate packet correctly.
1194 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1195 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1196 ->Get(0)
1197 ->duplicate_packets(),
1198 1u);
1199
Austin Schuhe61d4382021-03-31 21:33:02 -07001200 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1201 ->Get(0)
1202 ->partial_deliveries(),
1203 0u);
1204
Austin Schuh4889b182020-11-18 19:11:56 -08001205 EXPECT_EQ(ping_timestamp_count, 1);
1206 EXPECT_FALSE(ping_fetcher.Fetch());
1207 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001208
1209 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001210 }
1211
1212 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001213 StopPi1Client();
1214 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001215 pi1_remote_timestamp_event_loop.Exit();
1216 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001217 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001218}
1219
1220// Tests that when a message is sent before the bridge starts up, but is
1221// configured as reliable, we forward it. Confirm this works across server
1222// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001223TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001224 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001225 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001226
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001227 MakePi2Server();
1228 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001229
Austin Schuhf466ab52021-02-16 22:00:38 -08001230 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001231 aos::Fetcher<examples::Ping> ping_fetcher =
1232 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1233 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1234 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1235 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1236 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1237
Austin Schuh4889b182020-11-18 19:11:56 -08001238 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001239 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001240
1241 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001242 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001243 aos::Sender<examples::Ping> ping_sender =
1244 send_event_loop.MakeSender<examples::Ping>("/test");
1245 {
1246 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1247 examples::Ping::Builder ping_builder =
1248 builder.MakeBuilder<examples::Ping>();
1249 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001250 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001251 }
1252
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001253 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001254
1255 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001256 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001257
1258 const size_t ping_channel_index = configuration::ChannelIndex(
1259 receive_event_loop.configuration(), ping_fetcher.channel());
1260
1261 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001262 const std::string channel_name =
1263 shared() ? "/pi1/aos/remote_timestamps/pi2"
1264 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001265 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001266 channel_name, [this, channel_name, ping_channel_index,
1267 &ping_timestamp_count](const RemoteMessage &header) {
1268 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001269 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001270 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001271 if (shared() && header.channel_index() != ping_channel_index) {
1272 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001273 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001274 CHECK_EQ(header.channel_index(), ping_channel_index);
1275 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001276 });
1277
1278 // Before everything starts up, confirm there is no message.
1279 EXPECT_FALSE(ping_fetcher.Fetch());
1280 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1281
1282 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001283 StartPi1Client();
1284 StartPi2Server();
1285 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001286
1287 // Event used to wait for the timestamp counting thread to start.
1288 aos::Event event;
1289 std::thread pi1_remote_timestamp_thread(
1290 [&pi1_remote_timestamp_event_loop, &event]() {
1291 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1292 pi1_remote_timestamp_event_loop.Run();
1293 });
1294
1295 event.Wait();
1296
1297 {
1298 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001299 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001300
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001301 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001302
1303 // Confirm there is no detected duplicate packet.
1304 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1305 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1306 ->Get(0)
1307 ->duplicate_packets(),
1308 0u);
1309
Austin Schuhe61d4382021-03-31 21:33:02 -07001310 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1311 ->Get(0)
1312 ->partial_deliveries(),
1313 0u);
1314
Austin Schuh4889b182020-11-18 19:11:56 -08001315 EXPECT_TRUE(ping_fetcher.Fetch());
1316 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1317 EXPECT_EQ(ping_timestamp_count, 1);
1318 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001319
1320 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001321 }
1322
1323 {
1324 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001325 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001326
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001327 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001328
1329 // Confirm we detect the duplicate packet correctly.
1330 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1331 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1332 ->Get(0)
1333 ->duplicate_packets(),
1334 1u);
1335
Austin Schuhe61d4382021-03-31 21:33:02 -07001336 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1337 ->Get(0)
1338 ->partial_deliveries(),
1339 0u);
1340
Austin Schuh4889b182020-11-18 19:11:56 -08001341 EXPECT_EQ(ping_timestamp_count, 1);
1342 EXPECT_FALSE(ping_fetcher.Fetch());
1343 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001344
1345 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001346 }
1347
1348 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001349 StopPi1Client();
1350 StopPi2Server();
1351 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001352 pi1_remote_timestamp_event_loop.Exit();
1353 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001354}
1355
Austin Schuhb0e439d2023-05-15 10:55:40 -07001356// Test that differing config sha256's result in no connection.
1357TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1358 // This is rather annoying to set up. We need to start up a client and
1359 // server, on the same node, but get them to think that they are on different
1360 // nodes.
1361 //
1362 // We need the client to not post directly to "/test" like it would in a
1363 // real system, otherwise we will re-send the ping message... So, use an
1364 // application specific map to have the client post somewhere else.
1365 //
1366 // To top this all off, each of these needs to be done with a ShmEventLoop,
1367 // which needs to run in a separate thread... And it is really hard to get
1368 // everything started up reliably. So just be super generous on timeouts and
1369 // hope for the best. We can be more generous in the future if we need to.
1370 //
1371 // We are faking the application names by passing in --application_name=foo
1372 OnPi1();
1373
Austin Schuh530f9ee2023-05-15 14:29:31 -07001374 MakePi1Server(
1375 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001376 MakePi1Client();
1377
1378 // And build the app for testing.
1379 MakePi1Test();
1380 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1381 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1382 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1383 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1384
1385 // Now do it for "raspberrypi2", the client.
1386 OnPi2();
1387 MakePi2Server();
1388
1389 // And build the app for testing.
1390 MakePi2Test();
1391 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1392 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1393 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1394 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1395
1396 // Wait until we are connected, then send.
1397
1398 StartPi1Test();
1399 StartPi2Test();
1400 StartPi1Server();
1401 StartPi1Client();
1402 StartPi2Server();
1403
1404 {
1405 MakePi2Client();
1406
1407 RunPi2Client(chrono::milliseconds(3050));
1408
1409 // Now confirm we are synchronized.
1410 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1411 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1412 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1413 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1414
1415 const ServerConnection *const pi1_connection =
1416 pi1_server_statistics_fetcher->connections()->Get(0);
1417 const ClientConnection *const pi1_client_connection =
1418 pi1_client_statistics_fetcher->connections()->Get(0);
1419 const ServerConnection *const pi2_connection =
1420 pi2_server_statistics_fetcher->connections()->Get(0);
1421 const ClientConnection *const pi2_client_connection =
1422 pi2_client_statistics_fetcher->connections()->Get(0);
1423
1424 // Make sure one direction is disconnected with a bunch of connection
1425 // attempts and failures.
1426 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1427 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1428 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1429
1430 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1431 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1432
1433 // And the other direction is happy.
1434 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1435 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1436 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1437 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1438 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1439
1440 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1441 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1442
1443 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1444 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1445 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1446 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1447
1448 StopPi2Client();
1449 }
1450
1451 // Shut everyone else down
1452 StopPi1Server();
1453 StopPi1Client();
1454 StopPi2Server();
1455 StopPi1Test();
1456 StopPi2Test();
1457}
1458
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001459INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001460 MessageBridgeTests, MessageBridgeParameterizedTest,
1461 ::testing::Values(
1462 Param{"message_bridge_test_combined_timestamps_common_config.json",
1463 true},
1464 Param{"message_bridge_test_common_config.json", false}));
1465
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001466} // namespace testing
1467} // namespace message_bridge
1468} // namespace aos