blob: 07a56f060e0aa3dd4f6f7dc0007e89ceb2cce8f7 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070013#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070014#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080015#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080016
Austin Schuh8902fa52021-03-14 22:39:24 -070017DECLARE_string(boot_uuid);
18
Austin Schuhe84c3ed2019-12-14 15:29:48 -080019namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070020void SetShmBase(const std::string_view base);
21
Austin Schuhe84c3ed2019-12-14 15:29:48 -080022namespace message_bridge {
23namespace testing {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
26
Austin Schuhe84c3ed2019-12-14 15:29:48 -080027namespace chrono = std::chrono;
28
Austin Schuhe991fe22020-11-18 16:53:39 -080029std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070030 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
31 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080032 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080034 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070035 }
36}
37
Austin Schuhe991fe22020-11-18 16:53:39 -080038void DoSetShmBase(const std::string_view node) {
39 aos::SetShmBase(ShmBase(node));
40}
41
// One parameterization of the message bridge tests; every test in the suite is
// run once per Param value.
struct Param {
  // Name of the config file to load.
  std::string config;

  // Selects the RemoteMessage channel layout:
  //   true  - a single RemoteMessage channel is shared between all the remote
  //           channels.
  //   false - there is 1 RemoteMessage channel per remote channel.
  bool shared;
};
51
52class MessageBridgeParameterizedTest
53 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080054 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080055 MessageBridgeParameterizedTest()
56 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070057 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070058 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070059 pi1_boot_uuid_(UUID::Random()),
60 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080061 util::UnlinkRecursive(ShmBase("pi1"));
62 util::UnlinkRecursive(ShmBase("pi2"));
63 }
Austin Schuhe991fe22020-11-18 16:53:39 -080064
Austin Schuh36a2c3e2021-02-18 22:28:38 -080065 bool shared() const { return GetParam().shared; }
66
Austin Schuh0a2f12f2021-01-08 22:48:29 -080067 void OnPi1() {
68 DoSetShmBase("pi1");
69 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070070 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080071 }
72
73 void OnPi2() {
74 DoSetShmBase("pi2");
75 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070076 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080077 }
78
Austin Schuhb0e439d2023-05-15 10:55:40 -070079 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -080080 OnPi1();
81 FLAGS_application_name = "pi1_message_bridge_server";
82 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080083 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080084 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -070085 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
86 pi1_server_event_loop.get(), server_config_sha256.size() == 0
87 ? config_sha256
88 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080089 }
90
91 void RunPi1Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -070092 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -080093 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
94 [this]() { pi1_server_event_loop->Exit(); });
95 pi1_server_event_loop->OnRun([this, quit, duration]() {
96 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -070097 quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080098 });
99
100 pi1_server_event_loop->Run();
101 }
102
103 void StartPi1Server() {
104 pi1_server_thread = std::thread([this]() {
105 LOG(INFO) << "Started pi1_message_bridge_server";
106 pi1_server_event_loop->Run();
107 });
108 }
109
110 void StopPi1Server() {
111 if (pi1_server_thread.joinable()) {
112 pi1_server_event_loop->Exit();
113 pi1_server_thread.join();
114 pi1_server_thread = std::thread();
115 }
116 pi1_message_bridge_server.reset();
117 pi1_server_event_loop.reset();
118 }
119
120 void MakePi1Client() {
121 OnPi1();
122 FLAGS_application_name = "pi1_message_bridge_client";
123 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800124 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800125 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700126 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
127 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800128 }
129
130 void StartPi1Client() {
131 pi1_client_thread = std::thread([this]() {
132 LOG(INFO) << "Started pi1_message_bridge_client";
133 pi1_client_event_loop->Run();
134 });
135 }
136
137 void StopPi1Client() {
138 pi1_client_event_loop->Exit();
139 pi1_client_thread.join();
140 pi1_client_thread = std::thread();
141 pi1_message_bridge_client.reset();
142 pi1_client_event_loop.reset();
143 }
144
145 void MakePi1Test() {
146 OnPi1();
147 FLAGS_application_name = "test1";
148 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800149 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800150
151 pi1_test_event_loop->MakeWatcher(
152 "/pi1/aos", [](const ServerStatistics &stats) {
153 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
154 });
155
156 pi1_test_event_loop->MakeWatcher(
157 "/pi1/aos", [](const ClientStatistics &stats) {
158 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
159 });
160
161 pi1_test_event_loop->MakeWatcher(
162 "/pi1/aos", [](const Timestamp &timestamp) {
163 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
164 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700165 pi1_test_event_loop->MakeWatcher(
166 "/pi2/aos", [this](const Timestamp &timestamp) {
167 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700168 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700169 pi2_boot_uuid_);
170 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800171 }
172
173 void StartPi1Test() {
174 pi1_test_thread = std::thread([this]() {
175 LOG(INFO) << "Started pi1_test";
176 pi1_test_event_loop->Run();
177 });
178 }
179
180 void StopPi1Test() {
181 pi1_test_event_loop->Exit();
182 pi1_test_thread.join();
183 }
184
185 void MakePi2Server() {
186 OnPi2();
187 FLAGS_application_name = "pi2_message_bridge_server";
188 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800189 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800190 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700191 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
192 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800193 }
194
195 void RunPi2Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -0700196 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800197 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
198 [this]() { pi2_server_event_loop->Exit(); });
199 pi2_server_event_loop->OnRun([this, quit, duration]() {
200 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700201 quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800202 });
203
204 pi2_server_event_loop->Run();
205 }
206
207 void StartPi2Server() {
208 pi2_server_thread = std::thread([this]() {
209 LOG(INFO) << "Started pi2_message_bridge_server";
210 pi2_server_event_loop->Run();
211 });
212 }
213
214 void StopPi2Server() {
215 if (pi2_server_thread.joinable()) {
216 pi2_server_event_loop->Exit();
217 pi2_server_thread.join();
218 pi2_server_thread = std::thread();
219 }
220 pi2_message_bridge_server.reset();
221 pi2_server_event_loop.reset();
222 }
223
224 void MakePi2Client() {
225 OnPi2();
226 FLAGS_application_name = "pi2_message_bridge_client";
227 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800228 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800229 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700230 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
231 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800232 }
233
234 void RunPi2Client(chrono::nanoseconds duration) {
235 // Run for 5 seconds to make sure we have time to estimate the offset.
236 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
237 [this]() { pi2_client_event_loop->Exit(); });
238 pi2_client_event_loop->OnRun([this, quit, duration]() {
239 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700240 quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800241 });
242
243 // And go!
244 pi2_client_event_loop->Run();
245 }
246
247 void StartPi2Client() {
248 pi2_client_thread = std::thread([this]() {
249 LOG(INFO) << "Started pi2_message_bridge_client";
250 pi2_client_event_loop->Run();
251 });
252 }
253
254 void StopPi2Client() {
255 if (pi2_client_thread.joinable()) {
256 pi2_client_event_loop->Exit();
257 pi2_client_thread.join();
258 pi2_client_thread = std::thread();
259 }
260 pi2_message_bridge_client.reset();
261 pi2_client_event_loop.reset();
262 }
263
264 void MakePi2Test() {
265 OnPi2();
266 FLAGS_application_name = "test2";
267 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800268 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800269
270 pi2_test_event_loop->MakeWatcher(
271 "/pi2/aos", [](const ServerStatistics &stats) {
272 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
273 });
274
275 pi2_test_event_loop->MakeWatcher(
276 "/pi2/aos", [](const ClientStatistics &stats) {
277 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
278 });
279
280 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700281 "/pi1/aos", [this](const Timestamp &timestamp) {
282 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700283 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700284 pi1_boot_uuid_);
285 });
286 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800287 "/pi2/aos", [](const Timestamp &timestamp) {
288 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
289 });
290 }
291
292 void StartPi2Test() {
293 pi2_test_thread = std::thread([this]() {
294 LOG(INFO) << "Started pi2_message_bridge_test";
295 pi2_test_event_loop->Run();
296 });
297 }
298
299 void StopPi2Test() {
300 pi2_test_event_loop->Exit();
301 pi2_test_thread.join();
302 }
303
Austin Schuhf466ab52021-02-16 22:00:38 -0800304 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700305 std::string config_sha256;
306
Austin Schuh8902fa52021-03-14 22:39:24 -0700307 const UUID pi1_boot_uuid_;
308 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800309
310 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
311 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
312 std::thread pi1_server_thread;
313
314 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
315 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
316 std::thread pi1_client_thread;
317
318 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
319 std::thread pi1_test_thread;
320
321 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
322 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
323 std::thread pi2_server_thread;
324
325 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
326 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
327 std::thread pi2_client_thread;
328
329 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
330 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800331};
332
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800333// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800334TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800335 // This is rather annoying to set up. We need to start up a client and
336 // server, on the same node, but get them to think that they are on different
337 // nodes.
338 //
339 // We then get to wait until they are connected.
340 //
341 // After they are connected, we send a Ping message.
342 //
343 // On the other end, we receive a Pong message.
344 //
345 // But, we need the client to not post directly to "/test" like it would in a
346 // real system, otherwise we will re-send the ping message... So, use an
347 // application specific map to have the client post somewhere else.
348 //
349 // To top this all off, each of these needs to be done with a ShmEventLoop,
350 // which needs to run in a separate thread... And it is really hard to get
351 // everything started up reliably. So just be super generous on timeouts and
352 // hope for the best. We can be more generous in the future if we need to.
353 //
354 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800355 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800356 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700357
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800358 MakePi1Server();
359 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800360
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700361 const std::string long_data = std::string(10000, 'a');
362
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800363 // And build the app which sends the pings.
364 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800365 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800366 aos::Sender<examples::Ping> ping_sender =
367 ping_event_loop.MakeSender<examples::Ping>("/test");
368
Austin Schuhf466ab52021-02-16 22:00:38 -0800369 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800370 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
371 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800372 shared() ? "/pi1/aos/remote_timestamps/pi2"
373 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700374
375 // Fetchers for confirming the remote timestamps made it.
376 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
377 ping_event_loop.MakeFetcher<examples::Ping>("/test");
378 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
379 ping_event_loop.MakeFetcher<Timestamp>("/aos");
380
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800382 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700383
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800384 MakePi2Client();
385 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800386
387 // And build the app which sends the pongs.
388 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800389 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800390
Austin Schuh7bc59052020-02-16 23:48:33 -0800391 // And build the app for testing.
392 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800393 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800394
395 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
396 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800397 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
398 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800399 shared() ? "/pi2/aos/remote_timestamps/pi1"
400 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
401 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700402
403 // Event loop for fetching data delivered to pi2 from pi1 to match up
404 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800405 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700406 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
407 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
408 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
409 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
410 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
411 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800412
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800413 // Count the pongs.
414 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700415 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
416 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700417 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700418 ++pong_count;
419 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
420 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800421
422 FLAGS_override_hostname = "";
423
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800424 // Wait until we are connected, then send.
425 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800426 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700427 ping_event_loop.MakeWatcher(
428 "/pi1/aos",
429 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
430 &long_data](const ServerStatistics &stats) {
431 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800432
Philipp Schrader790cb542023-07-05 21:06:52 -0700433 ASSERT_TRUE(stats.has_connections());
434 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800435
Philipp Schrader790cb542023-07-05 21:06:52 -0700436 bool connected = false;
437 for (const ServerConnection *connection : *stats.connections()) {
438 // Confirm that we are estimating the server time offset correctly. It
439 // should be about 0 since we are on the same machine here.
440 if (connection->has_monotonic_offset()) {
441 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
442 chrono::milliseconds(1));
443 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
444 chrono::milliseconds(-1));
445 ++pi1_server_statistics_count;
446 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800447
Philipp Schrader790cb542023-07-05 21:06:52 -0700448 if (connection->node()->name()->string_view() ==
449 pi2_client_event_loop->node()->name()->string_view()) {
450 if (connection->state() == State::CONNECTED) {
451 EXPECT_TRUE(connection->has_boot_uuid());
452 EXPECT_EQ(connection->connection_count(), 1u);
453 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
454 connection->connected_since_time())),
455 monotonic_clock::now());
456 connected = true;
457 } else {
458 EXPECT_FALSE(connection->has_connection_count());
459 EXPECT_FALSE(connection->has_connected_since_time());
460 }
461 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800462 }
463
Philipp Schrader790cb542023-07-05 21:06:52 -0700464 if (connected) {
465 VLOG(1) << "Connected! Sent ping.";
466 auto builder = ping_sender.MakeBuilder();
467 builder.fbb()->CreateString(long_data);
468 examples::Ping::Builder ping_builder =
469 builder.MakeBuilder<examples::Ping>();
470 ping_builder.add_value(ping_count + 971);
471 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
472 ++ping_count;
473 }
474 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800475
Austin Schuh7bc59052020-02-16 23:48:33 -0800476 // Confirm both client and server statistics messages have decent offsets in
477 // them.
478 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700479 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800480 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800481 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800482 for (const ServerConnection *connection : *stats.connections()) {
483 if (connection->has_monotonic_offset()) {
484 ++pi2_server_statistics_count;
485 // Confirm that we are estimating the server time offset correctly. It
486 // should be about 0 since we are on the same machine here.
487 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
488 chrono::milliseconds(1));
489 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
490 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800491 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800492 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800493
494 if (connection->state() == State::CONNECTED) {
495 EXPECT_EQ(connection->connection_count(), 1u);
496 EXPECT_LT(monotonic_clock::time_point(
497 chrono::nanoseconds(connection->connected_since_time())),
498 monotonic_clock::now());
499 } else {
500 EXPECT_FALSE(connection->has_connection_count());
501 EXPECT_FALSE(connection->has_connected_since_time());
502 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800503 }
504 });
505
506 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800507 int pi1_connected_client_statistics_count = 0;
508 ping_event_loop.MakeWatcher(
509 "/pi1/aos",
510 [&pi1_client_statistics_count,
511 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
512 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800513
Austin Schuh367a7f42021-11-23 23:04:36 -0800514 for (const ClientConnection *connection : *stats.connections()) {
515 if (connection->has_monotonic_offset()) {
516 ++pi1_client_statistics_count;
517 // It takes at least 10 microseconds to send a message between the
518 // client and server. The min (filtered) time shouldn't be over 10
519 // milliseconds on localhost. This might have to bump up if this is
520 // proving flaky.
521 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
522 chrono::milliseconds(10))
523 << " " << connection->monotonic_offset()
524 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
525 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
526 chrono::microseconds(10))
527 << " " << connection->monotonic_offset()
528 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
529 }
530 if (connection->state() == State::CONNECTED) {
531 EXPECT_EQ(connection->connection_count(), 1u);
532 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
533 connection->connected_since_time())),
534 monotonic_clock::now());
535 // The first Connected message may not have a UUID in it since no
536 // data has flown. That's fine.
537 if (pi1_connected_client_statistics_count > 0) {
538 EXPECT_TRUE(connection->has_boot_uuid())
539 << ": " << aos::FlatbufferToJson(connection);
540 }
541 ++pi1_connected_client_statistics_count;
542 } else {
543 EXPECT_FALSE(connection->has_connection_count());
544 EXPECT_FALSE(connection->has_connected_since_time());
545 }
546 }
547 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800548
549 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800550 int pi2_connected_client_statistics_count = 0;
551 pong_event_loop.MakeWatcher(
552 "/pi2/aos",
553 [&pi2_client_statistics_count,
554 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
555 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800556
Austin Schuh367a7f42021-11-23 23:04:36 -0800557 for (const ClientConnection *connection : *stats.connections()) {
558 if (connection->has_monotonic_offset()) {
559 ++pi2_client_statistics_count;
560 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
561 chrono::milliseconds(10))
562 << ": got " << aos::FlatbufferToJson(connection);
563 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
564 chrono::microseconds(10))
565 << ": got " << aos::FlatbufferToJson(connection);
566 }
567 if (connection->state() == State::CONNECTED) {
568 EXPECT_EQ(connection->connection_count(), 1u);
569 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
570 connection->connected_since_time())),
571 monotonic_clock::now());
572 if (pi2_connected_client_statistics_count > 0) {
573 EXPECT_TRUE(connection->has_boot_uuid());
574 }
575 ++pi2_connected_client_statistics_count;
576 } else {
577 EXPECT_FALSE(connection->has_connection_count());
578 EXPECT_FALSE(connection->has_connected_since_time());
579 }
580 }
581 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800582
Austin Schuh196a4452020-03-15 23:12:03 -0700583 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800584 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800585 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800586 });
Austin Schuh196a4452020-03-15 23:12:03 -0700587 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800588 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800589 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800590 });
591
592 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800593 aos::TimerHandler *quit = ping_event_loop.AddTimer(
594 [&ping_event_loop]() { ping_event_loop.Exit(); });
595 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800596 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700597 quit->Schedule(ping_event_loop.monotonic_now() +
598 chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800599 });
600
Austin Schuh2f8fd752020-09-01 22:38:28 -0700601 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
602 // channel.
603 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
604 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
605 const size_t ping_timestamp_channel =
606 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
607 ping_on_pi2_fetcher.channel());
608
609 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
610 VLOG(1) << "Channel "
611 << configuration::ChannelIndex(ping_event_loop.configuration(),
612 channel)
613 << " " << configuration::CleanedChannelToString(channel);
614 }
615
616 // For each remote timestamp we get back, confirm that it is either a ping
617 // message, or a timestamp we sent out. Also confirm that the timestamps are
618 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800619 for (std::pair<int, std::string> channel :
620 shared()
621 ? std::vector<std::pair<
622 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
623 : std::vector<std::pair<int, std::string>>{
624 {pi1_timestamp_channel,
625 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
626 "aos-message_bridge-Timestamp"},
627 {ping_timestamp_channel,
628 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
629 ping_event_loop.MakeWatcher(
630 channel.second,
631 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
632 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
633 &pi1_on_pi1_timestamp_fetcher,
634 channel_index = channel.first](const RemoteMessage &header) {
635 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
636 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700637
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800638 EXPECT_TRUE(header.has_boot_uuid());
639 if (channel_index != -1) {
640 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700641 }
642
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800643 const aos::monotonic_clock::time_point header_monotonic_sent_time(
644 chrono::nanoseconds(header.monotonic_sent_time()));
645 const aos::realtime_clock::time_point header_realtime_sent_time(
646 chrono::nanoseconds(header.realtime_sent_time()));
647 const aos::monotonic_clock::time_point header_monotonic_remote_time(
648 chrono::nanoseconds(header.monotonic_remote_time()));
649 const aos::realtime_clock::time_point header_realtime_remote_time(
650 chrono::nanoseconds(header.realtime_remote_time()));
651
652 const Context *pi1_context = nullptr;
653 const Context *pi2_context = nullptr;
654
655 if (header.channel_index() == pi1_timestamp_channel) {
656 // Find the forwarded message.
657 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
658 header_monotonic_sent_time) {
659 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
660 }
661
662 // And the source message.
663 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
664 header_monotonic_remote_time) {
665 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
666 }
667
668 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
669 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
670 } else if (header.channel_index() == ping_timestamp_channel) {
671 // Find the forwarded message.
672 while (ping_on_pi2_fetcher.context().monotonic_event_time <
673 header_monotonic_sent_time) {
674 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
675 }
676
677 // And the source message.
678 while (ping_on_pi1_fetcher.context().monotonic_event_time <
679 header_monotonic_remote_time) {
680 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
681 }
682
683 pi1_context = &ping_on_pi1_fetcher.context();
684 pi2_context = &ping_on_pi2_fetcher.context();
685 } else {
686 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700687 }
688
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800689 // Confirm the forwarded message has matching timestamps to the
690 // timestamps we got back.
691 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
692 EXPECT_EQ(pi2_context->monotonic_event_time,
693 header_monotonic_sent_time);
694 EXPECT_EQ(pi2_context->realtime_event_time,
695 header_realtime_sent_time);
696 EXPECT_EQ(pi2_context->realtime_remote_time,
697 header_realtime_remote_time);
698 EXPECT_EQ(pi2_context->monotonic_remote_time,
699 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700700
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800701 // Confirm the forwarded message also matches the source message.
702 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
703 EXPECT_EQ(pi1_context->monotonic_event_time,
704 header_monotonic_remote_time);
705 EXPECT_EQ(pi1_context->realtime_event_time,
706 header_realtime_remote_time);
707 });
708 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700709
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800710 // Start everything up. Pong is the only thing we don't know how to wait
711 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800712 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
713
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800714 StartPi1Server();
715 StartPi1Client();
716 StartPi2Client();
717 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800718
719 // And go!
720 ping_event_loop.Run();
721
722 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800723 StopPi1Server();
724 StopPi1Client();
725 StopPi2Client();
726 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800727 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800728 pong_thread.join();
729
730 // Make sure we sent something.
731 EXPECT_GE(ping_count, 1);
732 // And got something back.
733 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800734
735 // Confirm that we are estimating a monotonic offset on the client.
736 ASSERT_TRUE(client_statistics_fetcher.Fetch());
737
738 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
739 EXPECT_EQ(client_statistics_fetcher->connections()
740 ->Get(0)
741 ->node()
742 ->name()
743 ->string_view(),
744 "pi1");
745
746 // Make sure the offset in one direction is less than a second.
747 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700748 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
749 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800750 EXPECT_LT(
751 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700752 1000000000)
753 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800754
755 EXPECT_GE(pi1_server_statistics_count, 2);
756 EXPECT_GE(pi2_server_statistics_count, 2);
757 EXPECT_GE(pi1_client_statistics_count, 2);
758 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700759
760 // Confirm we got timestamps back!
761 EXPECT_TRUE(message_header_fetcher1.Fetch());
762 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800763}
764
Austin Schuh5344c352020-04-12 17:04:26 -0700765// Test that the client disconnecting triggers the server offsets on both sides
766// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800767TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700768 // This is rather annoying to set up. We need to start up a client and
769 // server, on the same node, but get them to think that they are on different
770 // nodes.
771 //
772 // We need the client to not post directly to "/test" like it would in a
773 // real system, otherwise we will re-send the ping message... So, use an
774 // application specific map to have the client post somewhere else.
775 //
776 // To top this all off, each of these needs to be done with a ShmEventLoop,
777 // which needs to run in a separate thread... And it is really hard to get
778 // everything started up reliably. So just be super generous on timeouts and
779 // hope for the best. We can be more generous in the future if we need to.
780 //
781 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800782 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700783
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800784 MakePi1Server();
785 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700786
787 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800788 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700789 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800790 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700791
792 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800793 OnPi2();
794 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700795
796 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800797 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700798 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800799 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700800
801 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700802
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800803 StartPi1Test();
804 StartPi2Test();
805 StartPi1Server();
806 StartPi1Client();
807 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700808
809 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800810 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700811
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800812 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700813
814 // Now confirm we are synchronized.
815 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
816 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
817
818 const ServerConnection *const pi1_connection =
819 pi1_server_statistics_fetcher->connections()->Get(0);
820 const ServerConnection *const pi2_connection =
821 pi2_server_statistics_fetcher->connections()->Get(0);
822
823 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800824 EXPECT_EQ(pi1_connection->connection_count(), 1u);
825 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700826 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
827 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
828 chrono::milliseconds(1));
829 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
830 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800831 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700832
833 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800834 EXPECT_EQ(pi2_connection->connection_count(), 1u);
835 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700836 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
837 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
838 chrono::milliseconds(1));
839 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
840 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800841 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800842
843 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700844 }
845
Austin Schuhd0d894e2021-10-24 17:13:11 -0700846 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
847 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700848
849 {
850 // Now confirm we are un-synchronized.
851 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
852 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
853 const ServerConnection *const pi1_connection =
854 pi1_server_statistics_fetcher->connections()->Get(0);
855 const ServerConnection *const pi2_connection =
856 pi2_server_statistics_fetcher->connections()->Get(0);
857
858 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800859 EXPECT_EQ(pi1_connection->connection_count(), 1u);
860 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700861 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800862 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700863 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
864 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800865 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800866 EXPECT_EQ(pi2_connection->connection_count(), 1u);
867 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700868 }
869
870 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800871 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700872 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800873 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700874
875 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
876 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
877
878 // Now confirm we are synchronized again.
879 const ServerConnection *const pi1_connection =
880 pi1_server_statistics_fetcher->connections()->Get(0);
881 const ServerConnection *const pi2_connection =
882 pi2_server_statistics_fetcher->connections()->Get(0);
883
884 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800885 EXPECT_EQ(pi1_connection->connection_count(), 2u);
886 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700887 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
888 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800889 chrono::milliseconds(1))
890 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700891 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800892 chrono::milliseconds(-1))
893 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800894 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700895
896 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800897 EXPECT_EQ(pi2_connection->connection_count(), 1u);
898 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700899 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
900 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800901 chrono::milliseconds(1))
902 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700903 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800904 chrono::milliseconds(-1))
905 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800906 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800907
908 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700909 }
910
911 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800912 StopPi1Server();
913 StopPi1Client();
914 StopPi2Server();
915 StopPi1Test();
916 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700917}
918
919// Test that the server disconnecting triggers the server offsets on the other
920// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800921TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700922 // This is rather annoying to set up. We need to start up a client and
923 // server, on the same node, but get them to think that they are on different
924 // nodes.
925 //
926 // We need the client to not post directly to "/test" like it would in a
927 // real system, otherwise we will re-send the ping message... So, use an
928 // application specific map to have the client post somewhere else.
929 //
930 // To top this all off, each of these needs to be done with a ShmEventLoop,
931 // which needs to run in a separate thread... And it is really hard to get
932 // everything started up reliably. So just be super generous on timeouts and
933 // hope for the best. We can be more generous in the future if we need to.
934 //
935 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700936 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800937 OnPi1();
938 MakePi1Server();
939 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700940
941 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800942 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700943 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800944 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700945 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800946 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700947
948 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800949 OnPi2();
950 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700951
952 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800953 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700954 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800955 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700956
957 // Start everything up. Pong is the only thing we don't know how to wait on,
958 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800959 StartPi1Test();
960 StartPi2Test();
961 StartPi1Server();
962 StartPi1Client();
963 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700964
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800965 // Confirm both client and server statistics messages have decent offsets in
966 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700967
968 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800969 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700970
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800971 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700972
973 // Now confirm we are synchronized.
974 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
975 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
976
977 const ServerConnection *const pi1_connection =
978 pi1_server_statistics_fetcher->connections()->Get(0);
979 const ServerConnection *const pi2_connection =
980 pi2_server_statistics_fetcher->connections()->Get(0);
981
982 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
983 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
984 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
985 chrono::milliseconds(1));
986 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
987 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800988 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800989 EXPECT_TRUE(pi1_connection->has_connected_since_time());
990 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700991
992 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
993 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
994 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
995 chrono::milliseconds(1));
996 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
997 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800998 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800999 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1000 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001001
1002 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001003 }
1004
1005 std::this_thread::sleep_for(std::chrono::seconds(2));
1006
1007 {
1008 // And confirm we are unsynchronized.
1009 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1010 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1011
1012 const ServerConnection *const pi1_server_connection =
1013 pi1_server_statistics_fetcher->connections()->Get(0);
1014 const ClientConnection *const pi1_client_connection =
1015 pi1_client_statistics_fetcher->connections()->Get(0);
1016
1017 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1018 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001019 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1020 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1021
Austin Schuh20ac95d2020-12-05 17:24:19 -08001022 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001023 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1024 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001025 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1026 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1027 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001028 }
1029
1030 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001031 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001032
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001033 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001034
1035 // And confirm we are synchronized again.
1036 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1037 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001038 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001039
1040 const ServerConnection *const pi1_connection =
1041 pi1_server_statistics_fetcher->connections()->Get(0);
1042 const ServerConnection *const pi2_connection =
1043 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001044 const ClientConnection *const pi1_client_connection =
1045 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001046
1047 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1048 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1049 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1050 chrono::milliseconds(1));
1051 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1052 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001053 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001054
Austin Schuh367a7f42021-11-23 23:04:36 -08001055 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1056 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1057 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1058 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1059
Austin Schuh5344c352020-04-12 17:04:26 -07001060 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1061 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1062 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1063 chrono::milliseconds(1));
1064 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1065 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001066 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001067
1068 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001069 }
1070
1071 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001072 StopPi1Server();
1073 StopPi1Client();
1074 StopPi2Client();
1075 StopPi1Test();
1076 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001077}
1078
Austin Schuh4889b182020-11-18 19:11:56 -08001079// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001080// thing, but doesn't confirm that the internal state does. We either need to
1081// expose a way to check the state in a thread-safe way, or need a way to jump
1082// time for one node to do that.
1083
Austin Schuh4889b182020-11-18 19:11:56 -08001084void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1085 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1086 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1087 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001088 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001089}
1090
1091// Tests that when a message is sent before the bridge starts up, but is
1092// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001093TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001094 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001095
1096 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001097 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001098 aos::Sender<examples::Ping> ping_sender =
1099 send_event_loop.MakeSender<examples::Ping>("/test");
1100 SendPing(&ping_sender, 1);
1101 aos::Sender<examples::Ping> unreliable_ping_sender =
1102 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1103 SendPing(&unreliable_ping_sender, 1);
1104
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001105 MakePi1Server();
1106 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001107
1108 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001109 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001110
1111 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001112 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001113
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001114 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001115
Austin Schuhf466ab52021-02-16 22:00:38 -08001116 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001117 aos::Fetcher<examples::Ping> ping_fetcher =
1118 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1119 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1120 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1121 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1122 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1123
1124 const size_t ping_channel_index = configuration::ChannelIndex(
1125 receive_event_loop.configuration(), ping_fetcher.channel());
1126
1127 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001128 const std::string channel_name =
1129 shared() ? "/pi1/aos/remote_timestamps/pi2"
1130 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001131 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001132 channel_name, [this, channel_name, ping_channel_index,
1133 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001134 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001135 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001136 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001137 if (shared() && header.channel_index() != ping_channel_index) {
1138 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001139 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001140 CHECK_EQ(header.channel_index(), ping_channel_index);
1141 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001142 });
1143
1144 // Before everything starts up, confirm there is no message.
1145 EXPECT_FALSE(ping_fetcher.Fetch());
1146 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1147
1148 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001149 StartPi1Server();
1150 StartPi1Client();
1151 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001152
1153 // Event used to wait for the timestamp counting thread to start.
1154 aos::Event event;
1155 std::thread pi1_remote_timestamp_thread(
1156 [&pi1_remote_timestamp_event_loop, &event]() {
1157 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1158 pi1_remote_timestamp_event_loop.Run();
1159 });
1160
1161 event.Wait();
1162
1163 {
1164 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001165 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001166
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001167 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001168
1169 // Confirm there is no detected duplicate packet.
1170 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1171 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1172 ->Get(0)
1173 ->duplicate_packets(),
1174 0u);
1175
Austin Schuhe61d4382021-03-31 21:33:02 -07001176 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1177 ->Get(0)
1178 ->partial_deliveries(),
1179 0u);
1180
Austin Schuh4889b182020-11-18 19:11:56 -08001181 EXPECT_TRUE(ping_fetcher.Fetch());
1182 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1183 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001184
1185 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001186 }
1187
1188 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001189 // Now, spin up a client for 2 seconds.
1190 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001191
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001192 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001193
1194 // Confirm we detect the duplicate packet correctly.
1195 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1196 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1197 ->Get(0)
1198 ->duplicate_packets(),
1199 1u);
1200
Austin Schuhe61d4382021-03-31 21:33:02 -07001201 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1202 ->Get(0)
1203 ->partial_deliveries(),
1204 0u);
1205
Austin Schuh4889b182020-11-18 19:11:56 -08001206 EXPECT_EQ(ping_timestamp_count, 1);
1207 EXPECT_FALSE(ping_fetcher.Fetch());
1208 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001209
1210 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001211 }
1212
1213 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001214 StopPi1Client();
1215 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001216 pi1_remote_timestamp_event_loop.Exit();
1217 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001218 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001219}
1220
1221// Tests that when a message is sent before the bridge starts up, but is
1222// configured as reliable, we forward it. Confirm this works across server
1223// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001224TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001225 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001226 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001227
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001228 MakePi2Server();
1229 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001230
Austin Schuhf466ab52021-02-16 22:00:38 -08001231 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001232 aos::Fetcher<examples::Ping> ping_fetcher =
1233 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1234 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1235 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1236 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1237 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1238
Austin Schuh4889b182020-11-18 19:11:56 -08001239 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001240 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001241
1242 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001243 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001244 aos::Sender<examples::Ping> ping_sender =
1245 send_event_loop.MakeSender<examples::Ping>("/test");
1246 {
1247 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1248 examples::Ping::Builder ping_builder =
1249 builder.MakeBuilder<examples::Ping>();
1250 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001251 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001252 }
1253
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001254 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001255
1256 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001257 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001258
1259 const size_t ping_channel_index = configuration::ChannelIndex(
1260 receive_event_loop.configuration(), ping_fetcher.channel());
1261
1262 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001263 const std::string channel_name =
1264 shared() ? "/pi1/aos/remote_timestamps/pi2"
1265 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001266 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001267 channel_name, [this, channel_name, ping_channel_index,
1268 &ping_timestamp_count](const RemoteMessage &header) {
1269 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001270 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001271 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001272 if (shared() && header.channel_index() != ping_channel_index) {
1273 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001274 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001275 CHECK_EQ(header.channel_index(), ping_channel_index);
1276 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001277 });
1278
1279 // Before everything starts up, confirm there is no message.
1280 EXPECT_FALSE(ping_fetcher.Fetch());
1281 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1282
1283 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001284 StartPi1Client();
1285 StartPi2Server();
1286 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001287
1288 // Event used to wait for the timestamp counting thread to start.
1289 aos::Event event;
1290 std::thread pi1_remote_timestamp_thread(
1291 [&pi1_remote_timestamp_event_loop, &event]() {
1292 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1293 pi1_remote_timestamp_event_loop.Run();
1294 });
1295
1296 event.Wait();
1297
1298 {
1299 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001300 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001301
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001302 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001303
1304 // Confirm there is no detected duplicate packet.
1305 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1306 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1307 ->Get(0)
1308 ->duplicate_packets(),
1309 0u);
1310
Austin Schuhe61d4382021-03-31 21:33:02 -07001311 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1312 ->Get(0)
1313 ->partial_deliveries(),
1314 0u);
1315
Austin Schuh4889b182020-11-18 19:11:56 -08001316 EXPECT_TRUE(ping_fetcher.Fetch());
1317 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1318 EXPECT_EQ(ping_timestamp_count, 1);
1319 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001320
1321 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001322 }
1323
1324 {
1325 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001326 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001327
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001328 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001329
1330 // Confirm we detect the duplicate packet correctly.
1331 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1332 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1333 ->Get(0)
1334 ->duplicate_packets(),
1335 1u);
1336
Austin Schuhe61d4382021-03-31 21:33:02 -07001337 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1338 ->Get(0)
1339 ->partial_deliveries(),
1340 0u);
1341
Austin Schuh4889b182020-11-18 19:11:56 -08001342 EXPECT_EQ(ping_timestamp_count, 1);
1343 EXPECT_FALSE(ping_fetcher.Fetch());
1344 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001345
1346 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001347 }
1348
1349 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001350 StopPi1Client();
1351 StopPi2Server();
1352 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001353 pi1_remote_timestamp_event_loop.Exit();
1354 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001355}
1356
Austin Schuhb0e439d2023-05-15 10:55:40 -07001357// Test that differing config sha256's result in no connection.
1358TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1359 // This is rather annoying to set up. We need to start up a client and
1360 // server, on the same node, but get them to think that they are on different
1361 // nodes.
1362 //
1363 // We need the client to not post directly to "/test" like it would in a
1364 // real system, otherwise we will re-send the ping message... So, use an
1365 // application specific map to have the client post somewhere else.
1366 //
1367 // To top this all off, each of these needs to be done with a ShmEventLoop,
1368 // which needs to run in a separate thread... And it is really hard to get
1369 // everything started up reliably. So just be super generous on timeouts and
1370 // hope for the best. We can be more generous in the future if we need to.
1371 //
1372 // We are faking the application names by passing in --application_name=foo
1373 OnPi1();
1374
Austin Schuh530f9ee2023-05-15 14:29:31 -07001375 MakePi1Server(
1376 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001377 MakePi1Client();
1378
1379 // And build the app for testing.
1380 MakePi1Test();
1381 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1382 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1383 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1384 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1385
1386 // Now do it for "raspberrypi2", the client.
1387 OnPi2();
1388 MakePi2Server();
1389
1390 // And build the app for testing.
1391 MakePi2Test();
1392 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1393 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1394 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1395 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1396
1397 // Wait until we are connected, then send.
1398
1399 StartPi1Test();
1400 StartPi2Test();
1401 StartPi1Server();
1402 StartPi1Client();
1403 StartPi2Server();
1404
1405 {
1406 MakePi2Client();
1407
1408 RunPi2Client(chrono::milliseconds(3050));
1409
1410 // Now confirm we are synchronized.
1411 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1412 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1413 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1414 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1415
1416 const ServerConnection *const pi1_connection =
1417 pi1_server_statistics_fetcher->connections()->Get(0);
1418 const ClientConnection *const pi1_client_connection =
1419 pi1_client_statistics_fetcher->connections()->Get(0);
1420 const ServerConnection *const pi2_connection =
1421 pi2_server_statistics_fetcher->connections()->Get(0);
1422 const ClientConnection *const pi2_client_connection =
1423 pi2_client_statistics_fetcher->connections()->Get(0);
1424
1425 // Make sure one direction is disconnected with a bunch of connection
1426 // attempts and failures.
1427 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1428 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1429 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1430
1431 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1432 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1433
1434 // And the other direction is happy.
1435 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1436 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1437 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1438 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1439 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1440
1441 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1442 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1443
1444 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1445 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1446 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1447 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1448
1449 StopPi2Client();
1450 }
1451
1452 // Shut everyone else down
1453 StopPi1Server();
1454 StopPi1Client();
1455 StopPi2Server();
1456 StopPi1Test();
1457 StopPi2Test();
1458}
1459
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001460INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001461 MessageBridgeTests, MessageBridgeParameterizedTest,
1462 ::testing::Values(
1463 Param{"message_bridge_test_combined_timestamps_common_config.json",
1464 true},
1465 Param{"message_bridge_test_common_config.json", false}));
1466
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001467} // namespace testing
1468} // namespace message_bridge
1469} // namespace aos