blob: 9106a86a4fafb3529504c2968843f1e0290de404 [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include <chrono>
2#include <thread>
3
Austin Schuh2f8fd752020-09-01 22:38:28 -07004#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005#include "gtest/gtest.h"
6
Austin Schuhe84c3ed2019-12-14 15:29:48 -08007#include "aos/events/ping_generated.h"
8#include "aos/events/pong_generated.h"
Brian Silverman7b266d92021-02-17 21:24:02 -08009#include "aos/ipc_lib/event.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010#include "aos/network/message_bridge_client_lib.h"
Austin Schuh89f23e32023-05-15 17:06:43 -070011#include "aos/network/message_bridge_protocol.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080012#include "aos/network/message_bridge_server_lib.h"
Jim Ostrowski2192ddb2020-06-24 19:07:31 -070013#include "aos/network/team_number.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070014#include "aos/sha256.h"
Austin Schuh373f1762021-06-02 21:07:09 -070015#include "aos/testing/path.h"
Austin Schuhe991fe22020-11-18 16:53:39 -080016#include "aos/util/file.h"
Austin Schuhe84c3ed2019-12-14 15:29:48 -080017
Austin Schuh8902fa52021-03-14 22:39:24 -070018DECLARE_string(boot_uuid);
19
Austin Schuhe84c3ed2019-12-14 15:29:48 -080020namespace aos {
Austin Schuh2f8fd752020-09-01 22:38:28 -070021void SetShmBase(const std::string_view base);
22
Austin Schuhe84c3ed2019-12-14 15:29:48 -080023namespace message_bridge {
24namespace testing {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
27
Austin Schuhe84c3ed2019-12-14 15:29:48 -080028namespace chrono = std::chrono;
29
Austin Schuhe991fe22020-11-18 16:53:39 -080030std::string ShmBase(const std::string_view node) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070031 const char *tmpdir_c_str = getenv("TEST_TMPDIR");
32 if (tmpdir_c_str != nullptr) {
Austin Schuhe991fe22020-11-18 16:53:39 -080033 return absl::StrCat(tmpdir_c_str, "/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070034 } else {
Austin Schuhe991fe22020-11-18 16:53:39 -080035 return absl::StrCat("/dev/shm/", node);
Austin Schuh2f8fd752020-09-01 22:38:28 -070036 }
37}
38
Austin Schuhe991fe22020-11-18 16:53:39 -080039void DoSetShmBase(const std::string_view node) {
40 aos::SetShmBase(ShmBase(node));
41}
42
Austin Schuha4e616a2023-05-15 17:59:30 -070043// Class to manage starting and stopping a thread with an event loop in it. The
44// thread is guarenteed to be running before the constructor exits.
45class ThreadedEventLoopRunner {
46 public:
47 ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
48 : event_loop_(event_loop), my_thread_([this]() {
49 LOG(INFO) << "Started " << event_loop_->name();
50 event_loop_->OnRun([this]() { event_.Set(); });
51 event_loop_->Run();
52 }) {
53 event_.Wait();
54 }
55
56 ~ThreadedEventLoopRunner() { Exit(); }
57
58 void Exit() {
59 if (my_thread_.joinable()) {
60 event_loop_->Exit();
61 my_thread_.join();
62 my_thread_ = std::thread();
63 }
64 }
65
66 private:
67 aos::Event event_;
68 aos::ShmEventLoop *event_loop_;
69 std::thread my_thread_;
70};
71
Austin Schuh36a2c3e2021-02-18 22:28:38 -080072// Parameters to run all the tests with.
73struct Param {
74 // The config file to use.
75 std::string config;
76 // If true, the RemoteMessage channel should be shared between all the remote
77 // channels. If false, there will be 1 RemoteMessage channel per remote
78 // channel.
79 bool shared;
80};
81
82class MessageBridgeParameterizedTest
83 : public ::testing::TestWithParam<struct Param> {
Austin Schuh0de30f32020-12-06 12:44:28 -080084 public:
Austin Schuh36a2c3e2021-02-18 22:28:38 -080085 MessageBridgeParameterizedTest()
86 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070087 ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
Austin Schuhb0e439d2023-05-15 10:55:40 -070088 config_sha256(Sha256(config.span())),
Austin Schuh8902fa52021-03-14 22:39:24 -070089 pi1_boot_uuid_(UUID::Random()),
90 pi2_boot_uuid_(UUID::Random()) {
Austin Schuh0de30f32020-12-06 12:44:28 -080091 util::UnlinkRecursive(ShmBase("pi1"));
92 util::UnlinkRecursive(ShmBase("pi2"));
93 }
Austin Schuhe991fe22020-11-18 16:53:39 -080094
Austin Schuh36a2c3e2021-02-18 22:28:38 -080095 bool shared() const { return GetParam().shared; }
96
Austin Schuh0a2f12f2021-01-08 22:48:29 -080097 void OnPi1() {
98 DoSetShmBase("pi1");
99 FLAGS_override_hostname = "raspberrypi";
Austin Schuh8902fa52021-03-14 22:39:24 -0700100 FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800101 }
102
103 void OnPi2() {
104 DoSetShmBase("pi2");
105 FLAGS_override_hostname = "raspberrypi2";
Austin Schuh8902fa52021-03-14 22:39:24 -0700106 FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800107 }
108
Austin Schuhb0e439d2023-05-15 10:55:40 -0700109 void MakePi1Server(std::string server_config_sha256 = "") {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800110 OnPi1();
111 FLAGS_application_name = "pi1_message_bridge_server";
112 pi1_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800113 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800114 pi1_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700115 pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
116 pi1_server_event_loop.get(), server_config_sha256.size() == 0
117 ? config_sha256
118 : server_config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800119 }
120
121 void RunPi1Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -0700122 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800123 aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
124 [this]() { pi1_server_event_loop->Exit(); });
125 pi1_server_event_loop->OnRun([this, quit, duration]() {
126 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700127 quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800128 });
129
130 pi1_server_event_loop->Run();
131 }
132
133 void StartPi1Server() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700134 pi1_server_thread =
135 std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800136 }
137
138 void StopPi1Server() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700139 pi1_server_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800140 pi1_message_bridge_server.reset();
141 pi1_server_event_loop.reset();
142 }
143
144 void MakePi1Client() {
145 OnPi1();
146 FLAGS_application_name = "pi1_message_bridge_client";
147 pi1_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800148 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800149 pi1_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700150 pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
151 pi1_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800152 }
153
154 void StartPi1Client() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700155 pi1_client_thread =
156 std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800157 }
158
159 void StopPi1Client() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700160 pi1_client_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800161 pi1_message_bridge_client.reset();
162 pi1_client_event_loop.reset();
163 }
164
165 void MakePi1Test() {
166 OnPi1();
167 FLAGS_application_name = "test1";
168 pi1_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800169 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800170
171 pi1_test_event_loop->MakeWatcher(
172 "/pi1/aos", [](const ServerStatistics &stats) {
173 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
174 });
175
176 pi1_test_event_loop->MakeWatcher(
177 "/pi1/aos", [](const ClientStatistics &stats) {
178 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
179 });
180
181 pi1_test_event_loop->MakeWatcher(
182 "/pi1/aos", [](const Timestamp &timestamp) {
183 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
184 });
Austin Schuh8902fa52021-03-14 22:39:24 -0700185 pi1_test_event_loop->MakeWatcher(
186 "/pi2/aos", [this](const Timestamp &timestamp) {
187 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700188 EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700189 pi2_boot_uuid_);
190 });
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800191 }
192
193 void StartPi1Test() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700194 pi1_test_thread =
195 std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800196 }
197
Austin Schuha4e616a2023-05-15 17:59:30 -0700198 void StopPi1Test() { pi1_test_thread.reset(); }
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800199
200 void MakePi2Server() {
201 OnPi2();
202 FLAGS_application_name = "pi2_message_bridge_server";
203 pi2_server_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800204 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800205 pi2_server_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700206 pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
207 pi2_server_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800208 }
209
210 void RunPi2Server(chrono::nanoseconds duration) {
Philipp Schradera6712522023-07-05 20:25:11 -0700211 // Set up a shutdown callback.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800212 aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
213 [this]() { pi2_server_event_loop->Exit(); });
214 pi2_server_event_loop->OnRun([this, quit, duration]() {
215 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700216 quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800217 });
218
219 pi2_server_event_loop->Run();
220 }
221
222 void StartPi2Server() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700223 pi2_server_thread =
224 std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800225 }
226
227 void StopPi2Server() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700228 pi2_server_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800229 pi2_message_bridge_server.reset();
230 pi2_server_event_loop.reset();
231 }
232
233 void MakePi2Client() {
234 OnPi2();
235 FLAGS_application_name = "pi2_message_bridge_client";
236 pi2_client_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800237 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800238 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
Austin Schuhb0e439d2023-05-15 10:55:40 -0700239 pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
240 pi2_client_event_loop.get(), config_sha256);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800241 }
242
243 void RunPi2Client(chrono::nanoseconds duration) {
244 // Run for 5 seconds to make sure we have time to estimate the offset.
245 aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
246 [this]() { pi2_client_event_loop->Exit(); });
247 pi2_client_event_loop->OnRun([this, quit, duration]() {
248 // Stop between timestamps, not exactly on them.
Philipp Schradera6712522023-07-05 20:25:11 -0700249 quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800250 });
251
252 // And go!
253 pi2_client_event_loop->Run();
254 }
255
256 void StartPi2Client() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700257 pi2_client_thread =
258 std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800259 }
260
261 void StopPi2Client() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700262 pi2_client_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800263 pi2_message_bridge_client.reset();
264 pi2_client_event_loop.reset();
265 }
266
267 void MakePi2Test() {
268 OnPi2();
269 FLAGS_application_name = "test2";
270 pi2_test_event_loop =
Austin Schuhf466ab52021-02-16 22:00:38 -0800271 std::make_unique<aos::ShmEventLoop>(&config.message());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800272
273 pi2_test_event_loop->MakeWatcher(
274 "/pi2/aos", [](const ServerStatistics &stats) {
275 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
276 });
277
278 pi2_test_event_loop->MakeWatcher(
279 "/pi2/aos", [](const ClientStatistics &stats) {
280 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
281 });
282
283 pi2_test_event_loop->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -0700284 "/pi1/aos", [this](const Timestamp &timestamp) {
285 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuha9012be2021-07-21 15:19:11 -0700286 EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -0700287 pi1_boot_uuid_);
288 });
289 pi2_test_event_loop->MakeWatcher(
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800290 "/pi2/aos", [](const Timestamp &timestamp) {
291 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
292 });
293 }
294
295 void StartPi2Test() {
Austin Schuha4e616a2023-05-15 17:59:30 -0700296 pi2_test_thread =
297 std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800298 }
299
Austin Schuha4e616a2023-05-15 17:59:30 -0700300 void StopPi2Test() { pi2_test_thread.reset(); }
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800301
Austin Schuhf466ab52021-02-16 22:00:38 -0800302 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
Austin Schuhb0e439d2023-05-15 10:55:40 -0700303 std::string config_sha256;
304
Austin Schuh8902fa52021-03-14 22:39:24 -0700305 const UUID pi1_boot_uuid_;
306 const UUID pi2_boot_uuid_;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800307
308 std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
309 std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
Austin Schuha4e616a2023-05-15 17:59:30 -0700310 std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800311
312 std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
313 std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
Austin Schuha4e616a2023-05-15 17:59:30 -0700314 std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800315
316 std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
Austin Schuha4e616a2023-05-15 17:59:30 -0700317 std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800318
319 std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
320 std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
Austin Schuha4e616a2023-05-15 17:59:30 -0700321 std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800322
323 std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
324 std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
Austin Schuha4e616a2023-05-15 17:59:30 -0700325 std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800326
327 std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
Austin Schuha4e616a2023-05-15 17:59:30 -0700328 std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
Austin Schuhe991fe22020-11-18 16:53:39 -0800329};
330
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800331// Test that we can send a ping message over sctp and receive it.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800332TEST_P(MessageBridgeParameterizedTest, PingPong) {
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800333 // This is rather annoying to set up. We need to start up a client and
334 // server, on the same node, but get them to think that they are on different
335 // nodes.
336 //
337 // We then get to wait until they are connected.
338 //
339 // After they are connected, we send a Ping message.
340 //
341 // On the other end, we receive a Pong message.
342 //
343 // But, we need the client to not post directly to "/test" like it would in a
344 // real system, otherwise we will re-send the ping message... So, use an
345 // application specific map to have the client post somewhere else.
346 //
347 // To top this all off, each of these needs to be done with a ShmEventLoop,
348 // which needs to run in a separate thread... And it is really hard to get
349 // everything started up reliably. So just be super generous on timeouts and
350 // hope for the best. We can be more generous in the future if we need to.
351 //
352 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800353 OnPi1();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800354 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700355
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800356 MakePi1Server();
357 MakePi1Client();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800358
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700359 const std::string long_data = std::string(10000, 'a');
360
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800361 // And build the app which sends the pings.
362 FLAGS_application_name = "ping";
Austin Schuhf466ab52021-02-16 22:00:38 -0800363 aos::ShmEventLoop ping_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800364 aos::Sender<examples::Ping> ping_sender =
365 ping_event_loop.MakeSender<examples::Ping>("/test");
366
Austin Schuhf466ab52021-02-16 22:00:38 -0800367 aos::ShmEventLoop pi1_test_event_loop(&config.message());
Austin Schuh0de30f32020-12-06 12:44:28 -0800368 aos::Fetcher<RemoteMessage> message_header_fetcher1 =
369 pi1_test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800370 shared() ? "/pi1/aos/remote_timestamps/pi2"
371 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700372
373 // Fetchers for confirming the remote timestamps made it.
374 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
375 ping_event_loop.MakeFetcher<examples::Ping>("/test");
376 aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
377 ping_event_loop.MakeFetcher<Timestamp>("/aos");
378
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800379 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800380 OnPi2();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700381
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800382 MakePi2Client();
383 MakePi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800384
385 // And build the app which sends the pongs.
386 FLAGS_application_name = "pong";
Austin Schuhf466ab52021-02-16 22:00:38 -0800387 aos::ShmEventLoop pong_event_loop(&config.message());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800388
Austin Schuh7bc59052020-02-16 23:48:33 -0800389 // And build the app for testing.
390 FLAGS_application_name = "test";
Austin Schuhf466ab52021-02-16 22:00:38 -0800391 aos::ShmEventLoop test_event_loop(&config.message());
Austin Schuh7bc59052020-02-16 23:48:33 -0800392
393 aos::Fetcher<ClientStatistics> client_statistics_fetcher =
394 test_event_loop.MakeFetcher<ClientStatistics>("/aos");
Austin Schuh0de30f32020-12-06 12:44:28 -0800395 aos::Fetcher<RemoteMessage> message_header_fetcher2 =
396 test_event_loop.MakeFetcher<RemoteMessage>(
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800397 shared() ? "/pi2/aos/remote_timestamps/pi1"
398 : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
399 "aos-message_bridge-Timestamp");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700400
401 // Event loop for fetching data delivered to pi2 from pi1 to match up
402 // messages.
Austin Schuhf466ab52021-02-16 22:00:38 -0800403 aos::ShmEventLoop delivered_messages_event_loop(&config.message());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700404 aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
405 delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
406 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
407 delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
408 EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
409 EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
Austin Schuh7bc59052020-02-16 23:48:33 -0800410
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800411 // Count the pongs.
412 int pong_count = 0;
Austin Schuh8902fa52021-03-14 22:39:24 -0700413 pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
414 this](const examples::Ping &ping) {
Austin Schuha9012be2021-07-21 15:19:11 -0700415 EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
Austin Schuh8902fa52021-03-14 22:39:24 -0700416 ++pong_count;
417 VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
418 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800419
420 FLAGS_override_hostname = "";
421
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800422 // Wait until we are connected, then send.
423 int ping_count = 0;
Austin Schuh7bc59052020-02-16 23:48:33 -0800424 int pi1_server_statistics_count = 0;
Philipp Schrader790cb542023-07-05 21:06:52 -0700425 ping_event_loop.MakeWatcher(
426 "/pi1/aos",
427 [this, &ping_count, &ping_sender, &pi1_server_statistics_count,
428 &long_data](const ServerStatistics &stats) {
429 VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800430
Philipp Schrader790cb542023-07-05 21:06:52 -0700431 ASSERT_TRUE(stats.has_connections());
432 EXPECT_EQ(stats.connections()->size(), 1);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800433
Philipp Schrader790cb542023-07-05 21:06:52 -0700434 bool connected = false;
435 for (const ServerConnection *connection : *stats.connections()) {
436 // Confirm that we are estimating the server time offset correctly. It
437 // should be about 0 since we are on the same machine here.
438 if (connection->has_monotonic_offset()) {
439 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
440 chrono::milliseconds(1));
441 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
442 chrono::milliseconds(-1));
443 ++pi1_server_statistics_count;
444 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800445
Philipp Schrader790cb542023-07-05 21:06:52 -0700446 if (connection->node()->name()->string_view() ==
447 pi2_client_event_loop->node()->name()->string_view()) {
448 if (connection->state() == State::CONNECTED) {
449 EXPECT_TRUE(connection->has_boot_uuid());
450 EXPECT_EQ(connection->connection_count(), 1u);
451 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
452 connection->connected_since_time())),
453 monotonic_clock::now());
454 connected = true;
455 } else {
456 EXPECT_FALSE(connection->has_connection_count());
457 EXPECT_FALSE(connection->has_connected_since_time());
458 }
459 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800460 }
461
Philipp Schrader790cb542023-07-05 21:06:52 -0700462 if (connected) {
463 VLOG(1) << "Connected! Sent ping.";
464 auto builder = ping_sender.MakeBuilder();
465 builder.fbb()->CreateString(long_data);
466 examples::Ping::Builder ping_builder =
467 builder.MakeBuilder<examples::Ping>();
468 ping_builder.add_value(ping_count + 971);
469 EXPECT_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
470 ++ping_count;
471 }
472 });
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800473
Austin Schuh7bc59052020-02-16 23:48:33 -0800474 // Confirm both client and server statistics messages have decent offsets in
475 // them.
476 int pi2_server_statistics_count = 0;
Austin Schuh196a4452020-03-15 23:12:03 -0700477 pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
Austin Schuh7bc59052020-02-16 23:48:33 -0800478 const ServerStatistics &stats) {
Austin Schuh1ca49e92020-12-11 00:01:27 -0800479 VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800480 for (const ServerConnection *connection : *stats.connections()) {
481 if (connection->has_monotonic_offset()) {
482 ++pi2_server_statistics_count;
483 // Confirm that we are estimating the server time offset correctly. It
484 // should be about 0 since we are on the same machine here.
485 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
486 chrono::milliseconds(1));
487 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
488 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800489 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh7bc59052020-02-16 23:48:33 -0800490 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800491
492 if (connection->state() == State::CONNECTED) {
493 EXPECT_EQ(connection->connection_count(), 1u);
494 EXPECT_LT(monotonic_clock::time_point(
495 chrono::nanoseconds(connection->connected_since_time())),
496 monotonic_clock::now());
497 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700498 // If we have been connected, we expect the connection count to stay
499 // around.
500 if (pi2_server_statistics_count > 0) {
501 EXPECT_TRUE(connection->has_connection_count());
502 EXPECT_EQ(connection->connection_count(), 1u);
503 } else {
504 EXPECT_FALSE(connection->has_connection_count());
505 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800506 EXPECT_FALSE(connection->has_connected_since_time());
507 }
Austin Schuh7bc59052020-02-16 23:48:33 -0800508 }
509 });
510
511 int pi1_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800512 int pi1_connected_client_statistics_count = 0;
513 ping_event_loop.MakeWatcher(
514 "/pi1/aos",
515 [&pi1_client_statistics_count,
516 &pi1_connected_client_statistics_count](const ClientStatistics &stats) {
517 VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800518
Austin Schuh367a7f42021-11-23 23:04:36 -0800519 for (const ClientConnection *connection : *stats.connections()) {
520 if (connection->has_monotonic_offset()) {
521 ++pi1_client_statistics_count;
522 // It takes at least 10 microseconds to send a message between the
523 // client and server. The min (filtered) time shouldn't be over 10
524 // milliseconds on localhost. This might have to bump up if this is
525 // proving flaky.
526 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
527 chrono::milliseconds(10))
528 << " " << connection->monotonic_offset()
529 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
530 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
531 chrono::microseconds(10))
532 << " " << connection->monotonic_offset()
533 << "ns vs 10000ns on iteration " << pi1_client_statistics_count;
534 }
535 if (connection->state() == State::CONNECTED) {
536 EXPECT_EQ(connection->connection_count(), 1u);
537 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
538 connection->connected_since_time())),
539 monotonic_clock::now());
540 // The first Connected message may not have a UUID in it since no
541 // data has flown. That's fine.
542 if (pi1_connected_client_statistics_count > 0) {
543 EXPECT_TRUE(connection->has_boot_uuid())
544 << ": " << aos::FlatbufferToJson(connection);
545 }
546 ++pi1_connected_client_statistics_count;
547 } else {
548 EXPECT_FALSE(connection->has_connection_count());
549 EXPECT_FALSE(connection->has_connected_since_time());
550 }
551 }
552 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800553
554 int pi2_client_statistics_count = 0;
Austin Schuh367a7f42021-11-23 23:04:36 -0800555 int pi2_connected_client_statistics_count = 0;
556 pong_event_loop.MakeWatcher(
557 "/pi2/aos",
558 [&pi2_client_statistics_count,
559 &pi2_connected_client_statistics_count](const ClientStatistics &stats) {
560 VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
Austin Schuh7bc59052020-02-16 23:48:33 -0800561
Austin Schuh367a7f42021-11-23 23:04:36 -0800562 for (const ClientConnection *connection : *stats.connections()) {
563 if (connection->has_monotonic_offset()) {
564 ++pi2_client_statistics_count;
565 EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
566 chrono::milliseconds(10))
567 << ": got " << aos::FlatbufferToJson(connection);
568 EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
569 chrono::microseconds(10))
570 << ": got " << aos::FlatbufferToJson(connection);
571 }
572 if (connection->state() == State::CONNECTED) {
573 EXPECT_EQ(connection->connection_count(), 1u);
574 EXPECT_LT(monotonic_clock::time_point(chrono::nanoseconds(
575 connection->connected_since_time())),
576 monotonic_clock::now());
577 if (pi2_connected_client_statistics_count > 0) {
578 EXPECT_TRUE(connection->has_boot_uuid());
579 }
580 ++pi2_connected_client_statistics_count;
581 } else {
Austin Schuha4e616a2023-05-15 17:59:30 -0700582 if (pi2_connected_client_statistics_count == 0) {
583 EXPECT_FALSE(connection->has_connection_count())
584 << aos::FlatbufferToJson(&stats);
585 } else {
586 EXPECT_TRUE(connection->has_connection_count())
587 << aos::FlatbufferToJson(&stats);
588 EXPECT_EQ(connection->connection_count(), 1u);
589 }
Austin Schuh367a7f42021-11-23 23:04:36 -0800590 EXPECT_FALSE(connection->has_connected_since_time());
591 }
592 }
593 });
Austin Schuh7bc59052020-02-16 23:48:33 -0800594
Austin Schuh196a4452020-03-15 23:12:03 -0700595 ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800596 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800597 VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800598 });
Austin Schuh196a4452020-03-15 23:12:03 -0700599 pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
Austin Schuh7bc59052020-02-16 23:48:33 -0800600 EXPECT_TRUE(timestamp.has_offsets());
Austin Schuh1ca49e92020-12-11 00:01:27 -0800601 VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
Austin Schuh7bc59052020-02-16 23:48:33 -0800602 });
603
Austin Schuh2f8fd752020-09-01 22:38:28 -0700604 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
605 // channel.
606 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
607 pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
608 const size_t ping_timestamp_channel =
609 configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
610 ping_on_pi2_fetcher.channel());
611
612 for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
613 VLOG(1) << "Channel "
614 << configuration::ChannelIndex(ping_event_loop.configuration(),
615 channel)
616 << " " << configuration::CleanedChannelToString(channel);
617 }
618
619 // For each remote timestamp we get back, confirm that it is either a ping
620 // message, or a timestamp we sent out. Also confirm that the timestamps are
621 // correct.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800622 for (std::pair<int, std::string> channel :
623 shared()
624 ? std::vector<std::pair<
625 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
626 : std::vector<std::pair<int, std::string>>{
627 {pi1_timestamp_channel,
628 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
629 "aos-message_bridge-Timestamp"},
630 {ping_timestamp_channel,
631 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
632 ping_event_loop.MakeWatcher(
633 channel.second,
634 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
635 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
636 &pi1_on_pi1_timestamp_fetcher,
637 channel_index = channel.first](const RemoteMessage &header) {
638 VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
639 << aos::FlatbufferToJson(&header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700640
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800641 EXPECT_TRUE(header.has_boot_uuid());
642 if (channel_index != -1) {
643 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700644 }
645
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800646 const aos::monotonic_clock::time_point header_monotonic_sent_time(
647 chrono::nanoseconds(header.monotonic_sent_time()));
648 const aos::realtime_clock::time_point header_realtime_sent_time(
649 chrono::nanoseconds(header.realtime_sent_time()));
650 const aos::monotonic_clock::time_point header_monotonic_remote_time(
651 chrono::nanoseconds(header.monotonic_remote_time()));
652 const aos::realtime_clock::time_point header_realtime_remote_time(
653 chrono::nanoseconds(header.realtime_remote_time()));
654
655 const Context *pi1_context = nullptr;
656 const Context *pi2_context = nullptr;
657
658 if (header.channel_index() == pi1_timestamp_channel) {
659 // Find the forwarded message.
660 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
661 header_monotonic_sent_time) {
662 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
663 }
664
665 // And the source message.
666 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
667 header_monotonic_remote_time) {
668 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
669 }
670
671 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
672 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
673 } else if (header.channel_index() == ping_timestamp_channel) {
674 // Find the forwarded message.
675 while (ping_on_pi2_fetcher.context().monotonic_event_time <
676 header_monotonic_sent_time) {
677 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
678 }
679
680 // And the source message.
681 while (ping_on_pi1_fetcher.context().monotonic_event_time <
682 header_monotonic_remote_time) {
683 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
684 }
685
686 pi1_context = &ping_on_pi1_fetcher.context();
687 pi2_context = &ping_on_pi2_fetcher.context();
688 } else {
689 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700690 }
691
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800692 // Confirm the forwarded message has matching timestamps to the
693 // timestamps we got back.
694 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
695 EXPECT_EQ(pi2_context->monotonic_event_time,
696 header_monotonic_sent_time);
697 EXPECT_EQ(pi2_context->realtime_event_time,
698 header_realtime_sent_time);
699 EXPECT_EQ(pi2_context->realtime_remote_time,
700 header_realtime_remote_time);
701 EXPECT_EQ(pi2_context->monotonic_remote_time,
702 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700703
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800704 // Confirm the forwarded message also matches the source message.
705 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
706 EXPECT_EQ(pi1_context->monotonic_event_time,
707 header_monotonic_remote_time);
708 EXPECT_EQ(pi1_context->realtime_event_time,
709 header_realtime_remote_time);
710 });
711 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700712
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800713 // Start everything up. Pong is the only thing we don't know how to wait
714 // on, so start it first.
Austin Schuha4e616a2023-05-15 17:59:30 -0700715 ThreadedEventLoopRunner pong_thread(&pong_event_loop);
716 ThreadedEventLoopRunner ping_thread(&ping_event_loop);
Austin Schuh7bc59052020-02-16 23:48:33 -0800717
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800718 StartPi1Server();
719 StartPi1Client();
720 StartPi2Client();
721 StartPi2Server();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800722
723 // And go!
Austin Schuha4e616a2023-05-15 17:59:30 -0700724 // Run for 5 seconds to make sure we have time to estimate the offset.
725 std::this_thread::sleep_for(chrono::milliseconds(5050));
Austin Schuh7bc59052020-02-16 23:48:33 -0800726
727 // Confirm that we are estimating a monotonic offset on the client.
728 ASSERT_TRUE(client_statistics_fetcher.Fetch());
729
730 EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
731 EXPECT_EQ(client_statistics_fetcher->connections()
732 ->Get(0)
733 ->node()
734 ->name()
735 ->string_view(),
736 "pi1");
737
738 // Make sure the offset in one direction is less than a second.
739 EXPECT_GT(
Austin Schuh2b159eb2021-07-31 19:42:21 -0700740 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0)
741 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800742 EXPECT_LT(
743 client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
Austin Schuh2b159eb2021-07-31 19:42:21 -0700744 1000000000)
745 << aos::FlatbufferToJson(client_statistics_fetcher.get());
Austin Schuh7bc59052020-02-16 23:48:33 -0800746
Austin Schuha4e616a2023-05-15 17:59:30 -0700747 // Shut everyone else down before confirming everything actually ran.
748 ping_thread.Exit();
749 pong_thread.Exit();
750 StopPi1Server();
751 StopPi1Client();
752 StopPi2Client();
753 StopPi2Server();
754
755 // Make sure we sent something.
756 EXPECT_GE(ping_count, 1);
757 // And got something back.
758 EXPECT_GE(pong_count, 1);
759
Austin Schuh7bc59052020-02-16 23:48:33 -0800760 EXPECT_GE(pi1_server_statistics_count, 2);
761 EXPECT_GE(pi2_server_statistics_count, 2);
762 EXPECT_GE(pi1_client_statistics_count, 2);
763 EXPECT_GE(pi2_client_statistics_count, 2);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700764
765 // Confirm we got timestamps back!
766 EXPECT_TRUE(message_header_fetcher1.Fetch());
767 EXPECT_TRUE(message_header_fetcher2.Fetch());
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800768}
769
Austin Schuh5344c352020-04-12 17:04:26 -0700770// Test that the client disconnecting triggers the server offsets on both sides
771// to clear.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800772TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700773 // This is rather annoying to set up. We need to start up a client and
774 // server, on the same node, but get them to think that they are on different
775 // nodes.
776 //
777 // We need the client to not post directly to "/test" like it would in a
778 // real system, otherwise we will re-send the ping message... So, use an
779 // application specific map to have the client post somewhere else.
780 //
781 // To top this all off, each of these needs to be done with a ShmEventLoop,
782 // which needs to run in a separate thread... And it is really hard to get
783 // everything started up reliably. So just be super generous on timeouts and
784 // hope for the best. We can be more generous in the future if we need to.
785 //
786 // We are faking the application names by passing in --application_name=foo
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800787 OnPi1();
Austin Schuh5344c352020-04-12 17:04:26 -0700788
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800789 MakePi1Server();
790 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700791
792 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800793 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700794 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800795 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700796
797 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800798 OnPi2();
799 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700800
801 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800802 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700803 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800804 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700805
806 // Wait until we are connected, then send.
Austin Schuh5344c352020-04-12 17:04:26 -0700807
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800808 StartPi1Test();
809 StartPi2Test();
810 StartPi1Server();
811 StartPi1Client();
812 StartPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700813
814 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800815 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700816
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800817 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700818
819 // Now confirm we are synchronized.
820 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
821 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
822
823 const ServerConnection *const pi1_connection =
824 pi1_server_statistics_fetcher->connections()->Get(0);
825 const ServerConnection *const pi2_connection =
826 pi2_server_statistics_fetcher->connections()->Get(0);
827
828 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800829 EXPECT_EQ(pi1_connection->connection_count(), 1u);
830 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700831 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
832 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
833 chrono::milliseconds(1));
834 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
835 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800836 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700837
838 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800839 EXPECT_EQ(pi2_connection->connection_count(), 1u);
840 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700841 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
842 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
843 chrono::milliseconds(1));
844 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
845 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800846 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800847
848 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700849 }
850
Austin Schuhd0d894e2021-10-24 17:13:11 -0700851 std::this_thread::sleep_for(SctpClientConnection::kReconnectTimeout +
852 std::chrono::seconds(1));
Austin Schuh5344c352020-04-12 17:04:26 -0700853
854 {
855 // Now confirm we are un-synchronized.
856 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
857 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
858 const ServerConnection *const pi1_connection =
859 pi1_server_statistics_fetcher->connections()->Get(0);
860 const ServerConnection *const pi2_connection =
861 pi2_server_statistics_fetcher->connections()->Get(0);
862
863 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800864 EXPECT_EQ(pi1_connection->connection_count(), 1u);
865 EXPECT_FALSE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700866 EXPECT_FALSE(pi1_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800867 EXPECT_FALSE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700868 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
869 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800870 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800871 EXPECT_EQ(pi2_connection->connection_count(), 1u);
872 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700873 }
874
875 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800876 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700877 // And go!
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800878 RunPi2Client(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700879
880 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
881 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
882
883 // Now confirm we are synchronized again.
884 const ServerConnection *const pi1_connection =
885 pi1_server_statistics_fetcher->connections()->Get(0);
886 const ServerConnection *const pi2_connection =
887 pi2_server_statistics_fetcher->connections()->Get(0);
888
889 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800890 EXPECT_EQ(pi1_connection->connection_count(), 2u);
891 EXPECT_TRUE(pi1_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700892 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
893 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800894 chrono::milliseconds(1))
895 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700896 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800897 chrono::milliseconds(-1))
898 << ": " << FlatbufferToJson(pi1_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800899 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -0700900
901 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800902 EXPECT_EQ(pi2_connection->connection_count(), 1u);
903 EXPECT_TRUE(pi2_connection->has_connected_since_time());
Austin Schuh5344c352020-04-12 17:04:26 -0700904 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
905 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800906 chrono::milliseconds(1))
907 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh5344c352020-04-12 17:04:26 -0700908 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
Austin Schuh367a7f42021-11-23 23:04:36 -0800909 chrono::milliseconds(-1))
910 << ": " << FlatbufferToJson(pi2_connection);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800911 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800912
913 StopPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700914 }
915
916 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800917 StopPi1Server();
918 StopPi1Client();
919 StopPi2Server();
920 StopPi1Test();
921 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700922}
923
924// Test that the server disconnecting triggers the server offsets on the other
925// side to clear, along with the other client.
Austin Schuh36a2c3e2021-02-18 22:28:38 -0800926TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
Austin Schuh5344c352020-04-12 17:04:26 -0700927 // This is rather annoying to set up. We need to start up a client and
928 // server, on the same node, but get them to think that they are on different
929 // nodes.
930 //
931 // We need the client to not post directly to "/test" like it would in a
932 // real system, otherwise we will re-send the ping message... So, use an
933 // application specific map to have the client post somewhere else.
934 //
935 // To top this all off, each of these needs to be done with a ShmEventLoop,
936 // which needs to run in a separate thread... And it is really hard to get
937 // everything started up reliably. So just be super generous on timeouts and
938 // hope for the best. We can be more generous in the future if we need to.
939 //
940 // We are faking the application names by passing in --application_name=foo
Austin Schuh5344c352020-04-12 17:04:26 -0700941 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800942 OnPi1();
943 MakePi1Server();
944 MakePi1Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700945
946 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800947 MakePi1Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700948 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800949 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700950 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800951 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700952
953 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800954 OnPi2();
955 MakePi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700956
957 // And build the app for testing.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800958 MakePi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -0700959 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800960 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
Austin Schuh5344c352020-04-12 17:04:26 -0700961
962 // Start everything up. Pong is the only thing we don't know how to wait on,
963 // so start it first.
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800964 StartPi1Test();
965 StartPi2Test();
966 StartPi1Server();
967 StartPi1Client();
968 StartPi2Client();
Austin Schuh5344c352020-04-12 17:04:26 -0700969
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800970 // Confirm both client and server statistics messages have decent offsets in
971 // them.
Austin Schuh5344c352020-04-12 17:04:26 -0700972
973 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800974 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -0700975
Austin Schuh0a2f12f2021-01-08 22:48:29 -0800976 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -0700977
978 // Now confirm we are synchronized.
979 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
980 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
981
982 const ServerConnection *const pi1_connection =
983 pi1_server_statistics_fetcher->connections()->Get(0);
984 const ServerConnection *const pi2_connection =
985 pi2_server_statistics_fetcher->connections()->Get(0);
986
987 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
988 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
989 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
990 chrono::milliseconds(1));
991 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
992 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -0800993 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -0800994 EXPECT_TRUE(pi1_connection->has_connected_since_time());
995 EXPECT_EQ(pi1_connection->connection_count(), 1u);
Austin Schuh5344c352020-04-12 17:04:26 -0700996
997 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
998 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
999 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1000 chrono::milliseconds(1));
1001 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1002 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001003 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh367a7f42021-11-23 23:04:36 -08001004 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1005 EXPECT_EQ(pi2_connection->connection_count(), 1u);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001006
1007 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001008 }
1009
1010 std::this_thread::sleep_for(std::chrono::seconds(2));
1011
1012 {
1013 // And confirm we are unsynchronized.
1014 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1015 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1016
1017 const ServerConnection *const pi1_server_connection =
1018 pi1_server_statistics_fetcher->connections()->Get(0);
1019 const ClientConnection *const pi1_client_connection =
1020 pi1_client_statistics_fetcher->connections()->Get(0);
1021
1022 EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
1023 EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001024 EXPECT_TRUE(pi1_server_connection->has_connected_since_time());
1025 EXPECT_EQ(pi1_server_connection->connection_count(), 1u);
1026
Austin Schuh20ac95d2020-12-05 17:24:19 -08001027 EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001028 EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
1029 EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
Austin Schuh367a7f42021-11-23 23:04:36 -08001030 EXPECT_FALSE(pi1_client_connection->has_connected_since_time());
1031 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1032 EXPECT_FALSE(pi1_client_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001033 }
1034
1035 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001036 MakePi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001037
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001038 RunPi2Server(chrono::milliseconds(3050));
Austin Schuh5344c352020-04-12 17:04:26 -07001039
1040 // And confirm we are synchronized again.
1041 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1042 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh367a7f42021-11-23 23:04:36 -08001043 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
Austin Schuh5344c352020-04-12 17:04:26 -07001044
1045 const ServerConnection *const pi1_connection =
1046 pi1_server_statistics_fetcher->connections()->Get(0);
1047 const ServerConnection *const pi2_connection =
1048 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuh367a7f42021-11-23 23:04:36 -08001049 const ClientConnection *const pi1_client_connection =
1050 pi1_client_statistics_fetcher->connections()->Get(0);
Austin Schuh5344c352020-04-12 17:04:26 -07001051
1052 EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
1053 EXPECT_TRUE(pi1_connection->has_monotonic_offset());
1054 EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1055 chrono::milliseconds(1));
1056 EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
1057 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001058 EXPECT_TRUE(pi1_connection->has_boot_uuid());
Austin Schuh5344c352020-04-12 17:04:26 -07001059
Austin Schuh367a7f42021-11-23 23:04:36 -08001060 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1061 EXPECT_TRUE(pi1_client_connection->has_connected_since_time());
1062 EXPECT_EQ(pi1_client_connection->connection_count(), 2u);
1063 EXPECT_TRUE(pi1_client_connection->has_boot_uuid());
1064
Austin Schuh5344c352020-04-12 17:04:26 -07001065 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1066 EXPECT_TRUE(pi2_connection->has_monotonic_offset());
1067 EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1068 chrono::milliseconds(1));
1069 EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
1070 chrono::milliseconds(-1));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001071 EXPECT_TRUE(pi2_connection->has_boot_uuid());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001072
1073 StopPi2Server();
Austin Schuh5344c352020-04-12 17:04:26 -07001074 }
1075
1076 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001077 StopPi1Server();
1078 StopPi1Client();
1079 StopPi2Client();
1080 StopPi1Test();
1081 StopPi2Test();
Austin Schuh5344c352020-04-12 17:04:26 -07001082}
1083
Austin Schuh4889b182020-11-18 19:11:56 -08001084// TODO(austin): The above test confirms that the external state does the right
Austin Schuh5344c352020-04-12 17:04:26 -07001085// thing, but doesn't confirm that the internal state does. We either need to
1086// expose a way to check the state in a thread-safe way, or need a way to jump
1087// time for one node to do that.
1088
Austin Schuh4889b182020-11-18 19:11:56 -08001089void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1090 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1091 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1092 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001093 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001094}
1095
1096// Tests that when a message is sent before the bridge starts up, but is
1097// configured as reliable, we forward it. Confirm this survives a client reset.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001098TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001099 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001100
1101 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001102 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001103 aos::Sender<examples::Ping> ping_sender =
1104 send_event_loop.MakeSender<examples::Ping>("/test");
1105 SendPing(&ping_sender, 1);
1106 aos::Sender<examples::Ping> unreliable_ping_sender =
1107 send_event_loop.MakeSender<examples::Ping>("/unreliable");
1108 SendPing(&unreliable_ping_sender, 1);
1109
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001110 MakePi1Server();
1111 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001112
1113 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001114 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001115
1116 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001117 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001118
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001119 MakePi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001120
Austin Schuhf466ab52021-02-16 22:00:38 -08001121 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001122 aos::Fetcher<examples::Ping> ping_fetcher =
1123 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1124 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1125 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1126 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1127 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1128
1129 const size_t ping_channel_index = configuration::ChannelIndex(
1130 receive_event_loop.configuration(), ping_fetcher.channel());
1131
1132 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001133 const std::string channel_name =
1134 shared() ? "/pi1/aos/remote_timestamps/pi2"
1135 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001136 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001137 channel_name, [this, channel_name, ping_channel_index,
1138 &ping_timestamp_count](const RemoteMessage &header) {
Austin Schuh61e973f2021-02-21 21:43:56 -08001139 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001140 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001141 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001142 if (shared() && header.channel_index() != ping_channel_index) {
1143 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001144 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001145 CHECK_EQ(header.channel_index(), ping_channel_index);
1146 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001147 });
1148
1149 // Before everything starts up, confirm there is no message.
1150 EXPECT_FALSE(ping_fetcher.Fetch());
1151 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1152
1153 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001154 StartPi1Server();
1155 StartPi1Client();
1156 StartPi2Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001157
1158 // Event used to wait for the timestamp counting thread to start.
Austin Schuha4e616a2023-05-15 17:59:30 -07001159 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1160 std::make_unique<ThreadedEventLoopRunner>(
1161 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -08001162
1163 {
1164 // Now, spin up a client for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001165 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001166
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001167 RunPi2Client(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001168
1169 // Confirm there is no detected duplicate packet.
1170 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1171 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1172 ->Get(0)
1173 ->duplicate_packets(),
1174 0u);
1175
Austin Schuhe61d4382021-03-31 21:33:02 -07001176 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1177 ->Get(0)
1178 ->partial_deliveries(),
1179 0u);
1180
Austin Schuh4889b182020-11-18 19:11:56 -08001181 EXPECT_TRUE(ping_fetcher.Fetch());
1182 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1183 EXPECT_EQ(ping_timestamp_count, 1);
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001184
1185 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001186 }
1187
1188 {
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001189 // Now, spin up a client for 2 seconds.
1190 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001191
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001192 RunPi2Client(chrono::milliseconds(5050));
Austin Schuh4889b182020-11-18 19:11:56 -08001193
1194 // Confirm we detect the duplicate packet correctly.
1195 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1196 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1197 ->Get(0)
1198 ->duplicate_packets(),
1199 1u);
1200
Austin Schuhe61d4382021-03-31 21:33:02 -07001201 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1202 ->Get(0)
1203 ->partial_deliveries(),
1204 0u);
1205
Austin Schuh4889b182020-11-18 19:11:56 -08001206 EXPECT_EQ(ping_timestamp_count, 1);
1207 EXPECT_FALSE(ping_fetcher.Fetch());
1208 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001209
1210 StopPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001211 }
1212
1213 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001214 StopPi1Client();
1215 StopPi2Server();
Austin Schuha4e616a2023-05-15 17:59:30 -07001216 pi1_remote_timestamp_thread.reset();
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001217 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001218}
1219
1220// Tests that when a message is sent before the bridge starts up, but is
1221// configured as reliable, we forward it. Confirm this works across server
1222// resets.
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001223TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
Austin Schuh4889b182020-11-18 19:11:56 -08001224 // Now do it for "raspberrypi2", the client.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001225 OnPi2();
Austin Schuh4889b182020-11-18 19:11:56 -08001226
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001227 MakePi2Server();
1228 MakePi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001229
Austin Schuhf466ab52021-02-16 22:00:38 -08001230 aos::ShmEventLoop receive_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001231 aos::Fetcher<examples::Ping> ping_fetcher =
1232 receive_event_loop.MakeFetcher<examples::Ping>("/test");
1233 aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
1234 receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
1235 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1236 receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
1237
Austin Schuh4889b182020-11-18 19:11:56 -08001238 // Force ourselves to be "raspberrypi" and allocate everything.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001239 OnPi1();
Austin Schuh4889b182020-11-18 19:11:56 -08001240
1241 FLAGS_application_name = "sender";
Austin Schuhf466ab52021-02-16 22:00:38 -08001242 aos::ShmEventLoop send_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001243 aos::Sender<examples::Ping> ping_sender =
1244 send_event_loop.MakeSender<examples::Ping>("/test");
1245 {
1246 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
1247 examples::Ping::Builder ping_builder =
1248 builder.MakeBuilder<examples::Ping>();
1249 ping_builder.add_value(1);
milind1f1dca32021-07-03 13:50:07 -07001250 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4889b182020-11-18 19:11:56 -08001251 }
1252
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001253 MakePi1Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001254
1255 FLAGS_application_name = "pi1_timestamp";
Austin Schuhf466ab52021-02-16 22:00:38 -08001256 aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
Austin Schuh4889b182020-11-18 19:11:56 -08001257
1258 const size_t ping_channel_index = configuration::ChannelIndex(
1259 receive_event_loop.configuration(), ping_fetcher.channel());
1260
1261 std::atomic<int> ping_timestamp_count{0};
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001262 const std::string channel_name =
1263 shared() ? "/pi1/aos/remote_timestamps/pi2"
1264 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
Austin Schuh4889b182020-11-18 19:11:56 -08001265 pi1_remote_timestamp_event_loop.MakeWatcher(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001266 channel_name, [this, channel_name, ping_channel_index,
1267 &ping_timestamp_count](const RemoteMessage &header) {
1268 VLOG(1) << channel_name << " RemoteMessage "
Austin Schuh0de30f32020-12-06 12:44:28 -08001269 << aos::FlatbufferToJson(&header);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001270 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001271 if (shared() && header.channel_index() != ping_channel_index) {
1272 return;
Austin Schuh4889b182020-11-18 19:11:56 -08001273 }
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001274 CHECK_EQ(header.channel_index(), ping_channel_index);
1275 ++ping_timestamp_count;
Austin Schuh4889b182020-11-18 19:11:56 -08001276 });
1277
1278 // Before everything starts up, confirm there is no message.
1279 EXPECT_FALSE(ping_fetcher.Fetch());
1280 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1281
1282 // Spin up the persistant pieces.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001283 StartPi1Client();
1284 StartPi2Server();
1285 StartPi2Client();
Austin Schuh4889b182020-11-18 19:11:56 -08001286
Austin Schuha4e616a2023-05-15 17:59:30 -07001287 std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
1288 std::make_unique<ThreadedEventLoopRunner>(
1289 &pi1_remote_timestamp_event_loop);
Austin Schuh4889b182020-11-18 19:11:56 -08001290
1291 {
1292 // Now, spin up a server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001293 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001294
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001295 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001296
1297 // Confirm there is no detected duplicate packet.
1298 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1299 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1300 ->Get(0)
1301 ->duplicate_packets(),
1302 0u);
1303
Austin Schuhe61d4382021-03-31 21:33:02 -07001304 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1305 ->Get(0)
1306 ->partial_deliveries(),
1307 0u);
1308
Austin Schuh4889b182020-11-18 19:11:56 -08001309 EXPECT_TRUE(ping_fetcher.Fetch());
1310 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
1311 EXPECT_EQ(ping_timestamp_count, 1);
1312 LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001313
1314 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001315 }
1316
1317 {
1318 // Now, spin up a second server for 2 seconds.
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001319 MakePi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001320
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001321 RunPi1Server(chrono::milliseconds(2050));
Austin Schuh4889b182020-11-18 19:11:56 -08001322
1323 // Confirm we detect the duplicate packet correctly.
1324 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1325 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1326 ->Get(0)
1327 ->duplicate_packets(),
1328 1u);
1329
Austin Schuhe61d4382021-03-31 21:33:02 -07001330 EXPECT_EQ(pi2_client_statistics_fetcher->connections()
1331 ->Get(0)
1332 ->partial_deliveries(),
1333 0u);
1334
Austin Schuh4889b182020-11-18 19:11:56 -08001335 EXPECT_EQ(ping_timestamp_count, 1);
1336 EXPECT_FALSE(ping_fetcher.Fetch());
1337 EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001338
1339 StopPi1Server();
Austin Schuh4889b182020-11-18 19:11:56 -08001340 }
1341
1342 // Shut everyone else down
Austin Schuh0a2f12f2021-01-08 22:48:29 -08001343 StopPi1Client();
1344 StopPi2Server();
1345 StopPi2Client();
Austin Schuha4e616a2023-05-15 17:59:30 -07001346 pi1_remote_timestamp_thread.reset();
Austin Schuh4889b182020-11-18 19:11:56 -08001347}
1348
Austin Schuh89f23e32023-05-15 17:06:43 -07001349// Test that a client which connects with too big a message gets disconnected
1350// without crashing.
1351TEST_P(MessageBridgeParameterizedTest, TooBigConnect) {
Austin Schuhb0e439d2023-05-15 10:55:40 -07001352 // This is rather annoying to set up. We need to start up a client and
1353 // server, on the same node, but get them to think that they are on different
1354 // nodes.
1355 //
1356 // We need the client to not post directly to "/test" like it would in a
1357 // real system, otherwise we will re-send the ping message... So, use an
1358 // application specific map to have the client post somewhere else.
1359 //
1360 // To top this all off, each of these needs to be done with a ShmEventLoop,
1361 // which needs to run in a separate thread... And it is really hard to get
1362 // everything started up reliably. So just be super generous on timeouts and
1363 // hope for the best. We can be more generous in the future if we need to.
1364 //
1365 // We are faking the application names by passing in --application_name=foo
1366 OnPi1();
1367
Austin Schuh530f9ee2023-05-15 14:29:31 -07001368 MakePi1Server(
1369 "dummy sha256 ");
Austin Schuhb0e439d2023-05-15 10:55:40 -07001370 MakePi1Client();
1371
1372 // And build the app for testing.
1373 MakePi1Test();
1374 aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
1375 pi1_test_event_loop->MakeFetcher<ServerStatistics>("/pi1/aos");
1376 aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
1377 pi1_test_event_loop->MakeFetcher<ClientStatistics>("/pi1/aos");
1378
1379 // Now do it for "raspberrypi2", the client.
1380 OnPi2();
1381 MakePi2Server();
1382
1383 // And build the app for testing.
1384 MakePi2Test();
1385 aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
1386 pi2_test_event_loop->MakeFetcher<ServerStatistics>("/pi2/aos");
1387 aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
1388 pi2_test_event_loop->MakeFetcher<ClientStatistics>("/pi2/aos");
1389
1390 // Wait until we are connected, then send.
1391
1392 StartPi1Test();
1393 StartPi2Test();
1394 StartPi1Server();
1395 StartPi1Client();
1396 StartPi2Server();
1397
1398 {
Austin Schuh89f23e32023-05-15 17:06:43 -07001399 // Now, spin up a SctpClient and send a massive hunk of data. This should
1400 // trigger a disconnect, but no crash.
1401 OnPi2();
1402 FLAGS_application_name = "pi2_message_bridge_client";
1403 pi2_client_event_loop =
1404 std::make_unique<aos::ShmEventLoop>(&config.message());
1405 pi2_client_event_loop->SetRuntimeRealtimePriority(1);
1406
1407 const aos::Node *const remote_node = CHECK_NOTNULL(
1408 configuration::GetNode(pi2_client_event_loop->configuration(), "pi1"));
1409
1410 const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
1411 connect_message(MakeConnectMessage(
1412 pi2_client_event_loop->configuration(),
1413 pi2_client_event_loop->node(), "pi1",
1414 pi2_client_event_loop->boot_uuid(), config_sha256));
1415
1416 SctpClient client(remote_node->hostname()->string_view(),
1417 remote_node->port(),
1418 connect_message.message().channels_to_transfer()->size() +
1419 kControlStreams(),
1420 "");
1421
1422 client.SetMaxReadSize(100000);
1423 client.SetMaxWriteSize(100000);
1424
1425 client.SetPoolSize(2u);
1426
1427 const std::string big_data(100000, 'a');
1428
1429 pi2_client_event_loop->epoll()->OnReadable(client.fd(), [&]() {
1430 aos::unique_c_ptr<Message> message = client.Read();
1431 client.FreeMessage(std::move(message));
1432 });
1433
1434 aos::TimerHandler *const send_big_message = pi2_client_event_loop->AddTimer(
1435 [&]() { CHECK(client.Send(kConnectStream(), big_data, 0)); });
1436
1437 pi2_client_event_loop->OnRun([this, send_big_message]() {
1438 send_big_message->Schedule(pi2_client_event_loop->monotonic_now() +
1439 chrono::seconds(1));
1440 });
Austin Schuhb0e439d2023-05-15 10:55:40 -07001441
1442 RunPi2Client(chrono::milliseconds(3050));
1443
1444 // Now confirm we are synchronized.
1445 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1446 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1447 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
Austin Schuh89f23e32023-05-15 17:06:43 -07001448 EXPECT_FALSE(pi2_client_statistics_fetcher.Fetch());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001449
1450 const ServerConnection *const pi1_connection =
1451 pi1_server_statistics_fetcher->connections()->Get(0);
1452 const ClientConnection *const pi1_client_connection =
1453 pi1_client_statistics_fetcher->connections()->Get(0);
1454 const ServerConnection *const pi2_connection =
1455 pi2_server_statistics_fetcher->connections()->Get(0);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001456
1457 // Make sure one direction is disconnected with a bunch of connection
1458 // attempts and failures.
1459 EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
1460 EXPECT_EQ(pi1_connection->connection_count(), 0u);
Austin Schuh89f23e32023-05-15 17:06:43 -07001461 EXPECT_GE(pi1_server_statistics_fetcher->invalid_connection_count(), 1u);
Austin Schuhb0e439d2023-05-15 10:55:40 -07001462
1463 // And the other direction is happy.
1464 EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
1465 EXPECT_EQ(pi2_connection->connection_count(), 1u);
1466 EXPECT_TRUE(pi2_connection->has_connected_since_time());
1467 EXPECT_FALSE(pi2_connection->has_monotonic_offset());
1468 EXPECT_TRUE(pi2_connection->has_boot_uuid());
1469
1470 EXPECT_EQ(pi1_client_connection->state(), State::CONNECTED);
1471 EXPECT_EQ(pi1_client_connection->connection_count(), 1u);
1472
1473 VLOG(1) << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1474 VLOG(1) << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhb0e439d2023-05-15 10:55:40 -07001475 VLOG(1) << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1476
Austin Schuh89f23e32023-05-15 17:06:43 -07001477 pi2_client_event_loop->epoll()->DeleteFd(client.fd());
1478
Austin Schuhb0e439d2023-05-15 10:55:40 -07001479 StopPi2Client();
1480 }
1481
1482 // Shut everyone else down
1483 StopPi1Server();
1484 StopPi1Client();
1485 StopPi2Server();
1486 StopPi1Test();
1487 StopPi2Test();
1488}
1489
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001490INSTANTIATE_TEST_SUITE_P(
Austin Schuh36a2c3e2021-02-18 22:28:38 -08001491 MessageBridgeTests, MessageBridgeParameterizedTest,
1492 ::testing::Values(
1493 Param{"message_bridge_test_combined_timestamps_common_config.json",
1494 true},
1495 Param{"message_bridge_test_common_config.json", false}));
1496
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001497} // namespace testing
1498} // namespace message_bridge
1499} // namespace aos