blob: 696c16e68a04e1dd6ece194caa87cf1ffae43b19 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08004#include <string_view>
5
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07008#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080011#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070012#include "aos/network/message_bridge_client_generated.h"
13#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080014#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080015#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070017#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080018#include "gtest/gtest.h"
19
20namespace aos {
21namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070022namespace {
23
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070025
Austin Schuh58646e22021-08-23 23:51:46 -070026using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080027using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070028namespace chrono = ::std::chrono;
29
Austin Schuh0de30f32020-12-06 12:44:28 -080030} // namespace
31
Neil Balchc8f41ed2018-01-20 22:06:53 -080032class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
33 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080034 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080035 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080036 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080037 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080038 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080039 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080040 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070041 }
42
Austin Schuh217a9782019-12-21 23:02:50 -080043 void Run() override { event_loop_factory_->Run(); }
44 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070045
Austin Schuh52d325c2019-06-23 18:59:06 -070046 // TODO(austin): Implement this. It's used currently for a phased loop test.
47 // I'm not sure how much that matters.
48 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
49
Austin Schuh7d87b672019-12-01 20:23:49 -080050 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080051 MaybeMake();
52 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080053 }
54
Neil Balchc8f41ed2018-01-20 22:06:53 -080055 private:
Austin Schuh217a9782019-12-21 23:02:50 -080056 void MaybeMake() {
57 if (!event_loop_factory_) {
58 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080059 event_loop_factory_ =
60 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080061 } else {
62 event_loop_factory_ =
63 std::make_unique<SimulatedEventLoopFactory>(configuration());
64 }
65 }
66 }
67 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080068};
69
Austin Schuh6bae8252021-02-07 22:01:49 -080070auto CommonParameters() {
71 return ::testing::Combine(
72 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
73 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
74 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
75}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070076
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070077INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070078 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070079
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070080INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070081 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080082
Austin Schuh89c9b812021-02-20 14:42:10 -080083// Parameters to run all the tests with.
84struct Param {
85 // The config file to use.
86 std::string config;
87 // If true, the RemoteMessage channel should be shared between all the remote
88 // channels. If false, there will be 1 RemoteMessage channel per remote
89 // channel.
90 bool shared;
91};
92
93class RemoteMessageSimulatedEventLoopTest
94 : public ::testing::TestWithParam<struct Param> {
95 public:
96 RemoteMessageSimulatedEventLoopTest()
97 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070098 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080099 LOG(INFO) << "Config " << GetParam().config;
100 }
101
102 bool shared() const { return GetParam().shared; }
103
104 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
105 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
107 if (shared()) {
108 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
109 event_loop, "/aos/remote_timestamps/pi2"));
110 } else {
111 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
112 event_loop,
113 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
114 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
115 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
118 }
119 return counters;
120 }
121
122 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
123 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
125 if (shared()) {
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi1"));
128 } else {
129 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
130 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop,
133 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
134 }
135 return counters;
136 }
137
138 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
139};
140
Neil Balchc8f41ed2018-01-20 22:06:53 -0800141// Test that creating an event and running the scheduler runs the event.
142TEST(EventSchedulerTest, ScheduleEvent) {
143 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800144 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700145 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800146 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800147
Austin Schuh8bd96322020-02-13 21:18:22 -0800148 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuhac0771c2020-01-07 18:36:30 -0800149 [&counter]() { counter += 1; });
Austin Schuh8bd96322020-02-13 21:18:22 -0800150 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800151 EXPECT_EQ(counter, 1);
Ravago Jonescf453ab2020-05-06 21:14:53 -0700152 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2),
153 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800154 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800155 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800156 EXPECT_EQ(counter, 1);
157}
158
159// Test that descheduling an already scheduled event doesn't run the event.
160TEST(EventSchedulerTest, DescheduleEvent) {
161 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800162 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700163 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800164 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
167 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800168 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800169 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800170 EXPECT_EQ(counter, 0);
171}
Austin Schuh44019f92019-05-19 19:58:27 -0700172
Austin Schuh8fb315a2020-11-19 22:33:58 -0800173void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
174 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
175 TestMessage::Builder test_message_builder =
176 builder.MakeBuilder<TestMessage>();
177 test_message_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -0700178 ASSERT_EQ(builder.Send(test_message_builder.Finish()),
179 RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800180}
181
182// Test that sending a message after running gets properly notified.
183TEST(SimulatedEventLoopTest, SendAfterRunFor) {
184 SimulatedEventLoopTestFactory factory;
185
186 SimulatedEventLoopFactory simulated_event_loop_factory(
187 factory.configuration());
188
189 ::std::unique_ptr<EventLoop> ping_event_loop =
190 simulated_event_loop_factory.MakeEventLoop("ping");
191 aos::Sender<TestMessage> test_message_sender =
192 ping_event_loop->MakeSender<TestMessage>("/test");
193 SendTestMessage(&test_message_sender, 1);
194
195 std::unique_ptr<EventLoop> pong1_event_loop =
196 simulated_event_loop_factory.MakeEventLoop("pong");
197 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
198 "/test");
199
200 EXPECT_FALSE(ping_event_loop->is_running());
201
202 // Watchers start when you start running, so there should be nothing counted.
203 simulated_event_loop_factory.RunFor(chrono::seconds(1));
204 EXPECT_EQ(test_message_counter1.count(), 0u);
205
206 std::unique_ptr<EventLoop> pong2_event_loop =
207 simulated_event_loop_factory.MakeEventLoop("pong");
208 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
209 "/test");
210
211 // Pauses in the middle don't count though, so this should be counted.
212 // But, the fresh watcher shouldn't pick it up yet.
213 SendTestMessage(&test_message_sender, 2);
214
215 EXPECT_EQ(test_message_counter1.count(), 0u);
216 EXPECT_EQ(test_message_counter2.count(), 0u);
217 simulated_event_loop_factory.RunFor(chrono::seconds(1));
218
219 EXPECT_EQ(test_message_counter1.count(), 1u);
220 EXPECT_EQ(test_message_counter2.count(), 0u);
221}
222
223// Test that creating an event loop while running dies.
224TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
225 SimulatedEventLoopTestFactory factory;
226
227 SimulatedEventLoopFactory simulated_event_loop_factory(
228 factory.configuration());
229
230 ::std::unique_ptr<EventLoop> event_loop =
231 simulated_event_loop_factory.MakeEventLoop("ping");
232
233 auto timer = event_loop->AddTimer([&]() {
234 EXPECT_DEATH(
235 {
236 ::std::unique_ptr<EventLoop> event_loop2 =
237 simulated_event_loop_factory.MakeEventLoop("ping");
238 },
239 "event loop while running");
240 simulated_event_loop_factory.Exit();
241 });
242
243 event_loop->OnRun([&event_loop, &timer] {
244 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
245 });
246
247 simulated_event_loop_factory.Run();
248}
249
250// Test that creating a watcher after running dies.
251TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
252 SimulatedEventLoopTestFactory factory;
253
254 SimulatedEventLoopFactory simulated_event_loop_factory(
255 factory.configuration());
256
257 ::std::unique_ptr<EventLoop> event_loop =
258 simulated_event_loop_factory.MakeEventLoop("ping");
259
260 simulated_event_loop_factory.RunFor(chrono::seconds(1));
261
262 EXPECT_DEATH(
263 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
264 "Can't add a watcher after running");
265
266 ::std::unique_ptr<EventLoop> event_loop2 =
267 simulated_event_loop_factory.MakeEventLoop("ping");
268
269 simulated_event_loop_factory.RunFor(chrono::seconds(1));
270
271 EXPECT_DEATH(
272 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
273 "Can't add a watcher after running");
274}
275
Austin Schuh44019f92019-05-19 19:58:27 -0700276// Test that running for a time period with no handlers causes time to progress
277// correctly.
278TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800279 SimulatedEventLoopTestFactory factory;
280
281 SimulatedEventLoopFactory simulated_event_loop_factory(
282 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700283 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800284 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700285
286 simulated_event_loop_factory.RunFor(chrono::seconds(1));
287
288 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700289 event_loop->monotonic_now());
290}
291
292// Test that running for a time with a periodic handler causes time to end
293// correctly.
294TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800295 SimulatedEventLoopTestFactory factory;
296
297 SimulatedEventLoopFactory simulated_event_loop_factory(
298 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700299 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800300 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700301
302 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700303 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700304 event_loop->OnRun([&event_loop, &timer] {
305 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
306 chrono::milliseconds(100));
307 });
308
309 simulated_event_loop_factory.RunFor(chrono::seconds(1));
310
311 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700312 event_loop->monotonic_now());
313 EXPECT_EQ(counter, 10);
314}
315
Austin Schuh7d87b672019-12-01 20:23:49 -0800316// Tests that watchers have latency in simulation.
317TEST(SimulatedEventLoopTest, WatcherTimingReport) {
318 SimulatedEventLoopTestFactory factory;
319 factory.set_send_delay(std::chrono::microseconds(50));
320
321 FLAGS_timing_report_ms = 1000;
322 auto loop1 = factory.MakePrimary("primary");
323 loop1->MakeWatcher("/test", [](const TestMessage &) {});
324
325 auto loop2 = factory.Make("sender_loop");
326
327 auto loop3 = factory.Make("report_fetcher");
328
329 Fetcher<timing::Report> report_fetcher =
330 loop3->MakeFetcher<timing::Report>("/aos");
331 EXPECT_FALSE(report_fetcher.Fetch());
332
333 auto sender = loop2->MakeSender<TestMessage>("/test");
334
335 // Send 10 messages in the middle of a timing report period so we get
336 // something interesting back.
337 auto test_timer = loop2->AddTimer([&sender]() {
338 for (int i = 0; i < 10; ++i) {
339 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
340 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
341 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700342 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800343 }
344 });
345
346 // Quit after 1 timing report, mid way through the next cycle.
347 {
348 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
349 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
350 end_timer->set_name("end");
351 }
352
353 loop1->OnRun([&test_timer, &loop1]() {
354 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
355 });
356
357 factory.Run();
358
359 // And, since we are here, check that the timing report makes sense.
360 // Start by looking for our event loop's timing.
361 FlatbufferDetachedBuffer<timing::Report> primary_report =
362 FlatbufferDetachedBuffer<timing::Report>::Empty();
363 while (report_fetcher.FetchNext()) {
364 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
365 if (report_fetcher->name()->string_view() == "primary") {
366 primary_report = CopyFlatBuffer(report_fetcher.get());
367 }
368 }
369
370 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700371 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800372
373 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
374
375 // Just the timing report timer.
376 ASSERT_NE(primary_report.message().timers(), nullptr);
377 EXPECT_EQ(primary_report.message().timers()->size(), 2);
378
379 // No phased loops
380 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
381
382 // And now confirm that the watcher received all 10 messages, and has latency.
383 ASSERT_NE(primary_report.message().watchers(), nullptr);
384 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
385 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
386 EXPECT_NEAR(
387 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
388 0.00005, 1e-9);
389 EXPECT_NEAR(
390 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
391 0.00005, 1e-9);
392 EXPECT_NEAR(
393 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
394 0.00005, 1e-9);
395 EXPECT_EQ(primary_report.message()
396 .watchers()
397 ->Get(0)
398 ->wakeup_latency()
399 ->standard_deviation(),
400 0.0);
401
402 EXPECT_EQ(
403 primary_report.message().watchers()->Get(0)->handler_time()->average(),
404 0.0);
405 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
406 0.0);
407 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
408 0.0);
409 EXPECT_EQ(primary_report.message()
410 .watchers()
411 ->Get(0)
412 ->handler_time()
413 ->standard_deviation(),
414 0.0);
415}
416
Austin Schuh89c9b812021-02-20 14:42:10 -0800417size_t CountAll(
418 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
419 &counters) {
420 size_t count = 0u;
421 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
422 counters) {
423 count += counter->count();
424 }
425 return count;
426}
427
Austin Schuh4c3b9702020-08-30 11:34:55 -0700428// Tests that ping and pong work when on 2 different nodes, and the message
429// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800430TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800431 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
432 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700433 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800434
435 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
436
437 std::unique_ptr<EventLoop> ping_event_loop =
438 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
439 Ping ping(ping_event_loop.get());
440
441 std::unique_ptr<EventLoop> pong_event_loop =
442 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
443 Pong pong(pong_event_loop.get());
444
445 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
446 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700447 MessageCounter<examples::Pong> pi2_pong_counter(
448 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700449 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
450 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
451 "/pi1/aos");
452 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
453 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800454
Austin Schuh4c3b9702020-08-30 11:34:55 -0700455 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
456 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800457
458 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
459 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700460 MessageCounter<examples::Pong> pi1_pong_counter(
461 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700462 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
463 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
464 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
465 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
466 "/aos");
467
Austin Schuh4c3b9702020-08-30 11:34:55 -0700468 // Count timestamps.
469 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
470 pi1_pong_counter_event_loop.get(), "/pi1/aos");
471 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
472 pi2_pong_counter_event_loop.get(), "/pi1/aos");
473 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
474 pi3_pong_counter_event_loop.get(), "/pi1/aos");
475 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
476 pi1_pong_counter_event_loop.get(), "/pi2/aos");
477 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
478 pi2_pong_counter_event_loop.get(), "/pi2/aos");
479 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
480 pi1_pong_counter_event_loop.get(), "/pi3/aos");
481 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
482 pi3_pong_counter_event_loop.get(), "/pi3/aos");
483
Austin Schuh2f8fd752020-09-01 22:38:28 -0700484 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800485 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
486 remote_timestamps_pi2_on_pi1 =
487 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
488 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
489 remote_timestamps_pi1_on_pi2 =
490 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700491
Austin Schuh4c3b9702020-08-30 11:34:55 -0700492 // Wait to let timestamp estimation start up before looking for the results.
493 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
494
Austin Schuh8fb315a2020-11-19 22:33:58 -0800495 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
496 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
497 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
498 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
499 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
500 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
501
Austin Schuh4c3b9702020-08-30 11:34:55 -0700502 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800503 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700504 "/pi1/aos", [&pi1_server_statistics_count](
505 const message_bridge::ServerStatistics &stats) {
506 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
507 EXPECT_EQ(stats.connections()->size(), 2u);
508 for (const message_bridge::ServerConnection *connection :
509 *stats.connections()) {
510 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800511 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700512 if (connection->node()->name()->string_view() == "pi2") {
513 EXPECT_GT(connection->sent_packets(), 50);
514 } else if (connection->node()->name()->string_view() == "pi3") {
515 EXPECT_GE(connection->sent_packets(), 5);
516 } else {
517 LOG(FATAL) << "Unknown connection";
518 }
519
520 EXPECT_TRUE(connection->has_monotonic_offset());
521 EXPECT_EQ(connection->monotonic_offset(), 0);
522 }
523 ++pi1_server_statistics_count;
524 });
525
526 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800527 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700528 "/pi2/aos", [&pi2_server_statistics_count](
529 const message_bridge::ServerStatistics &stats) {
530 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
531 EXPECT_EQ(stats.connections()->size(), 1u);
532
533 const message_bridge::ServerConnection *connection =
534 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800535 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700536 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
537 EXPECT_GT(connection->sent_packets(), 50);
538 EXPECT_TRUE(connection->has_monotonic_offset());
539 EXPECT_EQ(connection->monotonic_offset(), 0);
540 ++pi2_server_statistics_count;
541 });
542
543 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800544 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700545 "/pi3/aos", [&pi3_server_statistics_count](
546 const message_bridge::ServerStatistics &stats) {
547 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
548 EXPECT_EQ(stats.connections()->size(), 1u);
549
550 const message_bridge::ServerConnection *connection =
551 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800552 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700553 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
554 EXPECT_GE(connection->sent_packets(), 5);
555 EXPECT_TRUE(connection->has_monotonic_offset());
556 EXPECT_EQ(connection->monotonic_offset(), 0);
557 ++pi3_server_statistics_count;
558 });
559
560 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800561 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700562 "/pi1/aos", [&pi1_client_statistics_count](
563 const message_bridge::ClientStatistics &stats) {
564 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
565 EXPECT_EQ(stats.connections()->size(), 2u);
566
567 for (const message_bridge::ClientConnection *connection :
568 *stats.connections()) {
569 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
570 if (connection->node()->name()->string_view() == "pi2") {
571 EXPECT_GT(connection->received_packets(), 50);
572 } else if (connection->node()->name()->string_view() == "pi3") {
573 EXPECT_GE(connection->received_packets(), 5);
574 } else {
575 LOG(FATAL) << "Unknown connection";
576 }
577
Austin Schuhe61d4382021-03-31 21:33:02 -0700578 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700579 EXPECT_TRUE(connection->has_monotonic_offset());
580 EXPECT_EQ(connection->monotonic_offset(), 150000);
581 }
582 ++pi1_client_statistics_count;
583 });
584
585 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800586 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700587 "/pi2/aos", [&pi2_client_statistics_count](
588 const message_bridge::ClientStatistics &stats) {
589 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
590 EXPECT_EQ(stats.connections()->size(), 1u);
591
592 const message_bridge::ClientConnection *connection =
593 stats.connections()->Get(0);
594 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
595 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700596 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700597 EXPECT_TRUE(connection->has_monotonic_offset());
598 EXPECT_EQ(connection->monotonic_offset(), 150000);
599 ++pi2_client_statistics_count;
600 });
601
602 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800603 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700604 "/pi3/aos", [&pi3_client_statistics_count](
605 const message_bridge::ClientStatistics &stats) {
606 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
607 EXPECT_EQ(stats.connections()->size(), 1u);
608
609 const message_bridge::ClientConnection *connection =
610 stats.connections()->Get(0);
611 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
612 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700613 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700614 EXPECT_TRUE(connection->has_monotonic_offset());
615 EXPECT_EQ(connection->monotonic_offset(), 150000);
616 ++pi3_client_statistics_count;
617 });
618
Austin Schuh2f8fd752020-09-01 22:38:28 -0700619 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
620 // channel.
621 const size_t pi1_timestamp_channel =
622 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
623 pi1_on_pi2_timestamp_fetcher.channel());
624 const size_t ping_timestamp_channel =
625 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
626 ping_on_pi2_fetcher.channel());
627
628 for (const Channel *channel :
629 *pi1_pong_counter_event_loop->configuration()->channels()) {
630 VLOG(1) << "Channel "
631 << configuration::ChannelIndex(
632 pi1_pong_counter_event_loop->configuration(), channel)
633 << " " << configuration::CleanedChannelToString(channel);
634 }
635
Austin Schuh8fb315a2020-11-19 22:33:58 -0800636 std::unique_ptr<EventLoop> pi1_remote_timestamp =
637 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
638
Austin Schuh89c9b812021-02-20 14:42:10 -0800639 for (std::pair<int, std::string> channel :
640 shared()
641 ? std::vector<std::pair<
642 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
643 : std::vector<std::pair<int, std::string>>{
644 {pi1_timestamp_channel,
645 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
646 "aos-message_bridge-Timestamp"},
647 {ping_timestamp_channel,
648 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
649 // For each remote timestamp we get back, confirm that it is either a ping
650 // message, or a timestamp we sent out. Also confirm that the timestamps
651 // are correct.
652 pi1_remote_timestamp->MakeWatcher(
653 channel.second,
654 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
655 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
656 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
657 channel_index = channel.first](const RemoteMessage &header) {
658 VLOG(1) << aos::FlatbufferToJson(&header);
659 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700660 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800661 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700662 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700663
Austin Schuh89c9b812021-02-20 14:42:10 -0800664 const aos::monotonic_clock::time_point header_monotonic_sent_time(
665 chrono::nanoseconds(header.monotonic_sent_time()));
666 const aos::realtime_clock::time_point header_realtime_sent_time(
667 chrono::nanoseconds(header.realtime_sent_time()));
668 const aos::monotonic_clock::time_point header_monotonic_remote_time(
669 chrono::nanoseconds(header.monotonic_remote_time()));
670 const aos::realtime_clock::time_point header_realtime_remote_time(
671 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700672
Austin Schuh89c9b812021-02-20 14:42:10 -0800673 if (channel_index != -1) {
674 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700675 }
676
Austin Schuh89c9b812021-02-20 14:42:10 -0800677 const Context *pi1_context = nullptr;
678 const Context *pi2_context = nullptr;
679
680 if (header.channel_index() == pi1_timestamp_channel) {
681 // Find the forwarded message.
682 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
683 header_monotonic_sent_time) {
684 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
685 }
686
687 // And the source message.
688 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
689 header_monotonic_remote_time) {
690 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
691 }
692
693 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
694 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
695 } else if (header.channel_index() == ping_timestamp_channel) {
696 // Find the forwarded message.
697 while (ping_on_pi2_fetcher.context().monotonic_event_time <
698 header_monotonic_sent_time) {
699 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
700 }
701
702 // And the source message.
703 while (ping_on_pi1_fetcher.context().monotonic_event_time <
704 header_monotonic_remote_time) {
705 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
706 }
707
708 pi1_context = &ping_on_pi1_fetcher.context();
709 pi2_context = &ping_on_pi2_fetcher.context();
710 } else {
711 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700712 }
713
Austin Schuh89c9b812021-02-20 14:42:10 -0800714 // Confirm the forwarded message has matching timestamps to the
715 // timestamps we got back.
716 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
717 EXPECT_EQ(pi2_context->remote_queue_index,
718 header.remote_queue_index());
719 EXPECT_EQ(pi2_context->monotonic_event_time,
720 header_monotonic_sent_time);
721 EXPECT_EQ(pi2_context->realtime_event_time,
722 header_realtime_sent_time);
723 EXPECT_EQ(pi2_context->realtime_remote_time,
724 header_realtime_remote_time);
725 EXPECT_EQ(pi2_context->monotonic_remote_time,
726 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700727
Austin Schuh89c9b812021-02-20 14:42:10 -0800728 // Confirm the forwarded message also matches the source message.
729 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
730 EXPECT_EQ(pi1_context->monotonic_event_time,
731 header_monotonic_remote_time);
732 EXPECT_EQ(pi1_context->realtime_event_time,
733 header_realtime_remote_time);
734 });
735 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700736
Austin Schuh4c3b9702020-08-30 11:34:55 -0700737 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
738 chrono::milliseconds(500) +
739 chrono::milliseconds(5));
740
741 EXPECT_EQ(pi1_pong_counter.count(), 1001);
742 EXPECT_EQ(pi2_pong_counter.count(), 1001);
743
744 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
745 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
746 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
747 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
748 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
749 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
750 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
751
Austin Schuh20ac95d2020-12-05 17:24:19 -0800752 EXPECT_EQ(pi1_server_statistics_count, 10);
753 EXPECT_EQ(pi2_server_statistics_count, 10);
754 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700755
756 EXPECT_EQ(pi1_client_statistics_count, 95);
757 EXPECT_EQ(pi2_client_statistics_count, 95);
758 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700759
760 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800761 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
762 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700763}
764
765// Tests that an offset between nodes can be recovered and shows up in
766// ServerStatistics correctly.
767TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
768 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700769 aos::configuration::ReadConfig(ArtifactPath(
770 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700771 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800772 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
773 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700774 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800775 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
776 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800778 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
779 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700780
Austin Schuh87dd3832021-01-01 23:07:31 -0800781 message_bridge::TestingTimeConverter time(
782 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700783 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700784 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700785
786 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800787 time.AddNextTimestamp(
788 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700789 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
790 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700791
792 std::unique_ptr<EventLoop> ping_event_loop =
793 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
794 Ping ping(ping_event_loop.get());
795
796 std::unique_ptr<EventLoop> pong_event_loop =
797 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
798 Pong pong(pong_event_loop.get());
799
Austin Schuh8fb315a2020-11-19 22:33:58 -0800800 // Wait to let timestamp estimation start up before looking for the results.
801 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
802
Austin Schuh87dd3832021-01-01 23:07:31 -0800803 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
804 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
805
Austin Schuh4c3b9702020-08-30 11:34:55 -0700806 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
807 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
808
809 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
810 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
811
Austin Schuh4c3b9702020-08-30 11:34:55 -0700812 // Confirm the offsets are being recovered correctly.
813 int pi1_server_statistics_count = 0;
814 pi1_pong_counter_event_loop->MakeWatcher(
815 "/pi1/aos", [&pi1_server_statistics_count,
816 kOffset](const message_bridge::ServerStatistics &stats) {
817 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
818 EXPECT_EQ(stats.connections()->size(), 2u);
819 for (const message_bridge::ServerConnection *connection :
820 *stats.connections()) {
821 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800822 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700823 if (connection->node()->name()->string_view() == "pi2") {
824 EXPECT_EQ(connection->monotonic_offset(),
825 chrono::nanoseconds(kOffset).count());
826 } else if (connection->node()->name()->string_view() == "pi3") {
827 EXPECT_EQ(connection->monotonic_offset(), 0);
828 } else {
829 LOG(FATAL) << "Unknown connection";
830 }
831
832 EXPECT_TRUE(connection->has_monotonic_offset());
833 }
834 ++pi1_server_statistics_count;
835 });
836
837 int pi2_server_statistics_count = 0;
838 pi2_pong_counter_event_loop->MakeWatcher(
839 "/pi2/aos", [&pi2_server_statistics_count,
840 kOffset](const message_bridge::ServerStatistics &stats) {
841 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
842 EXPECT_EQ(stats.connections()->size(), 1u);
843
844 const message_bridge::ServerConnection *connection =
845 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800846 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700847 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
848 EXPECT_TRUE(connection->has_monotonic_offset());
849 EXPECT_EQ(connection->monotonic_offset(),
850 -chrono::nanoseconds(kOffset).count());
851 ++pi2_server_statistics_count;
852 });
853
854 int pi3_server_statistics_count = 0;
855 pi3_pong_counter_event_loop->MakeWatcher(
856 "/pi3/aos", [&pi3_server_statistics_count](
857 const message_bridge::ServerStatistics &stats) {
858 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
859 EXPECT_EQ(stats.connections()->size(), 1u);
860
861 const message_bridge::ServerConnection *connection =
862 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800863 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700864 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
865 EXPECT_TRUE(connection->has_monotonic_offset());
866 EXPECT_EQ(connection->monotonic_offset(), 0);
867 ++pi3_server_statistics_count;
868 });
869
870 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
871 chrono::milliseconds(500) +
872 chrono::milliseconds(5));
873
Austin Schuh20ac95d2020-12-05 17:24:19 -0800874 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700875 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800876 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700877}
878
879// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800880TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700881 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
882 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
883 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
884
885 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
886 simulated_event_loop_factory.DisableStatistics();
887
888 std::unique_ptr<EventLoop> ping_event_loop =
889 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
890 Ping ping(ping_event_loop.get());
891
892 std::unique_ptr<EventLoop> pong_event_loop =
893 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
894 Pong pong(pong_event_loop.get());
895
896 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
897 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
898
899 MessageCounter<examples::Pong> pi2_pong_counter(
900 pi2_pong_counter_event_loop.get(), "/test");
901
902 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
903 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
904
905 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
906 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
907
908 MessageCounter<examples::Pong> pi1_pong_counter(
909 pi1_pong_counter_event_loop.get(), "/test");
910
911 // Count timestamps.
912 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
913 pi1_pong_counter_event_loop.get(), "/pi1/aos");
914 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
915 pi2_pong_counter_event_loop.get(), "/pi1/aos");
916 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
917 pi3_pong_counter_event_loop.get(), "/pi1/aos");
918 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
919 pi1_pong_counter_event_loop.get(), "/pi2/aos");
920 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
921 pi2_pong_counter_event_loop.get(), "/pi2/aos");
922 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
923 pi1_pong_counter_event_loop.get(), "/pi3/aos");
924 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
925 pi3_pong_counter_event_loop.get(), "/pi3/aos");
926
Austin Schuh2f8fd752020-09-01 22:38:28 -0700927 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800928 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
929 remote_timestamps_pi2_on_pi1 =
930 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
931 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
932 remote_timestamps_pi1_on_pi2 =
933 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700934
Austin Schuh4c3b9702020-08-30 11:34:55 -0700935 MessageCounter<message_bridge::ServerStatistics>
936 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
937 "/pi1/aos");
938 MessageCounter<message_bridge::ServerStatistics>
939 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
940 "/pi2/aos");
941 MessageCounter<message_bridge::ServerStatistics>
942 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
943 "/pi3/aos");
944
945 MessageCounter<message_bridge::ClientStatistics>
946 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
947 "/pi1/aos");
948 MessageCounter<message_bridge::ClientStatistics>
949 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
950 "/pi2/aos");
951 MessageCounter<message_bridge::ClientStatistics>
952 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
953 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800954
955 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
956 chrono::milliseconds(5));
957
Austin Schuh4c3b9702020-08-30 11:34:55 -0700958 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
959 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
960
961 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
962 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
963 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
964 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
965 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
966 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
967 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
968
969 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
970 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
971 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
972
973 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
974 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
975 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700976
977 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800978 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
979 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -0800980}
981
Austin Schuhc0b0f722020-12-12 18:36:06 -0800982bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
983 for (const message_bridge::ServerConnection *connection :
984 *server_statistics->connections()) {
985 if (connection->state() != message_bridge::State::CONNECTED) {
986 return false;
987 }
988 }
989 return true;
990}
991
992bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
993 std::string_view target) {
994 for (const message_bridge::ServerConnection *connection :
995 *server_statistics->connections()) {
996 if (connection->node()->name()->string_view() == target) {
997 if (connection->state() == message_bridge::State::CONNECTED) {
998 return false;
999 }
1000 } else {
1001 if (connection->state() != message_bridge::State::CONNECTED) {
1002 return false;
1003 }
1004 }
1005 }
1006 return true;
1007}
1008
1009bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1010 for (const message_bridge::ClientConnection *connection :
1011 *client_statistics->connections()) {
1012 if (connection->state() != message_bridge::State::CONNECTED) {
1013 return false;
1014 }
1015 }
1016 return true;
1017}
1018
1019bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1020 std::string_view target) {
1021 for (const message_bridge::ClientConnection *connection :
1022 *client_statistics->connections()) {
1023 if (connection->node()->name()->string_view() == target) {
1024 if (connection->state() == message_bridge::State::CONNECTED) {
1025 return false;
1026 }
1027 } else {
1028 if (connection->state() != message_bridge::State::CONNECTED) {
1029 return false;
1030 }
1031 }
1032 }
1033 return true;
1034}
1035
1036// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001037TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001038 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1039
Austin Schuh58646e22021-08-23 23:51:46 -07001040 NodeEventLoopFactory *pi1 =
1041 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1042 NodeEventLoopFactory *pi2 =
1043 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1044 NodeEventLoopFactory *pi3 =
1045 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1046
1047 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001048 Ping ping(ping_event_loop.get());
1049
Austin Schuh58646e22021-08-23 23:51:46 -07001050 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001051 Pong pong(pong_event_loop.get());
1052
1053 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001054 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001055
1056 MessageCounter<examples::Pong> pi2_pong_counter(
1057 pi2_pong_counter_event_loop.get(), "/test");
1058
1059 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001060 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001061
1062 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001063 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001064
1065 MessageCounter<examples::Pong> pi1_pong_counter(
1066 pi1_pong_counter_event_loop.get(), "/test");
1067
1068 // Count timestamps.
1069 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1070 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1071 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1072 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1073 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1074 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1075 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1076 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1077 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1078 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1079 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1080 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1081 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1082 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1083
1084 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001085 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1086 remote_timestamps_pi2_on_pi1 =
1087 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1088 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1089 remote_timestamps_pi1_on_pi2 =
1090 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001091
1092 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001093 *pi1_server_statistics_counter;
1094 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1095 pi1_server_statistics_counter =
1096 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1097 "pi1_server_statistics_counter", "/pi1/aos");
1098 });
1099
Austin Schuhc0b0f722020-12-12 18:36:06 -08001100 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1101 pi1_pong_counter_event_loop
1102 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1103 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1104 pi1_pong_counter_event_loop
1105 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1106
1107 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001108 *pi2_server_statistics_counter;
1109 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1110 pi2_server_statistics_counter =
1111 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1112 "pi2_server_statistics_counter", "/pi2/aos");
1113 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001114 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1115 pi2_pong_counter_event_loop
1116 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1117 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1118 pi2_pong_counter_event_loop
1119 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1120
1121 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001122 *pi3_server_statistics_counter;
1123 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1124 pi3_server_statistics_counter =
1125 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1126 "pi3_server_statistics_counter", "/pi3/aos");
1127 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001128 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1129 pi3_pong_counter_event_loop
1130 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1131 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1132 pi3_pong_counter_event_loop
1133 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1134
1135 MessageCounter<message_bridge::ClientStatistics>
1136 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1137 "/pi1/aos");
1138 MessageCounter<message_bridge::ClientStatistics>
1139 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1140 "/pi2/aos");
1141 MessageCounter<message_bridge::ClientStatistics>
1142 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1143 "/pi3/aos");
1144
1145 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1146 chrono::milliseconds(5));
1147
1148 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1149 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1150
1151 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1152 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1153 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1154 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1155 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1156 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1157 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1158
Austin Schuh58646e22021-08-23 23:51:46 -07001159 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1160 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1161 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001162
1163 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1164 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1165 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1166
1167 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001168 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1169 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001170
1171 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1172 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1173 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1174 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1175 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1176 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1177 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1178 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1179 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1180 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1181 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1182 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1183 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1184 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1185 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1186 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1187 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1188 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1189
Austin Schuh58646e22021-08-23 23:51:46 -07001190 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001191
1192 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1193
1194 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1195 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1196
1197 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1198 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1199 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1200 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1201 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1202 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1203 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1204
Austin Schuh58646e22021-08-23 23:51:46 -07001205 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1206 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1207 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001208
1209 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1210 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1211 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1212
1213 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001214 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1215 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001216
1217 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1218 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1219 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1220 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1221 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1222 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1223 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1224 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1225 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1226 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1227 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1228 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1229 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1230 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1231 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1232 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1233 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1234 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1235
Austin Schuh58646e22021-08-23 23:51:46 -07001236 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001237
1238 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1239
1240 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1241 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1242
1243 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1244 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1245 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1246 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1247 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1248 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1249 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1250
Austin Schuh58646e22021-08-23 23:51:46 -07001251 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1252 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1253 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001254
1255 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1256 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1257 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1258
1259 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001260 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1261 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001262
1263 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1264 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1265 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1266 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1267 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1268 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1269 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1270 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1271 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1272 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1273 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1274 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1275 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1276 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1277 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1278 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1279 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1280 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1281}
1282
Austin Schuh2febf0d2020-09-21 22:24:30 -07001283// Tests that the time offset having a slope doesn't break the world.
1284// SimulatedMessageBridge has enough self consistency CHECK statements to
1285// confirm, and we can can also check a message in each direction to make sure
1286// it gets delivered as expected.
1287TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1288 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001289 aos::configuration::ReadConfig(ArtifactPath(
1290 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001291 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001292 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1293 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001294 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001295 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1296 ASSERT_EQ(pi2_index, 1u);
1297 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1298 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1299 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001300
Austin Schuh87dd3832021-01-01 23:07:31 -08001301 message_bridge::TestingTimeConverter time(
1302 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001303 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001304 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001305
Austin Schuh2febf0d2020-09-21 22:24:30 -07001306 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001307 time.AddNextTimestamp(
1308 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001309 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1310 BootTimestamp::epoch()});
1311 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1312 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1313 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1314 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001315
1316 std::unique_ptr<EventLoop> ping_event_loop =
1317 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1318 Ping ping(ping_event_loop.get());
1319
1320 std::unique_ptr<EventLoop> pong_event_loop =
1321 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1322 Pong pong(pong_event_loop.get());
1323
1324 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1325 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1326 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1327 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1328
1329 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1330 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1331 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1332 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1333
1334 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1335 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1336 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1337 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1338
1339 // End after a pong message comes back. This will leave the latest messages
1340 // on all channels so we can look at timestamps easily and check they make
1341 // sense.
1342 std::unique_ptr<EventLoop> pi1_pong_ender =
1343 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1344 int count = 0;
1345 pi1_pong_ender->MakeWatcher(
1346 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1347 if (++count == 100) {
1348 simulated_event_loop_factory.Exit();
1349 }
1350 });
1351
1352 // Run enough that messages should be delivered.
1353 simulated_event_loop_factory.Run();
1354
1355 // Grab the latest messages.
1356 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1357 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1358 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1359 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1360
1361 // Compute their time on the global distributed clock so we can compute
1362 // distance betwen them.
1363 const distributed_clock::time_point pi1_ping_time =
1364 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1365 ->ToDistributedClock(
1366 ping_on_pi1_fetcher.context().monotonic_event_time);
1367 const distributed_clock::time_point pi2_ping_time =
1368 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1369 ->ToDistributedClock(
1370 ping_on_pi2_fetcher.context().monotonic_event_time);
1371 const distributed_clock::time_point pi1_pong_time =
1372 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1373 ->ToDistributedClock(
1374 pong_on_pi1_fetcher.context().monotonic_event_time);
1375 const distributed_clock::time_point pi2_pong_time =
1376 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1377 ->ToDistributedClock(
1378 pong_on_pi2_fetcher.context().monotonic_event_time);
1379
1380 // And confirm the delivery delay is just about exactly 150 uS for both
1381 // directions like expected. There will be a couple ns of rounding errors in
1382 // the conversion functions that aren't worth accounting for right now. This
1383 // will either be really close, or really far.
1384 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1385 pi1_ping_time);
1386 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1387 pi1_ping_time);
1388
1389 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1390 pi2_pong_time);
1391 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1392 pi2_pong_time);
1393}
1394
Austin Schuh4c570ea2020-11-19 23:13:24 -08001395void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1396 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1397 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1398 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001399 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001400}
1401
1402// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001403TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001404 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1405 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1406
1407 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1408
1409 std::unique_ptr<EventLoop> ping_event_loop =
1410 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1411 aos::Sender<examples::Ping> pi1_reliable_sender =
1412 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1413 aos::Sender<examples::Ping> pi1_unreliable_sender =
1414 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1415 SendPing(&pi1_reliable_sender, 1);
1416 SendPing(&pi1_unreliable_sender, 1);
1417
1418 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1419 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1420 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1421 "/reliable");
1422 MessageCounter<examples::Ping> pi2_unreliable_counter(
1423 pi2_pong_event_loop.get(), "/unreliable");
1424 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1425 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1426 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1427 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1428
1429 const size_t reliable_channel_index = configuration::ChannelIndex(
1430 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1431
1432 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1433 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1434
Austin Schuheeaa2022021-01-02 21:52:03 -08001435 const chrono::nanoseconds network_delay =
1436 simulated_event_loop_factory.network_delay();
1437
Austin Schuh4c570ea2020-11-19 23:13:24 -08001438 int reliable_timestamp_count = 0;
1439 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001440 shared() ? "/pi1/aos/remote_timestamps/pi2"
1441 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001442 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001443 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1444 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001445 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001446 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001447 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001448 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001449 VLOG(1) << aos::FlatbufferToJson(&header);
1450 if (header.channel_index() == reliable_channel_index) {
1451 ++reliable_timestamp_count;
1452 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001453
1454 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1455 chrono::nanoseconds(header.monotonic_sent_time()));
1456
1457 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1458 header_monotonic_sent_time + network_delay +
1459 (pi1_remote_timestamp->monotonic_now() -
1460 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001461 });
1462
1463 // Wait to let timestamp estimation start up before looking for the results.
1464 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1465
1466 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1467 // This one isn't reliable, but was sent before the start. It should *not* be
1468 // delivered.
1469 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1470 // Confirm we got a timestamp logged for the message that was forwarded.
1471 EXPECT_EQ(reliable_timestamp_count, 1u);
1472
1473 SendPing(&pi1_reliable_sender, 2);
1474 SendPing(&pi1_unreliable_sender, 2);
1475 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1476 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1477 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1478
1479 EXPECT_EQ(reliable_timestamp_count, 2u);
1480}
1481
Austin Schuh20ac95d2020-12-05 17:24:19 -08001482// Tests that rebooting a node changes the ServerStatistics message and the
1483// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001484TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001485 const UUID pi1_boot0 = UUID::Random();
1486 const UUID pi2_boot0 = UUID::Random();
1487 const UUID pi2_boot1 = UUID::Random();
1488 const UUID pi3_boot0 = UUID::Random();
1489 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001490
Austin Schuh58646e22021-08-23 23:51:46 -07001491 message_bridge::TestingTimeConverter time(
1492 configuration::NodesCount(&config.message()));
1493 SimulatedEventLoopFactory factory(&config.message());
1494 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001495
Austin Schuh58646e22021-08-23 23:51:46 -07001496 const size_t pi1_index =
1497 configuration::GetNodeIndex(&config.message(), "pi1");
1498 const size_t pi2_index =
1499 configuration::GetNodeIndex(&config.message(), "pi2");
1500 const size_t pi3_index =
1501 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001502
Austin Schuh58646e22021-08-23 23:51:46 -07001503 {
1504 time.AddNextTimestamp(distributed_clock::epoch(),
1505 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1506 BootTimestamp::epoch()});
1507
1508 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1509
1510 time.AddNextTimestamp(
1511 distributed_clock::epoch() + dt,
1512 {BootTimestamp::epoch() + dt,
1513 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1514 BootTimestamp::epoch() + dt});
1515
1516 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1517 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1518 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1519 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1520 }
1521
1522 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1523 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1524
1525 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1526 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001527
1528 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001529 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001530
1531 int timestamp_count = 0;
1532 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001533 "/pi2/aos", [&expected_boot_uuid,
1534 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001535 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001536 expected_boot_uuid);
1537 });
1538 pi1_remote_timestamp->MakeWatcher(
1539 "/test",
1540 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001541 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001542 expected_boot_uuid);
1543 });
1544 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001545 shared() ? "/pi1/aos/remote_timestamps/pi2"
1546 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001547 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1548 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001549 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001550 VLOG(1) << aos::FlatbufferToJson(&header);
1551 ++timestamp_count;
1552 });
1553
1554 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001555 bool first_pi1_server_statistics = true;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001556 pi1_remote_timestamp->MakeWatcher(
Austin Schuh58646e22021-08-23 23:51:46 -07001557 "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
1558 &first_pi1_server_statistics](
Austin Schuh20ac95d2020-12-05 17:24:19 -08001559 const message_bridge::ServerStatistics &stats) {
1560 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1561 for (const message_bridge::ServerConnection *connection :
1562 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001563 if (connection->state() == message_bridge::State::CONNECTED) {
1564 ASSERT_TRUE(connection->has_boot_uuid());
1565 }
1566 if (!first_pi1_server_statistics) {
1567 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1568 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001569 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001570 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1571 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001572 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001573 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001574 << " : Got " << aos::FlatbufferToJson(&stats);
1575 ++pi1_server_statistics_count;
1576 }
1577 }
Austin Schuh58646e22021-08-23 23:51:46 -07001578 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001579 });
1580
Austin Schuh58646e22021-08-23 23:51:46 -07001581 int pi1_client_statistics_count = 0;
1582 pi1_remote_timestamp->MakeWatcher(
1583 "/pi1/aos", [&pi1_client_statistics_count](
1584 const message_bridge::ClientStatistics &stats) {
1585 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1586 for (const message_bridge::ClientConnection *connection :
1587 *stats.connections()) {
1588 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1589 if (connection->node()->name()->string_view() == "pi2") {
1590 ++pi1_client_statistics_count;
1591 }
1592 }
1593 });
1594
1595 // Confirm that reboot changes the UUID.
1596 pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
1597 expected_boot_uuid = pi2_boot1;
1598 LOG(INFO) << "OnShutdown triggered for pi2";
1599 pi2->OnStartup([&expected_boot_uuid, pi2]() {
1600 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1601 });
1602 });
1603
Austin Schuh20ac95d2020-12-05 17:24:19 -08001604 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001605 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001606
1607 EXPECT_GT(timestamp_count, 100);
1608 EXPECT_GE(pi1_server_statistics_count, 1u);
1609
Austin Schuh20ac95d2020-12-05 17:24:19 -08001610 timestamp_count = 0;
1611 pi1_server_statistics_count = 0;
1612
Austin Schuh58646e22021-08-23 23:51:46 -07001613 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001614 EXPECT_GT(timestamp_count, 100);
1615 EXPECT_GE(pi1_server_statistics_count, 1u);
1616}
1617
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001618INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001619 All, RemoteMessageSimulatedEventLoopTest,
1620 ::testing::Values(
1621 Param{"multinode_pingpong_test_combined_config.json", true},
1622 Param{"multinode_pingpong_test_split_config.json", false}));
1623
Austin Schuh58646e22021-08-23 23:51:46 -07001624// Tests that Startup and Shutdown do reasonable things.
1625TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1626 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1627 aos::configuration::ReadConfig(
1628 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1629
Austin Schuh72e65682021-09-02 11:37:05 -07001630 size_t pi1_shutdown_counter = 0;
1631 size_t pi2_shutdown_counter = 0;
1632 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1633 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1634
Austin Schuh58646e22021-08-23 23:51:46 -07001635 message_bridge::TestingTimeConverter time(
1636 configuration::NodesCount(&config.message()));
1637 SimulatedEventLoopFactory factory(&config.message());
1638 factory.SetTimeConverter(&time);
1639 time.AddNextTimestamp(
1640 distributed_clock::epoch(),
1641 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1642
1643 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1644
1645 time.AddNextTimestamp(
1646 distributed_clock::epoch() + dt,
1647 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1648 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1649 BootTimestamp::epoch() + dt});
1650
1651 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1652 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1653
1654 // Configure startup to start Ping and Pong, and count.
1655 size_t pi1_startup_counter = 0;
1656 size_t pi2_startup_counter = 0;
1657 pi1->OnStartup([pi1]() {
1658 LOG(INFO) << "Made ping";
1659 pi1->AlwaysStart<Ping>("ping");
1660 });
1661 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1662 pi2->OnStartup([pi2]() {
1663 LOG(INFO) << "Made pong";
1664 pi2->AlwaysStart<Pong>("pong");
1665 });
1666 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1667
1668 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001669 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1670 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1671
Austin Schuh58646e22021-08-23 23:51:46 -07001672 // Automatically make counters on startup.
1673 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1674 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1675 "pi1_pong_counter", "/test");
1676 });
1677 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1678 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1679 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1680 "pi2_ping_counter", "/test");
1681 });
1682 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1683
1684 EXPECT_EQ(pi2_ping_counter, nullptr);
1685 EXPECT_EQ(pi1_pong_counter, nullptr);
1686
1687 EXPECT_EQ(pi1_startup_counter, 0u);
1688 EXPECT_EQ(pi2_startup_counter, 0u);
1689 EXPECT_EQ(pi1_shutdown_counter, 0u);
1690 EXPECT_EQ(pi2_shutdown_counter, 0u);
1691
1692 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1693 EXPECT_EQ(pi1_startup_counter, 1u);
1694 EXPECT_EQ(pi2_startup_counter, 1u);
1695 EXPECT_EQ(pi1_shutdown_counter, 0u);
1696 EXPECT_EQ(pi2_shutdown_counter, 0u);
1697 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1698 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1699
1700 LOG(INFO) << pi1->monotonic_now();
1701 LOG(INFO) << pi2->monotonic_now();
1702
1703 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1704
1705 EXPECT_EQ(pi1_startup_counter, 2u);
1706 EXPECT_EQ(pi2_startup_counter, 2u);
1707 EXPECT_EQ(pi1_shutdown_counter, 1u);
1708 EXPECT_EQ(pi2_shutdown_counter, 1u);
1709 EXPECT_EQ(pi2_ping_counter->count(), 501);
1710 EXPECT_EQ(pi1_pong_counter->count(), 501);
1711}
1712
1713// Tests that OnStartup handlers can be added after running and get called, and
1714// can't be called when running.
1715TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1716 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1717 aos::configuration::ReadConfig(
1718 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1719
1720 // Test that we can add startup handlers as long as we aren't running, and
1721 // they get run when Run gets called again.
1722 // Test that adding a startup handler when running fails.
1723 //
1724 // Test shutdown handlers get called on destruction.
1725 SimulatedEventLoopFactory factory(&config.message());
1726
1727 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1728
1729 int startup_count0 = 0;
1730 int startup_count1 = 0;
1731
1732 pi1->OnStartup([&]() { ++startup_count0; });
1733 EXPECT_EQ(startup_count0, 0);
1734 EXPECT_EQ(startup_count1, 0);
1735
1736 factory.RunFor(chrono::nanoseconds(1));
1737 EXPECT_EQ(startup_count0, 1);
1738 EXPECT_EQ(startup_count1, 0);
1739
1740 pi1->OnStartup([&]() { ++startup_count1; });
1741 EXPECT_EQ(startup_count0, 1);
1742 EXPECT_EQ(startup_count1, 0);
1743
1744 factory.RunFor(chrono::nanoseconds(1));
1745 EXPECT_EQ(startup_count0, 1);
1746 EXPECT_EQ(startup_count1, 1);
1747
1748 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1749 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1750
1751 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1752 "Can only register OnStartup handlers when not running.");
1753}
1754
1755// Tests that OnStartup handlers can be added after running and get called, and
1756// all the handlers get called on reboot. Shutdown handlers are tested the same
1757// way.
1758TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1759 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1760 aos::configuration::ReadConfig(
1761 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1762
Austin Schuh72e65682021-09-02 11:37:05 -07001763 int startup_count0 = 0;
1764 int shutdown_count0 = 0;
1765 int startup_count1 = 0;
1766 int shutdown_count1 = 0;
1767
Austin Schuh58646e22021-08-23 23:51:46 -07001768 message_bridge::TestingTimeConverter time(
1769 configuration::NodesCount(&config.message()));
1770 SimulatedEventLoopFactory factory(&config.message());
1771 factory.SetTimeConverter(&time);
1772 time.StartEqual();
1773
1774 const chrono::nanoseconds dt = chrono::seconds(10);
1775 time.RebootAt(0, distributed_clock::epoch() + dt);
1776 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1777
1778 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1779
Austin Schuh58646e22021-08-23 23:51:46 -07001780 pi1->OnStartup([&]() { ++startup_count0; });
1781 pi1->OnShutdown([&]() { ++shutdown_count0; });
1782 EXPECT_EQ(startup_count0, 0);
1783 EXPECT_EQ(startup_count1, 0);
1784 EXPECT_EQ(shutdown_count0, 0);
1785 EXPECT_EQ(shutdown_count1, 0);
1786
1787 factory.RunFor(chrono::nanoseconds(1));
1788 EXPECT_EQ(startup_count0, 1);
1789 EXPECT_EQ(startup_count1, 0);
1790 EXPECT_EQ(shutdown_count0, 0);
1791 EXPECT_EQ(shutdown_count1, 0);
1792
1793 pi1->OnStartup([&]() { ++startup_count1; });
1794 EXPECT_EQ(startup_count0, 1);
1795 EXPECT_EQ(startup_count1, 0);
1796 EXPECT_EQ(shutdown_count0, 0);
1797 EXPECT_EQ(shutdown_count1, 0);
1798
1799 factory.RunFor(chrono::nanoseconds(1));
1800 EXPECT_EQ(startup_count0, 1);
1801 EXPECT_EQ(startup_count1, 1);
1802 EXPECT_EQ(shutdown_count0, 0);
1803 EXPECT_EQ(shutdown_count1, 0);
1804
1805 factory.RunFor(chrono::seconds(15));
1806
1807 EXPECT_EQ(startup_count0, 2);
1808 EXPECT_EQ(startup_count1, 2);
1809 EXPECT_EQ(shutdown_count0, 1);
1810 EXPECT_EQ(shutdown_count1, 0);
1811
1812 pi1->OnShutdown([&]() { ++shutdown_count1; });
1813 factory.RunFor(chrono::seconds(10));
1814
1815 EXPECT_EQ(startup_count0, 3);
1816 EXPECT_EQ(startup_count1, 3);
1817 EXPECT_EQ(shutdown_count0, 2);
1818 EXPECT_EQ(shutdown_count1, 1);
1819}
1820
1821// Tests that event loops which outlive shutdown crash.
1822TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1823 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1824 aos::configuration::ReadConfig(
1825 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1826
1827 message_bridge::TestingTimeConverter time(
1828 configuration::NodesCount(&config.message()));
1829 SimulatedEventLoopFactory factory(&config.message());
1830 factory.SetTimeConverter(&time);
1831 time.StartEqual();
1832
1833 const chrono::nanoseconds dt = chrono::seconds(10);
1834 time.RebootAt(0, distributed_clock::epoch() + dt);
1835
1836 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1837
1838 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1839
1840 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
1841}
1842
1843// Tests that messages don't survive a reboot of a node.
1844TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
1845 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1846 aos::configuration::ReadConfig(
1847 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1848
1849 message_bridge::TestingTimeConverter time(
1850 configuration::NodesCount(&config.message()));
1851 SimulatedEventLoopFactory factory(&config.message());
1852 factory.SetTimeConverter(&time);
1853 time.StartEqual();
1854
1855 const chrono::nanoseconds dt = chrono::seconds(10);
1856 time.RebootAt(0, distributed_clock::epoch() + dt);
1857
1858 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1859
1860 const UUID boot_uuid = pi1->boot_uuid();
1861 EXPECT_NE(boot_uuid, UUID::Zero());
1862
1863 {
1864 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1865 aos::Sender<examples::Ping> test_message_sender =
1866 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1867 SendPing(&test_message_sender, 1);
1868 }
1869
1870 factory.RunFor(chrono::seconds(5));
1871
1872 {
1873 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1874 aos::Fetcher<examples::Ping> fetcher =
1875 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1876 EXPECT_TRUE(fetcher.Fetch());
1877 }
1878
1879 factory.RunFor(chrono::seconds(10));
1880
1881 {
1882 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1883 aos::Fetcher<examples::Ping> fetcher =
1884 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1885 EXPECT_FALSE(fetcher.Fetch());
1886 }
1887 EXPECT_NE(boot_uuid, pi1->boot_uuid());
1888}
1889
1890// Tests that reliable messages get resent on reboot.
1891TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
1892 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1893 aos::configuration::ReadConfig(
1894 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1895
1896 message_bridge::TestingTimeConverter time(
1897 configuration::NodesCount(&config.message()));
1898 SimulatedEventLoopFactory factory(&config.message());
1899 factory.SetTimeConverter(&time);
1900 time.StartEqual();
1901
1902 const chrono::nanoseconds dt = chrono::seconds(1);
1903 time.RebootAt(1, distributed_clock::epoch() + dt);
1904
1905 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1906 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1907
1908 const UUID pi1_boot_uuid = pi1->boot_uuid();
1909 const UUID pi2_boot_uuid = pi2->boot_uuid();
1910 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
1911 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
1912
1913 {
1914 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1915 aos::Sender<examples::Ping> test_message_sender =
1916 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1917 SendPing(&test_message_sender, 1);
1918 }
1919
1920 factory.RunFor(chrono::milliseconds(500));
1921
1922 {
1923 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1924 aos::Fetcher<examples::Ping> fetcher =
1925 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1926 EXPECT_TRUE(fetcher.Fetch());
1927 }
1928
1929 factory.RunFor(chrono::seconds(1));
1930
1931 {
1932 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
1933 aos::Fetcher<examples::Ping> fetcher =
1934 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1935 EXPECT_TRUE(fetcher.Fetch());
1936 }
1937 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
1938}
1939
Austin Schuh48205e62021-11-12 14:13:18 -08001940class SimulatedEventLoopDisconnectTest : public ::testing::Test {
1941 public:
1942 SimulatedEventLoopDisconnectTest()
1943 : config(aos::configuration::ReadConfig(ArtifactPath(
1944 "aos/events/multinode_pingpong_test_split_config.json"))),
1945 time(configuration::NodesCount(&config.message())),
1946 factory(&config.message()) {
1947 factory.SetTimeConverter(&time);
1948 }
1949
1950 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
1951 const monotonic_clock::time_point allowable_message_time,
1952 std::set<const aos::Node *> empty_nodes) {
1953 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1954 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1955 std::unique_ptr<aos::EventLoop> pi1_event_loop =
1956 pi1->MakeEventLoop("fetcher");
1957 std::unique_ptr<aos::EventLoop> pi2_event_loop =
1958 pi2->MakeEventLoop("fetcher");
1959 for (const aos::Channel *channel : *factory.configuration()->channels()) {
1960 if (configuration::ChannelIsReadableOnNode(channel,
1961 pi1_event_loop->node())) {
1962 std::unique_ptr<aos::RawFetcher> fetcher =
1963 pi1_event_loop->MakeRawFetcher(channel);
1964 if (statistics_channels.find(channel) == statistics_channels.end() ||
1965 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
1966 EXPECT_FALSE(fetcher->Fetch() &&
1967 fetcher->context().monotonic_event_time >
1968 allowable_message_time)
1969 << ": Found recent message on channel "
1970 << configuration::CleanedChannelToString(channel) << " and time "
1971 << fetcher->context().monotonic_event_time << " > "
1972 << allowable_message_time << " on pi1";
1973 } else {
1974 EXPECT_TRUE(fetcher->Fetch() &&
1975 fetcher->context().monotonic_event_time >=
1976 allowable_message_time)
1977 << ": Didn't find recent message on channel "
1978 << configuration::CleanedChannelToString(channel) << " on pi1";
1979 }
1980 }
1981 if (configuration::ChannelIsReadableOnNode(channel,
1982 pi2_event_loop->node())) {
1983 std::unique_ptr<aos::RawFetcher> fetcher =
1984 pi2_event_loop->MakeRawFetcher(channel);
1985 if (statistics_channels.find(channel) == statistics_channels.end() ||
1986 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
1987 EXPECT_FALSE(fetcher->Fetch() &&
1988 fetcher->context().monotonic_event_time >
1989 allowable_message_time)
1990 << ": Found message on channel "
1991 << configuration::CleanedChannelToString(channel) << " and time "
1992 << fetcher->context().monotonic_event_time << " > "
1993 << allowable_message_time << " on pi2";
1994 } else {
1995 EXPECT_TRUE(fetcher->Fetch() &&
1996 fetcher->context().monotonic_event_time >=
1997 allowable_message_time)
1998 << ": Didn't find message on channel "
1999 << configuration::CleanedChannelToString(channel) << " on pi2";
2000 }
2001 }
2002 }
2003 }
2004
2005 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2006
2007 message_bridge::TestingTimeConverter time;
2008 SimulatedEventLoopFactory factory;
2009};
2010
2011// Tests that if we have message bridge client/server disabled, and timing
2012// reports disabled, no messages are sent. Also tests that we can disconnect a
2013// node and disable statistics on it and it actually fully disconnects.
2014TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2015 time.StartEqual();
2016 factory.SkipTimingReport();
2017 factory.DisableStatistics();
2018
2019 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2020 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2021
2022 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2023 pi1->MakeEventLoop("fetcher");
2024 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2025 pi2->MakeEventLoop("fetcher");
2026
2027 factory.RunFor(chrono::milliseconds(100000));
2028
2029 // Confirm no messages are sent if we've configured them all off.
2030 VerifyChannels({}, monotonic_clock::min_time, {});
2031
2032 // Now, confirm that all the message_bridge channels come back when we
2033 // re-enable.
2034 factory.EnableStatistics();
2035
2036 factory.RunFor(chrono::milliseconds(10050));
2037
2038 // Build up the list of all the messages we expect when we come back.
2039 {
2040 std::set<const aos::Channel *> statistics_channels;
2041 for (const std::pair<std::string_view, const Node *> pi :
2042 std::vector<std::pair<std::string_view, const Node *>>{
2043 {"/pi1/aos", pi1->node()},
2044 {"/pi2/aos", pi1->node()},
2045 {"/pi3/aos", pi1->node()}}) {
2046 statistics_channels.insert(configuration::GetChannel(
2047 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2048 pi.second));
2049 statistics_channels.insert(configuration::GetChannel(
2050 factory.configuration(), pi.first,
2051 "aos.message_bridge.ServerStatistics", "", pi.second));
2052 statistics_channels.insert(configuration::GetChannel(
2053 factory.configuration(), pi.first,
2054 "aos.message_bridge.ClientStatistics", "", pi.second));
2055 }
2056
2057 statistics_channels.insert(configuration::GetChannel(
2058 factory.configuration(),
2059 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2060 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2061 statistics_channels.insert(configuration::GetChannel(
2062 factory.configuration(),
2063 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2064 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2065 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2066 }
2067
2068 // Now test that we can disable the messages for a single node
2069 pi2->DisableStatistics();
2070 const aos::monotonic_clock::time_point statistics_disable_time =
2071 pi2->monotonic_now();
2072 factory.RunFor(chrono::milliseconds(10000));
2073
2074 // We should see a much smaller set of messages, but should still see messages
2075 // forwarded, mainly the timestamp message.
2076 {
2077 std::set<const aos::Channel *> statistics_channels;
2078 for (const std::pair<std::string_view, const Node *> pi :
2079 std::vector<std::pair<std::string_view, const Node *>>{
2080 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2081 statistics_channels.insert(configuration::GetChannel(
2082 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2083 pi.second));
2084 statistics_channels.insert(configuration::GetChannel(
2085 factory.configuration(), pi.first,
2086 "aos.message_bridge.ServerStatistics", "", pi.second));
2087 statistics_channels.insert(configuration::GetChannel(
2088 factory.configuration(), pi.first,
2089 "aos.message_bridge.ClientStatistics", "", pi.second));
2090 }
2091
2092 statistics_channels.insert(configuration::GetChannel(
2093 factory.configuration(),
2094 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2095 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2096 VerifyChannels(statistics_channels, statistics_disable_time, {});
2097 }
2098
2099 // Now, fully disconnect the node. This will completely quiet down pi2.
2100 pi1->Disconnect(pi2->node());
2101 pi2->Disconnect(pi1->node());
2102
2103 const aos::monotonic_clock::time_point disconnect_disable_time =
2104 pi2->monotonic_now();
2105 factory.RunFor(chrono::milliseconds(10000));
2106
2107 {
2108 std::set<const aos::Channel *> statistics_channels;
2109 for (const std::pair<std::string_view, const Node *> pi :
2110 std::vector<std::pair<std::string_view, const Node *>>{
2111 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2112 statistics_channels.insert(configuration::GetChannel(
2113 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2114 pi.second));
2115 statistics_channels.insert(configuration::GetChannel(
2116 factory.configuration(), pi.first,
2117 "aos.message_bridge.ServerStatistics", "", pi.second));
2118 statistics_channels.insert(configuration::GetChannel(
2119 factory.configuration(), pi.first,
2120 "aos.message_bridge.ClientStatistics", "", pi.second));
2121 }
2122
2123 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2124 }
2125}
2126
Neil Balchc8f41ed2018-01-20 22:06:53 -08002127} // namespace testing
2128} // namespace aos