blob: 681001e59bf2bad41c5fb29a0ce9daa6b3b46290 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070011#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070012#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080014#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080015
Austin Schuh8902fa52021-03-14 22:39:24 -070016DECLARE_string(boot_uuid);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070019void SetShmBase(const std::string_view base);
20
Austin Schuhe84c3ed2019-12-14 15:29:48 -080021namespace message_bridge {
22namespace testing {
23
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
25
Austin Schuhe84c3ed2019-12-14 15:29:48 -080026namespace chrono = std::chrono;
27
Austin Schuhe991fe22020-11-18 16:53:39 -080028std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
30 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080031 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070032 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080033 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070034 }
35}
36
Austin Schuhe991fe22020-11-18 16:53:39 -080037void DoSetShmBase(const std::string_view node) {
38 aos::SetShmBase(ShmBase(node));
39}
40
Austin Schuh36a2c3e2021-02-18 22:28:38 -080041// Parameters to run all the tests with.
42struct Param {
43 // The config file to use.
44 std::string config;
45 // If true, the RemoteMessage channel should be shared between all the remote
46 // channels. If false, there will be 1 RemoteMessage channel per remote
47 // channel.
48 bool shared;
49};
50
51class MessageBridgeParameterizedTest
52 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080053 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080054 MessageBridgeParameterizedTest()
55 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070056 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070057 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070058 pi1_boot_uuid_(UUID::Random()),
59 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080060 util::UnlinkRecursive(ShmBase("pi1"));
61 util::UnlinkRecursive(ShmBase("pi2"));
62 }
Austin Schuhe991fe22020-11-18 16:53:39 -080063
Austin Schuh36a2c3e2021-02-18 22:28:38 -080064 bool shared() const { return GetParam().shared; }
65
Austin Schuh0a2f12f2021-01-08 22:48:29 -080066 void OnPi1() {
67 DoSetShmBase("pi1");
68 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070069 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080070 }
71
72 void OnPi2() {
73 DoSetShmBase("pi2");
74 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070075 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080076 }
77
Austin Schuhb0e439d2023-05-15 10:55:40 -070078 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -080079 OnPi1();
80 FLAGS_application_name = "pi1_message_bridge_server";
81 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080082 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080083 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -070084 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
85 pi1_server_event_loop.get(), server_config_sha256.size() == 0
86 ? config_sha256
87 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080088 }
89
90 void RunPi1Server(chrono::nanoseconds duration) {
91 // Setup a shutdown callback.
92 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
93 [this]() { pi1_server_event_loop->Exit(); });
94 pi1_server_event_loop->OnRun([this, quit, duration]() {
95 // Stop between timestamps, not exactly on them.
96 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
97 });
98
99 pi1_server_event_loop->Run();
100 }
101
102 void StartPi1Server() {
103 pi1_server_thread = std::thread([this]() {
104 LOG(INFO) << "Started pi1_message_bridge_server";
105 pi1_server_event_loop->Run();
106 });
107 }
108
109 void StopPi1Server() {
110 if (pi1_server_thread.joinable()) {
111 pi1_server_event_loop->Exit();
112 pi1_server_thread.join();
113 pi1_server_thread = std::thread();
114 }
115 pi1_message_bridge_server.reset();
116 pi1_server_event_loop.reset();
117 }
118
119 void MakePi1Client() {
120 OnPi1();
121 FLAGS_application_name = "pi1_message_bridge_client";
122 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800123 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800124 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700125 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
126 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800127 }
128
129 void StartPi1Client() {
130 pi1_client_thread = std::thread([this]() {
131 LOG(INFO) << "Started pi1_message_bridge_client";
132 pi1_client_event_loop->Run();
133 });
134 }
135
136 void StopPi1Client() {
137 pi1_client_event_loop->Exit();
138 pi1_client_thread.join();
139 pi1_client_thread = std::thread();
140 pi1_message_bridge_client.reset();
141 pi1_client_event_loop.reset();
142 }
143
144 void MakePi1Test() {
145 OnPi1();
146 FLAGS_application_name = "test1";
147 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800148 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800149
150 pi1_test_event_loop->MakeWatcher(
151 "/pi1/aos", [](const ServerStatistics &stats) {
152 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
153 });
154
155 pi1_test_event_loop->MakeWatcher(
156 "/pi1/aos", [](const ClientStatistics &stats) {
157 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
158 });
159
160 pi1_test_event_loop->MakeWatcher(
161 "/pi1/aos", [](const Timestamp &timestamp) {
162 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
163 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700164 pi1_test_event_loop->MakeWatcher(
165 "/pi2/aos", [this](const Timestamp &timestamp) {
166 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700167 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700168 pi2_boot_uuid_);
169 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800170 }
171
172 void StartPi1Test() {
173 pi1_test_thread = std::thread([this]() {
174 LOG(INFO) << "Started pi1_test";
175 pi1_test_event_loop->Run();
176 });
177 }
178
179 void StopPi1Test() {
180 pi1_test_event_loop->Exit();
181 pi1_test_thread.join();
182 }
183
184 void MakePi2Server() {
185 OnPi2();
186 FLAGS_application_name = "pi2_message_bridge_server";
187 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800188 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800189 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700190 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
191 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800192 }
193
194 void RunPi2Server(chrono::nanoseconds duration) {
195 // Setup a shutdown callback.
196 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
197 [this]() { pi2_server_event_loop->Exit(); });
198 pi2_server_event_loop->OnRun([this, quit, duration]() {
199 // Stop between timestamps, not exactly on them.
200 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
201 });
202
203 pi2_server_event_loop->Run();
204 }
205
206 void StartPi2Server() {
207 pi2_server_thread = std::thread([this]() {
208 LOG(INFO) << "Started pi2_message_bridge_server";
209 pi2_server_event_loop->Run();
210 });
211 }
212
213 void StopPi2Server() {
214 if (pi2_server_thread.joinable()) {
215 pi2_server_event_loop->Exit();
216 pi2_server_thread.join();
217 pi2_server_thread = std::thread();
218 }
219 pi2_message_bridge_server.reset();
220 pi2_server_event_loop.reset();
221 }
222
223 void MakePi2Client() {
224 OnPi2();
225 FLAGS_application_name = "pi2_message_bridge_client";
226 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800227 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800228 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700229 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
230 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800231 }
232
233 void RunPi2Client(chrono::nanoseconds duration) {
234 // Run for 5 seconds to make sure we have time to estimate the offset.
235 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
236 [this]() { pi2_client_event_loop->Exit(); });
237 pi2_client_event_loop->OnRun([this, quit, duration]() {
238 // Stop between timestamps, not exactly on them.
239 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
240 });
241
242 // And go!
243 pi2_client_event_loop->Run();
244 }
245
246 void StartPi2Client() {
247 pi2_client_thread = std::thread([this]() {
248 LOG(INFO) << "Started pi2_message_bridge_client";
249 pi2_client_event_loop->Run();
250 });
251 }
252
253 void StopPi2Client() {
254 if (pi2_client_thread.joinable()) {
255 pi2_client_event_loop->Exit();
256 pi2_client_thread.join();
257 pi2_client_thread = std::thread();
258 }
259 pi2_message_bridge_client.reset();
260 pi2_client_event_loop.reset();
261 }
262
263 void MakePi2Test() {
264 OnPi2();
265 FLAGS_application_name = "test2";
266 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800267 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800268
269 pi2_test_event_loop->MakeWatcher(
270 "/pi2/aos", [](const ServerStatistics &stats) {
271 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
272 });
273
274 pi2_test_event_loop->MakeWatcher(
275 "/pi2/aos", [](const ClientStatistics &stats) {
276 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
277 });
278
279 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700280 "/pi1/aos", [this](const Timestamp &timestamp) {
281 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700282 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700283 pi1_boot_uuid_);
284 });
285 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800286 "/pi2/aos", [](const Timestamp &timestamp) {
287 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
288 });
289 }
290
291 void StartPi2Test() {
292 pi2_test_thread = std::thread([this]() {
293 LOG(INFO) << "Started pi2_message_bridge_test";
294 pi2_test_event_loop->Run();
295 });
296 }
297
298 void StopPi2Test() {
299 pi2_test_event_loop->Exit();
300 pi2_test_thread.join();
301 }
302
Austin Schuhf466ab52021-02-16 22:00:38 -0800303 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700304 std::string config_sha256;
305
Austin Schuh8902fa52021-03-14 22:39:24 -0700306 const UUID pi1_boot_uuid_;
307 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800308
309 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
310 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
311 std::thread pi1_server_thread;
312
313 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
314 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
315 std::thread pi1_client_thread;
316
317 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
318 std::thread pi1_test_thread;
319
320 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
321 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
322 std::thread pi2_server_thread;
323
324 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
325 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
326 std::thread pi2_client_thread;
327
328 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
329 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800330};
331
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800332// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800333TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800334 // This is rather annoying to set up. We need to start up a client and
335 // server, on the same node, but get them to think that they are on different
336 // nodes.
337 //
338 // We then get to wait until they are connected.
339 //
340 // After they are connected, we send a Ping message.
341 //
342 // On the other end, we receive a Pong message.
343 //
344 // But, we need the client to not post directly to "/test" like it would in a
345 // real system, otherwise we will re-send the ping message... So, use an
346 // application specific map to have the client post somewhere else.
347 //
348 // To top this all off, each of these needs to be done with a ShmEventLoop,
349 // which needs to run in a separate thread... And it is really hard to get
350 // everything started up reliably. So just be super generous on timeouts and
351 // hope for the best. We can be more generous in the future if we need to.
352 //
353 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800354 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800355 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700356
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800357 MakePi1Server();
358 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800359
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700360 const std::string long_data = std::string(10000, 'a');
361
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800362 // And build the app which sends the pings.
363 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800364 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800365 aos::Sender<examples::Ping> ping_sender =
366 ping_event_loop.MakeSender<examples::Ping>("/test");
367
Austin Schuhf466ab52021-02-16 22:00:38 -0800368 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800369 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
370 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800371 shared() ? "/pi1/aos/remote_timestamps/pi2"
372 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700373
374 // Fetchers for confirming the remote timestamps made it.
375 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
376 ping_event_loop.MakeFetcher<examples::Ping>("/test");
377 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
378 ping_event_loop.MakeFetcher<Timestamp>("/aos");
379
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800380 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800381 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700382
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800383 MakePi2Client();
384 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800385
386 // And build the app which sends the pongs.
387 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800388 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800389
Austin Schuh7bc59052020-02-16 23:48:33 -0800390 // And build the app for testing.
391 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800392 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800393
394 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
395 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800396 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
397 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800398 shared() ? "/pi2/aos/remote_timestamps/pi1"
399 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
400 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700401
402 // Event loop for fetching data delivered to pi2 from pi1 to match up
403 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800404 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700405 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
406 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
407 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
408 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
409 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
410 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800411
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800412 // Count the pongs.
413 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700414 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
415 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700416 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700417 ++pong_count;
418 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
419 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800420
421 FLAGS_override_hostname = "";
422
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800423 // Wait until we are connected, then send.
424 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800425 int pi1_server_statistics_count = 0;
Austin Schuh61e973f2021-02-21 21:43:56 -0800426 ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700427 &pi1_server_statistics_count,
428 &long_data](
Austin Schuh61e973f2021-02-21 21:43:56 -0800429 const ServerStatistics &stats) {
430 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800431
Austin Schuh61e973f2021-02-21 21:43:56 -0800432 ASSERT_TRUE(stats.has_connections());
433 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800434
Austin Schuh61e973f2021-02-21 21:43:56 -0800435 bool connected = false;
436 for (const ServerConnection *connection : *stats.connections()) {
437 // Confirm that we are estimating the server time offset correctly. It
438 // should be about 0 since we are on the same machine here.
439 if (connection->has_monotonic_offset()) {
440 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
441 chrono::milliseconds(1));
442 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
443 chrono::milliseconds(-1));
444 ++pi1_server_statistics_count;
445 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800446
Austin Schuh61e973f2021-02-21 21:43:56 -0800447 if (connection->node()->name()->string_view() ==
448 pi2_client_event_loop->node()->name()->string_view()) {
449 if (connection->state() == State::CONNECTED) {
450 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800451 EXPECT_EQ(connection->connection_count(), 1u);
452 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
453 connection->connected_since_time())),
454 monotonic_clock::now());
Austin Schuh61e973f2021-02-21 21:43:56 -0800455 connected = true;
Austin Schuh367a7f42021-11-23 23:04:36 -0800456 } else {
457 EXPECT_FALSE(connection->has_connection_count());
458 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800459 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800460 }
461 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800462
Austin Schuh61e973f2021-02-21 21:43:56 -0800463 if (connected) {
464 VLOG(1) << "Connected! Sent ping.";
465 auto builder = ping_sender.MakeBuilder();
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700466 builder.fbb()->CreateString(long_data);
Austin Schuh61e973f2021-02-21 21:43:56 -0800467 examples::Ping::Builder ping_builder =
468 builder.MakeBuilder<examples::Ping>();
469 ping_builder.add_value(ping_count + 971);
Austin Schuh60e77942022-05-16 17:48:24 -0700470 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
Austin Schuh61e973f2021-02-21 21:43:56 -0800471 ++ping_count;
472 }
473 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800474
Austin Schuh7bc59052020-02-16 23:48:33 -0800475 // Confirm both client and server statistics messages have decent offsets in
476 // them.
477 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700478 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800479 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800480 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800481 for (const ServerConnection *connection : *stats.connections()) {
482 if (connection->has_monotonic_offset()) {
483 ++pi2_server_statistics_count;
484 // Confirm that we are estimating the server time offset correctly. It
485 // should be about 0 since we are on the same machine here.
486 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
487 chrono::milliseconds(1));
488 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
489 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800490 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800491 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800492
493 if (connection->state() == State::CONNECTED) {
494 EXPECT_EQ(connection->connection_count(), 1u);
495 EXPECT_LT(monotonic_clock::time_point(
496 chrono::nanoseconds(connection->connected_since_time())),
497 monotonic_clock::now());
498 } else {
499 EXPECT_FALSE(connection->has_connection_count());
500 EXPECT_FALSE(connection->has_connected_since_time());
501 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800502 }
503 });
504
505 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800506 int pi1_connected_client_statistics_count = 0;
507 ping_event_loop.MakeWatcher(
508 "/pi1/aos",
509 [&pi1_client_statistics_count,
510 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
511 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800512
Austin Schuh367a7f42021-11-23 23:04:36 -0800513 for (const ClientConnection *connection : *stats.connections()) {
514 if (connection->has_monotonic_offset()) {
515 ++pi1_client_statistics_count;
516 // It takes at least 10 microseconds to send a message between the
517 // client and server. The min (filtered) time shouldn't be over 10
518 // milliseconds on localhost. This might have to bump up if this is
519 // proving flaky.
520 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
521 chrono::milliseconds(10))
522 << " " << connection->monotonic_offset()
523 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
524 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
525 chrono::microseconds(10))
526 << " " << connection->monotonic_offset()
527 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
528 }
529 if (connection->state() == State::CONNECTED) {
530 EXPECT_EQ(connection->connection_count(), 1u);
531 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
532 connection->connected_since_time())),
533 monotonic_clock::now());
534 // The first Connected message may not have a UUID in it since no
535 // data has flown. That's fine.
536 if (pi1_connected_client_statistics_count > 0) {
537 EXPECT_TRUE(connection->has_boot_uuid())
538 << ": " << aos::FlatbufferToJson(connection);
539 }
540 ++pi1_connected_client_statistics_count;
541 } else {
542 EXPECT_FALSE(connection->has_connection_count());
543 EXPECT_FALSE(connection->has_connected_since_time());
544 }
545 }
546 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800547
548 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800549 int pi2_connected_client_statistics_count = 0;
550 pong_event_loop.MakeWatcher(
551 "/pi2/aos",
552 [&pi2_client_statistics_count,
553 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
554 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800555
Austin Schuh367a7f42021-11-23 23:04:36 -0800556 for (const ClientConnection *connection : *stats.connections()) {
557 if (connection->has_monotonic_offset()) {
558 ++pi2_client_statistics_count;
559 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
560 chrono::milliseconds(10))
561 << ": got " << aos::FlatbufferToJson(connection);
562 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
563 chrono::microseconds(10))
564 << ": got " << aos::FlatbufferToJson(connection);
565 }
566 if (connection->state() == State::CONNECTED) {
567 EXPECT_EQ(connection->connection_count(), 1u);
568 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
569 connection->connected_since_time())),
570 monotonic_clock::now());
571 if (pi2_connected_client_statistics_count > 0) {
572 EXPECT_TRUE(connection->has_boot_uuid());
573 }
574 ++pi2_connected_client_statistics_count;
575 } else {
576 EXPECT_FALSE(connection->has_connection_count());
577 EXPECT_FALSE(connection->has_connected_since_time());
578 }
579 }
580 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800581
Austin Schuh196a4452020-03-15 23:12:03 -0700582 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800583 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800584 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800585 });
Austin Schuh196a4452020-03-15 23:12:03 -0700586 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800587 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800588 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800589 });
590
591 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800592 aos::TimerHandler *quit = ping_event_loop.AddTimer(
593 [&ping_event_loop]() { ping_event_loop.Exit(); });
594 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800595 // Stop between timestamps, not exactly on them.
596 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800597 });
598
Austin Schuh2f8fd752020-09-01 22:38:28 -0700599 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
600 // channel.
601 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
602 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
603 const size_t ping_timestamp_channel =
604 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
605 ping_on_pi2_fetcher.channel());
606
607 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
608 VLOG(1) << "Channel "
609 << configuration::ChannelIndex(ping_event_loop.configuration(),
610 channel)
611 << " " << configuration::CleanedChannelToString(channel);
612 }
613
614 // For each remote timestamp we get back, confirm that it is either a ping
615 // message, or a timestamp we sent out. Also confirm that the timestamps are
616 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800617 for (std::pair<int, std::string> channel :
618 shared()
619 ? std::vector<std::pair<
620 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
621 : std::vector<std::pair<int, std::string>>{
622 {pi1_timestamp_channel,
623 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
624 "aos-message_bridge-Timestamp"},
625 {ping_timestamp_channel,
626 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
627 ping_event_loop.MakeWatcher(
628 channel.second,
629 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
630 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
631 &pi1_on_pi1_timestamp_fetcher,
632 channel_index = channel.first](const RemoteMessage &header) {
633 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
634 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700635
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800636 EXPECT_TRUE(header.has_boot_uuid());
637 if (channel_index != -1) {
638 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700639 }
640
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800641 const aos::monotonic_clock::time_point header_monotonic_sent_time(
642 chrono::nanoseconds(header.monotonic_sent_time()));
643 const aos::realtime_clock::time_point header_realtime_sent_time(
644 chrono::nanoseconds(header.realtime_sent_time()));
645 const aos::monotonic_clock::time_point header_monotonic_remote_time(
646 chrono::nanoseconds(header.monotonic_remote_time()));
647 const aos::realtime_clock::time_point header_realtime_remote_time(
648 chrono::nanoseconds(header.realtime_remote_time()));
649
650 const Context *pi1_context = nullptr;
651 const Context *pi2_context = nullptr;
652
653 if (header.channel_index() == pi1_timestamp_channel) {
654 // Find the forwarded message.
655 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
656 header_monotonic_sent_time) {
657 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
658 }
659
660 // And the source message.
661 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
662 header_monotonic_remote_time) {
663 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
664 }
665
666 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
667 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
668 } else if (header.channel_index() == ping_timestamp_channel) {
669 // Find the forwarded message.
670 while (ping_on_pi2_fetcher.context().monotonic_event_time <
671 header_monotonic_sent_time) {
672 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
673 }
674
675 // And the source message.
676 while (ping_on_pi1_fetcher.context().monotonic_event_time <
677 header_monotonic_remote_time) {
678 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
679 }
680
681 pi1_context = &ping_on_pi1_fetcher.context();
682 pi2_context = &ping_on_pi2_fetcher.context();
683 } else {
684 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700685 }
686
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800687 // Confirm the forwarded message has matching timestamps to the
688 // timestamps we got back.
689 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
690 EXPECT_EQ(pi2_context->monotonic_event_time,
691 header_monotonic_sent_time);
692 EXPECT_EQ(pi2_context->realtime_event_time,
693 header_realtime_sent_time);
694 EXPECT_EQ(pi2_context->realtime_remote_time,
695 header_realtime_remote_time);
696 EXPECT_EQ(pi2_context->monotonic_remote_time,
697 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700698
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800699 // Confirm the forwarded message also matches the source message.
700 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
701 EXPECT_EQ(pi1_context->monotonic_event_time,
702 header_monotonic_remote_time);
703 EXPECT_EQ(pi1_context->realtime_event_time,
704 header_realtime_remote_time);
705 });
706 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700707
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800708 // Start everything up. Pong is the only thing we don't know how to wait
709 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800710 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
711
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800712 StartPi1Server();
713 StartPi1Client();
714 StartPi2Client();
715 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800716
717 // And go!
718 ping_event_loop.Run();
719
720 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800721 StopPi1Server();
722 StopPi1Client();
723 StopPi2Client();
724 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800725 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800726 pong_thread.join();
727
728 // Make sure we sent something.
729 EXPECT_GE(ping_count, 1);
730 // And got something back.
731 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800732
733 // Confirm that we are estimating a monotonic offset on the client.
734 ASSERT_TRUE(client_statistics_fetcher.Fetch());
735
736 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
737 EXPECT_EQ(client_statistics_fetcher->connections()
738 ->Get(0)
739 ->node()
740 ->name()
741 ->string_view(),
742 "pi1");
743
744 // Make sure the offset in one direction is less than a second.
745 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700746 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
747 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800748 EXPECT_LT(
749 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700750 1000000000)
751 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800752
753 EXPECT_GE(pi1_server_statistics_count, 2);
754 EXPECT_GE(pi2_server_statistics_count, 2);
755 EXPECT_GE(pi1_client_statistics_count, 2);
756 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700757
758 // Confirm we got timestamps back!
759 EXPECT_TRUE(message_header_fetcher1.Fetch());
760 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800761}
762
Austin Schuh5344c352020-04-12 17:04:26 -0700763// Test that the client disconnecting triggers the server offsets on both sides
764// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800765TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700766 // This is rather annoying to set up. We need to start up a client and
767 // server, on the same node, but get them to think that they are on different
768 // nodes.
769 //
770 // We need the client to not post directly to "/test" like it would in a
771 // real system, otherwise we will re-send the ping message... So, use an
772 // application specific map to have the client post somewhere else.
773 //
774 // To top this all off, each of these needs to be done with a ShmEventLoop,
775 // which needs to run in a separate thread... And it is really hard to get
776 // everything started up reliably. So just be super generous on timeouts and
777 // hope for the best. We can be more generous in the future if we need to.
778 //
779 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800780 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700781
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800782 MakePi1Server();
783 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700784
785 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800786 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700787 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800788 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700789
790 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800791 OnPi2();
792 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700793
794 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800795 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700796 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800797 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700798
799 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700800
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800801 StartPi1Test();
802 StartPi2Test();
803 StartPi1Server();
804 StartPi1Client();
805 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700806
807 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800808 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700809
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800810 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700811
812 // Now confirm we are synchronized.
813 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
814 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
815
816 const ServerConnection *const pi1_connection =
817 pi1_server_statistics_fetcher->connections()->Get(0);
818 const ServerConnection *const pi2_connection =
819 pi2_server_statistics_fetcher->connections()->Get(0);
820
821 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800822 EXPECT_EQ(pi1_connection->connection_count(), 1u);
823 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700824 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
825 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
826 chrono::milliseconds(1));
827 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
828 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800829 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700830
831 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800832 EXPECT_EQ(pi2_connection->connection_count(), 1u);
833 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700834 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
835 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
836 chrono::milliseconds(1));
837 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
838 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800839 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800840
841 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700842 }
843
Austin Schuhd0d894e2021-10-24 17:13:11 -0700844 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
845 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700846
847 {
848 // Now confirm we are un-synchronized.
849 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
850 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
851 const ServerConnection *const pi1_connection =
852 pi1_server_statistics_fetcher->connections()->Get(0);
853 const ServerConnection *const pi2_connection =
854 pi2_server_statistics_fetcher->connections()->Get(0);
855
856 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800857 EXPECT_EQ(pi1_connection->connection_count(), 1u);
858 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700859 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800860 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700861 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
862 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800863 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800864 EXPECT_EQ(pi2_connection->connection_count(), 1u);
865 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700866 }
867
868 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800869 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700870 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800871 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700872
873 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
874 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
875
876 // Now confirm we are synchronized again.
877 const ServerConnection *const pi1_connection =
878 pi1_server_statistics_fetcher->connections()->Get(0);
879 const ServerConnection *const pi2_connection =
880 pi2_server_statistics_fetcher->connections()->Get(0);
881
882 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800883 EXPECT_EQ(pi1_connection->connection_count(), 2u);
884 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700885 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
886 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800887 chrono::milliseconds(1))
888 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700889 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800890 chrono::milliseconds(-1))
891 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800892 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700893
894 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800895 EXPECT_EQ(pi2_connection->connection_count(), 1u);
896 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700897 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
898 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800899 chrono::milliseconds(1))
900 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700901 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800902 chrono::milliseconds(-1))
903 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800904 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800905
906 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700907 }
908
909 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800910 StopPi1Server();
911 StopPi1Client();
912 StopPi2Server();
913 StopPi1Test();
914 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700915}
916
917// Test that the server disconnecting triggers the server offsets on the other
918// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800919TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700920 // This is rather annoying to set up. We need to start up a client and
921 // server, on the same node, but get them to think that they are on different
922 // nodes.
923 //
924 // We need the client to not post directly to "/test" like it would in a
925 // real system, otherwise we will re-send the ping message... So, use an
926 // application specific map to have the client post somewhere else.
927 //
928 // To top this all off, each of these needs to be done with a ShmEventLoop,
929 // which needs to run in a separate thread... And it is really hard to get
930 // everything started up reliably. So just be super generous on timeouts and
931 // hope for the best. We can be more generous in the future if we need to.
932 //
933 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700934 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800935 OnPi1();
936 MakePi1Server();
937 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700938
939 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800940 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700941 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800942 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700943 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800944 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700945
946 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800947 OnPi2();
948 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700949
950 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800951 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700952 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800953 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700954
955 // Start everything up. Pong is the only thing we don't know how to wait on,
956 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800957 StartPi1Test();
958 StartPi2Test();
959 StartPi1Server();
960 StartPi1Client();
961 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700962
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800963 // Confirm both client and server statistics messages have decent offsets in
964 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700965
966 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800967 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700968
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800969 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700970
971 // Now confirm we are synchronized.
972 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
973 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
974
975 const ServerConnection *const pi1_connection =
976 pi1_server_statistics_fetcher->connections()->Get(0);
977 const ServerConnection *const pi2_connection =
978 pi2_server_statistics_fetcher->connections()->Get(0);
979
980 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
981 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
982 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
983 chrono::milliseconds(1));
984 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
985 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800986 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800987 EXPECT_TRUE(pi1_connection->has_connected_since_time());
988 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700989
990 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
991 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
992 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
993 chrono::milliseconds(1));
994 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
995 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800996 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800997 EXPECT_TRUE(pi2_connection->has_connected_since_time());
998 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800999
1000 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001001 }
1002
1003 std::this_thread::sleep_for(std::chrono::seconds(2));
1004
1005 {
1006 // And confirm we are unsynchronized.
1007 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1008 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1009
1010 const ServerConnection *const pi1_server_connection =
1011 pi1_server_statistics_fetcher->connections()->Get(0);
1012 const ClientConnection *const pi1_client_connection =
1013 pi1_client_statistics_fetcher->connections()->Get(0);
1014
1015 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1016 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001017 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1018 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1019
Austin Schuh20ac95d2020-12-05 17:24:19 -08001020 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001021 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1022 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001023 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1024 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1025 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001026 }
1027
1028 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001029 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001030
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001031 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001032
1033 // And confirm we are synchronized again.
1034 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1035 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001036 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001037
1038 const ServerConnection *const pi1_connection =
1039 pi1_server_statistics_fetcher->connections()->Get(0);
1040 const ServerConnection *const pi2_connection =
1041 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001042 const ClientConnection *const pi1_client_connection =
1043 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001044
1045 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1046 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1047 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1048 chrono::milliseconds(1));
1049 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1050 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001051 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001052
Austin Schuh367a7f42021-11-23 23:04:36 -08001053 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1054 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1055 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1056 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1057
Austin Schuh5344c352020-04-12 17:04:26 -07001058 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1059 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1060 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1061 chrono::milliseconds(1));
1062 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1063 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001064 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001065
1066 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001067 }
1068
1069 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001070 StopPi1Server();
1071 StopPi1Client();
1072 StopPi2Client();
1073 StopPi1Test();
1074 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001075}
1076
Austin Schuh4889b182020-11-18 19:11:56 -08001077// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001078// thing, but doesn't confirm that the internal state does. We either need to
1079// expose a way to check the state in a thread-safe way, or need a way to jump
1080// time for one node to do that.
1081
Austin Schuh4889b182020-11-18 19:11:56 -08001082void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1083 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1084 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1085 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001086 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001087}
1088
1089// Tests that when a message is sent before the bridge starts up, but is
1090// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001091TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001092 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001093
1094 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001095 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001096 aos::Sender<examples::Ping> ping_sender =
1097 send_event_loop.MakeSender<examples::Ping>("/test");
1098 SendPing(&ping_sender, 1);
1099 aos::Sender<examples::Ping> unreliable_ping_sender =
1100 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1101 SendPing(&unreliable_ping_sender, 1);
1102
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001103 MakePi1Server();
1104 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001105
1106 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001107 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001108
1109 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001110 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001111
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001112 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001113
Austin Schuhf466ab52021-02-16 22:00:38 -08001114 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001115 aos::Fetcher<examples::Ping> ping_fetcher =
1116 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1117 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1118 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1119 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1120 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1121
1122 const size_t ping_channel_index = configuration::ChannelIndex(
1123 receive_event_loop.configuration(), ping_fetcher.channel());
1124
1125 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001126 const std::string channel_name =
1127 shared() ? "/pi1/aos/remote_timestamps/pi2"
1128 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001129 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001130 channel_name, [this, channel_name, ping_channel_index,
1131 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001132 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001133 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001134 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001135 if (shared() && header.channel_index() != ping_channel_index) {
1136 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001137 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001138 CHECK_EQ(header.channel_index(), ping_channel_index);
1139 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001140 });
1141
1142 // Before everything starts up, confirm there is no message.
1143 EXPECT_FALSE(ping_fetcher.Fetch());
1144 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1145
1146 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001147 StartPi1Server();
1148 StartPi1Client();
1149 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001150
1151 // Event used to wait for the timestamp counting thread to start.
1152 aos::Event event;
1153 std::thread pi1_remote_timestamp_thread(
1154 [&pi1_remote_timestamp_event_loop, &event]() {
1155 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1156 pi1_remote_timestamp_event_loop.Run();
1157 });
1158
1159 event.Wait();
1160
1161 {
1162 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001163 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001164
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001165 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001166
1167 // Confirm there is no detected duplicate packet.
1168 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1169 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1170 ->Get(0)
1171 ->duplicate_packets(),
1172 0u);
1173
Austin Schuhe61d4382021-03-31 21:33:02 -07001174 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1175 ->Get(0)
1176 ->partial_deliveries(),
1177 0u);
1178
Austin Schuh4889b182020-11-18 19:11:56 -08001179 EXPECT_TRUE(ping_fetcher.Fetch());
1180 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1181 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001182
1183 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001184 }
1185
1186 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001187 // Now, spin up a client for 2 seconds.
1188 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001189
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001190 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001191
1192 // Confirm we detect the duplicate packet correctly.
1193 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1194 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1195 ->Get(0)
1196 ->duplicate_packets(),
1197 1u);
1198
Austin Schuhe61d4382021-03-31 21:33:02 -07001199 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1200 ->Get(0)
1201 ->partial_deliveries(),
1202 0u);
1203
Austin Schuh4889b182020-11-18 19:11:56 -08001204 EXPECT_EQ(ping_timestamp_count, 1);
1205 EXPECT_FALSE(ping_fetcher.Fetch());
1206 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001207
1208 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001209 }
1210
1211 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001212 StopPi1Client();
1213 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001214 pi1_remote_timestamp_event_loop.Exit();
1215 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001216 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001217}
1218
1219// Tests that when a message is sent before the bridge starts up, but is
1220// configured as reliable, we forward it. Confirm this works across server
1221// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001222TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001223 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001224 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001225
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001226 MakePi2Server();
1227 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001228
Austin Schuhf466ab52021-02-16 22:00:38 -08001229 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001230 aos::Fetcher<examples::Ping> ping_fetcher =
1231 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1232 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1233 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1234 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1235 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1236
Austin Schuh4889b182020-11-18 19:11:56 -08001237 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001238 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001239
1240 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001241 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001242 aos::Sender<examples::Ping> ping_sender =
1243 send_event_loop.MakeSender<examples::Ping>("/test");
1244 {
1245 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1246 examples::Ping::Builder ping_builder =
1247 builder.MakeBuilder<examples::Ping>();
1248 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001249 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001250 }
1251
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001252 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001253
1254 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001255 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001256
1257 const size_t ping_channel_index = configuration::ChannelIndex(
1258 receive_event_loop.configuration(), ping_fetcher.channel());
1259
1260 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001261 const std::string channel_name =
1262 shared() ? "/pi1/aos/remote_timestamps/pi2"
1263 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001264 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001265 channel_name, [this, channel_name, ping_channel_index,
1266 &ping_timestamp_count](const RemoteMessage &header) {
1267 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001268 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001269 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001270 if (shared() && header.channel_index() != ping_channel_index) {
1271 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001272 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001273 CHECK_EQ(header.channel_index(), ping_channel_index);
1274 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001275 });
1276
1277 // Before everything starts up, confirm there is no message.
1278 EXPECT_FALSE(ping_fetcher.Fetch());
1279 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1280
1281 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001282 StartPi1Client();
1283 StartPi2Server();
1284 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001285
1286 // Event used to wait for the timestamp counting thread to start.
1287 aos::Event event;
1288 std::thread pi1_remote_timestamp_thread(
1289 [&pi1_remote_timestamp_event_loop, &event]() {
1290 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1291 pi1_remote_timestamp_event_loop.Run();
1292 });
1293
1294 event.Wait();
1295
1296 {
1297 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001298 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001299
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001300 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001301
1302 // Confirm there is no detected duplicate packet.
1303 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1304 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1305 ->Get(0)
1306 ->duplicate_packets(),
1307 0u);
1308
Austin Schuhe61d4382021-03-31 21:33:02 -07001309 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1310 ->Get(0)
1311 ->partial_deliveries(),
1312 0u);
1313
Austin Schuh4889b182020-11-18 19:11:56 -08001314 EXPECT_TRUE(ping_fetcher.Fetch());
1315 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1316 EXPECT_EQ(ping_timestamp_count, 1);
1317 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001318
1319 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001320 }
1321
1322 {
1323 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001324 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001325
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001326 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001327
1328 // Confirm we detect the duplicate packet correctly.
1329 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1330 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1331 ->Get(0)
1332 ->duplicate_packets(),
1333 1u);
1334
Austin Schuhe61d4382021-03-31 21:33:02 -07001335 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1336 ->Get(0)
1337 ->partial_deliveries(),
1338 0u);
1339
Austin Schuh4889b182020-11-18 19:11:56 -08001340 EXPECT_EQ(ping_timestamp_count, 1);
1341 EXPECT_FALSE(ping_fetcher.Fetch());
1342 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001343
1344 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001345 }
1346
1347 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001348 StopPi1Client();
1349 StopPi2Server();
1350 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001351 pi1_remote_timestamp_event_loop.Exit();
1352 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001353}
1354
Austin Schuhb0e439d2023-05-15 10:55:40 -07001355// Test that differing config sha256's result in no connection.
1356TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1357 // This is rather annoying to set up. We need to start up a client and
1358 // server, on the same node, but get them to think that they are on different
1359 // nodes.
1360 //
1361 // We need the client to not post directly to "/test" like it would in a
1362 // real system, otherwise we will re-send the ping message... So, use an
1363 // application specific map to have the client post somewhere else.
1364 //
1365 // To top this all off, each of these needs to be done with a ShmEventLoop,
1366 // which needs to run in a separate thread... And it is really hard to get
1367 // everything started up reliably. So just be super generous on timeouts and
1368 // hope for the best. We can be more generous in the future if we need to.
1369 //
1370 // We are faking the application names by passing in --application_name=foo
1371 OnPi1();
1372
Austin Schuh530f9ee2023-05-15 14:29:31 -07001373 MakePi1Server(
1374 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001375 MakePi1Client();
1376
1377 // And build the app for testing.
1378 MakePi1Test();
1379 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1380 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1381 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1382 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1383
1384 // Now do it for "raspberrypi2", the client.
1385 OnPi2();
1386 MakePi2Server();
1387
1388 // And build the app for testing.
1389 MakePi2Test();
1390 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1391 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1392 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1393 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1394
1395 // Wait until we are connected, then send.
1396
1397 StartPi1Test();
1398 StartPi2Test();
1399 StartPi1Server();
1400 StartPi1Client();
1401 StartPi2Server();
1402
1403 {
1404 MakePi2Client();
1405
1406 RunPi2Client(chrono::milliseconds(3050));
1407
1408 // Now confirm we are synchronized.
1409 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1410 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1411 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1412 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1413
1414 const ServerConnection *const pi1_connection =
1415 pi1_server_statistics_fetcher->connections()->Get(0);
1416 const ClientConnection *const pi1_client_connection =
1417 pi1_client_statistics_fetcher->connections()->Get(0);
1418 const ServerConnection *const pi2_connection =
1419 pi2_server_statistics_fetcher->connections()->Get(0);
1420 const ClientConnection *const pi2_client_connection =
1421 pi2_client_statistics_fetcher->connections()->Get(0);
1422
1423 // Make sure one direction is disconnected with a bunch of connection
1424 // attempts and failures.
1425 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1426 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1427 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1428
1429 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1430 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1431
1432 // And the other direction is happy.
1433 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1434 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1435 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1436 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1437 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1438
1439 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1440 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1441
1442 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1443 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1444 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1445 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1446
1447 StopPi2Client();
1448 }
1449
1450 // Shut everyone else down
1451 StopPi1Server();
1452 StopPi1Client();
1453 StopPi2Server();
1454 StopPi1Test();
1455 StopPi2Test();
1456}
1457
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001458INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001459 MessageBridgeTests, MessageBridgeParameterizedTest,
1460 ::testing::Values(
1461 Param{"message_bridge_test_combined_timestamps_common_config.json",
1462 true},
1463 Param{"message_bridge_test_common_config.json", false}));
1464
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001465} // namespace testing
1466} // namespace message_bridge
1467} // namespace aos