blob: 6b88ffe9de936574ddb88b1d801f3e268cc9e2f9 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08003#include <string_view>
4
Alex Perrycb7da4b2019-08-28 19:35:56 -07005#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07007#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008#include "aos/events/ping_lib.h"
9#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080010#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070011#include "aos/network/message_bridge_client_generated.h"
12#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080013#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080014#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070016#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080017#include "gtest/gtest.h"
18
19namespace aos {
20namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070021namespace {
22
Austin Schuh373f1762021-06-02 21:07:09 -070023using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070024
Austin Schuh58646e22021-08-23 23:51:46 -070025using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080026using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070027namespace chrono = ::std::chrono;
28
Austin Schuh0de30f32020-12-06 12:44:28 -080029} // namespace
30
Neil Balchc8f41ed2018-01-20 22:06:53 -080031class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
32 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080033 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080034 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080035 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080036 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080037 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080038 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080039 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070040 }
41
Austin Schuh217a9782019-12-21 23:02:50 -080042 void Run() override { event_loop_factory_->Run(); }
43 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070044
Austin Schuh52d325c2019-06-23 18:59:06 -070045 // TODO(austin): Implement this. It's used currently for a phased loop test.
46 // I'm not sure how much that matters.
47 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
48
Austin Schuh7d87b672019-12-01 20:23:49 -080049 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080050 MaybeMake();
51 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080052 }
53
Neil Balchc8f41ed2018-01-20 22:06:53 -080054 private:
Austin Schuh217a9782019-12-21 23:02:50 -080055 void MaybeMake() {
56 if (!event_loop_factory_) {
57 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080058 event_loop_factory_ =
59 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080060 } else {
61 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
63 }
64 }
65 }
66 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080067};
68
Austin Schuh6bae8252021-02-07 22:01:49 -080069auto CommonParameters() {
70 return ::testing::Combine(
71 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
72 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
73 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
74}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070075
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070076INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070077 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080081
Austin Schuh89c9b812021-02-20 14:42:10 -080082// Parameters to run all the tests with.
83struct Param {
84 // The config file to use.
85 std::string config;
86 // If true, the RemoteMessage channel should be shared between all the remote
87 // channels. If false, there will be 1 RemoteMessage channel per remote
88 // channel.
89 bool shared;
90};
91
92class RemoteMessageSimulatedEventLoopTest
93 : public ::testing::TestWithParam<struct Param> {
94 public:
95 RemoteMessageSimulatedEventLoopTest()
96 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070097 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080098 LOG(INFO) << "Config " << GetParam().config;
99 }
100
101 bool shared() const { return GetParam().shared; }
102
103 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
104 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
106 if (shared()) {
107 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
108 event_loop, "/aos/remote_timestamps/pi2"));
109 } else {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop,
112 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
117 }
118 return counters;
119 }
120
121 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
122 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
124 if (shared()) {
125 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
126 event_loop, "/aos/remote_timestamps/pi1"));
127 } else {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop,
132 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
133 }
134 return counters;
135 }
136
137 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
138};
139
Neil Balchc8f41ed2018-01-20 22:06:53 -0800140// Test that creating an event and running the scheduler runs the event.
141TEST(EventSchedulerTest, ScheduleEvent) {
142 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800143 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700144 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800145 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800146
Austin Schuh8bd96322020-02-13 21:18:22 -0800147 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuhac0771c2020-01-07 18:36:30 -0800148 [&counter]() { counter += 1; });
Austin Schuh8bd96322020-02-13 21:18:22 -0800149 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800150 EXPECT_EQ(counter, 1);
Ravago Jonescf453ab2020-05-06 21:14:53 -0700151 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2),
152 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800153 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800154 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800155 EXPECT_EQ(counter, 1);
156}
157
158// Test that descheduling an already scheduled event doesn't run the event.
159TEST(EventSchedulerTest, DescheduleEvent) {
160 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700162 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800163 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800164
Austin Schuh8bd96322020-02-13 21:18:22 -0800165 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
166 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800168 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800169 EXPECT_EQ(counter, 0);
170}
Austin Schuh44019f92019-05-19 19:58:27 -0700171
Austin Schuh8fb315a2020-11-19 22:33:58 -0800172void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
173 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
174 TestMessage::Builder test_message_builder =
175 builder.MakeBuilder<TestMessage>();
176 test_message_builder.add_value(value);
177 builder.Send(test_message_builder.Finish());
178}
179
180// Test that sending a message after running gets properly notified.
181TEST(SimulatedEventLoopTest, SendAfterRunFor) {
182 SimulatedEventLoopTestFactory factory;
183
184 SimulatedEventLoopFactory simulated_event_loop_factory(
185 factory.configuration());
186
187 ::std::unique_ptr<EventLoop> ping_event_loop =
188 simulated_event_loop_factory.MakeEventLoop("ping");
189 aos::Sender<TestMessage> test_message_sender =
190 ping_event_loop->MakeSender<TestMessage>("/test");
191 SendTestMessage(&test_message_sender, 1);
192
193 std::unique_ptr<EventLoop> pong1_event_loop =
194 simulated_event_loop_factory.MakeEventLoop("pong");
195 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
196 "/test");
197
198 EXPECT_FALSE(ping_event_loop->is_running());
199
200 // Watchers start when you start running, so there should be nothing counted.
201 simulated_event_loop_factory.RunFor(chrono::seconds(1));
202 EXPECT_EQ(test_message_counter1.count(), 0u);
203
204 std::unique_ptr<EventLoop> pong2_event_loop =
205 simulated_event_loop_factory.MakeEventLoop("pong");
206 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
207 "/test");
208
209 // Pauses in the middle don't count though, so this should be counted.
210 // But, the fresh watcher shouldn't pick it up yet.
211 SendTestMessage(&test_message_sender, 2);
212
213 EXPECT_EQ(test_message_counter1.count(), 0u);
214 EXPECT_EQ(test_message_counter2.count(), 0u);
215 simulated_event_loop_factory.RunFor(chrono::seconds(1));
216
217 EXPECT_EQ(test_message_counter1.count(), 1u);
218 EXPECT_EQ(test_message_counter2.count(), 0u);
219}
220
221// Test that creating an event loop while running dies.
222TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
223 SimulatedEventLoopTestFactory factory;
224
225 SimulatedEventLoopFactory simulated_event_loop_factory(
226 factory.configuration());
227
228 ::std::unique_ptr<EventLoop> event_loop =
229 simulated_event_loop_factory.MakeEventLoop("ping");
230
231 auto timer = event_loop->AddTimer([&]() {
232 EXPECT_DEATH(
233 {
234 ::std::unique_ptr<EventLoop> event_loop2 =
235 simulated_event_loop_factory.MakeEventLoop("ping");
236 },
237 "event loop while running");
238 simulated_event_loop_factory.Exit();
239 });
240
241 event_loop->OnRun([&event_loop, &timer] {
242 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
243 });
244
245 simulated_event_loop_factory.Run();
246}
247
248// Test that creating a watcher after running dies.
249TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
250 SimulatedEventLoopTestFactory factory;
251
252 SimulatedEventLoopFactory simulated_event_loop_factory(
253 factory.configuration());
254
255 ::std::unique_ptr<EventLoop> event_loop =
256 simulated_event_loop_factory.MakeEventLoop("ping");
257
258 simulated_event_loop_factory.RunFor(chrono::seconds(1));
259
260 EXPECT_DEATH(
261 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
262 "Can't add a watcher after running");
263
264 ::std::unique_ptr<EventLoop> event_loop2 =
265 simulated_event_loop_factory.MakeEventLoop("ping");
266
267 simulated_event_loop_factory.RunFor(chrono::seconds(1));
268
269 EXPECT_DEATH(
270 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
271 "Can't add a watcher after running");
272}
273
Austin Schuh44019f92019-05-19 19:58:27 -0700274// Test that running for a time period with no handlers causes time to progress
275// correctly.
276TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800277 SimulatedEventLoopTestFactory factory;
278
279 SimulatedEventLoopFactory simulated_event_loop_factory(
280 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700281 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800282 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700283
284 simulated_event_loop_factory.RunFor(chrono::seconds(1));
285
286 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700287 event_loop->monotonic_now());
288}
289
290// Test that running for a time with a periodic handler causes time to end
291// correctly.
292TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800293 SimulatedEventLoopTestFactory factory;
294
295 SimulatedEventLoopFactory simulated_event_loop_factory(
296 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700297 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800298 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700299
300 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700301 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700302 event_loop->OnRun([&event_loop, &timer] {
303 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
304 chrono::milliseconds(100));
305 });
306
307 simulated_event_loop_factory.RunFor(chrono::seconds(1));
308
309 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700310 event_loop->monotonic_now());
311 EXPECT_EQ(counter, 10);
312}
313
Austin Schuh7d87b672019-12-01 20:23:49 -0800314// Tests that watchers have latency in simulation.
315TEST(SimulatedEventLoopTest, WatcherTimingReport) {
316 SimulatedEventLoopTestFactory factory;
317 factory.set_send_delay(std::chrono::microseconds(50));
318
319 FLAGS_timing_report_ms = 1000;
320 auto loop1 = factory.MakePrimary("primary");
321 loop1->MakeWatcher("/test", [](const TestMessage &) {});
322
323 auto loop2 = factory.Make("sender_loop");
324
325 auto loop3 = factory.Make("report_fetcher");
326
327 Fetcher<timing::Report> report_fetcher =
328 loop3->MakeFetcher<timing::Report>("/aos");
329 EXPECT_FALSE(report_fetcher.Fetch());
330
331 auto sender = loop2->MakeSender<TestMessage>("/test");
332
333 // Send 10 messages in the middle of a timing report period so we get
334 // something interesting back.
335 auto test_timer = loop2->AddTimer([&sender]() {
336 for (int i = 0; i < 10; ++i) {
337 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
338 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
339 builder.add_value(200 + i);
340 ASSERT_TRUE(msg.Send(builder.Finish()));
341 }
342 });
343
344 // Quit after 1 timing report, mid way through the next cycle.
345 {
346 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
347 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
348 end_timer->set_name("end");
349 }
350
351 loop1->OnRun([&test_timer, &loop1]() {
352 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
353 });
354
355 factory.Run();
356
357 // And, since we are here, check that the timing report makes sense.
358 // Start by looking for our event loop's timing.
359 FlatbufferDetachedBuffer<timing::Report> primary_report =
360 FlatbufferDetachedBuffer<timing::Report>::Empty();
361 while (report_fetcher.FetchNext()) {
362 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
363 if (report_fetcher->name()->string_view() == "primary") {
364 primary_report = CopyFlatBuffer(report_fetcher.get());
365 }
366 }
367
368 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700369 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800370
371 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
372
373 // Just the timing report timer.
374 ASSERT_NE(primary_report.message().timers(), nullptr);
375 EXPECT_EQ(primary_report.message().timers()->size(), 2);
376
377 // No phased loops
378 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
379
380 // And now confirm that the watcher received all 10 messages, and has latency.
381 ASSERT_NE(primary_report.message().watchers(), nullptr);
382 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
383 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
384 EXPECT_NEAR(
385 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
386 0.00005, 1e-9);
387 EXPECT_NEAR(
388 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
389 0.00005, 1e-9);
390 EXPECT_NEAR(
391 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
392 0.00005, 1e-9);
393 EXPECT_EQ(primary_report.message()
394 .watchers()
395 ->Get(0)
396 ->wakeup_latency()
397 ->standard_deviation(),
398 0.0);
399
400 EXPECT_EQ(
401 primary_report.message().watchers()->Get(0)->handler_time()->average(),
402 0.0);
403 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
404 0.0);
405 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
406 0.0);
407 EXPECT_EQ(primary_report.message()
408 .watchers()
409 ->Get(0)
410 ->handler_time()
411 ->standard_deviation(),
412 0.0);
413}
414
Austin Schuh89c9b812021-02-20 14:42:10 -0800415size_t CountAll(
416 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
417 &counters) {
418 size_t count = 0u;
419 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
420 counters) {
421 count += counter->count();
422 }
423 return count;
424}
425
Austin Schuh4c3b9702020-08-30 11:34:55 -0700426// Tests that ping and pong work when on 2 different nodes, and the message
427// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800428TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800429 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
430 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700431 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800432
433 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
434
435 std::unique_ptr<EventLoop> ping_event_loop =
436 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
437 Ping ping(ping_event_loop.get());
438
439 std::unique_ptr<EventLoop> pong_event_loop =
440 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
441 Pong pong(pong_event_loop.get());
442
443 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
444 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700445 MessageCounter<examples::Pong> pi2_pong_counter(
446 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700447 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
448 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
449 "/pi1/aos");
450 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
451 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800452
Austin Schuh4c3b9702020-08-30 11:34:55 -0700453 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
454 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800455
456 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
457 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700458 MessageCounter<examples::Pong> pi1_pong_counter(
459 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700460 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
461 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
462 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
463 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
464 "/aos");
465
Austin Schuh4c3b9702020-08-30 11:34:55 -0700466 // Count timestamps.
467 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
468 pi1_pong_counter_event_loop.get(), "/pi1/aos");
469 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
470 pi2_pong_counter_event_loop.get(), "/pi1/aos");
471 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
472 pi3_pong_counter_event_loop.get(), "/pi1/aos");
473 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
474 pi1_pong_counter_event_loop.get(), "/pi2/aos");
475 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
476 pi2_pong_counter_event_loop.get(), "/pi2/aos");
477 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
478 pi1_pong_counter_event_loop.get(), "/pi3/aos");
479 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
480 pi3_pong_counter_event_loop.get(), "/pi3/aos");
481
Austin Schuh2f8fd752020-09-01 22:38:28 -0700482 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800483 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
484 remote_timestamps_pi2_on_pi1 =
485 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
486 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
487 remote_timestamps_pi1_on_pi2 =
488 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700489
Austin Schuh4c3b9702020-08-30 11:34:55 -0700490 // Wait to let timestamp estimation start up before looking for the results.
491 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
492
Austin Schuh8fb315a2020-11-19 22:33:58 -0800493 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
494 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
495 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
496 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
497 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
498 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
499
Austin Schuh4c3b9702020-08-30 11:34:55 -0700500 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800501 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700502 "/pi1/aos", [&pi1_server_statistics_count](
503 const message_bridge::ServerStatistics &stats) {
504 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
505 EXPECT_EQ(stats.connections()->size(), 2u);
506 for (const message_bridge::ServerConnection *connection :
507 *stats.connections()) {
508 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800509 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700510 if (connection->node()->name()->string_view() == "pi2") {
511 EXPECT_GT(connection->sent_packets(), 50);
512 } else if (connection->node()->name()->string_view() == "pi3") {
513 EXPECT_GE(connection->sent_packets(), 5);
514 } else {
515 LOG(FATAL) << "Unknown connection";
516 }
517
518 EXPECT_TRUE(connection->has_monotonic_offset());
519 EXPECT_EQ(connection->monotonic_offset(), 0);
520 }
521 ++pi1_server_statistics_count;
522 });
523
524 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800525 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700526 "/pi2/aos", [&pi2_server_statistics_count](
527 const message_bridge::ServerStatistics &stats) {
528 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
529 EXPECT_EQ(stats.connections()->size(), 1u);
530
531 const message_bridge::ServerConnection *connection =
532 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800533 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700534 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
535 EXPECT_GT(connection->sent_packets(), 50);
536 EXPECT_TRUE(connection->has_monotonic_offset());
537 EXPECT_EQ(connection->monotonic_offset(), 0);
538 ++pi2_server_statistics_count;
539 });
540
541 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800542 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700543 "/pi3/aos", [&pi3_server_statistics_count](
544 const message_bridge::ServerStatistics &stats) {
545 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
546 EXPECT_EQ(stats.connections()->size(), 1u);
547
548 const message_bridge::ServerConnection *connection =
549 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800550 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700551 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
552 EXPECT_GE(connection->sent_packets(), 5);
553 EXPECT_TRUE(connection->has_monotonic_offset());
554 EXPECT_EQ(connection->monotonic_offset(), 0);
555 ++pi3_server_statistics_count;
556 });
557
558 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800559 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700560 "/pi1/aos", [&pi1_client_statistics_count](
561 const message_bridge::ClientStatistics &stats) {
562 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
563 EXPECT_EQ(stats.connections()->size(), 2u);
564
565 for (const message_bridge::ClientConnection *connection :
566 *stats.connections()) {
567 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
568 if (connection->node()->name()->string_view() == "pi2") {
569 EXPECT_GT(connection->received_packets(), 50);
570 } else if (connection->node()->name()->string_view() == "pi3") {
571 EXPECT_GE(connection->received_packets(), 5);
572 } else {
573 LOG(FATAL) << "Unknown connection";
574 }
575
Austin Schuhe61d4382021-03-31 21:33:02 -0700576 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700577 EXPECT_TRUE(connection->has_monotonic_offset());
578 EXPECT_EQ(connection->monotonic_offset(), 150000);
579 }
580 ++pi1_client_statistics_count;
581 });
582
583 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800584 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700585 "/pi2/aos", [&pi2_client_statistics_count](
586 const message_bridge::ClientStatistics &stats) {
587 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
588 EXPECT_EQ(stats.connections()->size(), 1u);
589
590 const message_bridge::ClientConnection *connection =
591 stats.connections()->Get(0);
592 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
593 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700594 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700595 EXPECT_TRUE(connection->has_monotonic_offset());
596 EXPECT_EQ(connection->monotonic_offset(), 150000);
597 ++pi2_client_statistics_count;
598 });
599
600 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800601 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700602 "/pi3/aos", [&pi3_client_statistics_count](
603 const message_bridge::ClientStatistics &stats) {
604 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
605 EXPECT_EQ(stats.connections()->size(), 1u);
606
607 const message_bridge::ClientConnection *connection =
608 stats.connections()->Get(0);
609 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
610 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700611 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700612 EXPECT_TRUE(connection->has_monotonic_offset());
613 EXPECT_EQ(connection->monotonic_offset(), 150000);
614 ++pi3_client_statistics_count;
615 });
616
Austin Schuh2f8fd752020-09-01 22:38:28 -0700617 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
618 // channel.
619 const size_t pi1_timestamp_channel =
620 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
621 pi1_on_pi2_timestamp_fetcher.channel());
622 const size_t ping_timestamp_channel =
623 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
624 ping_on_pi2_fetcher.channel());
625
626 for (const Channel *channel :
627 *pi1_pong_counter_event_loop->configuration()->channels()) {
628 VLOG(1) << "Channel "
629 << configuration::ChannelIndex(
630 pi1_pong_counter_event_loop->configuration(), channel)
631 << " " << configuration::CleanedChannelToString(channel);
632 }
633
Austin Schuh8fb315a2020-11-19 22:33:58 -0800634 std::unique_ptr<EventLoop> pi1_remote_timestamp =
635 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
636
Austin Schuh89c9b812021-02-20 14:42:10 -0800637 for (std::pair<int, std::string> channel :
638 shared()
639 ? std::vector<std::pair<
640 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
641 : std::vector<std::pair<int, std::string>>{
642 {pi1_timestamp_channel,
643 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
644 "aos-message_bridge-Timestamp"},
645 {ping_timestamp_channel,
646 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
647 // For each remote timestamp we get back, confirm that it is either a ping
648 // message, or a timestamp we sent out. Also confirm that the timestamps
649 // are correct.
650 pi1_remote_timestamp->MakeWatcher(
651 channel.second,
652 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
653 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
654 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
655 channel_index = channel.first](const RemoteMessage &header) {
656 VLOG(1) << aos::FlatbufferToJson(&header);
657 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700658 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800659 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700660 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700661
Austin Schuh89c9b812021-02-20 14:42:10 -0800662 const aos::monotonic_clock::time_point header_monotonic_sent_time(
663 chrono::nanoseconds(header.monotonic_sent_time()));
664 const aos::realtime_clock::time_point header_realtime_sent_time(
665 chrono::nanoseconds(header.realtime_sent_time()));
666 const aos::monotonic_clock::time_point header_monotonic_remote_time(
667 chrono::nanoseconds(header.monotonic_remote_time()));
668 const aos::realtime_clock::time_point header_realtime_remote_time(
669 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700670
Austin Schuh89c9b812021-02-20 14:42:10 -0800671 if (channel_index != -1) {
672 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700673 }
674
Austin Schuh89c9b812021-02-20 14:42:10 -0800675 const Context *pi1_context = nullptr;
676 const Context *pi2_context = nullptr;
677
678 if (header.channel_index() == pi1_timestamp_channel) {
679 // Find the forwarded message.
680 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
681 header_monotonic_sent_time) {
682 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
683 }
684
685 // And the source message.
686 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
687 header_monotonic_remote_time) {
688 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
689 }
690
691 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
692 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
693 } else if (header.channel_index() == ping_timestamp_channel) {
694 // Find the forwarded message.
695 while (ping_on_pi2_fetcher.context().monotonic_event_time <
696 header_monotonic_sent_time) {
697 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
698 }
699
700 // And the source message.
701 while (ping_on_pi1_fetcher.context().monotonic_event_time <
702 header_monotonic_remote_time) {
703 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
704 }
705
706 pi1_context = &ping_on_pi1_fetcher.context();
707 pi2_context = &ping_on_pi2_fetcher.context();
708 } else {
709 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700710 }
711
Austin Schuh89c9b812021-02-20 14:42:10 -0800712 // Confirm the forwarded message has matching timestamps to the
713 // timestamps we got back.
714 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
715 EXPECT_EQ(pi2_context->remote_queue_index,
716 header.remote_queue_index());
717 EXPECT_EQ(pi2_context->monotonic_event_time,
718 header_monotonic_sent_time);
719 EXPECT_EQ(pi2_context->realtime_event_time,
720 header_realtime_sent_time);
721 EXPECT_EQ(pi2_context->realtime_remote_time,
722 header_realtime_remote_time);
723 EXPECT_EQ(pi2_context->monotonic_remote_time,
724 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700725
Austin Schuh89c9b812021-02-20 14:42:10 -0800726 // Confirm the forwarded message also matches the source message.
727 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
728 EXPECT_EQ(pi1_context->monotonic_event_time,
729 header_monotonic_remote_time);
730 EXPECT_EQ(pi1_context->realtime_event_time,
731 header_realtime_remote_time);
732 });
733 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700734
Austin Schuh4c3b9702020-08-30 11:34:55 -0700735 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
736 chrono::milliseconds(500) +
737 chrono::milliseconds(5));
738
739 EXPECT_EQ(pi1_pong_counter.count(), 1001);
740 EXPECT_EQ(pi2_pong_counter.count(), 1001);
741
742 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
743 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
744 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
745 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
746 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
747 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
748 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
749
Austin Schuh20ac95d2020-12-05 17:24:19 -0800750 EXPECT_EQ(pi1_server_statistics_count, 10);
751 EXPECT_EQ(pi2_server_statistics_count, 10);
752 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700753
754 EXPECT_EQ(pi1_client_statistics_count, 95);
755 EXPECT_EQ(pi2_client_statistics_count, 95);
756 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700757
758 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800759 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
760 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761}
762
763// Tests that an offset between nodes can be recovered and shows up in
764// ServerStatistics correctly.
765TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
766 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700767 aos::configuration::ReadConfig(ArtifactPath(
768 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700769 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800770 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
771 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700772 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800773 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
774 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700775 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800776 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
777 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700778
Austin Schuh87dd3832021-01-01 23:07:31 -0800779 message_bridge::TestingTimeConverter time(
780 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700781 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700782 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700783
784 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800785 time.AddNextTimestamp(
786 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700787 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
788 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700789
790 std::unique_ptr<EventLoop> ping_event_loop =
791 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
792 Ping ping(ping_event_loop.get());
793
794 std::unique_ptr<EventLoop> pong_event_loop =
795 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
796 Pong pong(pong_event_loop.get());
797
Austin Schuh8fb315a2020-11-19 22:33:58 -0800798 // Wait to let timestamp estimation start up before looking for the results.
799 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
800
Austin Schuh87dd3832021-01-01 23:07:31 -0800801 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
802 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
803
Austin Schuh4c3b9702020-08-30 11:34:55 -0700804 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
805 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
806
807 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
808 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
809
Austin Schuh4c3b9702020-08-30 11:34:55 -0700810 // Confirm the offsets are being recovered correctly.
811 int pi1_server_statistics_count = 0;
812 pi1_pong_counter_event_loop->MakeWatcher(
813 "/pi1/aos", [&pi1_server_statistics_count,
814 kOffset](const message_bridge::ServerStatistics &stats) {
815 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
816 EXPECT_EQ(stats.connections()->size(), 2u);
817 for (const message_bridge::ServerConnection *connection :
818 *stats.connections()) {
819 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800820 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700821 if (connection->node()->name()->string_view() == "pi2") {
822 EXPECT_EQ(connection->monotonic_offset(),
823 chrono::nanoseconds(kOffset).count());
824 } else if (connection->node()->name()->string_view() == "pi3") {
825 EXPECT_EQ(connection->monotonic_offset(), 0);
826 } else {
827 LOG(FATAL) << "Unknown connection";
828 }
829
830 EXPECT_TRUE(connection->has_monotonic_offset());
831 }
832 ++pi1_server_statistics_count;
833 });
834
835 int pi2_server_statistics_count = 0;
836 pi2_pong_counter_event_loop->MakeWatcher(
837 "/pi2/aos", [&pi2_server_statistics_count,
838 kOffset](const message_bridge::ServerStatistics &stats) {
839 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
840 EXPECT_EQ(stats.connections()->size(), 1u);
841
842 const message_bridge::ServerConnection *connection =
843 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800844 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700845 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
846 EXPECT_TRUE(connection->has_monotonic_offset());
847 EXPECT_EQ(connection->monotonic_offset(),
848 -chrono::nanoseconds(kOffset).count());
849 ++pi2_server_statistics_count;
850 });
851
852 int pi3_server_statistics_count = 0;
853 pi3_pong_counter_event_loop->MakeWatcher(
854 "/pi3/aos", [&pi3_server_statistics_count](
855 const message_bridge::ServerStatistics &stats) {
856 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
857 EXPECT_EQ(stats.connections()->size(), 1u);
858
859 const message_bridge::ServerConnection *connection =
860 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800861 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700862 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
863 EXPECT_TRUE(connection->has_monotonic_offset());
864 EXPECT_EQ(connection->monotonic_offset(), 0);
865 ++pi3_server_statistics_count;
866 });
867
868 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
869 chrono::milliseconds(500) +
870 chrono::milliseconds(5));
871
Austin Schuh20ac95d2020-12-05 17:24:19 -0800872 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700873 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800874 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700875}
876
877// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800878TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700879 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
880 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
881 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
882
883 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
884 simulated_event_loop_factory.DisableStatistics();
885
886 std::unique_ptr<EventLoop> ping_event_loop =
887 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
888 Ping ping(ping_event_loop.get());
889
890 std::unique_ptr<EventLoop> pong_event_loop =
891 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
892 Pong pong(pong_event_loop.get());
893
894 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
895 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
896
897 MessageCounter<examples::Pong> pi2_pong_counter(
898 pi2_pong_counter_event_loop.get(), "/test");
899
900 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
901 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
902
903 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
904 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
905
906 MessageCounter<examples::Pong> pi1_pong_counter(
907 pi1_pong_counter_event_loop.get(), "/test");
908
909 // Count timestamps.
910 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
911 pi1_pong_counter_event_loop.get(), "/pi1/aos");
912 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
913 pi2_pong_counter_event_loop.get(), "/pi1/aos");
914 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
915 pi3_pong_counter_event_loop.get(), "/pi1/aos");
916 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
917 pi1_pong_counter_event_loop.get(), "/pi2/aos");
918 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
919 pi2_pong_counter_event_loop.get(), "/pi2/aos");
920 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
921 pi1_pong_counter_event_loop.get(), "/pi3/aos");
922 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
923 pi3_pong_counter_event_loop.get(), "/pi3/aos");
924
Austin Schuh2f8fd752020-09-01 22:38:28 -0700925 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800926 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
927 remote_timestamps_pi2_on_pi1 =
928 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
929 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
930 remote_timestamps_pi1_on_pi2 =
931 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700932
Austin Schuh4c3b9702020-08-30 11:34:55 -0700933 MessageCounter<message_bridge::ServerStatistics>
934 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
935 "/pi1/aos");
936 MessageCounter<message_bridge::ServerStatistics>
937 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
938 "/pi2/aos");
939 MessageCounter<message_bridge::ServerStatistics>
940 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
941 "/pi3/aos");
942
943 MessageCounter<message_bridge::ClientStatistics>
944 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
945 "/pi1/aos");
946 MessageCounter<message_bridge::ClientStatistics>
947 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
948 "/pi2/aos");
949 MessageCounter<message_bridge::ClientStatistics>
950 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
951 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800952
953 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
954 chrono::milliseconds(5));
955
Austin Schuh4c3b9702020-08-30 11:34:55 -0700956 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
957 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
958
959 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
960 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
961 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
962 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
963 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
964 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
965 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
966
967 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
968 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
969 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
970
971 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
972 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
973 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700974
975 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800976 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
977 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -0800978}
979
Austin Schuhc0b0f722020-12-12 18:36:06 -0800980bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
981 for (const message_bridge::ServerConnection *connection :
982 *server_statistics->connections()) {
983 if (connection->state() != message_bridge::State::CONNECTED) {
984 return false;
985 }
986 }
987 return true;
988}
989
990bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
991 std::string_view target) {
992 for (const message_bridge::ServerConnection *connection :
993 *server_statistics->connections()) {
994 if (connection->node()->name()->string_view() == target) {
995 if (connection->state() == message_bridge::State::CONNECTED) {
996 return false;
997 }
998 } else {
999 if (connection->state() != message_bridge::State::CONNECTED) {
1000 return false;
1001 }
1002 }
1003 }
1004 return true;
1005}
1006
1007bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1008 for (const message_bridge::ClientConnection *connection :
1009 *client_statistics->connections()) {
1010 if (connection->state() != message_bridge::State::CONNECTED) {
1011 return false;
1012 }
1013 }
1014 return true;
1015}
1016
1017bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1018 std::string_view target) {
1019 for (const message_bridge::ClientConnection *connection :
1020 *client_statistics->connections()) {
1021 if (connection->node()->name()->string_view() == target) {
1022 if (connection->state() == message_bridge::State::CONNECTED) {
1023 return false;
1024 }
1025 } else {
1026 if (connection->state() != message_bridge::State::CONNECTED) {
1027 return false;
1028 }
1029 }
1030 }
1031 return true;
1032}
1033
1034// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001035TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001036 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1037
Austin Schuh58646e22021-08-23 23:51:46 -07001038 NodeEventLoopFactory *pi1 =
1039 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1040 NodeEventLoopFactory *pi2 =
1041 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1042 NodeEventLoopFactory *pi3 =
1043 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1044
1045 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001046 Ping ping(ping_event_loop.get());
1047
Austin Schuh58646e22021-08-23 23:51:46 -07001048 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001049 Pong pong(pong_event_loop.get());
1050
1051 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001052 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001053
1054 MessageCounter<examples::Pong> pi2_pong_counter(
1055 pi2_pong_counter_event_loop.get(), "/test");
1056
1057 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001058 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001059
1060 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001061 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001062
1063 MessageCounter<examples::Pong> pi1_pong_counter(
1064 pi1_pong_counter_event_loop.get(), "/test");
1065
1066 // Count timestamps.
1067 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1068 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1069 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1070 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1071 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1072 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1073 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1074 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1075 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1076 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1077 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1078 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1079 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1080 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1081
1082 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001083 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1084 remote_timestamps_pi2_on_pi1 =
1085 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1086 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1087 remote_timestamps_pi1_on_pi2 =
1088 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001089
1090 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001091 *pi1_server_statistics_counter;
1092 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1093 pi1_server_statistics_counter =
1094 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1095 "pi1_server_statistics_counter", "/pi1/aos");
1096 });
1097
Austin Schuhc0b0f722020-12-12 18:36:06 -08001098 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1099 pi1_pong_counter_event_loop
1100 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1101 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1102 pi1_pong_counter_event_loop
1103 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1104
1105 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001106 *pi2_server_statistics_counter;
1107 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1108 pi2_server_statistics_counter =
1109 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1110 "pi2_server_statistics_counter", "/pi2/aos");
1111 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001112 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1113 pi2_pong_counter_event_loop
1114 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1115 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1116 pi2_pong_counter_event_loop
1117 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1118
1119 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001120 *pi3_server_statistics_counter;
1121 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1122 pi3_server_statistics_counter =
1123 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1124 "pi3_server_statistics_counter", "/pi3/aos");
1125 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001126 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1127 pi3_pong_counter_event_loop
1128 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1129 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1130 pi3_pong_counter_event_loop
1131 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1132
1133 MessageCounter<message_bridge::ClientStatistics>
1134 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1135 "/pi1/aos");
1136 MessageCounter<message_bridge::ClientStatistics>
1137 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1138 "/pi2/aos");
1139 MessageCounter<message_bridge::ClientStatistics>
1140 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1141 "/pi3/aos");
1142
1143 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1144 chrono::milliseconds(5));
1145
1146 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1147 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1148
1149 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1150 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1151 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1152 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1153 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1154 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1155 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1156
Austin Schuh58646e22021-08-23 23:51:46 -07001157 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1158 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1159 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001160
1161 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1162 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1163 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1164
1165 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001166 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1167 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001168
1169 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1170 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1171 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1172 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1173 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1174 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1175 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1176 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1177 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1178 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1179 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1180 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1181 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1182 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1183 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1184 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1185 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1186 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1187
Austin Schuh58646e22021-08-23 23:51:46 -07001188 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001189
1190 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1191
1192 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1193 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1194
1195 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1196 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1197 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1198 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1199 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1200 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1201 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1202
Austin Schuh58646e22021-08-23 23:51:46 -07001203 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1204 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1205 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001206
1207 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1208 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1209 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1210
1211 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001212 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1213 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001214
1215 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1216 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1217 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1218 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1219 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1220 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1221 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1222 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1223 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1224 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1225 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1226 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1227 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1228 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1229 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1230 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1231 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1232 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1233
Austin Schuh58646e22021-08-23 23:51:46 -07001234 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001235
1236 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1237
1238 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1239 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1240
1241 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1242 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1243 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1244 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1245 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1246 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1247 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1248
Austin Schuh58646e22021-08-23 23:51:46 -07001249 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1250 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1251 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001252
1253 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1254 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1255 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1256
1257 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001258 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1259 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001260
1261 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1262 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1263 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1264 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1265 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1266 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1267 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1268 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1269 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1270 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1271 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1272 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1273 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1274 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1275 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1276 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1277 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1278 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1279}
1280
Austin Schuh2febf0d2020-09-21 22:24:30 -07001281// Tests that the time offset having a slope doesn't break the world.
1282// SimulatedMessageBridge has enough self consistency CHECK statements to
1283// confirm, and we can can also check a message in each direction to make sure
1284// it gets delivered as expected.
1285TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1286 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001287 aos::configuration::ReadConfig(ArtifactPath(
1288 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001289 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001290 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1291 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001292 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001293 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1294 ASSERT_EQ(pi2_index, 1u);
1295 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1296 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1297 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001298
Austin Schuh87dd3832021-01-01 23:07:31 -08001299 message_bridge::TestingTimeConverter time(
1300 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001301 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001302 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001303
Austin Schuh2febf0d2020-09-21 22:24:30 -07001304 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001305 time.AddNextTimestamp(
1306 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001307 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1308 BootTimestamp::epoch()});
1309 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1310 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1311 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1312 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001313
1314 std::unique_ptr<EventLoop> ping_event_loop =
1315 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1316 Ping ping(ping_event_loop.get());
1317
1318 std::unique_ptr<EventLoop> pong_event_loop =
1319 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1320 Pong pong(pong_event_loop.get());
1321
1322 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1323 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1324 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1325 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1326
1327 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1328 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1329 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1330 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1331
1332 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1333 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1334 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1335 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1336
1337 // End after a pong message comes back. This will leave the latest messages
1338 // on all channels so we can look at timestamps easily and check they make
1339 // sense.
1340 std::unique_ptr<EventLoop> pi1_pong_ender =
1341 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1342 int count = 0;
1343 pi1_pong_ender->MakeWatcher(
1344 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1345 if (++count == 100) {
1346 simulated_event_loop_factory.Exit();
1347 }
1348 });
1349
1350 // Run enough that messages should be delivered.
1351 simulated_event_loop_factory.Run();
1352
1353 // Grab the latest messages.
1354 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1355 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1356 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1357 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1358
1359 // Compute their time on the global distributed clock so we can compute
1360 // distance betwen them.
1361 const distributed_clock::time_point pi1_ping_time =
1362 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1363 ->ToDistributedClock(
1364 ping_on_pi1_fetcher.context().monotonic_event_time);
1365 const distributed_clock::time_point pi2_ping_time =
1366 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1367 ->ToDistributedClock(
1368 ping_on_pi2_fetcher.context().monotonic_event_time);
1369 const distributed_clock::time_point pi1_pong_time =
1370 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1371 ->ToDistributedClock(
1372 pong_on_pi1_fetcher.context().monotonic_event_time);
1373 const distributed_clock::time_point pi2_pong_time =
1374 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1375 ->ToDistributedClock(
1376 pong_on_pi2_fetcher.context().monotonic_event_time);
1377
1378 // And confirm the delivery delay is just about exactly 150 uS for both
1379 // directions like expected. There will be a couple ns of rounding errors in
1380 // the conversion functions that aren't worth accounting for right now. This
1381 // will either be really close, or really far.
1382 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1383 pi1_ping_time);
1384 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1385 pi1_ping_time);
1386
1387 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1388 pi2_pong_time);
1389 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1390 pi2_pong_time);
1391}
1392
Austin Schuh4c570ea2020-11-19 23:13:24 -08001393void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1394 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1395 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1396 ping_builder.add_value(value);
1397 builder.Send(ping_builder.Finish());
1398}
1399
1400// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001401TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001402 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1403 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1404
1405 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1406
1407 std::unique_ptr<EventLoop> ping_event_loop =
1408 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1409 aos::Sender<examples::Ping> pi1_reliable_sender =
1410 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1411 aos::Sender<examples::Ping> pi1_unreliable_sender =
1412 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1413 SendPing(&pi1_reliable_sender, 1);
1414 SendPing(&pi1_unreliable_sender, 1);
1415
1416 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1417 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1418 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1419 "/reliable");
1420 MessageCounter<examples::Ping> pi2_unreliable_counter(
1421 pi2_pong_event_loop.get(), "/unreliable");
1422 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1423 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1424 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1425 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1426
1427 const size_t reliable_channel_index = configuration::ChannelIndex(
1428 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1429
1430 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1431 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1432
Austin Schuheeaa2022021-01-02 21:52:03 -08001433 const chrono::nanoseconds network_delay =
1434 simulated_event_loop_factory.network_delay();
1435
Austin Schuh4c570ea2020-11-19 23:13:24 -08001436 int reliable_timestamp_count = 0;
1437 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001438 shared() ? "/pi1/aos/remote_timestamps/pi2"
1439 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001440 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001441 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1442 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001443 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001444 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001445 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001446 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001447 VLOG(1) << aos::FlatbufferToJson(&header);
1448 if (header.channel_index() == reliable_channel_index) {
1449 ++reliable_timestamp_count;
1450 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001451
1452 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1453 chrono::nanoseconds(header.monotonic_sent_time()));
1454
1455 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1456 header_monotonic_sent_time + network_delay +
1457 (pi1_remote_timestamp->monotonic_now() -
1458 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001459 });
1460
1461 // Wait to let timestamp estimation start up before looking for the results.
1462 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1463
1464 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1465 // This one isn't reliable, but was sent before the start. It should *not* be
1466 // delivered.
1467 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1468 // Confirm we got a timestamp logged for the message that was forwarded.
1469 EXPECT_EQ(reliable_timestamp_count, 1u);
1470
1471 SendPing(&pi1_reliable_sender, 2);
1472 SendPing(&pi1_unreliable_sender, 2);
1473 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1474 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1475 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1476
1477 EXPECT_EQ(reliable_timestamp_count, 2u);
1478}
1479
Austin Schuh20ac95d2020-12-05 17:24:19 -08001480// Tests that rebooting a node changes the ServerStatistics message and the
1481// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001482TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001483 const UUID pi1_boot0 = UUID::Random();
1484 const UUID pi2_boot0 = UUID::Random();
1485 const UUID pi2_boot1 = UUID::Random();
1486 const UUID pi3_boot0 = UUID::Random();
1487 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001488
Austin Schuh58646e22021-08-23 23:51:46 -07001489 message_bridge::TestingTimeConverter time(
1490 configuration::NodesCount(&config.message()));
1491 SimulatedEventLoopFactory factory(&config.message());
1492 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001493
Austin Schuh58646e22021-08-23 23:51:46 -07001494 const size_t pi1_index =
1495 configuration::GetNodeIndex(&config.message(), "pi1");
1496 const size_t pi2_index =
1497 configuration::GetNodeIndex(&config.message(), "pi2");
1498 const size_t pi3_index =
1499 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001500
Austin Schuh58646e22021-08-23 23:51:46 -07001501 {
1502 time.AddNextTimestamp(distributed_clock::epoch(),
1503 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1504 BootTimestamp::epoch()});
1505
1506 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1507
1508 time.AddNextTimestamp(
1509 distributed_clock::epoch() + dt,
1510 {BootTimestamp::epoch() + dt,
1511 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1512 BootTimestamp::epoch() + dt});
1513
1514 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1515 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1516 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1517 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1518 }
1519
1520 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1521 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1522
1523 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1524 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001525
1526 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001527 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001528
1529 int timestamp_count = 0;
1530 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001531 "/pi2/aos", [&expected_boot_uuid,
1532 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001533 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001534 expected_boot_uuid);
1535 });
1536 pi1_remote_timestamp->MakeWatcher(
1537 "/test",
1538 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001539 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001540 expected_boot_uuid);
1541 });
1542 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001543 shared() ? "/pi1/aos/remote_timestamps/pi2"
1544 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001545 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1546 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001547 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001548 VLOG(1) << aos::FlatbufferToJson(&header);
1549 ++timestamp_count;
1550 });
1551
1552 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001553 bool first_pi1_server_statistics = true;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001554 pi1_remote_timestamp->MakeWatcher(
Austin Schuh58646e22021-08-23 23:51:46 -07001555 "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
1556 &first_pi1_server_statistics](
Austin Schuh20ac95d2020-12-05 17:24:19 -08001557 const message_bridge::ServerStatistics &stats) {
1558 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1559 for (const message_bridge::ServerConnection *connection :
1560 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001561 if (connection->state() == message_bridge::State::CONNECTED) {
1562 ASSERT_TRUE(connection->has_boot_uuid());
1563 }
1564 if (!first_pi1_server_statistics) {
1565 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1566 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001567 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001568 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1569 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001570 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001571 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001572 << " : Got " << aos::FlatbufferToJson(&stats);
1573 ++pi1_server_statistics_count;
1574 }
1575 }
Austin Schuh58646e22021-08-23 23:51:46 -07001576 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001577 });
1578
Austin Schuh58646e22021-08-23 23:51:46 -07001579 int pi1_client_statistics_count = 0;
1580 pi1_remote_timestamp->MakeWatcher(
1581 "/pi1/aos", [&pi1_client_statistics_count](
1582 const message_bridge::ClientStatistics &stats) {
1583 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1584 for (const message_bridge::ClientConnection *connection :
1585 *stats.connections()) {
1586 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1587 if (connection->node()->name()->string_view() == "pi2") {
1588 ++pi1_client_statistics_count;
1589 }
1590 }
1591 });
1592
1593 // Confirm that reboot changes the UUID.
1594 pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
1595 expected_boot_uuid = pi2_boot1;
1596 LOG(INFO) << "OnShutdown triggered for pi2";
1597 pi2->OnStartup([&expected_boot_uuid, pi2]() {
1598 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1599 });
1600 });
1601
Austin Schuh20ac95d2020-12-05 17:24:19 -08001602 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001603 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001604
1605 EXPECT_GT(timestamp_count, 100);
1606 EXPECT_GE(pi1_server_statistics_count, 1u);
1607
Austin Schuh20ac95d2020-12-05 17:24:19 -08001608 timestamp_count = 0;
1609 pi1_server_statistics_count = 0;
1610
Austin Schuh58646e22021-08-23 23:51:46 -07001611 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001612 EXPECT_GT(timestamp_count, 100);
1613 EXPECT_GE(pi1_server_statistics_count, 1u);
1614}
1615
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001616INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001617 All, RemoteMessageSimulatedEventLoopTest,
1618 ::testing::Values(
1619 Param{"multinode_pingpong_test_combined_config.json", true},
1620 Param{"multinode_pingpong_test_split_config.json", false}));
1621
Austin Schuh58646e22021-08-23 23:51:46 -07001622// Tests that Startup and Shutdown do reasonable things.
1623TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1624 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1625 aos::configuration::ReadConfig(
1626 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1627
Austin Schuh72e65682021-09-02 11:37:05 -07001628 size_t pi1_shutdown_counter = 0;
1629 size_t pi2_shutdown_counter = 0;
1630 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1631 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1632
Austin Schuh58646e22021-08-23 23:51:46 -07001633 message_bridge::TestingTimeConverter time(
1634 configuration::NodesCount(&config.message()));
1635 SimulatedEventLoopFactory factory(&config.message());
1636 factory.SetTimeConverter(&time);
1637 time.AddNextTimestamp(
1638 distributed_clock::epoch(),
1639 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1640
1641 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1642
1643 time.AddNextTimestamp(
1644 distributed_clock::epoch() + dt,
1645 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1646 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1647 BootTimestamp::epoch() + dt});
1648
1649 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1650 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1651
1652 // Configure startup to start Ping and Pong, and count.
1653 size_t pi1_startup_counter = 0;
1654 size_t pi2_startup_counter = 0;
1655 pi1->OnStartup([pi1]() {
1656 LOG(INFO) << "Made ping";
1657 pi1->AlwaysStart<Ping>("ping");
1658 });
1659 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1660 pi2->OnStartup([pi2]() {
1661 LOG(INFO) << "Made pong";
1662 pi2->AlwaysStart<Pong>("pong");
1663 });
1664 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1665
1666 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001667 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1668 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1669
Austin Schuh58646e22021-08-23 23:51:46 -07001670 // Automatically make counters on startup.
1671 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1672 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1673 "pi1_pong_counter", "/test");
1674 });
1675 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1676 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1677 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1678 "pi2_ping_counter", "/test");
1679 });
1680 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1681
1682 EXPECT_EQ(pi2_ping_counter, nullptr);
1683 EXPECT_EQ(pi1_pong_counter, nullptr);
1684
1685 EXPECT_EQ(pi1_startup_counter, 0u);
1686 EXPECT_EQ(pi2_startup_counter, 0u);
1687 EXPECT_EQ(pi1_shutdown_counter, 0u);
1688 EXPECT_EQ(pi2_shutdown_counter, 0u);
1689
1690 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1691 EXPECT_EQ(pi1_startup_counter, 1u);
1692 EXPECT_EQ(pi2_startup_counter, 1u);
1693 EXPECT_EQ(pi1_shutdown_counter, 0u);
1694 EXPECT_EQ(pi2_shutdown_counter, 0u);
1695 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1696 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1697
1698 LOG(INFO) << pi1->monotonic_now();
1699 LOG(INFO) << pi2->monotonic_now();
1700
1701 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1702
1703 EXPECT_EQ(pi1_startup_counter, 2u);
1704 EXPECT_EQ(pi2_startup_counter, 2u);
1705 EXPECT_EQ(pi1_shutdown_counter, 1u);
1706 EXPECT_EQ(pi2_shutdown_counter, 1u);
1707 EXPECT_EQ(pi2_ping_counter->count(), 501);
1708 EXPECT_EQ(pi1_pong_counter->count(), 501);
1709}
1710
1711// Tests that OnStartup handlers can be added after running and get called, and
1712// can't be called when running.
1713TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1714 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1715 aos::configuration::ReadConfig(
1716 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1717
1718 // Test that we can add startup handlers as long as we aren't running, and
1719 // they get run when Run gets called again.
1720 // Test that adding a startup handler when running fails.
1721 //
1722 // Test shutdown handlers get called on destruction.
1723 SimulatedEventLoopFactory factory(&config.message());
1724
1725 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1726
1727 int startup_count0 = 0;
1728 int startup_count1 = 0;
1729
1730 pi1->OnStartup([&]() { ++startup_count0; });
1731 EXPECT_EQ(startup_count0, 0);
1732 EXPECT_EQ(startup_count1, 0);
1733
1734 factory.RunFor(chrono::nanoseconds(1));
1735 EXPECT_EQ(startup_count0, 1);
1736 EXPECT_EQ(startup_count1, 0);
1737
1738 pi1->OnStartup([&]() { ++startup_count1; });
1739 EXPECT_EQ(startup_count0, 1);
1740 EXPECT_EQ(startup_count1, 0);
1741
1742 factory.RunFor(chrono::nanoseconds(1));
1743 EXPECT_EQ(startup_count0, 1);
1744 EXPECT_EQ(startup_count1, 1);
1745
1746 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1747 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1748
1749 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1750 "Can only register OnStartup handlers when not running.");
1751}
1752
1753// Tests that OnStartup handlers can be added after running and get called, and
1754// all the handlers get called on reboot. Shutdown handlers are tested the same
1755// way.
1756TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1757 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1758 aos::configuration::ReadConfig(
1759 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1760
Austin Schuh72e65682021-09-02 11:37:05 -07001761 int startup_count0 = 0;
1762 int shutdown_count0 = 0;
1763 int startup_count1 = 0;
1764 int shutdown_count1 = 0;
1765
Austin Schuh58646e22021-08-23 23:51:46 -07001766 message_bridge::TestingTimeConverter time(
1767 configuration::NodesCount(&config.message()));
1768 SimulatedEventLoopFactory factory(&config.message());
1769 factory.SetTimeConverter(&time);
1770 time.StartEqual();
1771
1772 const chrono::nanoseconds dt = chrono::seconds(10);
1773 time.RebootAt(0, distributed_clock::epoch() + dt);
1774 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1775
1776 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1777
Austin Schuh58646e22021-08-23 23:51:46 -07001778 pi1->OnStartup([&]() { ++startup_count0; });
1779 pi1->OnShutdown([&]() { ++shutdown_count0; });
1780 EXPECT_EQ(startup_count0, 0);
1781 EXPECT_EQ(startup_count1, 0);
1782 EXPECT_EQ(shutdown_count0, 0);
1783 EXPECT_EQ(shutdown_count1, 0);
1784
1785 factory.RunFor(chrono::nanoseconds(1));
1786 EXPECT_EQ(startup_count0, 1);
1787 EXPECT_EQ(startup_count1, 0);
1788 EXPECT_EQ(shutdown_count0, 0);
1789 EXPECT_EQ(shutdown_count1, 0);
1790
1791 pi1->OnStartup([&]() { ++startup_count1; });
1792 EXPECT_EQ(startup_count0, 1);
1793 EXPECT_EQ(startup_count1, 0);
1794 EXPECT_EQ(shutdown_count0, 0);
1795 EXPECT_EQ(shutdown_count1, 0);
1796
1797 factory.RunFor(chrono::nanoseconds(1));
1798 EXPECT_EQ(startup_count0, 1);
1799 EXPECT_EQ(startup_count1, 1);
1800 EXPECT_EQ(shutdown_count0, 0);
1801 EXPECT_EQ(shutdown_count1, 0);
1802
1803 factory.RunFor(chrono::seconds(15));
1804
1805 EXPECT_EQ(startup_count0, 2);
1806 EXPECT_EQ(startup_count1, 2);
1807 EXPECT_EQ(shutdown_count0, 1);
1808 EXPECT_EQ(shutdown_count1, 0);
1809
1810 pi1->OnShutdown([&]() { ++shutdown_count1; });
1811 factory.RunFor(chrono::seconds(10));
1812
1813 EXPECT_EQ(startup_count0, 3);
1814 EXPECT_EQ(startup_count1, 3);
1815 EXPECT_EQ(shutdown_count0, 2);
1816 EXPECT_EQ(shutdown_count1, 1);
1817}
1818
1819// Tests that event loops which outlive shutdown crash.
1820TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1821 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1822 aos::configuration::ReadConfig(
1823 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1824
1825 message_bridge::TestingTimeConverter time(
1826 configuration::NodesCount(&config.message()));
1827 SimulatedEventLoopFactory factory(&config.message());
1828 factory.SetTimeConverter(&time);
1829 time.StartEqual();
1830
1831 const chrono::nanoseconds dt = chrono::seconds(10);
1832 time.RebootAt(0, distributed_clock::epoch() + dt);
1833
1834 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1835
1836 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1837
1838 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
1839}
1840
1841// Tests that messages don't survive a reboot of a node.
1842TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
1843 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1844 aos::configuration::ReadConfig(
1845 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1846
1847 message_bridge::TestingTimeConverter time(
1848 configuration::NodesCount(&config.message()));
1849 SimulatedEventLoopFactory factory(&config.message());
1850 factory.SetTimeConverter(&time);
1851 time.StartEqual();
1852
1853 const chrono::nanoseconds dt = chrono::seconds(10);
1854 time.RebootAt(0, distributed_clock::epoch() + dt);
1855
1856 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1857
1858 const UUID boot_uuid = pi1->boot_uuid();
1859 EXPECT_NE(boot_uuid, UUID::Zero());
1860
1861 {
1862 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1863 aos::Sender<examples::Ping> test_message_sender =
1864 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1865 SendPing(&test_message_sender, 1);
1866 }
1867
1868 factory.RunFor(chrono::seconds(5));
1869
1870 {
1871 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1872 aos::Fetcher<examples::Ping> fetcher =
1873 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1874 EXPECT_TRUE(fetcher.Fetch());
1875 }
1876
1877 factory.RunFor(chrono::seconds(10));
1878
1879 {
1880 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1881 aos::Fetcher<examples::Ping> fetcher =
1882 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1883 EXPECT_FALSE(fetcher.Fetch());
1884 }
1885 EXPECT_NE(boot_uuid, pi1->boot_uuid());
1886}
1887
1888// Tests that reliable messages get resent on reboot.
1889TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
1890 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1891 aos::configuration::ReadConfig(
1892 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1893
1894 message_bridge::TestingTimeConverter time(
1895 configuration::NodesCount(&config.message()));
1896 SimulatedEventLoopFactory factory(&config.message());
1897 factory.SetTimeConverter(&time);
1898 time.StartEqual();
1899
1900 const chrono::nanoseconds dt = chrono::seconds(1);
1901 time.RebootAt(1, distributed_clock::epoch() + dt);
1902
1903 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1904 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1905
1906 const UUID pi1_boot_uuid = pi1->boot_uuid();
1907 const UUID pi2_boot_uuid = pi2->boot_uuid();
1908 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
1909 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
1910
1911 {
1912 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1913 aos::Sender<examples::Ping> test_message_sender =
1914 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1915 SendPing(&test_message_sender, 1);
1916 }
1917
1918 factory.RunFor(chrono::milliseconds(500));
1919
1920 {
1921 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1922 aos::Fetcher<examples::Ping> fetcher =
1923 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1924 EXPECT_TRUE(fetcher.Fetch());
1925 }
1926
1927 factory.RunFor(chrono::seconds(1));
1928
1929 {
1930 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1931 aos::Fetcher<examples::Ping> fetcher =
1932 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1933 EXPECT_TRUE(fetcher.Fetch());
1934 }
1935 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
1936}
1937
Neil Balchc8f41ed2018-01-20 22:06:53 -08001938} // namespace testing
1939} // namespace aos