blob: a3e7f6331f1f950df1c2eab8338b1c82e171b946 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "gtest/gtest.h"
2
3#include <chrono>
4#include <thread>
5
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "absl/strings/str_cat.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
9#include "aos/network/message_bridge_client_lib.h"
10#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070011#include "aos/network/team_number.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080012#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080013
14namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070015void SetShmBase(const std::string_view base);
16
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017namespace message_bridge {
18namespace testing {
19
20namespace chrono = std::chrono;
21
// Returns the shared memory base path to use for the node named |node|.
//
// If the TEST_TMPDIR environment variable is set, the result is
// "<TEST_TMPDIR>/<node>"; otherwise it is "/dev/shm/<node>".
std::string ShmBase(const std::string_view node) {
  const char *const test_tmpdir = getenv("TEST_TMPDIR");

  // Pick the parent directory first, then append the per-node component.
  std::string base = (test_tmpdir == nullptr) ? "/dev/shm" : test_tmpdir;
  base += '/';
  base.append(node.data(), node.size());
  return base;
}
30
Austin Schuhe991fe22020-11-18 16:53:39 -080031void DoSetShmBase(const std::string_view node) {
32 aos::SetShmBase(ShmBase(node));
33}
34
35class MessageBridgeTest : public ::testing::Test {
36 public:
37 MessageBridgeTest()
38 : pi1_config(aos::configuration::ReadConfig(
39 "aos/network/message_bridge_test_server_config.json")),
40 pi2_config(aos::configuration::ReadConfig(
41 "aos/network/message_bridge_test_client_config.json")) {
42 util::UnlinkRecursive(ShmBase("pi1"));
43 util::UnlinkRecursive(ShmBase("pi2"));
44 }
45
46 aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
47 aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
48};
49
Austin Schuhe84c3ed2019-12-14 15:29:48 -080050// Test that we can send a ping message over sctp and receive it.
Austin Schuhe991fe22020-11-18 16:53:39 -080051TEST_F(MessageBridgeTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -080052 // This is rather annoying to set up. We need to start up a client and
53 // server, on the same node, but get them to think that they are on different
54 // nodes.
55 //
56 // We then get to wait until they are connected.
57 //
58 // After they are connected, we send a Ping message.
59 //
60 // On the other end, we receive a Pong message.
61 //
62 // But, we need the client to not post directly to "/test" like it would in a
63 // real system, otherwise we will re-send the ping message... So, use an
64 // application specific map to have the client post somewhere else.
65 //
66 // To top this all off, each of these needs to be done with a ShmEventLoop,
67 // which needs to run in a separate thread... And it is really hard to get
68 // everything started up reliably. So just be super generous on timeouts and
69 // hope for the best. We can be more generous in the future if we need to.
70 //
71 // We are faking the application names by passing in --application_name=foo
Austin Schuh2f8fd752020-09-01 22:38:28 -070072 DoSetShmBase("pi1");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080073 FLAGS_application_name = "pi1_message_bridge_server";
74 // Force ourselves to be "raspberrypi" and allocate everything.
75 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -070076
Austin Schuhe991fe22020-11-18 16:53:39 -080077 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080078 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
79
80 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -080081 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -080082 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -080083
84 // And build the app which sends the pings.
85 FLAGS_application_name = "ping";
Austin Schuhe991fe22020-11-18 16:53:39 -080086 aos::ShmEventLoop ping_event_loop(&pi1_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -080087 aos::Sender<examples::Ping> ping_sender =
88 ping_event_loop.MakeSender<examples::Ping>("/test");
89
Austin Schuhe991fe22020-11-18 16:53:39 -080090 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -070091 aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
92 pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
93 "/pi1/aos/remote_timestamps/pi2");
94
95 // Fetchers for confirming the remote timestamps made it.
96 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
97 ping_event_loop.MakeFetcher<examples::Ping>("/test");
98 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
99 ping_event_loop.MakeFetcher<Timestamp>("/aos");
100
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800101 // Now do it for "raspberrypi2", the client.
102 FLAGS_application_name = "pi2_message_bridge_client";
103 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700104 DoSetShmBase("pi2");
105
Austin Schuh5344c352020-04-12 17:04:26 -0700106 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800107 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
108
109 FLAGS_application_name = "pi2_message_bridge_server";
Austin Schuh5344c352020-04-12 17:04:26 -0700110 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800111 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800112
113 // And build the app which sends the pongs.
114 FLAGS_application_name = "pong";
Austin Schuh5344c352020-04-12 17:04:26 -0700115 aos::ShmEventLoop pong_event_loop(&pi2_config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800116
Austin Schuh7bc59052020-02-16 23:48:33 -0800117 // And build the app for testing.
118 FLAGS_application_name = "test";
Austin Schuh5344c352020-04-12 17:04:26 -0700119 aos::ShmEventLoop test_event_loop(&pi2_config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800120
121 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
122 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700123 aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
124 test_event_loop.MakeFetcher<logger::MessageHeader>(
125 "/pi2/aos/remote_timestamps/pi1");
126
127 // Event loop for fetching data delivered to pi2 from pi1 to match up
128 // messages.
129 aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
130 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
131 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
132 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
133 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
134 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
135 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800136
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800137 // Count the pongs.
138 int pong_count = 0;
139 pong_event_loop.MakeWatcher(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700140 "/test", [&pong_count](const examples::Ping &ping) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800141 ++pong_count;
142 LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800143 });
144
145 FLAGS_override_hostname = "";
146
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800147 // Wait until we are connected, then send.
148 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800149 int pi1_server_statistics_count = 0;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800150 ping_event_loop.MakeWatcher(
Austin Schuh196a4452020-03-15 23:12:03 -0700151 "/pi1/aos",
Austin Schuh7bc59052020-02-16 23:48:33 -0800152 [&ping_count, &pi2_client_event_loop, &ping_sender,
153 &pi1_server_statistics_count](const ServerStatistics &stats) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800154 LOG(INFO) << FlatbufferToJson(&stats);
155
156 ASSERT_TRUE(stats.has_connections());
157 EXPECT_EQ(stats.connections()->size(), 1);
158
159 bool connected = false;
160 for (const ServerConnection *connection : *stats.connections()) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800161 // Confirm that we are estimating the server time offset correctly. It
162 // should be about 0 since we are on the same machine here.
163 if (connection->has_monotonic_offset()) {
164 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
165 chrono::milliseconds(1));
166 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
167 chrono::milliseconds(-1));
168 ++pi1_server_statistics_count;
169 }
170
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800171 if (connection->node()->name()->string_view() ==
Austin Schuh7bc59052020-02-16 23:48:33 -0800172 pi2_client_event_loop.node()->name()->string_view()) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800173 if (connection->state() == State::CONNECTED) {
174 connected = true;
175 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800176 }
177 }
178
179 if (connected) {
180 LOG(INFO) << "Connected! Sent ping.";
181 auto builder = ping_sender.MakeBuilder();
182 examples::Ping::Builder ping_builder =
183 builder.MakeBuilder<examples::Ping>();
184 ping_builder.add_value(ping_count + 971);
185 builder.Send(ping_builder.Finish());
186 ++ping_count;
187 }
188 });
189
Austin Schuh7bc59052020-02-16 23:48:33 -0800190 // Confirm both client and server statistics messages have decent offsets in
191 // them.
192 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700193 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800194 const ServerStatistics &stats) {
195 LOG(INFO) << FlatbufferToJson(&stats);
196 for (const ServerConnection *connection : *stats.connections()) {
197 if (connection->has_monotonic_offset()) {
198 ++pi2_server_statistics_count;
199 // Confirm that we are estimating the server time offset correctly. It
200 // should be about 0 since we are on the same machine here.
201 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
202 chrono::milliseconds(1));
203 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
204 chrono::milliseconds(-1));
205 }
206 }
207 });
208
209 int pi1_client_statistics_count = 0;
Austin Schuh5344c352020-04-12 17:04:26 -0700210 ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
211 const ClientStatistics &stats) {
212 LOG(INFO) << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800213
Austin Schuh5344c352020-04-12 17:04:26 -0700214 for (const ClientConnection *connection : *stats.connections()) {
215 if (connection->has_monotonic_offset()) {
216 ++pi1_client_statistics_count;
217 // It takes at least 10 microseconds to send a message between the
218 // client and server. The min (filtered) time shouldn't be over 10
219 // milliseconds on localhost. This might have to bump up if this is
220 // proving flaky.
221 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
222 chrono::milliseconds(10));
223 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
224 chrono::microseconds(10));
225 }
226 }
227 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800228
229 int pi2_client_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700230 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800231 const ClientStatistics &stats) {
232 LOG(INFO) << FlatbufferToJson(&stats);
233
234 for (const ClientConnection *connection : *stats.connections()) {
235 if (connection->has_monotonic_offset()) {
236 ++pi2_client_statistics_count;
237 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
238 chrono::milliseconds(10));
239 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
240 chrono::microseconds(10));
241 }
242 }
243 });
244
Austin Schuh196a4452020-03-15 23:12:03 -0700245 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800246 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700247 LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800248 });
Austin Schuh196a4452020-03-15 23:12:03 -0700249 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800250 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700251 LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800252 });
253
254 // Run for 5 seconds to make sure we have time to estimate the offset.
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800255 aos::TimerHandler *quit = ping_event_loop.AddTimer(
256 [&ping_event_loop]() { ping_event_loop.Exit(); });
257 ping_event_loop.OnRun([quit, &ping_event_loop]() {
Austin Schuh7bc59052020-02-16 23:48:33 -0800258 // Stop between timestamps, not exactly on them.
259 quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800260 });
261
Austin Schuh2f8fd752020-09-01 22:38:28 -0700262 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
263 // channel.
264 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
265 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
266 const size_t ping_timestamp_channel =
267 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
268 ping_on_pi2_fetcher.channel());
269
270 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
271 VLOG(1) << "Channel "
272 << configuration::ChannelIndex(ping_event_loop.configuration(),
273 channel)
274 << " " << configuration::CleanedChannelToString(channel);
275 }
276
277 // For each remote timestamp we get back, confirm that it is either a ping
278 // message, or a timestamp we sent out. Also confirm that the timestamps are
279 // correct.
280 ping_event_loop.MakeWatcher(
281 "/pi1/aos/remote_timestamps/pi2",
282 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
283 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
284 &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
285 VLOG(1) << aos::FlatbufferToJson(&header);
286
287 const aos::monotonic_clock::time_point header_monotonic_sent_time(
288 chrono::nanoseconds(header.monotonic_sent_time()));
289 const aos::realtime_clock::time_point header_realtime_sent_time(
290 chrono::nanoseconds(header.realtime_sent_time()));
291 const aos::monotonic_clock::time_point header_monotonic_remote_time(
292 chrono::nanoseconds(header.monotonic_remote_time()));
293 const aos::realtime_clock::time_point header_realtime_remote_time(
294 chrono::nanoseconds(header.realtime_remote_time()));
295
296 const Context *pi1_context = nullptr;
297 const Context *pi2_context = nullptr;
298
299 if (header.channel_index() == pi1_timestamp_channel) {
300 // Find the forwarded message.
301 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
302 header_monotonic_sent_time) {
303 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
304 }
305
306 // And the source message.
307 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
308 header_monotonic_remote_time) {
309 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
310 }
311
312 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
313 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
314 } else if (header.channel_index() == ping_timestamp_channel) {
315 // Find the forwarded message.
316 while (ping_on_pi2_fetcher.context().monotonic_event_time <
317 header_monotonic_sent_time) {
318 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
319 }
320
321 // And the source message.
322 while (ping_on_pi1_fetcher.context().monotonic_event_time <
323 header_monotonic_remote_time) {
324 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
325 }
326
327 pi1_context = &ping_on_pi1_fetcher.context();
328 pi2_context = &ping_on_pi2_fetcher.context();
329 } else {
330 LOG(FATAL) << "Unknown channel";
331 }
332
333 // Confirm the forwarded message has matching timestamps to the
334 // timestamps we got back.
335 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
336 EXPECT_EQ(pi2_context->monotonic_event_time,
337 header_monotonic_sent_time);
338 EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
339 EXPECT_EQ(pi2_context->realtime_remote_time,
340 header_realtime_remote_time);
341 EXPECT_EQ(pi2_context->monotonic_remote_time,
342 header_monotonic_remote_time);
343
344 // Confirm the forwarded message also matches the source message.
345 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
346 EXPECT_EQ(pi1_context->monotonic_event_time,
347 header_monotonic_remote_time);
348 EXPECT_EQ(pi1_context->realtime_event_time,
349 header_realtime_remote_time);
350 });
351
Austin Schuh7bc59052020-02-16 23:48:33 -0800352 // Start everything up. Pong is the only thing we don't know how to wait on,
353 // so start it first.
354 std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
355
356 std::thread pi1_server_thread(
357 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
358 std::thread pi1_client_thread(
359 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
360 std::thread pi2_client_thread(
361 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
362 std::thread pi2_server_thread(
363 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800364
365 // And go!
366 ping_event_loop.Run();
367
368 // Shut everyone else down
Austin Schuh7bc59052020-02-16 23:48:33 -0800369 pi1_server_event_loop.Exit();
370 pi1_client_event_loop.Exit();
371 pi2_client_event_loop.Exit();
372 pi2_server_event_loop.Exit();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800373 pong_event_loop.Exit();
Austin Schuh7bc59052020-02-16 23:48:33 -0800374 pi1_server_thread.join();
375 pi1_client_thread.join();
376 pi2_client_thread.join();
377 pi2_server_thread.join();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800378 pong_thread.join();
379
380 // Make sure we sent something.
381 EXPECT_GE(ping_count, 1);
382 // And got something back.
383 EXPECT_GE(pong_count, 1);
Austin Schuh7bc59052020-02-16 23:48:33 -0800384
385 // Confirm that we are estimating a monotonic offset on the client.
386 ASSERT_TRUE(client_statistics_fetcher.Fetch());
387
388 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
389 EXPECT_EQ(client_statistics_fetcher->connections()
390 ->Get(0)
391 ->node()
392 ->name()
393 ->string_view(),
394 "pi1");
395
396 // Make sure the offset in one direction is less than a second.
397 EXPECT_GT(
398 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
399 EXPECT_LT(
400 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
401 1000000000);
402
403 EXPECT_GE(pi1_server_statistics_count, 2);
404 EXPECT_GE(pi2_server_statistics_count, 2);
405 EXPECT_GE(pi1_client_statistics_count, 2);
406 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700407
408 // Confirm we got timestamps back!
409 EXPECT_TRUE(message_header_fetcher1.Fetch());
410 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800411}
412
Austin Schuh5344c352020-04-12 17:04:26 -0700413// Test that the client disconnecting triggers the server offsets on both sides
414// to clear.
Austin Schuhe991fe22020-11-18 16:53:39 -0800415TEST_F(MessageBridgeTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700416 // This is rather annoying to set up. We need to start up a client and
417 // server, on the same node, but get them to think that they are on different
418 // nodes.
419 //
420 // We need the client to not post directly to "/test" like it would in a
421 // real system, otherwise we will re-send the ping message... So, use an
422 // application specific map to have the client post somewhere else.
423 //
424 // To top this all off, each of these needs to be done with a ShmEventLoop,
425 // which needs to run in a separate thread... And it is really hard to get
426 // everything started up reliably. So just be super generous on timeouts and
427 // hope for the best. We can be more generous in the future if we need to.
428 //
429 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700430 FLAGS_application_name = "pi1_message_bridge_server";
431 // Force ourselves to be "raspberrypi" and allocate everything.
432 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700433 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800434 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700435 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
436
437 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800438 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700439 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
440
441 // And build the app for testing.
442 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800443 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700444 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
445 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
446
447 // Now do it for "raspberrypi2", the client.
448 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700449 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700450 FLAGS_application_name = "pi2_message_bridge_server";
451 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
452 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
453
454 // And build the app for testing.
455 FLAGS_application_name = "test2";
456 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
457 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
458 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
459
460 // Wait until we are connected, then send.
461 pi1_test_event_loop.MakeWatcher(
462 "/pi1/aos", [](const ServerStatistics &stats) {
463 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
464 });
465
466 pi2_test_event_loop.MakeWatcher(
467 "/pi2/aos", [](const ServerStatistics &stats) {
468 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
469 });
470
471 pi1_test_event_loop.MakeWatcher(
472 "/pi1/aos", [](const ClientStatistics &stats) {
473 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
474 });
475
476 pi2_test_event_loop.MakeWatcher(
477 "/pi2/aos", [](const ClientStatistics &stats) {
478 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
479 });
480
481 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
482 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
483 });
484 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
485 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
486 });
487
488 // Start everything up. Pong is the only thing we don't know how to wait on,
489 // so start it first.
490 std::thread pi1_test_thread(
491 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
492 std::thread pi2_test_thread(
493 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
494
495 std::thread pi1_server_thread(
496 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
497 std::thread pi1_client_thread(
498 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
499 std::thread pi2_server_thread(
500 [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
501
502 {
503 FLAGS_application_name = "pi2_message_bridge_client";
504 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
505 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
506
507 // Run for 5 seconds to make sure we have time to estimate the offset.
508 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
509 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
510 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
511 // Stop between timestamps, not exactly on them.
512 quit->Setup(pi2_client_event_loop.monotonic_now() +
513 chrono::milliseconds(3050));
514 });
515
516 // And go!
517 pi2_client_event_loop.Run();
518
519 // Now confirm we are synchronized.
520 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
521 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
522
523 const ServerConnection *const pi1_connection =
524 pi1_server_statistics_fetcher->connections()->Get(0);
525 const ServerConnection *const pi2_connection =
526 pi2_server_statistics_fetcher->connections()->Get(0);
527
528 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
529 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
530 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
531 chrono::milliseconds(1));
532 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
533 chrono::milliseconds(-1));
534
535 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
536 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
537 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
538 chrono::milliseconds(1));
539 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
540 chrono::milliseconds(-1));
541 }
542
543 std::this_thread::sleep_for(std::chrono::seconds(2));
544
545 {
546 // Now confirm we are un-synchronized.
547 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
548 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
549 const ServerConnection *const pi1_connection =
550 pi1_server_statistics_fetcher->connections()->Get(0);
551 const ServerConnection *const pi2_connection =
552 pi2_server_statistics_fetcher->connections()->Get(0);
553
554 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
555 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
556 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
557 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
558 }
559
560 {
561 FLAGS_application_name = "pi2_message_bridge_client";
562 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
563 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
564
565 // Run for 5 seconds to make sure we have time to estimate the offset.
566 aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
567 [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
568 pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
569 // Stop between timestamps, not exactly on them.
570 quit->Setup(pi2_client_event_loop.monotonic_now() +
571 chrono::milliseconds(3050));
572 });
573
574 // And go!
575 pi2_client_event_loop.Run();
576
577 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
578 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
579
580 // Now confirm we are synchronized again.
581 const ServerConnection *const pi1_connection =
582 pi1_server_statistics_fetcher->connections()->Get(0);
583 const ServerConnection *const pi2_connection =
584 pi2_server_statistics_fetcher->connections()->Get(0);
585
586 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
587 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
588 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
589 chrono::milliseconds(1));
590 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
591 chrono::milliseconds(-1));
592
593 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
594 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
595 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
596 chrono::milliseconds(1));
597 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
598 chrono::milliseconds(-1));
599 }
600
601 // Shut everyone else down
602 pi1_server_event_loop.Exit();
603 pi1_client_event_loop.Exit();
604 pi2_server_event_loop.Exit();
605 pi1_test_event_loop.Exit();
606 pi2_test_event_loop.Exit();
607 pi1_server_thread.join();
608 pi1_client_thread.join();
609 pi2_server_thread.join();
610 pi1_test_thread.join();
611 pi2_test_thread.join();
612}
613
614// Test that the server disconnecting triggers the server offsets on the other
615// side to clear, along with the other client.
Austin Schuhe991fe22020-11-18 16:53:39 -0800616TEST_F(MessageBridgeTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700617 // This is rather annoying to set up. We need to start up a client and
618 // server, on the same node, but get them to think that they are on different
619 // nodes.
620 //
621 // We need the client to not post directly to "/test" like it would in a
622 // real system, otherwise we will re-send the ping message... So, use an
623 // application specific map to have the client post somewhere else.
624 //
625 // To top this all off, each of these needs to be done with a ShmEventLoop,
626 // which needs to run in a separate thread... And it is really hard to get
627 // everything started up reliably. So just be super generous on timeouts and
628 // hope for the best. We can be more generous in the future if we need to.
629 //
630 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700631 FLAGS_application_name = "pi1_message_bridge_server";
632 // Force ourselves to be "raspberrypi" and allocate everything.
633 FLAGS_override_hostname = "raspberrypi";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700634 DoSetShmBase("pi1");
Austin Schuhe991fe22020-11-18 16:53:39 -0800635 aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700636 MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
637
638 FLAGS_application_name = "pi1_message_bridge_client";
Austin Schuhe991fe22020-11-18 16:53:39 -0800639 aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700640 MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
641
642 // And build the app for testing.
643 FLAGS_application_name = "test1";
Austin Schuhe991fe22020-11-18 16:53:39 -0800644 aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
Austin Schuh5344c352020-04-12 17:04:26 -0700645 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
646 pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
647 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
648 pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
649
650 // Now do it for "raspberrypi2", the client.
651 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700652 DoSetShmBase("pi2");
Austin Schuh5344c352020-04-12 17:04:26 -0700653 FLAGS_application_name = "pi2_message_bridge_client";
654 aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
655 MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
656
657 // And build the app for testing.
658 FLAGS_application_name = "test2";
659 aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
660 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
661 pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
662
663 // Wait until we are connected, then send.
664 pi1_test_event_loop.MakeWatcher(
665 "/pi1/aos", [](const ServerStatistics &stats) {
666 LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
667 });
668
669 // Confirm both client and server statistics messages have decent offsets in
670 // them.
671 pi2_test_event_loop.MakeWatcher(
672 "/pi2/aos", [](const ServerStatistics &stats) {
673 LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
674 });
675
676 pi1_test_event_loop.MakeWatcher(
677 "/pi1/aos", [](const ClientStatistics &stats) {
678 LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
679 });
680
681 pi2_test_event_loop.MakeWatcher(
682 "/pi2/aos", [](const ClientStatistics &stats) {
683 LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
684 });
685
686 pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
687 LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
688 });
689 pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
690 LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
691 });
692
693 // Start everything up. Pong is the only thing we don't know how to wait on,
694 // so start it first.
695 std::thread pi1_test_thread(
696 [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
697 std::thread pi2_test_thread(
698 [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
699
700 std::thread pi1_server_thread(
701 [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
702 std::thread pi1_client_thread(
703 [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
704 std::thread pi2_client_thread(
705 [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
706
707 {
708 FLAGS_application_name = "pi2_message_bridge_server";
709 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
710 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
711
712 // Run for ~3 seconds (3050 ms) so there is time to estimate the offset.
713 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
714 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
715 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
716 // Stop between timestamps, not exactly on them.
717 quit->Setup(pi2_server_event_loop.monotonic_now() +
718 chrono::milliseconds(3050));
719 });
720
721 // And go!
722 pi2_server_event_loop.Run();
723
724 // Now confirm we are synchronized.
725 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
726 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
727
728 const ServerConnection *const pi1_connection =
729 pi1_server_statistics_fetcher->connections()->Get(0);
730 const ServerConnection *const pi2_connection =
731 pi2_server_statistics_fetcher->connections()->Get(0);
732
733 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
734 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
735 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
736 chrono::milliseconds(1));
737 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
738 chrono::milliseconds(-1));
739
740 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
741 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
742 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
743 chrono::milliseconds(1));
744 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
745 chrono::milliseconds(-1));
746 }
747
748 std::this_thread::sleep_for(std::chrono::seconds(2));
749
750 {
751 // And confirm we are unsynchronized.
752 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
753 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
754
755 const ServerConnection *const pi1_server_connection =
756 pi1_server_statistics_fetcher->connections()->Get(0);
757 const ClientConnection *const pi1_client_connection =
758 pi1_client_statistics_fetcher->connections()->Get(0);
759
760 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
761 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
762 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
763 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
764 }
765
766 {
767 FLAGS_application_name = "pi2_message_bridge_server";
768 aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
769 MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
770
771 // Run for ~3 seconds (3050 ms) so there is time to estimate the offset.
772 aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
773 [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
774 pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
775 // Stop between timestamps, not exactly on them.
776 quit->Setup(pi2_server_event_loop.monotonic_now() +
777 chrono::milliseconds(3050));
778 });
779
780 // And go!
781 pi2_server_event_loop.Run();
782
783 // And confirm we are synchronized again.
784 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
785 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
786
787 const ServerConnection *const pi1_connection =
788 pi1_server_statistics_fetcher->connections()->Get(0);
789 const ServerConnection *const pi2_connection =
790 pi2_server_statistics_fetcher->connections()->Get(0);
791
792 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
793 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
794 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
795 chrono::milliseconds(1));
796 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
797 chrono::milliseconds(-1));
798
799 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
800 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
801 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
802 chrono::milliseconds(1));
803 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
804 chrono::milliseconds(-1));
805 }
806
807 // Shut everyone else down
808 pi1_server_event_loop.Exit();
809 pi1_client_event_loop.Exit();
810 pi2_client_event_loop.Exit();
811 pi1_test_event_loop.Exit();
812 pi2_test_event_loop.Exit();
813 pi1_server_thread.join();
814 pi1_client_thread.join();
815 pi2_client_thread.join();
816 pi1_test_thread.join();
817 pi2_test_thread.join();
818}
819
820// TODO(austin): This test confirms that the external state does the right
821// thing, but doesn't confirm that the internal state does. We either need to
822// expose a way to check the state in a thread-safe way, or need a way to jump
823// time for one node to do that.
824
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800825} // namespace testing
826} // namespace message_bridge
827} // namespace aos