blob: a923f8177079c5c9e15898576595af7b895e8329 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh99f7c6a2024-06-25 22:07:44 -07004#include "absl/flags/flag.h"
5#include "absl/log/check.h"
6#include "absl/log/log.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07008#include "gtest/gtest.h"
9
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/events/ping_generated.h"
11#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080012#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070014#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080015#include "aos/network/message_bridge_server_lib.h"
James Kuszmaul79b2f032023-06-02 21:02:27 -070016#include "aos/network/message_bridge_test_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070017#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070018#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070019#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080020#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080021
Stephan Pleinesf63bde82024-01-13 15:59:33 -080022namespace aos::message_bridge::testing {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080023
// Note: All of these tests spin up ShmEventLoops in separate threads to allow
// us to run the "real" message bridge. This requires extra threading and timing
// coordination to make happen, which is the reason for some of the extra
// complexity in these tests.
Austin Schuhe991fe22020-11-18 16:53:39 -080028
Austin Schuhe84c3ed2019-12-14 15:29:48 -080029// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -080030TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080031 // This is rather annoying to set up. We need to start up a client and
32 // server, on the same node, but get them to think that they are on different
33 // nodes.
34 //
35 // We then get to wait until they are connected.
36 //
37 // After they are connected, we send a Ping message.
38 //
39 // On the other end, we receive a Pong message.
40 //
41 // But, we need the client to not post directly to "/test" like it would in a
42 // real system, otherwise we will re-send the ping message... So, use an
43 // application specific map to have the client post somewhere else.
44 //
45 // To top this all off, each of these needs to be done with a ShmEventLoop,
46 // which needs to run in a separate thread... And it is really hard to get
47 // everything started up reliably. So just be super generous on timeouts and
48 // hope for the best. We can be more generous in the future if we need to.
49 //
50 // We are faking the application names by passing in --application_name=foo
Kiran Mohan38648482024-03-15 15:56:14 -070051 pi1_.OnPi();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080052 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -070053
Kiran Mohan38648482024-03-15 15:56:14 -070054 pi1_.MakeServer();
55 pi1_.MakeClient();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080056
Austin Schuh89e1e9c2023-05-15 14:38:44 -070057 const std::string long_data = std::string(10000, 'a');
58
Austin Schuhe84c3ed2019-12-14 15:29:48 -080059 // And build the app which sends the pings.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070060 absl::SetFlag(&FLAGS_application_name, "ping");
Kiran Mohan38648482024-03-15 15:56:14 -070061 aos::ShmEventLoop ping_event_loop(&config_.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080062 aos::Sender<examples::Ping> ping_sender =
63 ping_event_loop.MakeSender<examples::Ping>("/test");
64
Kiran Mohan38648482024-03-15 15:56:14 -070065 aos::ShmEventLoop pi1_test_event_loop_(&pi1_.config_.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080066 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
Kiran Mohan38648482024-03-15 15:56:14 -070067 pi1_test_event_loop_.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080068 shared() ? "/pi1/aos/remote_timestamps/pi2"
69 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -070070
71 // Fetchers for confirming the remote timestamps made it.
72 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
73 ping_event_loop.MakeFetcher<examples::Ping>("/test");
74 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
75 ping_event_loop.MakeFetcher<Timestamp>("/aos");
76
Austin Schuhe84c3ed2019-12-14 15:29:48 -080077 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -070078 pi2_.OnPi();
Austin Schuh2f8fd752020-09-01 22:38:28 -070079
Kiran Mohan38648482024-03-15 15:56:14 -070080 pi2_.MakeClient();
81 pi2_.MakeServer();
Austin Schuhe84c3ed2019-12-14 15:29:48 -080082
83 // And build the app which sends the pongs.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070084 absl::SetFlag(&FLAGS_application_name, "pong");
Kiran Mohan38648482024-03-15 15:56:14 -070085 aos::ShmEventLoop pong_event_loop(&config_.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086
Austin Schuh7bc59052020-02-16 23:48:33 -080087 // And build the app for testing.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070088 absl::SetFlag(&FLAGS_application_name, "test");
Kiran Mohan38648482024-03-15 15:56:14 -070089 aos::ShmEventLoop test_event_loop(&config_.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080090
91 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
92 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -080093 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
94 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -080095 shared() ? "/pi2/aos/remote_timestamps/pi1"
96 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
97 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -070098
99 // Event loop for fetching data delivered to pi2 from pi1 to match up
100 // messages.
Kiran Mohan38648482024-03-15 15:56:14 -0700101 aos::ShmEventLoop delivered_messages_event_loop(&config_.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700102 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
103 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
104 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
105 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
106 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
107 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800108
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800109 // Count the pongs.
110 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700111 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
112 this](const examples::Ping &ping) {
Kiran Mohan38648482024-03-15 15:56:14 -0700113 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_.boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700114 ++pong_count;
115 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
116 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700118 absl::SetFlag(&FLAGS_override_hostname, "");
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800119
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800120 // Wait until we are connected, then send.
121 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800122 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700123 ping_event_loop.MakeWatcher(
124 "/pi1/aos",
125 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
126 &long_data](const ServerStatistics &stats) {
127 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800128
Philipp Schrader790cb542023-07-05 21:06:52 -0700129 ASSERT_TRUE(stats.has_connections());
130 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800131
Philipp Schrader790cb542023-07-05 21:06:52 -0700132 bool connected = false;
133 for (const ServerConnection *connection : *stats.connections()) {
134 // Confirm that we are estimating the server time offset correctly. It
135 // should be about 0 since we are on the same machine here.
136 if (connection->has_monotonic_offset()) {
137 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
138 chrono::milliseconds(1));
139 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
140 chrono::milliseconds(-1));
141 ++pi1_server_statistics_count;
142 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800143
Philipp Schrader790cb542023-07-05 21:06:52 -0700144 if (connection->node()->name()->string_view() ==
Kiran Mohan38648482024-03-15 15:56:14 -0700145 pi2_.client_event_loop_->node()->name()->string_view()) {
Philipp Schrader790cb542023-07-05 21:06:52 -0700146 if (connection->state() == State::CONNECTED) {
147 EXPECT_TRUE(connection->has_boot_uuid());
148 EXPECT_EQ(connection->connection_count(), 1u);
149 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
150 connection->connected_since_time())),
151 monotonic_clock::now());
152 connected = true;
153 } else {
154 EXPECT_FALSE(connection->has_connection_count());
155 EXPECT_FALSE(connection->has_connected_since_time());
156 }
157 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800158 }
159
Philipp Schrader790cb542023-07-05 21:06:52 -0700160 if (connected) {
161 VLOG(1) << "Connected! Sent ping.";
162 auto builder = ping_sender.MakeBuilder();
163 builder.fbb()->CreateString(long_data);
164 examples::Ping::Builder ping_builder =
165 builder.MakeBuilder<examples::Ping>();
166 ping_builder.add_value(ping_count + 971);
167 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
168 ++ping_count;
169 }
170 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800171
Austin Schuh7bc59052020-02-16 23:48:33 -0800172 // Confirm both client and server statistics messages have decent offsets in
173 // them.
174 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700175 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800176 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800177 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800178 for (const ServerConnection *connection : *stats.connections()) {
179 if (connection->has_monotonic_offset()) {
180 ++pi2_server_statistics_count;
181 // Confirm that we are estimating the server time offset correctly. It
182 // should be about 0 since we are on the same machine here.
183 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
184 chrono::milliseconds(1));
185 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
186 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800187 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800188 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800189
190 if (connection->state() == State::CONNECTED) {
191 EXPECT_EQ(connection->connection_count(), 1u);
192 EXPECT_LT(monotonic_clock::time_point(
193 chrono::nanoseconds(connection->connected_since_time())),
194 monotonic_clock::now());
195 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700196 // If we have been connected, we expect the connection count to stay
197 // around.
198 if (pi2_server_statistics_count > 0) {
199 EXPECT_TRUE(connection->has_connection_count());
200 EXPECT_EQ(connection->connection_count(), 1u);
201 } else {
202 EXPECT_FALSE(connection->has_connection_count());
203 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800204 EXPECT_FALSE(connection->has_connected_since_time());
205 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800206 }
207 });
208
209 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800210 int pi1_connected_client_statistics_count = 0;
211 ping_event_loop.MakeWatcher(
212 "/pi1/aos",
213 [&pi1_client_statistics_count,
214 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
215 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800216
Austin Schuh367a7f42021-11-23 23:04:36 -0800217 for (const ClientConnection *connection : *stats.connections()) {
218 if (connection->has_monotonic_offset()) {
219 ++pi1_client_statistics_count;
220 // It takes at least 10 microseconds to send a message between the
221 // client and server. The min (filtered) time shouldn't be over 10
222 // milliseconds on localhost. This might have to bump up if this is
223 // proving flaky.
224 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::milliseconds(10))
226 << " " << connection->monotonic_offset()
227 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
228 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
229 chrono::microseconds(10))
230 << " " << connection->monotonic_offset()
231 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
232 }
233 if (connection->state() == State::CONNECTED) {
234 EXPECT_EQ(connection->connection_count(), 1u);
235 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
236 connection->connected_since_time())),
237 monotonic_clock::now());
238 // The first Connected message may not have a UUID in it since no
239 // data has flown. That's fine.
240 if (pi1_connected_client_statistics_count > 0) {
241 EXPECT_TRUE(connection->has_boot_uuid())
242 << ": " << aos::FlatbufferToJson(connection);
243 }
244 ++pi1_connected_client_statistics_count;
245 } else {
246 EXPECT_FALSE(connection->has_connection_count());
247 EXPECT_FALSE(connection->has_connected_since_time());
248 }
249 }
250 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800251
252 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800253 int pi2_connected_client_statistics_count = 0;
254 pong_event_loop.MakeWatcher(
255 "/pi2/aos",
256 [&pi2_client_statistics_count,
257 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
258 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800259
Austin Schuh367a7f42021-11-23 23:04:36 -0800260 for (const ClientConnection *connection : *stats.connections()) {
261 if (connection->has_monotonic_offset()) {
262 ++pi2_client_statistics_count;
263 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
264 chrono::milliseconds(10))
265 << ": got " << aos::FlatbufferToJson(connection);
266 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
267 chrono::microseconds(10))
268 << ": got " << aos::FlatbufferToJson(connection);
269 }
270 if (connection->state() == State::CONNECTED) {
271 EXPECT_EQ(connection->connection_count(), 1u);
272 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
273 connection->connected_since_time())),
274 monotonic_clock::now());
275 if (pi2_connected_client_statistics_count > 0) {
276 EXPECT_TRUE(connection->has_boot_uuid());
277 }
278 ++pi2_connected_client_statistics_count;
279 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700280 if (pi2_connected_client_statistics_count == 0) {
281 EXPECT_FALSE(connection->has_connection_count())
282 << aos::FlatbufferToJson(&stats);
283 } else {
284 EXPECT_TRUE(connection->has_connection_count())
285 << aos::FlatbufferToJson(&stats);
286 EXPECT_EQ(connection->connection_count(), 1u);
287 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800288 EXPECT_FALSE(connection->has_connected_since_time());
289 }
290 }
291 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800292
Austin Schuh196a4452020-03-15 23:12:03 -0700293 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800294 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800295 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800296 });
Austin Schuh196a4452020-03-15 23:12:03 -0700297 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800298 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800299 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800300 });
301
Austin Schuh2f8fd752020-09-01 22:38:28 -0700302 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
303 // channel.
304 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
305 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
306 const size_t ping_timestamp_channel =
307 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
308 ping_on_pi2_fetcher.channel());
309
310 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
311 VLOG(1) << "Channel "
312 << configuration::ChannelIndex(ping_event_loop.configuration(),
313 channel)
314 << " " << configuration::CleanedChannelToString(channel);
315 }
316
317 // For each remote timestamp we get back, confirm that it is either a ping
318 // message, or a timestamp we sent out. Also confirm that the timestamps are
319 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800320 for (std::pair<int, std::string> channel :
321 shared()
322 ? std::vector<std::pair<
323 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
324 : std::vector<std::pair<int, std::string>>{
325 {pi1_timestamp_channel,
326 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
327 "aos-message_bridge-Timestamp"},
328 {ping_timestamp_channel,
329 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
330 ping_event_loop.MakeWatcher(
331 channel.second,
332 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
333 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
334 &pi1_on_pi1_timestamp_fetcher,
335 channel_index = channel.first](const RemoteMessage &header) {
336 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
337 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700338
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800339 EXPECT_TRUE(header.has_boot_uuid());
340 if (channel_index != -1) {
341 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342 }
343
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800344 const aos::monotonic_clock::time_point header_monotonic_sent_time(
345 chrono::nanoseconds(header.monotonic_sent_time()));
346 const aos::realtime_clock::time_point header_realtime_sent_time(
347 chrono::nanoseconds(header.realtime_sent_time()));
348 const aos::monotonic_clock::time_point header_monotonic_remote_time(
349 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700350 const aos::monotonic_clock::time_point
351 header_monotonic_remote_transmit_time(
352 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800353 const aos::realtime_clock::time_point header_realtime_remote_time(
354 chrono::nanoseconds(header.realtime_remote_time()));
355
356 const Context *pi1_context = nullptr;
357 const Context *pi2_context = nullptr;
358
359 if (header.channel_index() == pi1_timestamp_channel) {
360 // Find the forwarded message.
361 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
362 header_monotonic_sent_time) {
363 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
364 }
365
366 // And the source message.
367 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
368 header_monotonic_remote_time) {
369 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
370 }
371
372 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
373 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
374 } else if (header.channel_index() == ping_timestamp_channel) {
375 // Find the forwarded message.
376 while (ping_on_pi2_fetcher.context().monotonic_event_time <
377 header_monotonic_sent_time) {
378 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
379 }
380
381 // And the source message.
382 while (ping_on_pi1_fetcher.context().monotonic_event_time <
383 header_monotonic_remote_time) {
384 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
385 }
386
387 pi1_context = &ping_on_pi1_fetcher.context();
388 pi2_context = &ping_on_pi2_fetcher.context();
389 } else {
390 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700391 }
392
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800393 // Confirm the forwarded message has matching timestamps to the
394 // timestamps we got back.
395 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
396 EXPECT_EQ(pi2_context->monotonic_event_time,
397 header_monotonic_sent_time);
398 EXPECT_EQ(pi2_context->realtime_event_time,
399 header_realtime_sent_time);
400 EXPECT_EQ(pi2_context->realtime_remote_time,
401 header_realtime_remote_time);
402 EXPECT_EQ(pi2_context->monotonic_remote_time,
403 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700404
Austin Schuhac6d89e2024-03-27 14:56:09 -0700405 EXPECT_LT(header_monotonic_remote_transmit_time,
406 pi2_context->monotonic_event_time);
407 EXPECT_GT(header_monotonic_remote_transmit_time,
408 pi2_context->monotonic_remote_time);
409
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800410 // Confirm the forwarded message also matches the source message.
411 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
412 EXPECT_EQ(pi1_context->monotonic_event_time,
413 header_monotonic_remote_time);
414 EXPECT_EQ(pi1_context->realtime_event_time,
415 header_realtime_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700416 EXPECT_EQ(header_monotonic_remote_transmit_time,
417 pi2_context->monotonic_remote_transmit_time);
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800418 });
419 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700420
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800421 // Start everything up. Pong is the only thing we don't know how to wait
422 // on, so start it first.
Austin Schuha4e616a2023-05-15 17:59:30 -0700423 ThreadedEventLoopRunner pong_thread(&pong_event_loop);
424 ThreadedEventLoopRunner ping_thread(&ping_event_loop);
Austin Schuh7bc59052020-02-16 23:48:33 -0800425
Kiran Mohan38648482024-03-15 15:56:14 -0700426 pi1_.StartServer();
427 pi1_.StartClient();
428 pi2_.StartClient();
429 pi2_.StartServer();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800430
431 // And go!
Austin Schuha4e616a2023-05-15 17:59:30 -0700432 // Run for 5 seconds to make sure we have time to estimate the offset.
433 std::this_thread::sleep_for(chrono::milliseconds(5050));
Austin Schuh7bc59052020-02-16 23:48:33 -0800434
435 // Confirm that we are estimating a monotonic offset on the client.
436 ASSERT_TRUE(client_statistics_fetcher.Fetch());
437
438 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
439 EXPECT_EQ(client_statistics_fetcher->connections()
440 ->Get(0)
441 ->node()
442 ->name()
443 ->string_view(),
444 "pi1");
445
446 // Make sure the offset in one direction is less than a second.
447 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700448 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
449 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800450 EXPECT_LT(
451 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700452 1000000000)
453 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800454
Austin Schuha4e616a2023-05-15 17:59:30 -0700455 // Shut everyone else down before confirming everything actually ran.
456 ping_thread.Exit();
457 pong_thread.Exit();
Kiran Mohan38648482024-03-15 15:56:14 -0700458 pi1_.StopServer();
459 pi1_.StopClient();
460 pi2_.StopClient();
461 pi2_.StopServer();
Austin Schuha4e616a2023-05-15 17:59:30 -0700462
463 // Make sure we sent something.
464 EXPECT_GE(ping_count, 1);
465 // And got something back.
466 EXPECT_GE(pong_count, 1);
467
Austin Schuh7bc59052020-02-16 23:48:33 -0800468 EXPECT_GE(pi1_server_statistics_count, 2);
469 EXPECT_GE(pi2_server_statistics_count, 2);
470 EXPECT_GE(pi1_client_statistics_count, 2);
471 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700472
473 // Confirm we got timestamps back!
474 EXPECT_TRUE(message_header_fetcher1.Fetch());
475 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800476}
477
Austin Schuh5344c352020-04-12 17:04:26 -0700478// Test that the client disconnecting triggers the server offsets on both sides
479// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800480TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700481 // This is rather annoying to set up. We need to start up a client and
482 // server, on the same node, but get them to think that they are on different
483 // nodes.
484 //
485 // We need the client to not post directly to "/test" like it would in a
486 // real system, otherwise we will re-send the ping message... So, use an
487 // application specific map to have the client post somewhere else.
488 //
489 // To top this all off, each of these needs to be done with a ShmEventLoop,
490 // which needs to run in a separate thread... And it is really hard to get
491 // everything started up reliably. So just be super generous on timeouts and
492 // hope for the best. We can be more generous in the future if we need to.
493 //
494 // We are faking the application names by passing in --application_name=foo
Kiran Mohan38648482024-03-15 15:56:14 -0700495 pi1_.OnPi();
Austin Schuh5344c352020-04-12 17:04:26 -0700496
Kiran Mohan38648482024-03-15 15:56:14 -0700497 pi1_.MakeServer();
498 pi1_.MakeClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700499
500 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -0700501 pi1_.MakeTest("test1", &pi2_);
Austin Schuh5344c352020-04-12 17:04:26 -0700502 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -0700503 pi1_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700504
505 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -0700506 pi2_.OnPi();
507 pi2_.MakeServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700508
509 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -0700510 pi2_.MakeTest("test2", &pi1_);
Austin Schuh5344c352020-04-12 17:04:26 -0700511 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -0700512 pi2_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700513
514 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700515
Kiran Mohan38648482024-03-15 15:56:14 -0700516 pi1_.StartTest();
517 pi2_.StartTest();
518 pi1_.StartServer();
519 pi1_.StartClient();
520 pi2_.StartServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700521
522 {
Kiran Mohan38648482024-03-15 15:56:14 -0700523 pi2_.MakeClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700524
Kiran Mohan38648482024-03-15 15:56:14 -0700525 pi2_.RunClient(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700526
527 // Now confirm we are synchronized.
528 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
529 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
530
531 const ServerConnection *const pi1_connection =
532 pi1_server_statistics_fetcher->connections()->Get(0);
533 const ServerConnection *const pi2_connection =
534 pi2_server_statistics_fetcher->connections()->Get(0);
535
536 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800537 EXPECT_EQ(pi1_connection->connection_count(), 1u);
538 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700539 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
540 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
541 chrono::milliseconds(1));
542 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
543 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800544 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700545
546 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800547 EXPECT_EQ(pi2_connection->connection_count(), 1u);
548 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700549 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
550 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
551 chrono::milliseconds(1));
552 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
553 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800554 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800555
Kiran Mohan38648482024-03-15 15:56:14 -0700556 pi2_.StopClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700557 }
558
Austin Schuhd0d894e2021-10-24 17:13:11 -0700559 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
560 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700561
562 {
563 // Now confirm we are un-synchronized.
564 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
565 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
566 const ServerConnection *const pi1_connection =
567 pi1_server_statistics_fetcher->connections()->Get(0);
568 const ServerConnection *const pi2_connection =
569 pi2_server_statistics_fetcher->connections()->Get(0);
570
571 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800572 EXPECT_EQ(pi1_connection->connection_count(), 1u);
573 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700574 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800575 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700576 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
577 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800578 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800579 EXPECT_EQ(pi2_connection->connection_count(), 1u);
580 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700581 }
582
583 {
Kiran Mohan38648482024-03-15 15:56:14 -0700584 pi2_.MakeClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700585 // And go!
Kiran Mohan38648482024-03-15 15:56:14 -0700586 pi2_.RunClient(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700587
588 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
589 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
590
591 // Now confirm we are synchronized again.
592 const ServerConnection *const pi1_connection =
593 pi1_server_statistics_fetcher->connections()->Get(0);
594 const ServerConnection *const pi2_connection =
595 pi2_server_statistics_fetcher->connections()->Get(0);
596
597 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800598 EXPECT_EQ(pi1_connection->connection_count(), 2u);
599 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700600 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
601 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800602 chrono::milliseconds(1))
603 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700604 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800605 chrono::milliseconds(-1))
606 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800607 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700608
609 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800610 EXPECT_EQ(pi2_connection->connection_count(), 1u);
611 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700612 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
613 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800614 chrono::milliseconds(1))
615 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700616 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800617 chrono::milliseconds(-1))
618 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800619 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800620
Kiran Mohan38648482024-03-15 15:56:14 -0700621 pi2_.StopClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700622 }
623
James Kuszmaul79b2f032023-06-02 21:02:27 -0700624 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -0700625 pi1_.StopServer();
626 pi1_.StopClient();
627 pi2_.StopServer();
628 pi1_.StopTest();
629 pi2_.StopTest();
Austin Schuh5344c352020-04-12 17:04:26 -0700630}
631
632// Test that the server disconnecting triggers the server offsets on the other
633// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800634TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700635 // This is rather annoying to set up. We need to start up a client and
636 // server, on the same node, but get them to think that they are on different
637 // nodes.
638 //
639 // We need the client to not post directly to "/test" like it would in a
640 // real system, otherwise we will re-send the ping message... So, use an
641 // application specific map to have the client post somewhere else.
642 //
643 // To top this all off, each of these needs to be done with a ShmEventLoop,
644 // which needs to run in a separate thread... And it is really hard to get
645 // everything started up reliably. So just be super generous on timeouts and
646 // hope for the best. We can be more generous in the future if we need to.
647 //
648 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700649 // Force ourselves to be "raspberrypi" and allocate everything.
Kiran Mohan38648482024-03-15 15:56:14 -0700650 pi1_.OnPi();
651 pi1_.MakeServer();
652 pi1_.MakeClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700653
654 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -0700655 pi1_.MakeTest("test1", &pi2_);
Austin Schuh5344c352020-04-12 17:04:26 -0700656 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -0700657 pi1_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700658 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -0700659 pi1_.test_event_loop_->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700660
661 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -0700662 pi2_.OnPi();
663 pi2_.MakeClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700664
665 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -0700666 pi2_.MakeTest("test1", &pi1_);
Austin Schuh5344c352020-04-12 17:04:26 -0700667 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -0700668 pi2_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700669
670 // Start everything up. Pong is the only thing we don't know how to wait on,
671 // so start it first.
Kiran Mohan38648482024-03-15 15:56:14 -0700672 pi1_.StartTest();
673 pi2_.StartTest();
674 pi1_.StartServer();
675 pi1_.StartClient();
676 pi2_.StartClient();
Austin Schuh5344c352020-04-12 17:04:26 -0700677
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800678 // Confirm both client and server statistics messages have decent offsets in
679 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700680
681 {
Kiran Mohan38648482024-03-15 15:56:14 -0700682 pi2_.MakeServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700683
Kiran Mohan38648482024-03-15 15:56:14 -0700684 pi2_.RunServer(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700685
686 // Now confirm we are synchronized.
687 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
688 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
689
690 const ServerConnection *const pi1_connection =
691 pi1_server_statistics_fetcher->connections()->Get(0);
692 const ServerConnection *const pi2_connection =
693 pi2_server_statistics_fetcher->connections()->Get(0);
694
695 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
696 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
697 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
698 chrono::milliseconds(1));
699 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
700 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800701 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800702 EXPECT_TRUE(pi1_connection->has_connected_since_time());
703 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700704
705 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
706 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
707 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
708 chrono::milliseconds(1));
709 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
710 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800711 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800712 EXPECT_TRUE(pi2_connection->has_connected_since_time());
713 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800714
Kiran Mohan38648482024-03-15 15:56:14 -0700715 pi2_.StopServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700716 }
717
718 std::this_thread::sleep_for(std::chrono::seconds(2));
719
720 {
721 // And confirm we are unsynchronized.
722 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
723 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
724
725 const ServerConnection *const pi1_server_connection =
726 pi1_server_statistics_fetcher->connections()->Get(0);
727 const ClientConnection *const pi1_client_connection =
728 pi1_client_statistics_fetcher->connections()->Get(0);
729
730 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
731 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800732 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
733 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
734
Austin Schuh20ac95d2020-12-05 17:24:19 -0800735 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700736 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
737 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -0800738 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
739 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
740 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700741 }
742
743 {
Kiran Mohan38648482024-03-15 15:56:14 -0700744 pi2_.MakeServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700745
Austin Schuh5cd1d752023-08-18 17:31:46 -0700746 // Wait long enough for the client to connect again. It currently takes 3
747 // seconds of connection to estimate the time offset.
Kiran Mohan38648482024-03-15 15:56:14 -0700748 pi2_.RunServer(chrono::milliseconds(4050));
Austin Schuh5344c352020-04-12 17:04:26 -0700749
750 // And confirm we are synchronized again.
751 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
752 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -0800753 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -0700754
755 const ServerConnection *const pi1_connection =
756 pi1_server_statistics_fetcher->connections()->Get(0);
757 const ServerConnection *const pi2_connection =
758 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800759 const ClientConnection *const pi1_client_connection =
760 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -0700761
762 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
763 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
764 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
765 chrono::milliseconds(1));
766 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
767 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800768 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700769
Austin Schuh367a7f42021-11-23 23:04:36 -0800770 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
771 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
772 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
773 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
774
Austin Schuh5344c352020-04-12 17:04:26 -0700775 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
776 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
777 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
778 chrono::milliseconds(1));
779 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
780 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800781 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800782
Kiran Mohan38648482024-03-15 15:56:14 -0700783 pi2_.StopServer();
Austin Schuh5344c352020-04-12 17:04:26 -0700784 }
785
James Kuszmaul79b2f032023-06-02 21:02:27 -0700786 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -0700787 pi1_.StopServer();
788 pi1_.StopClient();
789 pi2_.StopClient();
790 pi1_.StopTest();
791 pi2_.StopTest();
Austin Schuh5344c352020-04-12 17:04:26 -0700792}
793
Austin Schuh4889b182020-11-18 19:11:56 -0800794// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700795// thing, but doesn't confirm that the internal state does. We either need to
796// expose a way to check the state in a thread-safe way, or need a way to jump
797// time for one node to do that.
798
Austin Schuh4889b182020-11-18 19:11:56 -0800799void SendPing(aos::Sender<examples::Ping> *sender, int value) {
800 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
801 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
802 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -0700803 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800804}
805
806// Tests that when a message is sent before the bridge starts up, but is
807// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800808TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Kiran Mohan38648482024-03-15 15:56:14 -0700809 pi1_.OnPi();
Austin Schuh4889b182020-11-18 19:11:56 -0800810
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700811 absl::SetFlag(&FLAGS_application_name, "sender");
Kiran Mohan38648482024-03-15 15:56:14 -0700812 aos::ShmEventLoop send_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800813 aos::Sender<examples::Ping> ping_sender =
814 send_event_loop.MakeSender<examples::Ping>("/test");
815 SendPing(&ping_sender, 1);
816 aos::Sender<examples::Ping> unreliable_ping_sender =
817 send_event_loop.MakeSender<examples::Ping>("/unreliable");
818 SendPing(&unreliable_ping_sender, 1);
819
Kiran Mohan38648482024-03-15 15:56:14 -0700820 pi1_.MakeServer();
821 pi1_.MakeClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800822
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700823 absl::SetFlag(&FLAGS_application_name, "pi1_timestamp");
Kiran Mohan38648482024-03-15 15:56:14 -0700824 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800825
826 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -0700827 pi2_.OnPi();
Austin Schuh4889b182020-11-18 19:11:56 -0800828
Kiran Mohan38648482024-03-15 15:56:14 -0700829 pi2_.MakeServer();
Austin Schuh4889b182020-11-18 19:11:56 -0800830
Kiran Mohan38648482024-03-15 15:56:14 -0700831 aos::ShmEventLoop receive_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800832 aos::Fetcher<examples::Ping> ping_fetcher =
833 receive_event_loop.MakeFetcher<examples::Ping>("/test");
834 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
835 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
836 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
837 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
838
839 const size_t ping_channel_index = configuration::ChannelIndex(
840 receive_event_loop.configuration(), ping_fetcher.channel());
841
James Kuszmaul79b2f032023-06-02 21:02:27 -0700842 // ping_timestamp_count is accessed from multiple threads (the Watcher that
843 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800844 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800845 const std::string channel_name =
846 shared() ? "/pi1/aos/remote_timestamps/pi2"
847 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800848 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800849 channel_name, [this, channel_name, ping_channel_index,
850 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -0800851 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800852 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800853 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800854 if (shared() && header.channel_index() != ping_channel_index) {
855 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800856 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800857 CHECK_EQ(header.channel_index(), ping_channel_index);
858 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800859 });
860
861 // Before everything starts up, confirm there is no message.
862 EXPECT_FALSE(ping_fetcher.Fetch());
863 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
864
James Kuszmaul79b2f032023-06-02 21:02:27 -0700865 // Spin up the persistent pieces.
Kiran Mohan38648482024-03-15 15:56:14 -0700866 pi1_.StartServer();
867 pi1_.StartClient();
868 pi2_.StartServer();
Austin Schuh4889b182020-11-18 19:11:56 -0800869
870 // Event used to wait for the timestamp counting thread to start.
Austin Schuha4e616a2023-05-15 17:59:30 -0700871 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
872 std::make_unique<ThreadedEventLoopRunner>(
873 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -0800874
875 {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700876 const aos::monotonic_clock::time_point startup_time =
877 aos::monotonic_clock::now();
James Kuszmaul79b2f032023-06-02 21:02:27 -0700878 // Now spin up a client for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -0700879 pi2_.MakeClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800880
Kiran Mohan38648482024-03-15 15:56:14 -0700881 pi2_.RunClient(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -0800882
883 // Confirm there is no detected duplicate packet.
884 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
885 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
886 ->Get(0)
887 ->duplicate_packets(),
888 0u);
889
Austin Schuhe61d4382021-03-31 21:33:02 -0700890 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
891 ->Get(0)
892 ->partial_deliveries(),
893 0u);
894
Austin Schuh4889b182020-11-18 19:11:56 -0800895 EXPECT_TRUE(ping_fetcher.Fetch());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700896 EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
897 startup_time);
898 EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
899 aos::monotonic_clock::now());
Austin Schuh4889b182020-11-18 19:11:56 -0800900 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
901 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800902
Kiran Mohan38648482024-03-15 15:56:14 -0700903 pi2_.StopClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800904 }
905
906 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800907 // Now, spin up a client for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -0700908 pi2_.MakeClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800909
Kiran Mohan38648482024-03-15 15:56:14 -0700910 pi2_.RunClient(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -0800911
912 // Confirm we detect the duplicate packet correctly.
913 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
914 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
915 ->Get(0)
916 ->duplicate_packets(),
917 1u);
918
Austin Schuhe61d4382021-03-31 21:33:02 -0700919 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
920 ->Get(0)
921 ->partial_deliveries(),
922 0u);
923
Austin Schuh4889b182020-11-18 19:11:56 -0800924 EXPECT_EQ(ping_timestamp_count, 1);
925 EXPECT_FALSE(ping_fetcher.Fetch());
926 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800927
Kiran Mohan38648482024-03-15 15:56:14 -0700928 pi2_.StopClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800929 }
930
James Kuszmaul79b2f032023-06-02 21:02:27 -0700931 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -0700932 pi1_.StopClient();
933 pi2_.StopServer();
Austin Schuha4e616a2023-05-15 17:59:30 -0700934 pi1_remote_timestamp_thread.reset();
Kiran Mohan38648482024-03-15 15:56:14 -0700935 pi1_.StopServer();
Austin Schuh4889b182020-11-18 19:11:56 -0800936}
937
938// Tests that when a message is sent before the bridge starts up, but is
939// configured as reliable, we forward it. Confirm this works across server
940// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800941TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -0800942 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -0700943 pi2_.OnPi();
Austin Schuh4889b182020-11-18 19:11:56 -0800944
Kiran Mohan38648482024-03-15 15:56:14 -0700945 pi2_.MakeServer();
946 pi2_.MakeClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800947
Kiran Mohan38648482024-03-15 15:56:14 -0700948 aos::ShmEventLoop receive_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800949 aos::Fetcher<examples::Ping> ping_fetcher =
950 receive_event_loop.MakeFetcher<examples::Ping>("/test");
951 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
952 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
953 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
954 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
955
Austin Schuh4889b182020-11-18 19:11:56 -0800956 // Force ourselves to be "raspberrypi" and allocate everything.
Kiran Mohan38648482024-03-15 15:56:14 -0700957 pi1_.OnPi();
Austin Schuh4889b182020-11-18 19:11:56 -0800958
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700959 absl::SetFlag(&FLAGS_application_name, "sender");
Kiran Mohan38648482024-03-15 15:56:14 -0700960 aos::ShmEventLoop send_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800961 aos::Sender<examples::Ping> ping_sender =
962 send_event_loop.MakeSender<examples::Ping>("/test");
963 {
964 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
965 examples::Ping::Builder ping_builder =
966 builder.MakeBuilder<examples::Ping>();
967 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -0700968 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -0800969 }
970
Kiran Mohan38648482024-03-15 15:56:14 -0700971 pi1_.MakeClient();
Austin Schuh4889b182020-11-18 19:11:56 -0800972
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700973 absl::SetFlag(&FLAGS_application_name, "pi1_timestamp");
Kiran Mohan38648482024-03-15 15:56:14 -0700974 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config_.message());
Austin Schuh4889b182020-11-18 19:11:56 -0800975
976 const size_t ping_channel_index = configuration::ChannelIndex(
977 receive_event_loop.configuration(), ping_fetcher.channel());
978
James Kuszmaul79b2f032023-06-02 21:02:27 -0700979 // ping_timestamp_count is accessed from multiple threads (the Watcher that
980 // triggers it is in a separate thread), so make it atomic.
Austin Schuh4889b182020-11-18 19:11:56 -0800981 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800982 const std::string channel_name =
983 shared() ? "/pi1/aos/remote_timestamps/pi2"
984 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -0800985 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800986 channel_name, [this, channel_name, ping_channel_index,
987 &ping_timestamp_count](const RemoteMessage &header) {
988 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -0800989 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800990 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800991 if (shared() && header.channel_index() != ping_channel_index) {
992 return;
Austin Schuh4889b182020-11-18 19:11:56 -0800993 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800994 CHECK_EQ(header.channel_index(), ping_channel_index);
995 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -0800996 });
997
998 // Before everything starts up, confirm there is no message.
999 EXPECT_FALSE(ping_fetcher.Fetch());
1000 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1001
James Kuszmaul79b2f032023-06-02 21:02:27 -07001002 // Spin up the persistent pieces.
Kiran Mohan38648482024-03-15 15:56:14 -07001003 pi1_.StartClient();
1004 pi2_.StartServer();
1005 pi2_.StartClient();
Austin Schuh4889b182020-11-18 19:11:56 -08001006
Austin Schuha4e616a2023-05-15 17:59:30 -07001007 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1008 std::make_unique<ThreadedEventLoopRunner>(
1009 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -08001010
1011 {
Austin Schuhac6d89e2024-03-27 14:56:09 -07001012 const aos::monotonic_clock::time_point startup_time =
1013 aos::monotonic_clock::now();
Austin Schuh4889b182020-11-18 19:11:56 -08001014 // Now, spin up a server for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -07001015 pi1_.MakeServer();
Austin Schuh4889b182020-11-18 19:11:56 -08001016
Kiran Mohan38648482024-03-15 15:56:14 -07001017 pi1_.RunServer(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001018
1019 // Confirm there is no detected duplicate packet.
1020 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1021 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1022 ->Get(0)
1023 ->duplicate_packets(),
1024 0u);
1025
Austin Schuhe61d4382021-03-31 21:33:02 -07001026 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1027 ->Get(0)
1028 ->partial_deliveries(),
1029 0u);
1030
Austin Schuh4889b182020-11-18 19:11:56 -08001031 EXPECT_TRUE(ping_fetcher.Fetch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07001032 EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
1033 startup_time);
1034 EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
1035 aos::monotonic_clock::now());
1036
Austin Schuh4889b182020-11-18 19:11:56 -08001037 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1038 EXPECT_EQ(ping_timestamp_count, 1);
1039 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001040
Kiran Mohan38648482024-03-15 15:56:14 -07001041 pi1_.StopServer();
Austin Schuh4889b182020-11-18 19:11:56 -08001042 }
1043
1044 {
1045 // Now, spin up a second server for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -07001046 pi1_.MakeServer();
Austin Schuh4889b182020-11-18 19:11:56 -08001047
Kiran Mohan38648482024-03-15 15:56:14 -07001048 pi1_.RunServer(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001049
1050 // Confirm we detect the duplicate packet correctly.
1051 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1052 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1053 ->Get(0)
1054 ->duplicate_packets(),
1055 1u);
1056
Austin Schuhe61d4382021-03-31 21:33:02 -07001057 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1058 ->Get(0)
1059 ->partial_deliveries(),
1060 0u);
1061
Austin Schuh4889b182020-11-18 19:11:56 -08001062 EXPECT_EQ(ping_timestamp_count, 1);
1063 EXPECT_FALSE(ping_fetcher.Fetch());
1064 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001065
Kiran Mohan38648482024-03-15 15:56:14 -07001066 pi1_.StopServer();
Austin Schuh4889b182020-11-18 19:11:56 -08001067 }
1068
James Kuszmaul79b2f032023-06-02 21:02:27 -07001069 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -07001070 pi1_.StopClient();
1071 pi2_.StopServer();
1072 pi2_.StopClient();
Austin Schuha4e616a2023-05-15 17:59:30 -07001073 pi1_remote_timestamp_thread.reset();
Austin Schuh4889b182020-11-18 19:11:56 -08001074}
1075
James Kuszmaul79b2f032023-06-02 21:02:27 -07001076// Tests that when multiple reliable messages are sent during a time when the
1077// client is restarting that only the final of those messages makes it to the
1078// client. This ensures that we handle a disconnecting & reconnecting client
1079// correctly in the server reliable connection retry logic.
1080TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
Kiran Mohan38648482024-03-15 15:56:14 -07001081 pi1_.OnPi();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001082
Austin Schuh99f7c6a2024-06-25 22:07:44 -07001083 absl::SetFlag(&FLAGS_application_name, "sender");
Kiran Mohan38648482024-03-15 15:56:14 -07001084 aos::ShmEventLoop send_event_loop(&config_.message());
James Kuszmaul79b2f032023-06-02 21:02:27 -07001085 aos::Sender<examples::Ping> ping_sender =
1086 send_event_loop.MakeSender<examples::Ping>("/test");
1087 size_t ping_index = 0;
1088 SendPing(&ping_sender, ++ping_index);
1089
Kiran Mohan38648482024-03-15 15:56:14 -07001090 pi1_.MakeServer();
1091 pi1_.MakeClient();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001092
Austin Schuh99f7c6a2024-06-25 22:07:44 -07001093 absl::SetFlag(&FLAGS_application_name, "pi1_timestamp");
Kiran Mohan38648482024-03-15 15:56:14 -07001094 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config_.message());
James Kuszmaul79b2f032023-06-02 21:02:27 -07001095
1096 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -07001097 pi2_.OnPi();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001098
Kiran Mohan38648482024-03-15 15:56:14 -07001099 pi2_.MakeServer();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001100
Kiran Mohan38648482024-03-15 15:56:14 -07001101 aos::ShmEventLoop receive_event_loop(&config_.message());
James Kuszmaul79b2f032023-06-02 21:02:27 -07001102 aos::Fetcher<examples::Ping> ping_fetcher =
1103 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1104 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1105 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1106
1107 const size_t ping_channel_index = configuration::ChannelIndex(
1108 receive_event_loop.configuration(), ping_fetcher.channel());
1109
1110 // ping_timestamp_count is accessed from multiple threads (the Watcher that
1111 // triggers it is in a separate thread), so make it atomic.
1112 std::atomic<int> ping_timestamp_count{0};
1113 const std::string channel_name =
1114 shared() ? "/pi1/aos/remote_timestamps/pi2"
1115 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
1116 pi1_remote_timestamp_event_loop.MakeWatcher(
1117 channel_name, [this, channel_name, ping_channel_index,
1118 &ping_timestamp_count](const RemoteMessage &header) {
1119 VLOG(1) << channel_name << " RemoteMessage "
1120 << aos::FlatbufferToJson(&header);
1121 EXPECT_TRUE(header.has_boot_uuid());
1122 if (shared() && header.channel_index() != ping_channel_index) {
1123 return;
1124 }
1125 CHECK_EQ(header.channel_index(), ping_channel_index);
1126 ++ping_timestamp_count;
1127 });
1128
1129 // Before everything starts up, confirm there is no message.
1130 EXPECT_FALSE(ping_fetcher.Fetch());
1131
1132 // Spin up the persistent pieces.
Kiran Mohan38648482024-03-15 15:56:14 -07001133 pi1_.StartServer();
1134 pi1_.StartClient();
1135 pi2_.StartServer();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001136
1137 // Event used to wait for the timestamp counting thread to start.
1138 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1139 std::make_unique<ThreadedEventLoopRunner>(
1140 &pi1_remote_timestamp_event_loop);
1141
1142 {
1143 // Now, spin up a client for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -07001144 pi2_.MakeClient();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001145
Kiran Mohan38648482024-03-15 15:56:14 -07001146 pi2_.RunClient(chrono::milliseconds(2050));
James Kuszmaul79b2f032023-06-02 21:02:27 -07001147
1148 // Confirm there is no detected duplicate packet.
1149 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1150 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1151 ->Get(0)
1152 ->duplicate_packets(),
1153 0u);
1154
1155 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1156 ->Get(0)
1157 ->partial_deliveries(),
1158 0u);
1159
1160 EXPECT_TRUE(ping_fetcher.Fetch());
1161 EXPECT_EQ(ping_timestamp_count, 1);
1162
Kiran Mohan38648482024-03-15 15:56:14 -07001163 pi2_.StopClient();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001164 }
1165
1166 // Send some reliable messages while the client is dead. Only the final one
1167 // should make it through.
1168 while (ping_index < 10) {
1169 SendPing(&ping_sender, ++ping_index);
1170 }
1171
1172 {
Austin Schuhac6d89e2024-03-27 14:56:09 -07001173 const aos::monotonic_clock::time_point startup_time =
1174 aos::monotonic_clock::now();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001175 // Now, spin up a client for 2 seconds.
Kiran Mohan38648482024-03-15 15:56:14 -07001176 pi2_.MakeClient();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001177
Kiran Mohan38648482024-03-15 15:56:14 -07001178 pi2_.RunClient(chrono::milliseconds(5050));
James Kuszmaul79b2f032023-06-02 21:02:27 -07001179
1180 // No duplicate packets should have appeared.
1181 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1182 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1183 ->Get(0)
1184 ->duplicate_packets(),
1185 0u);
1186
1187 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1188 ->Get(0)
1189 ->partial_deliveries(),
1190 0u);
1191
1192 EXPECT_EQ(ping_timestamp_count, 2);
1193 // We should have gotten precisely one more ping message--the latest one
1194 // sent should've made it, but no previous ones.
1195 EXPECT_TRUE(ping_fetcher.FetchNext());
Austin Schuhac6d89e2024-03-27 14:56:09 -07001196 EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
1197 startup_time);
1198 EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
1199 aos::monotonic_clock::now());
1200
James Kuszmaul79b2f032023-06-02 21:02:27 -07001201 EXPECT_EQ(ping_index, ping_fetcher->value());
1202 EXPECT_FALSE(ping_fetcher.FetchNext());
1203
Kiran Mohan38648482024-03-15 15:56:14 -07001204 pi2_.StopClient();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001205 }
1206
1207 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -07001208 pi1_.StopClient();
1209 pi2_.StopServer();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001210 pi1_remote_timestamp_thread.reset();
Kiran Mohan38648482024-03-15 15:56:14 -07001211 pi1_.StopServer();
James Kuszmaul79b2f032023-06-02 21:02:27 -07001212}
1213
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001214// Test that differing config sha256's result in no connection.
1215TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
1216 // This is rather annoying to set up. We need to start up a client and
1217 // server, on the same node, but get them to think that they are on different
1218 // nodes.
1219 //
1220 // We need the client to not post directly to "/test" like it would in a
1221 // real system, otherwise we will re-send the ping message... So, use an
1222 // application specific map to have the client post somewhere else.
1223 //
1224 // To top this all off, each of these needs to be done with a ShmEventLoop,
1225 // which needs to run in a separate thread... And it is really hard to get
1226 // everything started up reliably. So just be super generous on timeouts and
1227 // hope for the best. We can be more generous in the future if we need to.
1228 //
1229 // We are faking the application names by passing in --application_name=foo
Kiran Mohan38648482024-03-15 15:56:14 -07001230 pi1_.OnPi();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001231
Kiran Mohan38648482024-03-15 15:56:14 -07001232 pi1_.MakeServer(
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001233 "dummy sha256 ");
Kiran Mohan38648482024-03-15 15:56:14 -07001234 pi1_.MakeClient();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001235
1236 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -07001237 pi1_.MakeTest("test1", &pi2_);
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001238 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001239 pi1_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001240 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001241 pi1_.test_event_loop_->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001242
1243 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -07001244 pi2_.OnPi();
1245 pi2_.MakeServer();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001246
1247 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -07001248 pi2_.MakeTest("test1", &pi1_);
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001249 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001250 pi2_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001251 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001252 pi2_.test_event_loop_->MakeFetcher<ClientStatistics>("/pi2/aos");
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001253
1254 // Wait until we are connected, then send.
1255
Kiran Mohan38648482024-03-15 15:56:14 -07001256 pi1_.StartTest();
1257 pi2_.StartTest();
1258 pi1_.StartServer();
1259 pi1_.StartClient();
1260 pi2_.StartServer();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001261
1262 {
Kiran Mohan38648482024-03-15 15:56:14 -07001263 pi2_.MakeClient();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001264
Kiran Mohan38648482024-03-15 15:56:14 -07001265 pi2_.RunClient(chrono::milliseconds(3050));
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001266
1267 // Now confirm we are synchronized.
1268 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1269 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1270 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1271 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1272
1273 const ServerConnection *const pi1_connection =
1274 pi1_server_statistics_fetcher->connections()->Get(0);
1275 const ClientConnection *const pi1_client_connection =
1276 pi1_client_statistics_fetcher->connections()->Get(0);
1277 const ServerConnection *const pi2_connection =
1278 pi2_server_statistics_fetcher->connections()->Get(0);
1279 const ClientConnection *const pi2_client_connection =
1280 pi2_client_statistics_fetcher->connections()->Get(0);
1281
1282 // Make sure one direction is disconnected with a bunch of connection
1283 // attempts and failures.
1284 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1285 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1286 EXPECT_GT(pi1_connection->invalid_connection_count(), 10u);
1287
1288 EXPECT_EQ(pi2_client_connection->state(), State::DISCONNECTED);
1289 EXPECT_GT(pi2_client_connection->connection_count(), 10u);
1290
1291 // And the other direction is happy.
1292 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1293 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1294 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1295 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1296 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1297
1298 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1299 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1300
1301 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1302 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1303 VLOG(1) << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1304 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1305
Kiran Mohan38648482024-03-15 15:56:14 -07001306 pi2_.StopClient();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001307 }
1308
James Kuszmaul79b2f032023-06-02 21:02:27 -07001309 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -07001310 pi1_.StopServer();
1311 pi1_.StopClient();
1312 pi2_.StopServer();
1313 pi1_.StopTest();
1314 pi2_.StopTest();
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001315}
1316
Austin Schuh89f23e32023-05-15 17:06:43 -07001317// Test that a client which connects with too big a message gets disconnected
1318// without crashing.
1319TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001320 // This is rather annoying to set up. We need to start up a client and
1321 // server, on the same node, but get them to think that they are on different
1322 // nodes.
1323 //
1324 // We need the client to not post directly to "/test" like it would in a
1325 // real system, otherwise we will re-send the ping message... So, use an
1326 // application specific map to have the client post somewhere else.
1327 //
1328 // To top this all off, each of these needs to be done with a ShmEventLoop,
1329 // which needs to run in a separate thread... And it is really hard to get
1330 // everything started up reliably. So just be super generous on timeouts and
1331 // hope for the best. We can be more generous in the future if we need to.
1332 //
1333 // We are faking the application names by passing in --application_name=foo
Kiran Mohan38648482024-03-15 15:56:14 -07001334 pi1_.OnPi();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001335
Kiran Mohan38648482024-03-15 15:56:14 -07001336 pi1_.MakeServer();
1337 pi1_.MakeClient();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001338
1339 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -07001340 pi1_.MakeTest("test1", &pi2_);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001341 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001342 pi1_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001343 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001344 pi1_.test_event_loop_->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001345
1346 // Now do it for "raspberrypi2", the client.
Kiran Mohan38648482024-03-15 15:56:14 -07001347 pi2_.OnPi();
1348 pi2_.MakeServer();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001349
1350 // And build the app for testing.
Kiran Mohan38648482024-03-15 15:56:14 -07001351 pi2_.MakeTest("test1", &pi1_);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001352 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001353 pi2_.test_event_loop_->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001354 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
Kiran Mohan38648482024-03-15 15:56:14 -07001355 pi2_.test_event_loop_->MakeFetcher<ClientStatistics>("/pi2/aos");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001356
1357 // Wait until we are connected, then send.
1358
Kiran Mohan38648482024-03-15 15:56:14 -07001359 pi1_.StartTest();
1360 pi2_.StartTest();
1361 pi1_.StartServer();
1362 pi1_.StartClient();
1363 pi2_.StartServer();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001364
1365 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001366 // Now, spin up a SctpClient and send a massive hunk of data. This should
1367 // trigger a disconnect, but no crash.
Kiran Mohan38648482024-03-15 15:56:14 -07001368 pi2_.OnPi();
Austin Schuh99f7c6a2024-06-25 22:07:44 -07001369 absl::SetFlag(&FLAGS_application_name, "pi2_message_bridge_client");
Kiran Mohan38648482024-03-15 15:56:14 -07001370 pi2_.client_event_loop_ =
1371 std::make_unique<aos::ShmEventLoop>(&config_.message());
1372 pi2_.client_event_loop_->SetRuntimeRealtimePriority(1);
Austin Schuh89f23e32023-05-15 17:06:43 -07001373
Austin Schuh6bdcc372024-06-27 14:49:11 -07001374 const aos::Node *const remote_node =
1375 configuration::GetNode(pi2_.client_event_loop_->configuration(), "pi1");
1376 CHECK(remote_node != nullptr);
Austin Schuh89f23e32023-05-15 17:06:43 -07001377
1378 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1379 connect_message(MakeConnectMessage(
Kiran Mohan38648482024-03-15 15:56:14 -07001380 pi2_.client_event_loop_->configuration(),
1381 pi2_.client_event_loop_->node(), "pi1",
1382 pi2_.client_event_loop_->boot_uuid(), config_sha256_));
Austin Schuh89f23e32023-05-15 17:06:43 -07001383
1384 SctpClient client(remote_node->hostname()->string_view(),
1385 remote_node->port(),
1386 connect_message.message().channels_to_transfer()->size() +
1387 kControlStreams(),
1388 "");
1389
Austin Schuh89f23e32023-05-15 17:06:43 -07001390 client.SetPoolSize(2u);
1391
Sarah Newman3a3b5b82023-05-26 15:56:53 -07001392 // Passes on a machine with:
1393 // 5.4.0-147-generic
1394 // net.core.wmem_default = 212992
1395 // net.core.wmem_max = 212992
1396 // net.core.rmem_default = 212992
1397 // net.core.rmem_max = 212992
1398 // If too large it appears the message is never delivered to the
1399 // application.
1400 constexpr size_t kBigMessageSize = 64000;
1401 client.SetMaxReadSize(kBigMessageSize);
1402 client.SetMaxWriteSize(kBigMessageSize);
1403
1404 const std::string big_data(kBigMessageSize, 'a');
Austin Schuh89f23e32023-05-15 17:06:43 -07001405
Kiran Mohan38648482024-03-15 15:56:14 -07001406 pi2_.client_event_loop_->epoll()->OnReadable(client.fd(), [&]() {
Austin Schuh89f23e32023-05-15 17:06:43 -07001407 aos::unique_c_ptr<Message> message = client.Read();
1408 client.FreeMessage(std::move(message));
1409 });
1410
Kiran Mohan38648482024-03-15 15:56:14 -07001411 aos::TimerHandler *const send_big_message =
1412 pi2_.client_event_loop_->AddTimer(
1413 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
Austin Schuh89f23e32023-05-15 17:06:43 -07001414
Kiran Mohan38648482024-03-15 15:56:14 -07001415 pi2_.client_event_loop_->OnRun([this, send_big_message]() {
1416 send_big_message->Schedule(pi2_.client_event_loop_->monotonic_now() +
Austin Schuh89f23e32023-05-15 17:06:43 -07001417 chrono::seconds(1));
1418 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001419
Kiran Mohan38648482024-03-15 15:56:14 -07001420 pi2_.RunClient(chrono::milliseconds(3050));
Austin Schuhb0e439d2023-05-15 10:55:40 -07001421
1422 // Now confirm we are synchronized.
1423 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1424 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1425 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001426 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001427
1428 const ServerConnection *const pi1_connection =
1429 pi1_server_statistics_fetcher->connections()->Get(0);
1430 const ClientConnection *const pi1_client_connection =
1431 pi1_client_statistics_fetcher->connections()->Get(0);
1432 const ServerConnection *const pi2_connection =
1433 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001434
Austin Schuh4f7cccc2023-05-17 09:20:33 -07001435 // Make sure the server we just sent a bunch of junk to is grumpy and
1436 // disconnected the bad client.
Austin Schuhb0e439d2023-05-15 10:55:40 -07001437 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1438 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001439 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001440
1441 // And the other direction is happy.
1442 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1443 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1444 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1445 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1446 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1447
1448 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1449 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1450
1451 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1452 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001453 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1454
Kiran Mohan38648482024-03-15 15:56:14 -07001455 pi2_.client_event_loop_->epoll()->DeleteFd(client.fd());
Austin Schuh89f23e32023-05-15 17:06:43 -07001456
Kiran Mohan38648482024-03-15 15:56:14 -07001457 pi2_.StopClient();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001458 }
1459
James Kuszmaul79b2f032023-06-02 21:02:27 -07001460 // Shut everyone else down.
Kiran Mohan38648482024-03-15 15:56:14 -07001461 pi1_.StopServer();
1462 pi1_.StopClient();
1463 pi2_.StopServer();
1464 pi1_.StopTest();
1465 pi2_.StopTest();
Austin Schuhb0e439d2023-05-15 10:55:40 -07001466}
1467
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001468INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001469 MessageBridgeTests, MessageBridgeParameterizedTest,
1470 ::testing::Values(
1471 Param{"message_bridge_test_combined_timestamps_common_config.json",
1472 true},
1473 Param{"message_bridge_test_common_config.json", false}));
1474
Kiran Mohan38648482024-03-15 15:56:14 -07001475// Tests the case in which the configurations for the server and client are
1476// different - specifically the case where the client's config allows it to
1477// "talk" to the server, while the server's config does not allow the client to
1478// "talk" to it. The expectation in such a case is that we don't crash or raise
1479// an exception.
1480TEST(MessageBridgeTests, MismatchedServerAndClientConfigs) {
1481 // Make a `MessageBridgeServer` with the config
1482 // `message_bridge_test_mismatched_configs_pi1_and_pi3_config.json`.
1483 // In this config, `pi1` talks to `pi3`, but does *not* talk to `pi2`.
1484 PiNode pi1("pi1", "raspberrypi", "pi1_message_bridge_server",
1485 "message_bridge_test_mismatched_configs_pi1_and_pi3_config.json");
1486 pi1.OnPi();
1487 pi1.MakeServer();
1488 aos::ShmEventLoop pi1_test_event_loop(&pi1.config_.message());
1489 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1490 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
1491
1492 // Make a `MessageBridgeClient` with the config
1493 // `message_bridge_test_mismatched_configs_pi1_and_pi2_config.json`.
1494 // In this config, `pi1` talks to `pi2`.
1495 // Reasoning:
1496 // Due to this mismatch between the configs of the server and client,
1497 // when the client `pi2` sends a "connect" request to the server `pi1`,
1498 // there will be no server node placed in the
1499 // `MessageBridgeServerStatus::nodes_` vector at the index corresponding to
1500 // the client node's index. In such a case, we expect to not crash or raise an
1501 // exception.
1502 PiNode pi2("pi2", "raspberrypi2", "pi2_message_bridge_client",
1503 "message_bridge_test_mismatched_configs_pi1_and_pi2_config.json");
1504 pi2.OnPi();
1505 pi2.MakeClient();
1506
1507 // Put the server and client on 2 separate threaded runners and start running.
1508 pi1.StartServer();
1509 pi2.StartClient();
1510
1511 // Sleep here while the server and client threads run for 1 second.
1512 // During this time, the client will attempt to connect to the server.
1513 // We've set them up with mismatching configs such that the
1514 // server does not expect to talk to the client, but the client does
1515 // expect to connect to the server.
1516 // We expect that neither of the threads crashes/raises an exception.
1517 // If any of them does, the test terminates and the exception is reported
1518 // via the stack trace when running the test.
1519 std::this_thread::sleep_for(chrono::milliseconds(1000));
1520
1521 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1522 // Since pi1's configuration is such that it expects to talk only to pi3,
1523 // we expect the number of connections to be 1, and the node to
1524 // be `pi3`.
1525 EXPECT_EQ(pi1_server_statistics_fetcher->connections()->size(), 1);
1526 const ServerConnection *const pi1_connection =
1527 pi1_server_statistics_fetcher->connections()->Get(0);
1528 EXPECT_EQ(pi1_connection->node()->name()->string_view(), "pi3");
1529 // Since we didn't really spawn a `pi3` node in this test, we expect
1530 // that the connection is disconnected, and the connection count is 0.
1531 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1532 EXPECT_EQ(pi1_connection->connection_count(), 0u);
1533 // Also, since no connection was established, we expect that there is
1534 // no `connected_since_time` set.
1535 EXPECT_FALSE(pi1_connection->has_connected_since_time());
1536
1537 // If we got here, everything went well. Stop the threads.
1538 pi1.StopServer();
1539 pi2.StopClient();
1540}
1541
Stephan Pleinesf63bde82024-01-13 15:59:33 -08001542} // namespace aos::message_bridge::testing