blob: 71d4138c71243ab1ebdf8ae4b997092bf4311f98 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08004#include <string_view>
5
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07008#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080011#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070012#include "aos/network/message_bridge_client_generated.h"
13#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080014#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080015#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070017#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080018#include "gtest/gtest.h"
19
20namespace aos {
21namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070022namespace {
23
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070025
Austin Schuh58646e22021-08-23 23:51:46 -070026using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080027using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070028namespace chrono = ::std::chrono;
29
Austin Schuh0de30f32020-12-06 12:44:28 -080030} // namespace
31
Neil Balchc8f41ed2018-01-20 22:06:53 -080032class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
33 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080034 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080035 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080036 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080037 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080038 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080039 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080040 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070041 }
42
Austin Schuh217a9782019-12-21 23:02:50 -080043 void Run() override { event_loop_factory_->Run(); }
44 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070045
Austin Schuh52d325c2019-06-23 18:59:06 -070046 // TODO(austin): Implement this. It's used currently for a phased loop test.
47 // I'm not sure how much that matters.
48 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
49
Austin Schuh7d87b672019-12-01 20:23:49 -080050 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080051 MaybeMake();
52 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080053 }
54
Neil Balchc8f41ed2018-01-20 22:06:53 -080055 private:
Austin Schuh217a9782019-12-21 23:02:50 -080056 void MaybeMake() {
57 if (!event_loop_factory_) {
58 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080059 event_loop_factory_ =
60 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080061 } else {
62 event_loop_factory_ =
63 std::make_unique<SimulatedEventLoopFactory>(configuration());
64 }
65 }
66 }
67 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080068};
69
Austin Schuh6bae8252021-02-07 22:01:49 -080070auto CommonParameters() {
71 return ::testing::Combine(
72 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
73 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
74 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
75}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070076
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070077INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070078 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070079
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070080INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070081 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080082
Austin Schuh89c9b812021-02-20 14:42:10 -080083// Parameters to run all the tests with.
84struct Param {
85 // The config file to use.
86 std::string config;
87 // If true, the RemoteMessage channel should be shared between all the remote
88 // channels. If false, there will be 1 RemoteMessage channel per remote
89 // channel.
90 bool shared;
91};
92
93class RemoteMessageSimulatedEventLoopTest
94 : public ::testing::TestWithParam<struct Param> {
95 public:
96 RemoteMessageSimulatedEventLoopTest()
97 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070098 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080099 LOG(INFO) << "Config " << GetParam().config;
100 }
101
102 bool shared() const { return GetParam().shared; }
103
104 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
105 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
107 if (shared()) {
108 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
109 event_loop, "/aos/remote_timestamps/pi2"));
110 } else {
111 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
112 event_loop,
113 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
114 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
115 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
118 }
119 return counters;
120 }
121
122 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
123 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
125 if (shared()) {
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi1"));
128 } else {
129 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
130 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop,
133 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
134 }
135 return counters;
136 }
137
138 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
139};
140
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800141class FunctionEvent : public EventScheduler::Event {
142 public:
143 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
144
145 void Handle() noexcept override { fn_(); }
146
147 private:
148 std::function<void()> fn_;
149};
150
Neil Balchc8f41ed2018-01-20 22:06:53 -0800151// Test that creating an event and running the scheduler runs the event.
152TEST(EventSchedulerTest, ScheduleEvent) {
153 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800154 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700155 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800156 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800157
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800158 FunctionEvent e([&counter]() { counter += 1; });
159 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800160 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800161 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800162 auto token =
163 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800164 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800165 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800166 EXPECT_EQ(counter, 1);
167}
168
169// Test that descheduling an already scheduled event doesn't run the event.
170TEST(EventSchedulerTest, DescheduleEvent) {
171 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700173 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800174 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800175
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800176 FunctionEvent e([&counter]() { counter += 1; });
177 auto token =
178 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800179 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800180 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800181 EXPECT_EQ(counter, 0);
182}
Austin Schuh44019f92019-05-19 19:58:27 -0700183
Austin Schuh8fb315a2020-11-19 22:33:58 -0800184void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
185 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
186 TestMessage::Builder test_message_builder =
187 builder.MakeBuilder<TestMessage>();
188 test_message_builder.add_value(value);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800189 ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800190}
191
192// Test that sending a message after running gets properly notified.
193TEST(SimulatedEventLoopTest, SendAfterRunFor) {
194 SimulatedEventLoopTestFactory factory;
195
196 SimulatedEventLoopFactory simulated_event_loop_factory(
197 factory.configuration());
198
199 ::std::unique_ptr<EventLoop> ping_event_loop =
200 simulated_event_loop_factory.MakeEventLoop("ping");
201 aos::Sender<TestMessage> test_message_sender =
202 ping_event_loop->MakeSender<TestMessage>("/test");
203 SendTestMessage(&test_message_sender, 1);
204
205 std::unique_ptr<EventLoop> pong1_event_loop =
206 simulated_event_loop_factory.MakeEventLoop("pong");
207 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
208 "/test");
209
210 EXPECT_FALSE(ping_event_loop->is_running());
211
212 // Watchers start when you start running, so there should be nothing counted.
213 simulated_event_loop_factory.RunFor(chrono::seconds(1));
214 EXPECT_EQ(test_message_counter1.count(), 0u);
215
216 std::unique_ptr<EventLoop> pong2_event_loop =
217 simulated_event_loop_factory.MakeEventLoop("pong");
218 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
219 "/test");
220
221 // Pauses in the middle don't count though, so this should be counted.
222 // But, the fresh watcher shouldn't pick it up yet.
223 SendTestMessage(&test_message_sender, 2);
224
225 EXPECT_EQ(test_message_counter1.count(), 0u);
226 EXPECT_EQ(test_message_counter2.count(), 0u);
227 simulated_event_loop_factory.RunFor(chrono::seconds(1));
228
229 EXPECT_EQ(test_message_counter1.count(), 1u);
230 EXPECT_EQ(test_message_counter2.count(), 0u);
231}
232
233// Test that creating an event loop while running dies.
234TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
235 SimulatedEventLoopTestFactory factory;
236
237 SimulatedEventLoopFactory simulated_event_loop_factory(
238 factory.configuration());
239
240 ::std::unique_ptr<EventLoop> event_loop =
241 simulated_event_loop_factory.MakeEventLoop("ping");
242
243 auto timer = event_loop->AddTimer([&]() {
244 EXPECT_DEATH(
245 {
246 ::std::unique_ptr<EventLoop> event_loop2 =
247 simulated_event_loop_factory.MakeEventLoop("ping");
248 },
249 "event loop while running");
250 simulated_event_loop_factory.Exit();
251 });
252
253 event_loop->OnRun([&event_loop, &timer] {
254 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
255 });
256
257 simulated_event_loop_factory.Run();
258}
259
260// Test that creating a watcher after running dies.
261TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
262 SimulatedEventLoopTestFactory factory;
263
264 SimulatedEventLoopFactory simulated_event_loop_factory(
265 factory.configuration());
266
267 ::std::unique_ptr<EventLoop> event_loop =
268 simulated_event_loop_factory.MakeEventLoop("ping");
269
270 simulated_event_loop_factory.RunFor(chrono::seconds(1));
271
272 EXPECT_DEATH(
273 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
274 "Can't add a watcher after running");
275
276 ::std::unique_ptr<EventLoop> event_loop2 =
277 simulated_event_loop_factory.MakeEventLoop("ping");
278
279 simulated_event_loop_factory.RunFor(chrono::seconds(1));
280
281 EXPECT_DEATH(
282 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
283 "Can't add a watcher after running");
284}
285
Austin Schuh44019f92019-05-19 19:58:27 -0700286// Test that running for a time period with no handlers causes time to progress
287// correctly.
288TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800289 SimulatedEventLoopTestFactory factory;
290
291 SimulatedEventLoopFactory simulated_event_loop_factory(
292 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700293 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800294 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700295
296 simulated_event_loop_factory.RunFor(chrono::seconds(1));
297
298 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700299 event_loop->monotonic_now());
300}
301
302// Test that running for a time with a periodic handler causes time to end
303// correctly.
304TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800305 SimulatedEventLoopTestFactory factory;
306
307 SimulatedEventLoopFactory simulated_event_loop_factory(
308 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700309 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800310 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700311
312 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700313 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700314 event_loop->OnRun([&event_loop, &timer] {
315 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
316 chrono::milliseconds(100));
317 });
318
319 simulated_event_loop_factory.RunFor(chrono::seconds(1));
320
321 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700322 event_loop->monotonic_now());
323 EXPECT_EQ(counter, 10);
324}
325
Austin Schuh7d87b672019-12-01 20:23:49 -0800326// Tests that watchers have latency in simulation.
327TEST(SimulatedEventLoopTest, WatcherTimingReport) {
328 SimulatedEventLoopTestFactory factory;
329 factory.set_send_delay(std::chrono::microseconds(50));
330
331 FLAGS_timing_report_ms = 1000;
332 auto loop1 = factory.MakePrimary("primary");
333 loop1->MakeWatcher("/test", [](const TestMessage &) {});
334
335 auto loop2 = factory.Make("sender_loop");
336
337 auto loop3 = factory.Make("report_fetcher");
338
339 Fetcher<timing::Report> report_fetcher =
340 loop3->MakeFetcher<timing::Report>("/aos");
341 EXPECT_FALSE(report_fetcher.Fetch());
342
343 auto sender = loop2->MakeSender<TestMessage>("/test");
344
345 // Send 10 messages in the middle of a timing report period so we get
346 // something interesting back.
347 auto test_timer = loop2->AddTimer([&sender]() {
348 for (int i = 0; i < 10; ++i) {
349 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
350 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
351 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700352 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800353 }
354 });
355
356 // Quit after 1 timing report, mid way through the next cycle.
357 {
358 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
359 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
360 end_timer->set_name("end");
361 }
362
363 loop1->OnRun([&test_timer, &loop1]() {
364 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
365 });
366
367 factory.Run();
368
369 // And, since we are here, check that the timing report makes sense.
370 // Start by looking for our event loop's timing.
371 FlatbufferDetachedBuffer<timing::Report> primary_report =
372 FlatbufferDetachedBuffer<timing::Report>::Empty();
373 while (report_fetcher.FetchNext()) {
374 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
375 if (report_fetcher->name()->string_view() == "primary") {
376 primary_report = CopyFlatBuffer(report_fetcher.get());
377 }
378 }
379
380 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700381 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800382
383 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
384
385 // Just the timing report timer.
386 ASSERT_NE(primary_report.message().timers(), nullptr);
387 EXPECT_EQ(primary_report.message().timers()->size(), 2);
388
389 // No phased loops
390 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
391
392 // And now confirm that the watcher received all 10 messages, and has latency.
393 ASSERT_NE(primary_report.message().watchers(), nullptr);
394 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
395 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
396 EXPECT_NEAR(
397 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
398 0.00005, 1e-9);
399 EXPECT_NEAR(
400 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
401 0.00005, 1e-9);
402 EXPECT_NEAR(
403 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
404 0.00005, 1e-9);
405 EXPECT_EQ(primary_report.message()
406 .watchers()
407 ->Get(0)
408 ->wakeup_latency()
409 ->standard_deviation(),
410 0.0);
411
412 EXPECT_EQ(
413 primary_report.message().watchers()->Get(0)->handler_time()->average(),
414 0.0);
415 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
416 0.0);
417 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
418 0.0);
419 EXPECT_EQ(primary_report.message()
420 .watchers()
421 ->Get(0)
422 ->handler_time()
423 ->standard_deviation(),
424 0.0);
425}
426
Austin Schuh89c9b812021-02-20 14:42:10 -0800427size_t CountAll(
428 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
429 &counters) {
430 size_t count = 0u;
431 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
432 counters) {
433 count += counter->count();
434 }
435 return count;
436}
437
Austin Schuh4c3b9702020-08-30 11:34:55 -0700438// Tests that ping and pong work when on 2 different nodes, and the message
439// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800440TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800441 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
442 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700443 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800444
445 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
446
447 std::unique_ptr<EventLoop> ping_event_loop =
448 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
449 Ping ping(ping_event_loop.get());
450
451 std::unique_ptr<EventLoop> pong_event_loop =
452 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
453 Pong pong(pong_event_loop.get());
454
455 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
456 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700457 MessageCounter<examples::Pong> pi2_pong_counter(
458 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700459 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
460 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
461 "/pi1/aos");
462 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
463 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800464
Austin Schuh4c3b9702020-08-30 11:34:55 -0700465 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
466 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800467
468 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
469 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700470 MessageCounter<examples::Pong> pi1_pong_counter(
471 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700472 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
473 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
474 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
475 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
476 "/aos");
477
Austin Schuh4c3b9702020-08-30 11:34:55 -0700478 // Count timestamps.
479 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
480 pi1_pong_counter_event_loop.get(), "/pi1/aos");
481 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
482 pi2_pong_counter_event_loop.get(), "/pi1/aos");
483 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
484 pi3_pong_counter_event_loop.get(), "/pi1/aos");
485 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
486 pi1_pong_counter_event_loop.get(), "/pi2/aos");
487 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
488 pi2_pong_counter_event_loop.get(), "/pi2/aos");
489 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
490 pi1_pong_counter_event_loop.get(), "/pi3/aos");
491 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
492 pi3_pong_counter_event_loop.get(), "/pi3/aos");
493
Austin Schuh2f8fd752020-09-01 22:38:28 -0700494 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800495 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
496 remote_timestamps_pi2_on_pi1 =
497 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
498 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
499 remote_timestamps_pi1_on_pi2 =
500 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700501
Austin Schuh4c3b9702020-08-30 11:34:55 -0700502 // Wait to let timestamp estimation start up before looking for the results.
503 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
504
Austin Schuh8fb315a2020-11-19 22:33:58 -0800505 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
506 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
507 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
508 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
509 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
510 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
511
Austin Schuh4c3b9702020-08-30 11:34:55 -0700512 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800513 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700514 "/pi1/aos", [&pi1_server_statistics_count](
515 const message_bridge::ServerStatistics &stats) {
516 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
517 EXPECT_EQ(stats.connections()->size(), 2u);
518 for (const message_bridge::ServerConnection *connection :
519 *stats.connections()) {
520 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800521 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700522 if (connection->node()->name()->string_view() == "pi2") {
523 EXPECT_GT(connection->sent_packets(), 50);
524 } else if (connection->node()->name()->string_view() == "pi3") {
525 EXPECT_GE(connection->sent_packets(), 5);
526 } else {
527 LOG(FATAL) << "Unknown connection";
528 }
529
530 EXPECT_TRUE(connection->has_monotonic_offset());
531 EXPECT_EQ(connection->monotonic_offset(), 0);
532 }
533 ++pi1_server_statistics_count;
534 });
535
536 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800537 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700538 "/pi2/aos", [&pi2_server_statistics_count](
539 const message_bridge::ServerStatistics &stats) {
540 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
541 EXPECT_EQ(stats.connections()->size(), 1u);
542
543 const message_bridge::ServerConnection *connection =
544 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800545 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700546 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
547 EXPECT_GT(connection->sent_packets(), 50);
548 EXPECT_TRUE(connection->has_monotonic_offset());
549 EXPECT_EQ(connection->monotonic_offset(), 0);
550 ++pi2_server_statistics_count;
551 });
552
553 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800554 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700555 "/pi3/aos", [&pi3_server_statistics_count](
556 const message_bridge::ServerStatistics &stats) {
557 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
558 EXPECT_EQ(stats.connections()->size(), 1u);
559
560 const message_bridge::ServerConnection *connection =
561 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800562 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700563 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
564 EXPECT_GE(connection->sent_packets(), 5);
565 EXPECT_TRUE(connection->has_monotonic_offset());
566 EXPECT_EQ(connection->monotonic_offset(), 0);
567 ++pi3_server_statistics_count;
568 });
569
570 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800571 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700572 "/pi1/aos", [&pi1_client_statistics_count](
573 const message_bridge::ClientStatistics &stats) {
574 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
575 EXPECT_EQ(stats.connections()->size(), 2u);
576
577 for (const message_bridge::ClientConnection *connection :
578 *stats.connections()) {
579 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
580 if (connection->node()->name()->string_view() == "pi2") {
581 EXPECT_GT(connection->received_packets(), 50);
582 } else if (connection->node()->name()->string_view() == "pi3") {
583 EXPECT_GE(connection->received_packets(), 5);
584 } else {
585 LOG(FATAL) << "Unknown connection";
586 }
587
Austin Schuhe61d4382021-03-31 21:33:02 -0700588 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700589 EXPECT_TRUE(connection->has_monotonic_offset());
590 EXPECT_EQ(connection->monotonic_offset(), 150000);
591 }
592 ++pi1_client_statistics_count;
593 });
594
595 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800596 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700597 "/pi2/aos", [&pi2_client_statistics_count](
598 const message_bridge::ClientStatistics &stats) {
599 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
600 EXPECT_EQ(stats.connections()->size(), 1u);
601
602 const message_bridge::ClientConnection *connection =
603 stats.connections()->Get(0);
604 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
605 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700606 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700607 EXPECT_TRUE(connection->has_monotonic_offset());
608 EXPECT_EQ(connection->monotonic_offset(), 150000);
609 ++pi2_client_statistics_count;
610 });
611
612 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800613 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700614 "/pi3/aos", [&pi3_client_statistics_count](
615 const message_bridge::ClientStatistics &stats) {
616 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
617 EXPECT_EQ(stats.connections()->size(), 1u);
618
619 const message_bridge::ClientConnection *connection =
620 stats.connections()->Get(0);
621 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
622 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700623 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700624 EXPECT_TRUE(connection->has_monotonic_offset());
625 EXPECT_EQ(connection->monotonic_offset(), 150000);
626 ++pi3_client_statistics_count;
627 });
628
Austin Schuh2f8fd752020-09-01 22:38:28 -0700629 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
630 // channel.
631 const size_t pi1_timestamp_channel =
632 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
633 pi1_on_pi2_timestamp_fetcher.channel());
634 const size_t ping_timestamp_channel =
635 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
636 ping_on_pi2_fetcher.channel());
637
638 for (const Channel *channel :
639 *pi1_pong_counter_event_loop->configuration()->channels()) {
640 VLOG(1) << "Channel "
641 << configuration::ChannelIndex(
642 pi1_pong_counter_event_loop->configuration(), channel)
643 << " " << configuration::CleanedChannelToString(channel);
644 }
645
Austin Schuh8fb315a2020-11-19 22:33:58 -0800646 std::unique_ptr<EventLoop> pi1_remote_timestamp =
647 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
648
Austin Schuh89c9b812021-02-20 14:42:10 -0800649 for (std::pair<int, std::string> channel :
650 shared()
651 ? std::vector<std::pair<
652 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
653 : std::vector<std::pair<int, std::string>>{
654 {pi1_timestamp_channel,
655 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
656 "aos-message_bridge-Timestamp"},
657 {ping_timestamp_channel,
658 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
659 // For each remote timestamp we get back, confirm that it is either a ping
660 // message, or a timestamp we sent out. Also confirm that the timestamps
661 // are correct.
662 pi1_remote_timestamp->MakeWatcher(
663 channel.second,
664 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
665 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
666 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
667 channel_index = channel.first](const RemoteMessage &header) {
668 VLOG(1) << aos::FlatbufferToJson(&header);
669 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700670 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800671 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700672 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700673
Austin Schuh89c9b812021-02-20 14:42:10 -0800674 const aos::monotonic_clock::time_point header_monotonic_sent_time(
675 chrono::nanoseconds(header.monotonic_sent_time()));
676 const aos::realtime_clock::time_point header_realtime_sent_time(
677 chrono::nanoseconds(header.realtime_sent_time()));
678 const aos::monotonic_clock::time_point header_monotonic_remote_time(
679 chrono::nanoseconds(header.monotonic_remote_time()));
680 const aos::realtime_clock::time_point header_realtime_remote_time(
681 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700682
Austin Schuh89c9b812021-02-20 14:42:10 -0800683 if (channel_index != -1) {
684 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700685 }
686
Austin Schuh89c9b812021-02-20 14:42:10 -0800687 const Context *pi1_context = nullptr;
688 const Context *pi2_context = nullptr;
689
690 if (header.channel_index() == pi1_timestamp_channel) {
691 // Find the forwarded message.
692 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
693 header_monotonic_sent_time) {
694 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
695 }
696
697 // And the source message.
698 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
699 header_monotonic_remote_time) {
700 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
701 }
702
703 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
704 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
705 } else if (header.channel_index() == ping_timestamp_channel) {
706 // Find the forwarded message.
707 while (ping_on_pi2_fetcher.context().monotonic_event_time <
708 header_monotonic_sent_time) {
709 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
710 }
711
712 // And the source message.
713 while (ping_on_pi1_fetcher.context().monotonic_event_time <
714 header_monotonic_remote_time) {
715 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
716 }
717
718 pi1_context = &ping_on_pi1_fetcher.context();
719 pi2_context = &ping_on_pi2_fetcher.context();
720 } else {
721 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700722 }
723
Austin Schuh89c9b812021-02-20 14:42:10 -0800724 // Confirm the forwarded message has matching timestamps to the
725 // timestamps we got back.
726 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
727 EXPECT_EQ(pi2_context->remote_queue_index,
728 header.remote_queue_index());
729 EXPECT_EQ(pi2_context->monotonic_event_time,
730 header_monotonic_sent_time);
731 EXPECT_EQ(pi2_context->realtime_event_time,
732 header_realtime_sent_time);
733 EXPECT_EQ(pi2_context->realtime_remote_time,
734 header_realtime_remote_time);
735 EXPECT_EQ(pi2_context->monotonic_remote_time,
736 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700737
Austin Schuh89c9b812021-02-20 14:42:10 -0800738 // Confirm the forwarded message also matches the source message.
739 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
740 EXPECT_EQ(pi1_context->monotonic_event_time,
741 header_monotonic_remote_time);
742 EXPECT_EQ(pi1_context->realtime_event_time,
743 header_realtime_remote_time);
744 });
745 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700746
Austin Schuh4c3b9702020-08-30 11:34:55 -0700747 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
748 chrono::milliseconds(500) +
749 chrono::milliseconds(5));
750
751 EXPECT_EQ(pi1_pong_counter.count(), 1001);
752 EXPECT_EQ(pi2_pong_counter.count(), 1001);
753
754 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
755 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
756 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
757 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
758 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
759 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
760 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
761
Austin Schuh20ac95d2020-12-05 17:24:19 -0800762 EXPECT_EQ(pi1_server_statistics_count, 10);
763 EXPECT_EQ(pi2_server_statistics_count, 10);
764 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700765
766 EXPECT_EQ(pi1_client_statistics_count, 95);
767 EXPECT_EQ(pi2_client_statistics_count, 95);
768 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700769
770 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800771 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
772 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700773}
774
775// Tests that an offset between nodes can be recovered and shows up in
776// ServerStatistics correctly.
777TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
778 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700779 aos::configuration::ReadConfig(ArtifactPath(
780 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700781 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800782 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
783 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700784 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800785 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
786 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700787 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800788 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
789 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700790
Austin Schuh87dd3832021-01-01 23:07:31 -0800791 message_bridge::TestingTimeConverter time(
792 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700793 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700794 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700795
796 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800797 time.AddNextTimestamp(
798 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700799 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
800 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700801
802 std::unique_ptr<EventLoop> ping_event_loop =
803 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
804 Ping ping(ping_event_loop.get());
805
806 std::unique_ptr<EventLoop> pong_event_loop =
807 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
808 Pong pong(pong_event_loop.get());
809
Austin Schuh8fb315a2020-11-19 22:33:58 -0800810 // Wait to let timestamp estimation start up before looking for the results.
811 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
812
Austin Schuh87dd3832021-01-01 23:07:31 -0800813 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
814 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
815
Austin Schuh4c3b9702020-08-30 11:34:55 -0700816 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
817 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
818
819 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
820 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
821
Austin Schuh4c3b9702020-08-30 11:34:55 -0700822 // Confirm the offsets are being recovered correctly.
823 int pi1_server_statistics_count = 0;
824 pi1_pong_counter_event_loop->MakeWatcher(
825 "/pi1/aos", [&pi1_server_statistics_count,
826 kOffset](const message_bridge::ServerStatistics &stats) {
827 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
828 EXPECT_EQ(stats.connections()->size(), 2u);
829 for (const message_bridge::ServerConnection *connection :
830 *stats.connections()) {
831 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800832 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700833 if (connection->node()->name()->string_view() == "pi2") {
834 EXPECT_EQ(connection->monotonic_offset(),
835 chrono::nanoseconds(kOffset).count());
836 } else if (connection->node()->name()->string_view() == "pi3") {
837 EXPECT_EQ(connection->monotonic_offset(), 0);
838 } else {
839 LOG(FATAL) << "Unknown connection";
840 }
841
842 EXPECT_TRUE(connection->has_monotonic_offset());
843 }
844 ++pi1_server_statistics_count;
845 });
846
847 int pi2_server_statistics_count = 0;
848 pi2_pong_counter_event_loop->MakeWatcher(
849 "/pi2/aos", [&pi2_server_statistics_count,
850 kOffset](const message_bridge::ServerStatistics &stats) {
851 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
852 EXPECT_EQ(stats.connections()->size(), 1u);
853
854 const message_bridge::ServerConnection *connection =
855 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800856 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700857 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
858 EXPECT_TRUE(connection->has_monotonic_offset());
859 EXPECT_EQ(connection->monotonic_offset(),
860 -chrono::nanoseconds(kOffset).count());
861 ++pi2_server_statistics_count;
862 });
863
864 int pi3_server_statistics_count = 0;
865 pi3_pong_counter_event_loop->MakeWatcher(
866 "/pi3/aos", [&pi3_server_statistics_count](
867 const message_bridge::ServerStatistics &stats) {
868 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
869 EXPECT_EQ(stats.connections()->size(), 1u);
870
871 const message_bridge::ServerConnection *connection =
872 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800873 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700874 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
875 EXPECT_TRUE(connection->has_monotonic_offset());
876 EXPECT_EQ(connection->monotonic_offset(), 0);
877 ++pi3_server_statistics_count;
878 });
879
880 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
881 chrono::milliseconds(500) +
882 chrono::milliseconds(5));
883
Austin Schuh20ac95d2020-12-05 17:24:19 -0800884 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700885 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800886 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700887}
888
889// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800890TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700891 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
892 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
893 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
894
895 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
896 simulated_event_loop_factory.DisableStatistics();
897
898 std::unique_ptr<EventLoop> ping_event_loop =
899 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
900 Ping ping(ping_event_loop.get());
901
902 std::unique_ptr<EventLoop> pong_event_loop =
903 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
904 Pong pong(pong_event_loop.get());
905
906 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
907 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
908
909 MessageCounter<examples::Pong> pi2_pong_counter(
910 pi2_pong_counter_event_loop.get(), "/test");
911
912 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
913 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
914
915 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
916 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
917
918 MessageCounter<examples::Pong> pi1_pong_counter(
919 pi1_pong_counter_event_loop.get(), "/test");
920
921 // Count timestamps.
922 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
923 pi1_pong_counter_event_loop.get(), "/pi1/aos");
924 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
925 pi2_pong_counter_event_loop.get(), "/pi1/aos");
926 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
927 pi3_pong_counter_event_loop.get(), "/pi1/aos");
928 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
929 pi1_pong_counter_event_loop.get(), "/pi2/aos");
930 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
931 pi2_pong_counter_event_loop.get(), "/pi2/aos");
932 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
933 pi1_pong_counter_event_loop.get(), "/pi3/aos");
934 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
935 pi3_pong_counter_event_loop.get(), "/pi3/aos");
936
Austin Schuh2f8fd752020-09-01 22:38:28 -0700937 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800938 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
939 remote_timestamps_pi2_on_pi1 =
940 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
941 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
942 remote_timestamps_pi1_on_pi2 =
943 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700944
Austin Schuh4c3b9702020-08-30 11:34:55 -0700945 MessageCounter<message_bridge::ServerStatistics>
946 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
947 "/pi1/aos");
948 MessageCounter<message_bridge::ServerStatistics>
949 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
950 "/pi2/aos");
951 MessageCounter<message_bridge::ServerStatistics>
952 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
953 "/pi3/aos");
954
955 MessageCounter<message_bridge::ClientStatistics>
956 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
957 "/pi1/aos");
958 MessageCounter<message_bridge::ClientStatistics>
959 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
960 "/pi2/aos");
961 MessageCounter<message_bridge::ClientStatistics>
962 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
963 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800964
965 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
966 chrono::milliseconds(5));
967
Austin Schuh4c3b9702020-08-30 11:34:55 -0700968 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
969 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
970
971 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
972 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
973 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
974 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
975 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
976 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
977 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
978
979 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
980 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
981 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
982
983 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
984 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
985 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700986
987 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800988 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
989 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -0800990}
991
Austin Schuhc0b0f722020-12-12 18:36:06 -0800992bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
993 for (const message_bridge::ServerConnection *connection :
994 *server_statistics->connections()) {
995 if (connection->state() != message_bridge::State::CONNECTED) {
996 return false;
997 }
998 }
999 return true;
1000}
1001
1002bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1003 std::string_view target) {
1004 for (const message_bridge::ServerConnection *connection :
1005 *server_statistics->connections()) {
1006 if (connection->node()->name()->string_view() == target) {
1007 if (connection->state() == message_bridge::State::CONNECTED) {
1008 return false;
1009 }
1010 } else {
1011 if (connection->state() != message_bridge::State::CONNECTED) {
1012 return false;
1013 }
1014 }
1015 }
1016 return true;
1017}
1018
1019bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1020 for (const message_bridge::ClientConnection *connection :
1021 *client_statistics->connections()) {
1022 if (connection->state() != message_bridge::State::CONNECTED) {
1023 return false;
1024 }
1025 }
1026 return true;
1027}
1028
1029bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1030 std::string_view target) {
1031 for (const message_bridge::ClientConnection *connection :
1032 *client_statistics->connections()) {
1033 if (connection->node()->name()->string_view() == target) {
1034 if (connection->state() == message_bridge::State::CONNECTED) {
1035 return false;
1036 }
1037 } else {
1038 if (connection->state() != message_bridge::State::CONNECTED) {
1039 return false;
1040 }
1041 }
1042 }
1043 return true;
1044}
1045
1046// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001047TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001048 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1049
Austin Schuh58646e22021-08-23 23:51:46 -07001050 NodeEventLoopFactory *pi1 =
1051 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1052 NodeEventLoopFactory *pi2 =
1053 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1054 NodeEventLoopFactory *pi3 =
1055 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1056
1057 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001058 Ping ping(ping_event_loop.get());
1059
Austin Schuh58646e22021-08-23 23:51:46 -07001060 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001061 Pong pong(pong_event_loop.get());
1062
1063 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001064 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001065
1066 MessageCounter<examples::Pong> pi2_pong_counter(
1067 pi2_pong_counter_event_loop.get(), "/test");
1068
1069 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001070 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001071
1072 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001073 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001074
1075 MessageCounter<examples::Pong> pi1_pong_counter(
1076 pi1_pong_counter_event_loop.get(), "/test");
1077
1078 // Count timestamps.
1079 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1080 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1081 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1082 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1083 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1084 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1085 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1086 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1087 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1088 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1089 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1090 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1091 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1092 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1093
1094 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001095 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1096 remote_timestamps_pi2_on_pi1 =
1097 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1098 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1099 remote_timestamps_pi1_on_pi2 =
1100 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001101
1102 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001103 *pi1_server_statistics_counter;
1104 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1105 pi1_server_statistics_counter =
1106 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1107 "pi1_server_statistics_counter", "/pi1/aos");
1108 });
1109
Austin Schuhc0b0f722020-12-12 18:36:06 -08001110 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1111 pi1_pong_counter_event_loop
1112 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1113 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1114 pi1_pong_counter_event_loop
1115 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1116
1117 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001118 *pi2_server_statistics_counter;
1119 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1120 pi2_server_statistics_counter =
1121 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1122 "pi2_server_statistics_counter", "/pi2/aos");
1123 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001124 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1125 pi2_pong_counter_event_loop
1126 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1127 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1128 pi2_pong_counter_event_loop
1129 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1130
1131 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001132 *pi3_server_statistics_counter;
1133 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1134 pi3_server_statistics_counter =
1135 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1136 "pi3_server_statistics_counter", "/pi3/aos");
1137 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001138 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1139 pi3_pong_counter_event_loop
1140 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1141 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1142 pi3_pong_counter_event_loop
1143 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1144
1145 MessageCounter<message_bridge::ClientStatistics>
1146 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1147 "/pi1/aos");
1148 MessageCounter<message_bridge::ClientStatistics>
1149 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1150 "/pi2/aos");
1151 MessageCounter<message_bridge::ClientStatistics>
1152 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1153 "/pi3/aos");
1154
1155 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1156 chrono::milliseconds(5));
1157
1158 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1159 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1160
1161 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1162 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1163 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1164 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1165 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1166 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1167 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1168
Austin Schuh58646e22021-08-23 23:51:46 -07001169 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1170 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1171 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001172
1173 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1174 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1175 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1176
1177 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001178 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1179 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001180
1181 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1182 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1183 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1184 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1185 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1186 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1187 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1188 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1189 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1190 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1191 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1192 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1193 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1194 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1195 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1196 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1197 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1198 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1199
Austin Schuh58646e22021-08-23 23:51:46 -07001200 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201
1202 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1203
1204 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1205 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1206
1207 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1208 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1209 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1210 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1211 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1212 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1213 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1214
Austin Schuh58646e22021-08-23 23:51:46 -07001215 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1216 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1217 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001218
1219 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1220 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1221 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1222
1223 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001224 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1225 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001226
1227 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1228 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1229 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1230 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1231 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1232 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1233 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1234 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1235 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1236 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1237 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1238 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1239 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1240 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1241 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1242 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1243 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1244 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1245
Austin Schuh58646e22021-08-23 23:51:46 -07001246 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001247
1248 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1249
1250 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1251 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1252
1253 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1254 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1255 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1256 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1257 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1258 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1259 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1260
Austin Schuh58646e22021-08-23 23:51:46 -07001261 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1262 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1263 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001264
1265 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1266 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1267 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1268
1269 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001270 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1271 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001272
1273 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1274 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1275 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1276 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1277 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1278 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1279 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1280 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1281 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1282 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1283 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1284 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1285 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1286 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1287 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1288 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1289 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1290 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1291}
1292
Austin Schuh2febf0d2020-09-21 22:24:30 -07001293// Tests that the time offset having a slope doesn't break the world.
1294// SimulatedMessageBridge has enough self consistency CHECK statements to
1295// confirm, and we can can also check a message in each direction to make sure
1296// it gets delivered as expected.
1297TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1298 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001299 aos::configuration::ReadConfig(ArtifactPath(
1300 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001301 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001302 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1303 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001304 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001305 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1306 ASSERT_EQ(pi2_index, 1u);
1307 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1308 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1309 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001310
Austin Schuh87dd3832021-01-01 23:07:31 -08001311 message_bridge::TestingTimeConverter time(
1312 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001313 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001314 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001315
Austin Schuh2febf0d2020-09-21 22:24:30 -07001316 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001317 time.AddNextTimestamp(
1318 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001319 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1320 BootTimestamp::epoch()});
1321 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1322 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1323 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1324 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001325
1326 std::unique_ptr<EventLoop> ping_event_loop =
1327 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1328 Ping ping(ping_event_loop.get());
1329
1330 std::unique_ptr<EventLoop> pong_event_loop =
1331 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1332 Pong pong(pong_event_loop.get());
1333
1334 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1335 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1336 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1337 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1338
1339 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1340 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1341 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1342 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1343
1344 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1345 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1346 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1347 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1348
1349 // End after a pong message comes back. This will leave the latest messages
1350 // on all channels so we can look at timestamps easily and check they make
1351 // sense.
1352 std::unique_ptr<EventLoop> pi1_pong_ender =
1353 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1354 int count = 0;
1355 pi1_pong_ender->MakeWatcher(
1356 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1357 if (++count == 100) {
1358 simulated_event_loop_factory.Exit();
1359 }
1360 });
1361
1362 // Run enough that messages should be delivered.
1363 simulated_event_loop_factory.Run();
1364
1365 // Grab the latest messages.
1366 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1367 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1368 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1369 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1370
1371 // Compute their time on the global distributed clock so we can compute
1372 // distance betwen them.
1373 const distributed_clock::time_point pi1_ping_time =
1374 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1375 ->ToDistributedClock(
1376 ping_on_pi1_fetcher.context().monotonic_event_time);
1377 const distributed_clock::time_point pi2_ping_time =
1378 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1379 ->ToDistributedClock(
1380 ping_on_pi2_fetcher.context().monotonic_event_time);
1381 const distributed_clock::time_point pi1_pong_time =
1382 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1383 ->ToDistributedClock(
1384 pong_on_pi1_fetcher.context().monotonic_event_time);
1385 const distributed_clock::time_point pi2_pong_time =
1386 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1387 ->ToDistributedClock(
1388 pong_on_pi2_fetcher.context().monotonic_event_time);
1389
1390 // And confirm the delivery delay is just about exactly 150 uS for both
1391 // directions like expected. There will be a couple ns of rounding errors in
1392 // the conversion functions that aren't worth accounting for right now. This
1393 // will either be really close, or really far.
1394 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1395 pi1_ping_time);
1396 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1397 pi1_ping_time);
1398
1399 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1400 pi2_pong_time);
1401 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1402 pi2_pong_time);
1403}
1404
Austin Schuh4c570ea2020-11-19 23:13:24 -08001405void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1406 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1407 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1408 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001409 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001410}
1411
1412// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001413TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001414 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1415 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1416
1417 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1418
1419 std::unique_ptr<EventLoop> ping_event_loop =
1420 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1421 aos::Sender<examples::Ping> pi1_reliable_sender =
1422 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1423 aos::Sender<examples::Ping> pi1_unreliable_sender =
1424 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1425 SendPing(&pi1_reliable_sender, 1);
1426 SendPing(&pi1_unreliable_sender, 1);
1427
1428 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1429 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1430 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1431 "/reliable");
1432 MessageCounter<examples::Ping> pi2_unreliable_counter(
1433 pi2_pong_event_loop.get(), "/unreliable");
1434 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1435 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1436 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1437 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1438
1439 const size_t reliable_channel_index = configuration::ChannelIndex(
1440 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1441
1442 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1443 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1444
Austin Schuheeaa2022021-01-02 21:52:03 -08001445 const chrono::nanoseconds network_delay =
1446 simulated_event_loop_factory.network_delay();
1447
Austin Schuh4c570ea2020-11-19 23:13:24 -08001448 int reliable_timestamp_count = 0;
1449 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001450 shared() ? "/pi1/aos/remote_timestamps/pi2"
1451 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001452 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001453 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1454 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001455 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001456 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001457 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001458 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001459 VLOG(1) << aos::FlatbufferToJson(&header);
1460 if (header.channel_index() == reliable_channel_index) {
1461 ++reliable_timestamp_count;
1462 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001463
1464 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1465 chrono::nanoseconds(header.monotonic_sent_time()));
1466
1467 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1468 header_monotonic_sent_time + network_delay +
1469 (pi1_remote_timestamp->monotonic_now() -
1470 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001471 });
1472
1473 // Wait to let timestamp estimation start up before looking for the results.
1474 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1475
1476 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1477 // This one isn't reliable, but was sent before the start. It should *not* be
1478 // delivered.
1479 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1480 // Confirm we got a timestamp logged for the message that was forwarded.
1481 EXPECT_EQ(reliable_timestamp_count, 1u);
1482
1483 SendPing(&pi1_reliable_sender, 2);
1484 SendPing(&pi1_unreliable_sender, 2);
1485 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1486 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1487 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1488
1489 EXPECT_EQ(reliable_timestamp_count, 2u);
1490}
1491
Austin Schuh20ac95d2020-12-05 17:24:19 -08001492// Tests that rebooting a node changes the ServerStatistics message and the
1493// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001494TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001495 const UUID pi1_boot0 = UUID::Random();
1496 const UUID pi2_boot0 = UUID::Random();
1497 const UUID pi2_boot1 = UUID::Random();
1498 const UUID pi3_boot0 = UUID::Random();
1499 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001500
Austin Schuh58646e22021-08-23 23:51:46 -07001501 message_bridge::TestingTimeConverter time(
1502 configuration::NodesCount(&config.message()));
1503 SimulatedEventLoopFactory factory(&config.message());
1504 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001505
Austin Schuh58646e22021-08-23 23:51:46 -07001506 const size_t pi1_index =
1507 configuration::GetNodeIndex(&config.message(), "pi1");
1508 const size_t pi2_index =
1509 configuration::GetNodeIndex(&config.message(), "pi2");
1510 const size_t pi3_index =
1511 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001512
Austin Schuh58646e22021-08-23 23:51:46 -07001513 {
1514 time.AddNextTimestamp(distributed_clock::epoch(),
1515 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1516 BootTimestamp::epoch()});
1517
1518 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1519
1520 time.AddNextTimestamp(
1521 distributed_clock::epoch() + dt,
1522 {BootTimestamp::epoch() + dt,
1523 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1524 BootTimestamp::epoch() + dt});
1525
1526 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1527 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1528 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1529 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1530 }
1531
1532 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1533 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1534
1535 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1536 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001537
1538 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001539 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001540
1541 int timestamp_count = 0;
1542 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001543 "/pi2/aos", [&expected_boot_uuid,
1544 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001545 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001546 expected_boot_uuid);
1547 });
1548 pi1_remote_timestamp->MakeWatcher(
1549 "/test",
1550 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001551 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001552 expected_boot_uuid);
1553 });
1554 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001555 shared() ? "/pi1/aos/remote_timestamps/pi2"
1556 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001557 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1558 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001559 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001560 VLOG(1) << aos::FlatbufferToJson(&header);
1561 ++timestamp_count;
1562 });
1563
1564 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001565 bool first_pi1_server_statistics = true;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001566 pi1_remote_timestamp->MakeWatcher(
Austin Schuh58646e22021-08-23 23:51:46 -07001567 "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
1568 &first_pi1_server_statistics](
Austin Schuh20ac95d2020-12-05 17:24:19 -08001569 const message_bridge::ServerStatistics &stats) {
1570 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1571 for (const message_bridge::ServerConnection *connection :
1572 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001573 if (connection->state() == message_bridge::State::CONNECTED) {
1574 ASSERT_TRUE(connection->has_boot_uuid());
1575 }
1576 if (!first_pi1_server_statistics) {
1577 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1578 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001579 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001580 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1581 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001582 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001583 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001584 << " : Got " << aos::FlatbufferToJson(&stats);
1585 ++pi1_server_statistics_count;
1586 }
1587 }
Austin Schuh58646e22021-08-23 23:51:46 -07001588 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001589 });
1590
Austin Schuh58646e22021-08-23 23:51:46 -07001591 int pi1_client_statistics_count = 0;
1592 pi1_remote_timestamp->MakeWatcher(
1593 "/pi1/aos", [&pi1_client_statistics_count](
1594 const message_bridge::ClientStatistics &stats) {
1595 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1596 for (const message_bridge::ClientConnection *connection :
1597 *stats.connections()) {
1598 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1599 if (connection->node()->name()->string_view() == "pi2") {
1600 ++pi1_client_statistics_count;
1601 }
1602 }
1603 });
1604
1605 // Confirm that reboot changes the UUID.
1606 pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
1607 expected_boot_uuid = pi2_boot1;
1608 LOG(INFO) << "OnShutdown triggered for pi2";
1609 pi2->OnStartup([&expected_boot_uuid, pi2]() {
1610 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1611 });
1612 });
1613
Austin Schuh20ac95d2020-12-05 17:24:19 -08001614 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001615 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001616
1617 EXPECT_GT(timestamp_count, 100);
1618 EXPECT_GE(pi1_server_statistics_count, 1u);
1619
Austin Schuh20ac95d2020-12-05 17:24:19 -08001620 timestamp_count = 0;
1621 pi1_server_statistics_count = 0;
1622
Austin Schuh58646e22021-08-23 23:51:46 -07001623 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001624 EXPECT_GT(timestamp_count, 100);
1625 EXPECT_GE(pi1_server_statistics_count, 1u);
1626}
1627
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001628INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001629 All, RemoteMessageSimulatedEventLoopTest,
1630 ::testing::Values(
1631 Param{"multinode_pingpong_test_combined_config.json", true},
1632 Param{"multinode_pingpong_test_split_config.json", false}));
1633
Austin Schuh58646e22021-08-23 23:51:46 -07001634// Tests that Startup and Shutdown do reasonable things.
1635TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1636 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1637 aos::configuration::ReadConfig(
1638 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1639
Austin Schuh72e65682021-09-02 11:37:05 -07001640 size_t pi1_shutdown_counter = 0;
1641 size_t pi2_shutdown_counter = 0;
1642 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1643 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1644
Austin Schuh58646e22021-08-23 23:51:46 -07001645 message_bridge::TestingTimeConverter time(
1646 configuration::NodesCount(&config.message()));
1647 SimulatedEventLoopFactory factory(&config.message());
1648 factory.SetTimeConverter(&time);
1649 time.AddNextTimestamp(
1650 distributed_clock::epoch(),
1651 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1652
1653 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1654
1655 time.AddNextTimestamp(
1656 distributed_clock::epoch() + dt,
1657 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1658 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1659 BootTimestamp::epoch() + dt});
1660
1661 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1662 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1663
1664 // Configure startup to start Ping and Pong, and count.
1665 size_t pi1_startup_counter = 0;
1666 size_t pi2_startup_counter = 0;
1667 pi1->OnStartup([pi1]() {
1668 LOG(INFO) << "Made ping";
1669 pi1->AlwaysStart<Ping>("ping");
1670 });
1671 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1672 pi2->OnStartup([pi2]() {
1673 LOG(INFO) << "Made pong";
1674 pi2->AlwaysStart<Pong>("pong");
1675 });
1676 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1677
1678 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001679 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1680 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1681
Austin Schuh58646e22021-08-23 23:51:46 -07001682 // Automatically make counters on startup.
1683 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1684 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1685 "pi1_pong_counter", "/test");
1686 });
1687 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1688 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1689 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1690 "pi2_ping_counter", "/test");
1691 });
1692 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1693
1694 EXPECT_EQ(pi2_ping_counter, nullptr);
1695 EXPECT_EQ(pi1_pong_counter, nullptr);
1696
1697 EXPECT_EQ(pi1_startup_counter, 0u);
1698 EXPECT_EQ(pi2_startup_counter, 0u);
1699 EXPECT_EQ(pi1_shutdown_counter, 0u);
1700 EXPECT_EQ(pi2_shutdown_counter, 0u);
1701
1702 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1703 EXPECT_EQ(pi1_startup_counter, 1u);
1704 EXPECT_EQ(pi2_startup_counter, 1u);
1705 EXPECT_EQ(pi1_shutdown_counter, 0u);
1706 EXPECT_EQ(pi2_shutdown_counter, 0u);
1707 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1708 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1709
1710 LOG(INFO) << pi1->monotonic_now();
1711 LOG(INFO) << pi2->monotonic_now();
1712
1713 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1714
1715 EXPECT_EQ(pi1_startup_counter, 2u);
1716 EXPECT_EQ(pi2_startup_counter, 2u);
1717 EXPECT_EQ(pi1_shutdown_counter, 1u);
1718 EXPECT_EQ(pi2_shutdown_counter, 1u);
1719 EXPECT_EQ(pi2_ping_counter->count(), 501);
1720 EXPECT_EQ(pi1_pong_counter->count(), 501);
1721}
1722
1723// Tests that OnStartup handlers can be added after running and get called, and
1724// can't be called when running.
1725TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1726 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1727 aos::configuration::ReadConfig(
1728 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1729
1730 // Test that we can add startup handlers as long as we aren't running, and
1731 // they get run when Run gets called again.
1732 // Test that adding a startup handler when running fails.
1733 //
1734 // Test shutdown handlers get called on destruction.
1735 SimulatedEventLoopFactory factory(&config.message());
1736
1737 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1738
1739 int startup_count0 = 0;
1740 int startup_count1 = 0;
1741
1742 pi1->OnStartup([&]() { ++startup_count0; });
1743 EXPECT_EQ(startup_count0, 0);
1744 EXPECT_EQ(startup_count1, 0);
1745
1746 factory.RunFor(chrono::nanoseconds(1));
1747 EXPECT_EQ(startup_count0, 1);
1748 EXPECT_EQ(startup_count1, 0);
1749
1750 pi1->OnStartup([&]() { ++startup_count1; });
1751 EXPECT_EQ(startup_count0, 1);
1752 EXPECT_EQ(startup_count1, 0);
1753
1754 factory.RunFor(chrono::nanoseconds(1));
1755 EXPECT_EQ(startup_count0, 1);
1756 EXPECT_EQ(startup_count1, 1);
1757
1758 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1759 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1760
1761 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1762 "Can only register OnStartup handlers when not running.");
1763}
1764
1765// Tests that OnStartup handlers can be added after running and get called, and
1766// all the handlers get called on reboot. Shutdown handlers are tested the same
1767// way.
1768TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1769 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1770 aos::configuration::ReadConfig(
1771 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1772
Austin Schuh72e65682021-09-02 11:37:05 -07001773 int startup_count0 = 0;
1774 int shutdown_count0 = 0;
1775 int startup_count1 = 0;
1776 int shutdown_count1 = 0;
1777
Austin Schuh58646e22021-08-23 23:51:46 -07001778 message_bridge::TestingTimeConverter time(
1779 configuration::NodesCount(&config.message()));
1780 SimulatedEventLoopFactory factory(&config.message());
1781 factory.SetTimeConverter(&time);
1782 time.StartEqual();
1783
1784 const chrono::nanoseconds dt = chrono::seconds(10);
1785 time.RebootAt(0, distributed_clock::epoch() + dt);
1786 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1787
1788 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1789
Austin Schuh58646e22021-08-23 23:51:46 -07001790 pi1->OnStartup([&]() { ++startup_count0; });
1791 pi1->OnShutdown([&]() { ++shutdown_count0; });
1792 EXPECT_EQ(startup_count0, 0);
1793 EXPECT_EQ(startup_count1, 0);
1794 EXPECT_EQ(shutdown_count0, 0);
1795 EXPECT_EQ(shutdown_count1, 0);
1796
1797 factory.RunFor(chrono::nanoseconds(1));
1798 EXPECT_EQ(startup_count0, 1);
1799 EXPECT_EQ(startup_count1, 0);
1800 EXPECT_EQ(shutdown_count0, 0);
1801 EXPECT_EQ(shutdown_count1, 0);
1802
1803 pi1->OnStartup([&]() { ++startup_count1; });
1804 EXPECT_EQ(startup_count0, 1);
1805 EXPECT_EQ(startup_count1, 0);
1806 EXPECT_EQ(shutdown_count0, 0);
1807 EXPECT_EQ(shutdown_count1, 0);
1808
1809 factory.RunFor(chrono::nanoseconds(1));
1810 EXPECT_EQ(startup_count0, 1);
1811 EXPECT_EQ(startup_count1, 1);
1812 EXPECT_EQ(shutdown_count0, 0);
1813 EXPECT_EQ(shutdown_count1, 0);
1814
1815 factory.RunFor(chrono::seconds(15));
1816
1817 EXPECT_EQ(startup_count0, 2);
1818 EXPECT_EQ(startup_count1, 2);
1819 EXPECT_EQ(shutdown_count0, 1);
1820 EXPECT_EQ(shutdown_count1, 0);
1821
1822 pi1->OnShutdown([&]() { ++shutdown_count1; });
1823 factory.RunFor(chrono::seconds(10));
1824
1825 EXPECT_EQ(startup_count0, 3);
1826 EXPECT_EQ(startup_count1, 3);
1827 EXPECT_EQ(shutdown_count0, 2);
1828 EXPECT_EQ(shutdown_count1, 1);
1829}
1830
1831// Tests that event loops which outlive shutdown crash.
1832TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1833 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1834 aos::configuration::ReadConfig(
1835 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1836
1837 message_bridge::TestingTimeConverter time(
1838 configuration::NodesCount(&config.message()));
1839 SimulatedEventLoopFactory factory(&config.message());
1840 factory.SetTimeConverter(&time);
1841 time.StartEqual();
1842
1843 const chrono::nanoseconds dt = chrono::seconds(10);
1844 time.RebootAt(0, distributed_clock::epoch() + dt);
1845
1846 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1847
1848 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1849
1850 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
1851}
1852
1853// Tests that messages don't survive a reboot of a node.
1854TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
1855 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1856 aos::configuration::ReadConfig(
1857 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1858
1859 message_bridge::TestingTimeConverter time(
1860 configuration::NodesCount(&config.message()));
1861 SimulatedEventLoopFactory factory(&config.message());
1862 factory.SetTimeConverter(&time);
1863 time.StartEqual();
1864
1865 const chrono::nanoseconds dt = chrono::seconds(10);
1866 time.RebootAt(0, distributed_clock::epoch() + dt);
1867
1868 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1869
1870 const UUID boot_uuid = pi1->boot_uuid();
1871 EXPECT_NE(boot_uuid, UUID::Zero());
1872
1873 {
1874 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1875 aos::Sender<examples::Ping> test_message_sender =
1876 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1877 SendPing(&test_message_sender, 1);
1878 }
1879
1880 factory.RunFor(chrono::seconds(5));
1881
1882 {
1883 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1884 aos::Fetcher<examples::Ping> fetcher =
1885 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1886 EXPECT_TRUE(fetcher.Fetch());
1887 }
1888
1889 factory.RunFor(chrono::seconds(10));
1890
1891 {
1892 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1893 aos::Fetcher<examples::Ping> fetcher =
1894 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1895 EXPECT_FALSE(fetcher.Fetch());
1896 }
1897 EXPECT_NE(boot_uuid, pi1->boot_uuid());
1898}
1899
1900// Tests that reliable messages get resent on reboot.
1901TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
1902 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1903 aos::configuration::ReadConfig(
1904 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1905
1906 message_bridge::TestingTimeConverter time(
1907 configuration::NodesCount(&config.message()));
1908 SimulatedEventLoopFactory factory(&config.message());
1909 factory.SetTimeConverter(&time);
1910 time.StartEqual();
1911
1912 const chrono::nanoseconds dt = chrono::seconds(1);
1913 time.RebootAt(1, distributed_clock::epoch() + dt);
1914
1915 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1916 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1917
1918 const UUID pi1_boot_uuid = pi1->boot_uuid();
1919 const UUID pi2_boot_uuid = pi2->boot_uuid();
1920 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
1921 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
1922
1923 {
1924 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1925 aos::Sender<examples::Ping> test_message_sender =
1926 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1927 SendPing(&test_message_sender, 1);
1928 }
1929
1930 factory.RunFor(chrono::milliseconds(500));
1931
1932 {
1933 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1934 aos::Fetcher<examples::Ping> fetcher =
1935 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1936 EXPECT_TRUE(fetcher.Fetch());
1937 }
1938
1939 factory.RunFor(chrono::seconds(1));
1940
1941 {
1942 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1943 aos::Fetcher<examples::Ping> fetcher =
1944 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1945 EXPECT_TRUE(fetcher.Fetch());
1946 }
1947 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
1948}
1949
Austin Schuh48205e62021-11-12 14:13:18 -08001950class SimulatedEventLoopDisconnectTest : public ::testing::Test {
1951 public:
1952 SimulatedEventLoopDisconnectTest()
1953 : config(aos::configuration::ReadConfig(ArtifactPath(
1954 "aos/events/multinode_pingpong_test_split_config.json"))),
1955 time(configuration::NodesCount(&config.message())),
1956 factory(&config.message()) {
1957 factory.SetTimeConverter(&time);
1958 }
1959
1960 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
1961 const monotonic_clock::time_point allowable_message_time,
1962 std::set<const aos::Node *> empty_nodes) {
1963 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1964 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1965 std::unique_ptr<aos::EventLoop> pi1_event_loop =
1966 pi1->MakeEventLoop("fetcher");
1967 std::unique_ptr<aos::EventLoop> pi2_event_loop =
1968 pi2->MakeEventLoop("fetcher");
1969 for (const aos::Channel *channel : *factory.configuration()->channels()) {
1970 if (configuration::ChannelIsReadableOnNode(channel,
1971 pi1_event_loop->node())) {
1972 std::unique_ptr<aos::RawFetcher> fetcher =
1973 pi1_event_loop->MakeRawFetcher(channel);
1974 if (statistics_channels.find(channel) == statistics_channels.end() ||
1975 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
1976 EXPECT_FALSE(fetcher->Fetch() &&
1977 fetcher->context().monotonic_event_time >
1978 allowable_message_time)
1979 << ": Found recent message on channel "
1980 << configuration::CleanedChannelToString(channel) << " and time "
1981 << fetcher->context().monotonic_event_time << " > "
1982 << allowable_message_time << " on pi1";
1983 } else {
1984 EXPECT_TRUE(fetcher->Fetch() &&
1985 fetcher->context().monotonic_event_time >=
1986 allowable_message_time)
1987 << ": Didn't find recent message on channel "
1988 << configuration::CleanedChannelToString(channel) << " on pi1";
1989 }
1990 }
1991 if (configuration::ChannelIsReadableOnNode(channel,
1992 pi2_event_loop->node())) {
1993 std::unique_ptr<aos::RawFetcher> fetcher =
1994 pi2_event_loop->MakeRawFetcher(channel);
1995 if (statistics_channels.find(channel) == statistics_channels.end() ||
1996 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
1997 EXPECT_FALSE(fetcher->Fetch() &&
1998 fetcher->context().monotonic_event_time >
1999 allowable_message_time)
2000 << ": Found message on channel "
2001 << configuration::CleanedChannelToString(channel) << " and time "
2002 << fetcher->context().monotonic_event_time << " > "
2003 << allowable_message_time << " on pi2";
2004 } else {
2005 EXPECT_TRUE(fetcher->Fetch() &&
2006 fetcher->context().monotonic_event_time >=
2007 allowable_message_time)
2008 << ": Didn't find message on channel "
2009 << configuration::CleanedChannelToString(channel) << " on pi2";
2010 }
2011 }
2012 }
2013 }
2014
2015 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2016
2017 message_bridge::TestingTimeConverter time;
2018 SimulatedEventLoopFactory factory;
2019};
2020
2021// Tests that if we have message bridge client/server disabled, and timing
2022// reports disabled, no messages are sent. Also tests that we can disconnect a
2023// node and disable statistics on it and it actually fully disconnects.
2024TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2025 time.StartEqual();
2026 factory.SkipTimingReport();
2027 factory.DisableStatistics();
2028
2029 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2030 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2031
2032 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2033 pi1->MakeEventLoop("fetcher");
2034 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2035 pi2->MakeEventLoop("fetcher");
2036
2037 factory.RunFor(chrono::milliseconds(100000));
2038
2039 // Confirm no messages are sent if we've configured them all off.
2040 VerifyChannels({}, monotonic_clock::min_time, {});
2041
2042 // Now, confirm that all the message_bridge channels come back when we
2043 // re-enable.
2044 factory.EnableStatistics();
2045
2046 factory.RunFor(chrono::milliseconds(10050));
2047
2048 // Build up the list of all the messages we expect when we come back.
2049 {
2050 std::set<const aos::Channel *> statistics_channels;
2051 for (const std::pair<std::string_view, const Node *> pi :
2052 std::vector<std::pair<std::string_view, const Node *>>{
2053 {"/pi1/aos", pi1->node()},
2054 {"/pi2/aos", pi1->node()},
2055 {"/pi3/aos", pi1->node()}}) {
2056 statistics_channels.insert(configuration::GetChannel(
2057 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2058 pi.second));
2059 statistics_channels.insert(configuration::GetChannel(
2060 factory.configuration(), pi.first,
2061 "aos.message_bridge.ServerStatistics", "", pi.second));
2062 statistics_channels.insert(configuration::GetChannel(
2063 factory.configuration(), pi.first,
2064 "aos.message_bridge.ClientStatistics", "", pi.second));
2065 }
2066
2067 statistics_channels.insert(configuration::GetChannel(
2068 factory.configuration(),
2069 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2070 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2071 statistics_channels.insert(configuration::GetChannel(
2072 factory.configuration(),
2073 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2074 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2075 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2076 }
2077
2078 // Now test that we can disable the messages for a single node
2079 pi2->DisableStatistics();
2080 const aos::monotonic_clock::time_point statistics_disable_time =
2081 pi2->monotonic_now();
2082 factory.RunFor(chrono::milliseconds(10000));
2083
2084 // We should see a much smaller set of messages, but should still see messages
2085 // forwarded, mainly the timestamp message.
2086 {
2087 std::set<const aos::Channel *> statistics_channels;
2088 for (const std::pair<std::string_view, const Node *> pi :
2089 std::vector<std::pair<std::string_view, const Node *>>{
2090 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2091 statistics_channels.insert(configuration::GetChannel(
2092 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2093 pi.second));
2094 statistics_channels.insert(configuration::GetChannel(
2095 factory.configuration(), pi.first,
2096 "aos.message_bridge.ServerStatistics", "", pi.second));
2097 statistics_channels.insert(configuration::GetChannel(
2098 factory.configuration(), pi.first,
2099 "aos.message_bridge.ClientStatistics", "", pi.second));
2100 }
2101
2102 statistics_channels.insert(configuration::GetChannel(
2103 factory.configuration(),
2104 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2105 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2106 VerifyChannels(statistics_channels, statistics_disable_time, {});
2107 }
2108
2109 // Now, fully disconnect the node. This will completely quiet down pi2.
2110 pi1->Disconnect(pi2->node());
2111 pi2->Disconnect(pi1->node());
2112
2113 const aos::monotonic_clock::time_point disconnect_disable_time =
2114 pi2->monotonic_now();
2115 factory.RunFor(chrono::milliseconds(10000));
2116
2117 {
2118 std::set<const aos::Channel *> statistics_channels;
2119 for (const std::pair<std::string_view, const Node *> pi :
2120 std::vector<std::pair<std::string_view, const Node *>>{
2121 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2122 statistics_channels.insert(configuration::GetChannel(
2123 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2124 pi.second));
2125 statistics_channels.insert(configuration::GetChannel(
2126 factory.configuration(), pi.first,
2127 "aos.message_bridge.ServerStatistics", "", pi.second));
2128 statistics_channels.insert(configuration::GetChannel(
2129 factory.configuration(), pi.first,
2130 "aos.message_bridge.ClientStatistics", "", pi.second));
2131 }
2132
2133 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2134 }
2135}
2136
Neil Balchc8f41ed2018-01-20 22:06:53 -08002137} // namespace testing
2138} // namespace aos