blob: 4eb5e8b0ddc1e8d09c882904f4dfe7a6c8a4cef2 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070011#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012#include "aos/network/message_bridge_server_lib.h"
James Kuszmaul79b2f032023-06-02 21:02:27 -070013#include "aos/network/message_bridge_test_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070014#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070015#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070016#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080017#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018
19namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070020
Austin Schuhe84c3ed2019-12-14 15:29:48 -080021namespace message_bridge {
22namespace testing {
23
// Note: All of these tests spin up ShmEventLoops in separate threads to allow
// us to run the "real" message bridge. This requires extra threading and timing
// coordination to make happen, which is the reason for some of the extra
// complexity in these tests.
Austin Schuhe991fe22020-11-18 16:53:39 -080028
Austin Schuhe84c3ed2019-12-14 15:29:48 -080029// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -080030TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080031 // This is rather annoying to set up. We need to start up a client and
32 // server, on the same node, but get them to think that they are on different
33 // nodes.
34 //
35 // We then get to wait until they are connected.
36 //
37 // After they are connected, we send a Ping message.
38 //
39 // On the other end, we receive a Pong message.
40 //
41 // But, we need the client to not post directly to "/test" like it would in a
42 // real system, otherwise we will re-send the ping message... So, use an
43 // application specific map to have the client post somewhere else.
44 //
45 // To top this all off, each of these needs to be done with a ShmEventLoop,
46 // which needs to run in a separate thread... And it is really hard to get
47 // everything started up reliably. So just be super generous on timeouts and
48 // hope for the best. We can be more generous in the future if we need to.
49 //
50 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -080051 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080052 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -070053
Austin Schuh0a2f12f2021-01-08 22:48:29 -080054 MakePi1Server();
55 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056
Austin Schuh89e1e9c2023-05-15 14:38:44 -070057 const std::string long_data = std::string(10000, 'a');
58
Austin Schuhe84c3ed2019-12-14 15:29:48 -080059 // And build the app which sends the pings.
60 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -080061 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080062 aos::Sender<examples::Ping> ping_sender =
63 ping_event_loop.MakeSender<examples::Ping>("/test");
64
Austin Schuhf466ab52021-02-16 22:00:38 -080065 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080066 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
67 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080068 shared() ? "/pi1/aos/remote_timestamps/pi2"
69 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -070070
71 // Fetchers for confirming the remote timestamps made it.
72 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
73 ping_event_loop.MakeFetcher<examples::Ping>("/test");
74 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
75 ping_event_loop.MakeFetcher<Timestamp>("/aos");
76
Austin Schuhe84c3ed2019-12-14 15:29:48 -080077 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -080078 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -070079
Austin Schuh0a2f12f2021-01-08 22:48:29 -080080 MakePi2Client();
81 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080082
83 // And build the app which sends the pongs.
84 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -080085 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086
Austin Schuh7bc59052020-02-16 23:48:33 -080087 // And build the app for testing.
88 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -080089 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080090
91 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
92 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -080093 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
94 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080095 shared() ? "/pi2/aos/remote_timestamps/pi1"
96 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
97 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -070098
99 // Event loop for fetching data delivered to pi2 from pi1 to match up
100 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800101 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700102 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
103 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
104 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
105 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
106 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
107 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800108
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800109 // Count the pongs.
110 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700111 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
112 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700113 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700114 ++pong_count;
115 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
116 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
118 FLAGS_override_hostname = "";
119
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800120 // Wait until we are connected, then send.
121 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800122 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700123 ping_event_loop.MakeWatcher(
124 "/pi1/aos",
125 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
126 &long_data](const ServerStatistics &stats) {
127 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800128
Philipp Schrader790cb542023-07-05 21:06:52 -0700129 ASSERT_TRUE(stats.has_connections());
130 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800131
Philipp Schrader790cb542023-07-05 21:06:52 -0700132 bool connected = false;
133 for (const ServerConnection *connection : *stats.connections()) {
134 // Confirm that we are estimating the server time offset correctly. It
135 // should be about 0 since we are on the same machine here.
136 if (connection->has_monotonic_offset()) {
137 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
138 chrono::milliseconds(1));
139 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
140 chrono::milliseconds(-1));
141 ++pi1_server_statistics_count;
142 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800143
Philipp Schrader790cb542023-07-05 21:06:52 -0700144 if (connection->node()->name()->string_view() ==
145 pi2_client_event_loop->node()->name()->string_view()) {
146 if (connection->state() == State::CONNECTED) {
147 EXPECT_TRUE(connection->has_boot_uuid());
148 EXPECT_EQ(connection->connection_count(), 1u);
149 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
150 connection->connected_since_time())),
151 monotonic_clock::now());
152 connected = true;
153 } else {
154 EXPECT_FALSE(connection->has_connection_count());
155 EXPECT_FALSE(connection->has_connected_since_time());
156 }
157 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800158 }
159
Philipp Schrader790cb542023-07-05 21:06:52 -0700160 if (connected) {
161 VLOG(1) << "Connected! Sent ping.";
162 auto builder = ping_sender.MakeBuilder();
163 builder.fbb()->CreateString(long_data);
164 examples::Ping::Builder ping_builder =
165 builder.MakeBuilder<examples::Ping>();
166 ping_builder.add_value(ping_count + 971);
167 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
168 ++ping_count;
169 }
170 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800171
Austin Schuh7bc59052020-02-16 23:48:33 -0800172 // Confirm both client and server statistics messages have decent offsets in
173 // them.
174 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700175 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800176 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800177 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800178 for (const ServerConnection *connection : *stats.connections()) {
179 if (connection->has_monotonic_offset()) {
180 ++pi2_server_statistics_count;
181 // Confirm that we are estimating the server time offset correctly. It
182 // should be about 0 since we are on the same machine here.
183 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
184 chrono::milliseconds(1));
185 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
186 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800187 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800188 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800189
190 if (connection->state() == State::CONNECTED) {
191 EXPECT_EQ(connection->connection_count(), 1u);
192 EXPECT_LT(monotonic_clock::time_point(
193 chrono::nanoseconds(connection->connected_since_time())),
194 monotonic_clock::now());
195 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700196 // If we have been connected, we expect the connection count to stay
197 // around.
198 if (pi2_server_statistics_count > 0) {
199 EXPECT_TRUE(connection->has_connection_count());
200 EXPECT_EQ(connection->connection_count(), 1u);
201 } else {
202 EXPECT_FALSE(connection->has_connection_count());
203 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800204 EXPECT_FALSE(connection->has_connected_since_time());
205 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800206 }
207 });
208
209 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800210 int pi1_connected_client_statistics_count = 0;
211 ping_event_loop.MakeWatcher(
212 "/pi1/aos",
213 [&pi1_client_statistics_count,
214 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
215 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800216
Austin Schuh367a7f42021-11-23 23:04:36 -0800217 for (const ClientConnection *connection : *stats.connections()) {
218 if (connection->has_monotonic_offset()) {
219 ++pi1_client_statistics_count;
220 // It takes at least 10 microseconds to send a message between the
221 // client and server. The min (filtered) time shouldn't be over 10
222 // milliseconds on localhost. This might have to bump up if this is
223 // proving flaky.
224 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::milliseconds(10))
226 << " " << connection->monotonic_offset()
227 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
228 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
229 chrono::microseconds(10))
230 << " " << connection->monotonic_offset()
231 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
232 }
233 if (connection->state() == State::CONNECTED) {
234 EXPECT_EQ(connection->connection_count(), 1u);
235 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
236 connection->connected_since_time())),
237 monotonic_clock::now());
238 // The first Connected message may not have a UUID in it since no
239 // data has flown. That's fine.
240 if (pi1_connected_client_statistics_count > 0) {
241 EXPECT_TRUE(connection->has_boot_uuid())
242 << ": " << aos::FlatbufferToJson(connection);
243 }
244 ++pi1_connected_client_statistics_count;
245 } else {
246 EXPECT_FALSE(connection->has_connection_count());
247 EXPECT_FALSE(connection->has_connected_since_time());
248 }
249 }
250 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800251
252 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800253 int pi2_connected_client_statistics_count = 0;
254 pong_event_loop.MakeWatcher(
255 "/pi2/aos",
256 [&pi2_client_statistics_count,
257 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
258 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800259
Austin Schuh367a7f42021-11-23 23:04:36 -0800260 for (const ClientConnection *connection : *stats.connections()) {
261 if (connection->has_monotonic_offset()) {
262 ++pi2_client_statistics_count;
263 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
264 chrono::milliseconds(10))
265 << ": got " << aos::FlatbufferToJson(connection);
266 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
267 chrono::microseconds(10))
268 << ": got " << aos::FlatbufferToJson(connection);
269 }
270 if (connection->state() == State::CONNECTED) {
271 EXPECT_EQ(connection->connection_count(), 1u);
272 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
273 connection->connected_since_time())),
274 monotonic_clock::now());
275 if (pi2_connected_client_statistics_count > 0) {
276 EXPECT_TRUE(connection->has_boot_uuid());
277 }
278 ++pi2_connected_client_statistics_count;
279 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700280 if (pi2_connected_client_statistics_count == 0) {
281 EXPECT_FALSE(connection->has_connection_count())
282 << aos::FlatbufferToJson(&stats);
283 } else {
284 EXPECT_TRUE(connection->has_connection_count())
285 << aos::FlatbufferToJson(&stats);
286 EXPECT_EQ(connection->connection_count(), 1u);
287 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800288 EXPECT_FALSE(connection->has_connected_since_time());
289 }
290 }
291 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800292
Austin Schuh196a4452020-03-15 23:12:03 -0700293 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800294 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800295 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800296 });
Austin Schuh196a4452020-03-15 23:12:03 -0700297 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800298 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800299 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800300 });
301
Austin Schuh2f8fd752020-09-01 22:38:28 -0700302 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
303 // channel.
304 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
305 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
306 const size_t ping_timestamp_channel =
307 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
308 ping_on_pi2_fetcher.channel());
309
310 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
311 VLOG(1) << "Channel "
312 << configuration::ChannelIndex(ping_event_loop.configuration(),
313 channel)
314 << " " << configuration::CleanedChannelToString(channel);
315 }
316
317 // For each remote timestamp we get back, confirm that it is either a ping
318 // message, or a timestamp we sent out. Also confirm that the timestamps are
319 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800320 for (std::pair<int, std::string> channel :
321 shared()
322 ? std::vector<std::pair<
323 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
324 : std::vector<std::pair<int, std::string>>{
325 {pi1_timestamp_channel,
326 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
327 "aos-message_bridge-Timestamp"},
328 {ping_timestamp_channel,
329 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
330 ping_event_loop.MakeWatcher(
331 channel.second,
332 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
333 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
334 &pi1_on_pi1_timestamp_fetcher,
335 channel_index = channel.first](const RemoteMessage &header) {
336 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
337 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700338
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800339 EXPECT_TRUE(header.has_boot_uuid());
340 if (channel_index != -1) {
341 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342 }
343
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800344 const aos::monotonic_clock::time_point header_monotonic_sent_time(
345 chrono::nanoseconds(header.monotonic_sent_time()));
346 const aos::realtime_clock::time_point header_realtime_sent_time(
347 chrono::nanoseconds(header.realtime_sent_time()));
348 const aos::monotonic_clock::time_point header_monotonic_remote_time(
349 chrono::nanoseconds(header.monotonic_remote_time()));
350 const aos::realtime_clock::time_point header_realtime_remote_time(
351 chrono::nanoseconds(header.realtime_remote_time()));
352
353 const Context *pi1_context = nullptr;
354 const Context *pi2_context = nullptr;
355
356 if (header.channel_index() == pi1_timestamp_channel) {
357 // Find the forwarded message.
358 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
359 header_monotonic_sent_time) {
360 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
361 }
362
363 // And the source message.
364 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
365 header_monotonic_remote_time) {
366 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
367 }
368
369 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
370 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
371 } else if (header.channel_index() == ping_timestamp_channel) {
372 // Find the forwarded message.
373 while (ping_on_pi2_fetcher.context().monotonic_event_time <
374 header_monotonic_sent_time) {
375 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
376 }
377
378 // And the source message.
379 while (ping_on_pi1_fetcher.context().monotonic_event_time <
380 header_monotonic_remote_time) {
381 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
382 }
383
384 pi1_context = &ping_on_pi1_fetcher.context();
385 pi2_context = &ping_on_pi2_fetcher.context();
386 } else {
387 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700388 }
389
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800390 // Confirm the forwarded message has matching timestamps to the
391 // timestamps we got back.
392 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
393 EXPECT_EQ(pi2_context->monotonic_event_time,
394 header_monotonic_sent_time);
395 EXPECT_EQ(pi2_context->realtime_event_time,
396 header_realtime_sent_time);
397 EXPECT_EQ(pi2_context->realtime_remote_time,
398 header_realtime_remote_time);
399 EXPECT_EQ(pi2_context->monotonic_remote_time,
400 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700401
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800402 // Confirm the forwarded message also matches the source message.
403 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
404 EXPECT_EQ(pi1_context->monotonic_event_time,
405 header_monotonic_remote_time);
406 EXPECT_EQ(pi1_context->realtime_event_time,
407 header_realtime_remote_time);
408 });
409 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700410
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800411 // Start everything up. Pong is the only thing we don't know how to wait
412 // on, so start it first.
Austin Schuha4e616a2023-05-15 17:59:30 -0700413 ThreadedEventLoopRunner pong_thread(&pong_event_loop);
414 ThreadedEventLoopRunner ping_thread(&ping_event_loop);
Austin Schuh7bc59052020-02-16 23:48:33 -0800415
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800416 StartPi1Server();
417 StartPi1Client();
418 StartPi2Client();
419 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800420
421 // And go!
Austin Schuha4e616a2023-05-15 17:59:30 -0700422 // Run for 5 seconds to make sure we have time to estimate the offset.
423 std::this_thread::sleep_for(chrono::milliseconds(5050));
Austin Schuh7bc59052020-02-16 23:48:33 -0800424
425 // Confirm that we are estimating a monotonic offset on the client.
426 ASSERT_TRUE(client_statistics_fetcher.Fetch());
427
428 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
429 EXPECT_EQ(client_statistics_fetcher->connections()
430 ->Get(0)
431 ->node()
432 ->name()
433 ->string_view(),
434 "pi1");
435
436 // Make sure the offset in one direction is less than a second.
437 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700438 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
439 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800440 EXPECT_LT(
441 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700442 1000000000)
443 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800444
Austin Schuha4e616a2023-05-15 17:59:30 -0700445 // Shut everyone else down before confirming everything actually ran.
446 ping_thread.Exit();
447 pong_thread.Exit();
448 StopPi1Server();
449 StopPi1Client();
450 StopPi2Client();
451 StopPi2Server();
452
453 // Make sure we sent something.
454 EXPECT_GE(ping_count, 1);
455 // And got something back.
456 EXPECT_GE(pong_count, 1);
457
Austin Schuh7bc59052020-02-16 23:48:33 -0800458 EXPECT_GE(pi1_server_statistics_count, 2);
459 EXPECT_GE(pi2_server_statistics_count, 2);
460 EXPECT_GE(pi1_client_statistics_count, 2);
461 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700462
463 // Confirm we got timestamps back!
464 EXPECT_TRUE(message_header_fetcher1.Fetch());
465 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800466}
467
Austin Schuh5344c352020-04-12 17:04:26 -0700468// Test that the client disconnecting triggers the server offsets on both sides
469// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800470TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700471 // This is rather annoying to set up. We need to start up a client and
472 // server, on the same node, but get them to think that they are on different
473 // nodes.
474 //
475 // We need the client to not post directly to "/test" like it would in a
476 // real system, otherwise we will re-send the ping message... So, use an
477 // application specific map to have the client post somewhere else.
478 //
479 // To top this all off, each of these needs to be done with a ShmEventLoop,
480 // which needs to run in a separate thread... And it is really hard to get
481 // everything started up reliably. So just be super generous on timeouts and
482 // hope for the best. We can be more generous in the future if we need to.
483 //
484 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800485 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700486
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800487 MakePi1Server();
488 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700489
490 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800491 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700492 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800493 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700494
495 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800496 OnPi2();
497 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700498
499 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800500 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700501 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800502 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700503
504 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700505
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800506 StartPi1Test();
507 StartPi2Test();
508 StartPi1Server();
509 StartPi1Client();
510 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700511
512 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800513 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700514
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800515 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700516
517 // Now confirm we are synchronized.
518 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
519 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
520
521 const ServerConnection *const pi1_connection =
522 pi1_server_statistics_fetcher->connections()->Get(0);
523 const ServerConnection *const pi2_connection =
524 pi2_server_statistics_fetcher->connections()->Get(0);
525
526 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800527 EXPECT_EQ(pi1_connection->connection_count(), 1u);
528 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700529 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
530 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
531 chrono::milliseconds(1));
532 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
533 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800534 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700535
536 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800537 EXPECT_EQ(pi2_connection->connection_count(), 1u);
538 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700539 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
540 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
541 chrono::milliseconds(1));
542 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
543 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800544 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800545
546 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700547 }
548
Austin Schuhd0d894e2021-10-24 17:13:11 -0700549 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
550 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700551
552 {
553 // Now confirm we are un-synchronized.
554 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
555 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
556 const ServerConnection *const pi1_connection =
557 pi1_server_statistics_fetcher->connections()->Get(0);
558 const ServerConnection *const pi2_connection =
559 pi2_server_statistics_fetcher->connections()->Get(0);
560
561 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800562 EXPECT_EQ(pi1_connection->connection_count(), 1u);
563 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700564 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800565 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700566 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
567 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800568 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800569 EXPECT_EQ(pi2_connection->connection_count(), 1u);
570 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700571 }
572
573 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800574 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700575 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800576 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700577
578 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
579 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
580
581 // Now confirm we are synchronized again.
582 const ServerConnection *const pi1_connection =
583 pi1_server_statistics_fetcher->connections()->Get(0);
584 const ServerConnection *const pi2_connection =
585 pi2_server_statistics_fetcher->connections()->Get(0);
586
587 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800588 EXPECT_EQ(pi1_connection->connection_count(), 2u);
589 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700590 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
591 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800592 chrono::milliseconds(1))
593 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700594 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800595 chrono::milliseconds(-1))
596 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800597 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700598
599 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800600 EXPECT_EQ(pi2_connection->connection_count(), 1u);
601 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700602 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
603 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800604 chrono::milliseconds(1))
605 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700606 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800607 chrono::milliseconds(-1))
608 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800609 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800610
611 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700612 }
613
James Kuszmaul79b2f032023-06-02 21:02:27 -0700614 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800615 StopPi1Server();
616 StopPi1Client();
617 StopPi2Server();
618 StopPi1Test();
619 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700620}
621
622// Test that the server disconnecting triggers the server offsets on the other
623// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800624TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700625 // This is rather annoying to set up. We need to start up a client and
626 // server, on the same node, but get them to think that they are on different
627 // nodes.
628 //
629 // We need the client to not post directly to "/test" like it would in a
630 // real system, otherwise we will re-send the ping message... So, use an
631 // application specific map to have the client post somewhere else.
632 //
633 // To top this all off, each of these needs to be done with a ShmEventLoop,
634 // which needs to run in a separate thread... And it is really hard to get
635 // everything started up reliably. So just be super generous on timeouts and
636 // hope for the best. We can be more generous in the future if we need to.
637 //
638 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700639 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800640 OnPi1();
641 MakePi1Server();
642 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700643
644 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800645 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700646 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800647 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700648 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800649 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700650
651 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800652 OnPi2();
653 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700654
655 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800656 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700657 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800658 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700659
660 // Start everything up. Pong is the only thing we don't know how to wait on,
661 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800662 StartPi1Test();
663 StartPi2Test();
664 StartPi1Server();
665 StartPi1Client();
666 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700667
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800668 // Confirm both client and server statistics messages have decent offsets in
669 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700670
671 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800672 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700673
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800674 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700675
676 // Now confirm we are synchronized.
677 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
678 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
679
680 const ServerConnection *const pi1_connection =
681 pi1_server_statistics_fetcher->connections()->Get(0);
682 const ServerConnection *const pi2_connection =
683 pi2_server_statistics_fetcher->connections()->Get(0);
684
685 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
686 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
687 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
688 chrono::milliseconds(1));
689 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
690 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800691 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800692 EXPECT_TRUE(pi1_connection->has_connected_since_time());
693 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700694
695 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
696 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
697 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
698 chrono::milliseconds(1));
699 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
700 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800701 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800702 EXPECT_TRUE(pi2_connection->has_connected_since_time());
703 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800704
705 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700706 }
707
708 std::this_thread::sleep_for(std::chrono::seconds(2));
709
710 {
711 // And confirm we are unsynchronized.
712 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
713 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
714
715 const ServerConnection *const pi1_server_connection =
716 pi1_server_statistics_fetcher->connections()->Get(0);
717 const ClientConnection *const pi1_client_connection =
718 pi1_client_statistics_fetcher->connections()->Get(0);
719
720 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
721 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800722 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
723 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
724
Austin Schuh20ac95d2020-12-05 17:24:19 -0800725 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700726 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
727 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800728 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
729 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
730 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700731 }
732
733 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800734 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700735
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800736 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700737
738 // And confirm we are synchronized again.
739 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
740 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -0800741 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -0700742
743 const ServerConnection *const pi1_connection =
744 pi1_server_statistics_fetcher->connections()->Get(0);
745 const ServerConnection *const pi2_connection =
746 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800747 const ClientConnection *const pi1_client_connection =
748 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -0700749
750 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
751 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
752 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
753 chrono::milliseconds(1));
754 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
755 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800756 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700757
Austin Schuh367a7f42021-11-23 23:04:36 -0800758 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
759 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
760 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
761 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
762
Austin Schuh5344c352020-04-12 17:04:26 -0700763 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
764 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
765 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
766 chrono::milliseconds(1));
767 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
768 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800769 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800770
771 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700772 }
773
James Kuszmaul79b2f032023-06-02 21:02:27 -0700774 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800775 StopPi1Server();
776 StopPi1Client();
777 StopPi2Client();
778 StopPi1Test();
779 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700780}
781
Austin Schuh4889b182020-11-18 19:11:56 -0800782// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700783// thing, but doesn't confirm that the internal state does. We either need to
784// expose a way to check the state in a thread-safe way, or need a way to jump
785// time for one node to do that.
786
Austin Schuh4889b182020-11-18 19:11:56 -0800787void SendPing(aos::Sender<examples::Ping> *sender, int value) {
788 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
789 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
790 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -0700791 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800792}
793
794// Tests that when a message is sent before the bridge starts up, but is
795// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800796TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800797 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800798
799 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800800 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800801 aos::Sender<examples::Ping> ping_sender =
802 send_event_loop.MakeSender<examples::Ping>("/test");
803 SendPing(&ping_sender, 1);
804 aos::Sender<examples::Ping> unreliable_ping_sender =
805 send_event_loop.MakeSender<examples::Ping>("/unreliable");
806 SendPing(&unreliable_ping_sender, 1);
807
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800808 MakePi1Server();
809 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800810
811 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800812 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800813
814 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800815 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800816
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800817 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800818
Austin Schuhf466ab52021-02-16 22:00:38 -0800819 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800820 aos::Fetcher<examples::Ping> ping_fetcher =
821 receive_event_loop.MakeFetcher<examples::Ping>("/test");
822 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
823 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
824 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
825 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
826
827 const size_t ping_channel_index = configuration::ChannelIndex(
828 receive_event_loop.configuration(), ping_fetcher.channel());
829
James Kuszmaul79b2f032023-06-02 21:02:27 -0700830 // ping_timestamp_count is accessed from multiple threads (the Watcher that
831 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800832 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800833 const std::string channel_name =
834 shared() ? "/pi1/aos/remote_timestamps/pi2"
835 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800836 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800837 channel_name, [this, channel_name, ping_channel_index,
838 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -0800839 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800840 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800841 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800842 if (shared() && header.channel_index() != ping_channel_index) {
843 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800844 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800845 CHECK_EQ(header.channel_index(), ping_channel_index);
846 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800847 });
848
849 // Before everything starts up, confirm there is no message.
850 EXPECT_FALSE(ping_fetcher.Fetch());
851 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
852
James Kuszmaul79b2f032023-06-02 21:02:27 -0700853 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800854 StartPi1Server();
855 StartPi1Client();
856 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800857
858 // Event used to wait for the timestamp counting thread to start.
Austin Schuha4e616a2023-05-15 17:59:30 -0700859 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
860 std::make_unique<ThreadedEventLoopRunner>(
861 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800862
863 {
James Kuszmaul79b2f032023-06-02 21:02:27 -0700864 // Now spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800865 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800866
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800867 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800868
869 // Confirm there is no detected duplicate packet.
870 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
871 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
872 ->Get(0)
873 ->duplicate_packets(),
874 0u);
875
Austin Schuhe61d4382021-03-31 21:33:02 -0700876 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
877 ->Get(0)
878 ->partial_deliveries(),
879 0u);
880
Austin Schuh4889b182020-11-18 19:11:56 -0800881 EXPECT_TRUE(ping_fetcher.Fetch());
882 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
883 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800884
885 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800886 }
887
888 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800889 // Now, spin up a client for 2 seconds.
890 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800891
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800892 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -0800893
894 // Confirm we detect the duplicate packet correctly.
895 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
896 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
897 ->Get(0)
898 ->duplicate_packets(),
899 1u);
900
Austin Schuhe61d4382021-03-31 21:33:02 -0700901 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
902 ->Get(0)
903 ->partial_deliveries(),
904 0u);
905
Austin Schuh4889b182020-11-18 19:11:56 -0800906 EXPECT_EQ(ping_timestamp_count, 1);
907 EXPECT_FALSE(ping_fetcher.Fetch());
908 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800909
910 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800911 }
912
James Kuszmaul79b2f032023-06-02 21:02:27 -0700913 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800914 StopPi1Client();
915 StopPi2Server();
Austin Schuha4e616a2023-05-15 17:59:30 -0700916 pi1_remote_timestamp_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800917 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800918}
919
920// Tests that when a message is sent before the bridge starts up, but is
921// configured as reliable, we forward it. Confirm this works across server
922// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800923TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -0800924 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800925 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -0800926
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800927 MakePi2Server();
928 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800929
Austin Schuhf466ab52021-02-16 22:00:38 -0800930 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800931 aos::Fetcher<examples::Ping> ping_fetcher =
932 receive_event_loop.MakeFetcher<examples::Ping>("/test");
933 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
934 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
935 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
936 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
937
Austin Schuh4889b182020-11-18 19:11:56 -0800938 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800939 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -0800940
941 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -0800942 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800943 aos::Sender<examples::Ping> ping_sender =
944 send_event_loop.MakeSender<examples::Ping>("/test");
945 {
946 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
947 examples::Ping::Builder ping_builder =
948 builder.MakeBuilder<examples::Ping>();
949 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -0700950 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800951 }
952
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800953 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800954
955 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -0800956 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800957
958 const size_t ping_channel_index = configuration::ChannelIndex(
959 receive_event_loop.configuration(), ping_fetcher.channel());
960
James Kuszmaul79b2f032023-06-02 21:02:27 -0700961 // ping_timestamp_count is accessed from multiple threads (the Watcher that
962 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800963 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800964 const std::string channel_name =
965 shared() ? "/pi1/aos/remote_timestamps/pi2"
966 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800967 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800968 channel_name, [this, channel_name, ping_channel_index,
969 &ping_timestamp_count](const RemoteMessage &header) {
970 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800971 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800972 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800973 if (shared() && header.channel_index() != ping_channel_index) {
974 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800975 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800976 CHECK_EQ(header.channel_index(), ping_channel_index);
977 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800978 });
979
980 // Before everything starts up, confirm there is no message.
981 EXPECT_FALSE(ping_fetcher.Fetch());
982 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
983
James Kuszmaul79b2f032023-06-02 21:02:27 -0700984 // Spin up the persistent pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800985 StartPi1Client();
986 StartPi2Server();
987 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -0800988
Austin Schuha4e616a2023-05-15 17:59:30 -0700989 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
990 std::make_unique<ThreadedEventLoopRunner>(
991 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800992
993 {
994 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800995 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -0800996
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800997 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800998
999 // Confirm there is no detected duplicate packet.
1000 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1001 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1002 ->Get(0)
1003 ->duplicate_packets(),
1004 0u);
1005
Austin Schuhe61d4382021-03-31 21:33:02 -07001006 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1007 ->Get(0)
1008 ->partial_deliveries(),
1009 0u);
1010
Austin Schuh4889b182020-11-18 19:11:56 -08001011 EXPECT_TRUE(ping_fetcher.Fetch());
1012 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1013 EXPECT_EQ(ping_timestamp_count, 1);
1014 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001015
1016 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001017 }
1018
1019 {
1020 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001021 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001022
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001023 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001024
1025 // Confirm we detect the duplicate packet correctly.
1026 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1027 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1028 ->Get(0)
1029 ->duplicate_packets(),
1030 1u);
1031
Austin Schuhe61d4382021-03-31 21:33:02 -07001032 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1033 ->Get(0)
1034 ->partial_deliveries(),
1035 0u);
1036
Austin Schuh4889b182020-11-18 19:11:56 -08001037 EXPECT_EQ(ping_timestamp_count, 1);
1038 EXPECT_FALSE(ping_fetcher.Fetch());
1039 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001040
1041 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001042 }
1043
James Kuszmaul79b2f032023-06-02 21:02:27 -07001044 // Shut everyone else down.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001045 StopPi1Client();
1046 StopPi2Server();
1047 StopPi2Client();
Austin Schuha4e616a2023-05-15 17:59:30 -07001048 pi1_remote_timestamp_thread.reset();
Austin Schuh4889b182020-11-18 19:11:56 -08001049}
1050
James Kuszmaul79b2f032023-06-02 21:02:27 -07001051// Tests that when multiple reliable messages are sent during a time when the
1052// client is restarting that only the final of those messages makes it to the
1053// client. This ensures that we handle a disconnecting & reconnecting client
1054// correctly in the server reliable connection retry logic.
1055TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
1056 OnPi1();
1057
1058 FLAGS_application_name = "sender";
1059 aos::ShmEventLoop send_event_loop(&config.message());
1060 aos::Sender<examples::Ping> ping_sender =
1061 send_event_loop.MakeSender<examples::Ping>("/test");
1062 size_t ping_index = 0;
1063 SendPing(&ping_sender, ++ping_index);
1064
1065 MakePi1Server();
1066 MakePi1Client();
1067
1068 FLAGS_application_name = "pi1_timestamp";
1069 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
1070
1071 // Now do it for "raspberrypi2", the client.
1072 OnPi2();
1073
1074 MakePi2Server();
1075
1076 aos::ShmEventLoop receive_event_loop(&config.message());
1077 aos::Fetcher<examples::Ping> ping_fetcher =
1078 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1079 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1080 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1081
1082 const size_t ping_channel_index = configuration::ChannelIndex(
1083 receive_event_loop.configuration(), ping_fetcher.channel());
1084
1085 // ping_timestamp_count is accessed from multiple threads (the Watcher that
1086 // triggers it is in a separate thread), so make it atomic.
1087 std::atomic<int> ping_timestamp_count{0};
1088 const std::string channel_name =
1089 shared() ? "/pi1/aos/remote_timestamps/pi2"
1090 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
1091 pi1_remote_timestamp_event_loop.MakeWatcher(
1092 channel_name, [this, channel_name, ping_channel_index,
1093 &ping_timestamp_count](const RemoteMessage &header) {
1094 VLOG(1) << channel_name << " RemoteMessage "
1095 << aos::FlatbufferToJson(&header);
1096 EXPECT_TRUE(header.has_boot_uuid());
1097 if (shared() && header.channel_index() != ping_channel_index) {
1098 return;
1099 }
1100 CHECK_EQ(header.channel_index(), ping_channel_index);
1101 ++ping_timestamp_count;
1102 });
1103
1104 // Before everything starts up, confirm there is no message.
1105 EXPECT_FALSE(ping_fetcher.Fetch());
1106
1107 // Spin up the persistent pieces.
1108 StartPi1Server();
1109 StartPi1Client();
1110 StartPi2Server();
1111
1112 // Event used to wait for the timestamp counting thread to start.
1113 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1114 std::make_unique<ThreadedEventLoopRunner>(
1115 &pi1_remote_timestamp_event_loop);
1116
1117 {
1118 // Now, spin up a client for 2 seconds.
1119 MakePi2Client();
1120
1121 RunPi2Client(chrono::milliseconds(2050));
1122
1123 // Confirm there is no detected duplicate packet.
1124 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1125 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1126 ->Get(0)
1127 ->duplicate_packets(),
1128 0u);
1129
1130 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1131 ->Get(0)
1132 ->partial_deliveries(),
1133 0u);
1134
1135 EXPECT_TRUE(ping_fetcher.Fetch());
1136 EXPECT_EQ(ping_timestamp_count, 1);
1137
1138 StopPi2Client();
1139 }
1140
1141 // Send some reliable messages while the client is dead. Only the final one
1142 // should make it through.
1143 while (ping_index < 10) {
1144 SendPing(&ping_sender, ++ping_index);
1145 }
1146
1147 {
1148 // Now, spin up a client for 2 seconds.
1149 MakePi2Client();
1150
1151 RunPi2Client(chrono::milliseconds(5050));
1152
1153 // No duplicate packets should have appeared.
1154 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1155 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1156 ->Get(0)
1157 ->duplicate_packets(),
1158 0u);
1159
1160 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1161 ->Get(0)
1162 ->partial_deliveries(),
1163 0u);
1164
1165 EXPECT_EQ(ping_timestamp_count, 2);
1166 // We should have gotten precisely one more ping message--the latest one
1167 // sent should've made it, but no previous ones.
1168 EXPECT_TRUE(ping_fetcher.FetchNext());
1169 EXPECT_EQ(ping_index, ping_fetcher->value());
1170 EXPECT_FALSE(ping_fetcher.FetchNext());
1171
1172 StopPi2Client();
1173 }
1174
1175 // Shut everyone else down.
1176 StopPi1Client();
1177 StopPi2Server();
1178 pi1_remote_timestamp_thread.reset();
1179 StopPi1Server();
1180}
1181
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001182// Test that differing config sha256's result in no connection.
1183TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1184 // This is rather annoying to set up. We need to start up a client and
1185 // server, on the same node, but get them to think that they are on different
1186 // nodes.
1187 //
1188 // We need the client to not post directly to "/test" like it would in a
1189 // real system, otherwise we will re-send the ping message... So, use an
1190 // application specific map to have the client post somewhere else.
1191 //
1192 // To top this all off, each of these needs to be done with a ShmEventLoop,
1193 // which needs to run in a separate thread... And it is really hard to get
1194 // everything started up reliably. So just be super generous on timeouts and
1195 // hope for the best. We can be more generous in the future if we need to.
1196 //
1197 // We are faking the application names by passing in --application_name=foo
1198 OnPi1();
1199
1200 MakePi1Server(
1201 "dummy sha256 ");
1202 MakePi1Client();
1203
1204 // And build the app for testing.
1205 MakePi1Test();
1206 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1207 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1208 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1209 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1210
1211 // Now do it for "raspberrypi2", the client.
1212 OnPi2();
1213 MakePi2Server();
1214
1215 // And build the app for testing.
1216 MakePi2Test();
1217 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1218 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1219 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1220 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1221
1222 // Wait until we are connected, then send.
1223
1224 StartPi1Test();
1225 StartPi2Test();
1226 StartPi1Server();
1227 StartPi1Client();
1228 StartPi2Server();
1229
1230 {
1231 MakePi2Client();
1232
1233 RunPi2Client(chrono::milliseconds(3050));
1234
1235 // Now confirm we are synchronized.
1236 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1237 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1238 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1239 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1240
1241 const ServerConnection *const pi1_connection =
1242 pi1_server_statistics_fetcher->connections()->Get(0);
1243 const ClientConnection *const pi1_client_connection =
1244 pi1_client_statistics_fetcher->connections()->Get(0);
1245 const ServerConnection *const pi2_connection =
1246 pi2_server_statistics_fetcher->connections()->Get(0);
1247 const ClientConnection *const pi2_client_connection =
1248 pi2_client_statistics_fetcher->connections()->Get(0);
1249
1250 // Make sure one direction is disconnected with a bunch of connection
1251 // attempts and failures.
1252 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1253 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1254 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1255
1256 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1257 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1258
1259 // And the other direction is happy.
1260 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1261 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1262 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1263 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1264 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1265
1266 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1267 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1268
1269 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1270 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1271 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1272 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1273
1274 StopPi2Client();
1275 }
1276
James Kuszmaul79b2f032023-06-02 21:02:27 -07001277 // Shut everyone else down.
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001278 StopPi1Server();
1279 StopPi1Client();
1280 StopPi2Server();
1281 StopPi1Test();
1282 StopPi2Test();
1283}
1284
Austin Schuh89f23e32023-05-15 17:06:43 -07001285// Test that a client which connects with too big a message gets disconnected
1286// without crashing.
1287TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001288 // This is rather annoying to set up. We need to start up a client and
1289 // server, on the same node, but get them to think that they are on different
1290 // nodes.
1291 //
1292 // We need the client to not post directly to "/test" like it would in a
1293 // real system, otherwise we will re-send the ping message... So, use an
1294 // application specific map to have the client post somewhere else.
1295 //
1296 // To top this all off, each of these needs to be done with a ShmEventLoop,
1297 // which needs to run in a separate thread... And it is really hard to get
1298 // everything started up reliably. So just be super generous on timeouts and
1299 // hope for the best. We can be more generous in the future if we need to.
1300 //
1301 // We are faking the application names by passing in --application_name=foo
1302 OnPi1();
1303
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001304 MakePi1Server();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001305 MakePi1Client();
1306
1307 // And build the app for testing.
1308 MakePi1Test();
1309 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1310 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1311 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1312 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1313
1314 // Now do it for "raspberrypi2", the client.
1315 OnPi2();
1316 MakePi2Server();
1317
1318 // And build the app for testing.
1319 MakePi2Test();
1320 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1321 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1322 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1323 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1324
1325 // Wait until we are connected, then send.
1326
1327 StartPi1Test();
1328 StartPi2Test();
1329 StartPi1Server();
1330 StartPi1Client();
1331 StartPi2Server();
1332
1333 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001334 // Now, spin up a SctpClient and send a massive hunk of data. This should
1335 // trigger a disconnect, but no crash.
1336 OnPi2();
1337 FLAGS_application_name = "pi2_message_bridge_client";
1338 pi2_client_event_loop =
1339 std::make_unique<aos::ShmEventLoop>(&config.message());
1340 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
1341
1342 const aos::Node *const remote_node = CHECK_NOTNULL(
1343 configuration::GetNode(pi2_client_event_loop->configuration(), "pi1"));
1344
1345 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1346 connect_message(MakeConnectMessage(
1347 pi2_client_event_loop->configuration(),
1348 pi2_client_event_loop->node(), "pi1",
1349 pi2_client_event_loop->boot_uuid(), config_sha256));
1350
1351 SctpClient client(remote_node->hostname()->string_view(),
1352 remote_node->port(),
1353 connect_message.message().channels_to_transfer()->size() +
1354 kControlStreams(),
1355 "");
1356
Austin Schuh89f23e32023-05-15 17:06:43 -07001357 client.SetPoolSize(2u);
1358
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001359 // Passes on a machine with:
1360 // 5.4.0-147-generic
1361 // net.core.wmem_default = 212992
1362 // net.core.wmem_max = 212992
1363 // net.core.rmem_default = 212992
1364 // net.core.rmem_max = 212992
1365 // If too large it appears the message is never delivered to the
1366 // application.
1367 constexpr size_t kBigMessageSize = 64000;
1368 client.SetMaxReadSize(kBigMessageSize);
1369 client.SetMaxWriteSize(kBigMessageSize);
1370
1371 const std::string big_data(kBigMessageSize, 'a');
Austin Schuh89f23e32023-05-15 17:06:43 -07001372
1373 pi2_client_event_loop->epoll()->OnReadable(client.fd(), [&]() {
1374 aos::unique_c_ptr<Message> message = client.Read();
1375 client.FreeMessage(std::move(message));
1376 });
1377
1378 aos::TimerHandler *const send_big_message = pi2_client_event_loop->AddTimer(
1379 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
1380
1381 pi2_client_event_loop->OnRun([this, send_big_message]() {
1382 send_big_message->Schedule(pi2_client_event_loop->monotonic_now() +
1383 chrono::seconds(1));
1384 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001385
1386 RunPi2Client(chrono::milliseconds(3050));
1387
1388 // Now confirm we are synchronized.
1389 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1390 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1391 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001392 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001393
1394 const ServerConnection *const pi1_connection =
1395 pi1_server_statistics_fetcher->connections()->Get(0);
1396 const ClientConnection *const pi1_client_connection =
1397 pi1_client_statistics_fetcher->connections()->Get(0);
1398 const ServerConnection *const pi2_connection =
1399 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001400
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001401 // Make sure the server we just sent a bunch of junk to is grumpy and
1402 // disconnected the bad client.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001403 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1404 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001405 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001406
1407 // And the other direction is happy.
1408 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1409 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1410 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1411 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1412 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1413
1414 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1415 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1416
1417 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1418 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001419 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1420
Austin Schuh89f23e32023-05-15 17:06:43 -07001421 pi2_client_event_loop->epoll()->DeleteFd(client.fd());
1422
Austin Schuhb0e439d2023-05-15 10:55:40 -07001423 StopPi2Client();
1424 }
1425
James Kuszmaul79b2f032023-06-02 21:02:27 -07001426 // Shut everyone else down.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001427 StopPi1Server();
1428 StopPi1Client();
1429 StopPi2Server();
1430 StopPi1Test();
1431 StopPi2Test();
1432}
1433
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001434INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001435 MessageBridgeTests, MessageBridgeParameterizedTest,
1436 ::testing::Values(
1437 Param{"message_bridge_test_combined_timestamps_common_config.json",
1438 true},
1439 Param{"message_bridge_test_common_config.json", false}));
1440
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001441} // namespace testing
1442} // namespace message_bridge
1443} // namespace aos