blob: 277e7834710ff92d06eff920b980328742875f69 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08004#include <string_view>
5
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07008#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080011#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070012#include "aos/network/message_bridge_client_generated.h"
13#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080014#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080015#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070017#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080018#include "gtest/gtest.h"
19
20namespace aos {
21namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070022namespace {
23
Austin Schuh373f1762021-06-02 21:07:09 -070024using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070025
Austin Schuh58646e22021-08-23 23:51:46 -070026using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080027using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070028namespace chrono = ::std::chrono;
29
Austin Schuh0de30f32020-12-06 12:44:28 -080030} // namespace
31
Neil Balchc8f41ed2018-01-20 22:06:53 -080032class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
33 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080034 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080035 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080036 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080037 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080038 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080039 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080040 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070041 }
42
Austin Schuh217a9782019-12-21 23:02:50 -080043 void Run() override { event_loop_factory_->Run(); }
44 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070045
Austin Schuh52d325c2019-06-23 18:59:06 -070046 // TODO(austin): Implement this. It's used currently for a phased loop test.
47 // I'm not sure how much that matters.
48 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
49
Austin Schuh7d87b672019-12-01 20:23:49 -080050 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080051 MaybeMake();
52 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080053 }
54
Neil Balchc8f41ed2018-01-20 22:06:53 -080055 private:
Austin Schuh217a9782019-12-21 23:02:50 -080056 void MaybeMake() {
57 if (!event_loop_factory_) {
58 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080059 event_loop_factory_ =
60 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080061 } else {
62 event_loop_factory_ =
63 std::make_unique<SimulatedEventLoopFactory>(configuration());
64 }
65 }
66 }
67 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080068};
69
Austin Schuh6bae8252021-02-07 22:01:49 -080070auto CommonParameters() {
71 return ::testing::Combine(
72 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
73 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
74 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
75}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070076
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070077INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070078 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070079
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070080INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070081 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080082
Austin Schuh89c9b812021-02-20 14:42:10 -080083// Parameters to run all the tests with.
84struct Param {
85 // The config file to use.
86 std::string config;
87 // If true, the RemoteMessage channel should be shared between all the remote
88 // channels. If false, there will be 1 RemoteMessage channel per remote
89 // channel.
90 bool shared;
91};
92
93class RemoteMessageSimulatedEventLoopTest
94 : public ::testing::TestWithParam<struct Param> {
95 public:
96 RemoteMessageSimulatedEventLoopTest()
97 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070098 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080099 LOG(INFO) << "Config " << GetParam().config;
100 }
101
102 bool shared() const { return GetParam().shared; }
103
104 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
105 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
107 if (shared()) {
108 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
109 event_loop, "/aos/remote_timestamps/pi2"));
110 } else {
111 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
112 event_loop,
113 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
114 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
115 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
118 }
119 return counters;
120 }
121
122 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
123 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
125 if (shared()) {
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi1"));
128 } else {
129 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
130 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop,
133 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
134 }
135 return counters;
136 }
137
138 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
139};
140
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800141class FunctionEvent : public EventScheduler::Event {
142 public:
143 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
144
145 void Handle() noexcept override { fn_(); }
146
147 private:
148 std::function<void()> fn_;
149};
150
Neil Balchc8f41ed2018-01-20 22:06:53 -0800151// Test that creating an event and running the scheduler runs the event.
152TEST(EventSchedulerTest, ScheduleEvent) {
153 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800154 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700155 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800156 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800157
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800158 FunctionEvent e([&counter]() { counter += 1; });
159 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800160 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800161 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800162 auto token =
163 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800164 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800165 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800166 EXPECT_EQ(counter, 1);
167}
168
169// Test that descheduling an already scheduled event doesn't run the event.
170TEST(EventSchedulerTest, DescheduleEvent) {
171 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700173 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800174 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800175
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800176 FunctionEvent e([&counter]() { counter += 1; });
177 auto token =
178 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800179 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800180 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800181 EXPECT_EQ(counter, 0);
182}
Austin Schuh44019f92019-05-19 19:58:27 -0700183
Austin Schuh8fb315a2020-11-19 22:33:58 -0800184void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
185 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
186 TestMessage::Builder test_message_builder =
187 builder.MakeBuilder<TestMessage>();
188 test_message_builder.add_value(value);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800189 ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800190}
191
192// Test that sending a message after running gets properly notified.
193TEST(SimulatedEventLoopTest, SendAfterRunFor) {
194 SimulatedEventLoopTestFactory factory;
195
196 SimulatedEventLoopFactory simulated_event_loop_factory(
197 factory.configuration());
198
199 ::std::unique_ptr<EventLoop> ping_event_loop =
200 simulated_event_loop_factory.MakeEventLoop("ping");
201 aos::Sender<TestMessage> test_message_sender =
202 ping_event_loop->MakeSender<TestMessage>("/test");
203 SendTestMessage(&test_message_sender, 1);
204
205 std::unique_ptr<EventLoop> pong1_event_loop =
206 simulated_event_loop_factory.MakeEventLoop("pong");
207 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
208 "/test");
209
210 EXPECT_FALSE(ping_event_loop->is_running());
211
212 // Watchers start when you start running, so there should be nothing counted.
213 simulated_event_loop_factory.RunFor(chrono::seconds(1));
214 EXPECT_EQ(test_message_counter1.count(), 0u);
215
216 std::unique_ptr<EventLoop> pong2_event_loop =
217 simulated_event_loop_factory.MakeEventLoop("pong");
218 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
219 "/test");
220
221 // Pauses in the middle don't count though, so this should be counted.
222 // But, the fresh watcher shouldn't pick it up yet.
223 SendTestMessage(&test_message_sender, 2);
224
225 EXPECT_EQ(test_message_counter1.count(), 0u);
226 EXPECT_EQ(test_message_counter2.count(), 0u);
227 simulated_event_loop_factory.RunFor(chrono::seconds(1));
228
229 EXPECT_EQ(test_message_counter1.count(), 1u);
230 EXPECT_EQ(test_message_counter2.count(), 0u);
231}
232
233// Test that creating an event loop while running dies.
234TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
235 SimulatedEventLoopTestFactory factory;
236
237 SimulatedEventLoopFactory simulated_event_loop_factory(
238 factory.configuration());
239
240 ::std::unique_ptr<EventLoop> event_loop =
241 simulated_event_loop_factory.MakeEventLoop("ping");
242
243 auto timer = event_loop->AddTimer([&]() {
244 EXPECT_DEATH(
245 {
246 ::std::unique_ptr<EventLoop> event_loop2 =
247 simulated_event_loop_factory.MakeEventLoop("ping");
248 },
249 "event loop while running");
250 simulated_event_loop_factory.Exit();
251 });
252
253 event_loop->OnRun([&event_loop, &timer] {
254 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
255 });
256
257 simulated_event_loop_factory.Run();
258}
259
260// Test that creating a watcher after running dies.
261TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
262 SimulatedEventLoopTestFactory factory;
263
264 SimulatedEventLoopFactory simulated_event_loop_factory(
265 factory.configuration());
266
267 ::std::unique_ptr<EventLoop> event_loop =
268 simulated_event_loop_factory.MakeEventLoop("ping");
269
270 simulated_event_loop_factory.RunFor(chrono::seconds(1));
271
272 EXPECT_DEATH(
273 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
274 "Can't add a watcher after running");
275
276 ::std::unique_ptr<EventLoop> event_loop2 =
277 simulated_event_loop_factory.MakeEventLoop("ping");
278
279 simulated_event_loop_factory.RunFor(chrono::seconds(1));
280
281 EXPECT_DEATH(
282 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
283 "Can't add a watcher after running");
284}
285
Austin Schuh44019f92019-05-19 19:58:27 -0700286// Test that running for a time period with no handlers causes time to progress
287// correctly.
288TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800289 SimulatedEventLoopTestFactory factory;
290
291 SimulatedEventLoopFactory simulated_event_loop_factory(
292 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700293 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800294 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700295
296 simulated_event_loop_factory.RunFor(chrono::seconds(1));
297
298 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700299 event_loop->monotonic_now());
300}
301
302// Test that running for a time with a periodic handler causes time to end
303// correctly.
304TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800305 SimulatedEventLoopTestFactory factory;
306
307 SimulatedEventLoopFactory simulated_event_loop_factory(
308 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700309 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800310 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700311
312 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700313 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700314 event_loop->OnRun([&event_loop, &timer] {
315 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
316 chrono::milliseconds(100));
317 });
318
319 simulated_event_loop_factory.RunFor(chrono::seconds(1));
320
321 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700322 event_loop->monotonic_now());
323 EXPECT_EQ(counter, 10);
324}
325
Austin Schuh7d87b672019-12-01 20:23:49 -0800326// Tests that watchers have latency in simulation.
327TEST(SimulatedEventLoopTest, WatcherTimingReport) {
328 SimulatedEventLoopTestFactory factory;
329 factory.set_send_delay(std::chrono::microseconds(50));
330
331 FLAGS_timing_report_ms = 1000;
332 auto loop1 = factory.MakePrimary("primary");
333 loop1->MakeWatcher("/test", [](const TestMessage &) {});
334
335 auto loop2 = factory.Make("sender_loop");
336
337 auto loop3 = factory.Make("report_fetcher");
338
339 Fetcher<timing::Report> report_fetcher =
340 loop3->MakeFetcher<timing::Report>("/aos");
341 EXPECT_FALSE(report_fetcher.Fetch());
342
343 auto sender = loop2->MakeSender<TestMessage>("/test");
344
345 // Send 10 messages in the middle of a timing report period so we get
346 // something interesting back.
347 auto test_timer = loop2->AddTimer([&sender]() {
348 for (int i = 0; i < 10; ++i) {
349 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
350 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
351 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700352 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800353 }
354 });
355
356 // Quit after 1 timing report, mid way through the next cycle.
357 {
358 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
359 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
360 end_timer->set_name("end");
361 }
362
363 loop1->OnRun([&test_timer, &loop1]() {
364 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
365 });
366
367 factory.Run();
368
369 // And, since we are here, check that the timing report makes sense.
370 // Start by looking for our event loop's timing.
371 FlatbufferDetachedBuffer<timing::Report> primary_report =
372 FlatbufferDetachedBuffer<timing::Report>::Empty();
373 while (report_fetcher.FetchNext()) {
374 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
375 if (report_fetcher->name()->string_view() == "primary") {
376 primary_report = CopyFlatBuffer(report_fetcher.get());
377 }
378 }
379
380 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700381 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800382
383 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
384
385 // Just the timing report timer.
386 ASSERT_NE(primary_report.message().timers(), nullptr);
387 EXPECT_EQ(primary_report.message().timers()->size(), 2);
388
389 // No phased loops
390 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
391
392 // And now confirm that the watcher received all 10 messages, and has latency.
393 ASSERT_NE(primary_report.message().watchers(), nullptr);
394 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
395 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
396 EXPECT_NEAR(
397 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
398 0.00005, 1e-9);
399 EXPECT_NEAR(
400 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
401 0.00005, 1e-9);
402 EXPECT_NEAR(
403 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
404 0.00005, 1e-9);
405 EXPECT_EQ(primary_report.message()
406 .watchers()
407 ->Get(0)
408 ->wakeup_latency()
409 ->standard_deviation(),
410 0.0);
411
412 EXPECT_EQ(
413 primary_report.message().watchers()->Get(0)->handler_time()->average(),
414 0.0);
415 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
416 0.0);
417 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
418 0.0);
419 EXPECT_EQ(primary_report.message()
420 .watchers()
421 ->Get(0)
422 ->handler_time()
423 ->standard_deviation(),
424 0.0);
425}
426
Austin Schuh89c9b812021-02-20 14:42:10 -0800427size_t CountAll(
428 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
429 &counters) {
430 size_t count = 0u;
431 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
432 counters) {
433 count += counter->count();
434 }
435 return count;
436}
437
Austin Schuh4c3b9702020-08-30 11:34:55 -0700438// Tests that ping and pong work when on 2 different nodes, and the message
439// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800440TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800441 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
442 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700443 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800444
445 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
446
447 std::unique_ptr<EventLoop> ping_event_loop =
448 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
449 Ping ping(ping_event_loop.get());
450
451 std::unique_ptr<EventLoop> pong_event_loop =
452 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
453 Pong pong(pong_event_loop.get());
454
455 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
456 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700457 MessageCounter<examples::Pong> pi2_pong_counter(
458 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700459 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
460 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
461 "/pi1/aos");
462 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
463 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800464
Austin Schuh4c3b9702020-08-30 11:34:55 -0700465 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
466 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800467
468 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
469 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700470 MessageCounter<examples::Pong> pi1_pong_counter(
471 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700472 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
473 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
474 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
475 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
476 "/aos");
477
Austin Schuh4c3b9702020-08-30 11:34:55 -0700478 // Count timestamps.
479 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
480 pi1_pong_counter_event_loop.get(), "/pi1/aos");
481 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
482 pi2_pong_counter_event_loop.get(), "/pi1/aos");
483 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
484 pi3_pong_counter_event_loop.get(), "/pi1/aos");
485 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
486 pi1_pong_counter_event_loop.get(), "/pi2/aos");
487 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
488 pi2_pong_counter_event_loop.get(), "/pi2/aos");
489 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
490 pi1_pong_counter_event_loop.get(), "/pi3/aos");
491 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
492 pi3_pong_counter_event_loop.get(), "/pi3/aos");
493
Austin Schuh2f8fd752020-09-01 22:38:28 -0700494 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800495 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
496 remote_timestamps_pi2_on_pi1 =
497 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
498 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
499 remote_timestamps_pi1_on_pi2 =
500 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700501
Austin Schuh4c3b9702020-08-30 11:34:55 -0700502 // Wait to let timestamp estimation start up before looking for the results.
503 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
504
Austin Schuh8fb315a2020-11-19 22:33:58 -0800505 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
506 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
507 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
508 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
509 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
510 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
511
Austin Schuh4c3b9702020-08-30 11:34:55 -0700512 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800513 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700514 "/pi1/aos", [&pi1_server_statistics_count](
515 const message_bridge::ServerStatistics &stats) {
516 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
517 EXPECT_EQ(stats.connections()->size(), 2u);
518 for (const message_bridge::ServerConnection *connection :
519 *stats.connections()) {
520 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800521 EXPECT_EQ(connection->connection_count(), 1u);
522 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800523 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700524 if (connection->node()->name()->string_view() == "pi2") {
525 EXPECT_GT(connection->sent_packets(), 50);
526 } else if (connection->node()->name()->string_view() == "pi3") {
527 EXPECT_GE(connection->sent_packets(), 5);
528 } else {
529 LOG(FATAL) << "Unknown connection";
530 }
531
532 EXPECT_TRUE(connection->has_monotonic_offset());
533 EXPECT_EQ(connection->monotonic_offset(), 0);
534 }
535 ++pi1_server_statistics_count;
536 });
537
538 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800539 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700540 "/pi2/aos", [&pi2_server_statistics_count](
541 const message_bridge::ServerStatistics &stats) {
542 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
543 EXPECT_EQ(stats.connections()->size(), 1u);
544
545 const message_bridge::ServerConnection *connection =
546 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800547 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700548 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
549 EXPECT_GT(connection->sent_packets(), 50);
550 EXPECT_TRUE(connection->has_monotonic_offset());
551 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800552 EXPECT_EQ(connection->connection_count(), 1u);
553 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700554 ++pi2_server_statistics_count;
555 });
556
557 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800558 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700559 "/pi3/aos", [&pi3_server_statistics_count](
560 const message_bridge::ServerStatistics &stats) {
561 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
562 EXPECT_EQ(stats.connections()->size(), 1u);
563
564 const message_bridge::ServerConnection *connection =
565 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800566 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700567 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
568 EXPECT_GE(connection->sent_packets(), 5);
569 EXPECT_TRUE(connection->has_monotonic_offset());
570 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800571 EXPECT_EQ(connection->connection_count(), 1u);
572 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700573 ++pi3_server_statistics_count;
574 });
575
576 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800577 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700578 "/pi1/aos", [&pi1_client_statistics_count](
579 const message_bridge::ClientStatistics &stats) {
580 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
581 EXPECT_EQ(stats.connections()->size(), 2u);
582
583 for (const message_bridge::ClientConnection *connection :
584 *stats.connections()) {
585 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
586 if (connection->node()->name()->string_view() == "pi2") {
587 EXPECT_GT(connection->received_packets(), 50);
588 } else if (connection->node()->name()->string_view() == "pi3") {
589 EXPECT_GE(connection->received_packets(), 5);
590 } else {
591 LOG(FATAL) << "Unknown connection";
592 }
593
Austin Schuhe61d4382021-03-31 21:33:02 -0700594 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700595 EXPECT_TRUE(connection->has_monotonic_offset());
596 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800597 EXPECT_EQ(connection->connection_count(), 1u);
598 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700599 }
600 ++pi1_client_statistics_count;
601 });
602
603 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800604 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700605 "/pi2/aos", [&pi2_client_statistics_count](
606 const message_bridge::ClientStatistics &stats) {
607 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
608 EXPECT_EQ(stats.connections()->size(), 1u);
609
610 const message_bridge::ClientConnection *connection =
611 stats.connections()->Get(0);
612 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
613 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700614 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700615 EXPECT_TRUE(connection->has_monotonic_offset());
616 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800617 EXPECT_EQ(connection->connection_count(), 1u);
618 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700619 ++pi2_client_statistics_count;
620 });
621
622 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800623 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700624 "/pi3/aos", [&pi3_client_statistics_count](
625 const message_bridge::ClientStatistics &stats) {
626 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
627 EXPECT_EQ(stats.connections()->size(), 1u);
628
629 const message_bridge::ClientConnection *connection =
630 stats.connections()->Get(0);
631 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
632 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700633 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700634 EXPECT_TRUE(connection->has_monotonic_offset());
635 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800636 EXPECT_EQ(connection->connection_count(), 1u);
637 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700638 ++pi3_client_statistics_count;
639 });
640
Austin Schuh2f8fd752020-09-01 22:38:28 -0700641 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
642 // channel.
643 const size_t pi1_timestamp_channel =
644 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
645 pi1_on_pi2_timestamp_fetcher.channel());
646 const size_t ping_timestamp_channel =
647 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
648 ping_on_pi2_fetcher.channel());
649
650 for (const Channel *channel :
651 *pi1_pong_counter_event_loop->configuration()->channels()) {
652 VLOG(1) << "Channel "
653 << configuration::ChannelIndex(
654 pi1_pong_counter_event_loop->configuration(), channel)
655 << " " << configuration::CleanedChannelToString(channel);
656 }
657
Austin Schuh8fb315a2020-11-19 22:33:58 -0800658 std::unique_ptr<EventLoop> pi1_remote_timestamp =
659 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
660
Austin Schuh89c9b812021-02-20 14:42:10 -0800661 for (std::pair<int, std::string> channel :
662 shared()
663 ? std::vector<std::pair<
664 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
665 : std::vector<std::pair<int, std::string>>{
666 {pi1_timestamp_channel,
667 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
668 "aos-message_bridge-Timestamp"},
669 {ping_timestamp_channel,
670 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
671 // For each remote timestamp we get back, confirm that it is either a ping
672 // message, or a timestamp we sent out. Also confirm that the timestamps
673 // are correct.
674 pi1_remote_timestamp->MakeWatcher(
675 channel.second,
676 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
677 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
678 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
679 channel_index = channel.first](const RemoteMessage &header) {
680 VLOG(1) << aos::FlatbufferToJson(&header);
681 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700682 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800683 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700684 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700685
Austin Schuh89c9b812021-02-20 14:42:10 -0800686 const aos::monotonic_clock::time_point header_monotonic_sent_time(
687 chrono::nanoseconds(header.monotonic_sent_time()));
688 const aos::realtime_clock::time_point header_realtime_sent_time(
689 chrono::nanoseconds(header.realtime_sent_time()));
690 const aos::monotonic_clock::time_point header_monotonic_remote_time(
691 chrono::nanoseconds(header.monotonic_remote_time()));
692 const aos::realtime_clock::time_point header_realtime_remote_time(
693 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700694
Austin Schuh89c9b812021-02-20 14:42:10 -0800695 if (channel_index != -1) {
696 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700697 }
698
Austin Schuh89c9b812021-02-20 14:42:10 -0800699 const Context *pi1_context = nullptr;
700 const Context *pi2_context = nullptr;
701
702 if (header.channel_index() == pi1_timestamp_channel) {
703 // Find the forwarded message.
704 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
705 header_monotonic_sent_time) {
706 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
707 }
708
709 // And the source message.
710 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
711 header_monotonic_remote_time) {
712 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
713 }
714
715 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
716 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
717 } else if (header.channel_index() == ping_timestamp_channel) {
718 // Find the forwarded message.
719 while (ping_on_pi2_fetcher.context().monotonic_event_time <
720 header_monotonic_sent_time) {
721 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
722 }
723
724 // And the source message.
725 while (ping_on_pi1_fetcher.context().monotonic_event_time <
726 header_monotonic_remote_time) {
727 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
728 }
729
730 pi1_context = &ping_on_pi1_fetcher.context();
731 pi2_context = &ping_on_pi2_fetcher.context();
732 } else {
733 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700734 }
735
Austin Schuh89c9b812021-02-20 14:42:10 -0800736 // Confirm the forwarded message has matching timestamps to the
737 // timestamps we got back.
738 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
739 EXPECT_EQ(pi2_context->remote_queue_index,
740 header.remote_queue_index());
741 EXPECT_EQ(pi2_context->monotonic_event_time,
742 header_monotonic_sent_time);
743 EXPECT_EQ(pi2_context->realtime_event_time,
744 header_realtime_sent_time);
745 EXPECT_EQ(pi2_context->realtime_remote_time,
746 header_realtime_remote_time);
747 EXPECT_EQ(pi2_context->monotonic_remote_time,
748 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700749
Austin Schuh89c9b812021-02-20 14:42:10 -0800750 // Confirm the forwarded message also matches the source message.
751 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
752 EXPECT_EQ(pi1_context->monotonic_event_time,
753 header_monotonic_remote_time);
754 EXPECT_EQ(pi1_context->realtime_event_time,
755 header_realtime_remote_time);
756 });
757 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700758
Austin Schuh4c3b9702020-08-30 11:34:55 -0700759 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
760 chrono::milliseconds(500) +
761 chrono::milliseconds(5));
762
763 EXPECT_EQ(pi1_pong_counter.count(), 1001);
764 EXPECT_EQ(pi2_pong_counter.count(), 1001);
765
766 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
767 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
768 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
769 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
770 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
771 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
772 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
773
Austin Schuh20ac95d2020-12-05 17:24:19 -0800774 EXPECT_EQ(pi1_server_statistics_count, 10);
775 EXPECT_EQ(pi2_server_statistics_count, 10);
776 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777
778 EXPECT_EQ(pi1_client_statistics_count, 95);
779 EXPECT_EQ(pi2_client_statistics_count, 95);
780 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700781
782 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800783 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
784 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700785}
786
787// Tests that an offset between nodes can be recovered and shows up in
788// ServerStatistics correctly.
789TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
790 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700791 aos::configuration::ReadConfig(ArtifactPath(
792 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700793 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800794 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
795 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700796 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800797 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
798 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700799 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800800 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
801 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700802
Austin Schuh87dd3832021-01-01 23:07:31 -0800803 message_bridge::TestingTimeConverter time(
804 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700805 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700806 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700807
808 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800809 time.AddNextTimestamp(
810 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700811 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
812 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700813
814 std::unique_ptr<EventLoop> ping_event_loop =
815 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
816 Ping ping(ping_event_loop.get());
817
818 std::unique_ptr<EventLoop> pong_event_loop =
819 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
820 Pong pong(pong_event_loop.get());
821
Austin Schuh8fb315a2020-11-19 22:33:58 -0800822 // Wait to let timestamp estimation start up before looking for the results.
823 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
824
Austin Schuh87dd3832021-01-01 23:07:31 -0800825 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
826 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
827
Austin Schuh4c3b9702020-08-30 11:34:55 -0700828 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
829 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
830
831 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
832 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
833
Austin Schuh4c3b9702020-08-30 11:34:55 -0700834 // Confirm the offsets are being recovered correctly.
835 int pi1_server_statistics_count = 0;
836 pi1_pong_counter_event_loop->MakeWatcher(
837 "/pi1/aos", [&pi1_server_statistics_count,
838 kOffset](const message_bridge::ServerStatistics &stats) {
839 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
840 EXPECT_EQ(stats.connections()->size(), 2u);
841 for (const message_bridge::ServerConnection *connection :
842 *stats.connections()) {
843 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800844 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700845 if (connection->node()->name()->string_view() == "pi2") {
846 EXPECT_EQ(connection->monotonic_offset(),
847 chrono::nanoseconds(kOffset).count());
848 } else if (connection->node()->name()->string_view() == "pi3") {
849 EXPECT_EQ(connection->monotonic_offset(), 0);
850 } else {
851 LOG(FATAL) << "Unknown connection";
852 }
853
854 EXPECT_TRUE(connection->has_monotonic_offset());
855 }
856 ++pi1_server_statistics_count;
857 });
858
859 int pi2_server_statistics_count = 0;
860 pi2_pong_counter_event_loop->MakeWatcher(
861 "/pi2/aos", [&pi2_server_statistics_count,
862 kOffset](const message_bridge::ServerStatistics &stats) {
863 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
864 EXPECT_EQ(stats.connections()->size(), 1u);
865
866 const message_bridge::ServerConnection *connection =
867 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800868 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700869 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
870 EXPECT_TRUE(connection->has_monotonic_offset());
871 EXPECT_EQ(connection->monotonic_offset(),
872 -chrono::nanoseconds(kOffset).count());
873 ++pi2_server_statistics_count;
874 });
875
876 int pi3_server_statistics_count = 0;
877 pi3_pong_counter_event_loop->MakeWatcher(
878 "/pi3/aos", [&pi3_server_statistics_count](
879 const message_bridge::ServerStatistics &stats) {
880 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
881 EXPECT_EQ(stats.connections()->size(), 1u);
882
883 const message_bridge::ServerConnection *connection =
884 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800885 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700886 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
887 EXPECT_TRUE(connection->has_monotonic_offset());
888 EXPECT_EQ(connection->monotonic_offset(), 0);
889 ++pi3_server_statistics_count;
890 });
891
892 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
893 chrono::milliseconds(500) +
894 chrono::milliseconds(5));
895
Austin Schuh20ac95d2020-12-05 17:24:19 -0800896 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700897 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800898 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700899}
900
901// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800902TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700903 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
904 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
905 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
906
907 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
908 simulated_event_loop_factory.DisableStatistics();
909
910 std::unique_ptr<EventLoop> ping_event_loop =
911 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
912 Ping ping(ping_event_loop.get());
913
914 std::unique_ptr<EventLoop> pong_event_loop =
915 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
916 Pong pong(pong_event_loop.get());
917
918 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
919 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
920
921 MessageCounter<examples::Pong> pi2_pong_counter(
922 pi2_pong_counter_event_loop.get(), "/test");
923
924 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
925 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
926
927 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
928 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
929
930 MessageCounter<examples::Pong> pi1_pong_counter(
931 pi1_pong_counter_event_loop.get(), "/test");
932
933 // Count timestamps.
934 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
935 pi1_pong_counter_event_loop.get(), "/pi1/aos");
936 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
937 pi2_pong_counter_event_loop.get(), "/pi1/aos");
938 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
939 pi3_pong_counter_event_loop.get(), "/pi1/aos");
940 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
941 pi1_pong_counter_event_loop.get(), "/pi2/aos");
942 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
943 pi2_pong_counter_event_loop.get(), "/pi2/aos");
944 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
945 pi1_pong_counter_event_loop.get(), "/pi3/aos");
946 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
947 pi3_pong_counter_event_loop.get(), "/pi3/aos");
948
Austin Schuh2f8fd752020-09-01 22:38:28 -0700949 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800950 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
951 remote_timestamps_pi2_on_pi1 =
952 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
953 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
954 remote_timestamps_pi1_on_pi2 =
955 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700956
Austin Schuh4c3b9702020-08-30 11:34:55 -0700957 MessageCounter<message_bridge::ServerStatistics>
958 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
959 "/pi1/aos");
960 MessageCounter<message_bridge::ServerStatistics>
961 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
962 "/pi2/aos");
963 MessageCounter<message_bridge::ServerStatistics>
964 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
965 "/pi3/aos");
966
967 MessageCounter<message_bridge::ClientStatistics>
968 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
969 "/pi1/aos");
970 MessageCounter<message_bridge::ClientStatistics>
971 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
972 "/pi2/aos");
973 MessageCounter<message_bridge::ClientStatistics>
974 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
975 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800976
977 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
978 chrono::milliseconds(5));
979
Austin Schuh4c3b9702020-08-30 11:34:55 -0700980 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
981 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
982
983 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
984 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
985 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
986 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
987 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
988 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
989 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
990
991 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
992 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
993 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
994
995 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
996 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
997 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700998
999 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001000 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1001 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001002}
1003
Austin Schuhc0b0f722020-12-12 18:36:06 -08001004bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1005 for (const message_bridge::ServerConnection *connection :
1006 *server_statistics->connections()) {
1007 if (connection->state() != message_bridge::State::CONNECTED) {
1008 return false;
1009 }
1010 }
1011 return true;
1012}
1013
1014bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1015 std::string_view target) {
1016 for (const message_bridge::ServerConnection *connection :
1017 *server_statistics->connections()) {
1018 if (connection->node()->name()->string_view() == target) {
1019 if (connection->state() == message_bridge::State::CONNECTED) {
1020 return false;
1021 }
1022 } else {
1023 if (connection->state() != message_bridge::State::CONNECTED) {
1024 return false;
1025 }
1026 }
1027 }
1028 return true;
1029}
1030
1031bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1032 for (const message_bridge::ClientConnection *connection :
1033 *client_statistics->connections()) {
1034 if (connection->state() != message_bridge::State::CONNECTED) {
1035 return false;
1036 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001037 EXPECT_TRUE(connection->has_boot_uuid());
1038 EXPECT_TRUE(connection->has_connected_since_time());
1039 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001040 }
1041 return true;
1042}
1043
1044bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1045 std::string_view target) {
1046 for (const message_bridge::ClientConnection *connection :
1047 *client_statistics->connections()) {
1048 if (connection->node()->name()->string_view() == target) {
1049 if (connection->state() == message_bridge::State::CONNECTED) {
1050 return false;
1051 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001052 EXPECT_FALSE(connection->has_boot_uuid());
1053 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001054 } else {
1055 if (connection->state() != message_bridge::State::CONNECTED) {
1056 return false;
1057 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001058 EXPECT_TRUE(connection->has_boot_uuid());
1059 EXPECT_TRUE(connection->has_connected_since_time());
1060 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001061 }
1062 }
1063 return true;
1064}
1065
Austin Schuh367a7f42021-11-23 23:04:36 -08001066int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1067 std::string_view target) {
1068 for (const message_bridge::ClientConnection *connection :
1069 *client_statistics->connections()) {
1070 if (connection->node()->name()->string_view() == target) {
1071 return connection->connection_count();
1072 }
1073 }
1074 return 0;
1075}
1076
1077int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1078 std::string_view target) {
1079 for (const message_bridge::ServerConnection *connection :
1080 *server_statistics->connections()) {
1081 if (connection->node()->name()->string_view() == target) {
1082 return connection->connection_count();
1083 }
1084 }
1085 return 0;
1086}
1087
Austin Schuhc0b0f722020-12-12 18:36:06 -08001088// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001089TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001090 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1091
Austin Schuh58646e22021-08-23 23:51:46 -07001092 NodeEventLoopFactory *pi1 =
1093 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1094 NodeEventLoopFactory *pi2 =
1095 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1096 NodeEventLoopFactory *pi3 =
1097 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1098
1099 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001100 Ping ping(ping_event_loop.get());
1101
Austin Schuh58646e22021-08-23 23:51:46 -07001102 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001103 Pong pong(pong_event_loop.get());
1104
1105 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001106 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001107
1108 MessageCounter<examples::Pong> pi2_pong_counter(
1109 pi2_pong_counter_event_loop.get(), "/test");
1110
1111 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001112 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001113
1114 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001115 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001116
1117 MessageCounter<examples::Pong> pi1_pong_counter(
1118 pi1_pong_counter_event_loop.get(), "/test");
1119
1120 // Count timestamps.
1121 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1122 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1123 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1124 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1125 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1126 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1127 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1128 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1129 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1130 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1131 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1132 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1133 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1134 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1135
1136 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001137 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1138 remote_timestamps_pi2_on_pi1 =
1139 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1140 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1141 remote_timestamps_pi1_on_pi2 =
1142 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001143
1144 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001145 *pi1_server_statistics_counter;
1146 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1147 pi1_server_statistics_counter =
1148 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1149 "pi1_server_statistics_counter", "/pi1/aos");
1150 });
1151
Austin Schuhc0b0f722020-12-12 18:36:06 -08001152 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1153 pi1_pong_counter_event_loop
1154 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1155 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1156 pi1_pong_counter_event_loop
1157 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1158
1159 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001160 *pi2_server_statistics_counter;
1161 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1162 pi2_server_statistics_counter =
1163 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1164 "pi2_server_statistics_counter", "/pi2/aos");
1165 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001166 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1167 pi2_pong_counter_event_loop
1168 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1169 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1170 pi2_pong_counter_event_loop
1171 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1172
1173 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001174 *pi3_server_statistics_counter;
1175 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1176 pi3_server_statistics_counter =
1177 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1178 "pi3_server_statistics_counter", "/pi3/aos");
1179 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001180 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1181 pi3_pong_counter_event_loop
1182 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1183 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1184 pi3_pong_counter_event_loop
1185 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1186
1187 MessageCounter<message_bridge::ClientStatistics>
1188 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1189 "/pi1/aos");
1190 MessageCounter<message_bridge::ClientStatistics>
1191 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1192 "/pi2/aos");
1193 MessageCounter<message_bridge::ClientStatistics>
1194 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1195 "/pi3/aos");
1196
1197 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1198 chrono::milliseconds(5));
1199
1200 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1201 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1202
1203 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1204 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1205 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1206 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1207 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1208 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1209 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1210
Austin Schuh58646e22021-08-23 23:51:46 -07001211 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1212 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1213 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001214
1215 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1216 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1217 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1218
1219 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001220 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1221 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001222
1223 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1224 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1225 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1226 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1227 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1228 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1229 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1230 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1231 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1232 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1233 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1234 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1235 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1236 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1237 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1238 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1239 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1240 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1241
Austin Schuh58646e22021-08-23 23:51:46 -07001242 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001243
1244 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1245
1246 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1247 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1248
1249 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1250 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1251 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1252 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1253 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1254 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1255 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1256
Austin Schuh58646e22021-08-23 23:51:46 -07001257 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1258 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1259 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001260
1261 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1262 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1263 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1264
1265 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001266 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1267 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001268
1269 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1270 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1271 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1272 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1273 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1274 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1275 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1276 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1277 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1278 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1279 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1280 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1281 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1282 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1283 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1284 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1285 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1286 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1287
Austin Schuh58646e22021-08-23 23:51:46 -07001288 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001289
1290 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1291
Austin Schuh367a7f42021-11-23 23:04:36 -08001292 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1293 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1294 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1295 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1296 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1297 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1298
1299 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1300 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1301 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1302 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1303 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1304 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1305 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1306 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1307
1308 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1309 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1310 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1311 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1312
1313 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1314 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1315 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1316 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1317
1318
Austin Schuhc0b0f722020-12-12 18:36:06 -08001319 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1320 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1321
1322 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1323 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1324 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1325 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1326 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1327 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1328 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1329
Austin Schuh58646e22021-08-23 23:51:46 -07001330 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1331 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1332 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001333
1334 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1335 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1336 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1337
1338 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001339 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1340 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001341
Austin Schuhc0b0f722020-12-12 18:36:06 -08001342 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1343 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001344 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1345 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001346 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1347 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001348 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1349 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001350 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1351 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001352 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1353 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1354}
1355
Austin Schuh2febf0d2020-09-21 22:24:30 -07001356// Tests that the time offset having a slope doesn't break the world.
1357// SimulatedMessageBridge has enough self consistency CHECK statements to
1358// confirm, and we can can also check a message in each direction to make sure
1359// it gets delivered as expected.
1360TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1361 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001362 aos::configuration::ReadConfig(ArtifactPath(
1363 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001364 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001365 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1366 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001367 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001368 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1369 ASSERT_EQ(pi2_index, 1u);
1370 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1371 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1372 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001373
Austin Schuh87dd3832021-01-01 23:07:31 -08001374 message_bridge::TestingTimeConverter time(
1375 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001376 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001377 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001378
Austin Schuh2febf0d2020-09-21 22:24:30 -07001379 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001380 time.AddNextTimestamp(
1381 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001382 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1383 BootTimestamp::epoch()});
1384 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1385 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1386 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1387 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001388
1389 std::unique_ptr<EventLoop> ping_event_loop =
1390 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1391 Ping ping(ping_event_loop.get());
1392
1393 std::unique_ptr<EventLoop> pong_event_loop =
1394 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1395 Pong pong(pong_event_loop.get());
1396
1397 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1398 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1399 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1400 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1401
1402 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1403 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1404 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1405 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1406
1407 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1408 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1409 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1410 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1411
1412 // End after a pong message comes back. This will leave the latest messages
1413 // on all channels so we can look at timestamps easily and check they make
1414 // sense.
1415 std::unique_ptr<EventLoop> pi1_pong_ender =
1416 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1417 int count = 0;
1418 pi1_pong_ender->MakeWatcher(
1419 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1420 if (++count == 100) {
1421 simulated_event_loop_factory.Exit();
1422 }
1423 });
1424
1425 // Run enough that messages should be delivered.
1426 simulated_event_loop_factory.Run();
1427
1428 // Grab the latest messages.
1429 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1430 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1431 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1432 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1433
1434 // Compute their time on the global distributed clock so we can compute
1435 // distance betwen them.
1436 const distributed_clock::time_point pi1_ping_time =
1437 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1438 ->ToDistributedClock(
1439 ping_on_pi1_fetcher.context().monotonic_event_time);
1440 const distributed_clock::time_point pi2_ping_time =
1441 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1442 ->ToDistributedClock(
1443 ping_on_pi2_fetcher.context().monotonic_event_time);
1444 const distributed_clock::time_point pi1_pong_time =
1445 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1446 ->ToDistributedClock(
1447 pong_on_pi1_fetcher.context().monotonic_event_time);
1448 const distributed_clock::time_point pi2_pong_time =
1449 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1450 ->ToDistributedClock(
1451 pong_on_pi2_fetcher.context().monotonic_event_time);
1452
1453 // And confirm the delivery delay is just about exactly 150 uS for both
1454 // directions like expected. There will be a couple ns of rounding errors in
1455 // the conversion functions that aren't worth accounting for right now. This
1456 // will either be really close, or really far.
1457 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1458 pi1_ping_time);
1459 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1460 pi1_ping_time);
1461
1462 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1463 pi2_pong_time);
1464 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1465 pi2_pong_time);
1466}
1467
Austin Schuh4c570ea2020-11-19 23:13:24 -08001468void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1469 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1470 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1471 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001472 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001473}
1474
1475// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001476TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001477 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1478 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1479
1480 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1481
1482 std::unique_ptr<EventLoop> ping_event_loop =
1483 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1484 aos::Sender<examples::Ping> pi1_reliable_sender =
1485 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1486 aos::Sender<examples::Ping> pi1_unreliable_sender =
1487 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1488 SendPing(&pi1_reliable_sender, 1);
1489 SendPing(&pi1_unreliable_sender, 1);
1490
1491 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1492 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1493 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1494 "/reliable");
1495 MessageCounter<examples::Ping> pi2_unreliable_counter(
1496 pi2_pong_event_loop.get(), "/unreliable");
1497 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1498 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1499 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1500 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1501
1502 const size_t reliable_channel_index = configuration::ChannelIndex(
1503 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1504
1505 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1506 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1507
Austin Schuheeaa2022021-01-02 21:52:03 -08001508 const chrono::nanoseconds network_delay =
1509 simulated_event_loop_factory.network_delay();
1510
Austin Schuh4c570ea2020-11-19 23:13:24 -08001511 int reliable_timestamp_count = 0;
1512 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001513 shared() ? "/pi1/aos/remote_timestamps/pi2"
1514 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001515 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001516 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1517 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001518 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001519 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001520 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001521 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001522 VLOG(1) << aos::FlatbufferToJson(&header);
1523 if (header.channel_index() == reliable_channel_index) {
1524 ++reliable_timestamp_count;
1525 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001526
1527 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1528 chrono::nanoseconds(header.monotonic_sent_time()));
1529
1530 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1531 header_monotonic_sent_time + network_delay +
1532 (pi1_remote_timestamp->monotonic_now() -
1533 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001534 });
1535
1536 // Wait to let timestamp estimation start up before looking for the results.
1537 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1538
1539 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1540 // This one isn't reliable, but was sent before the start. It should *not* be
1541 // delivered.
1542 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1543 // Confirm we got a timestamp logged for the message that was forwarded.
1544 EXPECT_EQ(reliable_timestamp_count, 1u);
1545
1546 SendPing(&pi1_reliable_sender, 2);
1547 SendPing(&pi1_unreliable_sender, 2);
1548 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1549 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1550 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1551
1552 EXPECT_EQ(reliable_timestamp_count, 2u);
1553}
1554
Austin Schuh20ac95d2020-12-05 17:24:19 -08001555// Tests that rebooting a node changes the ServerStatistics message and the
1556// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001557TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001558 const UUID pi1_boot0 = UUID::Random();
1559 const UUID pi2_boot0 = UUID::Random();
1560 const UUID pi2_boot1 = UUID::Random();
1561 const UUID pi3_boot0 = UUID::Random();
1562 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001563
Austin Schuh58646e22021-08-23 23:51:46 -07001564 message_bridge::TestingTimeConverter time(
1565 configuration::NodesCount(&config.message()));
1566 SimulatedEventLoopFactory factory(&config.message());
1567 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001568
Austin Schuh58646e22021-08-23 23:51:46 -07001569 const size_t pi1_index =
1570 configuration::GetNodeIndex(&config.message(), "pi1");
1571 const size_t pi2_index =
1572 configuration::GetNodeIndex(&config.message(), "pi2");
1573 const size_t pi3_index =
1574 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001575
Austin Schuh58646e22021-08-23 23:51:46 -07001576 {
1577 time.AddNextTimestamp(distributed_clock::epoch(),
1578 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1579 BootTimestamp::epoch()});
1580
1581 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1582
1583 time.AddNextTimestamp(
1584 distributed_clock::epoch() + dt,
1585 {BootTimestamp::epoch() + dt,
1586 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1587 BootTimestamp::epoch() + dt});
1588
1589 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1590 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1591 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1592 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1593 }
1594
1595 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1596 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1597
1598 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1599 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001600
1601 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001602 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001603
1604 int timestamp_count = 0;
1605 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001606 "/pi2/aos", [&expected_boot_uuid,
1607 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001608 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001609 expected_boot_uuid);
1610 });
1611 pi1_remote_timestamp->MakeWatcher(
1612 "/test",
1613 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001614 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001615 expected_boot_uuid);
1616 });
1617 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001618 shared() ? "/pi1/aos/remote_timestamps/pi2"
1619 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001620 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1621 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001622 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001623 VLOG(1) << aos::FlatbufferToJson(&header);
1624 ++timestamp_count;
1625 });
1626
1627 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001628 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001629 int boot_number = 0;
1630 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001631 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001632 "/pi1/aos",
1633 [&pi1_server_statistics_count, &expected_boot_uuid,
1634 &expected_connection_time, &first_pi1_server_statistics,
1635 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001636 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1637 for (const message_bridge::ServerConnection *connection :
1638 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001639 if (connection->state() == message_bridge::State::CONNECTED) {
1640 ASSERT_TRUE(connection->has_boot_uuid());
1641 }
1642 if (!first_pi1_server_statistics) {
1643 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1644 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001645 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001646 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1647 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001648 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001649 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001650 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001651 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1652 connection->connected_since_time())),
1653 expected_connection_time);
1654 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001655 ++pi1_server_statistics_count;
1656 }
1657 }
Austin Schuh58646e22021-08-23 23:51:46 -07001658 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001659 });
1660
Austin Schuh58646e22021-08-23 23:51:46 -07001661 int pi1_client_statistics_count = 0;
1662 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001663 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1664 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001665 const message_bridge::ClientStatistics &stats) {
1666 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1667 for (const message_bridge::ClientConnection *connection :
1668 *stats.connections()) {
1669 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1670 if (connection->node()->name()->string_view() == "pi2") {
1671 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001672 EXPECT_EQ(expected_boot_uuid,
1673 UUID::FromString(connection->boot_uuid()))
1674 << " : Got " << aos::FlatbufferToJson(&stats);
1675 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1676 connection->connected_since_time())),
1677 expected_connection_time);
1678 EXPECT_EQ(boot_number + 1, connection->connection_count());
1679 } else {
1680 EXPECT_EQ(connection->connected_since_time(), 0);
1681 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001682 }
1683 }
1684 });
1685
1686 // Confirm that reboot changes the UUID.
Austin Schuh367a7f42021-11-23 23:04:36 -08001687 pi2->OnShutdown(
1688 [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
1689 pi2_boot1]() {
1690 expected_boot_uuid = pi2_boot1;
1691 ++boot_number;
1692 LOG(INFO) << "OnShutdown triggered for pi2";
1693 pi2->OnStartup(
1694 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1695 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1696 expected_connection_time = pi1->monotonic_now();
1697 });
1698 });
Austin Schuh58646e22021-08-23 23:51:46 -07001699
Austin Schuh20ac95d2020-12-05 17:24:19 -08001700 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001701 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001702
1703 EXPECT_GT(timestamp_count, 100);
1704 EXPECT_GE(pi1_server_statistics_count, 1u);
1705
Austin Schuh20ac95d2020-12-05 17:24:19 -08001706 timestamp_count = 0;
1707 pi1_server_statistics_count = 0;
1708
Austin Schuh58646e22021-08-23 23:51:46 -07001709 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001710 EXPECT_GT(timestamp_count, 100);
1711 EXPECT_GE(pi1_server_statistics_count, 1u);
1712}
1713
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001714INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001715 All, RemoteMessageSimulatedEventLoopTest,
1716 ::testing::Values(
1717 Param{"multinode_pingpong_test_combined_config.json", true},
1718 Param{"multinode_pingpong_test_split_config.json", false}));
1719
Austin Schuh58646e22021-08-23 23:51:46 -07001720// Tests that Startup and Shutdown do reasonable things.
1721TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1722 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1723 aos::configuration::ReadConfig(
1724 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1725
Austin Schuh72e65682021-09-02 11:37:05 -07001726 size_t pi1_shutdown_counter = 0;
1727 size_t pi2_shutdown_counter = 0;
1728 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1729 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1730
Austin Schuh58646e22021-08-23 23:51:46 -07001731 message_bridge::TestingTimeConverter time(
1732 configuration::NodesCount(&config.message()));
1733 SimulatedEventLoopFactory factory(&config.message());
1734 factory.SetTimeConverter(&time);
1735 time.AddNextTimestamp(
1736 distributed_clock::epoch(),
1737 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1738
1739 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1740
1741 time.AddNextTimestamp(
1742 distributed_clock::epoch() + dt,
1743 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1744 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1745 BootTimestamp::epoch() + dt});
1746
1747 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1748 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1749
1750 // Configure startup to start Ping and Pong, and count.
1751 size_t pi1_startup_counter = 0;
1752 size_t pi2_startup_counter = 0;
1753 pi1->OnStartup([pi1]() {
1754 LOG(INFO) << "Made ping";
1755 pi1->AlwaysStart<Ping>("ping");
1756 });
1757 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1758 pi2->OnStartup([pi2]() {
1759 LOG(INFO) << "Made pong";
1760 pi2->AlwaysStart<Pong>("pong");
1761 });
1762 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1763
1764 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001765 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1766 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1767
Austin Schuh58646e22021-08-23 23:51:46 -07001768 // Automatically make counters on startup.
1769 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1770 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1771 "pi1_pong_counter", "/test");
1772 });
1773 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1774 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1775 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1776 "pi2_ping_counter", "/test");
1777 });
1778 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1779
1780 EXPECT_EQ(pi2_ping_counter, nullptr);
1781 EXPECT_EQ(pi1_pong_counter, nullptr);
1782
1783 EXPECT_EQ(pi1_startup_counter, 0u);
1784 EXPECT_EQ(pi2_startup_counter, 0u);
1785 EXPECT_EQ(pi1_shutdown_counter, 0u);
1786 EXPECT_EQ(pi2_shutdown_counter, 0u);
1787
1788 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1789 EXPECT_EQ(pi1_startup_counter, 1u);
1790 EXPECT_EQ(pi2_startup_counter, 1u);
1791 EXPECT_EQ(pi1_shutdown_counter, 0u);
1792 EXPECT_EQ(pi2_shutdown_counter, 0u);
1793 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1794 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1795
1796 LOG(INFO) << pi1->monotonic_now();
1797 LOG(INFO) << pi2->monotonic_now();
1798
1799 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1800
1801 EXPECT_EQ(pi1_startup_counter, 2u);
1802 EXPECT_EQ(pi2_startup_counter, 2u);
1803 EXPECT_EQ(pi1_shutdown_counter, 1u);
1804 EXPECT_EQ(pi2_shutdown_counter, 1u);
1805 EXPECT_EQ(pi2_ping_counter->count(), 501);
1806 EXPECT_EQ(pi1_pong_counter->count(), 501);
1807}
1808
1809// Tests that OnStartup handlers can be added after running and get called, and
1810// can't be called when running.
1811TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1812 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1813 aos::configuration::ReadConfig(
1814 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1815
1816 // Test that we can add startup handlers as long as we aren't running, and
1817 // they get run when Run gets called again.
1818 // Test that adding a startup handler when running fails.
1819 //
1820 // Test shutdown handlers get called on destruction.
1821 SimulatedEventLoopFactory factory(&config.message());
1822
1823 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1824
1825 int startup_count0 = 0;
1826 int startup_count1 = 0;
1827
1828 pi1->OnStartup([&]() { ++startup_count0; });
1829 EXPECT_EQ(startup_count0, 0);
1830 EXPECT_EQ(startup_count1, 0);
1831
1832 factory.RunFor(chrono::nanoseconds(1));
1833 EXPECT_EQ(startup_count0, 1);
1834 EXPECT_EQ(startup_count1, 0);
1835
1836 pi1->OnStartup([&]() { ++startup_count1; });
1837 EXPECT_EQ(startup_count0, 1);
1838 EXPECT_EQ(startup_count1, 0);
1839
1840 factory.RunFor(chrono::nanoseconds(1));
1841 EXPECT_EQ(startup_count0, 1);
1842 EXPECT_EQ(startup_count1, 1);
1843
1844 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1845 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1846
1847 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1848 "Can only register OnStartup handlers when not running.");
1849}
1850
1851// Tests that OnStartup handlers can be added after running and get called, and
1852// all the handlers get called on reboot. Shutdown handlers are tested the same
1853// way.
1854TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1855 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1856 aos::configuration::ReadConfig(
1857 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1858
Austin Schuh72e65682021-09-02 11:37:05 -07001859 int startup_count0 = 0;
1860 int shutdown_count0 = 0;
1861 int startup_count1 = 0;
1862 int shutdown_count1 = 0;
1863
Austin Schuh58646e22021-08-23 23:51:46 -07001864 message_bridge::TestingTimeConverter time(
1865 configuration::NodesCount(&config.message()));
1866 SimulatedEventLoopFactory factory(&config.message());
1867 factory.SetTimeConverter(&time);
1868 time.StartEqual();
1869
1870 const chrono::nanoseconds dt = chrono::seconds(10);
1871 time.RebootAt(0, distributed_clock::epoch() + dt);
1872 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1873
1874 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1875
Austin Schuh58646e22021-08-23 23:51:46 -07001876 pi1->OnStartup([&]() { ++startup_count0; });
1877 pi1->OnShutdown([&]() { ++shutdown_count0; });
1878 EXPECT_EQ(startup_count0, 0);
1879 EXPECT_EQ(startup_count1, 0);
1880 EXPECT_EQ(shutdown_count0, 0);
1881 EXPECT_EQ(shutdown_count1, 0);
1882
1883 factory.RunFor(chrono::nanoseconds(1));
1884 EXPECT_EQ(startup_count0, 1);
1885 EXPECT_EQ(startup_count1, 0);
1886 EXPECT_EQ(shutdown_count0, 0);
1887 EXPECT_EQ(shutdown_count1, 0);
1888
1889 pi1->OnStartup([&]() { ++startup_count1; });
1890 EXPECT_EQ(startup_count0, 1);
1891 EXPECT_EQ(startup_count1, 0);
1892 EXPECT_EQ(shutdown_count0, 0);
1893 EXPECT_EQ(shutdown_count1, 0);
1894
1895 factory.RunFor(chrono::nanoseconds(1));
1896 EXPECT_EQ(startup_count0, 1);
1897 EXPECT_EQ(startup_count1, 1);
1898 EXPECT_EQ(shutdown_count0, 0);
1899 EXPECT_EQ(shutdown_count1, 0);
1900
1901 factory.RunFor(chrono::seconds(15));
1902
1903 EXPECT_EQ(startup_count0, 2);
1904 EXPECT_EQ(startup_count1, 2);
1905 EXPECT_EQ(shutdown_count0, 1);
1906 EXPECT_EQ(shutdown_count1, 0);
1907
1908 pi1->OnShutdown([&]() { ++shutdown_count1; });
1909 factory.RunFor(chrono::seconds(10));
1910
1911 EXPECT_EQ(startup_count0, 3);
1912 EXPECT_EQ(startup_count1, 3);
1913 EXPECT_EQ(shutdown_count0, 2);
1914 EXPECT_EQ(shutdown_count1, 1);
1915}
1916
1917// Tests that event loops which outlive shutdown crash.
1918TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1919 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1920 aos::configuration::ReadConfig(
1921 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1922
1923 message_bridge::TestingTimeConverter time(
1924 configuration::NodesCount(&config.message()));
1925 SimulatedEventLoopFactory factory(&config.message());
1926 factory.SetTimeConverter(&time);
1927 time.StartEqual();
1928
1929 const chrono::nanoseconds dt = chrono::seconds(10);
1930 time.RebootAt(0, distributed_clock::epoch() + dt);
1931
1932 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1933
1934 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1935
1936 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
1937}
1938
1939// Tests that messages don't survive a reboot of a node.
1940TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
1941 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1942 aos::configuration::ReadConfig(
1943 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1944
1945 message_bridge::TestingTimeConverter time(
1946 configuration::NodesCount(&config.message()));
1947 SimulatedEventLoopFactory factory(&config.message());
1948 factory.SetTimeConverter(&time);
1949 time.StartEqual();
1950
1951 const chrono::nanoseconds dt = chrono::seconds(10);
1952 time.RebootAt(0, distributed_clock::epoch() + dt);
1953
1954 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1955
1956 const UUID boot_uuid = pi1->boot_uuid();
1957 EXPECT_NE(boot_uuid, UUID::Zero());
1958
1959 {
1960 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1961 aos::Sender<examples::Ping> test_message_sender =
1962 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1963 SendPing(&test_message_sender, 1);
1964 }
1965
1966 factory.RunFor(chrono::seconds(5));
1967
1968 {
1969 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1970 aos::Fetcher<examples::Ping> fetcher =
1971 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1972 EXPECT_TRUE(fetcher.Fetch());
1973 }
1974
1975 factory.RunFor(chrono::seconds(10));
1976
1977 {
1978 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
1979 aos::Fetcher<examples::Ping> fetcher =
1980 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
1981 EXPECT_FALSE(fetcher.Fetch());
1982 }
1983 EXPECT_NE(boot_uuid, pi1->boot_uuid());
1984}
1985
1986// Tests that reliable messages get resent on reboot.
1987TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
1988 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1989 aos::configuration::ReadConfig(
1990 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1991
1992 message_bridge::TestingTimeConverter time(
1993 configuration::NodesCount(&config.message()));
1994 SimulatedEventLoopFactory factory(&config.message());
1995 factory.SetTimeConverter(&time);
1996 time.StartEqual();
1997
1998 const chrono::nanoseconds dt = chrono::seconds(1);
1999 time.RebootAt(1, distributed_clock::epoch() + dt);
2000
2001 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2002 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2003
2004 const UUID pi1_boot_uuid = pi1->boot_uuid();
2005 const UUID pi2_boot_uuid = pi2->boot_uuid();
2006 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2007 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2008
2009 {
2010 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2011 aos::Sender<examples::Ping> test_message_sender =
2012 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2013 SendPing(&test_message_sender, 1);
2014 }
2015
2016 factory.RunFor(chrono::milliseconds(500));
2017
2018 {
2019 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2020 aos::Fetcher<examples::Ping> fetcher =
2021 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2022 EXPECT_TRUE(fetcher.Fetch());
2023 }
2024
2025 factory.RunFor(chrono::seconds(1));
2026
2027 {
2028 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2029 aos::Fetcher<examples::Ping> fetcher =
2030 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2031 EXPECT_TRUE(fetcher.Fetch());
2032 }
2033 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2034}
2035
Austin Schuh48205e62021-11-12 14:13:18 -08002036class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2037 public:
2038 SimulatedEventLoopDisconnectTest()
2039 : config(aos::configuration::ReadConfig(ArtifactPath(
2040 "aos/events/multinode_pingpong_test_split_config.json"))),
2041 time(configuration::NodesCount(&config.message())),
2042 factory(&config.message()) {
2043 factory.SetTimeConverter(&time);
2044 }
2045
2046 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2047 const monotonic_clock::time_point allowable_message_time,
2048 std::set<const aos::Node *> empty_nodes) {
2049 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2050 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2051 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2052 pi1->MakeEventLoop("fetcher");
2053 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2054 pi2->MakeEventLoop("fetcher");
2055 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2056 if (configuration::ChannelIsReadableOnNode(channel,
2057 pi1_event_loop->node())) {
2058 std::unique_ptr<aos::RawFetcher> fetcher =
2059 pi1_event_loop->MakeRawFetcher(channel);
2060 if (statistics_channels.find(channel) == statistics_channels.end() ||
2061 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2062 EXPECT_FALSE(fetcher->Fetch() &&
2063 fetcher->context().monotonic_event_time >
2064 allowable_message_time)
2065 << ": Found recent message on channel "
2066 << configuration::CleanedChannelToString(channel) << " and time "
2067 << fetcher->context().monotonic_event_time << " > "
2068 << allowable_message_time << " on pi1";
2069 } else {
2070 EXPECT_TRUE(fetcher->Fetch() &&
2071 fetcher->context().monotonic_event_time >=
2072 allowable_message_time)
2073 << ": Didn't find recent message on channel "
2074 << configuration::CleanedChannelToString(channel) << " on pi1";
2075 }
2076 }
2077 if (configuration::ChannelIsReadableOnNode(channel,
2078 pi2_event_loop->node())) {
2079 std::unique_ptr<aos::RawFetcher> fetcher =
2080 pi2_event_loop->MakeRawFetcher(channel);
2081 if (statistics_channels.find(channel) == statistics_channels.end() ||
2082 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2083 EXPECT_FALSE(fetcher->Fetch() &&
2084 fetcher->context().monotonic_event_time >
2085 allowable_message_time)
2086 << ": Found message on channel "
2087 << configuration::CleanedChannelToString(channel) << " and time "
2088 << fetcher->context().monotonic_event_time << " > "
2089 << allowable_message_time << " on pi2";
2090 } else {
2091 EXPECT_TRUE(fetcher->Fetch() &&
2092 fetcher->context().monotonic_event_time >=
2093 allowable_message_time)
2094 << ": Didn't find message on channel "
2095 << configuration::CleanedChannelToString(channel) << " on pi2";
2096 }
2097 }
2098 }
2099 }
2100
2101 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2102
2103 message_bridge::TestingTimeConverter time;
2104 SimulatedEventLoopFactory factory;
2105};
2106
2107// Tests that if we have message bridge client/server disabled, and timing
2108// reports disabled, no messages are sent. Also tests that we can disconnect a
2109// node and disable statistics on it and it actually fully disconnects.
2110TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2111 time.StartEqual();
2112 factory.SkipTimingReport();
2113 factory.DisableStatistics();
2114
2115 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2116 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2117
2118 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2119 pi1->MakeEventLoop("fetcher");
2120 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2121 pi2->MakeEventLoop("fetcher");
2122
2123 factory.RunFor(chrono::milliseconds(100000));
2124
2125 // Confirm no messages are sent if we've configured them all off.
2126 VerifyChannels({}, monotonic_clock::min_time, {});
2127
2128 // Now, confirm that all the message_bridge channels come back when we
2129 // re-enable.
2130 factory.EnableStatistics();
2131
2132 factory.RunFor(chrono::milliseconds(10050));
2133
2134 // Build up the list of all the messages we expect when we come back.
2135 {
2136 std::set<const aos::Channel *> statistics_channels;
2137 for (const std::pair<std::string_view, const Node *> pi :
2138 std::vector<std::pair<std::string_view, const Node *>>{
2139 {"/pi1/aos", pi1->node()},
2140 {"/pi2/aos", pi1->node()},
2141 {"/pi3/aos", pi1->node()}}) {
2142 statistics_channels.insert(configuration::GetChannel(
2143 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2144 pi.second));
2145 statistics_channels.insert(configuration::GetChannel(
2146 factory.configuration(), pi.first,
2147 "aos.message_bridge.ServerStatistics", "", pi.second));
2148 statistics_channels.insert(configuration::GetChannel(
2149 factory.configuration(), pi.first,
2150 "aos.message_bridge.ClientStatistics", "", pi.second));
2151 }
2152
2153 statistics_channels.insert(configuration::GetChannel(
2154 factory.configuration(),
2155 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2156 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2157 statistics_channels.insert(configuration::GetChannel(
2158 factory.configuration(),
2159 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2160 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2161 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2162 }
2163
2164 // Now test that we can disable the messages for a single node
2165 pi2->DisableStatistics();
2166 const aos::monotonic_clock::time_point statistics_disable_time =
2167 pi2->monotonic_now();
2168 factory.RunFor(chrono::milliseconds(10000));
2169
2170 // We should see a much smaller set of messages, but should still see messages
2171 // forwarded, mainly the timestamp message.
2172 {
2173 std::set<const aos::Channel *> statistics_channels;
2174 for (const std::pair<std::string_view, const Node *> pi :
2175 std::vector<std::pair<std::string_view, const Node *>>{
2176 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2177 statistics_channels.insert(configuration::GetChannel(
2178 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2179 pi.second));
2180 statistics_channels.insert(configuration::GetChannel(
2181 factory.configuration(), pi.first,
2182 "aos.message_bridge.ServerStatistics", "", pi.second));
2183 statistics_channels.insert(configuration::GetChannel(
2184 factory.configuration(), pi.first,
2185 "aos.message_bridge.ClientStatistics", "", pi.second));
2186 }
2187
2188 statistics_channels.insert(configuration::GetChannel(
2189 factory.configuration(),
2190 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2191 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2192 VerifyChannels(statistics_channels, statistics_disable_time, {});
2193 }
2194
2195 // Now, fully disconnect the node. This will completely quiet down pi2.
2196 pi1->Disconnect(pi2->node());
2197 pi2->Disconnect(pi1->node());
2198
2199 const aos::monotonic_clock::time_point disconnect_disable_time =
2200 pi2->monotonic_now();
2201 factory.RunFor(chrono::milliseconds(10000));
2202
2203 {
2204 std::set<const aos::Channel *> statistics_channels;
2205 for (const std::pair<std::string_view, const Node *> pi :
2206 std::vector<std::pair<std::string_view, const Node *>>{
2207 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2208 statistics_channels.insert(configuration::GetChannel(
2209 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2210 pi.second));
2211 statistics_channels.insert(configuration::GetChannel(
2212 factory.configuration(), pi.first,
2213 "aos.message_bridge.ServerStatistics", "", pi.second));
2214 statistics_channels.insert(configuration::GetChannel(
2215 factory.configuration(), pi.first,
2216 "aos.message_bridge.ClientStatistics", "", pi.second));
2217 }
2218
2219 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2220 }
2221}
2222
Neil Balchc8f41ed2018-01-20 22:06:53 -08002223} // namespace testing
2224} // namespace aos