blob: c38a589d70941cb9938f113048d5ea850b3355fc [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuh4889b182020-11-18 19:11:56 -08007#include "aos/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/events/ping_generated.h"
9#include "aos/events/pong_generated.h"
10#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
15namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070016void SetShmBase(const std::string_view base);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace message_bridge {
19namespace testing {
20
21namespace chrono = std::chrono;
22
Austin Schuhe991fe22020-11-18 16:53:39 -080023std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070024 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
25 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080026 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080028 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 }
30}
31
Austin Schuhe991fe22020-11-18 16:53:39 -080032void DoSetShmBase(const std::string_view node) {
33 aos::SetShmBase(ShmBase(node));
34}
35
36class MessageBridgeTest : public ::testing::Test {
Austin Schuh0de30f32020-12-06 12:44:28 -080037 public:
38 MessageBridgeTest()
39 : pi1_config(aos::configuration::ReadConfig(
40 "aos/network/message_bridge_test_server_config.json")),
41 pi2_config(aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_client_config.json")) {
43 util::UnlinkRecursive(ShmBase("pi1"));
44 util::UnlinkRecursive(ShmBase("pi2"));
45 }
Austin Schuhe991fe22020-11-18 16:53:39 -080046
Austin Schuh0de30f32020-12-06 12:44:28 -080047 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
48 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
Austin Schuhe991fe22020-11-18 16:53:39 -080049};
50
Austin Schuhe84c3ed2019-12-14 15:29:48 -080051// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -080052TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053 // This is rather annoying to set up. We need to start up a client and
54 // server, on the same node, but get them to think that they are on different
55 // nodes.
56 //
57 // We then get to wait until they are connected.
58 //
59 // After they are connected, we send a Ping message.
60 //
61 // On the other end, we receive a Pong message.
62 //
63 // But, we need the client to not post directly to "/test" like it would in a
64 // real system, otherwise we will re-send the ping message... So, use an
65 // application specific map to have the client post somewhere else.
66 //
67 // To top this all off, each of these needs to be done with a ShmEventLoop,
68 // which needs to run in a separate thread... And it is really hard to get
69 // everything started up reliably. So just be super generous on timeouts and
70 // hope for the best. We can be more generous in the future if we need to.
71 //
72 // We are faking the application names by passing in --application_name=foo
Austin Schuh2f8fd752020-09-01 22:38:28 -070073 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074 FLAGS_application_name = "pi1_message_bridge_server";
75 // Force ourselves to be "raspberrypi" and allocate everything.
76 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070077
Austin Schuhe991fe22020-11-18 16:53:39 -080078 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -080079 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -080080 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
81
82 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -080083 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -080084 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -080085 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080086
87 // And build the app which sends the pings.
88 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -080089 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080090 aos::Sender<examples::Ping> ping_sender =
91 ping_event_loop.MakeSender<examples::Ping>("/test");
92
Austin Schuhe991fe22020-11-18 16:53:39 -080093 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -080094 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
95 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -070096 "/pi1/aos/remote_timestamps/pi2");
97
98 // Fetchers for confirming the remote timestamps made it.
99 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
100 ping_event_loop.MakeFetcher<examples::Ping>("/test");
101 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
102 ping_event_loop.MakeFetcher<Timestamp>("/aos");
103
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800104 // Now do it for "raspberrypi2", the client.
105 FLAGS_application_name = "pi2_message_bridge_client";
106 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700107 DoSetShmBase("pi2");
108
Austin Schuh5344c352020-04-12 17:04:26 -0700109 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800110 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800111 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
112
113 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -0700114 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800115 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800116 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
118 // And build the app which sends the pongs.
119 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700120 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800121
Austin Schuh7bc59052020-02-16 23:48:33 -0800122 // And build the app for testing.
123 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700124 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800125
126 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
127 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800128 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
129 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130 "/pi2/aos/remote_timestamps/pi1");
131
132 // Event loop for fetching data delivered to pi2 from pi1 to match up
133 // messages.
134 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
135 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
136 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
137 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
138 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
139 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
140 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800141
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800142 // Count the pongs.
143 int pong_count = 0;
144 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700145 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800146 ++pong_count;
Austin Schuh1ca49e92020-12-11 00:01:27 -0800147 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800148 });
149
150 FLAGS_override_hostname = "";
151
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800152 // Wait until we are connected, then send.
153 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800154 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800155 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700156 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800157 [&ping_count, &pi2_client_event_loop, &ping_sender,
158 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800159 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800160
161 ASSERT_TRUE(stats.has_connections());
162 EXPECT_EQ(stats.connections()->size(), 1);
163
164 bool connected = false;
165 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800166 // Confirm that we are estimating the server time offset correctly. It
167 // should be about 0 since we are on the same machine here.
168 if (connection->has_monotonic_offset()) {
169 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
170 chrono::milliseconds(1));
171 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
172 chrono::milliseconds(-1));
173 ++pi1_server_statistics_count;
174 }
175
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800176 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800177 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800178 if (connection->state() == State::CONNECTED) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800179 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800180 connected = true;
181 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800182 }
183 }
184
185 if (connected) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800186 VLOG(1) << "Connected! Sent ping.";
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800187 auto builder = ping_sender.MakeBuilder();
188 examples::Ping::Builder ping_builder =
189 builder.MakeBuilder<examples::Ping>();
190 ping_builder.add_value(ping_count + 971);
191 builder.Send(ping_builder.Finish());
192 ++ping_count;
193 }
194 });
195
Austin Schuh7bc59052020-02-16 23:48:33 -0800196 // Confirm both client and server statistics messages have decent offsets in
197 // them.
198 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700199 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800200 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800201 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800202 for (const ServerConnection *connection : *stats.connections()) {
203 if (connection->has_monotonic_offset()) {
204 ++pi2_server_statistics_count;
205 // Confirm that we are estimating the server time offset correctly. It
206 // should be about 0 since we are on the same machine here.
207 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
208 chrono::milliseconds(1));
209 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
210 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800211 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800212 }
213 }
214 });
215
216 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700217 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
218 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800219 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800220
Austin Schuh5344c352020-04-12 17:04:26 -0700221 for (const ClientConnection *connection : *stats.connections()) {
222 if (connection->has_monotonic_offset()) {
223 ++pi1_client_statistics_count;
224 // It takes at least 10 microseconds to send a message between the
225 // client and server. The min (filtered) time shouldn't be over 10
226 // milliseconds on localhost. This might have to bump up if this is
227 // proving flaky.
228 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
229 chrono::milliseconds(10));
230 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
231 chrono::microseconds(10));
232 }
233 }
234 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800235
236 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700237 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800238 const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800239 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800240
241 for (const ClientConnection *connection : *stats.connections()) {
242 if (connection->has_monotonic_offset()) {
243 ++pi2_client_statistics_count;
244 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
245 chrono::milliseconds(10));
246 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
247 chrono::microseconds(10));
248 }
249 }
250 });
251
Austin Schuh196a4452020-03-15 23:12:03 -0700252 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800253 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800254 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800255 });
Austin Schuh196a4452020-03-15 23:12:03 -0700256 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800257 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800258 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800259 });
260
261 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800262 aos::TimerHandler *quit = ping_event_loop.AddTimer(
263 [&ping_event_loop]() { ping_event_loop.Exit(); });
264 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800265 // Stop between timestamps, not exactly on them.
266 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800267 });
268
Austin Schuh2f8fd752020-09-01 22:38:28 -0700269 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
270 // channel.
271 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
272 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
273 const size_t ping_timestamp_channel =
274 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
275 ping_on_pi2_fetcher.channel());
276
277 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
278 VLOG(1) << "Channel "
279 << configuration::ChannelIndex(ping_event_loop.configuration(),
280 channel)
281 << " " << configuration::CleanedChannelToString(channel);
282 }
283
284 // For each remote timestamp we get back, confirm that it is either a ping
285 // message, or a timestamp we sent out. Also confirm that the timestamps are
286 // correct.
287 ping_event_loop.MakeWatcher(
288 "/pi1/aos/remote_timestamps/pi2",
289 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
290 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
Austin Schuh0de30f32020-12-06 12:44:28 -0800291 &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
292 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
293 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700294
Austin Schuh20ac95d2020-12-05 17:24:19 -0800295 EXPECT_TRUE(header.has_boot_uuid());
296
Austin Schuh2f8fd752020-09-01 22:38:28 -0700297 const aos::monotonic_clock::time_point header_monotonic_sent_time(
298 chrono::nanoseconds(header.monotonic_sent_time()));
299 const aos::realtime_clock::time_point header_realtime_sent_time(
300 chrono::nanoseconds(header.realtime_sent_time()));
301 const aos::monotonic_clock::time_point header_monotonic_remote_time(
302 chrono::nanoseconds(header.monotonic_remote_time()));
303 const aos::realtime_clock::time_point header_realtime_remote_time(
304 chrono::nanoseconds(header.realtime_remote_time()));
305
306 const Context *pi1_context = nullptr;
307 const Context *pi2_context = nullptr;
308
309 if (header.channel_index() == pi1_timestamp_channel) {
310 // Find the forwarded message.
311 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
312 header_monotonic_sent_time) {
313 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
314 }
315
316 // And the source message.
317 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
318 header_monotonic_remote_time) {
319 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
320 }
321
322 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
323 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
324 } else if (header.channel_index() == ping_timestamp_channel) {
325 // Find the forwarded message.
326 while (ping_on_pi2_fetcher.context().monotonic_event_time <
327 header_monotonic_sent_time) {
328 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
329 }
330
331 // And the source message.
332 while (ping_on_pi1_fetcher.context().monotonic_event_time <
333 header_monotonic_remote_time) {
334 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
335 }
336
337 pi1_context = &ping_on_pi1_fetcher.context();
338 pi2_context = &ping_on_pi2_fetcher.context();
339 } else {
340 LOG(FATAL) << "Unknown channel";
341 }
342
343 // Confirm the forwarded message has matching timestamps to the
344 // timestamps we got back.
345 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
346 EXPECT_EQ(pi2_context->monotonic_event_time,
347 header_monotonic_sent_time);
348 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
349 EXPECT_EQ(pi2_context->realtime_remote_time,
350 header_realtime_remote_time);
351 EXPECT_EQ(pi2_context->monotonic_remote_time,
352 header_monotonic_remote_time);
353
354 // Confirm the forwarded message also matches the source message.
355 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
356 EXPECT_EQ(pi1_context->monotonic_event_time,
357 header_monotonic_remote_time);
358 EXPECT_EQ(pi1_context->realtime_event_time,
359 header_realtime_remote_time);
360 });
361
Austin Schuh7bc59052020-02-16 23:48:33 -0800362 // Start everything up. Pong is the only thing we don't know how to wait on,
363 // so start it first.
364 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
365
366 std::thread pi1_server_thread(
367 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
368 std::thread pi1_client_thread(
369 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
370 std::thread pi2_client_thread(
371 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
372 std::thread pi2_server_thread(
373 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800374
375 // And go!
376 ping_event_loop.Run();
377
378 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800379 pi1_server_event_loop.Exit();
380 pi1_client_event_loop.Exit();
381 pi2_client_event_loop.Exit();
382 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800383 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800384 pi1_server_thread.join();
385 pi1_client_thread.join();
386 pi2_client_thread.join();
387 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800388 pong_thread.join();
389
390 // Make sure we sent something.
391 EXPECT_GE(ping_count, 1);
392 // And got something back.
393 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800394
395 // Confirm that we are estimating a monotonic offset on the client.
396 ASSERT_TRUE(client_statistics_fetcher.Fetch());
397
398 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
399 EXPECT_EQ(client_statistics_fetcher->connections()
400 ->Get(0)
401 ->node()
402 ->name()
403 ->string_view(),
404 "pi1");
405
406 // Make sure the offset in one direction is less than a second.
407 EXPECT_GT(
408 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
409 EXPECT_LT(
410 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
411 1000000000);
412
413 EXPECT_GE(pi1_server_statistics_count, 2);
414 EXPECT_GE(pi2_server_statistics_count, 2);
415 EXPECT_GE(pi1_client_statistics_count, 2);
416 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700417
418 // Confirm we got timestamps back!
419 EXPECT_TRUE(message_header_fetcher1.Fetch());
420 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800421}
422
Austin Schuh5344c352020-04-12 17:04:26 -0700423// Test that the client disconnecting triggers the server offsets on both sides
424// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800425TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700426 // This is rather annoying to set up. We need to start up a client and
427 // server, on the same node, but get them to think that they are on different
428 // nodes.
429 //
430 // We need the client to not post directly to "/test" like it would in a
431 // real system, otherwise we will re-send the ping message... So, use an
432 // application specific map to have the client post somewhere else.
433 //
434 // To top this all off, each of these needs to be done with a ShmEventLoop,
435 // which needs to run in a separate thread... And it is really hard to get
436 // everything started up reliably. So just be super generous on timeouts and
437 // hope for the best. We can be more generous in the future if we need to.
438 //
439 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700440 FLAGS_application_name = "pi1_message_bridge_server";
441 // Force ourselves to be "raspberrypi" and allocate everything.
442 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700443 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800444 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800445 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700446 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
447
448 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800449 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800450 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700451 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
452
453 // And build the app for testing.
454 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800455 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700456 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
457 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
458
459 // Now do it for "raspberrypi2", the client.
460 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700461 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700462 FLAGS_application_name = "pi2_message_bridge_server";
463 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800464 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700465 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
466
467 // And build the app for testing.
468 FLAGS_application_name = "test2";
469 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
470 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
471 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
472
473 // Wait until we are connected, then send.
474 pi1_test_event_loop.MakeWatcher(
475 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800476 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700477 });
478
479 pi2_test_event_loop.MakeWatcher(
480 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800481 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700482 });
483
484 pi1_test_event_loop.MakeWatcher(
485 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800486 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700487 });
488
489 pi2_test_event_loop.MakeWatcher(
490 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800491 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700492 });
493
494 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800495 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700496 });
497 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800498 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700499 });
500
501 // Start everything up. Pong is the only thing we don't know how to wait on,
502 // so start it first.
503 std::thread pi1_test_thread(
504 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
505 std::thread pi2_test_thread(
506 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
507
508 std::thread pi1_server_thread(
509 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
510 std::thread pi1_client_thread(
511 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
512 std::thread pi2_server_thread(
513 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
514
515 {
516 FLAGS_application_name = "pi2_message_bridge_client";
517 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800518 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700519 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
520
521 // Run for 5 seconds to make sure we have time to estimate the offset.
522 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
523 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
524 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
525 // Stop between timestamps, not exactly on them.
526 quit->Setup(pi2_client_event_loop.monotonic_now() +
527 chrono::milliseconds(3050));
528 });
529
530 // And go!
531 pi2_client_event_loop.Run();
532
533 // Now confirm we are synchronized.
534 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
535 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
536
537 const ServerConnection *const pi1_connection =
538 pi1_server_statistics_fetcher->connections()->Get(0);
539 const ServerConnection *const pi2_connection =
540 pi2_server_statistics_fetcher->connections()->Get(0);
541
542 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
543 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
544 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
545 chrono::milliseconds(1));
546 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
547 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800548 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700549
550 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
551 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
552 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
553 chrono::milliseconds(1));
554 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
555 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800556 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700557 }
558
559 std::this_thread::sleep_for(std::chrono::seconds(2));
560
561 {
562 // Now confirm we are un-synchronized.
563 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
564 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
565 const ServerConnection *const pi1_connection =
566 pi1_server_statistics_fetcher->connections()->Get(0);
567 const ServerConnection *const pi2_connection =
568 pi2_server_statistics_fetcher->connections()->Get(0);
569
570 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
571 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800572 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700573 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
574 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800575 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700576 }
577
578 {
579 FLAGS_application_name = "pi2_message_bridge_client";
580 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800581 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700582 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
583
584 // Run for 5 seconds to make sure we have time to estimate the offset.
585 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
586 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
587 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
588 // Stop between timestamps, not exactly on them.
589 quit->Setup(pi2_client_event_loop.monotonic_now() +
590 chrono::milliseconds(3050));
591 });
592
593 // And go!
594 pi2_client_event_loop.Run();
595
596 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
597 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
598
599 // Now confirm we are synchronized again.
600 const ServerConnection *const pi1_connection =
601 pi1_server_statistics_fetcher->connections()->Get(0);
602 const ServerConnection *const pi2_connection =
603 pi2_server_statistics_fetcher->connections()->Get(0);
604
605 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
606 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
607 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
608 chrono::milliseconds(1));
609 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
610 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800611 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700612
613 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
614 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
615 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
616 chrono::milliseconds(1));
617 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
618 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800619 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700620 }
621
622 // Shut everyone else down
623 pi1_server_event_loop.Exit();
624 pi1_client_event_loop.Exit();
625 pi2_server_event_loop.Exit();
626 pi1_test_event_loop.Exit();
627 pi2_test_event_loop.Exit();
628 pi1_server_thread.join();
629 pi1_client_thread.join();
630 pi2_server_thread.join();
631 pi1_test_thread.join();
632 pi2_test_thread.join();
633}
634
635// Test that the server disconnecting triggers the server offsets on the other
636// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800637TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700638 // This is rather annoying to set up. We need to start up a client and
639 // server, on the same node, but get them to think that they are on different
640 // nodes.
641 //
642 // We need the client to not post directly to "/test" like it would in a
643 // real system, otherwise we will re-send the ping message... So, use an
644 // application specific map to have the client post somewhere else.
645 //
646 // To top this all off, each of these needs to be done with a ShmEventLoop,
647 // which needs to run in a separate thread... And it is really hard to get
648 // everything started up reliably. So just be super generous on timeouts and
649 // hope for the best. We can be more generous in the future if we need to.
650 //
651 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700652 FLAGS_application_name = "pi1_message_bridge_server";
653 // Force ourselves to be "raspberrypi" and allocate everything.
654 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700655 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800656 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800657 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700658 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
659
660 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800661 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800662 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700663 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
664
665 // And build the app for testing.
666 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800667 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700668 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
669 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
670 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
671 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
672
673 // Now do it for "raspberrypi2", the client.
674 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700675 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700676 FLAGS_application_name = "pi2_message_bridge_client";
677 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800678 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700679 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
680
681 // And build the app for testing.
682 FLAGS_application_name = "test2";
683 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
684 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
685 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
686
687 // Wait until we are connected, then send.
688 pi1_test_event_loop.MakeWatcher(
689 "/pi1/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800690 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700691 });
692
693 // Confirm both client and server statistics messages have decent offsets in
694 // them.
695 pi2_test_event_loop.MakeWatcher(
696 "/pi2/aos", [](const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800697 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700698 });
699
700 pi1_test_event_loop.MakeWatcher(
701 "/pi1/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800702 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700703 });
704
705 pi2_test_event_loop.MakeWatcher(
706 "/pi2/aos", [](const ClientStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800707 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh5344c352020-04-12 17:04:26 -0700708 });
709
710 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800711 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700712 });
713 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800714 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh5344c352020-04-12 17:04:26 -0700715 });
716
717 // Start everything up. Pong is the only thing we don't know how to wait on,
718 // so start it first.
719 std::thread pi1_test_thread(
720 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
721 std::thread pi2_test_thread(
722 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
723
724 std::thread pi1_server_thread(
725 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
726 std::thread pi1_client_thread(
727 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
728 std::thread pi2_client_thread(
729 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
730
731 {
732 FLAGS_application_name = "pi2_message_bridge_server";
733 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800734 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700735 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
736
737 // Run for 5 seconds to make sure we have time to estimate the offset.
738 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
739 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
740 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
741 // Stop between timestamps, not exactly on them.
742 quit->Setup(pi2_server_event_loop.monotonic_now() +
743 chrono::milliseconds(3050));
744 });
745
746 // And go!
747 pi2_server_event_loop.Run();
748
749 // Now confirm we are synchronized.
750 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
751 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
752
753 const ServerConnection *const pi1_connection =
754 pi1_server_statistics_fetcher->connections()->Get(0);
755 const ServerConnection *const pi2_connection =
756 pi2_server_statistics_fetcher->connections()->Get(0);
757
758 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
759 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
760 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
761 chrono::milliseconds(1));
762 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
763 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800764 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700765
766 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
767 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
768 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
769 chrono::milliseconds(1));
770 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
771 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800772 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700773 }
774
775 std::this_thread::sleep_for(std::chrono::seconds(2));
776
777 {
778 // And confirm we are unsynchronized.
779 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
780 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
781
782 const ServerConnection *const pi1_server_connection =
783 pi1_server_statistics_fetcher->connections()->Get(0);
784 const ClientConnection *const pi1_client_connection =
785 pi1_client_statistics_fetcher->connections()->Get(0);
786
787 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
788 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800789 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700790 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
791 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
792 }
793
794 {
795 FLAGS_application_name = "pi2_message_bridge_server";
796 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800797 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh5344c352020-04-12 17:04:26 -0700798 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
799
800 // Run for 5 seconds to make sure we have time to estimate the offset.
801 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
802 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
803 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
804 // Stop between timestamps, not exactly on them.
805 quit->Setup(pi2_server_event_loop.monotonic_now() +
806 chrono::milliseconds(3050));
807 });
808
809 // And go!
810 pi2_server_event_loop.Run();
811
812 // And confirm we are synchronized again.
813 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
814 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
815
816 const ServerConnection *const pi1_connection =
817 pi1_server_statistics_fetcher->connections()->Get(0);
818 const ServerConnection *const pi2_connection =
819 pi2_server_statistics_fetcher->connections()->Get(0);
820
821 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
822 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
823 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
824 chrono::milliseconds(1));
825 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
826 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800827 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700828
829 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
830 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
831 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
832 chrono::milliseconds(1));
833 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
834 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800835 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700836 }
837
838 // Shut everyone else down
839 pi1_server_event_loop.Exit();
840 pi1_client_event_loop.Exit();
841 pi2_client_event_loop.Exit();
842 pi1_test_event_loop.Exit();
843 pi2_test_event_loop.Exit();
844 pi1_server_thread.join();
845 pi1_client_thread.join();
846 pi2_client_thread.join();
847 pi1_test_thread.join();
848 pi2_test_thread.join();
849}
850
Austin Schuh4889b182020-11-18 19:11:56 -0800851// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700852// thing, but doesn't confirm that the internal state does. We either need to
853// expose a way to check the state in a thread-safe way, or need a way to jump
854// time for one node to do that.
855
Austin Schuh4889b182020-11-18 19:11:56 -0800856void SendPing(aos::Sender<examples::Ping> *sender, int value) {
857 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
858 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
859 ping_builder.add_value(value);
860 builder.Send(ping_builder.Finish());
861}
862
863// Tests that when a message is sent before the bridge starts up, but is
864// configured as reliable, we forward it. Confirm this survives a client reset.
865TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
866 DoSetShmBase("pi1");
867 // Force ourselves to be "raspberrypi" and allocate everything.
868 FLAGS_override_hostname = "raspberrypi";
869
870 FLAGS_application_name = "sender";
871 aos::ShmEventLoop send_event_loop(&pi1_config.message());
872 aos::Sender<examples::Ping> ping_sender =
873 send_event_loop.MakeSender<examples::Ping>("/test");
874 SendPing(&ping_sender, 1);
875 aos::Sender<examples::Ping> unreliable_ping_sender =
876 send_event_loop.MakeSender<examples::Ping>("/unreliable");
877 SendPing(&unreliable_ping_sender, 1);
878
879 FLAGS_application_name = "pi1_message_bridge_server";
880 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800881 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800882 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
883
884 FLAGS_application_name = "pi1_message_bridge_client";
885 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800886 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800887 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
888
889 FLAGS_application_name = "pi1_timestamp";
890 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
891
892 // Now do it for "raspberrypi2", the client.
893 DoSetShmBase("pi2");
894 FLAGS_override_hostname = "raspberrypi2";
895
896 FLAGS_application_name = "pi2_message_bridge_server";
897 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800898 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800899 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
900
901 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
902 aos::Fetcher<examples::Ping> ping_fetcher =
903 receive_event_loop.MakeFetcher<examples::Ping>("/test");
904 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
905 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
906 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
907 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
908
909 const size_t ping_channel_index = configuration::ChannelIndex(
910 receive_event_loop.configuration(), ping_fetcher.channel());
911
912 std::atomic<int> ping_timestamp_count{0};
913 pi1_remote_timestamp_event_loop.MakeWatcher(
914 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -0800915 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
916 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
917 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800918 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -0800919 if (header.channel_index() == ping_channel_index) {
920 ++ping_timestamp_count;
921 }
922 });
923
924 // Before everything starts up, confirm there is no message.
925 EXPECT_FALSE(ping_fetcher.Fetch());
926 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
927
928 // Spin up the persistant pieces.
929 std::thread pi1_server_thread(
930 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
931 std::thread pi1_client_thread(
932 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
933 std::thread pi2_server_thread(
934 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
935
936 // Event used to wait for the timestamp counting thread to start.
937 aos::Event event;
938 std::thread pi1_remote_timestamp_thread(
939 [&pi1_remote_timestamp_event_loop, &event]() {
940 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
941 pi1_remote_timestamp_event_loop.Run();
942 });
943
944 event.Wait();
945
946 {
947 // Now, spin up a client for 2 seconds.
948 LOG(INFO) << "Starting first pi2 MessageBridgeClient";
949 FLAGS_application_name = "pi2_message_bridge_client";
950 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800951 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800952 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
953
954 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
955 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
956 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
957 // Stop between timestamps, not exactly on them.
958 quit->Setup(pi2_client_event_loop.monotonic_now() +
959 chrono::milliseconds(2050));
960 });
961
962 // And go!
963 pi2_client_event_loop.Run();
964
965 // Confirm there is no detected duplicate packet.
966 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
967 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
968 ->Get(0)
969 ->duplicate_packets(),
970 0u);
971
972 EXPECT_TRUE(ping_fetcher.Fetch());
973 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
974 EXPECT_EQ(ping_timestamp_count, 1);
975 LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
976 }
977
978 {
979 // Now, spin up a second client for 2 seconds.
980 LOG(INFO) << "Starting second pi2 MessageBridgeClient";
981 FLAGS_application_name = "pi2_message_bridge_client";
982 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -0800983 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -0800984 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
985
986 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
987 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
988 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
989 // Stop between timestamps, not exactly on them.
990 quit->Setup(pi2_client_event_loop.monotonic_now() +
991 chrono::milliseconds(5050));
992 });
993
994 // And go!
995 pi2_client_event_loop.Run();
996
997 // Confirm we detect the duplicate packet correctly.
998 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
999 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1000 ->Get(0)
1001 ->duplicate_packets(),
1002 1u);
1003
1004 EXPECT_EQ(ping_timestamp_count, 1);
1005 EXPECT_FALSE(ping_fetcher.Fetch());
1006 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1007 }
1008
1009 // Shut everyone else down
1010 pi1_server_event_loop.Exit();
1011 pi1_client_event_loop.Exit();
1012 pi2_server_event_loop.Exit();
1013 pi1_remote_timestamp_event_loop.Exit();
1014 pi1_remote_timestamp_thread.join();
1015 pi1_server_thread.join();
1016 pi1_client_thread.join();
1017 pi2_server_thread.join();
1018}
1019
1020// Tests that when a message is sent before the bridge starts up, but is
1021// configured as reliable, we forward it. Confirm this works across server
1022// resets.
1023TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
1024 // Now do it for "raspberrypi2", the client.
1025 DoSetShmBase("pi2");
1026 FLAGS_override_hostname = "raspberrypi2";
1027
1028 FLAGS_application_name = "pi2_message_bridge_server";
1029 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001030 pi2_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001031 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
1032
1033 FLAGS_application_name = "pi2_message_bridge_client";
1034 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001035 pi2_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001036 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
1037
1038 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1039 aos::Fetcher<examples::Ping> ping_fetcher =
1040 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1041 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1042 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1043 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1044 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1045
1046 DoSetShmBase("pi1");
1047 // Force ourselves to be "raspberrypi" and allocate everything.
1048 FLAGS_override_hostname = "raspberrypi";
1049
1050 FLAGS_application_name = "sender";
1051 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1052 aos::Sender<examples::Ping> ping_sender =
1053 send_event_loop.MakeSender<examples::Ping>("/test");
1054 {
1055 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1056 examples::Ping::Builder ping_builder =
1057 builder.MakeBuilder<examples::Ping>();
1058 ping_builder.add_value(1);
1059 builder.Send(ping_builder.Finish());
1060 }
1061
1062 FLAGS_application_name = "pi1_message_bridge_client";
1063 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001064 pi1_client_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001065 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
1066
1067 FLAGS_application_name = "pi1_timestamp";
1068 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1069
1070 const size_t ping_channel_index = configuration::ChannelIndex(
1071 receive_event_loop.configuration(), ping_fetcher.channel());
1072
1073 std::atomic<int> ping_timestamp_count{0};
1074 pi1_remote_timestamp_event_loop.MakeWatcher(
1075 "/pi1/aos/remote_timestamps/pi2",
Austin Schuh0de30f32020-12-06 12:44:28 -08001076 [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
1077 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
1078 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001079 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh4889b182020-11-18 19:11:56 -08001080 if (header.channel_index() == ping_channel_index) {
1081 ++ping_timestamp_count;
1082 }
1083 });
1084
1085 // Before everything starts up, confirm there is no message.
1086 EXPECT_FALSE(ping_fetcher.Fetch());
1087 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1088
1089 // Spin up the persistant pieces.
1090 std::thread pi1_client_thread(
1091 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
1092 std::thread pi2_server_thread(
1093 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
1094 std::thread pi2_client_thread(
1095 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
1096
1097 // Event used to wait for the timestamp counting thread to start.
1098 aos::Event event;
1099 std::thread pi1_remote_timestamp_thread(
1100 [&pi1_remote_timestamp_event_loop, &event]() {
1101 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1102 pi1_remote_timestamp_event_loop.Run();
1103 });
1104
1105 event.Wait();
1106
1107 {
1108 // Now, spin up a server for 2 seconds.
1109 FLAGS_application_name = "pi1_message_bridge_server";
1110 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001111 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001112 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1113
1114 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1115 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1116 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1117 // Stop between timestamps, not exactly on them.
1118 quit->Setup(pi1_server_event_loop.monotonic_now() +
1119 chrono::milliseconds(2050));
1120 });
1121
1122 // And go!
1123 pi1_server_event_loop.Run();
1124
1125 // Confirm there is no detected duplicate packet.
1126 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1127 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1128 ->Get(0)
1129 ->duplicate_packets(),
1130 0u);
1131
1132 EXPECT_TRUE(ping_fetcher.Fetch());
1133 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1134 EXPECT_EQ(ping_timestamp_count, 1);
1135 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1136 }
1137
1138 {
1139 // Now, spin up a second server for 2 seconds.
1140 FLAGS_application_name = "pi1_message_bridge_server";
1141 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh744153b2020-12-26 00:02:29 -08001142 pi1_server_event_loop.SetRuntimeRealtimePriority(1);
Austin Schuh4889b182020-11-18 19:11:56 -08001143 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1144
1145 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1146 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1147 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1148 // Stop between timestamps, not exactly on them.
1149 quit->Setup(pi1_server_event_loop.monotonic_now() +
1150 chrono::milliseconds(2050));
1151 });
1152
1153 // And go!
1154 pi1_server_event_loop.Run();
1155
1156 // Confirm we detect the duplicate packet correctly.
1157 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1158 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1159 ->Get(0)
1160 ->duplicate_packets(),
1161 1u);
1162
1163 EXPECT_EQ(ping_timestamp_count, 1);
1164 EXPECT_FALSE(ping_fetcher.Fetch());
1165 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1166 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1167 }
1168
1169 // Shut everyone else down
1170 pi1_client_event_loop.Exit();
1171 pi2_server_event_loop.Exit();
1172 pi2_client_event_loop.Exit();
1173 pi1_remote_timestamp_event_loop.Exit();
1174 pi1_remote_timestamp_thread.join();
1175 pi1_client_thread.join();
1176 pi2_server_thread.join();
1177 pi2_client_thread.join();
1178}
1179
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001180} // namespace testing
1181} // namespace message_bridge
1182} // namespace aos