blob: 36bdde9c6ee0f418560df2efef65d8aeeff7d255 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuh4889b182020-11-18 19:11:56 -08007#include "aos/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08008#include "aos/events/ping_generated.h"
9#include "aos/events/pong_generated.h"
10#include "aos/network/message_bridge_client_lib.h"
11#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070012#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080013#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080014
15namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070016void SetShmBase(const std::string_view base);
17
Austin Schuhe84c3ed2019-12-14 15:29:48 -080018namespace message_bridge {
19namespace testing {
20
21namespace chrono = std::chrono;
22
Austin Schuhe991fe22020-11-18 16:53:39 -080023std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070024 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
25 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080026 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080028 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070029 }
30}
31
Austin Schuhe991fe22020-11-18 16:53:39 -080032void DoSetShmBase(const std::string_view node) {
33 aos::SetShmBase(ShmBase(node));
34}
35
36class MessageBridgeTest : public ::testing::Test {
37 public:
38 MessageBridgeTest()
39 : pi1_config(aos::configuration::ReadConfig(
40 "aos/network/message_bridge_test_server_config.json")),
41 pi2_config(aos::configuration::ReadConfig(
42 "aos/network/message_bridge_test_client_config.json")) {
43 util::UnlinkRecursive(ShmBase("pi1"));
44 util::UnlinkRecursive(ShmBase("pi2"));
45 }
46
47 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
48 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
49};
50
Austin Schuhe84c3ed2019-12-14 15:29:48 -080051// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -080052TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053 // This is rather annoying to set up. We need to start up a client and
54 // server, on the same node, but get them to think that they are on different
55 // nodes.
56 //
57 // We then get to wait until they are connected.
58 //
59 // After they are connected, we send a Ping message.
60 //
61 // On the other end, we receive a Pong message.
62 //
63 // But, we need the client to not post directly to "/test" like it would in a
64 // real system, otherwise we will re-send the ping message... So, use an
65 // application specific map to have the client post somewhere else.
66 //
67 // To top this all off, each of these needs to be done with a ShmEventLoop,
68 // which needs to run in a separate thread... And it is really hard to get
69 // everything started up reliably. So just be super generous on timeouts and
70 // hope for the best. We can be more generous in the future if we need to.
71 //
72 // We are faking the application names by passing in --application_name=foo
Austin Schuh2f8fd752020-09-01 22:38:28 -070073 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080074 FLAGS_application_name = "pi1_message_bridge_server";
75 // Force ourselves to be "raspberrypi" and allocate everything.
76 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070077
Austin Schuhe991fe22020-11-18 16:53:39 -080078 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080079 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
80
81 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -080082 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080083 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080084
85 // And build the app which sends the pings.
86 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -080087 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080088 aos::Sender<examples::Ping> ping_sender =
89 ping_event_loop.MakeSender<examples::Ping>("/test");
90
Austin Schuhe991fe22020-11-18 16:53:39 -080091 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -070092 aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
93 pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
94 "/pi1/aos/remote_timestamps/pi2");
95
96 // Fetchers for confirming the remote timestamps made it.
97 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
98 ping_event_loop.MakeFetcher<examples::Ping>("/test");
99 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
100 ping_event_loop.MakeFetcher<Timestamp>("/aos");
101
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800102 // Now do it for "raspberrypi2", the client.
103 FLAGS_application_name = "pi2_message_bridge_client";
104 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105 DoSetShmBase("pi2");
106
Austin Schuh5344c352020-04-12 17:04:26 -0700107 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800108 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
109
110 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -0700111 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800112 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800113
114 // And build the app which sends the pongs.
115 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700116 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800117
Austin Schuh7bc59052020-02-16 23:48:33 -0800118 // And build the app for testing.
119 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700120 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800121
122 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
123 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700124 aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
125 test_event_loop.MakeFetcher<logger::MessageHeader>(
126 "/pi2/aos/remote_timestamps/pi1");
127
128 // Event loop for fetching data delivered to pi2 from pi1 to match up
129 // messages.
130 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
131 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
132 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
133 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
134 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
135 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
136 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800137
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800138 // Count the pongs.
139 int pong_count = 0;
140 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700141 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800142 ++pong_count;
143 LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800144 });
145
146 FLAGS_override_hostname = "";
147
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800148 // Wait until we are connected, then send.
149 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800150 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800151 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700152 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800153 [&ping_count, &pi2_client_event_loop, &ping_sender,
154 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800155 LOG(INFO) << FlatbufferToJson(&stats);
156
157 ASSERT_TRUE(stats.has_connections());
158 EXPECT_EQ(stats.connections()->size(), 1);
159
160 bool connected = false;
161 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800162 // Confirm that we are estimating the server time offset correctly. It
163 // should be about 0 since we are on the same machine here.
164 if (connection->has_monotonic_offset()) {
165 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
166 chrono::milliseconds(1));
167 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
168 chrono::milliseconds(-1));
169 ++pi1_server_statistics_count;
170 }
171
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800172 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800173 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800174 if (connection->state() == State::CONNECTED) {
175 connected = true;
176 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800177 }
178 }
179
180 if (connected) {
181 LOG(INFO) << "Connected! Sent ping.";
182 auto builder = ping_sender.MakeBuilder();
183 examples::Ping::Builder ping_builder =
184 builder.MakeBuilder<examples::Ping>();
185 ping_builder.add_value(ping_count + 971);
186 builder.Send(ping_builder.Finish());
187 ++ping_count;
188 }
189 });
190
Austin Schuh7bc59052020-02-16 23:48:33 -0800191 // Confirm both client and server statistics messages have decent offsets in
192 // them.
193 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700194 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800195 const ServerStatistics &stats) {
196 LOG(INFO) << FlatbufferToJson(&stats);
197 for (const ServerConnection *connection : *stats.connections()) {
198 if (connection->has_monotonic_offset()) {
199 ++pi2_server_statistics_count;
200 // Confirm that we are estimating the server time offset correctly. It
201 // should be about 0 since we are on the same machine here.
202 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
203 chrono::milliseconds(1));
204 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
205 chrono::milliseconds(-1));
206 }
207 }
208 });
209
210 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700211 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
212 const ClientStatistics &stats) {
213 LOG(INFO) << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800214
Austin Schuh5344c352020-04-12 17:04:26 -0700215 for (const ClientConnection *connection : *stats.connections()) {
216 if (connection->has_monotonic_offset()) {
217 ++pi1_client_statistics_count;
218 // It takes at least 10 microseconds to send a message between the
219 // client and server. The min (filtered) time shouldn't be over 10
220 // milliseconds on localhost. This might have to bump up if this is
221 // proving flaky.
222 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
223 chrono::milliseconds(10));
224 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
225 chrono::microseconds(10));
226 }
227 }
228 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800229
230 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700231 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800232 const ClientStatistics &stats) {
233 LOG(INFO) << FlatbufferToJson(&stats);
234
235 for (const ClientConnection *connection : *stats.connections()) {
236 if (connection->has_monotonic_offset()) {
237 ++pi2_client_statistics_count;
238 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
239 chrono::milliseconds(10));
240 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
241 chrono::microseconds(10));
242 }
243 }
244 });
245
Austin Schuh196a4452020-03-15 23:12:03 -0700246 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800247 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700248 LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800249 });
Austin Schuh196a4452020-03-15 23:12:03 -0700250 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800251 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700252 LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800253 });
254
255 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800256 aos::TimerHandler *quit = ping_event_loop.AddTimer(
257 [&ping_event_loop]() { ping_event_loop.Exit(); });
258 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800259 // Stop between timestamps, not exactly on them.
260 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800261 });
262
Austin Schuh2f8fd752020-09-01 22:38:28 -0700263 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
264 // channel.
265 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
266 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
267 const size_t ping_timestamp_channel =
268 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
269 ping_on_pi2_fetcher.channel());
270
271 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
272 VLOG(1) << "Channel "
273 << configuration::ChannelIndex(ping_event_loop.configuration(),
274 channel)
275 << " " << configuration::CleanedChannelToString(channel);
276 }
277
278 // For each remote timestamp we get back, confirm that it is either a ping
279 // message, or a timestamp we sent out. Also confirm that the timestamps are
280 // correct.
281 ping_event_loop.MakeWatcher(
282 "/pi1/aos/remote_timestamps/pi2",
283 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
284 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
285 &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
286 VLOG(1) << aos::FlatbufferToJson(&header);
287
288 const aos::monotonic_clock::time_point header_monotonic_sent_time(
289 chrono::nanoseconds(header.monotonic_sent_time()));
290 const aos::realtime_clock::time_point header_realtime_sent_time(
291 chrono::nanoseconds(header.realtime_sent_time()));
292 const aos::monotonic_clock::time_point header_monotonic_remote_time(
293 chrono::nanoseconds(header.monotonic_remote_time()));
294 const aos::realtime_clock::time_point header_realtime_remote_time(
295 chrono::nanoseconds(header.realtime_remote_time()));
296
297 const Context *pi1_context = nullptr;
298 const Context *pi2_context = nullptr;
299
300 if (header.channel_index() == pi1_timestamp_channel) {
301 // Find the forwarded message.
302 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
303 header_monotonic_sent_time) {
304 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
305 }
306
307 // And the source message.
308 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
309 header_monotonic_remote_time) {
310 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
311 }
312
313 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
314 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
315 } else if (header.channel_index() == ping_timestamp_channel) {
316 // Find the forwarded message.
317 while (ping_on_pi2_fetcher.context().monotonic_event_time <
318 header_monotonic_sent_time) {
319 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
320 }
321
322 // And the source message.
323 while (ping_on_pi1_fetcher.context().monotonic_event_time <
324 header_monotonic_remote_time) {
325 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
326 }
327
328 pi1_context = &ping_on_pi1_fetcher.context();
329 pi2_context = &ping_on_pi2_fetcher.context();
330 } else {
331 LOG(FATAL) << "Unknown channel";
332 }
333
334 // Confirm the forwarded message has matching timestamps to the
335 // timestamps we got back.
336 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
337 EXPECT_EQ(pi2_context->monotonic_event_time,
338 header_monotonic_sent_time);
339 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
340 EXPECT_EQ(pi2_context->realtime_remote_time,
341 header_realtime_remote_time);
342 EXPECT_EQ(pi2_context->monotonic_remote_time,
343 header_monotonic_remote_time);
344
345 // Confirm the forwarded message also matches the source message.
346 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
347 EXPECT_EQ(pi1_context->monotonic_event_time,
348 header_monotonic_remote_time);
349 EXPECT_EQ(pi1_context->realtime_event_time,
350 header_realtime_remote_time);
351 });
352
Austin Schuh7bc59052020-02-16 23:48:33 -0800353 // Start everything up. Pong is the only thing we don't know how to wait on,
354 // so start it first.
355 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
356
357 std::thread pi1_server_thread(
358 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
359 std::thread pi1_client_thread(
360 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
361 std::thread pi2_client_thread(
362 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
363 std::thread pi2_server_thread(
364 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800365
366 // And go!
367 ping_event_loop.Run();
368
369 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800370 pi1_server_event_loop.Exit();
371 pi1_client_event_loop.Exit();
372 pi2_client_event_loop.Exit();
373 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800374 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800375 pi1_server_thread.join();
376 pi1_client_thread.join();
377 pi2_client_thread.join();
378 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800379 pong_thread.join();
380
381 // Make sure we sent something.
382 EXPECT_GE(ping_count, 1);
383 // And got something back.
384 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800385
386 // Confirm that we are estimating a monotonic offset on the client.
387 ASSERT_TRUE(client_statistics_fetcher.Fetch());
388
389 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
390 EXPECT_EQ(client_statistics_fetcher->connections()
391 ->Get(0)
392 ->node()
393 ->name()
394 ->string_view(),
395 "pi1");
396
397 // Make sure the offset in one direction is less than a second.
398 EXPECT_GT(
399 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
400 EXPECT_LT(
401 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
402 1000000000);
403
404 EXPECT_GE(pi1_server_statistics_count, 2);
405 EXPECT_GE(pi2_server_statistics_count, 2);
406 EXPECT_GE(pi1_client_statistics_count, 2);
407 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700408
409 // Confirm we got timestamps back!
410 EXPECT_TRUE(message_header_fetcher1.Fetch());
411 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800412}
413
Austin Schuh5344c352020-04-12 17:04:26 -0700414// Test that the client disconnecting triggers the server offsets on both sides
415// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800416TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700417 // This is rather annoying to set up. We need to start up a client and
418 // server, on the same node, but get them to think that they are on different
419 // nodes.
420 //
421 // We need the client to not post directly to "/test" like it would in a
422 // real system, otherwise we will re-send the ping message... So, use an
423 // application specific map to have the client post somewhere else.
424 //
425 // To top this all off, each of these needs to be done with a ShmEventLoop,
426 // which needs to run in a separate thread... And it is really hard to get
427 // everything started up reliably. So just be super generous on timeouts and
428 // hope for the best. We can be more generous in the future if we need to.
429 //
430 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700431 FLAGS_application_name = "pi1_message_bridge_server";
432 // Force ourselves to be "raspberrypi" and allocate everything.
433 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700434 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800435 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700436 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
437
438 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800439 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700440 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
441
442 // And build the app for testing.
443 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800444 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700445 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
446 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
447
448 // Now do it for "raspberrypi2", the client.
449 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700450 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700451 FLAGS_application_name = "pi2_message_bridge_server";
452 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
453 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
454
455 // And build the app for testing.
456 FLAGS_application_name = "test2";
457 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
458 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
459 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
460
461 // Wait until we are connected, then send.
462 pi1_test_event_loop.MakeWatcher(
463 "/pi1/aos", [](const ServerStatistics &stats) {
464 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
465 });
466
467 pi2_test_event_loop.MakeWatcher(
468 "/pi2/aos", [](const ServerStatistics &stats) {
469 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
470 });
471
472 pi1_test_event_loop.MakeWatcher(
473 "/pi1/aos", [](const ClientStatistics &stats) {
474 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
475 });
476
477 pi2_test_event_loop.MakeWatcher(
478 "/pi2/aos", [](const ClientStatistics &stats) {
479 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
480 });
481
482 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
483 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
484 });
485 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
486 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
487 });
488
489 // Start everything up. Pong is the only thing we don't know how to wait on,
490 // so start it first.
491 std::thread pi1_test_thread(
492 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
493 std::thread pi2_test_thread(
494 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
495
496 std::thread pi1_server_thread(
497 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
498 std::thread pi1_client_thread(
499 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
500 std::thread pi2_server_thread(
501 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
502
503 {
504 FLAGS_application_name = "pi2_message_bridge_client";
505 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
506 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
507
508 // Run for 5 seconds to make sure we have time to estimate the offset.
509 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
510 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
511 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
512 // Stop between timestamps, not exactly on them.
513 quit->Setup(pi2_client_event_loop.monotonic_now() +
514 chrono::milliseconds(3050));
515 });
516
517 // And go!
518 pi2_client_event_loop.Run();
519
520 // Now confirm we are synchronized.
521 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
522 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
523
524 const ServerConnection *const pi1_connection =
525 pi1_server_statistics_fetcher->connections()->Get(0);
526 const ServerConnection *const pi2_connection =
527 pi2_server_statistics_fetcher->connections()->Get(0);
528
529 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
530 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
531 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
532 chrono::milliseconds(1));
533 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
534 chrono::milliseconds(-1));
535
536 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
537 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
538 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
539 chrono::milliseconds(1));
540 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
541 chrono::milliseconds(-1));
542 }
543
544 std::this_thread::sleep_for(std::chrono::seconds(2));
545
546 {
547 // Now confirm we are un-synchronized.
548 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
549 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
550 const ServerConnection *const pi1_connection =
551 pi1_server_statistics_fetcher->connections()->Get(0);
552 const ServerConnection *const pi2_connection =
553 pi2_server_statistics_fetcher->connections()->Get(0);
554
555 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
556 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
557 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
558 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
559 }
560
561 {
562 FLAGS_application_name = "pi2_message_bridge_client";
563 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
564 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
565
566 // Run for 5 seconds to make sure we have time to estimate the offset.
567 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
568 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
569 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
570 // Stop between timestamps, not exactly on them.
571 quit->Setup(pi2_client_event_loop.monotonic_now() +
572 chrono::milliseconds(3050));
573 });
574
575 // And go!
576 pi2_client_event_loop.Run();
577
578 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
579 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
580
581 // Now confirm we are synchronized again.
582 const ServerConnection *const pi1_connection =
583 pi1_server_statistics_fetcher->connections()->Get(0);
584 const ServerConnection *const pi2_connection =
585 pi2_server_statistics_fetcher->connections()->Get(0);
586
587 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
588 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
589 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
590 chrono::milliseconds(1));
591 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
592 chrono::milliseconds(-1));
593
594 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
595 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
596 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
597 chrono::milliseconds(1));
598 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
599 chrono::milliseconds(-1));
600 }
601
602 // Shut everyone else down
603 pi1_server_event_loop.Exit();
604 pi1_client_event_loop.Exit();
605 pi2_server_event_loop.Exit();
606 pi1_test_event_loop.Exit();
607 pi2_test_event_loop.Exit();
608 pi1_server_thread.join();
609 pi1_client_thread.join();
610 pi2_server_thread.join();
611 pi1_test_thread.join();
612 pi2_test_thread.join();
613}
614
615// Test that the server disconnecting triggers the server offsets on the other
616// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800617TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700618 // This is rather annoying to set up. We need to start up a client and
619 // server, on the same node, but get them to think that they are on different
620 // nodes.
621 //
622 // We need the client to not post directly to "/test" like it would in a
623 // real system, otherwise we will re-send the ping message... So, use an
624 // application specific map to have the client post somewhere else.
625 //
626 // To top this all off, each of these needs to be done with a ShmEventLoop,
627 // which needs to run in a separate thread... And it is really hard to get
628 // everything started up reliably. So just be super generous on timeouts and
629 // hope for the best. We can be more generous in the future if we need to.
630 //
631 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700632 FLAGS_application_name = "pi1_message_bridge_server";
633 // Force ourselves to be "raspberrypi" and allocate everything.
634 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700635 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800636 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700637 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
638
639 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800640 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700641 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
642
643 // And build the app for testing.
644 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800645 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700646 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
647 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
648 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
649 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
650
651 // Now do it for "raspberrypi2", the client.
652 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700653 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700654 FLAGS_application_name = "pi2_message_bridge_client";
655 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
656 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
657
658 // And build the app for testing.
659 FLAGS_application_name = "test2";
660 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
661 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
662 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
663
664 // Wait until we are connected, then send.
665 pi1_test_event_loop.MakeWatcher(
666 "/pi1/aos", [](const ServerStatistics &stats) {
667 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
668 });
669
670 // Confirm both client and server statistics messages have decent offsets in
671 // them.
672 pi2_test_event_loop.MakeWatcher(
673 "/pi2/aos", [](const ServerStatistics &stats) {
674 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
675 });
676
677 pi1_test_event_loop.MakeWatcher(
678 "/pi1/aos", [](const ClientStatistics &stats) {
679 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
680 });
681
682 pi2_test_event_loop.MakeWatcher(
683 "/pi2/aos", [](const ClientStatistics &stats) {
684 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
685 });
686
687 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
688 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
689 });
690 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
691 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
692 });
693
694 // Start everything up. Pong is the only thing we don't know how to wait on,
695 // so start it first.
696 std::thread pi1_test_thread(
697 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
698 std::thread pi2_test_thread(
699 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
700
701 std::thread pi1_server_thread(
702 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
703 std::thread pi1_client_thread(
704 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
705 std::thread pi2_client_thread(
706 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
707
708 {
709 FLAGS_application_name = "pi2_message_bridge_server";
710 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
711 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
712
713 // Run for 5 seconds to make sure we have time to estimate the offset.
714 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
715 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
716 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
717 // Stop between timestamps, not exactly on them.
718 quit->Setup(pi2_server_event_loop.monotonic_now() +
719 chrono::milliseconds(3050));
720 });
721
722 // And go!
723 pi2_server_event_loop.Run();
724
725 // Now confirm we are synchronized.
726 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
727 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
728
729 const ServerConnection *const pi1_connection =
730 pi1_server_statistics_fetcher->connections()->Get(0);
731 const ServerConnection *const pi2_connection =
732 pi2_server_statistics_fetcher->connections()->Get(0);
733
734 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
735 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
736 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
737 chrono::milliseconds(1));
738 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
739 chrono::milliseconds(-1));
740
741 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
742 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
743 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
744 chrono::milliseconds(1));
745 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
746 chrono::milliseconds(-1));
747 }
748
749 std::this_thread::sleep_for(std::chrono::seconds(2));
750
751 {
752 // And confirm we are unsynchronized.
753 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
754 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
755
756 const ServerConnection *const pi1_server_connection =
757 pi1_server_statistics_fetcher->connections()->Get(0);
758 const ClientConnection *const pi1_client_connection =
759 pi1_client_statistics_fetcher->connections()->Get(0);
760
761 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
762 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
763 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
764 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
765 }
766
767 {
768 FLAGS_application_name = "pi2_message_bridge_server";
769 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
770 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
771
772 // Run for 5 seconds to make sure we have time to estimate the offset.
773 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
774 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
775 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
776 // Stop between timestamps, not exactly on them.
777 quit->Setup(pi2_server_event_loop.monotonic_now() +
778 chrono::milliseconds(3050));
779 });
780
781 // And go!
782 pi2_server_event_loop.Run();
783
784 // And confirm we are synchronized again.
785 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
786 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
787
788 const ServerConnection *const pi1_connection =
789 pi1_server_statistics_fetcher->connections()->Get(0);
790 const ServerConnection *const pi2_connection =
791 pi2_server_statistics_fetcher->connections()->Get(0);
792
793 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
794 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
795 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
796 chrono::milliseconds(1));
797 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
798 chrono::milliseconds(-1));
799
800 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
801 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
802 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
803 chrono::milliseconds(1));
804 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
805 chrono::milliseconds(-1));
806 }
807
808 // Shut everyone else down
809 pi1_server_event_loop.Exit();
810 pi1_client_event_loop.Exit();
811 pi2_client_event_loop.Exit();
812 pi1_test_event_loop.Exit();
813 pi2_test_event_loop.Exit();
814 pi1_server_thread.join();
815 pi1_client_thread.join();
816 pi2_client_thread.join();
817 pi1_test_thread.join();
818 pi2_test_thread.join();
819}
820
Austin Schuh4889b182020-11-18 19:11:56 -0800821// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -0700822// thing, but doesn't confirm that the internal state does. We either need to
823// expose a way to check the state in a thread-safe way, or need a way to jump
824// time for one node to do that.
825
Austin Schuh4889b182020-11-18 19:11:56 -0800826void SendPing(aos::Sender<examples::Ping> *sender, int value) {
827 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
828 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
829 ping_builder.add_value(value);
830 builder.Send(ping_builder.Finish());
831}
832
833// Tests that when a message is sent before the bridge starts up, but is
834// configured as reliable, we forward it. Confirm this survives a client reset.
835TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
836 DoSetShmBase("pi1");
837 // Force ourselves to be "raspberrypi" and allocate everything.
838 FLAGS_override_hostname = "raspberrypi";
839
840 FLAGS_application_name = "sender";
841 aos::ShmEventLoop send_event_loop(&pi1_config.message());
842 aos::Sender<examples::Ping> ping_sender =
843 send_event_loop.MakeSender<examples::Ping>("/test");
844 SendPing(&ping_sender, 1);
845 aos::Sender<examples::Ping> unreliable_ping_sender =
846 send_event_loop.MakeSender<examples::Ping>("/unreliable");
847 SendPing(&unreliable_ping_sender, 1);
848
849 FLAGS_application_name = "pi1_message_bridge_server";
850 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
851 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
852
853 FLAGS_application_name = "pi1_message_bridge_client";
854 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
855 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
856
857 FLAGS_application_name = "pi1_timestamp";
858 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
859
860 // Now do it for "raspberrypi2", the client.
861 DoSetShmBase("pi2");
862 FLAGS_override_hostname = "raspberrypi2";
863
864 FLAGS_application_name = "pi2_message_bridge_server";
865 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
866 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
867
868 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
869 aos::Fetcher<examples::Ping> ping_fetcher =
870 receive_event_loop.MakeFetcher<examples::Ping>("/test");
871 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
872 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
873 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
874 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
875
876 const size_t ping_channel_index = configuration::ChannelIndex(
877 receive_event_loop.configuration(), ping_fetcher.channel());
878
879 std::atomic<int> ping_timestamp_count{0};
880 pi1_remote_timestamp_event_loop.MakeWatcher(
881 "/pi1/aos/remote_timestamps/pi2",
882 [ping_channel_index,
883 &ping_timestamp_count](const logger::MessageHeader &header) {
884 VLOG(1) << aos::FlatbufferToJson(&header);
885 if (header.channel_index() == ping_channel_index) {
886 ++ping_timestamp_count;
887 }
888 });
889
890 // Before everything starts up, confirm there is no message.
891 EXPECT_FALSE(ping_fetcher.Fetch());
892 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
893
894 // Spin up the persistant pieces.
895 std::thread pi1_server_thread(
896 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
897 std::thread pi1_client_thread(
898 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
899 std::thread pi2_server_thread(
900 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
901
902 // Event used to wait for the timestamp counting thread to start.
903 aos::Event event;
904 std::thread pi1_remote_timestamp_thread(
905 [&pi1_remote_timestamp_event_loop, &event]() {
906 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
907 pi1_remote_timestamp_event_loop.Run();
908 });
909
910 event.Wait();
911
912 {
913 // Now, spin up a client for 2 seconds.
914 LOG(INFO) << "Starting first pi2 MessageBridgeClient";
915 FLAGS_application_name = "pi2_message_bridge_client";
916 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
917 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
918
919 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
920 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
921 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
922 // Stop between timestamps, not exactly on them.
923 quit->Setup(pi2_client_event_loop.monotonic_now() +
924 chrono::milliseconds(2050));
925 });
926
927 // And go!
928 pi2_client_event_loop.Run();
929
930 // Confirm there is no detected duplicate packet.
931 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
932 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
933 ->Get(0)
934 ->duplicate_packets(),
935 0u);
936
937 EXPECT_TRUE(ping_fetcher.Fetch());
938 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
939 EXPECT_EQ(ping_timestamp_count, 1);
940 LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
941 }
942
943 {
944 // Now, spin up a second client for 2 seconds.
945 LOG(INFO) << "Starting second pi2 MessageBridgeClient";
946 FLAGS_application_name = "pi2_message_bridge_client";
947 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
948 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
949
950 aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
951 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
952 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
953 // Stop between timestamps, not exactly on them.
954 quit->Setup(pi2_client_event_loop.monotonic_now() +
955 chrono::milliseconds(5050));
956 });
957
958 // And go!
959 pi2_client_event_loop.Run();
960
961 // Confirm we detect the duplicate packet correctly.
962 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
963 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
964 ->Get(0)
965 ->duplicate_packets(),
966 1u);
967
968 EXPECT_EQ(ping_timestamp_count, 1);
969 EXPECT_FALSE(ping_fetcher.Fetch());
970 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
971 }
972
973 // Shut everyone else down
974 pi1_server_event_loop.Exit();
975 pi1_client_event_loop.Exit();
976 pi2_server_event_loop.Exit();
977 pi1_remote_timestamp_event_loop.Exit();
978 pi1_remote_timestamp_thread.join();
979 pi1_server_thread.join();
980 pi1_client_thread.join();
981 pi2_server_thread.join();
982}
983
984// Tests that when a message is sent before the bridge starts up, but is
985// configured as reliable, we forward it. Confirm this works across server
986// resets.
987TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
988 // Now do it for "raspberrypi2", the client.
989 DoSetShmBase("pi2");
990 FLAGS_override_hostname = "raspberrypi2";
991
992 FLAGS_application_name = "pi2_message_bridge_server";
993 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
994 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
995
996 FLAGS_application_name = "pi2_message_bridge_client";
997 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
998 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
999
1000 aos::ShmEventLoop receive_event_loop(&pi2_config.message());
1001 aos::Fetcher<examples::Ping> ping_fetcher =
1002 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1003 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1004 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1005 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1006 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1007
1008 DoSetShmBase("pi1");
1009 // Force ourselves to be "raspberrypi" and allocate everything.
1010 FLAGS_override_hostname = "raspberrypi";
1011
1012 FLAGS_application_name = "sender";
1013 aos::ShmEventLoop send_event_loop(&pi1_config.message());
1014 aos::Sender<examples::Ping> ping_sender =
1015 send_event_loop.MakeSender<examples::Ping>("/test");
1016 {
1017 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1018 examples::Ping::Builder ping_builder =
1019 builder.MakeBuilder<examples::Ping>();
1020 ping_builder.add_value(1);
1021 builder.Send(ping_builder.Finish());
1022 }
1023
1024 FLAGS_application_name = "pi1_message_bridge_client";
1025 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
1026 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
1027
1028 FLAGS_application_name = "pi1_timestamp";
1029 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
1030
1031 const size_t ping_channel_index = configuration::ChannelIndex(
1032 receive_event_loop.configuration(), ping_fetcher.channel());
1033
1034 std::atomic<int> ping_timestamp_count{0};
1035 pi1_remote_timestamp_event_loop.MakeWatcher(
1036 "/pi1/aos/remote_timestamps/pi2",
1037 [ping_channel_index,
1038 &ping_timestamp_count](const logger::MessageHeader &header) {
1039 VLOG(1) << aos::FlatbufferToJson(&header);
1040 if (header.channel_index() == ping_channel_index) {
1041 ++ping_timestamp_count;
1042 }
1043 });
1044
1045 // Before everything starts up, confirm there is no message.
1046 EXPECT_FALSE(ping_fetcher.Fetch());
1047 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1048
1049 // Spin up the persistant pieces.
1050 std::thread pi1_client_thread(
1051 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
1052 std::thread pi2_server_thread(
1053 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
1054 std::thread pi2_client_thread(
1055 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
1056
1057 // Event used to wait for the timestamp counting thread to start.
1058 aos::Event event;
1059 std::thread pi1_remote_timestamp_thread(
1060 [&pi1_remote_timestamp_event_loop, &event]() {
1061 pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
1062 pi1_remote_timestamp_event_loop.Run();
1063 });
1064
1065 event.Wait();
1066
1067 {
1068 // Now, spin up a server for 2 seconds.
1069 FLAGS_application_name = "pi1_message_bridge_server";
1070 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
1071 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1072
1073 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1074 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1075 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1076 // Stop between timestamps, not exactly on them.
1077 quit->Setup(pi1_server_event_loop.monotonic_now() +
1078 chrono::milliseconds(2050));
1079 });
1080
1081 // And go!
1082 pi1_server_event_loop.Run();
1083
1084 // Confirm there is no detected duplicate packet.
1085 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1086 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1087 ->Get(0)
1088 ->duplicate_packets(),
1089 0u);
1090
1091 EXPECT_TRUE(ping_fetcher.Fetch());
1092 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1093 EXPECT_EQ(ping_timestamp_count, 1);
1094 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1095 }
1096
1097 {
1098 // Now, spin up a second server for 2 seconds.
1099 FLAGS_application_name = "pi1_message_bridge_server";
1100 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
1101 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
1102
1103 aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
1104 [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
1105 pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
1106 // Stop between timestamps, not exactly on them.
1107 quit->Setup(pi1_server_event_loop.monotonic_now() +
1108 chrono::milliseconds(2050));
1109 });
1110
1111 // And go!
1112 pi1_server_event_loop.Run();
1113
1114 // Confirm we detect the duplicate packet correctly.
1115 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1116 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1117 ->Get(0)
1118 ->duplicate_packets(),
1119 1u);
1120
1121 EXPECT_EQ(ping_timestamp_count, 1);
1122 EXPECT_FALSE(ping_fetcher.Fetch());
1123 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1124 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
1125 }
1126
1127 // Shut everyone else down
1128 pi1_client_event_loop.Exit();
1129 pi2_server_event_loop.Exit();
1130 pi2_client_event_loop.Exit();
1131 pi1_remote_timestamp_event_loop.Exit();
1132 pi1_remote_timestamp_thread.join();
1133 pi1_client_thread.join();
1134 pi2_server_thread.join();
1135 pi2_client_thread.join();
1136}
1137
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001138} // namespace testing
1139} // namespace message_bridge
1140} // namespace aos