blob: 09202ffbcb4050064579c3f32ac2ea574244b2c9 [file] [log] [blame]
#include "aos/events/simulated_event_loop.h"

#include <chrono>
#include <functional>
#include <string_view>
#include <utility>

#include "aos/events/event_loop_param_test.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/testing_time_converter.h"
#include "aos/network/timestamp_generated.h"
#include "aos/testing/path.h"
#include "gtest/gtest.h"
19
20namespace aos {
21namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070022namespace {
23
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070025
Austin Schuh58646e22021-08-23 23:51:46 -070026using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080027using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070028namespace chrono = ::std::chrono;
29
Austin Schuh0de30f32020-12-06 12:44:28 -080030} // namespace
31
Neil Balchc8f41ed2018-01-20 22:06:53 -080032class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
33 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080034 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080035 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080036 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080037 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080038 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080039 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080040 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070041 }
42
Austin Schuh217a9782019-12-21 23:02:50 -080043 void Run() override { event_loop_factory_->Run(); }
44 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070045
Austin Schuh52d325c2019-06-23 18:59:06 -070046 // TODO(austin): Implement this. It's used currently for a phased loop test.
47 // I'm not sure how much that matters.
48 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
49
Austin Schuh7d87b672019-12-01 20:23:49 -080050 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080051 MaybeMake();
52 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080053 }
54
Neil Balchc8f41ed2018-01-20 22:06:53 -080055 private:
Austin Schuh217a9782019-12-21 23:02:50 -080056 void MaybeMake() {
57 if (!event_loop_factory_) {
58 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080059 event_loop_factory_ =
60 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080061 } else {
62 event_loop_factory_ =
63 std::make_unique<SimulatedEventLoopFactory>(configuration());
64 }
65 }
66 }
67 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080068};
69
Austin Schuh6bae8252021-02-07 22:01:49 -080070auto CommonParameters() {
71 return ::testing::Combine(
72 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
73 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
74 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
75}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070076
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070077INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070078 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070079
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070080INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070081 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080082
// Parameters to run all the tests with.
struct Param {
  // The config file to use.
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  //
  // Defaults to false so that a default-constructed Param never carries an
  // indeterminate value.
  bool shared = false;
};
92
93class RemoteMessageSimulatedEventLoopTest
94 : public ::testing::TestWithParam<struct Param> {
95 public:
96 RemoteMessageSimulatedEventLoopTest()
97 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070098 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080099 LOG(INFO) << "Config " << GetParam().config;
100 }
101
102 bool shared() const { return GetParam().shared; }
103
104 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
105 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
107 if (shared()) {
108 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
109 event_loop, "/aos/remote_timestamps/pi2"));
110 } else {
111 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
112 event_loop,
113 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
114 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
115 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
118 }
119 return counters;
120 }
121
122 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
123 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
125 if (shared()) {
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi1"));
128 } else {
129 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
130 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop,
133 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
134 }
135 return counters;
136 }
137
138 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
139};
140
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800141class FunctionEvent : public EventScheduler::Event {
142 public:
143 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
144
145 void Handle() noexcept override { fn_(); }
146
147 private:
148 std::function<void()> fn_;
149};
150
Neil Balchc8f41ed2018-01-20 22:06:53 -0800151// Test that creating an event and running the scheduler runs the event.
152TEST(EventSchedulerTest, ScheduleEvent) {
153 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800154 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700155 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800156 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800157
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800158 FunctionEvent e([&counter]() { counter += 1; });
159 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800160 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800161 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800162 auto token =
163 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800164 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800165 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800166 EXPECT_EQ(counter, 1);
167}
168
169// Test that descheduling an already scheduled event doesn't run the event.
170TEST(EventSchedulerTest, DescheduleEvent) {
171 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700173 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800174 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800175
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800176 FunctionEvent e([&counter]() { counter += 1; });
177 auto token =
178 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800179 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800180 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800181 EXPECT_EQ(counter, 0);
182}
Austin Schuh44019f92019-05-19 19:58:27 -0700183
Austin Schuhe33c08d2022-02-03 18:15:21 -0800184// Test that TemporarilyStopAndRun respects and preserves running.
185TEST(EventSchedulerTest, TemporarilyStopAndRun) {
186 int counter = 0;
187 EventSchedulerScheduler scheduler_scheduler;
188 EventScheduler scheduler(0);
189 scheduler_scheduler.AddEventScheduler(&scheduler);
190
191 scheduler_scheduler.TemporarilyStopAndRun(
192 [&]() { CHECK(!scheduler_scheduler.is_running()); });
193 ASSERT_FALSE(scheduler_scheduler.is_running());
194
195 FunctionEvent e([&]() {
196 counter += 1;
197 CHECK(scheduler_scheduler.is_running());
198 scheduler_scheduler.TemporarilyStopAndRun(
199 [&]() { CHECK(!scheduler_scheduler.is_running()); });
200 CHECK(scheduler_scheduler.is_running());
201 });
202 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
203 scheduler_scheduler.Run();
204 EXPECT_EQ(counter, 1);
205}
206
Austin Schuh8fb315a2020-11-19 22:33:58 -0800207void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
208 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
209 TestMessage::Builder test_message_builder =
210 builder.MakeBuilder<TestMessage>();
211 test_message_builder.add_value(value);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800212 ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800213}
214
215// Test that sending a message after running gets properly notified.
216TEST(SimulatedEventLoopTest, SendAfterRunFor) {
217 SimulatedEventLoopTestFactory factory;
218
219 SimulatedEventLoopFactory simulated_event_loop_factory(
220 factory.configuration());
221
222 ::std::unique_ptr<EventLoop> ping_event_loop =
223 simulated_event_loop_factory.MakeEventLoop("ping");
224 aos::Sender<TestMessage> test_message_sender =
225 ping_event_loop->MakeSender<TestMessage>("/test");
226 SendTestMessage(&test_message_sender, 1);
227
228 std::unique_ptr<EventLoop> pong1_event_loop =
229 simulated_event_loop_factory.MakeEventLoop("pong");
230 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
231 "/test");
232
233 EXPECT_FALSE(ping_event_loop->is_running());
234
235 // Watchers start when you start running, so there should be nothing counted.
236 simulated_event_loop_factory.RunFor(chrono::seconds(1));
237 EXPECT_EQ(test_message_counter1.count(), 0u);
238
239 std::unique_ptr<EventLoop> pong2_event_loop =
240 simulated_event_loop_factory.MakeEventLoop("pong");
241 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
242 "/test");
243
244 // Pauses in the middle don't count though, so this should be counted.
245 // But, the fresh watcher shouldn't pick it up yet.
246 SendTestMessage(&test_message_sender, 2);
247
248 EXPECT_EQ(test_message_counter1.count(), 0u);
249 EXPECT_EQ(test_message_counter2.count(), 0u);
250 simulated_event_loop_factory.RunFor(chrono::seconds(1));
251
252 EXPECT_EQ(test_message_counter1.count(), 1u);
253 EXPECT_EQ(test_message_counter2.count(), 0u);
254}
255
256// Test that creating an event loop while running dies.
257TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
258 SimulatedEventLoopTestFactory factory;
259
260 SimulatedEventLoopFactory simulated_event_loop_factory(
261 factory.configuration());
262
263 ::std::unique_ptr<EventLoop> event_loop =
264 simulated_event_loop_factory.MakeEventLoop("ping");
265
266 auto timer = event_loop->AddTimer([&]() {
267 EXPECT_DEATH(
268 {
269 ::std::unique_ptr<EventLoop> event_loop2 =
270 simulated_event_loop_factory.MakeEventLoop("ping");
271 },
272 "event loop while running");
273 simulated_event_loop_factory.Exit();
274 });
275
276 event_loop->OnRun([&event_loop, &timer] {
277 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
278 });
279
280 simulated_event_loop_factory.Run();
281}
282
283// Test that creating a watcher after running dies.
284TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
285 SimulatedEventLoopTestFactory factory;
286
287 SimulatedEventLoopFactory simulated_event_loop_factory(
288 factory.configuration());
289
290 ::std::unique_ptr<EventLoop> event_loop =
291 simulated_event_loop_factory.MakeEventLoop("ping");
292
293 simulated_event_loop_factory.RunFor(chrono::seconds(1));
294
295 EXPECT_DEATH(
296 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
297 "Can't add a watcher after running");
298
299 ::std::unique_ptr<EventLoop> event_loop2 =
300 simulated_event_loop_factory.MakeEventLoop("ping");
301
302 simulated_event_loop_factory.RunFor(chrono::seconds(1));
303
304 EXPECT_DEATH(
305 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
306 "Can't add a watcher after running");
307}
308
Austin Schuh44019f92019-05-19 19:58:27 -0700309// Test that running for a time period with no handlers causes time to progress
310// correctly.
311TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800312 SimulatedEventLoopTestFactory factory;
313
314 SimulatedEventLoopFactory simulated_event_loop_factory(
315 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700316 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800317 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700318
319 simulated_event_loop_factory.RunFor(chrono::seconds(1));
320
321 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700322 event_loop->monotonic_now());
323}
324
325// Test that running for a time with a periodic handler causes time to end
326// correctly.
327TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800328 SimulatedEventLoopTestFactory factory;
329
330 SimulatedEventLoopFactory simulated_event_loop_factory(
331 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700332 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800333 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700334
335 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700336 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700337 event_loop->OnRun([&event_loop, &timer] {
338 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
339 chrono::milliseconds(100));
340 });
341
342 simulated_event_loop_factory.RunFor(chrono::seconds(1));
343
344 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700345 event_loop->monotonic_now());
346 EXPECT_EQ(counter, 10);
347}
348
Austin Schuh7d87b672019-12-01 20:23:49 -0800349// Tests that watchers have latency in simulation.
350TEST(SimulatedEventLoopTest, WatcherTimingReport) {
351 SimulatedEventLoopTestFactory factory;
352 factory.set_send_delay(std::chrono::microseconds(50));
353
354 FLAGS_timing_report_ms = 1000;
355 auto loop1 = factory.MakePrimary("primary");
356 loop1->MakeWatcher("/test", [](const TestMessage &) {});
357
358 auto loop2 = factory.Make("sender_loop");
359
360 auto loop3 = factory.Make("report_fetcher");
361
362 Fetcher<timing::Report> report_fetcher =
363 loop3->MakeFetcher<timing::Report>("/aos");
364 EXPECT_FALSE(report_fetcher.Fetch());
365
366 auto sender = loop2->MakeSender<TestMessage>("/test");
367
368 // Send 10 messages in the middle of a timing report period so we get
369 // something interesting back.
370 auto test_timer = loop2->AddTimer([&sender]() {
371 for (int i = 0; i < 10; ++i) {
372 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
373 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
374 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700375 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800376 }
377 });
378
379 // Quit after 1 timing report, mid way through the next cycle.
380 {
381 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
382 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
383 end_timer->set_name("end");
384 }
385
386 loop1->OnRun([&test_timer, &loop1]() {
387 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
388 });
389
390 factory.Run();
391
392 // And, since we are here, check that the timing report makes sense.
393 // Start by looking for our event loop's timing.
394 FlatbufferDetachedBuffer<timing::Report> primary_report =
395 FlatbufferDetachedBuffer<timing::Report>::Empty();
396 while (report_fetcher.FetchNext()) {
397 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
398 if (report_fetcher->name()->string_view() == "primary") {
399 primary_report = CopyFlatBuffer(report_fetcher.get());
400 }
401 }
402
403 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700404 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800405
406 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
407
408 // Just the timing report timer.
409 ASSERT_NE(primary_report.message().timers(), nullptr);
410 EXPECT_EQ(primary_report.message().timers()->size(), 2);
411
412 // No phased loops
413 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
414
415 // And now confirm that the watcher received all 10 messages, and has latency.
416 ASSERT_NE(primary_report.message().watchers(), nullptr);
417 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
418 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
419 EXPECT_NEAR(
420 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
421 0.00005, 1e-9);
422 EXPECT_NEAR(
423 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
424 0.00005, 1e-9);
425 EXPECT_NEAR(
426 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
427 0.00005, 1e-9);
428 EXPECT_EQ(primary_report.message()
429 .watchers()
430 ->Get(0)
431 ->wakeup_latency()
432 ->standard_deviation(),
433 0.0);
434
435 EXPECT_EQ(
436 primary_report.message().watchers()->Get(0)->handler_time()->average(),
437 0.0);
438 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
439 0.0);
440 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
441 0.0);
442 EXPECT_EQ(primary_report.message()
443 .watchers()
444 ->Get(0)
445 ->handler_time()
446 ->standard_deviation(),
447 0.0);
448}
449
Austin Schuh89c9b812021-02-20 14:42:10 -0800450size_t CountAll(
451 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
452 &counters) {
453 size_t count = 0u;
454 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
455 counters) {
456 count += counter->count();
457 }
458 return count;
459}
460
Austin Schuh4c3b9702020-08-30 11:34:55 -0700461// Tests that ping and pong work when on 2 different nodes, and the message
462// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800463TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800464 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
465 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700466 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800467
468 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
469
470 std::unique_ptr<EventLoop> ping_event_loop =
471 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
472 Ping ping(ping_event_loop.get());
473
474 std::unique_ptr<EventLoop> pong_event_loop =
475 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
476 Pong pong(pong_event_loop.get());
477
478 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
479 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700480 MessageCounter<examples::Pong> pi2_pong_counter(
481 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700482 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
483 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
484 "/pi1/aos");
485 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
486 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800487
Austin Schuh4c3b9702020-08-30 11:34:55 -0700488 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
489 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800490
491 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
492 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700493 MessageCounter<examples::Pong> pi1_pong_counter(
494 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700495 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
496 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
497 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
498 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
499 "/aos");
500
Austin Schuh4c3b9702020-08-30 11:34:55 -0700501 // Count timestamps.
502 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
503 pi1_pong_counter_event_loop.get(), "/pi1/aos");
504 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
505 pi2_pong_counter_event_loop.get(), "/pi1/aos");
506 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
507 pi3_pong_counter_event_loop.get(), "/pi1/aos");
508 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
509 pi1_pong_counter_event_loop.get(), "/pi2/aos");
510 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
511 pi2_pong_counter_event_loop.get(), "/pi2/aos");
512 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
513 pi1_pong_counter_event_loop.get(), "/pi3/aos");
514 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
515 pi3_pong_counter_event_loop.get(), "/pi3/aos");
516
Austin Schuh2f8fd752020-09-01 22:38:28 -0700517 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800518 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
519 remote_timestamps_pi2_on_pi1 =
520 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
521 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
522 remote_timestamps_pi1_on_pi2 =
523 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700524
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525 // Wait to let timestamp estimation start up before looking for the results.
526 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
527
Austin Schuh8fb315a2020-11-19 22:33:58 -0800528 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
529 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
530 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
531 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
532 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
533 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
534
Austin Schuh4c3b9702020-08-30 11:34:55 -0700535 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800536 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700537 "/pi1/aos", [&pi1_server_statistics_count](
538 const message_bridge::ServerStatistics &stats) {
539 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
540 EXPECT_EQ(stats.connections()->size(), 2u);
541 for (const message_bridge::ServerConnection *connection :
542 *stats.connections()) {
543 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800544 EXPECT_EQ(connection->connection_count(), 1u);
545 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800546 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700547 if (connection->node()->name()->string_view() == "pi2") {
548 EXPECT_GT(connection->sent_packets(), 50);
549 } else if (connection->node()->name()->string_view() == "pi3") {
550 EXPECT_GE(connection->sent_packets(), 5);
551 } else {
552 LOG(FATAL) << "Unknown connection";
553 }
554
555 EXPECT_TRUE(connection->has_monotonic_offset());
556 EXPECT_EQ(connection->monotonic_offset(), 0);
557 }
558 ++pi1_server_statistics_count;
559 });
560
561 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800562 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700563 "/pi2/aos", [&pi2_server_statistics_count](
564 const message_bridge::ServerStatistics &stats) {
565 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
566 EXPECT_EQ(stats.connections()->size(), 1u);
567
568 const message_bridge::ServerConnection *connection =
569 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800570 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700571 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
572 EXPECT_GT(connection->sent_packets(), 50);
573 EXPECT_TRUE(connection->has_monotonic_offset());
574 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800575 EXPECT_EQ(connection->connection_count(), 1u);
576 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700577 ++pi2_server_statistics_count;
578 });
579
580 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800581 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700582 "/pi3/aos", [&pi3_server_statistics_count](
583 const message_bridge::ServerStatistics &stats) {
584 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
585 EXPECT_EQ(stats.connections()->size(), 1u);
586
587 const message_bridge::ServerConnection *connection =
588 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800589 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700590 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
591 EXPECT_GE(connection->sent_packets(), 5);
592 EXPECT_TRUE(connection->has_monotonic_offset());
593 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800594 EXPECT_EQ(connection->connection_count(), 1u);
595 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700596 ++pi3_server_statistics_count;
597 });
598
599 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800600 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700601 "/pi1/aos", [&pi1_client_statistics_count](
602 const message_bridge::ClientStatistics &stats) {
603 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
604 EXPECT_EQ(stats.connections()->size(), 2u);
605
606 for (const message_bridge::ClientConnection *connection :
607 *stats.connections()) {
608 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
609 if (connection->node()->name()->string_view() == "pi2") {
610 EXPECT_GT(connection->received_packets(), 50);
611 } else if (connection->node()->name()->string_view() == "pi3") {
612 EXPECT_GE(connection->received_packets(), 5);
613 } else {
614 LOG(FATAL) << "Unknown connection";
615 }
616
Austin Schuhe61d4382021-03-31 21:33:02 -0700617 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700618 EXPECT_TRUE(connection->has_monotonic_offset());
619 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800620 EXPECT_EQ(connection->connection_count(), 1u);
621 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700622 }
623 ++pi1_client_statistics_count;
624 });
625
626 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800627 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700628 "/pi2/aos", [&pi2_client_statistics_count](
629 const message_bridge::ClientStatistics &stats) {
630 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
631 EXPECT_EQ(stats.connections()->size(), 1u);
632
633 const message_bridge::ClientConnection *connection =
634 stats.connections()->Get(0);
635 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
636 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700637 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700638 EXPECT_TRUE(connection->has_monotonic_offset());
639 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800640 EXPECT_EQ(connection->connection_count(), 1u);
641 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700642 ++pi2_client_statistics_count;
643 });
644
645 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800646 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700647 "/pi3/aos", [&pi3_client_statistics_count](
648 const message_bridge::ClientStatistics &stats) {
649 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
650 EXPECT_EQ(stats.connections()->size(), 1u);
651
652 const message_bridge::ClientConnection *connection =
653 stats.connections()->Get(0);
654 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
655 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700656 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700657 EXPECT_TRUE(connection->has_monotonic_offset());
658 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800659 EXPECT_EQ(connection->connection_count(), 1u);
660 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700661 ++pi3_client_statistics_count;
662 });
663
Austin Schuh2f8fd752020-09-01 22:38:28 -0700664 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
665 // channel.
666 const size_t pi1_timestamp_channel =
667 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
668 pi1_on_pi2_timestamp_fetcher.channel());
669 const size_t ping_timestamp_channel =
670 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
671 ping_on_pi2_fetcher.channel());
672
673 for (const Channel *channel :
674 *pi1_pong_counter_event_loop->configuration()->channels()) {
675 VLOG(1) << "Channel "
676 << configuration::ChannelIndex(
677 pi1_pong_counter_event_loop->configuration(), channel)
678 << " " << configuration::CleanedChannelToString(channel);
679 }
680
Austin Schuh8fb315a2020-11-19 22:33:58 -0800681 std::unique_ptr<EventLoop> pi1_remote_timestamp =
682 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
683
Austin Schuh89c9b812021-02-20 14:42:10 -0800684 for (std::pair<int, std::string> channel :
685 shared()
686 ? std::vector<std::pair<
687 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
688 : std::vector<std::pair<int, std::string>>{
689 {pi1_timestamp_channel,
690 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
691 "aos-message_bridge-Timestamp"},
692 {ping_timestamp_channel,
693 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
694 // For each remote timestamp we get back, confirm that it is either a ping
695 // message, or a timestamp we sent out. Also confirm that the timestamps
696 // are correct.
697 pi1_remote_timestamp->MakeWatcher(
698 channel.second,
699 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
700 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
701 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
702 channel_index = channel.first](const RemoteMessage &header) {
703 VLOG(1) << aos::FlatbufferToJson(&header);
704 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700705 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800706 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700707 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700708
Austin Schuh89c9b812021-02-20 14:42:10 -0800709 const aos::monotonic_clock::time_point header_monotonic_sent_time(
710 chrono::nanoseconds(header.monotonic_sent_time()));
711 const aos::realtime_clock::time_point header_realtime_sent_time(
712 chrono::nanoseconds(header.realtime_sent_time()));
713 const aos::monotonic_clock::time_point header_monotonic_remote_time(
714 chrono::nanoseconds(header.monotonic_remote_time()));
715 const aos::realtime_clock::time_point header_realtime_remote_time(
716 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700717
Austin Schuh89c9b812021-02-20 14:42:10 -0800718 if (channel_index != -1) {
719 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700720 }
721
Austin Schuh89c9b812021-02-20 14:42:10 -0800722 const Context *pi1_context = nullptr;
723 const Context *pi2_context = nullptr;
724
725 if (header.channel_index() == pi1_timestamp_channel) {
726 // Find the forwarded message.
727 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
728 header_monotonic_sent_time) {
729 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
730 }
731
732 // And the source message.
733 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
734 header_monotonic_remote_time) {
735 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
736 }
737
738 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
739 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
740 } else if (header.channel_index() == ping_timestamp_channel) {
741 // Find the forwarded message.
742 while (ping_on_pi2_fetcher.context().monotonic_event_time <
743 header_monotonic_sent_time) {
744 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
745 }
746
747 // And the source message.
748 while (ping_on_pi1_fetcher.context().monotonic_event_time <
749 header_monotonic_remote_time) {
750 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
751 }
752
753 pi1_context = &ping_on_pi1_fetcher.context();
754 pi2_context = &ping_on_pi2_fetcher.context();
755 } else {
756 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700757 }
758
Austin Schuh89c9b812021-02-20 14:42:10 -0800759 // Confirm the forwarded message has matching timestamps to the
760 // timestamps we got back.
761 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
762 EXPECT_EQ(pi2_context->remote_queue_index,
763 header.remote_queue_index());
764 EXPECT_EQ(pi2_context->monotonic_event_time,
765 header_monotonic_sent_time);
766 EXPECT_EQ(pi2_context->realtime_event_time,
767 header_realtime_sent_time);
768 EXPECT_EQ(pi2_context->realtime_remote_time,
769 header_realtime_remote_time);
770 EXPECT_EQ(pi2_context->monotonic_remote_time,
771 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700772
Austin Schuh89c9b812021-02-20 14:42:10 -0800773 // Confirm the forwarded message also matches the source message.
774 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
775 EXPECT_EQ(pi1_context->monotonic_event_time,
776 header_monotonic_remote_time);
777 EXPECT_EQ(pi1_context->realtime_event_time,
778 header_realtime_remote_time);
779 });
780 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700781
Austin Schuh4c3b9702020-08-30 11:34:55 -0700782 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
783 chrono::milliseconds(500) +
784 chrono::milliseconds(5));
785
786 EXPECT_EQ(pi1_pong_counter.count(), 1001);
787 EXPECT_EQ(pi2_pong_counter.count(), 1001);
788
789 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
790 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
791 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
792 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
793 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
794 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
795 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
796
Austin Schuh20ac95d2020-12-05 17:24:19 -0800797 EXPECT_EQ(pi1_server_statistics_count, 10);
798 EXPECT_EQ(pi2_server_statistics_count, 10);
799 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700800
801 EXPECT_EQ(pi1_client_statistics_count, 95);
802 EXPECT_EQ(pi2_client_statistics_count, 95);
803 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700804
805 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800806 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
807 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700808}
809
810// Tests that an offset between nodes can be recovered and shows up in
811// ServerStatistics correctly.
812TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
813 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700814 aos::configuration::ReadConfig(ArtifactPath(
815 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700816 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800817 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
818 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700819 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800820 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
821 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700822 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800823 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
824 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700825
Austin Schuh87dd3832021-01-01 23:07:31 -0800826 message_bridge::TestingTimeConverter time(
827 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700828 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700829 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700830
831 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800832 time.AddNextTimestamp(
833 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700834 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
835 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700836
837 std::unique_ptr<EventLoop> ping_event_loop =
838 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
839 Ping ping(ping_event_loop.get());
840
841 std::unique_ptr<EventLoop> pong_event_loop =
842 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
843 Pong pong(pong_event_loop.get());
844
Austin Schuh8fb315a2020-11-19 22:33:58 -0800845 // Wait to let timestamp estimation start up before looking for the results.
846 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
847
Austin Schuh87dd3832021-01-01 23:07:31 -0800848 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
849 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
850
Austin Schuh4c3b9702020-08-30 11:34:55 -0700851 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
852 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
853
854 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
855 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
856
Austin Schuh4c3b9702020-08-30 11:34:55 -0700857 // Confirm the offsets are being recovered correctly.
858 int pi1_server_statistics_count = 0;
859 pi1_pong_counter_event_loop->MakeWatcher(
860 "/pi1/aos", [&pi1_server_statistics_count,
861 kOffset](const message_bridge::ServerStatistics &stats) {
862 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
863 EXPECT_EQ(stats.connections()->size(), 2u);
864 for (const message_bridge::ServerConnection *connection :
865 *stats.connections()) {
866 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800867 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700868 if (connection->node()->name()->string_view() == "pi2") {
869 EXPECT_EQ(connection->monotonic_offset(),
870 chrono::nanoseconds(kOffset).count());
871 } else if (connection->node()->name()->string_view() == "pi3") {
872 EXPECT_EQ(connection->monotonic_offset(), 0);
873 } else {
874 LOG(FATAL) << "Unknown connection";
875 }
876
877 EXPECT_TRUE(connection->has_monotonic_offset());
878 }
879 ++pi1_server_statistics_count;
880 });
881
882 int pi2_server_statistics_count = 0;
883 pi2_pong_counter_event_loop->MakeWatcher(
884 "/pi2/aos", [&pi2_server_statistics_count,
885 kOffset](const message_bridge::ServerStatistics &stats) {
886 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
887 EXPECT_EQ(stats.connections()->size(), 1u);
888
889 const message_bridge::ServerConnection *connection =
890 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800891 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700892 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
893 EXPECT_TRUE(connection->has_monotonic_offset());
894 EXPECT_EQ(connection->monotonic_offset(),
895 -chrono::nanoseconds(kOffset).count());
896 ++pi2_server_statistics_count;
897 });
898
899 int pi3_server_statistics_count = 0;
900 pi3_pong_counter_event_loop->MakeWatcher(
901 "/pi3/aos", [&pi3_server_statistics_count](
902 const message_bridge::ServerStatistics &stats) {
903 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
904 EXPECT_EQ(stats.connections()->size(), 1u);
905
906 const message_bridge::ServerConnection *connection =
907 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800908 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700909 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
910 EXPECT_TRUE(connection->has_monotonic_offset());
911 EXPECT_EQ(connection->monotonic_offset(), 0);
912 ++pi3_server_statistics_count;
913 });
914
915 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
916 chrono::milliseconds(500) +
917 chrono::milliseconds(5));
918
Austin Schuh20ac95d2020-12-05 17:24:19 -0800919 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700920 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800921 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700922}
923
924// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800925TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700926 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
927 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
928 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
929
930 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
931 simulated_event_loop_factory.DisableStatistics();
932
933 std::unique_ptr<EventLoop> ping_event_loop =
934 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
935 Ping ping(ping_event_loop.get());
936
937 std::unique_ptr<EventLoop> pong_event_loop =
938 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
939 Pong pong(pong_event_loop.get());
940
941 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
942 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
943
944 MessageCounter<examples::Pong> pi2_pong_counter(
945 pi2_pong_counter_event_loop.get(), "/test");
946
947 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
948 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
949
950 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
951 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
952
953 MessageCounter<examples::Pong> pi1_pong_counter(
954 pi1_pong_counter_event_loop.get(), "/test");
955
956 // Count timestamps.
957 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
958 pi1_pong_counter_event_loop.get(), "/pi1/aos");
959 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
960 pi2_pong_counter_event_loop.get(), "/pi1/aos");
961 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
962 pi3_pong_counter_event_loop.get(), "/pi1/aos");
963 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
964 pi1_pong_counter_event_loop.get(), "/pi2/aos");
965 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
966 pi2_pong_counter_event_loop.get(), "/pi2/aos");
967 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
968 pi1_pong_counter_event_loop.get(), "/pi3/aos");
969 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
970 pi3_pong_counter_event_loop.get(), "/pi3/aos");
971
Austin Schuh2f8fd752020-09-01 22:38:28 -0700972 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800973 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
974 remote_timestamps_pi2_on_pi1 =
975 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
976 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
977 remote_timestamps_pi1_on_pi2 =
978 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700979
Austin Schuh4c3b9702020-08-30 11:34:55 -0700980 MessageCounter<message_bridge::ServerStatistics>
981 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
982 "/pi1/aos");
983 MessageCounter<message_bridge::ServerStatistics>
984 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
985 "/pi2/aos");
986 MessageCounter<message_bridge::ServerStatistics>
987 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
988 "/pi3/aos");
989
990 MessageCounter<message_bridge::ClientStatistics>
991 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
992 "/pi1/aos");
993 MessageCounter<message_bridge::ClientStatistics>
994 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
995 "/pi2/aos");
996 MessageCounter<message_bridge::ClientStatistics>
997 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
998 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800999
1000 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1001 chrono::milliseconds(5));
1002
Austin Schuh4c3b9702020-08-30 11:34:55 -07001003 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1004 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1005
1006 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1007 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1008 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1009 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1010 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1011 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1012 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1013
1014 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1015 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1016 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1017
1018 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1019 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1020 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001021
1022 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001023 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1024 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001025}
1026
Austin Schuhc0b0f722020-12-12 18:36:06 -08001027bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1028 for (const message_bridge::ServerConnection *connection :
1029 *server_statistics->connections()) {
1030 if (connection->state() != message_bridge::State::CONNECTED) {
1031 return false;
1032 }
1033 }
1034 return true;
1035}
1036
1037bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1038 std::string_view target) {
1039 for (const message_bridge::ServerConnection *connection :
1040 *server_statistics->connections()) {
1041 if (connection->node()->name()->string_view() == target) {
1042 if (connection->state() == message_bridge::State::CONNECTED) {
1043 return false;
1044 }
1045 } else {
1046 if (connection->state() != message_bridge::State::CONNECTED) {
1047 return false;
1048 }
1049 }
1050 }
1051 return true;
1052}
1053
1054bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1055 for (const message_bridge::ClientConnection *connection :
1056 *client_statistics->connections()) {
1057 if (connection->state() != message_bridge::State::CONNECTED) {
1058 return false;
1059 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001060 EXPECT_TRUE(connection->has_boot_uuid());
1061 EXPECT_TRUE(connection->has_connected_since_time());
1062 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001063 }
1064 return true;
1065}
1066
1067bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1068 std::string_view target) {
1069 for (const message_bridge::ClientConnection *connection :
1070 *client_statistics->connections()) {
1071 if (connection->node()->name()->string_view() == target) {
1072 if (connection->state() == message_bridge::State::CONNECTED) {
1073 return false;
1074 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001075 EXPECT_FALSE(connection->has_boot_uuid());
1076 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001077 } else {
1078 if (connection->state() != message_bridge::State::CONNECTED) {
1079 return false;
1080 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001081 EXPECT_TRUE(connection->has_boot_uuid());
1082 EXPECT_TRUE(connection->has_connected_since_time());
1083 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001084 }
1085 }
1086 return true;
1087}
1088
Austin Schuh367a7f42021-11-23 23:04:36 -08001089int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1090 std::string_view target) {
1091 for (const message_bridge::ClientConnection *connection :
1092 *client_statistics->connections()) {
1093 if (connection->node()->name()->string_view() == target) {
1094 return connection->connection_count();
1095 }
1096 }
1097 return 0;
1098}
1099
1100int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1101 std::string_view target) {
1102 for (const message_bridge::ServerConnection *connection :
1103 *server_statistics->connections()) {
1104 if (connection->node()->name()->string_view() == target) {
1105 return connection->connection_count();
1106 }
1107 }
1108 return 0;
1109}
1110
Austin Schuhc0b0f722020-12-12 18:36:06 -08001111// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001112TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001113 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1114
Austin Schuh58646e22021-08-23 23:51:46 -07001115 NodeEventLoopFactory *pi1 =
1116 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1117 NodeEventLoopFactory *pi2 =
1118 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1119 NodeEventLoopFactory *pi3 =
1120 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1121
1122 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001123 Ping ping(ping_event_loop.get());
1124
Austin Schuh58646e22021-08-23 23:51:46 -07001125 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001126 Pong pong(pong_event_loop.get());
1127
1128 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001129 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001130
1131 MessageCounter<examples::Pong> pi2_pong_counter(
1132 pi2_pong_counter_event_loop.get(), "/test");
1133
1134 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001135 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001136
1137 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001138 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001139
1140 MessageCounter<examples::Pong> pi1_pong_counter(
1141 pi1_pong_counter_event_loop.get(), "/test");
1142
1143 // Count timestamps.
1144 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1145 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1146 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1147 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1148 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1149 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1150 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1151 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1152 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1153 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1154 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1155 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1156 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1157 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1158
1159 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001160 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1161 remote_timestamps_pi2_on_pi1 =
1162 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1163 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1164 remote_timestamps_pi1_on_pi2 =
1165 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001166
1167 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001168 *pi1_server_statistics_counter;
1169 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1170 pi1_server_statistics_counter =
1171 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1172 "pi1_server_statistics_counter", "/pi1/aos");
1173 });
1174
Austin Schuhc0b0f722020-12-12 18:36:06 -08001175 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1176 pi1_pong_counter_event_loop
1177 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1178 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1179 pi1_pong_counter_event_loop
1180 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1181
1182 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001183 *pi2_server_statistics_counter;
1184 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1185 pi2_server_statistics_counter =
1186 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1187 "pi2_server_statistics_counter", "/pi2/aos");
1188 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001189 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1190 pi2_pong_counter_event_loop
1191 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1192 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1193 pi2_pong_counter_event_loop
1194 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1195
1196 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001197 *pi3_server_statistics_counter;
1198 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1199 pi3_server_statistics_counter =
1200 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1201 "pi3_server_statistics_counter", "/pi3/aos");
1202 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001203 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1204 pi3_pong_counter_event_loop
1205 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1206 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1207 pi3_pong_counter_event_loop
1208 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1209
1210 MessageCounter<message_bridge::ClientStatistics>
1211 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1212 "/pi1/aos");
1213 MessageCounter<message_bridge::ClientStatistics>
1214 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1215 "/pi2/aos");
1216 MessageCounter<message_bridge::ClientStatistics>
1217 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1218 "/pi3/aos");
1219
1220 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1221 chrono::milliseconds(5));
1222
1223 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1224 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1225
1226 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1227 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1228 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1229 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1230 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1231 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1232 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1233
Austin Schuh58646e22021-08-23 23:51:46 -07001234 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1235 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1236 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001237
1238 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1239 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1240 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1241
1242 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001243 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1244 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001245
1246 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1247 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1248 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1249 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1250 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1251 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1252 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1253 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1254 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1255 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1256 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1257 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1258 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1259 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1260 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1261 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1262 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1263 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1264
Austin Schuh58646e22021-08-23 23:51:46 -07001265 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001266
1267 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1268
1269 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1270 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1271
1272 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1273 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1274 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1275 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1276 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1277 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1278 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1279
Austin Schuh58646e22021-08-23 23:51:46 -07001280 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1281 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1282 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001283
1284 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1285 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1286 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1287
1288 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001289 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1290 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001291
1292 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1293 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1294 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1295 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1296 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1297 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1298 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1299 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1300 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1301 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1302 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1303 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1304 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1305 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1306 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1307 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1308 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1309 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1310
Austin Schuh58646e22021-08-23 23:51:46 -07001311 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001312
1313 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1314
Austin Schuh367a7f42021-11-23 23:04:36 -08001315 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1316 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1317 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1318 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1319 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1320 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1321
1322 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1323 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1324 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1325 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1326 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1327 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1328 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1329 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1330
1331 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1332 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1333 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1334 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1335
1336 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1337 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1338 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1339 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1340
1341
Austin Schuhc0b0f722020-12-12 18:36:06 -08001342 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1343 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1344
1345 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1346 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1347 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1348 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1349 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1350 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1351 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1352
Austin Schuh58646e22021-08-23 23:51:46 -07001353 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1354 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1355 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001356
1357 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1358 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1359 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1360
1361 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001362 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1363 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001364
Austin Schuhc0b0f722020-12-12 18:36:06 -08001365 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1366 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001367 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1368 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001369 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1370 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001371 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1372 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001373 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1374 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001375 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1376 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1377}
1378
Austin Schuh2febf0d2020-09-21 22:24:30 -07001379// Tests that the time offset having a slope doesn't break the world.
1380// SimulatedMessageBridge has enough self consistency CHECK statements to
1381// confirm, and we can can also check a message in each direction to make sure
1382// it gets delivered as expected.
1383TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1384 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001385 aos::configuration::ReadConfig(ArtifactPath(
1386 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001387 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001388 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1389 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001390 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001391 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1392 ASSERT_EQ(pi2_index, 1u);
1393 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1394 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1395 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001396
Austin Schuh87dd3832021-01-01 23:07:31 -08001397 message_bridge::TestingTimeConverter time(
1398 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001399 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001400 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001401
Austin Schuh2febf0d2020-09-21 22:24:30 -07001402 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001403 time.AddNextTimestamp(
1404 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001405 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1406 BootTimestamp::epoch()});
1407 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1408 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1409 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1410 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001411
1412 std::unique_ptr<EventLoop> ping_event_loop =
1413 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1414 Ping ping(ping_event_loop.get());
1415
1416 std::unique_ptr<EventLoop> pong_event_loop =
1417 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1418 Pong pong(pong_event_loop.get());
1419
1420 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1421 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1422 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1423 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1424
1425 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1426 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1427 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1428 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1429
1430 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1431 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1432 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1433 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1434
1435 // End after a pong message comes back. This will leave the latest messages
1436 // on all channels so we can look at timestamps easily and check they make
1437 // sense.
1438 std::unique_ptr<EventLoop> pi1_pong_ender =
1439 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1440 int count = 0;
1441 pi1_pong_ender->MakeWatcher(
1442 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1443 if (++count == 100) {
1444 simulated_event_loop_factory.Exit();
1445 }
1446 });
1447
1448 // Run enough that messages should be delivered.
1449 simulated_event_loop_factory.Run();
1450
1451 // Grab the latest messages.
1452 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1453 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1454 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1455 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1456
1457 // Compute their time on the global distributed clock so we can compute
1458 // distance betwen them.
1459 const distributed_clock::time_point pi1_ping_time =
1460 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1461 ->ToDistributedClock(
1462 ping_on_pi1_fetcher.context().monotonic_event_time);
1463 const distributed_clock::time_point pi2_ping_time =
1464 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1465 ->ToDistributedClock(
1466 ping_on_pi2_fetcher.context().monotonic_event_time);
1467 const distributed_clock::time_point pi1_pong_time =
1468 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1469 ->ToDistributedClock(
1470 pong_on_pi1_fetcher.context().monotonic_event_time);
1471 const distributed_clock::time_point pi2_pong_time =
1472 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1473 ->ToDistributedClock(
1474 pong_on_pi2_fetcher.context().monotonic_event_time);
1475
1476 // And confirm the delivery delay is just about exactly 150 uS for both
1477 // directions like expected. There will be a couple ns of rounding errors in
1478 // the conversion functions that aren't worth accounting for right now. This
1479 // will either be really close, or really far.
1480 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1481 pi1_ping_time);
1482 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1483 pi1_ping_time);
1484
1485 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1486 pi2_pong_time);
1487 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1488 pi2_pong_time);
1489}
1490
Austin Schuh4c570ea2020-11-19 23:13:24 -08001491void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1492 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1493 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1494 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001495 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001496}
1497
1498// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001499TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001500 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1501 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1502
1503 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1504
1505 std::unique_ptr<EventLoop> ping_event_loop =
1506 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1507 aos::Sender<examples::Ping> pi1_reliable_sender =
1508 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1509 aos::Sender<examples::Ping> pi1_unreliable_sender =
1510 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1511 SendPing(&pi1_reliable_sender, 1);
1512 SendPing(&pi1_unreliable_sender, 1);
1513
1514 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1515 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1516 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1517 "/reliable");
1518 MessageCounter<examples::Ping> pi2_unreliable_counter(
1519 pi2_pong_event_loop.get(), "/unreliable");
1520 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1521 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1522 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1523 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1524
1525 const size_t reliable_channel_index = configuration::ChannelIndex(
1526 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1527
1528 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1529 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1530
Austin Schuheeaa2022021-01-02 21:52:03 -08001531 const chrono::nanoseconds network_delay =
1532 simulated_event_loop_factory.network_delay();
1533
Austin Schuh4c570ea2020-11-19 23:13:24 -08001534 int reliable_timestamp_count = 0;
1535 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001536 shared() ? "/pi1/aos/remote_timestamps/pi2"
1537 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001538 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001539 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1540 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001541 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001542 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001543 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001544 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001545 VLOG(1) << aos::FlatbufferToJson(&header);
1546 if (header.channel_index() == reliable_channel_index) {
1547 ++reliable_timestamp_count;
1548 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001549
1550 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1551 chrono::nanoseconds(header.monotonic_sent_time()));
1552
1553 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1554 header_monotonic_sent_time + network_delay +
1555 (pi1_remote_timestamp->monotonic_now() -
1556 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001557 });
1558
1559 // Wait to let timestamp estimation start up before looking for the results.
1560 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1561
1562 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1563 // This one isn't reliable, but was sent before the start. It should *not* be
1564 // delivered.
1565 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1566 // Confirm we got a timestamp logged for the message that was forwarded.
1567 EXPECT_EQ(reliable_timestamp_count, 1u);
1568
1569 SendPing(&pi1_reliable_sender, 2);
1570 SendPing(&pi1_unreliable_sender, 2);
1571 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1572 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1573 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1574
1575 EXPECT_EQ(reliable_timestamp_count, 2u);
1576}
1577
Austin Schuh20ac95d2020-12-05 17:24:19 -08001578// Tests that rebooting a node changes the ServerStatistics message and the
1579// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001580TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001581 const UUID pi1_boot0 = UUID::Random();
1582 const UUID pi2_boot0 = UUID::Random();
1583 const UUID pi2_boot1 = UUID::Random();
1584 const UUID pi3_boot0 = UUID::Random();
1585 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001586
Austin Schuh58646e22021-08-23 23:51:46 -07001587 message_bridge::TestingTimeConverter time(
1588 configuration::NodesCount(&config.message()));
1589 SimulatedEventLoopFactory factory(&config.message());
1590 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001591
Austin Schuh58646e22021-08-23 23:51:46 -07001592 const size_t pi1_index =
1593 configuration::GetNodeIndex(&config.message(), "pi1");
1594 const size_t pi2_index =
1595 configuration::GetNodeIndex(&config.message(), "pi2");
1596 const size_t pi3_index =
1597 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001598
Austin Schuh58646e22021-08-23 23:51:46 -07001599 {
1600 time.AddNextTimestamp(distributed_clock::epoch(),
1601 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1602 BootTimestamp::epoch()});
1603
1604 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1605
1606 time.AddNextTimestamp(
1607 distributed_clock::epoch() + dt,
1608 {BootTimestamp::epoch() + dt,
1609 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1610 BootTimestamp::epoch() + dt});
1611
1612 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1613 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1614 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1615 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1616 }
1617
1618 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1619 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1620
1621 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1622 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001623
1624 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001625 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001626
1627 int timestamp_count = 0;
1628 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001629 "/pi2/aos", [&expected_boot_uuid,
1630 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001631 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001632 expected_boot_uuid);
1633 });
1634 pi1_remote_timestamp->MakeWatcher(
1635 "/test",
1636 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001637 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001638 expected_boot_uuid);
1639 });
1640 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001641 shared() ? "/pi1/aos/remote_timestamps/pi2"
1642 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001643 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1644 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001645 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001646 VLOG(1) << aos::FlatbufferToJson(&header);
1647 ++timestamp_count;
1648 });
1649
1650 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001651 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001652 int boot_number = 0;
1653 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001654 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001655 "/pi1/aos",
1656 [&pi1_server_statistics_count, &expected_boot_uuid,
1657 &expected_connection_time, &first_pi1_server_statistics,
1658 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001659 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1660 for (const message_bridge::ServerConnection *connection :
1661 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001662 if (connection->state() == message_bridge::State::CONNECTED) {
1663 ASSERT_TRUE(connection->has_boot_uuid());
1664 }
1665 if (!first_pi1_server_statistics) {
1666 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1667 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001668 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001669 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1670 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001671 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001672 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001673 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001674 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1675 connection->connected_since_time())),
1676 expected_connection_time);
1677 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001678 ++pi1_server_statistics_count;
1679 }
1680 }
Austin Schuh58646e22021-08-23 23:51:46 -07001681 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001682 });
1683
Austin Schuh58646e22021-08-23 23:51:46 -07001684 int pi1_client_statistics_count = 0;
1685 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001686 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1687 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001688 const message_bridge::ClientStatistics &stats) {
1689 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1690 for (const message_bridge::ClientConnection *connection :
1691 *stats.connections()) {
1692 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1693 if (connection->node()->name()->string_view() == "pi2") {
1694 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001695 EXPECT_EQ(expected_boot_uuid,
1696 UUID::FromString(connection->boot_uuid()))
1697 << " : Got " << aos::FlatbufferToJson(&stats);
1698 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1699 connection->connected_since_time())),
1700 expected_connection_time);
1701 EXPECT_EQ(boot_number + 1, connection->connection_count());
1702 } else {
1703 EXPECT_EQ(connection->connected_since_time(), 0);
1704 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001705 }
1706 }
1707 });
1708
1709 // Confirm that reboot changes the UUID.
Austin Schuh367a7f42021-11-23 23:04:36 -08001710 pi2->OnShutdown(
1711 [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
1712 pi2_boot1]() {
1713 expected_boot_uuid = pi2_boot1;
1714 ++boot_number;
1715 LOG(INFO) << "OnShutdown triggered for pi2";
1716 pi2->OnStartup(
1717 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1718 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1719 expected_connection_time = pi1->monotonic_now();
1720 });
1721 });
Austin Schuh58646e22021-08-23 23:51:46 -07001722
Austin Schuh20ac95d2020-12-05 17:24:19 -08001723 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001724 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001725
1726 EXPECT_GT(timestamp_count, 100);
1727 EXPECT_GE(pi1_server_statistics_count, 1u);
1728
Austin Schuh20ac95d2020-12-05 17:24:19 -08001729 timestamp_count = 0;
1730 pi1_server_statistics_count = 0;
1731
Austin Schuh58646e22021-08-23 23:51:46 -07001732 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001733 EXPECT_GT(timestamp_count, 100);
1734 EXPECT_GE(pi1_server_statistics_count, 1u);
1735}
1736
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001737INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001738 All, RemoteMessageSimulatedEventLoopTest,
1739 ::testing::Values(
1740 Param{"multinode_pingpong_test_combined_config.json", true},
1741 Param{"multinode_pingpong_test_split_config.json", false}));
1742
Austin Schuh58646e22021-08-23 23:51:46 -07001743// Tests that Startup and Shutdown do reasonable things.
1744TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1745 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1746 aos::configuration::ReadConfig(
1747 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1748
Austin Schuh72e65682021-09-02 11:37:05 -07001749 size_t pi1_shutdown_counter = 0;
1750 size_t pi2_shutdown_counter = 0;
1751 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1752 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1753
Austin Schuh58646e22021-08-23 23:51:46 -07001754 message_bridge::TestingTimeConverter time(
1755 configuration::NodesCount(&config.message()));
1756 SimulatedEventLoopFactory factory(&config.message());
1757 factory.SetTimeConverter(&time);
1758 time.AddNextTimestamp(
1759 distributed_clock::epoch(),
1760 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1761
1762 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1763
1764 time.AddNextTimestamp(
1765 distributed_clock::epoch() + dt,
1766 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1767 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1768 BootTimestamp::epoch() + dt});
1769
1770 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1771 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1772
1773 // Configure startup to start Ping and Pong, and count.
1774 size_t pi1_startup_counter = 0;
1775 size_t pi2_startup_counter = 0;
1776 pi1->OnStartup([pi1]() {
1777 LOG(INFO) << "Made ping";
1778 pi1->AlwaysStart<Ping>("ping");
1779 });
1780 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1781 pi2->OnStartup([pi2]() {
1782 LOG(INFO) << "Made pong";
1783 pi2->AlwaysStart<Pong>("pong");
1784 });
1785 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1786
1787 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001788 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1789 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1790
Austin Schuh58646e22021-08-23 23:51:46 -07001791 // Automatically make counters on startup.
1792 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1793 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1794 "pi1_pong_counter", "/test");
1795 });
1796 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1797 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1798 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1799 "pi2_ping_counter", "/test");
1800 });
1801 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1802
1803 EXPECT_EQ(pi2_ping_counter, nullptr);
1804 EXPECT_EQ(pi1_pong_counter, nullptr);
1805
1806 EXPECT_EQ(pi1_startup_counter, 0u);
1807 EXPECT_EQ(pi2_startup_counter, 0u);
1808 EXPECT_EQ(pi1_shutdown_counter, 0u);
1809 EXPECT_EQ(pi2_shutdown_counter, 0u);
1810
1811 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1812 EXPECT_EQ(pi1_startup_counter, 1u);
1813 EXPECT_EQ(pi2_startup_counter, 1u);
1814 EXPECT_EQ(pi1_shutdown_counter, 0u);
1815 EXPECT_EQ(pi2_shutdown_counter, 0u);
1816 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1817 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1818
1819 LOG(INFO) << pi1->monotonic_now();
1820 LOG(INFO) << pi2->monotonic_now();
1821
1822 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1823
1824 EXPECT_EQ(pi1_startup_counter, 2u);
1825 EXPECT_EQ(pi2_startup_counter, 2u);
1826 EXPECT_EQ(pi1_shutdown_counter, 1u);
1827 EXPECT_EQ(pi2_shutdown_counter, 1u);
1828 EXPECT_EQ(pi2_ping_counter->count(), 501);
1829 EXPECT_EQ(pi1_pong_counter->count(), 501);
1830}
1831
1832// Tests that OnStartup handlers can be added after running and get called, and
1833// can't be called when running.
1834TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1835 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1836 aos::configuration::ReadConfig(
1837 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1838
1839 // Test that we can add startup handlers as long as we aren't running, and
1840 // they get run when Run gets called again.
1841 // Test that adding a startup handler when running fails.
1842 //
1843 // Test shutdown handlers get called on destruction.
1844 SimulatedEventLoopFactory factory(&config.message());
1845
1846 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1847
1848 int startup_count0 = 0;
1849 int startup_count1 = 0;
1850
1851 pi1->OnStartup([&]() { ++startup_count0; });
1852 EXPECT_EQ(startup_count0, 0);
1853 EXPECT_EQ(startup_count1, 0);
1854
1855 factory.RunFor(chrono::nanoseconds(1));
1856 EXPECT_EQ(startup_count0, 1);
1857 EXPECT_EQ(startup_count1, 0);
1858
1859 pi1->OnStartup([&]() { ++startup_count1; });
1860 EXPECT_EQ(startup_count0, 1);
1861 EXPECT_EQ(startup_count1, 0);
1862
1863 factory.RunFor(chrono::nanoseconds(1));
1864 EXPECT_EQ(startup_count0, 1);
1865 EXPECT_EQ(startup_count1, 1);
1866
1867 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1868 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1869
1870 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1871 "Can only register OnStartup handlers when not running.");
1872}
1873
1874// Tests that OnStartup handlers can be added after running and get called, and
1875// all the handlers get called on reboot. Shutdown handlers are tested the same
1876// way.
1877TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1878 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1879 aos::configuration::ReadConfig(
1880 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1881
Austin Schuh72e65682021-09-02 11:37:05 -07001882 int startup_count0 = 0;
1883 int shutdown_count0 = 0;
1884 int startup_count1 = 0;
1885 int shutdown_count1 = 0;
1886
Austin Schuh58646e22021-08-23 23:51:46 -07001887 message_bridge::TestingTimeConverter time(
1888 configuration::NodesCount(&config.message()));
1889 SimulatedEventLoopFactory factory(&config.message());
1890 factory.SetTimeConverter(&time);
1891 time.StartEqual();
1892
1893 const chrono::nanoseconds dt = chrono::seconds(10);
1894 time.RebootAt(0, distributed_clock::epoch() + dt);
1895 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1896
1897 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1898
Austin Schuh58646e22021-08-23 23:51:46 -07001899 pi1->OnStartup([&]() { ++startup_count0; });
1900 pi1->OnShutdown([&]() { ++shutdown_count0; });
1901 EXPECT_EQ(startup_count0, 0);
1902 EXPECT_EQ(startup_count1, 0);
1903 EXPECT_EQ(shutdown_count0, 0);
1904 EXPECT_EQ(shutdown_count1, 0);
1905
1906 factory.RunFor(chrono::nanoseconds(1));
1907 EXPECT_EQ(startup_count0, 1);
1908 EXPECT_EQ(startup_count1, 0);
1909 EXPECT_EQ(shutdown_count0, 0);
1910 EXPECT_EQ(shutdown_count1, 0);
1911
1912 pi1->OnStartup([&]() { ++startup_count1; });
1913 EXPECT_EQ(startup_count0, 1);
1914 EXPECT_EQ(startup_count1, 0);
1915 EXPECT_EQ(shutdown_count0, 0);
1916 EXPECT_EQ(shutdown_count1, 0);
1917
1918 factory.RunFor(chrono::nanoseconds(1));
1919 EXPECT_EQ(startup_count0, 1);
1920 EXPECT_EQ(startup_count1, 1);
1921 EXPECT_EQ(shutdown_count0, 0);
1922 EXPECT_EQ(shutdown_count1, 0);
1923
1924 factory.RunFor(chrono::seconds(15));
1925
1926 EXPECT_EQ(startup_count0, 2);
1927 EXPECT_EQ(startup_count1, 2);
1928 EXPECT_EQ(shutdown_count0, 1);
1929 EXPECT_EQ(shutdown_count1, 0);
1930
1931 pi1->OnShutdown([&]() { ++shutdown_count1; });
1932 factory.RunFor(chrono::seconds(10));
1933
1934 EXPECT_EQ(startup_count0, 3);
1935 EXPECT_EQ(startup_count1, 3);
1936 EXPECT_EQ(shutdown_count0, 2);
1937 EXPECT_EQ(shutdown_count1, 1);
1938}
1939
1940// Tests that event loops which outlive shutdown crash.
1941TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1942 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1943 aos::configuration::ReadConfig(
1944 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1945
1946 message_bridge::TestingTimeConverter time(
1947 configuration::NodesCount(&config.message()));
1948 SimulatedEventLoopFactory factory(&config.message());
1949 factory.SetTimeConverter(&time);
1950 time.StartEqual();
1951
1952 const chrono::nanoseconds dt = chrono::seconds(10);
1953 time.RebootAt(0, distributed_clock::epoch() + dt);
1954
1955 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1956
1957 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1958
1959 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
1960}
1961
1962// Tests that messages don't survive a reboot of a node.
1963TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
1964 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1965 aos::configuration::ReadConfig(
1966 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1967
1968 message_bridge::TestingTimeConverter time(
1969 configuration::NodesCount(&config.message()));
1970 SimulatedEventLoopFactory factory(&config.message());
1971 factory.SetTimeConverter(&time);
1972 time.StartEqual();
1973
1974 const chrono::nanoseconds dt = chrono::seconds(10);
1975 time.RebootAt(0, distributed_clock::epoch() + dt);
1976
1977 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1978
1979 const UUID boot_uuid = pi1->boot_uuid();
1980 EXPECT_NE(boot_uuid, UUID::Zero());
1981
1982 {
1983 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1984 aos::Sender<examples::Ping> test_message_sender =
1985 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1986 SendPing(&test_message_sender, 1);
1987 }
1988
1989 factory.RunFor(chrono::seconds(5));
1990
1991 {
1992 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1993 aos::Fetcher<examples::Ping> fetcher =
1994 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1995 EXPECT_TRUE(fetcher.Fetch());
1996 }
1997
1998 factory.RunFor(chrono::seconds(10));
1999
2000 {
2001 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2002 aos::Fetcher<examples::Ping> fetcher =
2003 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2004 EXPECT_FALSE(fetcher.Fetch());
2005 }
2006 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2007}
2008
2009// Tests that reliable messages get resent on reboot.
2010TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2011 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2012 aos::configuration::ReadConfig(
2013 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2014
2015 message_bridge::TestingTimeConverter time(
2016 configuration::NodesCount(&config.message()));
2017 SimulatedEventLoopFactory factory(&config.message());
2018 factory.SetTimeConverter(&time);
2019 time.StartEqual();
2020
2021 const chrono::nanoseconds dt = chrono::seconds(1);
2022 time.RebootAt(1, distributed_clock::epoch() + dt);
2023
2024 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2025 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2026
2027 const UUID pi1_boot_uuid = pi1->boot_uuid();
2028 const UUID pi2_boot_uuid = pi2->boot_uuid();
2029 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2030 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2031
2032 {
2033 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2034 aos::Sender<examples::Ping> test_message_sender =
2035 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2036 SendPing(&test_message_sender, 1);
2037 }
2038
2039 factory.RunFor(chrono::milliseconds(500));
2040
2041 {
2042 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2043 aos::Fetcher<examples::Ping> fetcher =
2044 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2045 EXPECT_TRUE(fetcher.Fetch());
2046 }
2047
2048 factory.RunFor(chrono::seconds(1));
2049
2050 {
2051 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2052 aos::Fetcher<examples::Ping> fetcher =
2053 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2054 EXPECT_TRUE(fetcher.Fetch());
2055 }
2056 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2057}
2058
Austin Schuh48205e62021-11-12 14:13:18 -08002059class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2060 public:
2061 SimulatedEventLoopDisconnectTest()
2062 : config(aos::configuration::ReadConfig(ArtifactPath(
2063 "aos/events/multinode_pingpong_test_split_config.json"))),
2064 time(configuration::NodesCount(&config.message())),
2065 factory(&config.message()) {
2066 factory.SetTimeConverter(&time);
2067 }
2068
2069 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2070 const monotonic_clock::time_point allowable_message_time,
2071 std::set<const aos::Node *> empty_nodes) {
2072 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2073 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2074 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2075 pi1->MakeEventLoop("fetcher");
2076 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2077 pi2->MakeEventLoop("fetcher");
2078 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2079 if (configuration::ChannelIsReadableOnNode(channel,
2080 pi1_event_loop->node())) {
2081 std::unique_ptr<aos::RawFetcher> fetcher =
2082 pi1_event_loop->MakeRawFetcher(channel);
2083 if (statistics_channels.find(channel) == statistics_channels.end() ||
2084 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2085 EXPECT_FALSE(fetcher->Fetch() &&
2086 fetcher->context().monotonic_event_time >
2087 allowable_message_time)
2088 << ": Found recent message on channel "
2089 << configuration::CleanedChannelToString(channel) << " and time "
2090 << fetcher->context().monotonic_event_time << " > "
2091 << allowable_message_time << " on pi1";
2092 } else {
2093 EXPECT_TRUE(fetcher->Fetch() &&
2094 fetcher->context().monotonic_event_time >=
2095 allowable_message_time)
2096 << ": Didn't find recent message on channel "
2097 << configuration::CleanedChannelToString(channel) << " on pi1";
2098 }
2099 }
2100 if (configuration::ChannelIsReadableOnNode(channel,
2101 pi2_event_loop->node())) {
2102 std::unique_ptr<aos::RawFetcher> fetcher =
2103 pi2_event_loop->MakeRawFetcher(channel);
2104 if (statistics_channels.find(channel) == statistics_channels.end() ||
2105 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2106 EXPECT_FALSE(fetcher->Fetch() &&
2107 fetcher->context().monotonic_event_time >
2108 allowable_message_time)
2109 << ": Found message on channel "
2110 << configuration::CleanedChannelToString(channel) << " and time "
2111 << fetcher->context().monotonic_event_time << " > "
2112 << allowable_message_time << " on pi2";
2113 } else {
2114 EXPECT_TRUE(fetcher->Fetch() &&
2115 fetcher->context().monotonic_event_time >=
2116 allowable_message_time)
2117 << ": Didn't find message on channel "
2118 << configuration::CleanedChannelToString(channel) << " on pi2";
2119 }
2120 }
2121 }
2122 }
2123
2124 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2125
2126 message_bridge::TestingTimeConverter time;
2127 SimulatedEventLoopFactory factory;
2128};
2129
2130// Tests that if we have message bridge client/server disabled, and timing
2131// reports disabled, no messages are sent. Also tests that we can disconnect a
2132// node and disable statistics on it and it actually fully disconnects.
2133TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2134 time.StartEqual();
2135 factory.SkipTimingReport();
2136 factory.DisableStatistics();
2137
2138 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2139 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2140
2141 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2142 pi1->MakeEventLoop("fetcher");
2143 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2144 pi2->MakeEventLoop("fetcher");
2145
2146 factory.RunFor(chrono::milliseconds(100000));
2147
2148 // Confirm no messages are sent if we've configured them all off.
2149 VerifyChannels({}, monotonic_clock::min_time, {});
2150
2151 // Now, confirm that all the message_bridge channels come back when we
2152 // re-enable.
2153 factory.EnableStatistics();
2154
2155 factory.RunFor(chrono::milliseconds(10050));
2156
2157 // Build up the list of all the messages we expect when we come back.
2158 {
2159 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002160 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002161 std::vector<std::pair<std::string_view, const Node *>>{
2162 {"/pi1/aos", pi1->node()},
2163 {"/pi2/aos", pi1->node()},
2164 {"/pi3/aos", pi1->node()}}) {
2165 statistics_channels.insert(configuration::GetChannel(
2166 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2167 pi.second));
2168 statistics_channels.insert(configuration::GetChannel(
2169 factory.configuration(), pi.first,
2170 "aos.message_bridge.ServerStatistics", "", pi.second));
2171 statistics_channels.insert(configuration::GetChannel(
2172 factory.configuration(), pi.first,
2173 "aos.message_bridge.ClientStatistics", "", pi.second));
2174 }
2175
2176 statistics_channels.insert(configuration::GetChannel(
2177 factory.configuration(),
2178 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2179 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2180 statistics_channels.insert(configuration::GetChannel(
2181 factory.configuration(),
2182 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2183 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2184 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2185 }
2186
2187 // Now test that we can disable the messages for a single node
2188 pi2->DisableStatistics();
2189 const aos::monotonic_clock::time_point statistics_disable_time =
2190 pi2->monotonic_now();
2191 factory.RunFor(chrono::milliseconds(10000));
2192
2193 // We should see a much smaller set of messages, but should still see messages
2194 // forwarded, mainly the timestamp message.
2195 {
2196 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002197 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002198 std::vector<std::pair<std::string_view, const Node *>>{
2199 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2200 statistics_channels.insert(configuration::GetChannel(
2201 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2202 pi.second));
2203 statistics_channels.insert(configuration::GetChannel(
2204 factory.configuration(), pi.first,
2205 "aos.message_bridge.ServerStatistics", "", pi.second));
2206 statistics_channels.insert(configuration::GetChannel(
2207 factory.configuration(), pi.first,
2208 "aos.message_bridge.ClientStatistics", "", pi.second));
2209 }
2210
2211 statistics_channels.insert(configuration::GetChannel(
2212 factory.configuration(),
2213 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2214 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2215 VerifyChannels(statistics_channels, statistics_disable_time, {});
2216 }
2217
2218 // Now, fully disconnect the node. This will completely quiet down pi2.
2219 pi1->Disconnect(pi2->node());
2220 pi2->Disconnect(pi1->node());
2221
2222 const aos::monotonic_clock::time_point disconnect_disable_time =
2223 pi2->monotonic_now();
2224 factory.RunFor(chrono::milliseconds(10000));
2225
2226 {
2227 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002228 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002229 std::vector<std::pair<std::string_view, const Node *>>{
2230 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2231 statistics_channels.insert(configuration::GetChannel(
2232 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2233 pi.second));
2234 statistics_channels.insert(configuration::GetChannel(
2235 factory.configuration(), pi.first,
2236 "aos.message_bridge.ServerStatistics", "", pi.second));
2237 statistics_channels.insert(configuration::GetChannel(
2238 factory.configuration(), pi.first,
2239 "aos.message_bridge.ClientStatistics", "", pi.second));
2240 }
2241
2242 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2243 }
2244}
2245
Neil Balchc8f41ed2018-01-20 22:06:53 -08002246} // namespace testing
2247} // namespace aos