blob: 4d5bd49d164fb6707ad5bf80aadd7a378f7eb748 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuh4889b182020-11-18 19:11:56 -08007#include "aos/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/events/ping_generated.h"
9#include "aos/events/pong_generated.h"
10#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
15namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070016void SetShmBase(const std::string_view base);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace message_bridge {
19namespace testing {
20
21namespace chrono = std::chrono;
22
Austin Schuhe991fe22020-11-18 16:53:39 -080023std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070024 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
25 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080026 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080028 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 }
30}
31
Austin Schuhe991fe22020-11-18 16:53:39 -080032void DoSetShmBase(const std::string_view node) {
33 aos::SetShmBase(ShmBase(node));
34}
35
36class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080037 public:
38 MessageBridgeTest()
39 : pi1_config(aos::configuration::ReadConfig(
40 "aos/network/message_bridge_test_server_config.json")),
41 pi2_config(aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_client_config.json")) {
43 util::UnlinkRecursive(ShmBase("pi1"));
44 util::UnlinkRecursive(ShmBase("pi2"));
45 }
Austin Schuhe991fe22020-11-18 16:53:39 -080046
Austin Schuh0de30f32020-12-06 12:44:28 -080047 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
48 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
Austin Schuhe991fe22020-11-18 16:53:39 -080049};
50
Austin Schuhe84c3ed2019-12-14 15:29:48 -080051// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -080052TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053 // This is rather annoying to set up. We need to start up a client and
54 // server, on the same node, but get them to think that they are on different
55 // nodes.
56 //
57 // We then get to wait until they are connected.
58 //
59 // After they are connected, we send a Ping message.
60 //
61 // On the other end, we receive a Pong message.
62 //
63 // But, we need the client to not post directly to "/test" like it would in a
64 // real system, otherwise we will re-send the ping message... So, use an
65 // application specific map to have the client post somewhere else.
66 //
67 // To top this all off, each of these needs to be done with a ShmEventLoop,
68 // which needs to run in a separate thread... And it is really hard to get
69 // everything started up reliably. So just be super generous on timeouts and
70 // hope for the best. We can be more generous in the future if we need to.
71 //
72 // We are faking the application names by passing in --application_name=foo
Austin Schuh2f8fd752020-09-01 22:38:28 -070073 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074 FLAGS_application_name = "pi1_message_bridge_server";
75 // Force ourselves to be "raspberrypi" and allocate everything.
76 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070077
Austin Schuhe991fe22020-11-18 16:53:39 -080078 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080079 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
80
81 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -080082 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080083 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080084
85 // And build the app which sends the pings.
86 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -080087 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080088 aos::Sender<examples::Ping> ping_sender =
89 ping_event_loop.MakeSender<examples::Ping>("/test");
90
Austin Schuhe991fe22020-11-18 16:53:39 -080091 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080092 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
93 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -070094 "/pi1/aos/remote_timestamps/pi2");
95
96 // Fetchers for confirming the remote timestamps made it.
97 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
98 ping_event_loop.MakeFetcher<examples::Ping>("/test");
99 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
100 ping_event_loop.MakeFetcher<Timestamp>("/aos");
101
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800102 // Now do it for "raspberrypi2", the client.
103 FLAGS_application_name = "pi2_message_bridge_client";
104 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105 DoSetShmBase("pi2");
106
Austin Schuh5344c352020-04-12 17:04:26 -0700107 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800108 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
109
110 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -0700111 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800112 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800113
114 // And build the app which sends the pongs.
115 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700116 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
Austin Schuh7bc59052020-02-16 23:48:33 -0800118 // And build the app for testing.
119 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700120 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800121
122 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
123 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800124 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
125 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700126 "/pi2/aos/remote_timestamps/pi1");
127
128 // Event loop for fetching data delivered to pi2 from pi1 to match up
129 // messages.
130 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
131 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
132 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
133 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
134 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
135 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
136 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800137
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800138 // Count the pongs.
139 int pong_count = 0;
140 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700141 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800142 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800143 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800144 });
145
146 FLAGS_override_hostname = "";
147
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800148 // Wait until we are connected, then send.
149 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800150 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800151 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700152 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800153 [&ping_count, &pi2_client_event_loop, &ping_sender,
154 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800155 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800156
157 ASSERT_TRUE(stats.has_connections());
158 EXPECT_EQ(stats.connections()->size(), 1);
159
160 bool connected = false;
161 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800162 // Confirm that we are estimating the server time offset correctly. It
163 // should be about 0 since we are on the same machine here.
164 if (connection->has_monotonic_offset()) {
165 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
166 chrono::milliseconds(1));
167 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
168 chrono::milliseconds(-1));
169 ++pi1_server_statistics_count;
170 }
171
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800172 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800173 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800174 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800175 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800176 connected = true;
177 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800178 }
179 }
180
181 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800182 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800183 auto builder = ping_sender.MakeBuilder();
184 examples::Ping::Builder ping_builder =
185 builder.MakeBuilder<examples::Ping>();
186 ping_builder.add_value(ping_count + 971);
187 builder.Send(ping_builder.Finish());
188 ++ping_count;
189 }
190 });
191
Austin Schuh7bc59052020-02-16 23:48:33 -0800192 // Confirm both client and server statistics messages have decent offsets in
193 // them.
194 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700195 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800196 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800197 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800198 for (const ServerConnection *connection : *stats.connections()) {
199 if (connection->has_monotonic_offset()) {
200 ++pi2_server_statistics_count;
201 // Confirm that we are estimating the server time offset correctly. It
202 // should be about 0 since we are on the same machine here.
203 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
204 chrono::milliseconds(1));
205 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
206 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800207 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800208 }
209 }
210 });
211
212 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700213 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
214 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800215 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800216
Austin Schuh5344c352020-04-12 17:04:26 -0700217 for (const ClientConnection *connection : *stats.connections()) {
218 if (connection->has_monotonic_offset()) {
219 ++pi1_client_statistics_count;
220 // It takes at least 10 microseconds to send a message between the
221 // client and server. The min (filtered) time shouldn't be over 10
222 // milliseconds on localhost. This might have to bump up if this is
223 // proving flaky.
224 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::milliseconds(10));
226 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
227 chrono::microseconds(10));
228 }
229 }
230 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800231
232 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700233 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800234 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800235 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800236
237 for (const ClientConnection *connection : *stats.connections()) {
238 if (connection->has_monotonic_offset()) {
239 ++pi2_client_statistics_count;
240 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
241 chrono::milliseconds(10));
242 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
243 chrono::microseconds(10));
244 }
245 }
246 });
247
Austin Schuh196a4452020-03-15 23:12:03 -0700248 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800249 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800250 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800251 });
Austin Schuh196a4452020-03-15 23:12:03 -0700252 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800253 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800254 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800255 });
256
257 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800258 aos::TimerHandler *quit = ping_event_loop.AddTimer(
259 [&ping_event_loop]() { ping_event_loop.Exit(); });
260 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800261 // Stop between timestamps, not exactly on them.
262 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800263 });
264
Austin Schuh2f8fd752020-09-01 22:38:28 -0700265 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
266 // channel.
267 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
268 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
269 const size_t ping_timestamp_channel =
270 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
271 ping_on_pi2_fetcher.channel());
272
273 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
274 VLOG(1) << "Channel "
275 << configuration::ChannelIndex(ping_event_loop.configuration(),
276 channel)
277 << " " << configuration::CleanedChannelToString(channel);
278 }
279
280 // For each remote timestamp we get back, confirm that it is either a ping
281 // message, or a timestamp we sent out. Also confirm that the timestamps are
282 // correct.
283 ping_event_loop.MakeWatcher(
284 "/pi1/aos/remote_timestamps/pi2",
285 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
286 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800287 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
288 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
289 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700290
Austin Schuh20ac95d2020-12-05 17:24:19 -0800291 EXPECT_TRUE(header.has_boot_uuid());
292
Austin Schuh2f8fd752020-09-01 22:38:28 -0700293 const aos::monotonic_clock::time_point header_monotonic_sent_time(
294 chrono::nanoseconds(header.monotonic_sent_time()));
295 const aos::realtime_clock::time_point header_realtime_sent_time(
296 chrono::nanoseconds(header.realtime_sent_time()));
297 const aos::monotonic_clock::time_point header_monotonic_remote_time(
298 chrono::nanoseconds(header.monotonic_remote_time()));
299 const aos::realtime_clock::time_point header_realtime_remote_time(
300 chrono::nanoseconds(header.realtime_remote_time()));
301
302 const Context *pi1_context = nullptr;
303 const Context *pi2_context = nullptr;
304
305 if (header.channel_index() == pi1_timestamp_channel) {
306 // Find the forwarded message.
307 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
308 header_monotonic_sent_time) {
309 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
310 }
311
312 // And the source message.
313 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
314 header_monotonic_remote_time) {
315 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
316 }
317
318 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
319 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
320 } else if (header.channel_index() == ping_timestamp_channel) {
321 // Find the forwarded message.
322 while (ping_on_pi2_fetcher.context().monotonic_event_time <
323 header_monotonic_sent_time) {
324 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
325 }
326
327 // And the source message.
328 while (ping_on_pi1_fetcher.context().monotonic_event_time <
329 header_monotonic_remote_time) {
330 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
331 }
332
333 pi1_context = &ping_on_pi1_fetcher.context();
334 pi2_context = &ping_on_pi2_fetcher.context();
335 } else {
336 LOG(FATAL) << "Unknown channel";
337 }
338
339 // Confirm the forwarded message has matching timestamps to the
340 // timestamps we got back.
341 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
342 EXPECT_EQ(pi2_context->monotonic_event_time,
343 header_monotonic_sent_time);
344 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
345 EXPECT_EQ(pi2_context->realtime_remote_time,
346 header_realtime_remote_time);
347 EXPECT_EQ(pi2_context->monotonic_remote_time,
348 header_monotonic_remote_time);
349
350 // Confirm the forwarded message also matches the source message.
351 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
352 EXPECT_EQ(pi1_context->monotonic_event_time,
353 header_monotonic_remote_time);
354 EXPECT_EQ(pi1_context->realtime_event_time,
355 header_realtime_remote_time);
356 });
357
Austin Schuh7bc59052020-02-16 23:48:33 -0800358 // Start everything up. Pong is the only thing we don't know how to wait on,
359 // so start it first.
360 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
361
362 std::thread pi1_server_thread(
363 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
364 std::thread pi1_client_thread(
365 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
366 std::thread pi2_client_thread(
367 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
368 std::thread pi2_server_thread(
369 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800370
371 // And go!
372 ping_event_loop.Run();
373
374 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800375 pi1_server_event_loop.Exit();
376 pi1_client_event_loop.Exit();
377 pi2_client_event_loop.Exit();
378 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800379 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800380 pi1_server_thread.join();
381 pi1_client_thread.join();
382 pi2_client_thread.join();
383 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800384 pong_thread.join();
385
386 // Make sure we sent something.
387 EXPECT_GE(ping_count, 1);
388 // And got something back.
389 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800390
391 // Confirm that we are estimating a monotonic offset on the client.
392 ASSERT_TRUE(client_statistics_fetcher.Fetch());
393
394 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
395 EXPECT_EQ(client_statistics_fetcher->connections()
396 ->Get(0)
397 ->node()
398 ->name()
399 ->string_view(),
400 "pi1");
401
402 // Make sure the offset in one direction is less than a second.
403 EXPECT_GT(
404 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
405 EXPECT_LT(
406 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
407 1000000000);
408
409 EXPECT_GE(pi1_server_statistics_count, 2);
410 EXPECT_GE(pi2_server_statistics_count, 2);
411 EXPECT_GE(pi1_client_statistics_count, 2);
412 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700413
414 // Confirm we got timestamps back!
415 EXPECT_TRUE(message_header_fetcher1.Fetch());
416 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800417}
418
Austin Schuh5344c352020-04-12 17:04:26 -0700419// Test that the client disconnecting triggers the server offsets on both sides
420// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800421TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700422 // This is rather annoying to set up. We need to start up a client and
423 // server, on the same node, but get them to think that they are on different
424 // nodes.
425 //
426 // We need the client to not post directly to "/test" like it would in a
427 // real system, otherwise we will re-send the ping message... So, use an
428 // application specific map to have the client post somewhere else.
429 //
430 // To top this all off, each of these needs to be done with a ShmEventLoop,
431 // which needs to run in a separate thread... And it is really hard to get
432 // everything started up reliably. So just be super generous on timeouts and
433 // hope for the best. We can be more generous in the future if we need to.
434 //
435 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700436 FLAGS_application_name = "pi1_message_bridge_server";
437 // Force ourselves to be "raspberrypi" and allocate everything.
438 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700439 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800440 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700441 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
442
443 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800444 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700445 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
446
447 // And build the app for testing.
448 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800449 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700450 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
451 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
452
453 // Now do it for "raspberrypi2", the client.
454 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700455 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700456 FLAGS_application_name = "pi2_message_bridge_server";
457 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
458 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
459
460 // And build the app for testing.
461 FLAGS_application_name = "test2";
462 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
463 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
464 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
465
466 // Wait until we are connected, then send.
467 pi1_test_event_loop.MakeWatcher(
468 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800469 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700470 });
471
472 pi2_test_event_loop.MakeWatcher(
473 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800474 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700475 });
476
477 pi1_test_event_loop.MakeWatcher(
478 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800479 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700480 });
481
482 pi2_test_event_loop.MakeWatcher(
483 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800484 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700485 });
486
487 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800488 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700489 });
490 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800491 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700492 });
493
494 // Start everything up. Pong is the only thing we don't know how to wait on,
495 // so start it first.
496 std::thread pi1_test_thread(
497 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
498 std::thread pi2_test_thread(
499 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
500
501 std::thread pi1_server_thread(
502 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
503 std::thread pi1_client_thread(
504 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
505 std::thread pi2_server_thread(
506 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
507
508 {
509 FLAGS_application_name = "pi2_message_bridge_client";
510 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
511 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
512
513 // Run for 5 seconds to make sure we have time to estimate the offset.
514 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
515 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
516 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
517 // Stop between timestamps, not exactly on them.
518 quit->Setup(pi2_client_event_loop.monotonic_now() +
519 chrono::milliseconds(3050));
520 });
521
522 // And go!
523 pi2_client_event_loop.Run();
524
525 // Now confirm we are synchronized.
526 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
527 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
528
529 const ServerConnection *const pi1_connection =
530 pi1_server_statistics_fetcher->connections()->Get(0);
531 const ServerConnection *const pi2_connection =
532 pi2_server_statistics_fetcher->connections()->Get(0);
533
534 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
535 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
536 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
537 chrono::milliseconds(1));
538 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
539 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800540 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700541
542 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
543 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
544 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
545 chrono::milliseconds(1));
546 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
547 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800548 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700549 }
550
551 std::this_thread::sleep_for(std::chrono::seconds(2));
552
553 {
554 // Now confirm we are un-synchronized.
555 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
556 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
557 const ServerConnection *const pi1_connection =
558 pi1_server_statistics_fetcher->connections()->Get(0);
559 const ServerConnection *const pi2_connection =
560 pi2_server_statistics_fetcher->connections()->Get(0);
561
562 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
563 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800564 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700565 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
566 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800567 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700568 }
569
570 {
571 FLAGS_application_name = "pi2_message_bridge_client";
572 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
573 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
574
575 // Run for 5 seconds to make sure we have time to estimate the offset.
576 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
577 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
578 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
579 // Stop between timestamps, not exactly on them.
580 quit->Setup(pi2_client_event_loop.monotonic_now() +
581 chrono::milliseconds(3050));
582 });
583
584 // And go!
585 pi2_client_event_loop.Run();
586
587 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
588 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
589
590 // Now confirm we are synchronized again.
591 const ServerConnection *const pi1_connection =
592 pi1_server_statistics_fetcher->connections()->Get(0);
593 const ServerConnection *const pi2_connection =
594 pi2_server_statistics_fetcher->connections()->Get(0);
595
596 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
597 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
598 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
599 chrono::milliseconds(1));
600 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
601 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800602 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700603
604 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
605 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
606 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
607 chrono::milliseconds(1));
608 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
609 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800610 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700611 }
612
613 // Shut everyone else down
614 pi1_server_event_loop.Exit();
615 pi1_client_event_loop.Exit();
616 pi2_server_event_loop.Exit();
617 pi1_test_event_loop.Exit();
618 pi2_test_event_loop.Exit();
619 pi1_server_thread.join();
620 pi1_client_thread.join();
621 pi2_server_thread.join();
622 pi1_test_thread.join();
623 pi2_test_thread.join();
624}
625
626// Test that the server disconnecting triggers the server offsets on the other
627// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800628TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700629 // This is rather annoying to set up. We need to start up a client and
630 // server, on the same node, but get them to think that they are on different
631 // nodes.
632 //
633 // We need the client to not post directly to "/test" like it would in a
634 // real system, otherwise we will re-send the ping message... So, use an
635 // application specific map to have the client post somewhere else.
636 //
637 // To top this all off, each of these needs to be done with a ShmEventLoop,
638 // which needs to run in a separate thread... And it is really hard to get
639 // everything started up reliably. So just be super generous on timeouts and
640 // hope for the best. We can be more generous in the future if we need to.
641 //
642 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700643 FLAGS_application_name = "pi1_message_bridge_server";
644 // Force ourselves to be "raspberrypi" and allocate everything.
645 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700646 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800647 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700648 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
649
650 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800651 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700652 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
653
654 // And build the app for testing.
655 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800656 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700657 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
658 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
659 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
660 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
661
662 // Now do it for "raspberrypi2", the client.
663 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700664 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700665 FLAGS_application_name = "pi2_message_bridge_client";
666 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
667 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
668
669 // And build the app for testing.
670 FLAGS_application_name = "test2";
671 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
672 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
673 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
674
675 // Wait until we are connected, then send.
676 pi1_test_event_loop.MakeWatcher(
677 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800678 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700679 });
680
681 // Confirm both client and server statistics messages have decent offsets in
682 // them.
683 pi2_test_event_loop.MakeWatcher(
684 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800685 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700686 });
687
688 pi1_test_event_loop.MakeWatcher(
689 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800690 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700691 });
692
693 pi2_test_event_loop.MakeWatcher(
694 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800695 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700696 });
697
698 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800699 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700700 });
701 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800702 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700703 });
704
705 // Start everything up. Pong is the only thing we don't know how to wait on,
706 // so start it first.
707 std::thread pi1_test_thread(
708 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
709 std::thread pi2_test_thread(
710 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
711
712 std::thread pi1_server_thread(
713 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
714 std::thread pi1_client_thread(
715 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
716 std::thread pi2_client_thread(
717 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
718
719 {
720 FLAGS_application_name = "pi2_message_bridge_server";
721 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
722 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
723
724 // Run for 5 seconds to make sure we have time to estimate the offset.
725 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
726 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
727 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
728 // Stop between timestamps, not exactly on them.
729 quit->Setup(pi2_server_event_loop.monotonic_now() +
730 chrono::milliseconds(3050));
731 });
732
733 // And go!
734 pi2_server_event_loop.Run();
735
736 // Now confirm we are synchronized.
737 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
738 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
739
740 const ServerConnection *const pi1_connection =
741 pi1_server_statistics_fetcher->connections()->Get(0);
742 const ServerConnection *const pi2_connection =
743 pi2_server_statistics_fetcher->connections()->Get(0);
744
745 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
746 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
747 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
748 chrono::milliseconds(1));
749 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
750 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800751 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700752
753 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
754 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
755 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
756 chrono::milliseconds(1));
757 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
758 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800759 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700760 }
761
762 std::this_thread::sleep_for(std::chrono::seconds(2));
763
764 {
765 // And confirm we are unsynchronized.
766 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
767 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
768
769 const ServerConnection *const pi1_server_connection =
770 pi1_server_statistics_fetcher->connections()->Get(0);
771 const ClientConnection *const pi1_client_connection =
772 pi1_client_statistics_fetcher->connections()->Get(0);
773
774 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
775 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800776 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700777 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
778 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
779 }
780
781 {
782 FLAGS_application_name = "pi2_message_bridge_server";
783 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
784 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
785
786 // Run for 5 seconds to make sure we have time to estimate the offset.
787 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
788 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
789 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
790 // Stop between timestamps, not exactly on them.
791 quit->Setup(pi2_server_event_loop.monotonic_now() +
792 chrono::milliseconds(3050));
793 });
794
795 // And go!
796 pi2_server_event_loop.Run();
797
798 // And confirm we are synchronized again.
799 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
800 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
801
802 const ServerConnection *const pi1_connection =
803 pi1_server_statistics_fetcher->connections()->Get(0);
804 const ServerConnection *const pi2_connection =
805 pi2_server_statistics_fetcher->connections()->Get(0);
806
807 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
808 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
809 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
810 chrono::milliseconds(1));
811 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
812 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800813 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700814
815 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
816 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
817 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
818 chrono::milliseconds(1));
819 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
820 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800821 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700822 }
823
824 // Shut everyone else down
825 pi1_server_event_loop.Exit();
826 pi1_client_event_loop.Exit();
827 pi2_client_event_loop.Exit();
828 pi1_test_event_loop.Exit();
829 pi2_test_event_loop.Exit();
830 pi1_server_thread.join();
831 pi1_client_thread.join();
832 pi2_client_thread.join();
833 pi1_test_thread.join();
834 pi2_test_thread.join();
835}
836
Austin Schuh4889b182020-11-18 19:11:56 -0800837// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700838// thing, but doesn't confirm that the internal state does. We either need to
839// expose a way to check the state in a thread-safe way, or need a way to jump
840// time for one node to do that.
841
Austin Schuh4889b182020-11-18 19:11:56 -0800842void SendPing(aos::Sender<examples::Ping> *sender, int value) {
843 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
844 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
845 ping_builder.add_value(value);
846 builder.Send(ping_builder.Finish());
847}
848
849// Tests that when a message is sent before the bridge starts up, but is
850// configured as reliable, we forward it. Confirm this survives a client reset.
851TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
852 DoSetShmBase("pi1");
853 // Force ourselves to be "raspberrypi" and allocate everything.
854 FLAGS_override_hostname = "raspberrypi";
855
856 FLAGS_application_name = "sender";
857 aos::ShmEventLoop send_event_loop(&pi1_config.message());
858 aos::Sender<examples::Ping> ping_sender =
859 send_event_loop.MakeSender<examples::Ping>("/test");
860 SendPing(&ping_sender, 1);
861 aos::Sender<examples::Ping> unreliable_ping_sender =
862 send_event_loop.MakeSender<examples::Ping>("/unreliable");
863 SendPing(&unreliable_ping_sender, 1);
864
865 FLAGS_application_name = "pi1_message_bridge_server";
866 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
867 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
868
869 FLAGS_application_name = "pi1_message_bridge_client";
870 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
871 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
872
873 FLAGS_application_name = "pi1_timestamp";
874 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
875
876 // Now do it for "raspberrypi2", the client.
877 DoSetShmBase("pi2");
878 FLAGS_override_hostname = "raspberrypi2";
879
880 FLAGS_application_name = "pi2_message_bridge_server";
881 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
882 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
883
884 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
885 aos::Fetcher<examples::Ping> ping_fetcher =
886 receive_event_loop.MakeFetcher<examples::Ping>("/test");
887 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
888 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
889 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
890 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
891
892 const size_t ping_channel_index = configuration::ChannelIndex(
893 receive_event_loop.configuration(), ping_fetcher.channel());
894
895 std::atomic<int> ping_timestamp_count{0};
896 pi1_remote_timestamp_event_loop.MakeWatcher(
897 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800898 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
899 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
900 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800901 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800902 if (header.channel_index() == ping_channel_index) {
903 ++ping_timestamp_count;
904 }
905 });
906
907 // Before everything starts up, confirm there is no message.
908 EXPECT_FALSE(ping_fetcher.Fetch());
909 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
910
911 // Spin up the persistant pieces.
912 std::thread pi1_server_thread(
913 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
914 std::thread pi1_client_thread(
915 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
916 std::thread pi2_server_thread(
917 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
918
919 // Event used to wait for the timestamp counting thread to start.
920 aos::Event event;
921 std::thread pi1_remote_timestamp_thread(
922 [&pi1_remote_timestamp_event_loop, &event]() {
923 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
924 pi1_remote_timestamp_event_loop.Run();
925 });
926
927 event.Wait();
928
929 {
930 // Now, spin up a client for 2 seconds.
931 LOG(INFO) << "Starting first pi2 MessageBridgeClient";
932 FLAGS_application_name = "pi2_message_bridge_client";
933 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
934 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
935
936 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
937 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
938 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
939 // Stop between timestamps, not exactly on them.
940 quit->Setup(pi2_client_event_loop.monotonic_now() +
941 chrono::milliseconds(2050));
942 });
943
944 // And go!
945 pi2_client_event_loop.Run();
946
947 // Confirm there is no detected duplicate packet.
948 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
949 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
950 ->Get(0)
951 ->duplicate_packets(),
952 0u);
953
954 EXPECT_TRUE(ping_fetcher.Fetch());
955 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
956 EXPECT_EQ(ping_timestamp_count, 1);
957 LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
958 }
959
960 {
961 // Now, spin up a second client for 2 seconds.
962 LOG(INFO) << "Starting second pi2 MessageBridgeClient";
963 FLAGS_application_name = "pi2_message_bridge_client";
964 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
965 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
966
967 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
968 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
969 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
970 // Stop between timestamps, not exactly on them.
971 quit->Setup(pi2_client_event_loop.monotonic_now() +
972 chrono::milliseconds(5050));
973 });
974
975 // And go!
976 pi2_client_event_loop.Run();
977
978 // Confirm we detect the duplicate packet correctly.
979 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
980 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
981 ->Get(0)
982 ->duplicate_packets(),
983 1u);
984
985 EXPECT_EQ(ping_timestamp_count, 1);
986 EXPECT_FALSE(ping_fetcher.Fetch());
987 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
988 }
989
990 // Shut everyone else down
991 pi1_server_event_loop.Exit();
992 pi1_client_event_loop.Exit();
993 pi2_server_event_loop.Exit();
994 pi1_remote_timestamp_event_loop.Exit();
995 pi1_remote_timestamp_thread.join();
996 pi1_server_thread.join();
997 pi1_client_thread.join();
998 pi2_server_thread.join();
999}
1000
1001// Tests that when a message is sent before the bridge starts up, but is
1002// configured as reliable, we forward it. Confirm this works across server
1003// resets.
1004TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1005 // Now do it for "raspberrypi2", the client.
1006 DoSetShmBase("pi2");
1007 FLAGS_override_hostname = "raspberrypi2";
1008
1009 FLAGS_application_name = "pi2_message_bridge_server";
1010 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
1011 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
1012
1013 FLAGS_application_name = "pi2_message_bridge_client";
1014 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
1015 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
1016
1017 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1018 aos::Fetcher<examples::Ping> ping_fetcher =
1019 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1020 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1021 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1022 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1023 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1024
1025 DoSetShmBase("pi1");
1026 // Force ourselves to be "raspberrypi" and allocate everything.
1027 FLAGS_override_hostname = "raspberrypi";
1028
1029 FLAGS_application_name = "sender";
1030 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1031 aos::Sender<examples::Ping> ping_sender =
1032 send_event_loop.MakeSender<examples::Ping>("/test");
1033 {
1034 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1035 examples::Ping::Builder ping_builder =
1036 builder.MakeBuilder<examples::Ping>();
1037 ping_builder.add_value(1);
1038 builder.Send(ping_builder.Finish());
1039 }
1040
1041 FLAGS_application_name = "pi1_message_bridge_client";
1042 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
1043 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
1044
1045 FLAGS_application_name = "pi1_timestamp";
1046 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1047
1048 const size_t ping_channel_index = configuration::ChannelIndex(
1049 receive_event_loop.configuration(), ping_fetcher.channel());
1050
1051 std::atomic<int> ping_timestamp_count{0};
1052 pi1_remote_timestamp_event_loop.MakeWatcher(
1053 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001054 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1055 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1056 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001057 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001058 if (header.channel_index() == ping_channel_index) {
1059 ++ping_timestamp_count;
1060 }
1061 });
1062
1063 // Before everything starts up, confirm there is no message.
1064 EXPECT_FALSE(ping_fetcher.Fetch());
1065 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1066
1067 // Spin up the persistant pieces.
1068 std::thread pi1_client_thread(
1069 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
1070 std::thread pi2_server_thread(
1071 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
1072 std::thread pi2_client_thread(
1073 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
1074
1075 // Event used to wait for the timestamp counting thread to start.
1076 aos::Event event;
1077 std::thread pi1_remote_timestamp_thread(
1078 [&pi1_remote_timestamp_event_loop, &event]() {
1079 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1080 pi1_remote_timestamp_event_loop.Run();
1081 });
1082
1083 event.Wait();
1084
1085 {
1086 // Now, spin up a server for 2 seconds.
1087 FLAGS_application_name = "pi1_message_bridge_server";
1088 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
1089 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1090
1091 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1092 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1093 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1094 // Stop between timestamps, not exactly on them.
1095 quit->Setup(pi1_server_event_loop.monotonic_now() +
1096 chrono::milliseconds(2050));
1097 });
1098
1099 // And go!
1100 pi1_server_event_loop.Run();
1101
1102 // Confirm there is no detected duplicate packet.
1103 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1104 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1105 ->Get(0)
1106 ->duplicate_packets(),
1107 0u);
1108
1109 EXPECT_TRUE(ping_fetcher.Fetch());
1110 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1111 EXPECT_EQ(ping_timestamp_count, 1);
1112 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1113 }
1114
1115 {
1116 // Now, spin up a second server for 2 seconds.
1117 FLAGS_application_name = "pi1_message_bridge_server";
1118 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
1119 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1120
1121 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1122 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1123 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1124 // Stop between timestamps, not exactly on them.
1125 quit->Setup(pi1_server_event_loop.monotonic_now() +
1126 chrono::milliseconds(2050));
1127 });
1128
1129 // And go!
1130 pi1_server_event_loop.Run();
1131
1132 // Confirm we detect the duplicate packet correctly.
1133 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1134 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1135 ->Get(0)
1136 ->duplicate_packets(),
1137 1u);
1138
1139 EXPECT_EQ(ping_timestamp_count, 1);
1140 EXPECT_FALSE(ping_fetcher.Fetch());
1141 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1142 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1143 }
1144
1145 // Shut everyone else down
1146 pi1_client_event_loop.Exit();
1147 pi2_server_event_loop.Exit();
1148 pi2_client_event_loop.Exit();
1149 pi1_remote_timestamp_event_loop.Exit();
1150 pi1_remote_timestamp_thread.join();
1151 pi1_client_thread.join();
1152 pi2_server_thread.join();
1153 pi2_client_thread.join();
1154}
1155
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001156} // namespace testing
1157} // namespace message_bridge
1158} // namespace aos