blob: ca0060c6d92721a77ca65a28fc3ec7cd84774e30 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08003#include <string_view>
4
Alex Perrycb7da4b2019-08-28 19:35:56 -07005#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07007#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008#include "aos/events/ping_lib.h"
9#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080010#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070011#include "aos/network/message_bridge_client_generated.h"
12#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080013#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080014#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/timestamp_generated.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080016#include "gtest/gtest.h"
17
18namespace aos {
19namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070020namespace {
21
22std::string ConfigPrefix() { return "aos/"; }
23
Austin Schuh0de30f32020-12-06 12:44:28 -080024using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070025namespace chrono = ::std::chrono;
26
Austin Schuh0de30f32020-12-06 12:44:28 -080027} // namespace
28
Neil Balchc8f41ed2018-01-20 22:06:53 -080029class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
30 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080031 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080032 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080033 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080034 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070038 }
39
Austin Schuh217a9782019-12-21 23:02:50 -080040 void Run() override { event_loop_factory_->Run(); }
41 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070042
Austin Schuh52d325c2019-06-23 18:59:06 -070043 // TODO(austin): Implement this. It's used currently for a phased loop test.
44 // I'm not sure how much that matters.
45 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
46
Austin Schuh7d87b672019-12-01 20:23:49 -080047 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080048 MaybeMake();
49 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080050 }
51
Neil Balchc8f41ed2018-01-20 22:06:53 -080052 private:
Austin Schuh217a9782019-12-21 23:02:50 -080053 void MaybeMake() {
54 if (!event_loop_factory_) {
55 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080056 event_loop_factory_ =
57 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080058 } else {
59 event_loop_factory_ =
60 std::make_unique<SimulatedEventLoopFactory>(configuration());
61 }
62 }
63 }
64 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080065};
66
Austin Schuh6bae8252021-02-07 22:01:49 -080067auto CommonParameters() {
68 return ::testing::Combine(
69 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
70 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
71 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
72}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070073
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070074INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh6bae8252021-02-07 22:01:49 -080075 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070076
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070077INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh6bae8252021-02-07 22:01:49 -080078 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080079
// Test parameters for RemoteMessageSimulatedEventLoopTest.
struct Param {
  // Name of the config file to load.
  std::string config;
  // True when a single RemoteMessage channel is shared by all remote channels;
  // false when every remote channel has its own RemoteMessage channel.
  bool shared;
};
89
90class RemoteMessageSimulatedEventLoopTest
91 : public ::testing::TestWithParam<struct Param> {
92 public:
93 RemoteMessageSimulatedEventLoopTest()
94 : config(aos::configuration::ReadConfig(
95 absl::StrCat(ConfigPrefix(), "events/", GetParam().config))) {
96 LOG(INFO) << "Config " << GetParam().config;
97 }
98
99 bool shared() const { return GetParam().shared; }
100
101 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
102 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
103 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
104 if (shared()) {
105 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
106 event_loop, "/aos/remote_timestamps/pi2"));
107 } else {
108 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
109 event_loop,
110 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
111 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
112 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
115 }
116 return counters;
117 }
118
119 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
120 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
121 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
122 if (shared()) {
123 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
124 event_loop, "/aos/remote_timestamps/pi1"));
125 } else {
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop,
130 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
131 }
132 return counters;
133 }
134
135 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
136};
137
Neil Balchc8f41ed2018-01-20 22:06:53 -0800138// Test that creating an event and running the scheduler runs the event.
139TEST(EventSchedulerTest, ScheduleEvent) {
140 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800141 EventSchedulerScheduler scheduler_scheduler;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800142 EventScheduler scheduler;
Austin Schuh8bd96322020-02-13 21:18:22 -0800143 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800144
Austin Schuh8bd96322020-02-13 21:18:22 -0800145 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuhac0771c2020-01-07 18:36:30 -0800146 [&counter]() { counter += 1; });
Austin Schuh8bd96322020-02-13 21:18:22 -0800147 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800148 EXPECT_EQ(counter, 1);
Ravago Jonescf453ab2020-05-06 21:14:53 -0700149 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2),
150 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800151 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800152 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800153 EXPECT_EQ(counter, 1);
154}
155
156// Test that descheduling an already scheduled event doesn't run the event.
157TEST(EventSchedulerTest, DescheduleEvent) {
158 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800159 EventSchedulerScheduler scheduler_scheduler;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800160 EventScheduler scheduler;
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800162
Austin Schuh8bd96322020-02-13 21:18:22 -0800163 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
164 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 EXPECT_EQ(counter, 0);
168}
Austin Schuh44019f92019-05-19 19:58:27 -0700169
Austin Schuh8fb315a2020-11-19 22:33:58 -0800170void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
171 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
172 TestMessage::Builder test_message_builder =
173 builder.MakeBuilder<TestMessage>();
174 test_message_builder.add_value(value);
175 builder.Send(test_message_builder.Finish());
176}
177
178// Test that sending a message after running gets properly notified.
179TEST(SimulatedEventLoopTest, SendAfterRunFor) {
180 SimulatedEventLoopTestFactory factory;
181
182 SimulatedEventLoopFactory simulated_event_loop_factory(
183 factory.configuration());
184
185 ::std::unique_ptr<EventLoop> ping_event_loop =
186 simulated_event_loop_factory.MakeEventLoop("ping");
187 aos::Sender<TestMessage> test_message_sender =
188 ping_event_loop->MakeSender<TestMessage>("/test");
189 SendTestMessage(&test_message_sender, 1);
190
191 std::unique_ptr<EventLoop> pong1_event_loop =
192 simulated_event_loop_factory.MakeEventLoop("pong");
193 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
194 "/test");
195
196 EXPECT_FALSE(ping_event_loop->is_running());
197
198 // Watchers start when you start running, so there should be nothing counted.
199 simulated_event_loop_factory.RunFor(chrono::seconds(1));
200 EXPECT_EQ(test_message_counter1.count(), 0u);
201
202 std::unique_ptr<EventLoop> pong2_event_loop =
203 simulated_event_loop_factory.MakeEventLoop("pong");
204 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
205 "/test");
206
207 // Pauses in the middle don't count though, so this should be counted.
208 // But, the fresh watcher shouldn't pick it up yet.
209 SendTestMessage(&test_message_sender, 2);
210
211 EXPECT_EQ(test_message_counter1.count(), 0u);
212 EXPECT_EQ(test_message_counter2.count(), 0u);
213 simulated_event_loop_factory.RunFor(chrono::seconds(1));
214
215 EXPECT_EQ(test_message_counter1.count(), 1u);
216 EXPECT_EQ(test_message_counter2.count(), 0u);
217}
218
219// Test that creating an event loop while running dies.
220TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
221 SimulatedEventLoopTestFactory factory;
222
223 SimulatedEventLoopFactory simulated_event_loop_factory(
224 factory.configuration());
225
226 ::std::unique_ptr<EventLoop> event_loop =
227 simulated_event_loop_factory.MakeEventLoop("ping");
228
229 auto timer = event_loop->AddTimer([&]() {
230 EXPECT_DEATH(
231 {
232 ::std::unique_ptr<EventLoop> event_loop2 =
233 simulated_event_loop_factory.MakeEventLoop("ping");
234 },
235 "event loop while running");
236 simulated_event_loop_factory.Exit();
237 });
238
239 event_loop->OnRun([&event_loop, &timer] {
240 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
241 });
242
243 simulated_event_loop_factory.Run();
244}
245
246// Test that creating a watcher after running dies.
247TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
248 SimulatedEventLoopTestFactory factory;
249
250 SimulatedEventLoopFactory simulated_event_loop_factory(
251 factory.configuration());
252
253 ::std::unique_ptr<EventLoop> event_loop =
254 simulated_event_loop_factory.MakeEventLoop("ping");
255
256 simulated_event_loop_factory.RunFor(chrono::seconds(1));
257
258 EXPECT_DEATH(
259 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
260 "Can't add a watcher after running");
261
262 ::std::unique_ptr<EventLoop> event_loop2 =
263 simulated_event_loop_factory.MakeEventLoop("ping");
264
265 simulated_event_loop_factory.RunFor(chrono::seconds(1));
266
267 EXPECT_DEATH(
268 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
269 "Can't add a watcher after running");
270}
271
Austin Schuh44019f92019-05-19 19:58:27 -0700272// Test that running for a time period with no handlers causes time to progress
273// correctly.
274TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800275 SimulatedEventLoopTestFactory factory;
276
277 SimulatedEventLoopFactory simulated_event_loop_factory(
278 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700279 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800280 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700281
282 simulated_event_loop_factory.RunFor(chrono::seconds(1));
283
284 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700285 event_loop->monotonic_now());
286}
287
288// Test that running for a time with a periodic handler causes time to end
289// correctly.
290TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800291 SimulatedEventLoopTestFactory factory;
292
293 SimulatedEventLoopFactory simulated_event_loop_factory(
294 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700295 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800296 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700297
298 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700299 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700300 event_loop->OnRun([&event_loop, &timer] {
301 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
302 chrono::milliseconds(100));
303 });
304
305 simulated_event_loop_factory.RunFor(chrono::seconds(1));
306
307 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700308 event_loop->monotonic_now());
309 EXPECT_EQ(counter, 10);
310}
311
Austin Schuh7d87b672019-12-01 20:23:49 -0800312// Tests that watchers have latency in simulation.
313TEST(SimulatedEventLoopTest, WatcherTimingReport) {
314 SimulatedEventLoopTestFactory factory;
315 factory.set_send_delay(std::chrono::microseconds(50));
316
317 FLAGS_timing_report_ms = 1000;
318 auto loop1 = factory.MakePrimary("primary");
319 loop1->MakeWatcher("/test", [](const TestMessage &) {});
320
321 auto loop2 = factory.Make("sender_loop");
322
323 auto loop3 = factory.Make("report_fetcher");
324
325 Fetcher<timing::Report> report_fetcher =
326 loop3->MakeFetcher<timing::Report>("/aos");
327 EXPECT_FALSE(report_fetcher.Fetch());
328
329 auto sender = loop2->MakeSender<TestMessage>("/test");
330
331 // Send 10 messages in the middle of a timing report period so we get
332 // something interesting back.
333 auto test_timer = loop2->AddTimer([&sender]() {
334 for (int i = 0; i < 10; ++i) {
335 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
336 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
337 builder.add_value(200 + i);
338 ASSERT_TRUE(msg.Send(builder.Finish()));
339 }
340 });
341
342 // Quit after 1 timing report, mid way through the next cycle.
343 {
344 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
345 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
346 end_timer->set_name("end");
347 }
348
349 loop1->OnRun([&test_timer, &loop1]() {
350 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
351 });
352
353 factory.Run();
354
355 // And, since we are here, check that the timing report makes sense.
356 // Start by looking for our event loop's timing.
357 FlatbufferDetachedBuffer<timing::Report> primary_report =
358 FlatbufferDetachedBuffer<timing::Report>::Empty();
359 while (report_fetcher.FetchNext()) {
360 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
361 if (report_fetcher->name()->string_view() == "primary") {
362 primary_report = CopyFlatBuffer(report_fetcher.get());
363 }
364 }
365
366 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700367 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800368
369 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
370
371 // Just the timing report timer.
372 ASSERT_NE(primary_report.message().timers(), nullptr);
373 EXPECT_EQ(primary_report.message().timers()->size(), 2);
374
375 // No phased loops
376 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
377
378 // And now confirm that the watcher received all 10 messages, and has latency.
379 ASSERT_NE(primary_report.message().watchers(), nullptr);
380 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
381 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
382 EXPECT_NEAR(
383 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
384 0.00005, 1e-9);
385 EXPECT_NEAR(
386 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
387 0.00005, 1e-9);
388 EXPECT_NEAR(
389 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
390 0.00005, 1e-9);
391 EXPECT_EQ(primary_report.message()
392 .watchers()
393 ->Get(0)
394 ->wakeup_latency()
395 ->standard_deviation(),
396 0.0);
397
398 EXPECT_EQ(
399 primary_report.message().watchers()->Get(0)->handler_time()->average(),
400 0.0);
401 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
402 0.0);
403 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
404 0.0);
405 EXPECT_EQ(primary_report.message()
406 .watchers()
407 ->Get(0)
408 ->handler_time()
409 ->standard_deviation(),
410 0.0);
411}
412
Austin Schuh89c9b812021-02-20 14:42:10 -0800413size_t CountAll(
414 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
415 &counters) {
416 size_t count = 0u;
417 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
418 counters) {
419 count += counter->count();
420 }
421 return count;
422}
423
Austin Schuh4c3b9702020-08-30 11:34:55 -0700424// Tests that ping and pong work when on 2 different nodes, and the message
425// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800426TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800427 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
428 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700429 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800430
431 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
432
433 std::unique_ptr<EventLoop> ping_event_loop =
434 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
435 Ping ping(ping_event_loop.get());
436
437 std::unique_ptr<EventLoop> pong_event_loop =
438 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
439 Pong pong(pong_event_loop.get());
440
441 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
442 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700443 MessageCounter<examples::Pong> pi2_pong_counter(
444 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700445 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
446 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
447 "/pi1/aos");
448 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
449 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800450
Austin Schuh4c3b9702020-08-30 11:34:55 -0700451 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
452 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800453
454 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
455 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700456 MessageCounter<examples::Pong> pi1_pong_counter(
457 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700458 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
459 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
460 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
461 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
462 "/aos");
463
Austin Schuh4c3b9702020-08-30 11:34:55 -0700464 // Count timestamps.
465 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
466 pi1_pong_counter_event_loop.get(), "/pi1/aos");
467 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
468 pi2_pong_counter_event_loop.get(), "/pi1/aos");
469 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
470 pi3_pong_counter_event_loop.get(), "/pi1/aos");
471 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
472 pi1_pong_counter_event_loop.get(), "/pi2/aos");
473 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
474 pi2_pong_counter_event_loop.get(), "/pi2/aos");
475 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
476 pi1_pong_counter_event_loop.get(), "/pi3/aos");
477 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
478 pi3_pong_counter_event_loop.get(), "/pi3/aos");
479
Austin Schuh2f8fd752020-09-01 22:38:28 -0700480 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800481 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
482 remote_timestamps_pi2_on_pi1 =
483 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
484 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
485 remote_timestamps_pi1_on_pi2 =
486 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700487
Austin Schuh4c3b9702020-08-30 11:34:55 -0700488 // Wait to let timestamp estimation start up before looking for the results.
489 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
490
Austin Schuh8fb315a2020-11-19 22:33:58 -0800491 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
492 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
493 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
494 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
495 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
496 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
497
Austin Schuh4c3b9702020-08-30 11:34:55 -0700498 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800499 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700500 "/pi1/aos", [&pi1_server_statistics_count](
501 const message_bridge::ServerStatistics &stats) {
502 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
503 EXPECT_EQ(stats.connections()->size(), 2u);
504 for (const message_bridge::ServerConnection *connection :
505 *stats.connections()) {
506 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800507 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700508 if (connection->node()->name()->string_view() == "pi2") {
509 EXPECT_GT(connection->sent_packets(), 50);
510 } else if (connection->node()->name()->string_view() == "pi3") {
511 EXPECT_GE(connection->sent_packets(), 5);
512 } else {
513 LOG(FATAL) << "Unknown connection";
514 }
515
516 EXPECT_TRUE(connection->has_monotonic_offset());
517 EXPECT_EQ(connection->monotonic_offset(), 0);
518 }
519 ++pi1_server_statistics_count;
520 });
521
522 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800523 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700524 "/pi2/aos", [&pi2_server_statistics_count](
525 const message_bridge::ServerStatistics &stats) {
526 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
527 EXPECT_EQ(stats.connections()->size(), 1u);
528
529 const message_bridge::ServerConnection *connection =
530 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800531 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700532 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
533 EXPECT_GT(connection->sent_packets(), 50);
534 EXPECT_TRUE(connection->has_monotonic_offset());
535 EXPECT_EQ(connection->monotonic_offset(), 0);
536 ++pi2_server_statistics_count;
537 });
538
539 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800540 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700541 "/pi3/aos", [&pi3_server_statistics_count](
542 const message_bridge::ServerStatistics &stats) {
543 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
544 EXPECT_EQ(stats.connections()->size(), 1u);
545
546 const message_bridge::ServerConnection *connection =
547 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800548 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700549 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
550 EXPECT_GE(connection->sent_packets(), 5);
551 EXPECT_TRUE(connection->has_monotonic_offset());
552 EXPECT_EQ(connection->monotonic_offset(), 0);
553 ++pi3_server_statistics_count;
554 });
555
556 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800557 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700558 "/pi1/aos", [&pi1_client_statistics_count](
559 const message_bridge::ClientStatistics &stats) {
560 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
561 EXPECT_EQ(stats.connections()->size(), 2u);
562
563 for (const message_bridge::ClientConnection *connection :
564 *stats.connections()) {
565 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
566 if (connection->node()->name()->string_view() == "pi2") {
567 EXPECT_GT(connection->received_packets(), 50);
568 } else if (connection->node()->name()->string_view() == "pi3") {
569 EXPECT_GE(connection->received_packets(), 5);
570 } else {
571 LOG(FATAL) << "Unknown connection";
572 }
573
Austin Schuhe61d4382021-03-31 21:33:02 -0700574 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700575 EXPECT_TRUE(connection->has_monotonic_offset());
576 EXPECT_EQ(connection->monotonic_offset(), 150000);
577 }
578 ++pi1_client_statistics_count;
579 });
580
581 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800582 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700583 "/pi2/aos", [&pi2_client_statistics_count](
584 const message_bridge::ClientStatistics &stats) {
585 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
586 EXPECT_EQ(stats.connections()->size(), 1u);
587
588 const message_bridge::ClientConnection *connection =
589 stats.connections()->Get(0);
590 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
591 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700592 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700593 EXPECT_TRUE(connection->has_monotonic_offset());
594 EXPECT_EQ(connection->monotonic_offset(), 150000);
595 ++pi2_client_statistics_count;
596 });
597
598 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800599 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700600 "/pi3/aos", [&pi3_client_statistics_count](
601 const message_bridge::ClientStatistics &stats) {
602 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
603 EXPECT_EQ(stats.connections()->size(), 1u);
604
605 const message_bridge::ClientConnection *connection =
606 stats.connections()->Get(0);
607 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
608 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700609 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700610 EXPECT_TRUE(connection->has_monotonic_offset());
611 EXPECT_EQ(connection->monotonic_offset(), 150000);
612 ++pi3_client_statistics_count;
613 });
614
Austin Schuh2f8fd752020-09-01 22:38:28 -0700615 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
616 // channel.
617 const size_t pi1_timestamp_channel =
618 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
619 pi1_on_pi2_timestamp_fetcher.channel());
620 const size_t ping_timestamp_channel =
621 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
622 ping_on_pi2_fetcher.channel());
623
624 for (const Channel *channel :
625 *pi1_pong_counter_event_loop->configuration()->channels()) {
626 VLOG(1) << "Channel "
627 << configuration::ChannelIndex(
628 pi1_pong_counter_event_loop->configuration(), channel)
629 << " " << configuration::CleanedChannelToString(channel);
630 }
631
Austin Schuh8fb315a2020-11-19 22:33:58 -0800632 std::unique_ptr<EventLoop> pi1_remote_timestamp =
633 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
634
Austin Schuh89c9b812021-02-20 14:42:10 -0800635 for (std::pair<int, std::string> channel :
636 shared()
637 ? std::vector<std::pair<
638 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
639 : std::vector<std::pair<int, std::string>>{
640 {pi1_timestamp_channel,
641 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
642 "aos-message_bridge-Timestamp"},
643 {ping_timestamp_channel,
644 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
645 // For each remote timestamp we get back, confirm that it is either a ping
646 // message, or a timestamp we sent out. Also confirm that the timestamps
647 // are correct.
648 pi1_remote_timestamp->MakeWatcher(
649 channel.second,
650 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
651 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
652 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
653 channel_index = channel.first](const RemoteMessage &header) {
654 VLOG(1) << aos::FlatbufferToJson(&header);
655 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700656 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800657 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700658 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700659
Austin Schuh89c9b812021-02-20 14:42:10 -0800660 const aos::monotonic_clock::time_point header_monotonic_sent_time(
661 chrono::nanoseconds(header.monotonic_sent_time()));
662 const aos::realtime_clock::time_point header_realtime_sent_time(
663 chrono::nanoseconds(header.realtime_sent_time()));
664 const aos::monotonic_clock::time_point header_monotonic_remote_time(
665 chrono::nanoseconds(header.monotonic_remote_time()));
666 const aos::realtime_clock::time_point header_realtime_remote_time(
667 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700668
Austin Schuh89c9b812021-02-20 14:42:10 -0800669 if (channel_index != -1) {
670 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700671 }
672
Austin Schuh89c9b812021-02-20 14:42:10 -0800673 const Context *pi1_context = nullptr;
674 const Context *pi2_context = nullptr;
675
676 if (header.channel_index() == pi1_timestamp_channel) {
677 // Find the forwarded message.
678 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
679 header_monotonic_sent_time) {
680 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
681 }
682
683 // And the source message.
684 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
685 header_monotonic_remote_time) {
686 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
687 }
688
689 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
690 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
691 } else if (header.channel_index() == ping_timestamp_channel) {
692 // Find the forwarded message.
693 while (ping_on_pi2_fetcher.context().monotonic_event_time <
694 header_monotonic_sent_time) {
695 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
696 }
697
698 // And the source message.
699 while (ping_on_pi1_fetcher.context().monotonic_event_time <
700 header_monotonic_remote_time) {
701 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
702 }
703
704 pi1_context = &ping_on_pi1_fetcher.context();
705 pi2_context = &ping_on_pi2_fetcher.context();
706 } else {
707 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700708 }
709
Austin Schuh89c9b812021-02-20 14:42:10 -0800710 // Confirm the forwarded message has matching timestamps to the
711 // timestamps we got back.
712 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
713 EXPECT_EQ(pi2_context->remote_queue_index,
714 header.remote_queue_index());
715 EXPECT_EQ(pi2_context->monotonic_event_time,
716 header_monotonic_sent_time);
717 EXPECT_EQ(pi2_context->realtime_event_time,
718 header_realtime_sent_time);
719 EXPECT_EQ(pi2_context->realtime_remote_time,
720 header_realtime_remote_time);
721 EXPECT_EQ(pi2_context->monotonic_remote_time,
722 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700723
Austin Schuh89c9b812021-02-20 14:42:10 -0800724 // Confirm the forwarded message also matches the source message.
725 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
726 EXPECT_EQ(pi1_context->monotonic_event_time,
727 header_monotonic_remote_time);
728 EXPECT_EQ(pi1_context->realtime_event_time,
729 header_realtime_remote_time);
730 });
731 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700732
Austin Schuh4c3b9702020-08-30 11:34:55 -0700733 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
734 chrono::milliseconds(500) +
735 chrono::milliseconds(5));
736
737 EXPECT_EQ(pi1_pong_counter.count(), 1001);
738 EXPECT_EQ(pi2_pong_counter.count(), 1001);
739
740 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
741 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
742 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
743 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
744 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
745 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
746 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
747
Austin Schuh20ac95d2020-12-05 17:24:19 -0800748 EXPECT_EQ(pi1_server_statistics_count, 10);
749 EXPECT_EQ(pi2_server_statistics_count, 10);
750 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700751
752 EXPECT_EQ(pi1_client_statistics_count, 95);
753 EXPECT_EQ(pi2_client_statistics_count, 95);
754 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700755
756 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800757 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
758 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700759}
760
761// Tests that an offset between nodes can be recovered and shows up in
762// ServerStatistics correctly.
763TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
764 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh89c9b812021-02-20 14:42:10 -0800765 aos::configuration::ReadConfig(
766 ConfigPrefix() +
767 "events/multinode_pingpong_test_combined_config.json");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700768 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800769 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
770 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700771 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800772 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
773 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700774 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800775 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
776 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777
Austin Schuh87dd3832021-01-01 23:07:31 -0800778 message_bridge::TestingTimeConverter time(
779 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700780 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
781 NodeEventLoopFactory *pi2_factory =
782 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
Austin Schuh87dd3832021-01-01 23:07:31 -0800783 pi2_factory->SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700784
785 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800786 time.AddNextTimestamp(
787 distributed_clock::epoch(),
788 {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
789 monotonic_clock::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700790
791 std::unique_ptr<EventLoop> ping_event_loop =
792 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
793 Ping ping(ping_event_loop.get());
794
795 std::unique_ptr<EventLoop> pong_event_loop =
796 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
797 Pong pong(pong_event_loop.get());
798
Austin Schuh8fb315a2020-11-19 22:33:58 -0800799 // Wait to let timestamp estimation start up before looking for the results.
800 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
801
Austin Schuh87dd3832021-01-01 23:07:31 -0800802 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
803 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
804
Austin Schuh4c3b9702020-08-30 11:34:55 -0700805 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
806 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
807
808 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
809 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
810
Austin Schuh4c3b9702020-08-30 11:34:55 -0700811 // Confirm the offsets are being recovered correctly.
812 int pi1_server_statistics_count = 0;
813 pi1_pong_counter_event_loop->MakeWatcher(
814 "/pi1/aos", [&pi1_server_statistics_count,
815 kOffset](const message_bridge::ServerStatistics &stats) {
816 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
817 EXPECT_EQ(stats.connections()->size(), 2u);
818 for (const message_bridge::ServerConnection *connection :
819 *stats.connections()) {
820 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800821 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700822 if (connection->node()->name()->string_view() == "pi2") {
823 EXPECT_EQ(connection->monotonic_offset(),
824 chrono::nanoseconds(kOffset).count());
825 } else if (connection->node()->name()->string_view() == "pi3") {
826 EXPECT_EQ(connection->monotonic_offset(), 0);
827 } else {
828 LOG(FATAL) << "Unknown connection";
829 }
830
831 EXPECT_TRUE(connection->has_monotonic_offset());
832 }
833 ++pi1_server_statistics_count;
834 });
835
836 int pi2_server_statistics_count = 0;
837 pi2_pong_counter_event_loop->MakeWatcher(
838 "/pi2/aos", [&pi2_server_statistics_count,
839 kOffset](const message_bridge::ServerStatistics &stats) {
840 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
841 EXPECT_EQ(stats.connections()->size(), 1u);
842
843 const message_bridge::ServerConnection *connection =
844 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800845 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700846 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
847 EXPECT_TRUE(connection->has_monotonic_offset());
848 EXPECT_EQ(connection->monotonic_offset(),
849 -chrono::nanoseconds(kOffset).count());
850 ++pi2_server_statistics_count;
851 });
852
853 int pi3_server_statistics_count = 0;
854 pi3_pong_counter_event_loop->MakeWatcher(
855 "/pi3/aos", [&pi3_server_statistics_count](
856 const message_bridge::ServerStatistics &stats) {
857 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
858 EXPECT_EQ(stats.connections()->size(), 1u);
859
860 const message_bridge::ServerConnection *connection =
861 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800862 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700863 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
864 EXPECT_TRUE(connection->has_monotonic_offset());
865 EXPECT_EQ(connection->monotonic_offset(), 0);
866 ++pi3_server_statistics_count;
867 });
868
869 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
870 chrono::milliseconds(500) +
871 chrono::milliseconds(5));
872
Austin Schuh20ac95d2020-12-05 17:24:19 -0800873 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700874 EXPECT_EQ(pi2_server_statistics_count, 9);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800875 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700876}
877
878// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800879TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700880 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
881 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
882 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
883
884 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
885 simulated_event_loop_factory.DisableStatistics();
886
887 std::unique_ptr<EventLoop> ping_event_loop =
888 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
889 Ping ping(ping_event_loop.get());
890
891 std::unique_ptr<EventLoop> pong_event_loop =
892 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
893 Pong pong(pong_event_loop.get());
894
895 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
896 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
897
898 MessageCounter<examples::Pong> pi2_pong_counter(
899 pi2_pong_counter_event_loop.get(), "/test");
900
901 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
902 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
903
904 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
905 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
906
907 MessageCounter<examples::Pong> pi1_pong_counter(
908 pi1_pong_counter_event_loop.get(), "/test");
909
910 // Count timestamps.
911 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
912 pi1_pong_counter_event_loop.get(), "/pi1/aos");
913 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
914 pi2_pong_counter_event_loop.get(), "/pi1/aos");
915 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
916 pi3_pong_counter_event_loop.get(), "/pi1/aos");
917 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
918 pi1_pong_counter_event_loop.get(), "/pi2/aos");
919 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
920 pi2_pong_counter_event_loop.get(), "/pi2/aos");
921 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
922 pi1_pong_counter_event_loop.get(), "/pi3/aos");
923 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
924 pi3_pong_counter_event_loop.get(), "/pi3/aos");
925
Austin Schuh2f8fd752020-09-01 22:38:28 -0700926 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800927 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
928 remote_timestamps_pi2_on_pi1 =
929 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
930 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
931 remote_timestamps_pi1_on_pi2 =
932 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700933
Austin Schuh4c3b9702020-08-30 11:34:55 -0700934 MessageCounter<message_bridge::ServerStatistics>
935 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
936 "/pi1/aos");
937 MessageCounter<message_bridge::ServerStatistics>
938 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
939 "/pi2/aos");
940 MessageCounter<message_bridge::ServerStatistics>
941 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
942 "/pi3/aos");
943
944 MessageCounter<message_bridge::ClientStatistics>
945 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
946 "/pi1/aos");
947 MessageCounter<message_bridge::ClientStatistics>
948 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
949 "/pi2/aos");
950 MessageCounter<message_bridge::ClientStatistics>
951 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
952 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800953
954 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
955 chrono::milliseconds(5));
956
Austin Schuh4c3b9702020-08-30 11:34:55 -0700957 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
958 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
959
960 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
961 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
962 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
963 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
964 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
965 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
966 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
967
968 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
969 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
970 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
971
972 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
973 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
974 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700975
976 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800977 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
978 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -0800979}
980
Austin Schuhc0b0f722020-12-12 18:36:06 -0800981bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
982 for (const message_bridge::ServerConnection *connection :
983 *server_statistics->connections()) {
984 if (connection->state() != message_bridge::State::CONNECTED) {
985 return false;
986 }
987 }
988 return true;
989}
990
991bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
992 std::string_view target) {
993 for (const message_bridge::ServerConnection *connection :
994 *server_statistics->connections()) {
995 if (connection->node()->name()->string_view() == target) {
996 if (connection->state() == message_bridge::State::CONNECTED) {
997 return false;
998 }
999 } else {
1000 if (connection->state() != message_bridge::State::CONNECTED) {
1001 return false;
1002 }
1003 }
1004 }
1005 return true;
1006}
1007
1008bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1009 for (const message_bridge::ClientConnection *connection :
1010 *client_statistics->connections()) {
1011 if (connection->state() != message_bridge::State::CONNECTED) {
1012 return false;
1013 }
1014 }
1015 return true;
1016}
1017
1018bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1019 std::string_view target) {
1020 for (const message_bridge::ClientConnection *connection :
1021 *client_statistics->connections()) {
1022 if (connection->node()->name()->string_view() == target) {
1023 if (connection->state() == message_bridge::State::CONNECTED) {
1024 return false;
1025 }
1026 } else {
1027 if (connection->state() != message_bridge::State::CONNECTED) {
1028 return false;
1029 }
1030 }
1031 }
1032 return true;
1033}
1034
1035// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001036TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001037 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1038 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1039 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1040
1041 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1042
1043 std::unique_ptr<EventLoop> ping_event_loop =
1044 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1045 Ping ping(ping_event_loop.get());
1046
1047 std::unique_ptr<EventLoop> pong_event_loop =
1048 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1049 Pong pong(pong_event_loop.get());
1050
1051 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1052 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1053
1054 MessageCounter<examples::Pong> pi2_pong_counter(
1055 pi2_pong_counter_event_loop.get(), "/test");
1056
1057 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1058 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1059
1060 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1061 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1062
1063 MessageCounter<examples::Pong> pi1_pong_counter(
1064 pi1_pong_counter_event_loop.get(), "/test");
1065
1066 // Count timestamps.
1067 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1068 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1069 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1070 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1071 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1072 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1073 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1074 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1075 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1076 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1077 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1078 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1079 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1080 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1081
1082 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001083 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1084 remote_timestamps_pi2_on_pi1 =
1085 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1086 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1087 remote_timestamps_pi1_on_pi2 =
1088 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001089
1090 MessageCounter<message_bridge::ServerStatistics>
1091 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1092 "/pi1/aos");
1093 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1094 pi1_pong_counter_event_loop
1095 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1096 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1097 pi1_pong_counter_event_loop
1098 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1099
1100 MessageCounter<message_bridge::ServerStatistics>
1101 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1102 "/pi2/aos");
1103 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1104 pi2_pong_counter_event_loop
1105 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1106 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1107 pi2_pong_counter_event_loop
1108 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1109
1110 MessageCounter<message_bridge::ServerStatistics>
1111 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1112 "/pi3/aos");
1113 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1114 pi3_pong_counter_event_loop
1115 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1116 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1117 pi3_pong_counter_event_loop
1118 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1119
1120 MessageCounter<message_bridge::ClientStatistics>
1121 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1122 "/pi1/aos");
1123 MessageCounter<message_bridge::ClientStatistics>
1124 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1125 "/pi2/aos");
1126 MessageCounter<message_bridge::ClientStatistics>
1127 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1128 "/pi3/aos");
1129
1130 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1131 chrono::milliseconds(5));
1132
1133 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1134 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1135
1136 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1137 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1138 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1139 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1140 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1141 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1142 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1143
1144 EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
1145 EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
1146 EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
1147
1148 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1149 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1150 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1151
1152 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001153 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1154 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001155
1156 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1157 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1158 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1159 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1160 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1161 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1162 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1163 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1164 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1165 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1166 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1167 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1168 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1169 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1170 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1171 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1172 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1173 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1174
1175 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
1176
1177 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1178
1179 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1180 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1181
1182 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1183 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1184 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1185 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1186 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1187 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1188 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1189
1190 EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
1191 EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
1192 EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
1193
1194 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1195 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1196 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1197
1198 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001199 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1200 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201
1202 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1203 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1204 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1205 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1206 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1207 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1208 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1209 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1210 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1211 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1212 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1213 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1214 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1215 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1216 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1217 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1218 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1219 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1220
1221 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
1222
1223 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1224
1225 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1226 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1227
1228 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1229 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1230 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1231 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1232 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1233 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1234 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1235
1236 EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
1237 EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
1238 EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
1239
1240 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1241 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1242 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1243
1244 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001245 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1246 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001247
1248 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1249 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1250 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1251 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1252 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1253 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1254 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1255 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1256 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1257 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1258 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1259 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1260 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1261 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1262 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1263 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1264 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1265 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1266}
1267
// Tests that the time offset having a slope doesn't break the world.
// SimulatedMessageBridge has enough self consistency CHECK statements to
// confirm, and we can also check a message in each direction to make sure
// it gets delivered as expected.
1272TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1273 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh89c9b812021-02-20 14:42:10 -08001274 aos::configuration::ReadConfig(
1275 ConfigPrefix() +
1276 "events/multinode_pingpong_test_combined_config.json");
Austin Schuh2febf0d2020-09-21 22:24:30 -07001277 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001278 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1279 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001280 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001281 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1282 ASSERT_EQ(pi2_index, 1u);
1283 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1284 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1285 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001286
Austin Schuh87dd3832021-01-01 23:07:31 -08001287 message_bridge::TestingTimeConverter time(
1288 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001289 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1290 NodeEventLoopFactory *pi2_factory =
1291 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
Austin Schuh87dd3832021-01-01 23:07:31 -08001292 pi2_factory->SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001293
Austin Schuh2febf0d2020-09-21 22:24:30 -07001294 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001295 time.AddNextTimestamp(
1296 distributed_clock::epoch(),
1297 {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
1298 monotonic_clock::epoch()});
1299 time.AddNextTimestamp(
1300 distributed_clock::epoch() + chrono::seconds(10),
1301 {monotonic_clock::epoch() + chrono::milliseconds(9999),
1302 monotonic_clock::epoch() + kOffset + chrono::seconds(10),
1303 monotonic_clock::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001304
1305 std::unique_ptr<EventLoop> ping_event_loop =
1306 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1307 Ping ping(ping_event_loop.get());
1308
1309 std::unique_ptr<EventLoop> pong_event_loop =
1310 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1311 Pong pong(pong_event_loop.get());
1312
1313 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1314 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1315 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1316 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1317
1318 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1319 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1320 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1321 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1322
1323 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1324 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1325 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1326 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1327
1328 // End after a pong message comes back. This will leave the latest messages
1329 // on all channels so we can look at timestamps easily and check they make
1330 // sense.
1331 std::unique_ptr<EventLoop> pi1_pong_ender =
1332 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1333 int count = 0;
1334 pi1_pong_ender->MakeWatcher(
1335 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1336 if (++count == 100) {
1337 simulated_event_loop_factory.Exit();
1338 }
1339 });
1340
1341 // Run enough that messages should be delivered.
1342 simulated_event_loop_factory.Run();
1343
1344 // Grab the latest messages.
1345 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1346 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1347 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1348 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1349
1350 // Compute their time on the global distributed clock so we can compute
1351 // distance betwen them.
1352 const distributed_clock::time_point pi1_ping_time =
1353 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1354 ->ToDistributedClock(
1355 ping_on_pi1_fetcher.context().monotonic_event_time);
1356 const distributed_clock::time_point pi2_ping_time =
1357 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1358 ->ToDistributedClock(
1359 ping_on_pi2_fetcher.context().monotonic_event_time);
1360 const distributed_clock::time_point pi1_pong_time =
1361 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1362 ->ToDistributedClock(
1363 pong_on_pi1_fetcher.context().monotonic_event_time);
1364 const distributed_clock::time_point pi2_pong_time =
1365 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1366 ->ToDistributedClock(
1367 pong_on_pi2_fetcher.context().monotonic_event_time);
1368
1369 // And confirm the delivery delay is just about exactly 150 uS for both
1370 // directions like expected. There will be a couple ns of rounding errors in
1371 // the conversion functions that aren't worth accounting for right now. This
1372 // will either be really close, or really far.
1373 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1374 pi1_ping_time);
1375 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1376 pi1_ping_time);
1377
1378 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1379 pi2_pong_time);
1380 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1381 pi2_pong_time);
1382}
1383
Austin Schuh4c570ea2020-11-19 23:13:24 -08001384void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1385 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1386 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1387 ping_builder.add_value(value);
1388 builder.Send(ping_builder.Finish());
1389}
1390
1391// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001392TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001393 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1394 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1395
1396 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1397
1398 std::unique_ptr<EventLoop> ping_event_loop =
1399 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1400 aos::Sender<examples::Ping> pi1_reliable_sender =
1401 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1402 aos::Sender<examples::Ping> pi1_unreliable_sender =
1403 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1404 SendPing(&pi1_reliable_sender, 1);
1405 SendPing(&pi1_unreliable_sender, 1);
1406
1407 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1408 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1409 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1410 "/reliable");
1411 MessageCounter<examples::Ping> pi2_unreliable_counter(
1412 pi2_pong_event_loop.get(), "/unreliable");
1413 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1414 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1415 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1416 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1417
1418 const size_t reliable_channel_index = configuration::ChannelIndex(
1419 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1420
1421 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1422 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1423
Austin Schuheeaa2022021-01-02 21:52:03 -08001424 const chrono::nanoseconds network_delay =
1425 simulated_event_loop_factory.network_delay();
1426
Austin Schuh4c570ea2020-11-19 23:13:24 -08001427 int reliable_timestamp_count = 0;
1428 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001429 shared() ? "/pi1/aos/remote_timestamps/pi2"
1430 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001431 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001432 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1433 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001434 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001435 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001436 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001437 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001438 VLOG(1) << aos::FlatbufferToJson(&header);
1439 if (header.channel_index() == reliable_channel_index) {
1440 ++reliable_timestamp_count;
1441 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001442
1443 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1444 chrono::nanoseconds(header.monotonic_sent_time()));
1445
1446 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1447 header_monotonic_sent_time + network_delay +
1448 (pi1_remote_timestamp->monotonic_now() -
1449 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001450 });
1451
1452 // Wait to let timestamp estimation start up before looking for the results.
1453 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1454
1455 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1456 // This one isn't reliable, but was sent before the start. It should *not* be
1457 // delivered.
1458 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1459 // Confirm we got a timestamp logged for the message that was forwarded.
1460 EXPECT_EQ(reliable_timestamp_count, 1u);
1461
1462 SendPing(&pi1_reliable_sender, 2);
1463 SendPing(&pi1_unreliable_sender, 2);
1464 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1465 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1466 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1467
1468 EXPECT_EQ(reliable_timestamp_count, 2u);
1469}
1470
Austin Schuh20ac95d2020-12-05 17:24:19 -08001471// Tests that rebooting a node changes the ServerStatistics message and the
1472// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001473TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001474 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1475 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1476
1477 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1478
1479 std::unique_ptr<EventLoop> ping_event_loop =
1480 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1481 Ping ping(ping_event_loop.get());
1482
1483 std::unique_ptr<EventLoop> pong_event_loop =
1484 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1485 Pong pong(pong_event_loop.get());
1486
1487 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1488 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
Austin Schuh8902fa52021-03-14 22:39:24 -07001489 UUID expected_boot_uuid =
1490 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001491
1492 int timestamp_count = 0;
1493 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001494 "/pi2/aos", [&expected_boot_uuid,
1495 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
1496 EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
1497 expected_boot_uuid);
1498 });
1499 pi1_remote_timestamp->MakeWatcher(
1500 "/test",
1501 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
1502 EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
1503 expected_boot_uuid);
1504 });
1505 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001506 shared() ? "/pi1/aos/remote_timestamps/pi2"
1507 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001508 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1509 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001510 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001511 VLOG(1) << aos::FlatbufferToJson(&header);
1512 ++timestamp_count;
1513 });
1514
1515 int pi1_server_statistics_count = 0;
1516 pi1_remote_timestamp->MakeWatcher(
1517 "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
1518 const message_bridge::ServerStatistics &stats) {
1519 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1520 for (const message_bridge::ServerConnection *connection :
1521 *stats.connections()) {
1522 EXPECT_TRUE(connection->has_boot_uuid());
1523 if (connection->node()->name()->string_view() == "pi2") {
1524 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001525 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001526 << " : Got " << aos::FlatbufferToJson(&stats);
1527 ++pi1_server_statistics_count;
1528 }
1529 }
1530 });
1531
1532 // Let a couple of ServerStatistics messages show up before rebooting.
1533 simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
1534
1535 EXPECT_GT(timestamp_count, 100);
1536 EXPECT_GE(pi1_server_statistics_count, 1u);
1537
1538 // Confirm that reboot changes the UUID.
1539 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
1540
Austin Schuh8902fa52021-03-14 22:39:24 -07001541 EXPECT_NE(
1542 expected_boot_uuid,
1543 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001544
1545 expected_boot_uuid =
Austin Schuh8902fa52021-03-14 22:39:24 -07001546 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001547 timestamp_count = 0;
1548 pi1_server_statistics_count = 0;
1549
1550 simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
1551 EXPECT_GT(timestamp_count, 100);
1552 EXPECT_GE(pi1_server_statistics_count, 1u);
1553}
1554
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001555INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001556 All, RemoteMessageSimulatedEventLoopTest,
1557 ::testing::Values(
1558 Param{"multinode_pingpong_test_combined_config.json", true},
1559 Param{"multinode_pingpong_test_split_config.json", false}));
1560
Neil Balchc8f41ed2018-01-20 22:06:53 -08001561} // namespace testing
1562} // namespace aos