blob: ccb907be2a715664d416a2b934ac4f8bff4f5784 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070011#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070013#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070014#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070015#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080016#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017
Austin Schuh8902fa52021-03-14 22:39:24 -070018DECLARE_string(boot_uuid);
19
Austin Schuhe84c3ed2019-12-14 15:29:48 -080020namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070021void SetShmBase(const std::string_view base);
22
Austin Schuhe84c3ed2019-12-14 15:29:48 -080023namespace message_bridge {
24namespace testing {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
27
Austin Schuhe84c3ed2019-12-14 15:29:48 -080028namespace chrono = std::chrono;
29
Austin Schuhe991fe22020-11-18 16:53:39 -080030std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070031 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
32 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080033 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070034 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080035 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070036 }
37}
38
Austin Schuhe991fe22020-11-18 16:53:39 -080039void DoSetShmBase(const std::string_view node) {
40 aos::SetShmBase(ShmBase(node));
41}
42
// Parameters to run all the tests with.
struct Param {
  // The config file to use.  The fixture resolves it relative to
  // "aos/network/".
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  //
  // Defaults to false so that a default-initialized Param never carries an
  // indeterminate value; aggregate initialization is unaffected.
  bool shared = false;
};
52
53class MessageBridgeParameterizedTest
54 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080055 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080056 MessageBridgeParameterizedTest()
57 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070058 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070059 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070060 pi1_boot_uuid_(UUID::Random()),
61 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080062 util::UnlinkRecursive(ShmBase("pi1"));
63 util::UnlinkRecursive(ShmBase("pi2"));
64 }
Austin Schuhe991fe22020-11-18 16:53:39 -080065
Austin Schuh36a2c3e2021-02-18 22:28:38 -080066 bool shared() const { return GetParam().shared; }
67
Austin Schuh0a2f12f2021-01-08 22:48:29 -080068 void OnPi1() {
69 DoSetShmBase("pi1");
70 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070071 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080072 }
73
74 void OnPi2() {
75 DoSetShmBase("pi2");
76 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070077 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080078 }
79
Austin Schuhb0e439d2023-05-15 10:55:40 -070080 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -080081 OnPi1();
82 FLAGS_application_name = "pi1_message_bridge_server";
83 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080084 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080085 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -070086 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
87 pi1_server_event_loop.get(), server_config_sha256.size() == 0
88 ? config_sha256
89 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080090 }
91
92 void RunPi1Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -070093 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -080094 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
95 [this]() { pi1_server_event_loop->Exit(); });
96 pi1_server_event_loop->OnRun([this, quit, duration]() {
97 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -070098 quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -080099 });
100
101 pi1_server_event_loop->Run();
102 }
103
104 void StartPi1Server() {
105 pi1_server_thread = std::thread([this]() {
106 LOG(INFO) << "Started pi1_message_bridge_server";
107 pi1_server_event_loop->Run();
108 });
109 }
110
111 void StopPi1Server() {
112 if (pi1_server_thread.joinable()) {
113 pi1_server_event_loop->Exit();
114 pi1_server_thread.join();
115 pi1_server_thread = std::thread();
116 }
117 pi1_message_bridge_server.reset();
118 pi1_server_event_loop.reset();
119 }
120
121 void MakePi1Client() {
122 OnPi1();
123 FLAGS_application_name = "pi1_message_bridge_client";
124 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800125 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800126 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700127 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
128 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800129 }
130
131 void StartPi1Client() {
132 pi1_client_thread = std::thread([this]() {
133 LOG(INFO) << "Started pi1_message_bridge_client";
134 pi1_client_event_loop->Run();
135 });
136 }
137
138 void StopPi1Client() {
139 pi1_client_event_loop->Exit();
140 pi1_client_thread.join();
141 pi1_client_thread = std::thread();
142 pi1_message_bridge_client.reset();
143 pi1_client_event_loop.reset();
144 }
145
146 void MakePi1Test() {
147 OnPi1();
148 FLAGS_application_name = "test1";
149 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800150 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800151
152 pi1_test_event_loop->MakeWatcher(
153 "/pi1/aos", [](const ServerStatistics &stats) {
154 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
155 });
156
157 pi1_test_event_loop->MakeWatcher(
158 "/pi1/aos", [](const ClientStatistics &stats) {
159 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
160 });
161
162 pi1_test_event_loop->MakeWatcher(
163 "/pi1/aos", [](const Timestamp &timestamp) {
164 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
165 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700166 pi1_test_event_loop->MakeWatcher(
167 "/pi2/aos", [this](const Timestamp &timestamp) {
168 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700169 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700170 pi2_boot_uuid_);
171 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800172 }
173
174 void StartPi1Test() {
175 pi1_test_thread = std::thread([this]() {
176 LOG(INFO) << "Started pi1_test";
177 pi1_test_event_loop->Run();
178 });
179 }
180
181 void StopPi1Test() {
182 pi1_test_event_loop->Exit();
183 pi1_test_thread.join();
184 }
185
186 void MakePi2Server() {
187 OnPi2();
188 FLAGS_application_name = "pi2_message_bridge_server";
189 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800190 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800191 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700192 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
193 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800194 }
195
196 void RunPi2Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -0700197 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800198 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
199 [this]() { pi2_server_event_loop->Exit(); });
200 pi2_server_event_loop->OnRun([this, quit, duration]() {
201 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700202 quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800203 });
204
205 pi2_server_event_loop->Run();
206 }
207
208 void StartPi2Server() {
209 pi2_server_thread = std::thread([this]() {
210 LOG(INFO) << "Started pi2_message_bridge_server";
211 pi2_server_event_loop->Run();
212 });
213 }
214
215 void StopPi2Server() {
216 if (pi2_server_thread.joinable()) {
217 pi2_server_event_loop->Exit();
218 pi2_server_thread.join();
219 pi2_server_thread = std::thread();
220 }
221 pi2_message_bridge_server.reset();
222 pi2_server_event_loop.reset();
223 }
224
225 void MakePi2Client() {
226 OnPi2();
227 FLAGS_application_name = "pi2_message_bridge_client";
228 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800229 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800230 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700231 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
232 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800233 }
234
235 void RunPi2Client(chrono::nanoseconds duration) {
236 // Run for 5 seconds to make sure we have time to estimate the offset.
237 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
238 [this]() { pi2_client_event_loop->Exit(); });
239 pi2_client_event_loop->OnRun([this, quit, duration]() {
240 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700241 quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800242 });
243
244 // And go!
245 pi2_client_event_loop->Run();
246 }
247
248 void StartPi2Client() {
249 pi2_client_thread = std::thread([this]() {
250 LOG(INFO) << "Started pi2_message_bridge_client";
251 pi2_client_event_loop->Run();
252 });
253 }
254
255 void StopPi2Client() {
256 if (pi2_client_thread.joinable()) {
257 pi2_client_event_loop->Exit();
258 pi2_client_thread.join();
259 pi2_client_thread = std::thread();
260 }
261 pi2_message_bridge_client.reset();
262 pi2_client_event_loop.reset();
263 }
264
265 void MakePi2Test() {
266 OnPi2();
267 FLAGS_application_name = "test2";
268 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800269 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800270
271 pi2_test_event_loop->MakeWatcher(
272 "/pi2/aos", [](const ServerStatistics &stats) {
273 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
274 });
275
276 pi2_test_event_loop->MakeWatcher(
277 "/pi2/aos", [](const ClientStatistics &stats) {
278 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
279 });
280
281 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700282 "/pi1/aos", [this](const Timestamp &timestamp) {
283 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700284 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700285 pi1_boot_uuid_);
286 });
287 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800288 "/pi2/aos", [](const Timestamp &timestamp) {
289 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
290 });
291 }
292
293 void StartPi2Test() {
294 pi2_test_thread = std::thread([this]() {
295 LOG(INFO) << "Started pi2_message_bridge_test";
296 pi2_test_event_loop->Run();
297 });
298 }
299
300 void StopPi2Test() {
301 pi2_test_event_loop->Exit();
302 pi2_test_thread.join();
303 }
304
Austin Schuhf466ab52021-02-16 22:00:38 -0800305 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700306 std::string config_sha256;
307
Austin Schuh8902fa52021-03-14 22:39:24 -0700308 const UUID pi1_boot_uuid_;
309 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800310
311 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
312 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
313 std::thread pi1_server_thread;
314
315 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
316 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
317 std::thread pi1_client_thread;
318
319 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
320 std::thread pi1_test_thread;
321
322 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
323 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
324 std::thread pi2_server_thread;
325
326 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
327 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
328 std::thread pi2_client_thread;
329
330 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
331 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800332};
333
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800334// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800335TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800336 // This is rather annoying to set up. We need to start up a client and
337 // server, on the same node, but get them to think that they are on different
338 // nodes.
339 //
340 // We then get to wait until they are connected.
341 //
342 // After they are connected, we send a Ping message.
343 //
344 // On the other end, we receive a Pong message.
345 //
346 // But, we need the client to not post directly to "/test" like it would in a
347 // real system, otherwise we will re-send the ping message... So, use an
348 // application specific map to have the client post somewhere else.
349 //
350 // To top this all off, each of these needs to be done with a ShmEventLoop,
351 // which needs to run in a separate thread... And it is really hard to get
352 // everything started up reliably. So just be super generous on timeouts and
353 // hope for the best. We can be more generous in the future if we need to.
354 //
355 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800356 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800357 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700358
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800359 MakePi1Server();
360 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800361
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700362 const std::string long_data = std::string(10000, 'a');
363
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800364 // And build the app which sends the pings.
365 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800366 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800367 aos::Sender<examples::Ping> ping_sender =
368 ping_event_loop.MakeSender<examples::Ping>("/test");
369
Austin Schuhf466ab52021-02-16 22:00:38 -0800370 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800371 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
372 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800373 shared() ? "/pi1/aos/remote_timestamps/pi2"
374 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700375
376 // Fetchers for confirming the remote timestamps made it.
377 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
378 ping_event_loop.MakeFetcher<examples::Ping>("/test");
379 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
380 ping_event_loop.MakeFetcher<Timestamp>("/aos");
381
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800382 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800383 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700384
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800385 MakePi2Client();
386 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800387
388 // And build the app which sends the pongs.
389 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800390 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800391
Austin Schuh7bc59052020-02-16 23:48:33 -0800392 // And build the app for testing.
393 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800394 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800395
396 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
397 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800398 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
399 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800400 shared() ? "/pi2/aos/remote_timestamps/pi1"
401 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
402 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700403
404 // Event loop for fetching data delivered to pi2 from pi1 to match up
405 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800406 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700407 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
408 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
409 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
410 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
411 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
412 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800413
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800414 // Count the pongs.
415 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700416 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
417 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700418 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700419 ++pong_count;
420 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
421 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800422
423 FLAGS_override_hostname = "";
424
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800425 // Wait until we are connected, then send.
426 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800427 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700428 ping_event_loop.MakeWatcher(
429 "/pi1/aos",
430 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
431 &long_data](const ServerStatistics &stats) {
432 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800433
Philipp Schrader790cb542023-07-05 21:06:52 -0700434 ASSERT_TRUE(stats.has_connections());
435 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800436
Philipp Schrader790cb542023-07-05 21:06:52 -0700437 bool connected = false;
438 for (const ServerConnection *connection : *stats.connections()) {
439 // Confirm that we are estimating the server time offset correctly. It
440 // should be about 0 since we are on the same machine here.
441 if (connection->has_monotonic_offset()) {
442 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
443 chrono::milliseconds(1));
444 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
445 chrono::milliseconds(-1));
446 ++pi1_server_statistics_count;
447 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800448
Philipp Schrader790cb542023-07-05 21:06:52 -0700449 if (connection->node()->name()->string_view() ==
450 pi2_client_event_loop->node()->name()->string_view()) {
451 if (connection->state() == State::CONNECTED) {
452 EXPECT_TRUE(connection->has_boot_uuid());
453 EXPECT_EQ(connection->connection_count(), 1u);
454 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
455 connection->connected_since_time())),
456 monotonic_clock::now());
457 connected = true;
458 } else {
459 EXPECT_FALSE(connection->has_connection_count());
460 EXPECT_FALSE(connection->has_connected_since_time());
461 }
462 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800463 }
464
Philipp Schrader790cb542023-07-05 21:06:52 -0700465 if (connected) {
466 VLOG(1) << "Connected! Sent ping.";
467 auto builder = ping_sender.MakeBuilder();
468 builder.fbb()->CreateString(long_data);
469 examples::Ping::Builder ping_builder =
470 builder.MakeBuilder<examples::Ping>();
471 ping_builder.add_value(ping_count + 971);
472 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
473 ++ping_count;
474 }
475 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800476
Austin Schuh7bc59052020-02-16 23:48:33 -0800477 // Confirm both client and server statistics messages have decent offsets in
478 // them.
479 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700480 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800481 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800482 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800483 for (const ServerConnection *connection : *stats.connections()) {
484 if (connection->has_monotonic_offset()) {
485 ++pi2_server_statistics_count;
486 // Confirm that we are estimating the server time offset correctly. It
487 // should be about 0 since we are on the same machine here.
488 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
489 chrono::milliseconds(1));
490 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
491 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800492 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800493 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800494
495 if (connection->state() == State::CONNECTED) {
496 EXPECT_EQ(connection->connection_count(), 1u);
497 EXPECT_LT(monotonic_clock::time_point(
498 chrono::nanoseconds(connection->connected_since_time())),
499 monotonic_clock::now());
500 } else {
501 EXPECT_FALSE(connection->has_connection_count());
502 EXPECT_FALSE(connection->has_connected_since_time());
503 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800504 }
505 });
506
507 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800508 int pi1_connected_client_statistics_count = 0;
509 ping_event_loop.MakeWatcher(
510 "/pi1/aos",
511 [&pi1_client_statistics_count,
512 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
513 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800514
Austin Schuh367a7f42021-11-23 23:04:36 -0800515 for (const ClientConnection *connection : *stats.connections()) {
516 if (connection->has_monotonic_offset()) {
517 ++pi1_client_statistics_count;
518 // It takes at least 10 microseconds to send a message between the
519 // client and server. The min (filtered) time shouldn't be over 10
520 // milliseconds on localhost. This might have to bump up if this is
521 // proving flaky.
522 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
523 chrono::milliseconds(10))
524 << " " << connection->monotonic_offset()
525 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
526 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
527 chrono::microseconds(10))
528 << " " << connection->monotonic_offset()
529 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
530 }
531 if (connection->state() == State::CONNECTED) {
532 EXPECT_EQ(connection->connection_count(), 1u);
533 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
534 connection->connected_since_time())),
535 monotonic_clock::now());
536 // The first Connected message may not have a UUID in it since no
537 // data has flown. That's fine.
538 if (pi1_connected_client_statistics_count > 0) {
539 EXPECT_TRUE(connection->has_boot_uuid())
540 << ": " << aos::FlatbufferToJson(connection);
541 }
542 ++pi1_connected_client_statistics_count;
543 } else {
544 EXPECT_FALSE(connection->has_connection_count());
545 EXPECT_FALSE(connection->has_connected_since_time());
546 }
547 }
548 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800549
550 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800551 int pi2_connected_client_statistics_count = 0;
552 pong_event_loop.MakeWatcher(
553 "/pi2/aos",
554 [&pi2_client_statistics_count,
555 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
556 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800557
Austin Schuh367a7f42021-11-23 23:04:36 -0800558 for (const ClientConnection *connection : *stats.connections()) {
559 if (connection->has_monotonic_offset()) {
560 ++pi2_client_statistics_count;
561 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
562 chrono::milliseconds(10))
563 << ": got " << aos::FlatbufferToJson(connection);
564 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
565 chrono::microseconds(10))
566 << ": got " << aos::FlatbufferToJson(connection);
567 }
568 if (connection->state() == State::CONNECTED) {
569 EXPECT_EQ(connection->connection_count(), 1u);
570 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
571 connection->connected_since_time())),
572 monotonic_clock::now());
573 if (pi2_connected_client_statistics_count > 0) {
574 EXPECT_TRUE(connection->has_boot_uuid());
575 }
576 ++pi2_connected_client_statistics_count;
577 } else {
578 EXPECT_FALSE(connection->has_connection_count());
579 EXPECT_FALSE(connection->has_connected_since_time());
580 }
581 }
582 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800583
Austin Schuh196a4452020-03-15 23:12:03 -0700584 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800585 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800586 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800587 });
Austin Schuh196a4452020-03-15 23:12:03 -0700588 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800589 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800590 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800591 });
592
593 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800594 aos::TimerHandler *quit = ping_event_loop.AddTimer(
595 [&ping_event_loop]() { ping_event_loop.Exit(); });
596 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800597 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700598 quit->Schedule(ping_event_loop.monotonic_now() +
599 chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800600 });
601
Austin Schuh2f8fd752020-09-01 22:38:28 -0700602 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
603 // channel.
604 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
605 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
606 const size_t ping_timestamp_channel =
607 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
608 ping_on_pi2_fetcher.channel());
609
610 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
611 VLOG(1) << "Channel "
612 << configuration::ChannelIndex(ping_event_loop.configuration(),
613 channel)
614 << " " << configuration::CleanedChannelToString(channel);
615 }
616
617 // For each remote timestamp we get back, confirm that it is either a ping
618 // message, or a timestamp we sent out. Also confirm that the timestamps are
619 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800620 for (std::pair<int, std::string> channel :
621 shared()
622 ? std::vector<std::pair<
623 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
624 : std::vector<std::pair<int, std::string>>{
625 {pi1_timestamp_channel,
626 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
627 "aos-message_bridge-Timestamp"},
628 {ping_timestamp_channel,
629 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
630 ping_event_loop.MakeWatcher(
631 channel.second,
632 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
633 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
634 &pi1_on_pi1_timestamp_fetcher,
635 channel_index = channel.first](const RemoteMessage &header) {
636 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
637 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700638
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800639 EXPECT_TRUE(header.has_boot_uuid());
640 if (channel_index != -1) {
641 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700642 }
643
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800644 const aos::monotonic_clock::time_point header_monotonic_sent_time(
645 chrono::nanoseconds(header.monotonic_sent_time()));
646 const aos::realtime_clock::time_point header_realtime_sent_time(
647 chrono::nanoseconds(header.realtime_sent_time()));
648 const aos::monotonic_clock::time_point header_monotonic_remote_time(
649 chrono::nanoseconds(header.monotonic_remote_time()));
650 const aos::realtime_clock::time_point header_realtime_remote_time(
651 chrono::nanoseconds(header.realtime_remote_time()));
652
653 const Context *pi1_context = nullptr;
654 const Context *pi2_context = nullptr;
655
656 if (header.channel_index() == pi1_timestamp_channel) {
657 // Find the forwarded message.
658 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
659 header_monotonic_sent_time) {
660 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
661 }
662
663 // And the source message.
664 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
665 header_monotonic_remote_time) {
666 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
667 }
668
669 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
670 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
671 } else if (header.channel_index() == ping_timestamp_channel) {
672 // Find the forwarded message.
673 while (ping_on_pi2_fetcher.context().monotonic_event_time <
674 header_monotonic_sent_time) {
675 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
676 }
677
678 // And the source message.
679 while (ping_on_pi1_fetcher.context().monotonic_event_time <
680 header_monotonic_remote_time) {
681 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
682 }
683
684 pi1_context = &ping_on_pi1_fetcher.context();
685 pi2_context = &ping_on_pi2_fetcher.context();
686 } else {
687 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700688 }
689
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800690 // Confirm the forwarded message has matching timestamps to the
691 // timestamps we got back.
692 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
693 EXPECT_EQ(pi2_context->monotonic_event_time,
694 header_monotonic_sent_time);
695 EXPECT_EQ(pi2_context->realtime_event_time,
696 header_realtime_sent_time);
697 EXPECT_EQ(pi2_context->realtime_remote_time,
698 header_realtime_remote_time);
699 EXPECT_EQ(pi2_context->monotonic_remote_time,
700 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700701
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800702 // Confirm the forwarded message also matches the source message.
703 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
704 EXPECT_EQ(pi1_context->monotonic_event_time,
705 header_monotonic_remote_time);
706 EXPECT_EQ(pi1_context->realtime_event_time,
707 header_realtime_remote_time);
708 });
709 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700710
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800711 // Start everything up. Pong is the only thing we don't know how to wait
712 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800713 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
714
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800715 StartPi1Server();
716 StartPi1Client();
717 StartPi2Client();
718 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800719
720 // And go!
721 ping_event_loop.Run();
722
723 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800724 StopPi1Server();
725 StopPi1Client();
726 StopPi2Client();
727 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800728 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800729 pong_thread.join();
730
731 // Make sure we sent something.
732 EXPECT_GE(ping_count, 1);
733 // And got something back.
734 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800735
736 // Confirm that we are estimating a monotonic offset on the client.
737 ASSERT_TRUE(client_statistics_fetcher.Fetch());
738
739 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
740 EXPECT_EQ(client_statistics_fetcher->connections()
741 ->Get(0)
742 ->node()
743 ->name()
744 ->string_view(),
745 "pi1");
746
747 // Make sure the offset in one direction is less than a second.
748 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700749 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
750 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800751 EXPECT_LT(
752 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700753 1000000000)
754 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800755
756 EXPECT_GE(pi1_server_statistics_count, 2);
757 EXPECT_GE(pi2_server_statistics_count, 2);
758 EXPECT_GE(pi1_client_statistics_count, 2);
759 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700760
761 // Confirm we got timestamps back!
762 EXPECT_TRUE(message_header_fetcher1.Fetch());
763 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800764}
765
Austin Schuh5344c352020-04-12 17:04:26 -0700766// Test that the client disconnecting triggers the server offsets on both sides
767// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800768TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700769 // This is rather annoying to set up. We need to start up a client and
770 // server, on the same node, but get them to think that they are on different
771 // nodes.
772 //
773 // We need the client to not post directly to "/test" like it would in a
774 // real system, otherwise we will re-send the ping message... So, use an
775 // application specific map to have the client post somewhere else.
776 //
777 // To top this all off, each of these needs to be done with a ShmEventLoop,
778 // which needs to run in a separate thread... And it is really hard to get
779 // everything started up reliably. So just be super generous on timeouts and
780 // hope for the best. We can be more generous in the future if we need to.
781 //
782 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800783 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700784
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800785 MakePi1Server();
786 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700787
788 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800789 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700790 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800791 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700792
793 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800794 OnPi2();
795 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700796
797 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800798 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700799 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800800 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700801
802 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700803
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800804 StartPi1Test();
805 StartPi2Test();
806 StartPi1Server();
807 StartPi1Client();
808 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700809
810 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800811 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700812
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800813 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700814
815 // Now confirm we are synchronized.
816 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
817 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
818
819 const ServerConnection *const pi1_connection =
820 pi1_server_statistics_fetcher->connections()->Get(0);
821 const ServerConnection *const pi2_connection =
822 pi2_server_statistics_fetcher->connections()->Get(0);
823
824 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800825 EXPECT_EQ(pi1_connection->connection_count(), 1u);
826 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700827 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
828 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
829 chrono::milliseconds(1));
830 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
831 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800832 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700833
834 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800835 EXPECT_EQ(pi2_connection->connection_count(), 1u);
836 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700837 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
838 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
839 chrono::milliseconds(1));
840 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
841 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800842 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800843
844 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700845 }
846
Austin Schuhd0d894e2021-10-24 17:13:11 -0700847 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
848 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700849
850 {
851 // Now confirm we are un-synchronized.
852 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
853 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
854 const ServerConnection *const pi1_connection =
855 pi1_server_statistics_fetcher->connections()->Get(0);
856 const ServerConnection *const pi2_connection =
857 pi2_server_statistics_fetcher->connections()->Get(0);
858
859 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800860 EXPECT_EQ(pi1_connection->connection_count(), 1u);
861 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700862 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800863 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700864 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
865 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800866 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800867 EXPECT_EQ(pi2_connection->connection_count(), 1u);
868 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700869 }
870
871 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800872 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700873 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800874 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700875
876 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
877 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
878
879 // Now confirm we are synchronized again.
880 const ServerConnection *const pi1_connection =
881 pi1_server_statistics_fetcher->connections()->Get(0);
882 const ServerConnection *const pi2_connection =
883 pi2_server_statistics_fetcher->connections()->Get(0);
884
885 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800886 EXPECT_EQ(pi1_connection->connection_count(), 2u);
887 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700888 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
889 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800890 chrono::milliseconds(1))
891 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700892 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800893 chrono::milliseconds(-1))
894 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800895 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700896
897 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800898 EXPECT_EQ(pi2_connection->connection_count(), 1u);
899 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700900 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
901 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800902 chrono::milliseconds(1))
903 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700904 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800905 chrono::milliseconds(-1))
906 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800907 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800908
909 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700910 }
911
912 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800913 StopPi1Server();
914 StopPi1Client();
915 StopPi2Server();
916 StopPi1Test();
917 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700918}
919
920// Test that the server disconnecting triggers the server offsets on the other
921// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800922TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700923 // This is rather annoying to set up. We need to start up a client and
924 // server, on the same node, but get them to think that they are on different
925 // nodes.
926 //
927 // We need the client to not post directly to "/test" like it would in a
928 // real system, otherwise we will re-send the ping message... So, use an
929 // application specific map to have the client post somewhere else.
930 //
931 // To top this all off, each of these needs to be done with a ShmEventLoop,
932 // which needs to run in a separate thread... And it is really hard to get
933 // everything started up reliably. So just be super generous on timeouts and
934 // hope for the best. We can be more generous in the future if we need to.
935 //
936 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700937 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800938 OnPi1();
939 MakePi1Server();
940 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700941
942 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800943 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700944 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800945 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700946 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800947 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700948
949 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800950 OnPi2();
951 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700952
953 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800954 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700955 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800956 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700957
958 // Start everything up. Pong is the only thing we don't know how to wait on,
959 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800960 StartPi1Test();
961 StartPi2Test();
962 StartPi1Server();
963 StartPi1Client();
964 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700965
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800966 // Confirm both client and server statistics messages have decent offsets in
967 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700968
969 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800970 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700971
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800972 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700973
974 // Now confirm we are synchronized.
975 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
976 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
977
978 const ServerConnection *const pi1_connection =
979 pi1_server_statistics_fetcher->connections()->Get(0);
980 const ServerConnection *const pi2_connection =
981 pi2_server_statistics_fetcher->connections()->Get(0);
982
983 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
984 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
985 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
986 chrono::milliseconds(1));
987 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
988 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800989 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800990 EXPECT_TRUE(pi1_connection->has_connected_since_time());
991 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700992
993 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
994 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
995 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
996 chrono::milliseconds(1));
997 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
998 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800999 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -08001000 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1001 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001002
1003 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001004 }
1005
1006 std::this_thread::sleep_for(std::chrono::seconds(2));
1007
1008 {
1009 // And confirm we are unsynchronized.
1010 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1011 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1012
1013 const ServerConnection *const pi1_server_connection =
1014 pi1_server_statistics_fetcher->connections()->Get(0);
1015 const ClientConnection *const pi1_client_connection =
1016 pi1_client_statistics_fetcher->connections()->Get(0);
1017
1018 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1019 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001020 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1021 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1022
Austin Schuh20ac95d2020-12-05 17:24:19 -08001023 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001024 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1025 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001026 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1027 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1028 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001029 }
1030
1031 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001032 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001033
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001034 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001035
1036 // And confirm we are synchronized again.
1037 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1038 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001039 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001040
1041 const ServerConnection *const pi1_connection =
1042 pi1_server_statistics_fetcher->connections()->Get(0);
1043 const ServerConnection *const pi2_connection =
1044 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001045 const ClientConnection *const pi1_client_connection =
1046 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001047
1048 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1049 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1050 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1051 chrono::milliseconds(1));
1052 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1053 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001054 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001055
Austin Schuh367a7f42021-11-23 23:04:36 -08001056 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1057 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1058 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1059 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1060
Austin Schuh5344c352020-04-12 17:04:26 -07001061 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1062 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1063 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1064 chrono::milliseconds(1));
1065 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1066 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001067 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001068
1069 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001070 }
1071
1072 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001073 StopPi1Server();
1074 StopPi1Client();
1075 StopPi2Client();
1076 StopPi1Test();
1077 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001078}
1079
Austin Schuh4889b182020-11-18 19:11:56 -08001080// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001081// thing, but doesn't confirm that the internal state does. We either need to
1082// expose a way to check the state in a thread-safe way, or need a way to jump
1083// time for one node to do that.
1084
Austin Schuh4889b182020-11-18 19:11:56 -08001085void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1086 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1087 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1088 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001089 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001090}
1091
1092// Tests that when a message is sent before the bridge starts up, but is
1093// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001094TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001095 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001096
1097 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001098 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001099 aos::Sender<examples::Ping> ping_sender =
1100 send_event_loop.MakeSender<examples::Ping>("/test");
1101 SendPing(&ping_sender, 1);
1102 aos::Sender<examples::Ping> unreliable_ping_sender =
1103 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1104 SendPing(&unreliable_ping_sender, 1);
1105
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001106 MakePi1Server();
1107 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001108
1109 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001110 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001111
1112 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001113 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001114
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001115 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001116
Austin Schuhf466ab52021-02-16 22:00:38 -08001117 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001118 aos::Fetcher<examples::Ping> ping_fetcher =
1119 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1120 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1121 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1122 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1123 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1124
1125 const size_t ping_channel_index = configuration::ChannelIndex(
1126 receive_event_loop.configuration(), ping_fetcher.channel());
1127
1128 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001129 const std::string channel_name =
1130 shared() ? "/pi1/aos/remote_timestamps/pi2"
1131 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001132 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001133 channel_name, [this, channel_name, ping_channel_index,
1134 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001135 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001136 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001137 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001138 if (shared() && header.channel_index() != ping_channel_index) {
1139 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001140 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001141 CHECK_EQ(header.channel_index(), ping_channel_index);
1142 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001143 });
1144
1145 // Before everything starts up, confirm there is no message.
1146 EXPECT_FALSE(ping_fetcher.Fetch());
1147 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1148
1149 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001150 StartPi1Server();
1151 StartPi1Client();
1152 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001153
1154 // Event used to wait for the timestamp counting thread to start.
1155 aos::Event event;
1156 std::thread pi1_remote_timestamp_thread(
1157 [&pi1_remote_timestamp_event_loop, &event]() {
1158 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1159 pi1_remote_timestamp_event_loop.Run();
1160 });
1161
1162 event.Wait();
1163
1164 {
1165 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001166 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001167
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001168 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001169
1170 // Confirm there is no detected duplicate packet.
1171 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1172 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1173 ->Get(0)
1174 ->duplicate_packets(),
1175 0u);
1176
Austin Schuhe61d4382021-03-31 21:33:02 -07001177 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1178 ->Get(0)
1179 ->partial_deliveries(),
1180 0u);
1181
Austin Schuh4889b182020-11-18 19:11:56 -08001182 EXPECT_TRUE(ping_fetcher.Fetch());
1183 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1184 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001185
1186 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001187 }
1188
1189 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001190 // Now, spin up a client for 2 seconds.
1191 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001192
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001193 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001194
1195 // Confirm we detect the duplicate packet correctly.
1196 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1197 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1198 ->Get(0)
1199 ->duplicate_packets(),
1200 1u);
1201
Austin Schuhe61d4382021-03-31 21:33:02 -07001202 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1203 ->Get(0)
1204 ->partial_deliveries(),
1205 0u);
1206
Austin Schuh4889b182020-11-18 19:11:56 -08001207 EXPECT_EQ(ping_timestamp_count, 1);
1208 EXPECT_FALSE(ping_fetcher.Fetch());
1209 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001210
1211 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001212 }
1213
1214 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001215 StopPi1Client();
1216 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001217 pi1_remote_timestamp_event_loop.Exit();
1218 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001219 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001220}
1221
1222// Tests that when a message is sent before the bridge starts up, but is
1223// configured as reliable, we forward it. Confirm this works across server
1224// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001225TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001226 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001227 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001228
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001229 MakePi2Server();
1230 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001231
Austin Schuhf466ab52021-02-16 22:00:38 -08001232 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001233 aos::Fetcher<examples::Ping> ping_fetcher =
1234 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1235 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1236 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1237 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1238 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1239
Austin Schuh4889b182020-11-18 19:11:56 -08001240 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001241 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001242
1243 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001244 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001245 aos::Sender<examples::Ping> ping_sender =
1246 send_event_loop.MakeSender<examples::Ping>("/test");
1247 {
1248 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1249 examples::Ping::Builder ping_builder =
1250 builder.MakeBuilder<examples::Ping>();
1251 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001252 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001253 }
1254
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001255 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001256
1257 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001258 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001259
1260 const size_t ping_channel_index = configuration::ChannelIndex(
1261 receive_event_loop.configuration(), ping_fetcher.channel());
1262
1263 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001264 const std::string channel_name =
1265 shared() ? "/pi1/aos/remote_timestamps/pi2"
1266 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001267 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001268 channel_name, [this, channel_name, ping_channel_index,
1269 &ping_timestamp_count](const RemoteMessage &header) {
1270 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001271 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001272 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001273 if (shared() && header.channel_index() != ping_channel_index) {
1274 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001275 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001276 CHECK_EQ(header.channel_index(), ping_channel_index);
1277 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001278 });
1279
1280 // Before everything starts up, confirm there is no message.
1281 EXPECT_FALSE(ping_fetcher.Fetch());
1282 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1283
1284 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001285 StartPi1Client();
1286 StartPi2Server();
1287 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001288
1289 // Event used to wait for the timestamp counting thread to start.
1290 aos::Event event;
1291 std::thread pi1_remote_timestamp_thread(
1292 [&pi1_remote_timestamp_event_loop, &event]() {
1293 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1294 pi1_remote_timestamp_event_loop.Run();
1295 });
1296
1297 event.Wait();
1298
1299 {
1300 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001301 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001302
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001303 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001304
1305 // Confirm there is no detected duplicate packet.
1306 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1307 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1308 ->Get(0)
1309 ->duplicate_packets(),
1310 0u);
1311
Austin Schuhe61d4382021-03-31 21:33:02 -07001312 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1313 ->Get(0)
1314 ->partial_deliveries(),
1315 0u);
1316
Austin Schuh4889b182020-11-18 19:11:56 -08001317 EXPECT_TRUE(ping_fetcher.Fetch());
1318 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1319 EXPECT_EQ(ping_timestamp_count, 1);
1320 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001321
1322 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001323 }
1324
1325 {
1326 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001327 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001328
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001329 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001330
1331 // Confirm we detect the duplicate packet correctly.
1332 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1333 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1334 ->Get(0)
1335 ->duplicate_packets(),
1336 1u);
1337
Austin Schuhe61d4382021-03-31 21:33:02 -07001338 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1339 ->Get(0)
1340 ->partial_deliveries(),
1341 0u);
1342
Austin Schuh4889b182020-11-18 19:11:56 -08001343 EXPECT_EQ(ping_timestamp_count, 1);
1344 EXPECT_FALSE(ping_fetcher.Fetch());
1345 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001346
1347 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001348 }
1349
1350 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001351 StopPi1Client();
1352 StopPi2Server();
1353 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001354 pi1_remote_timestamp_event_loop.Exit();
1355 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001356}
1357
Austin Schuh89f23e32023-05-15 17:06:43 -07001358// Test that a client which connects with too big a message gets disconnected
1359// without crashing.
1360TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001361 // This is rather annoying to set up. We need to start up a client and
1362 // server, on the same node, but get them to think that they are on different
1363 // nodes.
1364 //
1365 // We need the client to not post directly to "/test" like it would in a
1366 // real system, otherwise we will re-send the ping message... So, use an
1367 // application specific map to have the client post somewhere else.
1368 //
1369 // To top this all off, each of these needs to be done with a ShmEventLoop,
1370 // which needs to run in a separate thread... And it is really hard to get
1371 // everything started up reliably. So just be super generous on timeouts and
1372 // hope for the best. We can be more generous in the future if we need to.
1373 //
1374 // We are faking the application names by passing in --application_name=foo
1375 OnPi1();
1376
Austin Schuh530f9ee2023-05-15 14:29:31 -07001377 MakePi1Server(
1378 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001379 MakePi1Client();
1380
1381 // And build the app for testing.
1382 MakePi1Test();
1383 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1384 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1385 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1386 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1387
1388 // Now do it for "raspberrypi2", the client.
1389 OnPi2();
1390 MakePi2Server();
1391
1392 // And build the app for testing.
1393 MakePi2Test();
1394 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1395 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1396 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1397 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1398
1399 // Wait until we are connected, then send.
1400
1401 StartPi1Test();
1402 StartPi2Test();
1403 StartPi1Server();
1404 StartPi1Client();
1405 StartPi2Server();
1406
1407 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001408 // Now, spin up a SctpClient and send a massive hunk of data. This should
1409 // trigger a disconnect, but no crash.
1410 OnPi2();
1411 FLAGS_application_name = "pi2_message_bridge_client";
1412 pi2_client_event_loop =
1413 std::make_unique<aos::ShmEventLoop>(&config.message());
1414 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
1415
1416 const aos::Node *const remote_node = CHECK_NOTNULL(
1417 configuration::GetNode(pi2_client_event_loop->configuration(), "pi1"));
1418
1419 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1420 connect_message(MakeConnectMessage(
1421 pi2_client_event_loop->configuration(),
1422 pi2_client_event_loop->node(), "pi1",
1423 pi2_client_event_loop->boot_uuid(), config_sha256));
1424
1425 SctpClient client(remote_node->hostname()->string_view(),
1426 remote_node->port(),
1427 connect_message.message().channels_to_transfer()->size() +
1428 kControlStreams(),
1429 "");
1430
1431 client.SetMaxReadSize(100000);
1432 client.SetMaxWriteSize(100000);
1433
1434 client.SetPoolSize(2u);
1435
1436 const std::string big_data(100000, 'a');
1437
1438 pi2_client_event_loop->epoll()->OnReadable(client.fd(), [&]() {
1439 aos::unique_c_ptr<Message> message = client.Read();
1440 client.FreeMessage(std::move(message));
1441 });
1442
1443 aos::TimerHandler *const send_big_message = pi2_client_event_loop->AddTimer(
1444 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
1445
1446 pi2_client_event_loop->OnRun([this, send_big_message]() {
1447 send_big_message->Schedule(pi2_client_event_loop->monotonic_now() +
1448 chrono::seconds(1));
1449 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001450
1451 RunPi2Client(chrono::milliseconds(3050));
1452
1453 // Now confirm we are synchronized.
1454 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1456 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001457 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001458
1459 const ServerConnection *const pi1_connection =
1460 pi1_server_statistics_fetcher->connections()->Get(0);
1461 const ClientConnection *const pi1_client_connection =
1462 pi1_client_statistics_fetcher->connections()->Get(0);
1463 const ServerConnection *const pi2_connection =
1464 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001465
1466 // Make sure one direction is disconnected with a bunch of connection
1467 // attempts and failures.
1468 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1469 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001470 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001471
1472 // And the other direction is happy.
1473 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1474 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1475 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1476 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1477 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1478
1479 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1480 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1481
1482 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1483 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001484 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1485
Austin Schuh89f23e32023-05-15 17:06:43 -07001486 pi2_client_event_loop->epoll()->DeleteFd(client.fd());
1487
Austin Schuhb0e439d2023-05-15 10:55:40 -07001488 StopPi2Client();
1489 }
1490
1491 // Shut everyone else down
1492 StopPi1Server();
1493 StopPi1Client();
1494 StopPi2Server();
1495 StopPi1Test();
1496 StopPi2Test();
1497}
1498
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001499INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001500 MessageBridgeTests, MessageBridgeParameterizedTest,
1501 ::testing::Values(
1502 Param{"message_bridge_test_combined_timestamps_common_config.json",
1503 true},
1504 Param{"message_bridge_test_common_config.json", false}));
1505
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001506} // namespace testing
1507} // namespace message_bridge
1508} // namespace aos