blob: d2a9b050151f640ddd4fd4f00679e075a2858506 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08005#include "aos/events/ping_generated.h"
6#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08007#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/network/message_bridge_client_lib.h"
9#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070010#include "aos/network/team_number.h"
Austin Schuh373f1762021-06-02 21:07:09 -070011#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080012#include "aos/util/file.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080013#include "gtest/gtest.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
Austin Schuh8902fa52021-03-14 22:39:24 -070015DECLARE_string(boot_uuid);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070018void SetShmBase(const std::string_view base);
19
Austin Schuhe84c3ed2019-12-14 15:29:48 -080020namespace message_bridge {
21namespace testing {
22
Austin Schuh373f1762021-06-02 21:07:09 -070023using aos::testing::ArtifactPath;
24
Austin Schuhe84c3ed2019-12-14 15:29:48 -080025namespace chrono = std::chrono;
26
Austin Schuhe991fe22020-11-18 16:53:39 -080027std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
29 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080030 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070031 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080032 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 }
34}
35
Austin Schuhe991fe22020-11-18 16:53:39 -080036void DoSetShmBase(const std::string_view node) {
37 aos::SetShmBase(ShmBase(node));
38}
39
Austin Schuh36a2c3e2021-02-18 22:28:38 -080040// Parameters to run all the tests with.
41struct Param {
42 // The config file to use.
43 std::string config;
44 // If true, the RemoteMessage channel should be shared between all the remote
45 // channels. If false, there will be 1 RemoteMessage channel per remote
46 // channel.
47 bool shared;
48};
49
50class MessageBridgeParameterizedTest
51 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080052 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080053 MessageBridgeParameterizedTest()
54 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070055 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuh8902fa52021-03-14 22:39:24 -070056 pi1_boot_uuid_(UUID::Random()),
57 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080058 util::UnlinkRecursive(ShmBase("pi1"));
59 util::UnlinkRecursive(ShmBase("pi2"));
60 }
Austin Schuhe991fe22020-11-18 16:53:39 -080061
Austin Schuh36a2c3e2021-02-18 22:28:38 -080062 bool shared() const { return GetParam().shared; }
63
Austin Schuh0a2f12f2021-01-08 22:48:29 -080064 void OnPi1() {
65 DoSetShmBase("pi1");
66 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -070067 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080068 }
69
70 void OnPi2() {
71 DoSetShmBase("pi2");
72 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -070073 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -080074 }
75
76 void MakePi1Server() {
77 OnPi1();
78 FLAGS_application_name = "pi1_message_bridge_server";
79 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -080080 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -080081 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
82 pi1_message_bridge_server =
83 std::make_unique<MessageBridgeServer>(pi1_server_event_loop.get());
84 }
85
86 void RunPi1Server(chrono::nanoseconds duration) {
87 // Setup a shutdown callback.
88 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
89 [this]() { pi1_server_event_loop->Exit(); });
90 pi1_server_event_loop->OnRun([this, quit, duration]() {
91 // Stop between timestamps, not exactly on them.
92 quit->Setup(pi1_server_event_loop->monotonic_now() + duration);
93 });
94
95 pi1_server_event_loop->Run();
96 }
97
98 void StartPi1Server() {
99 pi1_server_thread = std::thread([this]() {
100 LOG(INFO) << "Started pi1_message_bridge_server";
101 pi1_server_event_loop->Run();
102 });
103 }
104
105 void StopPi1Server() {
106 if (pi1_server_thread.joinable()) {
107 pi1_server_event_loop->Exit();
108 pi1_server_thread.join();
109 pi1_server_thread = std::thread();
110 }
111 pi1_message_bridge_server.reset();
112 pi1_server_event_loop.reset();
113 }
114
115 void MakePi1Client() {
116 OnPi1();
117 FLAGS_application_name = "pi1_message_bridge_client";
118 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800119 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800120 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
121 pi1_message_bridge_client =
122 std::make_unique<MessageBridgeClient>(pi1_client_event_loop.get());
123 }
124
125 void StartPi1Client() {
126 pi1_client_thread = std::thread([this]() {
127 LOG(INFO) << "Started pi1_message_bridge_client";
128 pi1_client_event_loop->Run();
129 });
130 }
131
132 void StopPi1Client() {
133 pi1_client_event_loop->Exit();
134 pi1_client_thread.join();
135 pi1_client_thread = std::thread();
136 pi1_message_bridge_client.reset();
137 pi1_client_event_loop.reset();
138 }
139
140 void MakePi1Test() {
141 OnPi1();
142 FLAGS_application_name = "test1";
143 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800144 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800145
146 pi1_test_event_loop->MakeWatcher(
147 "/pi1/aos", [](const ServerStatistics &stats) {
148 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
149 });
150
151 pi1_test_event_loop->MakeWatcher(
152 "/pi1/aos", [](const ClientStatistics &stats) {
153 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
154 });
155
156 pi1_test_event_loop->MakeWatcher(
157 "/pi1/aos", [](const Timestamp &timestamp) {
158 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
159 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700160 pi1_test_event_loop->MakeWatcher(
161 "/pi2/aos", [this](const Timestamp &timestamp) {
162 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700163 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700164 pi2_boot_uuid_);
165 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800166 }
167
168 void StartPi1Test() {
169 pi1_test_thread = std::thread([this]() {
170 LOG(INFO) << "Started pi1_test";
171 pi1_test_event_loop->Run();
172 });
173 }
174
175 void StopPi1Test() {
176 pi1_test_event_loop->Exit();
177 pi1_test_thread.join();
178 }
179
180 void MakePi2Server() {
181 OnPi2();
182 FLAGS_application_name = "pi2_message_bridge_server";
183 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800184 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800185 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
186 pi2_message_bridge_server =
187 std::make_unique<MessageBridgeServer>(pi2_server_event_loop.get());
188 }
189
190 void RunPi2Server(chrono::nanoseconds duration) {
191 // Setup a shutdown callback.
192 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
193 [this]() { pi2_server_event_loop->Exit(); });
194 pi2_server_event_loop->OnRun([this, quit, duration]() {
195 // Stop between timestamps, not exactly on them.
196 quit->Setup(pi2_server_event_loop->monotonic_now() + duration);
197 });
198
199 pi2_server_event_loop->Run();
200 }
201
202 void StartPi2Server() {
203 pi2_server_thread = std::thread([this]() {
204 LOG(INFO) << "Started pi2_message_bridge_server";
205 pi2_server_event_loop->Run();
206 });
207 }
208
209 void StopPi2Server() {
210 if (pi2_server_thread.joinable()) {
211 pi2_server_event_loop->Exit();
212 pi2_server_thread.join();
213 pi2_server_thread = std::thread();
214 }
215 pi2_message_bridge_server.reset();
216 pi2_server_event_loop.reset();
217 }
218
219 void MakePi2Client() {
220 OnPi2();
221 FLAGS_application_name = "pi2_message_bridge_client";
222 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800223 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800224 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
225 pi2_message_bridge_client =
226 std::make_unique<MessageBridgeClient>(pi2_client_event_loop.get());
227 }
228
229 void RunPi2Client(chrono::nanoseconds duration) {
230 // Run for 5 seconds to make sure we have time to estimate the offset.
231 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
232 [this]() { pi2_client_event_loop->Exit(); });
233 pi2_client_event_loop->OnRun([this, quit, duration]() {
234 // Stop between timestamps, not exactly on them.
235 quit->Setup(pi2_client_event_loop->monotonic_now() + duration);
236 });
237
238 // And go!
239 pi2_client_event_loop->Run();
240 }
241
242 void StartPi2Client() {
243 pi2_client_thread = std::thread([this]() {
244 LOG(INFO) << "Started pi2_message_bridge_client";
245 pi2_client_event_loop->Run();
246 });
247 }
248
249 void StopPi2Client() {
250 if (pi2_client_thread.joinable()) {
251 pi2_client_event_loop->Exit();
252 pi2_client_thread.join();
253 pi2_client_thread = std::thread();
254 }
255 pi2_message_bridge_client.reset();
256 pi2_client_event_loop.reset();
257 }
258
259 void MakePi2Test() {
260 OnPi2();
261 FLAGS_application_name = "test2";
262 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800263 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800264
265 pi2_test_event_loop->MakeWatcher(
266 "/pi2/aos", [](const ServerStatistics &stats) {
267 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
268 });
269
270 pi2_test_event_loop->MakeWatcher(
271 "/pi2/aos", [](const ClientStatistics &stats) {
272 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
273 });
274
275 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700276 "/pi1/aos", [this](const Timestamp &timestamp) {
277 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700278 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700279 pi1_boot_uuid_);
280 });
281 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800282 "/pi2/aos", [](const Timestamp &timestamp) {
283 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
284 });
285 }
286
287 void StartPi2Test() {
288 pi2_test_thread = std::thread([this]() {
289 LOG(INFO) << "Started pi2_message_bridge_test";
290 pi2_test_event_loop->Run();
291 });
292 }
293
294 void StopPi2Test() {
295 pi2_test_event_loop->Exit();
296 pi2_test_thread.join();
297 }
298
Austin Schuhf466ab52021-02-16 22:00:38 -0800299 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuh8902fa52021-03-14 22:39:24 -0700300 const UUID pi1_boot_uuid_;
301 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800302
303 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
304 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
305 std::thread pi1_server_thread;
306
307 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
308 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
309 std::thread pi1_client_thread;
310
311 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
312 std::thread pi1_test_thread;
313
314 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
315 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
316 std::thread pi2_server_thread;
317
318 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
319 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
320 std::thread pi2_client_thread;
321
322 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
323 std::thread pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800324};
325
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800326// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800327TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800328 // This is rather annoying to set up. We need to start up a client and
329 // server, on the same node, but get them to think that they are on different
330 // nodes.
331 //
332 // We then get to wait until they are connected.
333 //
334 // After they are connected, we send a Ping message.
335 //
336 // On the other end, we receive a Pong message.
337 //
338 // But, we need the client to not post directly to "/test" like it would in a
339 // real system, otherwise we will re-send the ping message... So, use an
340 // application specific map to have the client post somewhere else.
341 //
342 // To top this all off, each of these needs to be done with a ShmEventLoop,
343 // which needs to run in a separate thread... And it is really hard to get
344 // everything started up reliably. So just be super generous on timeouts and
345 // hope for the best. We can be more generous in the future if we need to.
346 //
347 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800348 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800349 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700350
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800351 MakePi1Server();
352 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800353
354 // And build the app which sends the pings.
355 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800356 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800357 aos::Sender<examples::Ping> ping_sender =
358 ping_event_loop.MakeSender<examples::Ping>("/test");
359
Austin Schuhf466ab52021-02-16 22:00:38 -0800360 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800361 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
362 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800363 shared() ? "/pi1/aos/remote_timestamps/pi2"
364 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700365
366 // Fetchers for confirming the remote timestamps made it.
367 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
368 ping_event_loop.MakeFetcher<examples::Ping>("/test");
369 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
370 ping_event_loop.MakeFetcher<Timestamp>("/aos");
371
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800372 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800373 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700374
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800375 MakePi2Client();
376 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800377
378 // And build the app which sends the pongs.
379 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800380 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800381
Austin Schuh7bc59052020-02-16 23:48:33 -0800382 // And build the app for testing.
383 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800384 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800385
386 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
387 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800388 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
389 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800390 shared() ? "/pi2/aos/remote_timestamps/pi1"
391 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
392 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700393
394 // Event loop for fetching data delivered to pi2 from pi1 to match up
395 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800396 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700397 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
398 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
399 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
400 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
401 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
402 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800403
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800404 // Count the pongs.
405 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700406 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
407 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700408 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700409 ++pong_count;
410 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
411 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800412
413 FLAGS_override_hostname = "";
414
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800415 // Wait until we are connected, then send.
416 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800417 int pi1_server_statistics_count = 0;
Austin Schuh61e973f2021-02-21 21:43:56 -0800418 ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
419 &pi1_server_statistics_count](
420 const ServerStatistics &stats) {
421 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800422
Austin Schuh61e973f2021-02-21 21:43:56 -0800423 ASSERT_TRUE(stats.has_connections());
424 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800425
Austin Schuh61e973f2021-02-21 21:43:56 -0800426 bool connected = false;
427 for (const ServerConnection *connection : *stats.connections()) {
428 // Confirm that we are estimating the server time offset correctly. It
429 // should be about 0 since we are on the same machine here.
430 if (connection->has_monotonic_offset()) {
431 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
432 chrono::milliseconds(1));
433 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
434 chrono::milliseconds(-1));
435 ++pi1_server_statistics_count;
436 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800437
Austin Schuh61e973f2021-02-21 21:43:56 -0800438 if (connection->node()->name()->string_view() ==
439 pi2_client_event_loop->node()->name()->string_view()) {
440 if (connection->state() == State::CONNECTED) {
441 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800442 EXPECT_EQ(connection->connection_count(), 1u);
443 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
444 connection->connected_since_time())),
445 monotonic_clock::now());
Austin Schuh61e973f2021-02-21 21:43:56 -0800446 connected = true;
Austin Schuh367a7f42021-11-23 23:04:36 -0800447 } else {
448 EXPECT_FALSE(connection->has_connection_count());
449 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800450 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800451 }
452 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800453
Austin Schuh61e973f2021-02-21 21:43:56 -0800454 if (connected) {
455 VLOG(1) << "Connected! Sent ping.";
456 auto builder = ping_sender.MakeBuilder();
457 examples::Ping::Builder ping_builder =
458 builder.MakeBuilder<examples::Ping>();
459 ping_builder.add_value(ping_count + 971);
milind1f1dca32021-07-03 13:50:07 -0700460 EXPECT_EQ(builder.Send(ping_builder.Finish()),
461 RawSender::Error::kOk);
Austin Schuh61e973f2021-02-21 21:43:56 -0800462 ++ping_count;
463 }
464 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800465
Austin Schuh7bc59052020-02-16 23:48:33 -0800466 // Confirm both client and server statistics messages have decent offsets in
467 // them.
468 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700469 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800470 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800471 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800472 for (const ServerConnection *connection : *stats.connections()) {
473 if (connection->has_monotonic_offset()) {
474 ++pi2_server_statistics_count;
475 // Confirm that we are estimating the server time offset correctly. It
476 // should be about 0 since we are on the same machine here.
477 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
478 chrono::milliseconds(1));
479 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
480 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800481 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800482 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800483
484 if (connection->state() == State::CONNECTED) {
485 EXPECT_EQ(connection->connection_count(), 1u);
486 EXPECT_LT(monotonic_clock::time_point(
487 chrono::nanoseconds(connection->connected_since_time())),
488 monotonic_clock::now());
489 } else {
490 EXPECT_FALSE(connection->has_connection_count());
491 EXPECT_FALSE(connection->has_connected_since_time());
492 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800493 }
494 });
495
496 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800497 int pi1_connected_client_statistics_count = 0;
498 ping_event_loop.MakeWatcher(
499 "/pi1/aos",
500 [&pi1_client_statistics_count,
501 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
502 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800503
Austin Schuh367a7f42021-11-23 23:04:36 -0800504 for (const ClientConnection *connection : *stats.connections()) {
505 if (connection->has_monotonic_offset()) {
506 ++pi1_client_statistics_count;
507 // It takes at least 10 microseconds to send a message between the
508 // client and server. The min (filtered) time shouldn't be over 10
509 // milliseconds on localhost. This might have to bump up if this is
510 // proving flaky.
511 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
512 chrono::milliseconds(10))
513 << " " << connection->monotonic_offset()
514 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
515 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
516 chrono::microseconds(10))
517 << " " << connection->monotonic_offset()
518 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
519 }
520 if (connection->state() == State::CONNECTED) {
521 EXPECT_EQ(connection->connection_count(), 1u);
522 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
523 connection->connected_since_time())),
524 monotonic_clock::now());
525 // The first Connected message may not have a UUID in it since no
526 // data has flown. That's fine.
527 if (pi1_connected_client_statistics_count > 0) {
528 EXPECT_TRUE(connection->has_boot_uuid())
529 << ": " << aos::FlatbufferToJson(connection);
530 }
531 ++pi1_connected_client_statistics_count;
532 } else {
533 EXPECT_FALSE(connection->has_connection_count());
534 EXPECT_FALSE(connection->has_connected_since_time());
535 }
536 }
537 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800538
539 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800540 int pi2_connected_client_statistics_count = 0;
541 pong_event_loop.MakeWatcher(
542 "/pi2/aos",
543 [&pi2_client_statistics_count,
544 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
545 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800546
Austin Schuh367a7f42021-11-23 23:04:36 -0800547 for (const ClientConnection *connection : *stats.connections()) {
548 if (connection->has_monotonic_offset()) {
549 ++pi2_client_statistics_count;
550 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
551 chrono::milliseconds(10))
552 << ": got " << aos::FlatbufferToJson(connection);
553 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
554 chrono::microseconds(10))
555 << ": got " << aos::FlatbufferToJson(connection);
556 }
557 if (connection->state() == State::CONNECTED) {
558 EXPECT_EQ(connection->connection_count(), 1u);
559 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
560 connection->connected_since_time())),
561 monotonic_clock::now());
562 if (pi2_connected_client_statistics_count > 0) {
563 EXPECT_TRUE(connection->has_boot_uuid());
564 }
565 ++pi2_connected_client_statistics_count;
566 } else {
567 EXPECT_FALSE(connection->has_connection_count());
568 EXPECT_FALSE(connection->has_connected_since_time());
569 }
570 }
571 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800572
Austin Schuh196a4452020-03-15 23:12:03 -0700573 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800574 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800575 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800576 });
Austin Schuh196a4452020-03-15 23:12:03 -0700577 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800578 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800579 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800580 });
581
582 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800583 aos::TimerHandler *quit = ping_event_loop.AddTimer(
584 [&ping_event_loop]() { ping_event_loop.Exit(); });
585 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800586 // Stop between timestamps, not exactly on them.
587 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800588 });
589
Austin Schuh2f8fd752020-09-01 22:38:28 -0700590 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
591 // channel.
592 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
593 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
594 const size_t ping_timestamp_channel =
595 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
596 ping_on_pi2_fetcher.channel());
597
598 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
599 VLOG(1) << "Channel "
600 << configuration::ChannelIndex(ping_event_loop.configuration(),
601 channel)
602 << " " << configuration::CleanedChannelToString(channel);
603 }
604
605 // For each remote timestamp we get back, confirm that it is either a ping
606 // message, or a timestamp we sent out. Also confirm that the timestamps are
607 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800608 for (std::pair<int, std::string> channel :
609 shared()
610 ? std::vector<std::pair<
611 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
612 : std::vector<std::pair<int, std::string>>{
613 {pi1_timestamp_channel,
614 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
615 "aos-message_bridge-Timestamp"},
616 {ping_timestamp_channel,
617 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
618 ping_event_loop.MakeWatcher(
619 channel.second,
620 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
621 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
622 &pi1_on_pi1_timestamp_fetcher,
623 channel_index = channel.first](const RemoteMessage &header) {
624 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
625 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700626
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800627 EXPECT_TRUE(header.has_boot_uuid());
628 if (channel_index != -1) {
629 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700630 }
631
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800632 const aos::monotonic_clock::time_point header_monotonic_sent_time(
633 chrono::nanoseconds(header.monotonic_sent_time()));
634 const aos::realtime_clock::time_point header_realtime_sent_time(
635 chrono::nanoseconds(header.realtime_sent_time()));
636 const aos::monotonic_clock::time_point header_monotonic_remote_time(
637 chrono::nanoseconds(header.monotonic_remote_time()));
638 const aos::realtime_clock::time_point header_realtime_remote_time(
639 chrono::nanoseconds(header.realtime_remote_time()));
640
641 const Context *pi1_context = nullptr;
642 const Context *pi2_context = nullptr;
643
644 if (header.channel_index() == pi1_timestamp_channel) {
645 // Find the forwarded message.
646 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
647 header_monotonic_sent_time) {
648 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
649 }
650
651 // And the source message.
652 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
653 header_monotonic_remote_time) {
654 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
655 }
656
657 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
658 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
659 } else if (header.channel_index() == ping_timestamp_channel) {
660 // Find the forwarded message.
661 while (ping_on_pi2_fetcher.context().monotonic_event_time <
662 header_monotonic_sent_time) {
663 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
664 }
665
666 // And the source message.
667 while (ping_on_pi1_fetcher.context().monotonic_event_time <
668 header_monotonic_remote_time) {
669 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
670 }
671
672 pi1_context = &ping_on_pi1_fetcher.context();
673 pi2_context = &ping_on_pi2_fetcher.context();
674 } else {
675 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700676 }
677
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800678 // Confirm the forwarded message has matching timestamps to the
679 // timestamps we got back.
680 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
681 EXPECT_EQ(pi2_context->monotonic_event_time,
682 header_monotonic_sent_time);
683 EXPECT_EQ(pi2_context->realtime_event_time,
684 header_realtime_sent_time);
685 EXPECT_EQ(pi2_context->realtime_remote_time,
686 header_realtime_remote_time);
687 EXPECT_EQ(pi2_context->monotonic_remote_time,
688 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700689
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800690 // Confirm the forwarded message also matches the source message.
691 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
692 EXPECT_EQ(pi1_context->monotonic_event_time,
693 header_monotonic_remote_time);
694 EXPECT_EQ(pi1_context->realtime_event_time,
695 header_realtime_remote_time);
696 });
697 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700698
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800699 // Start everything up. Pong is the only thing we don't know how to wait
700 // on, so start it first.
Austin Schuh7bc59052020-02-16 23:48:33 -0800701 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
702
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800703 StartPi1Server();
704 StartPi1Client();
705 StartPi2Client();
706 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800707
708 // And go!
709 ping_event_loop.Run();
710
711 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800712 StopPi1Server();
713 StopPi1Client();
714 StopPi2Client();
715 StopPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800716 pong_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800717 pong_thread.join();
718
719 // Make sure we sent something.
720 EXPECT_GE(ping_count, 1);
721 // And got something back.
722 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800723
724 // Confirm that we are estimating a monotonic offset on the client.
725 ASSERT_TRUE(client_statistics_fetcher.Fetch());
726
727 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
728 EXPECT_EQ(client_statistics_fetcher->connections()
729 ->Get(0)
730 ->node()
731 ->name()
732 ->string_view(),
733 "pi1");
734
735 // Make sure the offset in one direction is less than a second.
736 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700737 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
738 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800739 EXPECT_LT(
740 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700741 1000000000)
742 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800743
744 EXPECT_GE(pi1_server_statistics_count, 2);
745 EXPECT_GE(pi2_server_statistics_count, 2);
746 EXPECT_GE(pi1_client_statistics_count, 2);
747 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700748
749 // Confirm we got timestamps back!
750 EXPECT_TRUE(message_header_fetcher1.Fetch());
751 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800752}
753
Austin Schuh5344c352020-04-12 17:04:26 -0700754// Test that the client disconnecting triggers the server offsets on both sides
755// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800756TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700757 // This is rather annoying to set up. We need to start up a client and
758 // server, on the same node, but get them to think that they are on different
759 // nodes.
760 //
761 // We need the client to not post directly to "/test" like it would in a
762 // real system, otherwise we will re-send the ping message... So, use an
763 // application specific map to have the client post somewhere else.
764 //
765 // To top this all off, each of these needs to be done with a ShmEventLoop,
766 // which needs to run in a separate thread... And it is really hard to get
767 // everything started up reliably. So just be super generous on timeouts and
768 // hope for the best. We can be more generous in the future if we need to.
769 //
770 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800771 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700772
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800773 MakePi1Server();
774 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700775
776 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800777 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700778 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800779 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700780
781 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800782 OnPi2();
783 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700784
785 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800786 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700787 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800788 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700789
790 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700791
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800792 StartPi1Test();
793 StartPi2Test();
794 StartPi1Server();
795 StartPi1Client();
796 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700797
798 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800799 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700800
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800801 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700802
803 // Now confirm we are synchronized.
804 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
805 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
806
807 const ServerConnection *const pi1_connection =
808 pi1_server_statistics_fetcher->connections()->Get(0);
809 const ServerConnection *const pi2_connection =
810 pi2_server_statistics_fetcher->connections()->Get(0);
811
812 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800813 EXPECT_EQ(pi1_connection->connection_count(), 1u);
814 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700815 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
816 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
817 chrono::milliseconds(1));
818 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
819 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800820 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700821
822 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800823 EXPECT_EQ(pi2_connection->connection_count(), 1u);
824 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700825 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
826 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
827 chrono::milliseconds(1));
828 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
829 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800830 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800831
832 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700833 }
834
Austin Schuhd0d894e2021-10-24 17:13:11 -0700835 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
836 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700837
838 {
839 // Now confirm we are un-synchronized.
840 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
841 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
842 const ServerConnection *const pi1_connection =
843 pi1_server_statistics_fetcher->connections()->Get(0);
844 const ServerConnection *const pi2_connection =
845 pi2_server_statistics_fetcher->connections()->Get(0);
846
847 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800848 EXPECT_EQ(pi1_connection->connection_count(), 1u);
849 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700850 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800851 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700852 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
853 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800854 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800855 EXPECT_EQ(pi2_connection->connection_count(), 1u);
856 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700857 }
858
859 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800860 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700861 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800862 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700863
864 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
865 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
866
867 // Now confirm we are synchronized again.
868 const ServerConnection *const pi1_connection =
869 pi1_server_statistics_fetcher->connections()->Get(0);
870 const ServerConnection *const pi2_connection =
871 pi2_server_statistics_fetcher->connections()->Get(0);
872
873 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800874 EXPECT_EQ(pi1_connection->connection_count(), 2u);
875 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700876 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
877 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800878 chrono::milliseconds(1))
879 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700880 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800881 chrono::milliseconds(-1))
882 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800883 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700884
885 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800886 EXPECT_EQ(pi2_connection->connection_count(), 1u);
887 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700888 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
889 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800890 chrono::milliseconds(1))
891 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700892 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800893 chrono::milliseconds(-1))
894 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800895 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800896
897 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700898 }
899
900 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800901 StopPi1Server();
902 StopPi1Client();
903 StopPi2Server();
904 StopPi1Test();
905 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700906}
907
908// Test that the server disconnecting triggers the server offsets on the other
909// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800910TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700911 // This is rather annoying to set up. We need to start up a client and
912 // server, on the same node, but get them to think that they are on different
913 // nodes.
914 //
915 // We need the client to not post directly to "/test" like it would in a
916 // real system, otherwise we will re-send the ping message... So, use an
917 // application specific map to have the client post somewhere else.
918 //
919 // To top this all off, each of these needs to be done with a ShmEventLoop,
920 // which needs to run in a separate thread... And it is really hard to get
921 // everything started up reliably. So just be super generous on timeouts and
922 // hope for the best. We can be more generous in the future if we need to.
923 //
924 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700925 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800926 OnPi1();
927 MakePi1Server();
928 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700929
930 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800931 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700932 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800933 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700934 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800935 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700936
937 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800938 OnPi2();
939 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700940
941 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800942 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700943 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800944 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700945
946 // Start everything up. Pong is the only thing we don't know how to wait on,
947 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800948 StartPi1Test();
949 StartPi2Test();
950 StartPi1Server();
951 StartPi1Client();
952 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700953
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800954 // Confirm both client and server statistics messages have decent offsets in
955 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700956
957 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800958 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700959
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800960 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700961
962 // Now confirm we are synchronized.
963 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
964 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
965
966 const ServerConnection *const pi1_connection =
967 pi1_server_statistics_fetcher->connections()->Get(0);
968 const ServerConnection *const pi2_connection =
969 pi2_server_statistics_fetcher->connections()->Get(0);
970
971 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
972 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
973 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
974 chrono::milliseconds(1));
975 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
976 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800977 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800978 EXPECT_TRUE(pi1_connection->has_connected_since_time());
979 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700980
981 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
982 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
983 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
984 chrono::milliseconds(1));
985 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
986 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800987 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800988 EXPECT_TRUE(pi2_connection->has_connected_since_time());
989 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800990
991 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700992 }
993
994 std::this_thread::sleep_for(std::chrono::seconds(2));
995
996 {
997 // And confirm we are unsynchronized.
998 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
999 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1000
1001 const ServerConnection *const pi1_server_connection =
1002 pi1_server_statistics_fetcher->connections()->Get(0);
1003 const ClientConnection *const pi1_client_connection =
1004 pi1_client_statistics_fetcher->connections()->Get(0);
1005
1006 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1007 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001008 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1009 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1010
Austin Schuh20ac95d2020-12-05 17:24:19 -08001011 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001012 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1013 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001014 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1015 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1016 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001017 }
1018
1019 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001020 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001021
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001022 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001023
1024 // And confirm we are synchronized again.
1025 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1026 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001027 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001028
1029 const ServerConnection *const pi1_connection =
1030 pi1_server_statistics_fetcher->connections()->Get(0);
1031 const ServerConnection *const pi2_connection =
1032 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001033 const ClientConnection *const pi1_client_connection =
1034 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001035
1036 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1037 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1038 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1039 chrono::milliseconds(1));
1040 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1041 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001042 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001043
Austin Schuh367a7f42021-11-23 23:04:36 -08001044 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1045 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1046 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1047 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1048
Austin Schuh5344c352020-04-12 17:04:26 -07001049 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1050 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1051 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1052 chrono::milliseconds(1));
1053 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1054 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001055 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001056
1057 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001058 }
1059
1060 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001061 StopPi1Server();
1062 StopPi1Client();
1063 StopPi2Client();
1064 StopPi1Test();
1065 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001066}
1067
Austin Schuh4889b182020-11-18 19:11:56 -08001068// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001069// thing, but doesn't confirm that the internal state does. We either need to
1070// expose a way to check the state in a thread-safe way, or need a way to jump
1071// time for one node to do that.
1072
Austin Schuh4889b182020-11-18 19:11:56 -08001073void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1074 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1075 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1076 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001077 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001078}
1079
1080// Tests that when a message is sent before the bridge starts up, but is
1081// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001082TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001083 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001084
1085 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001086 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001087 aos::Sender<examples::Ping> ping_sender =
1088 send_event_loop.MakeSender<examples::Ping>("/test");
1089 SendPing(&ping_sender, 1);
1090 aos::Sender<examples::Ping> unreliable_ping_sender =
1091 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1092 SendPing(&unreliable_ping_sender, 1);
1093
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001094 MakePi1Server();
1095 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001096
1097 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001098 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001099
1100 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001101 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001102
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001103 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001104
Austin Schuhf466ab52021-02-16 22:00:38 -08001105 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001106 aos::Fetcher<examples::Ping> ping_fetcher =
1107 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1108 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1109 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1110 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1111 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1112
1113 const size_t ping_channel_index = configuration::ChannelIndex(
1114 receive_event_loop.configuration(), ping_fetcher.channel());
1115
1116 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001117 const std::string channel_name =
1118 shared() ? "/pi1/aos/remote_timestamps/pi2"
1119 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001120 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001121 channel_name, [this, channel_name, ping_channel_index,
1122 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001123 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001124 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001125 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001126 if (shared() && header.channel_index() != ping_channel_index) {
1127 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001128 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001129 CHECK_EQ(header.channel_index(), ping_channel_index);
1130 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001131 });
1132
1133 // Before everything starts up, confirm there is no message.
1134 EXPECT_FALSE(ping_fetcher.Fetch());
1135 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1136
1137 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001138 StartPi1Server();
1139 StartPi1Client();
1140 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001141
1142 // Event used to wait for the timestamp counting thread to start.
1143 aos::Event event;
1144 std::thread pi1_remote_timestamp_thread(
1145 [&pi1_remote_timestamp_event_loop, &event]() {
1146 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1147 pi1_remote_timestamp_event_loop.Run();
1148 });
1149
1150 event.Wait();
1151
1152 {
1153 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001154 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001155
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001156 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001157
1158 // Confirm there is no detected duplicate packet.
1159 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1160 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1161 ->Get(0)
1162 ->duplicate_packets(),
1163 0u);
1164
Austin Schuhe61d4382021-03-31 21:33:02 -07001165 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1166 ->Get(0)
1167 ->partial_deliveries(),
1168 0u);
1169
Austin Schuh4889b182020-11-18 19:11:56 -08001170 EXPECT_TRUE(ping_fetcher.Fetch());
1171 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1172 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001173
1174 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001175 }
1176
1177 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001178 // Now, spin up a client for 2 seconds.
1179 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001180
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001181 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001182
1183 // Confirm we detect the duplicate packet correctly.
1184 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1185 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1186 ->Get(0)
1187 ->duplicate_packets(),
1188 1u);
1189
Austin Schuhe61d4382021-03-31 21:33:02 -07001190 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1191 ->Get(0)
1192 ->partial_deliveries(),
1193 0u);
1194
Austin Schuh4889b182020-11-18 19:11:56 -08001195 EXPECT_EQ(ping_timestamp_count, 1);
1196 EXPECT_FALSE(ping_fetcher.Fetch());
1197 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001198
1199 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001200 }
1201
1202 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001203 StopPi1Client();
1204 StopPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001205 pi1_remote_timestamp_event_loop.Exit();
1206 pi1_remote_timestamp_thread.join();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001207 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001208}
1209
1210// Tests that when a message is sent before the bridge starts up, but is
1211// configured as reliable, we forward it. Confirm this works across server
1212// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001213TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001214 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001215 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001216
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001217 MakePi2Server();
1218 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001219
Austin Schuhf466ab52021-02-16 22:00:38 -08001220 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001221 aos::Fetcher<examples::Ping> ping_fetcher =
1222 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1223 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1224 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1225 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1226 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1227
Austin Schuh4889b182020-11-18 19:11:56 -08001228 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001229 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001230
1231 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001232 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001233 aos::Sender<examples::Ping> ping_sender =
1234 send_event_loop.MakeSender<examples::Ping>("/test");
1235 {
1236 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1237 examples::Ping::Builder ping_builder =
1238 builder.MakeBuilder<examples::Ping>();
1239 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001240 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001241 }
1242
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001243 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001244
1245 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001246 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001247
1248 const size_t ping_channel_index = configuration::ChannelIndex(
1249 receive_event_loop.configuration(), ping_fetcher.channel());
1250
1251 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001252 const std::string channel_name =
1253 shared() ? "/pi1/aos/remote_timestamps/pi2"
1254 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001255 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001256 channel_name, [this, channel_name, ping_channel_index,
1257 &ping_timestamp_count](const RemoteMessage &header) {
1258 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001259 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001260 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001261 if (shared() && header.channel_index() != ping_channel_index) {
1262 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001263 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001264 CHECK_EQ(header.channel_index(), ping_channel_index);
1265 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001266 });
1267
1268 // Before everything starts up, confirm there is no message.
1269 EXPECT_FALSE(ping_fetcher.Fetch());
1270 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1271
1272 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001273 StartPi1Client();
1274 StartPi2Server();
1275 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001276
1277 // Event used to wait for the timestamp counting thread to start.
1278 aos::Event event;
1279 std::thread pi1_remote_timestamp_thread(
1280 [&pi1_remote_timestamp_event_loop, &event]() {
1281 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1282 pi1_remote_timestamp_event_loop.Run();
1283 });
1284
1285 event.Wait();
1286
1287 {
1288 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001289 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001290
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001291 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001292
1293 // Confirm there is no detected duplicate packet.
1294 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1295 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1296 ->Get(0)
1297 ->duplicate_packets(),
1298 0u);
1299
Austin Schuhe61d4382021-03-31 21:33:02 -07001300 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1301 ->Get(0)
1302 ->partial_deliveries(),
1303 0u);
1304
Austin Schuh4889b182020-11-18 19:11:56 -08001305 EXPECT_TRUE(ping_fetcher.Fetch());
1306 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1307 EXPECT_EQ(ping_timestamp_count, 1);
1308 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001309
1310 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001311 }
1312
1313 {
1314 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001315 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001316
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001317 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001318
1319 // Confirm we detect the duplicate packet correctly.
1320 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1321 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1322 ->Get(0)
1323 ->duplicate_packets(),
1324 1u);
1325
Austin Schuhe61d4382021-03-31 21:33:02 -07001326
1327 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1328 ->Get(0)
1329 ->partial_deliveries(),
1330 0u);
1331
Austin Schuh4889b182020-11-18 19:11:56 -08001332 EXPECT_EQ(ping_timestamp_count, 1);
1333 EXPECT_FALSE(ping_fetcher.Fetch());
1334 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001335
1336 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001337 }
1338
1339 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001340 StopPi1Client();
1341 StopPi2Server();
1342 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001343 pi1_remote_timestamp_event_loop.Exit();
1344 pi1_remote_timestamp_thread.join();
Austin Schuh4889b182020-11-18 19:11:56 -08001345}
1346
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001347INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001348 MessageBridgeTests, MessageBridgeParameterizedTest,
1349 ::testing::Values(
1350 Param{"message_bridge_test_combined_timestamps_common_config.json",
1351 true},
1352 Param{"message_bridge_test_common_config.json", false}));
1353
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001354} // namespace testing
1355} // namespace message_bridge
1356} // namespace aos