blob: c211c89451d04a6714a23361703c05e5d7d25555 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070011#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012#include "aos/network/message_bridge_server_lib.h"
James Kuszmaul79b2f032023-06-02 21:02:27 -070013#include "aos/network/message_bridge_test_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070014#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070015#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070016#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080017#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018
Stephan Pleinesf63bde82024-01-13 15:59:33 -080019namespace aos::message_bridge::testing {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080020
James Kuszmaul79b2f032023-06-02 21:02:27 -070021// Note: All of these tests spin up ShmEventLoop's in separate threads to allow
22// us to run the "real" message bridge. This requires extra threading and timing
23// coordination to make happen, which is the reason for some of the extra
24// complexity in these tests.
Austin Schuhe991fe22020-11-18 16:53:39 -080025
Austin Schuhe84c3ed2019-12-14 15:29:48 -080026// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -080027TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080028 // This is rather annoying to set up. We need to start up a client and
29 // server, on the same node, but get them to think that they are on different
30 // nodes.
31 //
32 // We then get to wait until they are connected.
33 //
34 // After they are connected, we send a Ping message.
35 //
36 // On the other end, we receive a Pong message.
37 //
38 // But, we need the client to not post directly to "/test" like it would in a
39 // real system, otherwise we will re-send the ping message... So, use an
40 // application specific map to have the client post somewhere else.
41 //
42 // To top this all off, each of these needs to be done with a ShmEventLoop,
43 // which needs to run in a separate thread... And it is really hard to get
44 // everything started up reliably. So just be super generous on timeouts and
45 // hope for the best. We can be more generous in the future if we need to.
46 //
47 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -080048 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080049 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -070050
Austin Schuh0a2f12f2021-01-08 22:48:29 -080051 MakePi1Server();
52 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053
Austin Schuh89e1e9c2023-05-15 14:38:44 -070054 const std::string long_data = std::string(10000, 'a');
55
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056 // And build the app which sends the pings.
57 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -080058 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080059 aos::Sender<examples::Ping> ping_sender =
60 ping_event_loop.MakeSender<examples::Ping>("/test");
61
Austin Schuhf466ab52021-02-16 22:00:38 -080062 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080063 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
64 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080065 shared() ? "/pi1/aos/remote_timestamps/pi2"
66 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -070067
68 // Fetchers for confirming the remote timestamps made it.
69 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
70 ping_event_loop.MakeFetcher<examples::Ping>("/test");
71 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
72 ping_event_loop.MakeFetcher<Timestamp>("/aos");
73
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -080075 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -070076
Austin Schuh0a2f12f2021-01-08 22:48:29 -080077 MakePi2Client();
78 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080079
80 // And build the app which sends the pongs.
81 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -080082 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080083
Austin Schuh7bc59052020-02-16 23:48:33 -080084 // And build the app for testing.
85 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -080086 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080087
88 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
89 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -080090 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
91 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080092 shared() ? "/pi2/aos/remote_timestamps/pi1"
93 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
94 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -070095
96 // Event loop for fetching data delivered to pi2 from pi1 to match up
97 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -080098 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -070099 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
100 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
101 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
102 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
103 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
104 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800105
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800106 // Count the pongs.
107 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700108 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
109 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700110 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700111 ++pong_count;
112 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
113 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800114
115 FLAGS_override_hostname = "";
116
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117 // Wait until we are connected, then send.
118 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800119 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700120 ping_event_loop.MakeWatcher(
121 "/pi1/aos",
122 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
123 &long_data](const ServerStatistics &stats) {
124 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800125
Philipp Schrader790cb542023-07-05 21:06:52 -0700126 ASSERT_TRUE(stats.has_connections());
127 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800128
Philipp Schrader790cb542023-07-05 21:06:52 -0700129 bool connected = false;
130 for (const ServerConnection *connection : *stats.connections()) {
131 // Confirm that we are estimating the server time offset correctly. It
132 // should be about 0 since we are on the same machine here.
133 if (connection->has_monotonic_offset()) {
134 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
135 chrono::milliseconds(1));
136 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
137 chrono::milliseconds(-1));
138 ++pi1_server_statistics_count;
139 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800140
Philipp Schrader790cb542023-07-05 21:06:52 -0700141 if (connection->node()->name()->string_view() ==
142 pi2_client_event_loop->node()->name()->string_view()) {
143 if (connection->state() == State::CONNECTED) {
144 EXPECT_TRUE(connection->has_boot_uuid());
145 EXPECT_EQ(connection->connection_count(), 1u);
146 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
147 connection->connected_since_time())),
148 monotonic_clock::now());
149 connected = true;
150 } else {
151 EXPECT_FALSE(connection->has_connection_count());
152 EXPECT_FALSE(connection->has_connected_since_time());
153 }
154 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800155 }
156
Philipp Schrader790cb542023-07-05 21:06:52 -0700157 if (connected) {
158 VLOG(1) << "Connected! Sent ping.";
159 auto builder = ping_sender.MakeBuilder();
160 builder.fbb()->CreateString(long_data);
161 examples::Ping::Builder ping_builder =
162 builder.MakeBuilder<examples::Ping>();
163 ping_builder.add_value(ping_count + 971);
164 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
165 ++ping_count;
166 }
167 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800168
Austin Schuh7bc59052020-02-16 23:48:33 -0800169 // Confirm both client and server statistics messages have decent offsets in
170 // them.
171 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700172 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800173 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800174 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800175 for (const ServerConnection *connection : *stats.connections()) {
176 if (connection->has_monotonic_offset()) {
177 ++pi2_server_statistics_count;
178 // Confirm that we are estimating the server time offset correctly. It
179 // should be about 0 since we are on the same machine here.
180 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
181 chrono::milliseconds(1));
182 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
183 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800184 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800185 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800186
187 if (connection->state() == State::CONNECTED) {
188 EXPECT_EQ(connection->connection_count(), 1u);
189 EXPECT_LT(monotonic_clock::time_point(
190 chrono::nanoseconds(connection->connected_since_time())),
191 monotonic_clock::now());
192 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700193 // If we have been connected, we expect the connection count to stay
194 // around.
195 if (pi2_server_statistics_count > 0) {
196 EXPECT_TRUE(connection->has_connection_count());
197 EXPECT_EQ(connection->connection_count(), 1u);
198 } else {
199 EXPECT_FALSE(connection->has_connection_count());
200 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800201 EXPECT_FALSE(connection->has_connected_since_time());
202 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800203 }
204 });
205
206 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800207 int pi1_connected_client_statistics_count = 0;
208 ping_event_loop.MakeWatcher(
209 "/pi1/aos",
210 [&pi1_client_statistics_count,
211 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
212 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800213
Austin Schuh367a7f42021-11-23 23:04:36 -0800214 for (const ClientConnection *connection : *stats.connections()) {
215 if (connection->has_monotonic_offset()) {
216 ++pi1_client_statistics_count;
217 // It takes at least 10 microseconds to send a message between the
218 // client and server. The min (filtered) time shouldn't be over 10
219 // milliseconds on localhost. This might have to bump up if this is
220 // proving flaky.
221 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
222 chrono::milliseconds(10))
223 << " " << connection->monotonic_offset()
224 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
225 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
226 chrono::microseconds(10))
227 << " " << connection->monotonic_offset()
228 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
229 }
230 if (connection->state() == State::CONNECTED) {
231 EXPECT_EQ(connection->connection_count(), 1u);
232 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
233 connection->connected_since_time())),
234 monotonic_clock::now());
235 // The first Connected message may not have a UUID in it since no
236 // data has flown. That's fine.
237 if (pi1_connected_client_statistics_count > 0) {
238 EXPECT_TRUE(connection->has_boot_uuid())
239 << ": " << aos::FlatbufferToJson(connection);
240 }
241 ++pi1_connected_client_statistics_count;
242 } else {
243 EXPECT_FALSE(connection->has_connection_count());
244 EXPECT_FALSE(connection->has_connected_since_time());
245 }
246 }
247 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800248
249 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800250 int pi2_connected_client_statistics_count = 0;
251 pong_event_loop.MakeWatcher(
252 "/pi2/aos",
253 [&pi2_client_statistics_count,
254 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
255 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800256
Austin Schuh367a7f42021-11-23 23:04:36 -0800257 for (const ClientConnection *connection : *stats.connections()) {
258 if (connection->has_monotonic_offset()) {
259 ++pi2_client_statistics_count;
260 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
261 chrono::milliseconds(10))
262 << ": got " << aos::FlatbufferToJson(connection);
263 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
264 chrono::microseconds(10))
265 << ": got " << aos::FlatbufferToJson(connection);
266 }
267 if (connection->state() == State::CONNECTED) {
268 EXPECT_EQ(connection->connection_count(), 1u);
269 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
270 connection->connected_since_time())),
271 monotonic_clock::now());
272 if (pi2_connected_client_statistics_count > 0) {
273 EXPECT_TRUE(connection->has_boot_uuid());
274 }
275 ++pi2_connected_client_statistics_count;
276 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700277 if (pi2_connected_client_statistics_count == 0) {
278 EXPECT_FALSE(connection->has_connection_count())
279 << aos::FlatbufferToJson(&stats);
280 } else {
281 EXPECT_TRUE(connection->has_connection_count())
282 << aos::FlatbufferToJson(&stats);
283 EXPECT_EQ(connection->connection_count(), 1u);
284 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800285 EXPECT_FALSE(connection->has_connected_since_time());
286 }
287 }
288 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800289
Austin Schuh196a4452020-03-15 23:12:03 -0700290 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800291 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800292 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800293 });
Austin Schuh196a4452020-03-15 23:12:03 -0700294 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800295 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800296 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800297 });
298
Austin Schuh2f8fd752020-09-01 22:38:28 -0700299 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
300 // channel.
301 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
302 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
303 const size_t ping_timestamp_channel =
304 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
305 ping_on_pi2_fetcher.channel());
306
307 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
308 VLOG(1) << "Channel "
309 << configuration::ChannelIndex(ping_event_loop.configuration(),
310 channel)
311 << " " << configuration::CleanedChannelToString(channel);
312 }
313
314 // For each remote timestamp we get back, confirm that it is either a ping
315 // message, or a timestamp we sent out. Also confirm that the timestamps are
316 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800317 for (std::pair<int, std::string> channel :
318 shared()
319 ? std::vector<std::pair<
320 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
321 : std::vector<std::pair<int, std::string>>{
322 {pi1_timestamp_channel,
323 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
324 "aos-message_bridge-Timestamp"},
325 {ping_timestamp_channel,
326 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
327 ping_event_loop.MakeWatcher(
328 channel.second,
329 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
330 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
331 &pi1_on_pi1_timestamp_fetcher,
332 channel_index = channel.first](const RemoteMessage &header) {
333 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
334 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700335
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800336 EXPECT_TRUE(header.has_boot_uuid());
337 if (channel_index != -1) {
338 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700339 }
340
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800341 const aos::monotonic_clock::time_point header_monotonic_sent_time(
342 chrono::nanoseconds(header.monotonic_sent_time()));
343 const aos::realtime_clock::time_point header_realtime_sent_time(
344 chrono::nanoseconds(header.realtime_sent_time()));
345 const aos::monotonic_clock::time_point header_monotonic_remote_time(
346 chrono::nanoseconds(header.monotonic_remote_time()));
347 const aos::realtime_clock::time_point header_realtime_remote_time(
348 chrono::nanoseconds(header.realtime_remote_time()));
349
350 const Context *pi1_context = nullptr;
351 const Context *pi2_context = nullptr;
352
353 if (header.channel_index() == pi1_timestamp_channel) {
354 // Find the forwarded message.
355 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
356 header_monotonic_sent_time) {
357 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
358 }
359
360 // And the source message.
361 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
362 header_monotonic_remote_time) {
363 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
364 }
365
366 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
367 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
368 } else if (header.channel_index() == ping_timestamp_channel) {
369 // Find the forwarded message.
370 while (ping_on_pi2_fetcher.context().monotonic_event_time <
371 header_monotonic_sent_time) {
372 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
373 }
374
375 // And the source message.
376 while (ping_on_pi1_fetcher.context().monotonic_event_time <
377 header_monotonic_remote_time) {
378 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
379 }
380
381 pi1_context = &ping_on_pi1_fetcher.context();
382 pi2_context = &ping_on_pi2_fetcher.context();
383 } else {
384 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700385 }
386
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800387 // Confirm the forwarded message has matching timestamps to the
388 // timestamps we got back.
389 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
390 EXPECT_EQ(pi2_context->monotonic_event_time,
391 header_monotonic_sent_time);
392 EXPECT_EQ(pi2_context->realtime_event_time,
393 header_realtime_sent_time);
394 EXPECT_EQ(pi2_context->realtime_remote_time,
395 header_realtime_remote_time);
396 EXPECT_EQ(pi2_context->monotonic_remote_time,
397 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700398
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800399 // Confirm the forwarded message also matches the source message.
400 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
401 EXPECT_EQ(pi1_context->monotonic_event_time,
402 header_monotonic_remote_time);
403 EXPECT_EQ(pi1_context->realtime_event_time,
404 header_realtime_remote_time);
405 });
406 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700407
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800408 // Start everything up. Pong is the only thing we don't know how to wait
409 // on, so start it first.
Austin Schuha4e616a2023-05-15 17:59:30 -0700410 ThreadedEventLoopRunner pong_thread(&pong_event_loop);
411 ThreadedEventLoopRunner ping_thread(&ping_event_loop);
Austin Schuh7bc59052020-02-16 23:48:33 -0800412
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800413 StartPi1Server();
414 StartPi1Client();
415 StartPi2Client();
416 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800417
418 // And go!
Austin Schuha4e616a2023-05-15 17:59:30 -0700419 // Run for 5 seconds to make sure we have time to estimate the offset.
420 std::this_thread::sleep_for(chrono::milliseconds(5050));
Austin Schuh7bc59052020-02-16 23:48:33 -0800421
422 // Confirm that we are estimating a monotonic offset on the client.
423 ASSERT_TRUE(client_statistics_fetcher.Fetch());
424
425 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
426 EXPECT_EQ(client_statistics_fetcher->connections()
427 ->Get(0)
428 ->node()
429 ->name()
430 ->string_view(),
431 "pi1");
432
433 // Make sure the offset in one direction is less than a second.
434 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700435 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
436 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800437 EXPECT_LT(
438 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700439 1000000000)
440 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800441
Austin Schuha4e616a2023-05-15 17:59:30 -0700442 // Shut everyone else down before confirming everything actually ran.
443 ping_thread.Exit();
444 pong_thread.Exit();
445 StopPi1Server();
446 StopPi1Client();
447 StopPi2Client();
448 StopPi2Server();
449
450 // Make sure we sent something.
451 EXPECT_GE(ping_count, 1);
452 // And got something back.
453 EXPECT_GE(pong_count, 1);
454
Austin Schuh7bc59052020-02-16 23:48:33 -0800455 EXPECT_GE(pi1_server_statistics_count, 2);
456 EXPECT_GE(pi2_server_statistics_count, 2);
457 EXPECT_GE(pi1_client_statistics_count, 2);
458 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700459
460 // Confirm we got timestamps back!
461 EXPECT_TRUE(message_header_fetcher1.Fetch());
462 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800463}
464
Austin Schuh5344c352020-04-12 17:04:26 -0700465// Test that the client disconnecting triggers the server offsets on both sides
466// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800467TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700468 // This is rather annoying to set up. We need to start up a client and
469 // server, on the same node, but get them to think that they are on different
470 // nodes.
471 //
472 // We need the client to not post directly to "/test" like it would in a
473 // real system, otherwise we will re-send the ping message... So, use an
474 // application specific map to have the client post somewhere else.
475 //
476 // To top this all off, each of these needs to be done with a ShmEventLoop,
477 // which needs to run in a separate thread... And it is really hard to get
478 // everything started up reliably. So just be super generous on timeouts and
479 // hope for the best. We can be more generous in the future if we need to.
480 //
481 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800482 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700483
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800484 MakePi1Server();
485 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700486
487 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800488 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700489 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800490 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700491
492 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800493 OnPi2();
494 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700495
496 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800497 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700498 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800499 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700500
501 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700502
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800503 StartPi1Test();
504 StartPi2Test();
505 StartPi1Server();
506 StartPi1Client();
507 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700508
509 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800510 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700511
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800512 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700513
514 // Now confirm we are synchronized.
515 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
516 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
517
518 const ServerConnection *const pi1_connection =
519 pi1_server_statistics_fetcher->connections()->Get(0);
520 const ServerConnection *const pi2_connection =
521 pi2_server_statistics_fetcher->connections()->Get(0);
522
523 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800524 EXPECT_EQ(pi1_connection->connection_count(), 1u);
525 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700526 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
527 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
528 chrono::milliseconds(1));
529 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
530 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800531 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700532
533 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800534 EXPECT_EQ(pi2_connection->connection_count(), 1u);
535 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700536 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
537 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
538 chrono::milliseconds(1));
539 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
540 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800541 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800542
543 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700544 }
545
Austin Schuhd0d894e2021-10-24 17:13:11 -0700546 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
547 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700548
549 {
550 // Now confirm we are un-synchronized.
551 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
552 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
553 const ServerConnection *const pi1_connection =
554 pi1_server_statistics_fetcher->connections()->Get(0);
555 const ServerConnection *const pi2_connection =
556 pi2_server_statistics_fetcher->connections()->Get(0);
557
558 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800559 EXPECT_EQ(pi1_connection->connection_count(), 1u);
560 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700561 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800562 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700563 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
564 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800565 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800566 EXPECT_EQ(pi2_connection->connection_count(), 1u);
567 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700568 }
569
570 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800571 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700572 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800573 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700574
575 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
576 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
577
578 // Now confirm we are synchronized again.
579 const ServerConnection *const pi1_connection =
580 pi1_server_statistics_fetcher->connections()->Get(0);
581 const ServerConnection *const pi2_connection =
582 pi2_server_statistics_fetcher->connections()->Get(0);
583
584 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800585 EXPECT_EQ(pi1_connection->connection_count(), 2u);
586 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700587 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
588 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800589 chrono::milliseconds(1))
590 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700591 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800592 chrono::milliseconds(-1))
593 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800594 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700595
596 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800597 EXPECT_EQ(pi2_connection->connection_count(), 1u);
598 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700599 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
600 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800601 chrono::milliseconds(1))
602 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700603 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800604 chrono::milliseconds(-1))
605 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800606 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800607
608 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700609 }
610
James Kuszmaul79b2f032023-06-02 21:02:27 -0700611 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800612 StopPi1Server();
613 StopPi1Client();
614 StopPi2Server();
615 StopPi1Test();
616 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700617}
618
619// Test that the server disconnecting triggers the server offsets on the other
620// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800621TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700622 // This is rather annoying to set up. We need to start up a client and
623 // server, on the same node, but get them to think that they are on different
624 // nodes.
625 //
626 // We need the client to not post directly to "/test" like it would in a
627 // real system, otherwise we will re-send the ping message... So, use an
628 // application specific map to have the client post somewhere else.
629 //
630 // To top this all off, each of these needs to be done with a ShmEventLoop,
631 // which needs to run in a separate thread... And it is really hard to get
632 // everything started up reliably. So just be super generous on timeouts and
633 // hope for the best. We can be more generous in the future if we need to.
634 //
635 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700636 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800637 OnPi1();
638 MakePi1Server();
639 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700640
641 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800642 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700643 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800644 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700645 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800646 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700647
648 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800649 OnPi2();
650 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700651
652 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800653 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700654 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800655 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700656
657 // Start everything up. Pong is the only thing we don't know how to wait on,
658 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800659 StartPi1Test();
660 StartPi2Test();
661 StartPi1Server();
662 StartPi1Client();
663 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700664
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800665 // Confirm both client and server statistics messages have decent offsets in
666 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700667
668 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800669 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700670
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800671 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700672
673 // Now confirm we are synchronized.
674 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
675 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
676
677 const ServerConnection *const pi1_connection =
678 pi1_server_statistics_fetcher->connections()->Get(0);
679 const ServerConnection *const pi2_connection =
680 pi2_server_statistics_fetcher->connections()->Get(0);
681
682 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
683 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
684 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
685 chrono::milliseconds(1));
686 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
687 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800688 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800689 EXPECT_TRUE(pi1_connection->has_connected_since_time());
690 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700691
692 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
693 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
694 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
695 chrono::milliseconds(1));
696 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
697 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800698 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800699 EXPECT_TRUE(pi2_connection->has_connected_since_time());
700 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800701
702 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700703 }
704
705 std::this_thread::sleep_for(std::chrono::seconds(2));
706
707 {
708 // And confirm we are unsynchronized.
709 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
710 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
711
712 const ServerConnection *const pi1_server_connection =
713 pi1_server_statistics_fetcher->connections()->Get(0);
714 const ClientConnection *const pi1_client_connection =
715 pi1_client_statistics_fetcher->connections()->Get(0);
716
717 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
718 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800719 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
720 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
721
Austin Schuh20ac95d2020-12-05 17:24:19 -0800722 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700723 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
724 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800725 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
726 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
727 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700728 }
729
730 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800731 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700732
Austin Schuh5cd1d752023-08-18 17:31:46 -0700733 // Wait long enough for the client to connect again. It currently takes 3
734 // seconds of connection to estimate the time offset.
735 RunPi2Server(chrono::milliseconds(4050));
Austin Schuh5344c352020-04-12 17:04:26 -0700736
737 // And confirm we are synchronized again.
738 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
739 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -0800740 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -0700741
742 const ServerConnection *const pi1_connection =
743 pi1_server_statistics_fetcher->connections()->Get(0);
744 const ServerConnection *const pi2_connection =
745 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800746 const ClientConnection *const pi1_client_connection =
747 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -0700748
749 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
750 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
751 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
752 chrono::milliseconds(1));
753 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
754 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800755 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700756
Austin Schuh367a7f42021-11-23 23:04:36 -0800757 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
758 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
759 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
760 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
761
Austin Schuh5344c352020-04-12 17:04:26 -0700762 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
763 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
764 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
765 chrono::milliseconds(1));
766 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
767 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800768 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800769
770 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700771 }
772
James Kuszmaul79b2f032023-06-02 21:02:27 -0700773 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800774 StopPi1Server();
775 StopPi1Client();
776 StopPi2Client();
777 StopPi1Test();
778 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700779}
780
Austin Schuh4889b182020-11-18 19:11:56 -0800781// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700782// thing, but doesn't confirm that the internal state does. We either need to
783// expose a way to check the state in a thread-safe way, or need a way to jump
784// time for one node to do that.
785
Austin Schuh4889b182020-11-18 19:11:56 -0800786void SendPing(aos::Sender<examples::Ping> *sender, int value) {
787 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
788 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
789 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -0700790 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800791}
792
793// Tests that when a message is sent before the bridge starts up, but is
794// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800795TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800796 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800797
798 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800799 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800800 aos::Sender<examples::Ping> ping_sender =
801 send_event_loop.MakeSender<examples::Ping>("/test");
802 SendPing(&ping_sender, 1);
803 aos::Sender<examples::Ping> unreliable_ping_sender =
804 send_event_loop.MakeSender<examples::Ping>("/unreliable");
805 SendPing(&unreliable_ping_sender, 1);
806
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800807 MakePi1Server();
808 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800809
810 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800811 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800812
813 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800814 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800815
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800816 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800817
Austin Schuhf466ab52021-02-16 22:00:38 -0800818 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800819 aos::Fetcher<examples::Ping> ping_fetcher =
820 receive_event_loop.MakeFetcher<examples::Ping>("/test");
821 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
822 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
823 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
824 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
825
826 const size_t ping_channel_index = configuration::ChannelIndex(
827 receive_event_loop.configuration(), ping_fetcher.channel());
828
James Kuszmaul79b2f032023-06-02 21:02:27 -0700829 // ping_timestamp_count is accessed from multiple threads (the Watcher that
830 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800831 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800832 const std::string channel_name =
833 shared() ? "/pi1/aos/remote_timestamps/pi2"
834 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800835 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800836 channel_name, [this, channel_name, ping_channel_index,
837 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -0800838 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800839 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800840 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800841 if (shared() && header.channel_index() != ping_channel_index) {
842 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800843 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800844 CHECK_EQ(header.channel_index(), ping_channel_index);
845 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800846 });
847
848 // Before everything starts up, confirm there is no message.
849 EXPECT_FALSE(ping_fetcher.Fetch());
850 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
851
James Kuszmaul79b2f032023-06-02 21:02:27 -0700852 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800853 StartPi1Server();
854 StartPi1Client();
855 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800856
857 // Event used to wait for the timestamp counting thread to start.
Austin Schuha4e616a2023-05-15 17:59:30 -0700858 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
859 std::make_unique<ThreadedEventLoopRunner>(
860 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800861
862 {
James Kuszmaul79b2f032023-06-02 21:02:27 -0700863 // Now spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800864 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800865
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800866 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800867
868 // Confirm there is no detected duplicate packet.
869 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
870 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
871 ->Get(0)
872 ->duplicate_packets(),
873 0u);
874
Austin Schuhe61d4382021-03-31 21:33:02 -0700875 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
876 ->Get(0)
877 ->partial_deliveries(),
878 0u);
879
Austin Schuh4889b182020-11-18 19:11:56 -0800880 EXPECT_TRUE(ping_fetcher.Fetch());
881 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
882 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800883
884 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800885 }
886
887 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800888 // Now, spin up a client for 2 seconds.
889 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800890
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800891 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -0800892
893 // Confirm we detect the duplicate packet correctly.
894 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
895 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
896 ->Get(0)
897 ->duplicate_packets(),
898 1u);
899
Austin Schuhe61d4382021-03-31 21:33:02 -0700900 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
901 ->Get(0)
902 ->partial_deliveries(),
903 0u);
904
Austin Schuh4889b182020-11-18 19:11:56 -0800905 EXPECT_EQ(ping_timestamp_count, 1);
906 EXPECT_FALSE(ping_fetcher.Fetch());
907 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800908
909 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800910 }
911
James Kuszmaul79b2f032023-06-02 21:02:27 -0700912 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800913 StopPi1Client();
914 StopPi2Server();
Austin Schuha4e616a2023-05-15 17:59:30 -0700915 pi1_remote_timestamp_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800916 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800917}
918
919// Tests that when a message is sent before the bridge starts up, but is
920// configured as reliable, we forward it. Confirm this works across server
921// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800922TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -0800923 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800924 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800925
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800926 MakePi2Server();
927 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800928
Austin Schuhf466ab52021-02-16 22:00:38 -0800929 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800930 aos::Fetcher<examples::Ping> ping_fetcher =
931 receive_event_loop.MakeFetcher<examples::Ping>("/test");
932 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
933 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
934 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
935 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
936
Austin Schuh4889b182020-11-18 19:11:56 -0800937 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800938 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800939
940 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800941 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800942 aos::Sender<examples::Ping> ping_sender =
943 send_event_loop.MakeSender<examples::Ping>("/test");
944 {
945 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
946 examples::Ping::Builder ping_builder =
947 builder.MakeBuilder<examples::Ping>();
948 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -0700949 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800950 }
951
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800952 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800953
954 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800955 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800956
957 const size_t ping_channel_index = configuration::ChannelIndex(
958 receive_event_loop.configuration(), ping_fetcher.channel());
959
James Kuszmaul79b2f032023-06-02 21:02:27 -0700960 // ping_timestamp_count is accessed from multiple threads (the Watcher that
961 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800962 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800963 const std::string channel_name =
964 shared() ? "/pi1/aos/remote_timestamps/pi2"
965 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800966 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800967 channel_name, [this, channel_name, ping_channel_index,
968 &ping_timestamp_count](const RemoteMessage &header) {
969 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800970 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800971 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800972 if (shared() && header.channel_index() != ping_channel_index) {
973 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800974 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800975 CHECK_EQ(header.channel_index(), ping_channel_index);
976 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800977 });
978
979 // Before everything starts up, confirm there is no message.
980 EXPECT_FALSE(ping_fetcher.Fetch());
981 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
982
James Kuszmaul79b2f032023-06-02 21:02:27 -0700983 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800984 StartPi1Client();
985 StartPi2Server();
986 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800987
Austin Schuha4e616a2023-05-15 17:59:30 -0700988 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
989 std::make_unique<ThreadedEventLoopRunner>(
990 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800991
992 {
993 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800994 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800995
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800996 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800997
998 // Confirm there is no detected duplicate packet.
999 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1000 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1001 ->Get(0)
1002 ->duplicate_packets(),
1003 0u);
1004
Austin Schuhe61d4382021-03-31 21:33:02 -07001005 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1006 ->Get(0)
1007 ->partial_deliveries(),
1008 0u);
1009
Austin Schuh4889b182020-11-18 19:11:56 -08001010 EXPECT_TRUE(ping_fetcher.Fetch());
1011 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1012 EXPECT_EQ(ping_timestamp_count, 1);
1013 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001014
1015 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001016 }
1017
1018 {
1019 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001020 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001021
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001022 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001023
1024 // Confirm we detect the duplicate packet correctly.
1025 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1026 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1027 ->Get(0)
1028 ->duplicate_packets(),
1029 1u);
1030
Austin Schuhe61d4382021-03-31 21:33:02 -07001031 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1032 ->Get(0)
1033 ->partial_deliveries(),
1034 0u);
1035
Austin Schuh4889b182020-11-18 19:11:56 -08001036 EXPECT_EQ(ping_timestamp_count, 1);
1037 EXPECT_FALSE(ping_fetcher.Fetch());
1038 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001039
1040 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001041 }
1042
James Kuszmaul79b2f032023-06-02 21:02:27 -07001043 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001044 StopPi1Client();
1045 StopPi2Server();
1046 StopPi2Client();
Austin Schuha4e616a2023-05-15 17:59:30 -07001047 pi1_remote_timestamp_thread.reset();
Austin Schuh4889b182020-11-18 19:11:56 -08001048}
1049
James Kuszmaul79b2f032023-06-02 21:02:27 -07001050// Tests that when multiple reliable messages are sent during a time when the
1051// client is restarting that only the final of those messages makes it to the
1052// client. This ensures that we handle a disconnecting & reconnecting client
1053// correctly in the server reliable connection retry logic.
1054TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
1055 OnPi1();
1056
1057 FLAGS_application_name = "sender";
1058 aos::ShmEventLoop send_event_loop(&config.message());
1059 aos::Sender<examples::Ping> ping_sender =
1060 send_event_loop.MakeSender<examples::Ping>("/test");
1061 size_t ping_index = 0;
1062 SendPing(&ping_sender, ++ping_index);
1063
1064 MakePi1Server();
1065 MakePi1Client();
1066
1067 FLAGS_application_name = "pi1_timestamp";
1068 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
1069
1070 // Now do it for "raspberrypi2", the client.
1071 OnPi2();
1072
1073 MakePi2Server();
1074
1075 aos::ShmEventLoop receive_event_loop(&config.message());
1076 aos::Fetcher<examples::Ping> ping_fetcher =
1077 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1078 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1079 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1080
1081 const size_t ping_channel_index = configuration::ChannelIndex(
1082 receive_event_loop.configuration(), ping_fetcher.channel());
1083
1084 // ping_timestamp_count is accessed from multiple threads (the Watcher that
1085 // triggers it is in a separate thread), so make it atomic.
1086 std::atomic<int> ping_timestamp_count{0};
1087 const std::string channel_name =
1088 shared() ? "/pi1/aos/remote_timestamps/pi2"
1089 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
1090 pi1_remote_timestamp_event_loop.MakeWatcher(
1091 channel_name, [this, channel_name, ping_channel_index,
1092 &ping_timestamp_count](const RemoteMessage &header) {
1093 VLOG(1) << channel_name << " RemoteMessage "
1094 << aos::FlatbufferToJson(&header);
1095 EXPECT_TRUE(header.has_boot_uuid());
1096 if (shared() && header.channel_index() != ping_channel_index) {
1097 return;
1098 }
1099 CHECK_EQ(header.channel_index(), ping_channel_index);
1100 ++ping_timestamp_count;
1101 });
1102
1103 // Before everything starts up, confirm there is no message.
1104 EXPECT_FALSE(ping_fetcher.Fetch());
1105
1106 // Spin up the persistent pieces.
1107 StartPi1Server();
1108 StartPi1Client();
1109 StartPi2Server();
1110
1111 // Event used to wait for the timestamp counting thread to start.
1112 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1113 std::make_unique<ThreadedEventLoopRunner>(
1114 &pi1_remote_timestamp_event_loop);
1115
1116 {
1117 // Now, spin up a client for 2 seconds.
1118 MakePi2Client();
1119
1120 RunPi2Client(chrono::milliseconds(2050));
1121
1122 // Confirm there is no detected duplicate packet.
1123 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1124 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1125 ->Get(0)
1126 ->duplicate_packets(),
1127 0u);
1128
1129 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1130 ->Get(0)
1131 ->partial_deliveries(),
1132 0u);
1133
1134 EXPECT_TRUE(ping_fetcher.Fetch());
1135 EXPECT_EQ(ping_timestamp_count, 1);
1136
1137 StopPi2Client();
1138 }
1139
1140 // Send some reliable messages while the client is dead. Only the final one
1141 // should make it through.
1142 while (ping_index < 10) {
1143 SendPing(&ping_sender, ++ping_index);
1144 }
1145
1146 {
1147 // Now, spin up a client for 2 seconds.
1148 MakePi2Client();
1149
1150 RunPi2Client(chrono::milliseconds(5050));
1151
1152 // No duplicate packets should have appeared.
1153 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1154 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1155 ->Get(0)
1156 ->duplicate_packets(),
1157 0u);
1158
1159 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1160 ->Get(0)
1161 ->partial_deliveries(),
1162 0u);
1163
1164 EXPECT_EQ(ping_timestamp_count, 2);
1165 // We should have gotten precisely one more ping message--the latest one
1166 // sent should've made it, but no previous ones.
1167 EXPECT_TRUE(ping_fetcher.FetchNext());
1168 EXPECT_EQ(ping_index, ping_fetcher->value());
1169 EXPECT_FALSE(ping_fetcher.FetchNext());
1170
1171 StopPi2Client();
1172 }
1173
1174 // Shut everyone else down.
1175 StopPi1Client();
1176 StopPi2Server();
1177 pi1_remote_timestamp_thread.reset();
1178 StopPi1Server();
1179}
1180
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001181// Test that differing config sha256's result in no connection.
1182TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1183 // This is rather annoying to set up. We need to start up a client and
1184 // server, on the same node, but get them to think that they are on different
1185 // nodes.
1186 //
1187 // We need the client to not post directly to "/test" like it would in a
1188 // real system, otherwise we will re-send the ping message... So, use an
1189 // application specific map to have the client post somewhere else.
1190 //
1191 // To top this all off, each of these needs to be done with a ShmEventLoop,
1192 // which needs to run in a separate thread... And it is really hard to get
1193 // everything started up reliably. So just be super generous on timeouts and
1194 // hope for the best. We can be more generous in the future if we need to.
1195 //
1196 // We are faking the application names by passing in --application_name=foo
1197 OnPi1();
1198
1199 MakePi1Server(
1200 "dummy sha256 ");
1201 MakePi1Client();
1202
1203 // And build the app for testing.
1204 MakePi1Test();
1205 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1206 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1207 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1208 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1209
1210 // Now do it for "raspberrypi2", the client.
1211 OnPi2();
1212 MakePi2Server();
1213
1214 // And build the app for testing.
1215 MakePi2Test();
1216 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1217 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1218 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1219 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1220
1221 // Wait until we are connected, then send.
1222
1223 StartPi1Test();
1224 StartPi2Test();
1225 StartPi1Server();
1226 StartPi1Client();
1227 StartPi2Server();
1228
1229 {
1230 MakePi2Client();
1231
1232 RunPi2Client(chrono::milliseconds(3050));
1233
1234 // Now confirm we are synchronized.
1235 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1236 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1237 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1238 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1239
1240 const ServerConnection *const pi1_connection =
1241 pi1_server_statistics_fetcher->connections()->Get(0);
1242 const ClientConnection *const pi1_client_connection =
1243 pi1_client_statistics_fetcher->connections()->Get(0);
1244 const ServerConnection *const pi2_connection =
1245 pi2_server_statistics_fetcher->connections()->Get(0);
1246 const ClientConnection *const pi2_client_connection =
1247 pi2_client_statistics_fetcher->connections()->Get(0);
1248
1249 // Make sure one direction is disconnected with a bunch of connection
1250 // attempts and failures.
1251 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1252 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1253 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1254
1255 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1256 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1257
1258 // And the other direction is happy.
1259 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1260 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1261 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1262 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1263 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1264
1265 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1266 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1267
1268 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1269 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1270 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1271 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1272
1273 StopPi2Client();
1274 }
1275
James Kuszmaul79b2f032023-06-02 21:02:27 -07001276 // Shut everyone else down.
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001277 StopPi1Server();
1278 StopPi1Client();
1279 StopPi2Server();
1280 StopPi1Test();
1281 StopPi2Test();
1282}
1283
Austin Schuh89f23e32023-05-15 17:06:43 -07001284// Test that a client which connects with too big a message gets disconnected
1285// without crashing.
1286TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001287 // This is rather annoying to set up. We need to start up a client and
1288 // server, on the same node, but get them to think that they are on different
1289 // nodes.
1290 //
1291 // We need the client to not post directly to "/test" like it would in a
1292 // real system, otherwise we will re-send the ping message... So, use an
1293 // application specific map to have the client post somewhere else.
1294 //
1295 // To top this all off, each of these needs to be done with a ShmEventLoop,
1296 // which needs to run in a separate thread... And it is really hard to get
1297 // everything started up reliably. So just be super generous on timeouts and
1298 // hope for the best. We can be more generous in the future if we need to.
1299 //
1300 // We are faking the application names by passing in --application_name=foo
1301 OnPi1();
1302
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001303 MakePi1Server();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001304 MakePi1Client();
1305
1306 // And build the app for testing.
1307 MakePi1Test();
1308 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1309 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1310 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1311 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1312
1313 // Now do it for "raspberrypi2", the client.
1314 OnPi2();
1315 MakePi2Server();
1316
1317 // And build the app for testing.
1318 MakePi2Test();
1319 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1320 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1321 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1322 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1323
1324 // Wait until we are connected, then send.
1325
1326 StartPi1Test();
1327 StartPi2Test();
1328 StartPi1Server();
1329 StartPi1Client();
1330 StartPi2Server();
1331
1332 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001333 // Now, spin up a SctpClient and send a massive hunk of data. This should
1334 // trigger a disconnect, but no crash.
1335 OnPi2();
1336 FLAGS_application_name = "pi2_message_bridge_client";
1337 pi2_client_event_loop =
1338 std::make_unique<aos::ShmEventLoop>(&config.message());
1339 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
1340
1341 const aos::Node *const remote_node = CHECK_NOTNULL(
1342 configuration::GetNode(pi2_client_event_loop->configuration(), "pi1"));
1343
1344 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1345 connect_message(MakeConnectMessage(
1346 pi2_client_event_loop->configuration(),
1347 pi2_client_event_loop->node(), "pi1",
1348 pi2_client_event_loop->boot_uuid(), config_sha256));
1349
1350 SctpClient client(remote_node->hostname()->string_view(),
1351 remote_node->port(),
1352 connect_message.message().channels_to_transfer()->size() +
1353 kControlStreams(),
1354 "");
1355
Austin Schuh89f23e32023-05-15 17:06:43 -07001356 client.SetPoolSize(2u);
1357
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001358 // Passes on a machine with:
1359 // 5.4.0-147-generic
1360 // net.core.wmem_default = 212992
1361 // net.core.wmem_max = 212992
1362 // net.core.rmem_default = 212992
1363 // net.core.rmem_max = 212992
1364 // If too large it appears the message is never delivered to the
1365 // application.
1366 constexpr size_t kBigMessageSize = 64000;
1367 client.SetMaxReadSize(kBigMessageSize);
1368 client.SetMaxWriteSize(kBigMessageSize);
1369
1370 const std::string big_data(kBigMessageSize, 'a');
Austin Schuh89f23e32023-05-15 17:06:43 -07001371
1372 pi2_client_event_loop->epoll()->OnReadable(client.fd(), [&]() {
1373 aos::unique_c_ptr<Message> message = client.Read();
1374 client.FreeMessage(std::move(message));
1375 });
1376
1377 aos::TimerHandler *const send_big_message = pi2_client_event_loop->AddTimer(
1378 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
1379
1380 pi2_client_event_loop->OnRun([this, send_big_message]() {
1381 send_big_message->Schedule(pi2_client_event_loop->monotonic_now() +
1382 chrono::seconds(1));
1383 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001384
1385 RunPi2Client(chrono::milliseconds(3050));
1386
1387 // Now confirm we are synchronized.
1388 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1389 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1390 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001391 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001392
1393 const ServerConnection *const pi1_connection =
1394 pi1_server_statistics_fetcher->connections()->Get(0);
1395 const ClientConnection *const pi1_client_connection =
1396 pi1_client_statistics_fetcher->connections()->Get(0);
1397 const ServerConnection *const pi2_connection =
1398 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001399
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001400 // Make sure the server we just sent a bunch of junk to is grumpy and
1401 // disconnected the bad client.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001402 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1403 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001404 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001405
1406 // And the other direction is happy.
1407 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1408 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1409 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1410 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1411 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1412
1413 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1414 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1415
1416 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1417 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001418 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1419
Austin Schuh89f23e32023-05-15 17:06:43 -07001420 pi2_client_event_loop->epoll()->DeleteFd(client.fd());
1421
Austin Schuhb0e439d2023-05-15 10:55:40 -07001422 StopPi2Client();
1423 }
1424
James Kuszmaul79b2f032023-06-02 21:02:27 -07001425 // Shut everyone else down.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001426 StopPi1Server();
1427 StopPi1Client();
1428 StopPi2Server();
1429 StopPi1Test();
1430 StopPi2Test();
1431}
1432
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001433INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001434 MessageBridgeTests, MessageBridgeParameterizedTest,
1435 ::testing::Values(
1436 Param{"message_bridge_test_combined_timestamps_common_config.json",
1437 true},
1438 Param{"message_bridge_test_common_config.json", false}));
1439
Stephan Pleinesf63bde82024-01-13 15:59:33 -08001440} // namespace aos::message_bridge::testing