blob: cc469263c8e07629241c1bf72aac81640b0feec9 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070011#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012#include "aos/network/message_bridge_server_lib.h"
James Kuszmaul79b2f032023-06-02 21:02:27 -070013#include "aos/network/message_bridge_test_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070014#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070015#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070016#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080017#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018
19namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070020
Austin Schuhe84c3ed2019-12-14 15:29:48 -080021namespace message_bridge {
22namespace testing {
23
James Kuszmaul79b2f032023-06-02 21:02:27 -070024// Note: All of these tests spin up ShmEventLoop's in separate threads to allow
25// us to run the "real" message bridge. This requires extra threading and timing
26// coordination to make happen, which is the reason for some of the extra
27// complexity in these tests.
Austin Schuhe991fe22020-11-18 16:53:39 -080028
Austin Schuhe84c3ed2019-12-14 15:29:48 -080029// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -080030TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080031 // This is rather annoying to set up. We need to start up a client and
32 // server, on the same node, but get them to think that they are on different
33 // nodes.
34 //
35 // We then get to wait until they are connected.
36 //
37 // After they are connected, we send a Ping message.
38 //
39 // On the other end, we receive a Pong message.
40 //
41 // But, we need the client to not post directly to "/test" like it would in a
42 // real system, otherwise we will re-send the ping message... So, use an
43 // application specific map to have the client post somewhere else.
44 //
45 // To top this all off, each of these needs to be done with a ShmEventLoop,
46 // which needs to run in a separate thread... And it is really hard to get
47 // everything started up reliably. So just be super generous on timeouts and
48 // hope for the best. We can be more generous in the future if we need to.
49 //
50 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -080051 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080052 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -070053
Austin Schuh0a2f12f2021-01-08 22:48:29 -080054 MakePi1Server();
55 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056
Austin Schuh89e1e9c2023-05-15 14:38:44 -070057 const std::string long_data = std::string(10000, 'a');
58
Austin Schuhe84c3ed2019-12-14 15:29:48 -080059 // And build the app which sends the pings.
60 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -080061 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080062 aos::Sender<examples::Ping> ping_sender =
63 ping_event_loop.MakeSender<examples::Ping>("/test");
64
Austin Schuhf466ab52021-02-16 22:00:38 -080065 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080066 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
67 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080068 shared() ? "/pi1/aos/remote_timestamps/pi2"
69 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -070070
71 // Fetchers for confirming the remote timestamps made it.
72 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
73 ping_event_loop.MakeFetcher<examples::Ping>("/test");
74 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
75 ping_event_loop.MakeFetcher<Timestamp>("/aos");
76
Austin Schuhe84c3ed2019-12-14 15:29:48 -080077 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -080078 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -070079
Austin Schuh0a2f12f2021-01-08 22:48:29 -080080 MakePi2Client();
81 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080082
83 // And build the app which sends the pongs.
84 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -080085 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086
Austin Schuh7bc59052020-02-16 23:48:33 -080087 // And build the app for testing.
88 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -080089 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080090
91 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
92 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -080093 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
94 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080095 shared() ? "/pi2/aos/remote_timestamps/pi1"
96 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
97 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -070098
99 // Event loop for fetching data delivered to pi2 from pi1 to match up
100 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800101 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700102 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
103 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
104 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
105 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
106 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
107 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800108
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800109 // Count the pongs.
110 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700111 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
112 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700113 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700114 ++pong_count;
115 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
116 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
118 FLAGS_override_hostname = "";
119
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800120 // Wait until we are connected, then send.
121 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800122 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700123 ping_event_loop.MakeWatcher(
124 "/pi1/aos",
125 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
126 &long_data](const ServerStatistics &stats) {
127 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800128
Philipp Schrader790cb542023-07-05 21:06:52 -0700129 ASSERT_TRUE(stats.has_connections());
130 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800131
Philipp Schrader790cb542023-07-05 21:06:52 -0700132 bool connected = false;
133 for (const ServerConnection *connection : *stats.connections()) {
134 // Confirm that we are estimating the server time offset correctly. It
135 // should be about 0 since we are on the same machine here.
136 if (connection->has_monotonic_offset()) {
137 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
138 chrono::milliseconds(1));
139 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
140 chrono::milliseconds(-1));
141 ++pi1_server_statistics_count;
142 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800143
Philipp Schrader790cb542023-07-05 21:06:52 -0700144 if (connection->node()->name()->string_view() ==
145 pi2_client_event_loop->node()->name()->string_view()) {
146 if (connection->state() == State::CONNECTED) {
147 EXPECT_TRUE(connection->has_boot_uuid());
148 EXPECT_EQ(connection->connection_count(), 1u);
149 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
150 connection->connected_since_time())),
151 monotonic_clock::now());
152 connected = true;
153 } else {
154 EXPECT_FALSE(connection->has_connection_count());
155 EXPECT_FALSE(connection->has_connected_since_time());
156 }
157 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800158 }
159
Philipp Schrader790cb542023-07-05 21:06:52 -0700160 if (connected) {
161 VLOG(1) << "Connected! Sent ping.";
162 auto builder = ping_sender.MakeBuilder();
163 builder.fbb()->CreateString(long_data);
164 examples::Ping::Builder ping_builder =
165 builder.MakeBuilder<examples::Ping>();
166 ping_builder.add_value(ping_count + 971);
167 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
168 ++ping_count;
169 }
170 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800171
Austin Schuh7bc59052020-02-16 23:48:33 -0800172 // Confirm both client and server statistics messages have decent offsets in
173 // them.
174 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700175 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800176 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800177 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800178 for (const ServerConnection *connection : *stats.connections()) {
179 if (connection->has_monotonic_offset()) {
180 ++pi2_server_statistics_count;
181 // Confirm that we are estimating the server time offset correctly. It
182 // should be about 0 since we are on the same machine here.
183 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
184 chrono::milliseconds(1));
185 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
186 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800187 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800188 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800189
190 if (connection->state() == State::CONNECTED) {
191 EXPECT_EQ(connection->connection_count(), 1u);
192 EXPECT_LT(monotonic_clock::time_point(
193 chrono::nanoseconds(connection->connected_since_time())),
194 monotonic_clock::now());
195 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700196 // If we have been connected, we expect the connection count to stay
197 // around.
198 if (pi2_server_statistics_count > 0) {
199 EXPECT_TRUE(connection->has_connection_count());
200 EXPECT_EQ(connection->connection_count(), 1u);
201 } else {
202 EXPECT_FALSE(connection->has_connection_count());
203 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800204 EXPECT_FALSE(connection->has_connected_since_time());
205 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800206 }
207 });
208
209 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800210 int pi1_connected_client_statistics_count = 0;
211 ping_event_loop.MakeWatcher(
212 "/pi1/aos",
213 [&pi1_client_statistics_count,
214 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
215 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800216
Austin Schuh367a7f42021-11-23 23:04:36 -0800217 for (const ClientConnection *connection : *stats.connections()) {
218 if (connection->has_monotonic_offset()) {
219 ++pi1_client_statistics_count;
220 // It takes at least 10 microseconds to send a message between the
221 // client and server. The min (filtered) time shouldn't be over 10
222 // milliseconds on localhost. This might have to bump up if this is
223 // proving flaky.
224 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::milliseconds(10))
226 << " " << connection->monotonic_offset()
227 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
228 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
229 chrono::microseconds(10))
230 << " " << connection->monotonic_offset()
231 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
232 }
233 if (connection->state() == State::CONNECTED) {
234 EXPECT_EQ(connection->connection_count(), 1u);
235 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
236 connection->connected_since_time())),
237 monotonic_clock::now());
238 // The first Connected message may not have a UUID in it since no
239 // data has flown. That's fine.
240 if (pi1_connected_client_statistics_count > 0) {
241 EXPECT_TRUE(connection->has_boot_uuid())
242 << ": " << aos::FlatbufferToJson(connection);
243 }
244 ++pi1_connected_client_statistics_count;
245 } else {
246 EXPECT_FALSE(connection->has_connection_count());
247 EXPECT_FALSE(connection->has_connected_since_time());
248 }
249 }
250 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800251
252 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800253 int pi2_connected_client_statistics_count = 0;
254 pong_event_loop.MakeWatcher(
255 "/pi2/aos",
256 [&pi2_client_statistics_count,
257 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
258 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800259
Austin Schuh367a7f42021-11-23 23:04:36 -0800260 for (const ClientConnection *connection : *stats.connections()) {
261 if (connection->has_monotonic_offset()) {
262 ++pi2_client_statistics_count;
263 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
264 chrono::milliseconds(10))
265 << ": got " << aos::FlatbufferToJson(connection);
266 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
267 chrono::microseconds(10))
268 << ": got " << aos::FlatbufferToJson(connection);
269 }
270 if (connection->state() == State::CONNECTED) {
271 EXPECT_EQ(connection->connection_count(), 1u);
272 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
273 connection->connected_since_time())),
274 monotonic_clock::now());
275 if (pi2_connected_client_statistics_count > 0) {
276 EXPECT_TRUE(connection->has_boot_uuid());
277 }
278 ++pi2_connected_client_statistics_count;
279 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700280 if (pi2_connected_client_statistics_count == 0) {
281 EXPECT_FALSE(connection->has_connection_count())
282 << aos::FlatbufferToJson(&stats);
283 } else {
284 EXPECT_TRUE(connection->has_connection_count())
285 << aos::FlatbufferToJson(&stats);
286 EXPECT_EQ(connection->connection_count(), 1u);
287 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800288 EXPECT_FALSE(connection->has_connected_since_time());
289 }
290 }
291 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800292
Austin Schuh196a4452020-03-15 23:12:03 -0700293 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800294 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800295 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800296 });
Austin Schuh196a4452020-03-15 23:12:03 -0700297 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800298 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800299 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800300 });
301
Austin Schuh2f8fd752020-09-01 22:38:28 -0700302 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
303 // channel.
304 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
305 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
306 const size_t ping_timestamp_channel =
307 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
308 ping_on_pi2_fetcher.channel());
309
310 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
311 VLOG(1) << "Channel "
312 << configuration::ChannelIndex(ping_event_loop.configuration(),
313 channel)
314 << " " << configuration::CleanedChannelToString(channel);
315 }
316
317 // For each remote timestamp we get back, confirm that it is either a ping
318 // message, or a timestamp we sent out. Also confirm that the timestamps are
319 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800320 for (std::pair<int, std::string> channel :
321 shared()
322 ? std::vector<std::pair<
323 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
324 : std::vector<std::pair<int, std::string>>{
325 {pi1_timestamp_channel,
326 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
327 "aos-message_bridge-Timestamp"},
328 {ping_timestamp_channel,
329 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
330 ping_event_loop.MakeWatcher(
331 channel.second,
332 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
333 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
334 &pi1_on_pi1_timestamp_fetcher,
335 channel_index = channel.first](const RemoteMessage &header) {
336 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
337 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700338
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800339 EXPECT_TRUE(header.has_boot_uuid());
340 if (channel_index != -1) {
341 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342 }
343
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800344 const aos::monotonic_clock::time_point header_monotonic_sent_time(
345 chrono::nanoseconds(header.monotonic_sent_time()));
346 const aos::realtime_clock::time_point header_realtime_sent_time(
347 chrono::nanoseconds(header.realtime_sent_time()));
348 const aos::monotonic_clock::time_point header_monotonic_remote_time(
349 chrono::nanoseconds(header.monotonic_remote_time()));
350 const aos::realtime_clock::time_point header_realtime_remote_time(
351 chrono::nanoseconds(header.realtime_remote_time()));
352
353 const Context *pi1_context = nullptr;
354 const Context *pi2_context = nullptr;
355
356 if (header.channel_index() == pi1_timestamp_channel) {
357 // Find the forwarded message.
358 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
359 header_monotonic_sent_time) {
360 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
361 }
362
363 // And the source message.
364 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
365 header_monotonic_remote_time) {
366 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
367 }
368
369 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
370 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
371 } else if (header.channel_index() == ping_timestamp_channel) {
372 // Find the forwarded message.
373 while (ping_on_pi2_fetcher.context().monotonic_event_time <
374 header_monotonic_sent_time) {
375 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
376 }
377
378 // And the source message.
379 while (ping_on_pi1_fetcher.context().monotonic_event_time <
380 header_monotonic_remote_time) {
381 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
382 }
383
384 pi1_context = &ping_on_pi1_fetcher.context();
385 pi2_context = &ping_on_pi2_fetcher.context();
386 } else {
387 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700388 }
389
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800390 // Confirm the forwarded message has matching timestamps to the
391 // timestamps we got back.
392 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
393 EXPECT_EQ(pi2_context->monotonic_event_time,
394 header_monotonic_sent_time);
395 EXPECT_EQ(pi2_context->realtime_event_time,
396 header_realtime_sent_time);
397 EXPECT_EQ(pi2_context->realtime_remote_time,
398 header_realtime_remote_time);
399 EXPECT_EQ(pi2_context->monotonic_remote_time,
400 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700401
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800402 // Confirm the forwarded message also matches the source message.
403 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
404 EXPECT_EQ(pi1_context->monotonic_event_time,
405 header_monotonic_remote_time);
406 EXPECT_EQ(pi1_context->realtime_event_time,
407 header_realtime_remote_time);
408 });
409 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700410
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800411 // Start everything up. Pong is the only thing we don't know how to wait
412 // on, so start it first.
Austin Schuha4e616a2023-05-15 17:59:30 -0700413 ThreadedEventLoopRunner pong_thread(&pong_event_loop);
414 ThreadedEventLoopRunner ping_thread(&ping_event_loop);
Austin Schuh7bc59052020-02-16 23:48:33 -0800415
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800416 StartPi1Server();
417 StartPi1Client();
418 StartPi2Client();
419 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800420
421 // And go!
Austin Schuha4e616a2023-05-15 17:59:30 -0700422 // Run for 5 seconds to make sure we have time to estimate the offset.
423 std::this_thread::sleep_for(chrono::milliseconds(5050));
Austin Schuh7bc59052020-02-16 23:48:33 -0800424
425 // Confirm that we are estimating a monotonic offset on the client.
426 ASSERT_TRUE(client_statistics_fetcher.Fetch());
427
428 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
429 EXPECT_EQ(client_statistics_fetcher->connections()
430 ->Get(0)
431 ->node()
432 ->name()
433 ->string_view(),
434 "pi1");
435
436 // Make sure the offset in one direction is less than a second.
437 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700438 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
439 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800440 EXPECT_LT(
441 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700442 1000000000)
443 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800444
Austin Schuha4e616a2023-05-15 17:59:30 -0700445 // Shut everyone else down before confirming everything actually ran.
446 ping_thread.Exit();
447 pong_thread.Exit();
448 StopPi1Server();
449 StopPi1Client();
450 StopPi2Client();
451 StopPi2Server();
452
453 // Make sure we sent something.
454 EXPECT_GE(ping_count, 1);
455 // And got something back.
456 EXPECT_GE(pong_count, 1);
457
Austin Schuh7bc59052020-02-16 23:48:33 -0800458 EXPECT_GE(pi1_server_statistics_count, 2);
459 EXPECT_GE(pi2_server_statistics_count, 2);
460 EXPECT_GE(pi1_client_statistics_count, 2);
461 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700462
463 // Confirm we got timestamps back!
464 EXPECT_TRUE(message_header_fetcher1.Fetch());
465 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800466}
467
Austin Schuh5344c352020-04-12 17:04:26 -0700468// Test that the client disconnecting triggers the server offsets on both sides
469// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800470TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700471 // This is rather annoying to set up. We need to start up a client and
472 // server, on the same node, but get them to think that they are on different
473 // nodes.
474 //
475 // We need the client to not post directly to "/test" like it would in a
476 // real system, otherwise we will re-send the ping message... So, use an
477 // application specific map to have the client post somewhere else.
478 //
479 // To top this all off, each of these needs to be done with a ShmEventLoop,
480 // which needs to run in a separate thread... And it is really hard to get
481 // everything started up reliably. So just be super generous on timeouts and
482 // hope for the best. We can be more generous in the future if we need to.
483 //
484 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800485 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700486
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800487 MakePi1Server();
488 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700489
490 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800491 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700492 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800493 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700494
495 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800496 OnPi2();
497 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700498
499 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800500 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700501 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800502 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700503
504 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700505
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800506 StartPi1Test();
507 StartPi2Test();
508 StartPi1Server();
509 StartPi1Client();
510 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700511
512 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800513 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700514
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800515 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700516
517 // Now confirm we are synchronized.
518 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
519 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
520
521 const ServerConnection *const pi1_connection =
522 pi1_server_statistics_fetcher->connections()->Get(0);
523 const ServerConnection *const pi2_connection =
524 pi2_server_statistics_fetcher->connections()->Get(0);
525
526 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800527 EXPECT_EQ(pi1_connection->connection_count(), 1u);
528 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700529 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
530 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
531 chrono::milliseconds(1));
532 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
533 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800534 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700535
536 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800537 EXPECT_EQ(pi2_connection->connection_count(), 1u);
538 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700539 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
540 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
541 chrono::milliseconds(1));
542 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
543 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800544 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800545
546 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700547 }
548
Austin Schuhd0d894e2021-10-24 17:13:11 -0700549 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
550 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700551
552 {
553 // Now confirm we are un-synchronized.
554 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
555 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
556 const ServerConnection *const pi1_connection =
557 pi1_server_statistics_fetcher->connections()->Get(0);
558 const ServerConnection *const pi2_connection =
559 pi2_server_statistics_fetcher->connections()->Get(0);
560
561 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800562 EXPECT_EQ(pi1_connection->connection_count(), 1u);
563 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700564 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800565 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700566 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
567 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800568 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800569 EXPECT_EQ(pi2_connection->connection_count(), 1u);
570 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700571 }
572
573 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800574 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700575 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800576 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700577
578 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
579 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
580
581 // Now confirm we are synchronized again.
582 const ServerConnection *const pi1_connection =
583 pi1_server_statistics_fetcher->connections()->Get(0);
584 const ServerConnection *const pi2_connection =
585 pi2_server_statistics_fetcher->connections()->Get(0);
586
587 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800588 EXPECT_EQ(pi1_connection->connection_count(), 2u);
589 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700590 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
591 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800592 chrono::milliseconds(1))
593 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700594 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800595 chrono::milliseconds(-1))
596 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800597 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700598
599 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800600 EXPECT_EQ(pi2_connection->connection_count(), 1u);
601 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700602 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
603 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800604 chrono::milliseconds(1))
605 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700606 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800607 chrono::milliseconds(-1))
608 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800609 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800610
611 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700612 }
613
James Kuszmaul79b2f032023-06-02 21:02:27 -0700614 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800615 StopPi1Server();
616 StopPi1Client();
617 StopPi2Server();
618 StopPi1Test();
619 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700620}
621
622// Test that the server disconnecting triggers the server offsets on the other
623// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800624TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700625 // This is rather annoying to set up. We need to start up a client and
626 // server, on the same node, but get them to think that they are on different
627 // nodes.
628 //
629 // We need the client to not post directly to "/test" like it would in a
630 // real system, otherwise we will re-send the ping message... So, use an
631 // application specific map to have the client post somewhere else.
632 //
633 // To top this all off, each of these needs to be done with a ShmEventLoop,
634 // which needs to run in a separate thread... And it is really hard to get
635 // everything started up reliably. So just be super generous on timeouts and
636 // hope for the best. We can be more generous in the future if we need to.
637 //
638 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700639 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800640 OnPi1();
641 MakePi1Server();
642 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700643
644 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800645 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700646 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800647 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700648 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800649 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700650
651 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800652 OnPi2();
653 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700654
655 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800656 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700657 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800658 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700659
660 // Start everything up. Pong is the only thing we don't know how to wait on,
661 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800662 StartPi1Test();
663 StartPi2Test();
664 StartPi1Server();
665 StartPi1Client();
666 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700667
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800668 // Confirm both client and server statistics messages have decent offsets in
669 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700670
671 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800672 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700673
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800674 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700675
676 // Now confirm we are synchronized.
677 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
678 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
679
680 const ServerConnection *const pi1_connection =
681 pi1_server_statistics_fetcher->connections()->Get(0);
682 const ServerConnection *const pi2_connection =
683 pi2_server_statistics_fetcher->connections()->Get(0);
684
685 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
686 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
687 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
688 chrono::milliseconds(1));
689 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
690 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800691 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800692 EXPECT_TRUE(pi1_connection->has_connected_since_time());
693 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700694
695 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
696 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
697 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
698 chrono::milliseconds(1));
699 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
700 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800701 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800702 EXPECT_TRUE(pi2_connection->has_connected_since_time());
703 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800704
705 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700706 }
707
708 std::this_thread::sleep_for(std::chrono::seconds(2));
709
710 {
711 // And confirm we are unsynchronized.
712 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
713 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
714
715 const ServerConnection *const pi1_server_connection =
716 pi1_server_statistics_fetcher->connections()->Get(0);
717 const ClientConnection *const pi1_client_connection =
718 pi1_client_statistics_fetcher->connections()->Get(0);
719
720 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
721 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800722 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
723 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
724
Austin Schuh20ac95d2020-12-05 17:24:19 -0800725 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700726 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
727 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800728 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
729 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
730 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700731 }
732
733 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800734 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700735
Austin Schuh5cd1d752023-08-18 17:31:46 -0700736 // Wait long enough for the client to connect again. It currently takes 3
737 // seconds of connection to estimate the time offset.
738 RunPi2Server(chrono::milliseconds(4050));
Austin Schuh5344c352020-04-12 17:04:26 -0700739
740 // And confirm we are synchronized again.
741 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
742 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -0800743 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -0700744
745 const ServerConnection *const pi1_connection =
746 pi1_server_statistics_fetcher->connections()->Get(0);
747 const ServerConnection *const pi2_connection =
748 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800749 const ClientConnection *const pi1_client_connection =
750 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -0700751
752 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
753 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
754 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
755 chrono::milliseconds(1));
756 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
757 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800758 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700759
Austin Schuh367a7f42021-11-23 23:04:36 -0800760 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
761 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
762 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
763 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
764
Austin Schuh5344c352020-04-12 17:04:26 -0700765 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
766 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
767 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
768 chrono::milliseconds(1));
769 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
770 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800771 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800772
773 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700774 }
775
James Kuszmaul79b2f032023-06-02 21:02:27 -0700776 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800777 StopPi1Server();
778 StopPi1Client();
779 StopPi2Client();
780 StopPi1Test();
781 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700782}
783
Austin Schuh4889b182020-11-18 19:11:56 -0800784// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700785// thing, but doesn't confirm that the internal state does. We either need to
786// expose a way to check the state in a thread-safe way, or need a way to jump
787// time for one node to do that.
788
Austin Schuh4889b182020-11-18 19:11:56 -0800789void SendPing(aos::Sender<examples::Ping> *sender, int value) {
790 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
791 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
792 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -0700793 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800794}
795
796// Tests that when a message is sent before the bridge starts up, but is
797// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800798TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800799 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800800
801 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800802 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800803 aos::Sender<examples::Ping> ping_sender =
804 send_event_loop.MakeSender<examples::Ping>("/test");
805 SendPing(&ping_sender, 1);
806 aos::Sender<examples::Ping> unreliable_ping_sender =
807 send_event_loop.MakeSender<examples::Ping>("/unreliable");
808 SendPing(&unreliable_ping_sender, 1);
809
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800810 MakePi1Server();
811 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800812
813 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800814 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800815
816 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800817 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800818
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800819 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800820
Austin Schuhf466ab52021-02-16 22:00:38 -0800821 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800822 aos::Fetcher<examples::Ping> ping_fetcher =
823 receive_event_loop.MakeFetcher<examples::Ping>("/test");
824 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
825 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
826 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
827 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
828
829 const size_t ping_channel_index = configuration::ChannelIndex(
830 receive_event_loop.configuration(), ping_fetcher.channel());
831
James Kuszmaul79b2f032023-06-02 21:02:27 -0700832 // ping_timestamp_count is accessed from multiple threads (the Watcher that
833 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800834 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800835 const std::string channel_name =
836 shared() ? "/pi1/aos/remote_timestamps/pi2"
837 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800838 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800839 channel_name, [this, channel_name, ping_channel_index,
840 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -0800841 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800842 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800843 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800844 if (shared() && header.channel_index() != ping_channel_index) {
845 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800846 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800847 CHECK_EQ(header.channel_index(), ping_channel_index);
848 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800849 });
850
851 // Before everything starts up, confirm there is no message.
852 EXPECT_FALSE(ping_fetcher.Fetch());
853 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
854
James Kuszmaul79b2f032023-06-02 21:02:27 -0700855 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800856 StartPi1Server();
857 StartPi1Client();
858 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800859
860 // Event used to wait for the timestamp counting thread to start.
Austin Schuha4e616a2023-05-15 17:59:30 -0700861 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
862 std::make_unique<ThreadedEventLoopRunner>(
863 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800864
865 {
James Kuszmaul79b2f032023-06-02 21:02:27 -0700866 // Now spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800867 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800868
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800869 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800870
871 // Confirm there is no detected duplicate packet.
872 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
873 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
874 ->Get(0)
875 ->duplicate_packets(),
876 0u);
877
Austin Schuhe61d4382021-03-31 21:33:02 -0700878 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
879 ->Get(0)
880 ->partial_deliveries(),
881 0u);
882
Austin Schuh4889b182020-11-18 19:11:56 -0800883 EXPECT_TRUE(ping_fetcher.Fetch());
884 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
885 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800886
887 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800888 }
889
890 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800891 // Now, spin up a client for 2 seconds.
892 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800893
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800894 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -0800895
896 // Confirm we detect the duplicate packet correctly.
897 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
898 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
899 ->Get(0)
900 ->duplicate_packets(),
901 1u);
902
Austin Schuhe61d4382021-03-31 21:33:02 -0700903 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
904 ->Get(0)
905 ->partial_deliveries(),
906 0u);
907
Austin Schuh4889b182020-11-18 19:11:56 -0800908 EXPECT_EQ(ping_timestamp_count, 1);
909 EXPECT_FALSE(ping_fetcher.Fetch());
910 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800911
912 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800913 }
914
James Kuszmaul79b2f032023-06-02 21:02:27 -0700915 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800916 StopPi1Client();
917 StopPi2Server();
Austin Schuha4e616a2023-05-15 17:59:30 -0700918 pi1_remote_timestamp_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800919 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800920}
921
922// Tests that when a message is sent before the bridge starts up, but is
923// configured as reliable, we forward it. Confirm this works across server
924// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800925TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -0800926 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800927 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800928
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800929 MakePi2Server();
930 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800931
Austin Schuhf466ab52021-02-16 22:00:38 -0800932 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800933 aos::Fetcher<examples::Ping> ping_fetcher =
934 receive_event_loop.MakeFetcher<examples::Ping>("/test");
935 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
936 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
937 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
938 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
939
Austin Schuh4889b182020-11-18 19:11:56 -0800940 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800941 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800942
943 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800944 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800945 aos::Sender<examples::Ping> ping_sender =
946 send_event_loop.MakeSender<examples::Ping>("/test");
947 {
948 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
949 examples::Ping::Builder ping_builder =
950 builder.MakeBuilder<examples::Ping>();
951 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -0700952 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800953 }
954
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800955 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800956
957 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800958 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800959
960 const size_t ping_channel_index = configuration::ChannelIndex(
961 receive_event_loop.configuration(), ping_fetcher.channel());
962
James Kuszmaul79b2f032023-06-02 21:02:27 -0700963 // ping_timestamp_count is accessed from multiple threads (the Watcher that
964 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800965 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800966 const std::string channel_name =
967 shared() ? "/pi1/aos/remote_timestamps/pi2"
968 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800969 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800970 channel_name, [this, channel_name, ping_channel_index,
971 &ping_timestamp_count](const RemoteMessage &header) {
972 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800973 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800974 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800975 if (shared() && header.channel_index() != ping_channel_index) {
976 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800977 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800978 CHECK_EQ(header.channel_index(), ping_channel_index);
979 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800980 });
981
982 // Before everything starts up, confirm there is no message.
983 EXPECT_FALSE(ping_fetcher.Fetch());
984 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
985
James Kuszmaul79b2f032023-06-02 21:02:27 -0700986 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800987 StartPi1Client();
988 StartPi2Server();
989 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800990
Austin Schuha4e616a2023-05-15 17:59:30 -0700991 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
992 std::make_unique<ThreadedEventLoopRunner>(
993 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800994
995 {
996 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800997 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800998
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800999 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001000
1001 // Confirm there is no detected duplicate packet.
1002 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1003 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1004 ->Get(0)
1005 ->duplicate_packets(),
1006 0u);
1007
Austin Schuhe61d4382021-03-31 21:33:02 -07001008 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1009 ->Get(0)
1010 ->partial_deliveries(),
1011 0u);
1012
Austin Schuh4889b182020-11-18 19:11:56 -08001013 EXPECT_TRUE(ping_fetcher.Fetch());
1014 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1015 EXPECT_EQ(ping_timestamp_count, 1);
1016 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001017
1018 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001019 }
1020
1021 {
1022 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001023 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001024
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001025 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001026
1027 // Confirm we detect the duplicate packet correctly.
1028 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1029 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1030 ->Get(0)
1031 ->duplicate_packets(),
1032 1u);
1033
Austin Schuhe61d4382021-03-31 21:33:02 -07001034 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1035 ->Get(0)
1036 ->partial_deliveries(),
1037 0u);
1038
Austin Schuh4889b182020-11-18 19:11:56 -08001039 EXPECT_EQ(ping_timestamp_count, 1);
1040 EXPECT_FALSE(ping_fetcher.Fetch());
1041 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001042
1043 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001044 }
1045
James Kuszmaul79b2f032023-06-02 21:02:27 -07001046 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001047 StopPi1Client();
1048 StopPi2Server();
1049 StopPi2Client();
Austin Schuha4e616a2023-05-15 17:59:30 -07001050 pi1_remote_timestamp_thread.reset();
Austin Schuh4889b182020-11-18 19:11:56 -08001051}
1052
James Kuszmaul79b2f032023-06-02 21:02:27 -07001053// Tests that when multiple reliable messages are sent during a time when the
1054// client is restarting that only the final of those messages makes it to the
1055// client. This ensures that we handle a disconnecting & reconnecting client
1056// correctly in the server reliable connection retry logic.
1057TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
1058 OnPi1();
1059
1060 FLAGS_application_name = "sender";
1061 aos::ShmEventLoop send_event_loop(&config.message());
1062 aos::Sender<examples::Ping> ping_sender =
1063 send_event_loop.MakeSender<examples::Ping>("/test");
1064 size_t ping_index = 0;
1065 SendPing(&ping_sender, ++ping_index);
1066
1067 MakePi1Server();
1068 MakePi1Client();
1069
1070 FLAGS_application_name = "pi1_timestamp";
1071 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
1072
1073 // Now do it for "raspberrypi2", the client.
1074 OnPi2();
1075
1076 MakePi2Server();
1077
1078 aos::ShmEventLoop receive_event_loop(&config.message());
1079 aos::Fetcher<examples::Ping> ping_fetcher =
1080 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1081 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1082 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1083
1084 const size_t ping_channel_index = configuration::ChannelIndex(
1085 receive_event_loop.configuration(), ping_fetcher.channel());
1086
1087 // ping_timestamp_count is accessed from multiple threads (the Watcher that
1088 // triggers it is in a separate thread), so make it atomic.
1089 std::atomic<int> ping_timestamp_count{0};
1090 const std::string channel_name =
1091 shared() ? "/pi1/aos/remote_timestamps/pi2"
1092 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
1093 pi1_remote_timestamp_event_loop.MakeWatcher(
1094 channel_name, [this, channel_name, ping_channel_index,
1095 &ping_timestamp_count](const RemoteMessage &header) {
1096 VLOG(1) << channel_name << " RemoteMessage "
1097 << aos::FlatbufferToJson(&header);
1098 EXPECT_TRUE(header.has_boot_uuid());
1099 if (shared() && header.channel_index() != ping_channel_index) {
1100 return;
1101 }
1102 CHECK_EQ(header.channel_index(), ping_channel_index);
1103 ++ping_timestamp_count;
1104 });
1105
1106 // Before everything starts up, confirm there is no message.
1107 EXPECT_FALSE(ping_fetcher.Fetch());
1108
1109 // Spin up the persistent pieces.
1110 StartPi1Server();
1111 StartPi1Client();
1112 StartPi2Server();
1113
1114 // Event used to wait for the timestamp counting thread to start.
1115 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1116 std::make_unique<ThreadedEventLoopRunner>(
1117 &pi1_remote_timestamp_event_loop);
1118
1119 {
1120 // Now, spin up a client for 2 seconds.
1121 MakePi2Client();
1122
1123 RunPi2Client(chrono::milliseconds(2050));
1124
1125 // Confirm there is no detected duplicate packet.
1126 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1127 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1128 ->Get(0)
1129 ->duplicate_packets(),
1130 0u);
1131
1132 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1133 ->Get(0)
1134 ->partial_deliveries(),
1135 0u);
1136
1137 EXPECT_TRUE(ping_fetcher.Fetch());
1138 EXPECT_EQ(ping_timestamp_count, 1);
1139
1140 StopPi2Client();
1141 }
1142
1143 // Send some reliable messages while the client is dead. Only the final one
1144 // should make it through.
1145 while (ping_index < 10) {
1146 SendPing(&ping_sender, ++ping_index);
1147 }
1148
1149 {
1150 // Now, spin up a client for 2 seconds.
1151 MakePi2Client();
1152
1153 RunPi2Client(chrono::milliseconds(5050));
1154
1155 // No duplicate packets should have appeared.
1156 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1157 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1158 ->Get(0)
1159 ->duplicate_packets(),
1160 0u);
1161
1162 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1163 ->Get(0)
1164 ->partial_deliveries(),
1165 0u);
1166
1167 EXPECT_EQ(ping_timestamp_count, 2);
1168 // We should have gotten precisely one more ping message--the latest one
1169 // sent should've made it, but no previous ones.
1170 EXPECT_TRUE(ping_fetcher.FetchNext());
1171 EXPECT_EQ(ping_index, ping_fetcher->value());
1172 EXPECT_FALSE(ping_fetcher.FetchNext());
1173
1174 StopPi2Client();
1175 }
1176
1177 // Shut everyone else down.
1178 StopPi1Client();
1179 StopPi2Server();
1180 pi1_remote_timestamp_thread.reset();
1181 StopPi1Server();
1182}
1183
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001184// Test that differing config sha256's result in no connection.
1185TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1186 // This is rather annoying to set up. We need to start up a client and
1187 // server, on the same node, but get them to think that they are on different
1188 // nodes.
1189 //
1190 // We need the client to not post directly to "/test" like it would in a
1191 // real system, otherwise we will re-send the ping message... So, use an
1192 // application specific map to have the client post somewhere else.
1193 //
1194 // To top this all off, each of these needs to be done with a ShmEventLoop,
1195 // which needs to run in a separate thread... And it is really hard to get
1196 // everything started up reliably. So just be super generous on timeouts and
1197 // hope for the best. We can be more generous in the future if we need to.
1198 //
1199 // We are faking the application names by passing in --application_name=foo
1200 OnPi1();
1201
1202 MakePi1Server(
1203 "dummy sha256 ");
1204 MakePi1Client();
1205
1206 // And build the app for testing.
1207 MakePi1Test();
1208 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1209 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1210 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1211 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1212
1213 // Now do it for "raspberrypi2", the client.
1214 OnPi2();
1215 MakePi2Server();
1216
1217 // And build the app for testing.
1218 MakePi2Test();
1219 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1220 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1221 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1222 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1223
1224 // Wait until we are connected, then send.
1225
1226 StartPi1Test();
1227 StartPi2Test();
1228 StartPi1Server();
1229 StartPi1Client();
1230 StartPi2Server();
1231
1232 {
1233 MakePi2Client();
1234
1235 RunPi2Client(chrono::milliseconds(3050));
1236
1237 // Now confirm we are synchronized.
1238 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1239 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1240 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1241 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1242
1243 const ServerConnection *const pi1_connection =
1244 pi1_server_statistics_fetcher->connections()->Get(0);
1245 const ClientConnection *const pi1_client_connection =
1246 pi1_client_statistics_fetcher->connections()->Get(0);
1247 const ServerConnection *const pi2_connection =
1248 pi2_server_statistics_fetcher->connections()->Get(0);
1249 const ClientConnection *const pi2_client_connection =
1250 pi2_client_statistics_fetcher->connections()->Get(0);
1251
1252 // Make sure one direction is disconnected with a bunch of connection
1253 // attempts and failures.
1254 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1255 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1256 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1257
1258 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1259 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1260
1261 // And the other direction is happy.
1262 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1263 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1264 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1265 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1266 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1267
1268 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1269 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1270
1271 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1272 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1273 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1274 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1275
1276 StopPi2Client();
1277 }
1278
James Kuszmaul79b2f032023-06-02 21:02:27 -07001279 // Shut everyone else down.
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001280 StopPi1Server();
1281 StopPi1Client();
1282 StopPi2Server();
1283 StopPi1Test();
1284 StopPi2Test();
1285}
1286
Austin Schuh89f23e32023-05-15 17:06:43 -07001287// Test that a client which connects with too big a message gets disconnected
1288// without crashing.
1289TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001290 // This is rather annoying to set up. We need to start up a client and
1291 // server, on the same node, but get them to think that they are on different
1292 // nodes.
1293 //
1294 // We need the client to not post directly to "/test" like it would in a
1295 // real system, otherwise we will re-send the ping message... So, use an
1296 // application specific map to have the client post somewhere else.
1297 //
1298 // To top this all off, each of these needs to be done with a ShmEventLoop,
1299 // which needs to run in a separate thread... And it is really hard to get
1300 // everything started up reliably. So just be super generous on timeouts and
1301 // hope for the best. We can be more generous in the future if we need to.
1302 //
1303 // We are faking the application names by passing in --application_name=foo
1304 OnPi1();
1305
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001306 MakePi1Server();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001307 MakePi1Client();
1308
1309 // And build the app for testing.
1310 MakePi1Test();
1311 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1312 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1313 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1314 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1315
1316 // Now do it for "raspberrypi2", the client.
1317 OnPi2();
1318 MakePi2Server();
1319
1320 // And build the app for testing.
1321 MakePi2Test();
1322 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1323 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1324 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1325 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1326
1327 // Wait until we are connected, then send.
1328
1329 StartPi1Test();
1330 StartPi2Test();
1331 StartPi1Server();
1332 StartPi1Client();
1333 StartPi2Server();
1334
1335 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001336 // Now, spin up a SctpClient and send a massive hunk of data. This should
1337 // trigger a disconnect, but no crash.
1338 OnPi2();
1339 FLAGS_application_name = "pi2_message_bridge_client";
1340 pi2_client_event_loop =
1341 std::make_unique<aos::ShmEventLoop>(&config.message());
1342 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
1343
1344 const aos::Node *const remote_node = CHECK_NOTNULL(
1345 configuration::GetNode(pi2_client_event_loop->configuration(), "pi1"));
1346
1347 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1348 connect_message(MakeConnectMessage(
1349 pi2_client_event_loop->configuration(),
1350 pi2_client_event_loop->node(), "pi1",
1351 pi2_client_event_loop->boot_uuid(), config_sha256));
1352
1353 SctpClient client(remote_node->hostname()->string_view(),
1354 remote_node->port(),
1355 connect_message.message().channels_to_transfer()->size() +
1356 kControlStreams(),
1357 "");
1358
Austin Schuh89f23e32023-05-15 17:06:43 -07001359 client.SetPoolSize(2u);
1360
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001361 // Passes on a machine with:
1362 // 5.4.0-147-generic
1363 // net.core.wmem_default = 212992
1364 // net.core.wmem_max = 212992
1365 // net.core.rmem_default = 212992
1366 // net.core.rmem_max = 212992
1367 // If too large it appears the message is never delivered to the
1368 // application.
1369 constexpr size_t kBigMessageSize = 64000;
1370 client.SetMaxReadSize(kBigMessageSize);
1371 client.SetMaxWriteSize(kBigMessageSize);
1372
1373 const std::string big_data(kBigMessageSize, 'a');
Austin Schuh89f23e32023-05-15 17:06:43 -07001374
1375 pi2_client_event_loop->epoll()->OnReadable(client.fd(), [&]() {
1376 aos::unique_c_ptr<Message> message = client.Read();
1377 client.FreeMessage(std::move(message));
1378 });
1379
1380 aos::TimerHandler *const send_big_message = pi2_client_event_loop->AddTimer(
1381 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
1382
1383 pi2_client_event_loop->OnRun([this, send_big_message]() {
1384 send_big_message->Schedule(pi2_client_event_loop->monotonic_now() +
1385 chrono::seconds(1));
1386 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001387
1388 RunPi2Client(chrono::milliseconds(3050));
1389
1390 // Now confirm we are synchronized.
1391 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1392 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1393 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001394 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001395
1396 const ServerConnection *const pi1_connection =
1397 pi1_server_statistics_fetcher->connections()->Get(0);
1398 const ClientConnection *const pi1_client_connection =
1399 pi1_client_statistics_fetcher->connections()->Get(0);
1400 const ServerConnection *const pi2_connection =
1401 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001402
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001403 // Make sure the server we just sent a bunch of junk to is grumpy and
1404 // disconnected the bad client.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001405 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1406 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001407 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001408
1409 // And the other direction is happy.
1410 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1411 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1412 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1413 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1414 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1415
1416 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1417 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1418
1419 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1420 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001421 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1422
Austin Schuh89f23e32023-05-15 17:06:43 -07001423 pi2_client_event_loop->epoll()->DeleteFd(client.fd());
1424
Austin Schuhb0e439d2023-05-15 10:55:40 -07001425 StopPi2Client();
1426 }
1427
James Kuszmaul79b2f032023-06-02 21:02:27 -07001428 // Shut everyone else down.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001429 StopPi1Server();
1430 StopPi1Client();
1431 StopPi2Server();
1432 StopPi1Test();
1433 StopPi2Test();
1434}
1435
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001436INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001437 MessageBridgeTests, MessageBridgeParameterizedTest,
1438 ::testing::Values(
1439 Param{"message_bridge_test_combined_timestamps_common_config.json",
1440 true},
1441 Param{"message_bridge_test_common_config.json", false}));
1442
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001443} // namespace testing
1444} // namespace message_bridge
1445} // namespace aos