blob: 02e6a02f40712b97ece97a6623b3199e5db6e3ea [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070011#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070012#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080014#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080015
// Brings FLAGS_boot_uuid into scope; the test fixture below assigns it
// whenever it switches between the simulated nodes.
// NOTE(review): the flag's definition is not visible in this file --
// presumably it lives with the message bridge code; confirm.
Austin Schuh8902fa52021-03-14 22:39:24 -070016DECLARE_string(boot_uuid);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace aos {
// Forward declaration only.  DoSetShmBase() below calls this with the
// per-node base path; the definition is not visible in this part of the file.
Austin Schuh2f8fd752020-09-01 22:38:28 -070019void SetShmBase(const std::string_view base);
20
Austin Schuhe84c3ed2019-12-14 15:29:48 -080021namespace message_bridge {
22namespace testing {
23
// Used by the fixture constructor to build the path of the config file.
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
25
// Shorthand so durations can be spelled chrono::nanoseconds etc.
Austin Schuhe84c3ed2019-12-14 15:29:48 -080026namespace chrono = std::chrono;
27
Austin Schuhe991fe22020-11-18 16:53:39 -080028std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
30 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080031 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070032 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080033 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070034 }
35}
36
Austin Schuhe991fe22020-11-18 16:53:39 -080037void DoSetShmBase(const std::string_view node) {
38 aos::SetShmBase(ShmBase(node));
39}
40
// Parameters to run all the tests with.
struct Param {
  // The config file to use.  The fixture below looks it up under
  // "aos/network/".
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  //
  // Defaults to false so that a default-initialized Param never carries an
  // indeterminate value.  Param stays an aggregate, so `Param{"x.json", true}`
  // keeps working.
  bool shared = false;
};
50
51class MessageBridgeParameterizedTest
52 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080053 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080054 MessageBridgeParameterizedTest()
55 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070056 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070057 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070058 pi1_boot_uuid_(UUID::Random()),
59 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080060 util::UnlinkRecursive(ShmBase("pi1"));
61 util::UnlinkRecursive(ShmBase("pi2"));
62 }
Austin Schuhe991fe22020-11-18 16:53:39 -080063
Austin Schuh36a2c3e2021-02-18 22:28:38 -080064 bool shared() const { return GetParam().shared; }
65
Austin Schuh0a2f12f2021-01-08 22:48:29 -080066 void OnPi1() {
67 DoSetShmBase("pi1");
68 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070069 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080070 }
71
72 void OnPi2() {
73 DoSetShmBase("pi2");
74 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070075 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080076 }
77
Austin Schuhb0e439d2023-05-15 10:55:40 -070078 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -080079 OnPi1();
80 FLAGS_application_name = "pi1_message_bridge_server";
81 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080082 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080083 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -070084 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
85 pi1_server_event_loop.get(), server_config_sha256.size() == 0
86 ? config_sha256
87 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080088 }
89
90 void RunPi1Server(chrono::nanoseconds duration) {
91 // Setup a shutdown callback.
92 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
93 [this]() { pi1_server_event_loop->Exit(); });
94 pi1_server_event_loop->OnRun([this, quit, duration]() {
95 // Stop between timestamps, not exactly on them.
96 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
97 });
98
99 pi1_server_event_loop->Run();
100 }
101
102 void StartPi1Server() {
103 pi1_server_thread = std::thread([this]() {
104 LOG(INFO) << "Started pi1_message_bridge_server";
105 pi1_server_event_loop->Run();
106 });
107 }
108
109 void StopPi1Server() {
110 if (pi1_server_thread.joinable()) {
111 pi1_server_event_loop->Exit();
112 pi1_server_thread.join();
113 pi1_server_thread = std::thread();
114 }
115 pi1_message_bridge_server.reset();
116 pi1_server_event_loop.reset();
117 }
118
119 void MakePi1Client() {
120 OnPi1();
121 FLAGS_application_name = "pi1_message_bridge_client";
122 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800123 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800124 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700125 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
126 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800127 }
128
129 void StartPi1Client() {
130 pi1_client_thread = std::thread([this]() {
131 LOG(INFO) << "Started pi1_message_bridge_client";
132 pi1_client_event_loop->Run();
133 });
134 }
135
136 void StopPi1Client() {
137 pi1_client_event_loop->Exit();
138 pi1_client_thread.join();
139 pi1_client_thread = std::thread();
140 pi1_message_bridge_client.reset();
141 pi1_client_event_loop.reset();
142 }
143
144 void MakePi1Test() {
145 OnPi1();
146 FLAGS_application_name = "test1";
147 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800148 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800149
150 pi1_test_event_loop->MakeWatcher(
151 "/pi1/aos", [](const ServerStatistics &stats) {
152 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
153 });
154
155 pi1_test_event_loop->MakeWatcher(
156 "/pi1/aos", [](const ClientStatistics &stats) {
157 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
158 });
159
160 pi1_test_event_loop->MakeWatcher(
161 "/pi1/aos", [](const Timestamp &timestamp) {
162 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
163 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700164 pi1_test_event_loop->MakeWatcher(
165 "/pi2/aos", [this](const Timestamp &timestamp) {
166 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700167 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700168 pi2_boot_uuid_);
169 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800170 }
171
172 void StartPi1Test() {
173 pi1_test_thread = std::thread([this]() {
174 LOG(INFO) << "Started pi1_test";
175 pi1_test_event_loop->Run();
176 });
177 }
178
179 void StopPi1Test() {
180 pi1_test_event_loop->Exit();
181 pi1_test_thread.join();
182 }
183
184 void MakePi2Server() {
185 OnPi2();
186 FLAGS_application_name = "pi2_message_bridge_server";
187 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800188 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800189 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700190 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
191 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800192 }
193
194 void RunPi2Server(chrono::nanoseconds duration) {
195 // Setup a shutdown callback.
196 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
197 [this]() { pi2_server_event_loop->Exit(); });
198 pi2_server_event_loop->OnRun([this, quit, duration]() {
199 // Stop between timestamps, not exactly on them.
200 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
201 });
202
203 pi2_server_event_loop->Run();
204 }
205
206 void StartPi2Server() {
207 pi2_server_thread = std::thread([this]() {
208 LOG(INFO) << "Started pi2_message_bridge_server";
209 pi2_server_event_loop->Run();
210 });
211 }
212
213 void StopPi2Server() {
214 if (pi2_server_thread.joinable()) {
215 pi2_server_event_loop->Exit();
216 pi2_server_thread.join();
217 pi2_server_thread = std::thread();
218 }
219 pi2_message_bridge_server.reset();
220 pi2_server_event_loop.reset();
221 }
222
223 void MakePi2Client() {
224 OnPi2();
225 FLAGS_application_name = "pi2_message_bridge_client";
226 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800227 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800228 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700229 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
230 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800231 }
232
233 void RunPi2Client(chrono::nanoseconds duration) {
234 // Run for 5 seconds to make sure we have time to estimate the offset.
235 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
236 [this]() { pi2_client_event_loop->Exit(); });
237 pi2_client_event_loop->OnRun([this, quit, duration]() {
238 // Stop between timestamps, not exactly on them.
239 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
240 });
241
242 // And go!
243 pi2_client_event_loop->Run();
244 }
245
246 void StartPi2Client() {
247 pi2_client_thread = std::thread([this]() {
248 LOG(INFO) << "Started pi2_message_bridge_client";
249 pi2_client_event_loop->Run();
250 });
251 }
252
253 void StopPi2Client() {
254 if (pi2_client_thread.joinable()) {
255 pi2_client_event_loop->Exit();
256 pi2_client_thread.join();
257 pi2_client_thread = std::thread();
258 }
259 pi2_message_bridge_client.reset();
260 pi2_client_event_loop.reset();
261 }
262
263 void MakePi2Test() {
264 OnPi2();
265 FLAGS_application_name = "test2";
266 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800267 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800268
269 pi2_test_event_loop->MakeWatcher(
270 "/pi2/aos", [](const ServerStatistics &stats) {
271 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
272 });
273
274 pi2_test_event_loop->MakeWatcher(
275 "/pi2/aos", [](const ClientStatistics &stats) {
276 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
277 });
278
279 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700280 "/pi1/aos", [this](const Timestamp &timestamp) {
281 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700282 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700283 pi1_boot_uuid_);
284 });
285 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800286 "/pi2/aos", [](const Timestamp &timestamp) {
287 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
288 });
289 }
290
291 void StartPi2Test() {
292 pi2_test_thread = std::thread([this]() {
293 LOG(INFO) << "Started pi2_message_bridge_test";
294 pi2_test_event_loop->Run();
295 });
296 }
297
298 void StopPi2Test() {
299 pi2_test_event_loop->Exit();
300 pi2_test_thread.join();
301 }
302
Austin Schuhf466ab52021-02-16 22:00:38 -0800303 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700304 std::string config_sha256;
305
Austin Schuh8902fa52021-03-14 22:39:24 -0700306 const UUID pi1_boot_uuid_;
307 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800308
309 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
310 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
311 std::thread pi1_server_thread;
312
313 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
314 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
315 std::thread pi1_client_thread;
316
317 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
318 std::thread pi1_test_thread;
319
320 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
321 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
322 std::thread pi2_server_thread;
323
324 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
325 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
326 std::thread pi2_client_thread;
327
328 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
329 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800330};
331
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800332// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800333TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800334 // This is rather annoying to set up. We need to start up a client and
335 // server, on the same node, but get them to think that they are on different
336 // nodes.
337 //
338 // We then get to wait until they are connected.
339 //
340 // After they are connected, we send a Ping message.
341 //
342 // On the other end, we receive a Pong message.
343 //
344 // But, we need the client to not post directly to "/test" like it would in a
345 // real system, otherwise we will re-send the ping message... So, use an
346 // application specific map to have the client post somewhere else.
347 //
348 // To top this all off, each of these needs to be done with a ShmEventLoop,
349 // which needs to run in a separate thread... And it is really hard to get
350 // everything started up reliably. So just be super generous on timeouts and
351 // hope for the best. We can be more generous in the future if we need to.
352 //
353 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800354 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800355 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700356
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800357 MakePi1Server();
358 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800359
360 // And build the app which sends the pings.
361 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800362 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800363 aos::Sender<examples::Ping> ping_sender =
364 ping_event_loop.MakeSender<examples::Ping>("/test");
365
Austin Schuhf466ab52021-02-16 22:00:38 -0800366 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800367 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
368 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800369 shared() ? "/pi1/aos/remote_timestamps/pi2"
370 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700371
372 // Fetchers for confirming the remote timestamps made it.
373 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
374 ping_event_loop.MakeFetcher<examples::Ping>("/test");
375 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
376 ping_event_loop.MakeFetcher<Timestamp>("/aos");
377
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800378 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800379 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700380
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800381 MakePi2Client();
382 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800383
384 // And build the app which sends the pongs.
385 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800386 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387
Austin Schuh7bc59052020-02-16 23:48:33 -0800388 // And build the app for testing.
389 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800390 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800391
392 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
393 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800394 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
395 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800396 shared() ? "/pi2/aos/remote_timestamps/pi1"
397 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
398 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700399
400 // Event loop for fetching data delivered to pi2 from pi1 to match up
401 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800402 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700403 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
404 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
405 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
406 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
407 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
408 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800409
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800410 // Count the pongs.
411 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700412 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
413 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700414 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700415 ++pong_count;
416 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
417 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800418
419 FLAGS_override_hostname = "";
420
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800421 // Wait until we are connected, then send.
422 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800423 int pi1_server_statistics_count = 0;
Austin Schuh61e973f2021-02-21 21:43:56 -0800424 ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
425 &pi1_server_statistics_count](
426 const ServerStatistics &stats) {
427 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800428
Austin Schuh61e973f2021-02-21 21:43:56 -0800429 ASSERT_TRUE(stats.has_connections());
430 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800431
Austin Schuh61e973f2021-02-21 21:43:56 -0800432 bool connected = false;
433 for (const ServerConnection *connection : *stats.connections()) {
434 // Confirm that we are estimating the server time offset correctly. It
435 // should be about 0 since we are on the same machine here.
436 if (connection->has_monotonic_offset()) {
437 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
438 chrono::milliseconds(1));
439 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
440 chrono::milliseconds(-1));
441 ++pi1_server_statistics_count;
442 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800443
Austin Schuh61e973f2021-02-21 21:43:56 -0800444 if (connection->node()->name()->string_view() ==
445 pi2_client_event_loop->node()->name()->string_view()) {
446 if (connection->state() == State::CONNECTED) {
447 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800448 EXPECT_EQ(connection->connection_count(), 1u);
449 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
450 connection->connected_since_time())),
451 monotonic_clock::now());
Austin Schuh61e973f2021-02-21 21:43:56 -0800452 connected = true;
Austin Schuh367a7f42021-11-23 23:04:36 -0800453 } else {
454 EXPECT_FALSE(connection->has_connection_count());
455 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800456 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800457 }
458 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800459
Austin Schuh61e973f2021-02-21 21:43:56 -0800460 if (connected) {
461 VLOG(1) << "Connected! Sent ping.";
462 auto builder = ping_sender.MakeBuilder();
463 examples::Ping::Builder ping_builder =
464 builder.MakeBuilder<examples::Ping>();
465 ping_builder.add_value(ping_count + 971);
Austin Schuh60e77942022-05-16 17:48:24 -0700466 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
Austin Schuh61e973f2021-02-21 21:43:56 -0800467 ++ping_count;
468 }
469 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800470
Austin Schuh7bc59052020-02-16 23:48:33 -0800471 // Confirm both client and server statistics messages have decent offsets in
472 // them.
473 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700474 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800475 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800476 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800477 for (const ServerConnection *connection : *stats.connections()) {
478 if (connection->has_monotonic_offset()) {
479 ++pi2_server_statistics_count;
480 // Confirm that we are estimating the server time offset correctly. It
481 // should be about 0 since we are on the same machine here.
482 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
483 chrono::milliseconds(1));
484 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
485 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800486 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800487 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800488
489 if (connection->state() == State::CONNECTED) {
490 EXPECT_EQ(connection->connection_count(), 1u);
491 EXPECT_LT(monotonic_clock::time_point(
492 chrono::nanoseconds(connection->connected_since_time())),
493 monotonic_clock::now());
494 } else {
495 EXPECT_FALSE(connection->has_connection_count());
496 EXPECT_FALSE(connection->has_connected_since_time());
497 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800498 }
499 });
500
501 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800502 int pi1_connected_client_statistics_count = 0;
503 ping_event_loop.MakeWatcher(
504 "/pi1/aos",
505 [&pi1_client_statistics_count,
506 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
507 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800508
Austin Schuh367a7f42021-11-23 23:04:36 -0800509 for (const ClientConnection *connection : *stats.connections()) {
510 if (connection->has_monotonic_offset()) {
511 ++pi1_client_statistics_count;
512 // It takes at least 10 microseconds to send a message between the
513 // client and server. The min (filtered) time shouldn't be over 10
514 // milliseconds on localhost. This might have to bump up if this is
515 // proving flaky.
516 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
517 chrono::milliseconds(10))
518 << " " << connection->monotonic_offset()
519 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
520 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
521 chrono::microseconds(10))
522 << " " << connection->monotonic_offset()
523 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
524 }
525 if (connection->state() == State::CONNECTED) {
526 EXPECT_EQ(connection->connection_count(), 1u);
527 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
528 connection->connected_since_time())),
529 monotonic_clock::now());
530 // The first Connected message may not have a UUID in it since no
531 // data has flown. That's fine.
532 if (pi1_connected_client_statistics_count > 0) {
533 EXPECT_TRUE(connection->has_boot_uuid())
534 << ": " << aos::FlatbufferToJson(connection);
535 }
536 ++pi1_connected_client_statistics_count;
537 } else {
538 EXPECT_FALSE(connection->has_connection_count());
539 EXPECT_FALSE(connection->has_connected_since_time());
540 }
541 }
542 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800543
544 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800545 int pi2_connected_client_statistics_count = 0;
546 pong_event_loop.MakeWatcher(
547 "/pi2/aos",
548 [&pi2_client_statistics_count,
549 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
550 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800551
Austin Schuh367a7f42021-11-23 23:04:36 -0800552 for (const ClientConnection *connection : *stats.connections()) {
553 if (connection->has_monotonic_offset()) {
554 ++pi2_client_statistics_count;
555 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
556 chrono::milliseconds(10))
557 << ": got " << aos::FlatbufferToJson(connection);
558 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
559 chrono::microseconds(10))
560 << ": got " << aos::FlatbufferToJson(connection);
561 }
562 if (connection->state() == State::CONNECTED) {
563 EXPECT_EQ(connection->connection_count(), 1u);
564 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
565 connection->connected_since_time())),
566 monotonic_clock::now());
567 if (pi2_connected_client_statistics_count > 0) {
568 EXPECT_TRUE(connection->has_boot_uuid());
569 }
570 ++pi2_connected_client_statistics_count;
571 } else {
572 EXPECT_FALSE(connection->has_connection_count());
573 EXPECT_FALSE(connection->has_connected_since_time());
574 }
575 }
576 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800577
Austin Schuh196a4452020-03-15 23:12:03 -0700578 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800579 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800580 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800581 });
Austin Schuh196a4452020-03-15 23:12:03 -0700582 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800583 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800584 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800585 });
586
587 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800588 aos::TimerHandler *quit = ping_event_loop.AddTimer(
589 [&ping_event_loop]() { ping_event_loop.Exit(); });
590 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800591 // Stop between timestamps, not exactly on them.
592 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800593 });
594
Austin Schuh2f8fd752020-09-01 22:38:28 -0700595 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
596 // channel.
597 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
598 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
599 const size_t ping_timestamp_channel =
600 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
601 ping_on_pi2_fetcher.channel());
602
603 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
604 VLOG(1) << "Channel "
605 << configuration::ChannelIndex(ping_event_loop.configuration(),
606 channel)
607 << " " << configuration::CleanedChannelToString(channel);
608 }
609
610 // For each remote timestamp we get back, confirm that it is either a ping
611 // message, or a timestamp we sent out. Also confirm that the timestamps are
612 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800613 for (std::pair<int, std::string> channel :
614 shared()
615 ? std::vector<std::pair<
616 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
617 : std::vector<std::pair<int, std::string>>{
618 {pi1_timestamp_channel,
619 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
620 "aos-message_bridge-Timestamp"},
621 {ping_timestamp_channel,
622 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
623 ping_event_loop.MakeWatcher(
624 channel.second,
625 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
626 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
627 &pi1_on_pi1_timestamp_fetcher,
628 channel_index = channel.first](const RemoteMessage &header) {
629 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
630 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700631
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800632 EXPECT_TRUE(header.has_boot_uuid());
633 if (channel_index != -1) {
634 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700635 }
636
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800637 const aos::monotonic_clock::time_point header_monotonic_sent_time(
638 chrono::nanoseconds(header.monotonic_sent_time()));
639 const aos::realtime_clock::time_point header_realtime_sent_time(
640 chrono::nanoseconds(header.realtime_sent_time()));
641 const aos::monotonic_clock::time_point header_monotonic_remote_time(
642 chrono::nanoseconds(header.monotonic_remote_time()));
643 const aos::realtime_clock::time_point header_realtime_remote_time(
644 chrono::nanoseconds(header.realtime_remote_time()));
645
646 const Context *pi1_context = nullptr;
647 const Context *pi2_context = nullptr;
648
649 if (header.channel_index() == pi1_timestamp_channel) {
650 // Find the forwarded message.
651 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
652 header_monotonic_sent_time) {
653 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
654 }
655
656 // And the source message.
657 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
658 header_monotonic_remote_time) {
659 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
660 }
661
662 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
663 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
664 } else if (header.channel_index() == ping_timestamp_channel) {
665 // Find the forwarded message.
666 while (ping_on_pi2_fetcher.context().monotonic_event_time <
667 header_monotonic_sent_time) {
668 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
669 }
670
671 // And the source message.
672 while (ping_on_pi1_fetcher.context().monotonic_event_time <
673 header_monotonic_remote_time) {
674 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
675 }
676
677 pi1_context = &ping_on_pi1_fetcher.context();
678 pi2_context = &ping_on_pi2_fetcher.context();
679 } else {
680 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700681 }
682
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800683 // Confirm the forwarded message has matching timestamps to the
684 // timestamps we got back.
685 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
686 EXPECT_EQ(pi2_context->monotonic_event_time,
687 header_monotonic_sent_time);
688 EXPECT_EQ(pi2_context->realtime_event_time,
689 header_realtime_sent_time);
690 EXPECT_EQ(pi2_context->realtime_remote_time,
691 header_realtime_remote_time);
692 EXPECT_EQ(pi2_context->monotonic_remote_time,
693 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700694
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800695 // Confirm the forwarded message also matches the source message.
696 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
697 EXPECT_EQ(pi1_context->monotonic_event_time,
698 header_monotonic_remote_time);
699 EXPECT_EQ(pi1_context->realtime_event_time,
700 header_realtime_remote_time);
701 });
702 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700703
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800704 // Start everything up. Pong is the only thing we don't know how to wait
705 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800706 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
707
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800708 StartPi1Server();
709 StartPi1Client();
710 StartPi2Client();
711 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800712
713 // And go!
714 ping_event_loop.Run();
715
716 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800717 StopPi1Server();
718 StopPi1Client();
719 StopPi2Client();
720 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800721 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800722 pong_thread.join();
723
724 // Make sure we sent something.
725 EXPECT_GE(ping_count, 1);
726 // And got something back.
727 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800728
729 // Confirm that we are estimating a monotonic offset on the client.
730 ASSERT_TRUE(client_statistics_fetcher.Fetch());
731
732 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
733 EXPECT_EQ(client_statistics_fetcher->connections()
734 ->Get(0)
735 ->node()
736 ->name()
737 ->string_view(),
738 "pi1");
739
740 // Make sure the offset in one direction is less than a second.
741 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700742 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
743 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800744 EXPECT_LT(
745 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700746 1000000000)
747 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800748
749 EXPECT_GE(pi1_server_statistics_count, 2);
750 EXPECT_GE(pi2_server_statistics_count, 2);
751 EXPECT_GE(pi1_client_statistics_count, 2);
752 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700753
754 // Confirm we got timestamps back!
755 EXPECT_TRUE(message_header_fetcher1.Fetch());
756 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800757}
758
Austin Schuh5344c352020-04-12 17:04:26 -0700759// Test that the client disconnecting triggers the server offsets on both sides
760// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800761TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700762 // This is rather annoying to set up. We need to start up a client and
763 // server, on the same node, but get them to think that they are on different
764 // nodes.
765 //
766 // We need the client to not post directly to "/test" like it would in a
767 // real system, otherwise we will re-send the ping message... So, use an
768 // application specific map to have the client post somewhere else.
769 //
770 // To top this all off, each of these needs to be done with a ShmEventLoop,
771 // which needs to run in a separate thread... And it is really hard to get
772 // everything started up reliably. So just be super generous on timeouts and
773 // hope for the best. We can be more generous in the future if we need to.
774 //
775 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800776 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700777
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800778 MakePi1Server();
779 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700780
781 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800782 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700783 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800784 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700785
786 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800787 OnPi2();
788 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700789
790 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800791 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700792 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800793 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700794
795 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700796
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800797 StartPi1Test();
798 StartPi2Test();
799 StartPi1Server();
800 StartPi1Client();
801 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700802
803 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800804 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700805
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800806 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700807
808 // Now confirm we are synchronized.
809 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
810 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
811
812 const ServerConnection *const pi1_connection =
813 pi1_server_statistics_fetcher->connections()->Get(0);
814 const ServerConnection *const pi2_connection =
815 pi2_server_statistics_fetcher->connections()->Get(0);
816
817 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800818 EXPECT_EQ(pi1_connection->connection_count(), 1u);
819 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700820 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
821 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
822 chrono::milliseconds(1));
823 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
824 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800825 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700826
827 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800828 EXPECT_EQ(pi2_connection->connection_count(), 1u);
829 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700830 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
831 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
832 chrono::milliseconds(1));
833 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
834 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800835 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800836
837 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700838 }
839
Austin Schuhd0d894e2021-10-24 17:13:11 -0700840 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
841 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700842
843 {
844 // Now confirm we are un-synchronized.
845 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
846 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
847 const ServerConnection *const pi1_connection =
848 pi1_server_statistics_fetcher->connections()->Get(0);
849 const ServerConnection *const pi2_connection =
850 pi2_server_statistics_fetcher->connections()->Get(0);
851
852 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800853 EXPECT_EQ(pi1_connection->connection_count(), 1u);
854 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700855 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800856 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700857 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
858 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800859 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800860 EXPECT_EQ(pi2_connection->connection_count(), 1u);
861 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700862 }
863
864 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800865 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700866 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800867 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700868
869 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
870 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
871
872 // Now confirm we are synchronized again.
873 const ServerConnection *const pi1_connection =
874 pi1_server_statistics_fetcher->connections()->Get(0);
875 const ServerConnection *const pi2_connection =
876 pi2_server_statistics_fetcher->connections()->Get(0);
877
878 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800879 EXPECT_EQ(pi1_connection->connection_count(), 2u);
880 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700881 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
882 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800883 chrono::milliseconds(1))
884 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700885 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800886 chrono::milliseconds(-1))
887 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800888 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700889
890 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800891 EXPECT_EQ(pi2_connection->connection_count(), 1u);
892 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700893 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
894 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800895 chrono::milliseconds(1))
896 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700897 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800898 chrono::milliseconds(-1))
899 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800900 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800901
902 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700903 }
904
905 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800906 StopPi1Server();
907 StopPi1Client();
908 StopPi2Server();
909 StopPi1Test();
910 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700911}
912
913// Test that the server disconnecting triggers the server offsets on the other
914// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800915TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700916 // This is rather annoying to set up. We need to start up a client and
917 // server, on the same node, but get them to think that they are on different
918 // nodes.
919 //
920 // We need the client to not post directly to "/test" like it would in a
921 // real system, otherwise we will re-send the ping message... So, use an
922 // application specific map to have the client post somewhere else.
923 //
924 // To top this all off, each of these needs to be done with a ShmEventLoop,
925 // which needs to run in a separate thread... And it is really hard to get
926 // everything started up reliably. So just be super generous on timeouts and
927 // hope for the best. We can be more generous in the future if we need to.
928 //
929 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700930 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800931 OnPi1();
932 MakePi1Server();
933 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700934
935 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800936 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700937 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800938 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700939 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800940 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700941
942 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800943 OnPi2();
944 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700945
946 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800947 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700948 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800949 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700950
951 // Start everything up. Pong is the only thing we don't know how to wait on,
952 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800953 StartPi1Test();
954 StartPi2Test();
955 StartPi1Server();
956 StartPi1Client();
957 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700958
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800959 // Confirm both client and server statistics messages have decent offsets in
960 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700961
962 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800963 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700964
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800965 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700966
967 // Now confirm we are synchronized.
968 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
969 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
970
971 const ServerConnection *const pi1_connection =
972 pi1_server_statistics_fetcher->connections()->Get(0);
973 const ServerConnection *const pi2_connection =
974 pi2_server_statistics_fetcher->connections()->Get(0);
975
976 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
977 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
978 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
979 chrono::milliseconds(1));
980 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
981 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800982 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800983 EXPECT_TRUE(pi1_connection->has_connected_since_time());
984 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700985
986 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
987 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
988 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
989 chrono::milliseconds(1));
990 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
991 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800992 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800993 EXPECT_TRUE(pi2_connection->has_connected_since_time());
994 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800995
996 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700997 }
998
999 std::this_thread::sleep_for(std::chrono::seconds(2));
1000
1001 {
1002 // And confirm we are unsynchronized.
1003 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1004 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1005
1006 const ServerConnection *const pi1_server_connection =
1007 pi1_server_statistics_fetcher->connections()->Get(0);
1008 const ClientConnection *const pi1_client_connection =
1009 pi1_client_statistics_fetcher->connections()->Get(0);
1010
1011 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1012 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001013 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1014 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1015
Austin Schuh20ac95d2020-12-05 17:24:19 -08001016 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001017 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1018 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001019 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1020 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1021 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001022 }
1023
1024 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001025 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001026
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001027 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001028
1029 // And confirm we are synchronized again.
1030 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1031 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001032 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001033
1034 const ServerConnection *const pi1_connection =
1035 pi1_server_statistics_fetcher->connections()->Get(0);
1036 const ServerConnection *const pi2_connection =
1037 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001038 const ClientConnection *const pi1_client_connection =
1039 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001040
1041 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1042 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1043 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1044 chrono::milliseconds(1));
1045 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1046 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001047 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001048
Austin Schuh367a7f42021-11-23 23:04:36 -08001049 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1050 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1051 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1052 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1053
Austin Schuh5344c352020-04-12 17:04:26 -07001054 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1055 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1056 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1057 chrono::milliseconds(1));
1058 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1059 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001060 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001061
1062 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001063 }
1064
1065 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001066 StopPi1Server();
1067 StopPi1Client();
1068 StopPi2Client();
1069 StopPi1Test();
1070 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001071}
1072
Austin Schuh4889b182020-11-18 19:11:56 -08001073// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001074// thing, but doesn't confirm that the internal state does. We either need to
1075// expose a way to check the state in a thread-safe way, or need a way to jump
1076// time for one node to do that.
1077
Austin Schuh4889b182020-11-18 19:11:56 -08001078void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1079 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1080 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1081 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001082 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001083}
1084
1085// Tests that when a message is sent before the bridge starts up, but is
1086// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001087TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001088 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001089
1090 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001091 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001092 aos::Sender<examples::Ping> ping_sender =
1093 send_event_loop.MakeSender<examples::Ping>("/test");
1094 SendPing(&ping_sender, 1);
1095 aos::Sender<examples::Ping> unreliable_ping_sender =
1096 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1097 SendPing(&unreliable_ping_sender, 1);
1098
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001099 MakePi1Server();
1100 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001101
1102 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001103 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001104
1105 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001106 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001107
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001108 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001109
Austin Schuhf466ab52021-02-16 22:00:38 -08001110 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001111 aos::Fetcher<examples::Ping> ping_fetcher =
1112 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1113 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1114 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1115 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1116 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1117
1118 const size_t ping_channel_index = configuration::ChannelIndex(
1119 receive_event_loop.configuration(), ping_fetcher.channel());
1120
1121 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001122 const std::string channel_name =
1123 shared() ? "/pi1/aos/remote_timestamps/pi2"
1124 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001125 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001126 channel_name, [this, channel_name, ping_channel_index,
1127 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001128 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001129 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001130 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001131 if (shared() && header.channel_index() != ping_channel_index) {
1132 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001133 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001134 CHECK_EQ(header.channel_index(), ping_channel_index);
1135 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001136 });
1137
1138 // Before everything starts up, confirm there is no message.
1139 EXPECT_FALSE(ping_fetcher.Fetch());
1140 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1141
1142 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001143 StartPi1Server();
1144 StartPi1Client();
1145 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001146
1147 // Event used to wait for the timestamp counting thread to start.
1148 aos::Event event;
1149 std::thread pi1_remote_timestamp_thread(
1150 [&pi1_remote_timestamp_event_loop, &event]() {
1151 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1152 pi1_remote_timestamp_event_loop.Run();
1153 });
1154
1155 event.Wait();
1156
1157 {
1158 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001159 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001160
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001161 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001162
1163 // Confirm there is no detected duplicate packet.
1164 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1165 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1166 ->Get(0)
1167 ->duplicate_packets(),
1168 0u);
1169
Austin Schuhe61d4382021-03-31 21:33:02 -07001170 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1171 ->Get(0)
1172 ->partial_deliveries(),
1173 0u);
1174
Austin Schuh4889b182020-11-18 19:11:56 -08001175 EXPECT_TRUE(ping_fetcher.Fetch());
1176 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1177 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001178
1179 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001180 }
1181
1182 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001183 // Now, spin up a client for 2 seconds.
1184 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001185
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001186 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001187
1188 // Confirm we detect the duplicate packet correctly.
1189 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1190 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1191 ->Get(0)
1192 ->duplicate_packets(),
1193 1u);
1194
Austin Schuhe61d4382021-03-31 21:33:02 -07001195 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1196 ->Get(0)
1197 ->partial_deliveries(),
1198 0u);
1199
Austin Schuh4889b182020-11-18 19:11:56 -08001200 EXPECT_EQ(ping_timestamp_count, 1);
1201 EXPECT_FALSE(ping_fetcher.Fetch());
1202 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001203
1204 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001205 }
1206
1207 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001208 StopPi1Client();
1209 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001210 pi1_remote_timestamp_event_loop.Exit();
1211 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001212 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001213}
1214
1215// Tests that when a message is sent before the bridge starts up, but is
1216// configured as reliable, we forward it. Confirm this works across server
1217// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001218TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001219 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001220 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001221
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001222 MakePi2Server();
1223 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001224
Austin Schuhf466ab52021-02-16 22:00:38 -08001225 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001226 aos::Fetcher<examples::Ping> ping_fetcher =
1227 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1228 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1229 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1230 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1231 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1232
Austin Schuh4889b182020-11-18 19:11:56 -08001233 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001234 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001235
1236 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001237 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001238 aos::Sender<examples::Ping> ping_sender =
1239 send_event_loop.MakeSender<examples::Ping>("/test");
1240 {
1241 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1242 examples::Ping::Builder ping_builder =
1243 builder.MakeBuilder<examples::Ping>();
1244 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001245 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001246 }
1247
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001248 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001249
1250 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001251 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001252
1253 const size_t ping_channel_index = configuration::ChannelIndex(
1254 receive_event_loop.configuration(), ping_fetcher.channel());
1255
1256 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001257 const std::string channel_name =
1258 shared() ? "/pi1/aos/remote_timestamps/pi2"
1259 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001260 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001261 channel_name, [this, channel_name, ping_channel_index,
1262 &ping_timestamp_count](const RemoteMessage &header) {
1263 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001264 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001265 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001266 if (shared() && header.channel_index() != ping_channel_index) {
1267 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001268 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001269 CHECK_EQ(header.channel_index(), ping_channel_index);
1270 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001271 });
1272
1273 // Before everything starts up, confirm there is no message.
1274 EXPECT_FALSE(ping_fetcher.Fetch());
1275 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1276
1277 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001278 StartPi1Client();
1279 StartPi2Server();
1280 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001281
1282 // Event used to wait for the timestamp counting thread to start.
1283 aos::Event event;
1284 std::thread pi1_remote_timestamp_thread(
1285 [&pi1_remote_timestamp_event_loop, &event]() {
1286 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1287 pi1_remote_timestamp_event_loop.Run();
1288 });
1289
1290 event.Wait();
1291
1292 {
1293 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001294 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001295
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001296 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001297
1298 // Confirm there is no detected duplicate packet.
1299 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1300 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1301 ->Get(0)
1302 ->duplicate_packets(),
1303 0u);
1304
Austin Schuhe61d4382021-03-31 21:33:02 -07001305 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1306 ->Get(0)
1307 ->partial_deliveries(),
1308 0u);
1309
Austin Schuh4889b182020-11-18 19:11:56 -08001310 EXPECT_TRUE(ping_fetcher.Fetch());
1311 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1312 EXPECT_EQ(ping_timestamp_count, 1);
1313 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001314
1315 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001316 }
1317
1318 {
1319 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001320 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001321
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001322 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001323
1324 // Confirm we detect the duplicate packet correctly.
1325 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1326 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1327 ->Get(0)
1328 ->duplicate_packets(),
1329 1u);
1330
Austin Schuhe61d4382021-03-31 21:33:02 -07001331 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1332 ->Get(0)
1333 ->partial_deliveries(),
1334 0u);
1335
Austin Schuh4889b182020-11-18 19:11:56 -08001336 EXPECT_EQ(ping_timestamp_count, 1);
1337 EXPECT_FALSE(ping_fetcher.Fetch());
1338 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001339
1340 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001341 }
1342
1343 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001344 StopPi1Client();
1345 StopPi2Server();
1346 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001347 pi1_remote_timestamp_event_loop.Exit();
1348 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001349}
1350
Austin Schuhb0e439d2023-05-15 10:55:40 -07001351// Test that differing config sha256's result in no connection.
//
// The pi1 server is built with a placeholder string ("dummy sha256 ...")
// instead of its default argument (presumably the real config sha256 --
// confirm against MakePi1Server). The expectations below then check that:
//   * the pi1 server <- pi2 client direction never connects: the pi1 server
//     reports DISCONNECTED with zero connections and more than 10 invalid
//     connection attempts, and the pi2 client reports DISCONNECTED with more
//     than 10 connection attempts;
//   * the pi2 server <- pi1 client direction connects exactly once.
1352TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1353 // This is rather annoying to set up. We need to start up a client and
1354 // server, on the same node, but get them to think that they are on different
1355 // nodes.
1356 //
1357 // We need the client to not post directly to "/test" like it would in a
1358 // real system, otherwise we will re-send the ping message... So, use an
1359 // application specific map to have the client post somewhere else.
1360 //
1361 // To top this all off, each of these needs to be done with a ShmEventLoop,
1362 // which needs to run in a separate thread... And it is really hard to get
1363 // everything started up reliably. So just be super generous on timeouts and
1364 // hope for the best. We can be more generous in the future if we need to.
1365 //
1366 // We are faking the application names by passing in --application_name=foo
1367 OnPi1();
1368
Austin Schuh530f9ee2023-05-15 14:29:31 -07001369 MakePi1Server(
1370 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001371 MakePi1Client();
1372
1373 // And build the app for testing.
1374 MakePi1Test();
1375 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1376 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1377 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1378 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1379
1380 // Now do it for "raspberrypi2", the client.
1381 OnPi2();
1382 MakePi2Server();
1383
1384 // And build the app for testing.
1385 MakePi2Test();
1386 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1387 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1388 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1389 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1390
1391 // Start the test loops, both servers, and the pi1 client.
1392
1393 StartPi1Test();
1394 StartPi2Test();
1395 StartPi1Server();
1396 StartPi1Client();
1397 StartPi2Server();
1398
1399 {
1400 MakePi2Client();
1401
1402 RunPi2Client(chrono::milliseconds(3050));
1403
1404 // Fetch the latest statistics from every server and client.
1405 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1406 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1407 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1408 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1409
 // NOTE(review): the EXPECT_TRUE checks above are non-fatal, so if any
 // Fetch() failed the fetchers are still dereferenced below -- presumably
 // a crash rather than a clean test failure; confirm whether that is
 // acceptable here.
1410 const ServerConnection *const pi1_connection =
1411 pi1_server_statistics_fetcher->connections()->Get(0);
1412 const ClientConnection *const pi1_client_connection =
1413 pi1_client_statistics_fetcher->connections()->Get(0);
1414 const ServerConnection *const pi2_connection =
1415 pi2_server_statistics_fetcher->connections()->Get(0);
1416 const ClientConnection *const pi2_client_connection =
1417 pi2_client_statistics_fetcher->connections()->Get(0);
1418
1419 // Make sure one direction is disconnected with a bunch of connection
1420 // attempts and failures.
1421 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1422 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1423 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1424
1425 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1426 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1427
1428 // And the other direction is happy.
1429 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1430 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1431 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1432 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1433 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1434
1435 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1436 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1437
1438 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1439 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1440 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1441 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1442
1443 StopPi2Client();
1444 }
1445
1446 // Shut everyone else down
1447 StopPi1Server();
1448 StopPi1Client();
1449 StopPi2Server();
1450 StopPi1Test();
1451 StopPi2Test();
1452}
1453
// Instantiate every MessageBridgeParameterizedTest case once per config file.
// The bool in each Param presumably selects whether remote timestamps share a
// single combined channel (what the tests query through shared()) -- confirm
// against the Param definition.
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001454INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001455 MessageBridgeTests, MessageBridgeParameterizedTest,
1456 ::testing::Values(
1457 Param{"message_bridge_test_combined_timestamps_common_config.json",
1458 true},
1459 Param{"message_bridge_test_common_config.json", false}));
1460
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001461} // namespace testing
1462} // namespace message_bridge
1463} // namespace aos