blob: 4cb51de651fd52cfadb1157ff85ad015b4842363 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08003#include <string_view>
4
Alex Perrycb7da4b2019-08-28 19:35:56 -07005#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07006#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07007#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008#include "aos/events/ping_lib.h"
9#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080010#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070011#include "aos/network/message_bridge_client_generated.h"
12#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080013#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080014#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070016#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080017#include "gtest/gtest.h"
18
19namespace aos {
20namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070021namespace {
22
Austin Schuh373f1762021-06-02 21:07:09 -070023using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070024
Austin Schuh0de30f32020-12-06 12:44:28 -080025using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070026namespace chrono = ::std::chrono;
27
Austin Schuh0de30f32020-12-06 12:44:28 -080028} // namespace
29
Neil Balchc8f41ed2018-01-20 22:06:53 -080030class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
31 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080032 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080033 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080034 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080035 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070039 }
40
Austin Schuh217a9782019-12-21 23:02:50 -080041 void Run() override { event_loop_factory_->Run(); }
42 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070043
Austin Schuh52d325c2019-06-23 18:59:06 -070044 // TODO(austin): Implement this. It's used currently for a phased loop test.
45 // I'm not sure how much that matters.
46 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
47
Austin Schuh7d87b672019-12-01 20:23:49 -080048 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080049 MaybeMake();
50 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080051 }
52
Neil Balchc8f41ed2018-01-20 22:06:53 -080053 private:
Austin Schuh217a9782019-12-21 23:02:50 -080054 void MaybeMake() {
55 if (!event_loop_factory_) {
56 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080057 event_loop_factory_ =
58 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080059 } else {
60 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
62 }
63 }
64 }
65 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080066};
67
Austin Schuh6bae8252021-02-07 22:01:49 -080068auto CommonParameters() {
69 return ::testing::Combine(
70 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
71 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
72 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
73}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070074
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070075INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh6bae8252021-02-07 22:01:49 -080076 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh6bae8252021-02-07 22:01:49 -080079 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080080
// Parameters to run all the tests with.
struct Param {
  // The config file to use.
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  //
  // Defaults to false so that a default-initialized Param never carries an
  // indeterminate value.  Param remains an aggregate, so Param{"file", true}
  // style initialization keeps working.
  bool shared = false;
};
90
91class RemoteMessageSimulatedEventLoopTest
92 : public ::testing::TestWithParam<struct Param> {
93 public:
94 RemoteMessageSimulatedEventLoopTest()
95 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070096 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -080097 LOG(INFO) << "Config " << GetParam().config;
98 }
99
100 bool shared() const { return GetParam().shared; }
101
102 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
103 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
104 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
105 if (shared()) {
106 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
107 event_loop, "/aos/remote_timestamps/pi2"));
108 } else {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop,
111 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
114 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
115 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
116 }
117 return counters;
118 }
119
120 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
121 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
122 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
123 if (shared()) {
124 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
125 event_loop, "/aos/remote_timestamps/pi1"));
126 } else {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
129 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
130 event_loop,
131 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
132 }
133 return counters;
134 }
135
136 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
137};
138
Neil Balchc8f41ed2018-01-20 22:06:53 -0800139// Test that creating an event and running the scheduler runs the event.
140TEST(EventSchedulerTest, ScheduleEvent) {
141 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800142 EventSchedulerScheduler scheduler_scheduler;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800143 EventScheduler scheduler;
Austin Schuh8bd96322020-02-13 21:18:22 -0800144 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800145
Austin Schuh8bd96322020-02-13 21:18:22 -0800146 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuhac0771c2020-01-07 18:36:30 -0800147 [&counter]() { counter += 1; });
Austin Schuh8bd96322020-02-13 21:18:22 -0800148 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800149 EXPECT_EQ(counter, 1);
Ravago Jonescf453ab2020-05-06 21:14:53 -0700150 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2),
151 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800152 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800153 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800154 EXPECT_EQ(counter, 1);
155}
156
157// Test that descheduling an already scheduled event doesn't run the event.
158TEST(EventSchedulerTest, DescheduleEvent) {
159 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800160 EventSchedulerScheduler scheduler_scheduler;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800161 EventScheduler scheduler;
Austin Schuh8bd96322020-02-13 21:18:22 -0800162 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800163
Austin Schuh8bd96322020-02-13 21:18:22 -0800164 auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
165 [&counter]() { counter += 1; });
Neil Balchc8f41ed2018-01-20 22:06:53 -0800166 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800167 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800168 EXPECT_EQ(counter, 0);
169}
Austin Schuh44019f92019-05-19 19:58:27 -0700170
Austin Schuh8fb315a2020-11-19 22:33:58 -0800171void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
172 aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
173 TestMessage::Builder test_message_builder =
174 builder.MakeBuilder<TestMessage>();
175 test_message_builder.add_value(value);
176 builder.Send(test_message_builder.Finish());
177}
178
179// Test that sending a message after running gets properly notified.
180TEST(SimulatedEventLoopTest, SendAfterRunFor) {
181 SimulatedEventLoopTestFactory factory;
182
183 SimulatedEventLoopFactory simulated_event_loop_factory(
184 factory.configuration());
185
186 ::std::unique_ptr<EventLoop> ping_event_loop =
187 simulated_event_loop_factory.MakeEventLoop("ping");
188 aos::Sender<TestMessage> test_message_sender =
189 ping_event_loop->MakeSender<TestMessage>("/test");
190 SendTestMessage(&test_message_sender, 1);
191
192 std::unique_ptr<EventLoop> pong1_event_loop =
193 simulated_event_loop_factory.MakeEventLoop("pong");
194 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
195 "/test");
196
197 EXPECT_FALSE(ping_event_loop->is_running());
198
199 // Watchers start when you start running, so there should be nothing counted.
200 simulated_event_loop_factory.RunFor(chrono::seconds(1));
201 EXPECT_EQ(test_message_counter1.count(), 0u);
202
203 std::unique_ptr<EventLoop> pong2_event_loop =
204 simulated_event_loop_factory.MakeEventLoop("pong");
205 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
206 "/test");
207
208 // Pauses in the middle don't count though, so this should be counted.
209 // But, the fresh watcher shouldn't pick it up yet.
210 SendTestMessage(&test_message_sender, 2);
211
212 EXPECT_EQ(test_message_counter1.count(), 0u);
213 EXPECT_EQ(test_message_counter2.count(), 0u);
214 simulated_event_loop_factory.RunFor(chrono::seconds(1));
215
216 EXPECT_EQ(test_message_counter1.count(), 1u);
217 EXPECT_EQ(test_message_counter2.count(), 0u);
218}
219
220// Test that creating an event loop while running dies.
221TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
222 SimulatedEventLoopTestFactory factory;
223
224 SimulatedEventLoopFactory simulated_event_loop_factory(
225 factory.configuration());
226
227 ::std::unique_ptr<EventLoop> event_loop =
228 simulated_event_loop_factory.MakeEventLoop("ping");
229
230 auto timer = event_loop->AddTimer([&]() {
231 EXPECT_DEATH(
232 {
233 ::std::unique_ptr<EventLoop> event_loop2 =
234 simulated_event_loop_factory.MakeEventLoop("ping");
235 },
236 "event loop while running");
237 simulated_event_loop_factory.Exit();
238 });
239
240 event_loop->OnRun([&event_loop, &timer] {
241 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
242 });
243
244 simulated_event_loop_factory.Run();
245}
246
247// Test that creating a watcher after running dies.
248TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
249 SimulatedEventLoopTestFactory factory;
250
251 SimulatedEventLoopFactory simulated_event_loop_factory(
252 factory.configuration());
253
254 ::std::unique_ptr<EventLoop> event_loop =
255 simulated_event_loop_factory.MakeEventLoop("ping");
256
257 simulated_event_loop_factory.RunFor(chrono::seconds(1));
258
259 EXPECT_DEATH(
260 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
261 "Can't add a watcher after running");
262
263 ::std::unique_ptr<EventLoop> event_loop2 =
264 simulated_event_loop_factory.MakeEventLoop("ping");
265
266 simulated_event_loop_factory.RunFor(chrono::seconds(1));
267
268 EXPECT_DEATH(
269 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
270 "Can't add a watcher after running");
271}
272
Austin Schuh44019f92019-05-19 19:58:27 -0700273// Test that running for a time period with no handlers causes time to progress
274// correctly.
275TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800276 SimulatedEventLoopTestFactory factory;
277
278 SimulatedEventLoopFactory simulated_event_loop_factory(
279 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700280 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800281 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700282
283 simulated_event_loop_factory.RunFor(chrono::seconds(1));
284
285 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700286 event_loop->monotonic_now());
287}
288
289// Test that running for a time with a periodic handler causes time to end
290// correctly.
291TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800292 SimulatedEventLoopTestFactory factory;
293
294 SimulatedEventLoopFactory simulated_event_loop_factory(
295 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700296 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800297 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700298
299 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700300 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700301 event_loop->OnRun([&event_loop, &timer] {
302 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
303 chrono::milliseconds(100));
304 });
305
306 simulated_event_loop_factory.RunFor(chrono::seconds(1));
307
308 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700309 event_loop->monotonic_now());
310 EXPECT_EQ(counter, 10);
311}
312
Austin Schuh7d87b672019-12-01 20:23:49 -0800313// Tests that watchers have latency in simulation.
314TEST(SimulatedEventLoopTest, WatcherTimingReport) {
315 SimulatedEventLoopTestFactory factory;
316 factory.set_send_delay(std::chrono::microseconds(50));
317
318 FLAGS_timing_report_ms = 1000;
319 auto loop1 = factory.MakePrimary("primary");
320 loop1->MakeWatcher("/test", [](const TestMessage &) {});
321
322 auto loop2 = factory.Make("sender_loop");
323
324 auto loop3 = factory.Make("report_fetcher");
325
326 Fetcher<timing::Report> report_fetcher =
327 loop3->MakeFetcher<timing::Report>("/aos");
328 EXPECT_FALSE(report_fetcher.Fetch());
329
330 auto sender = loop2->MakeSender<TestMessage>("/test");
331
332 // Send 10 messages in the middle of a timing report period so we get
333 // something interesting back.
334 auto test_timer = loop2->AddTimer([&sender]() {
335 for (int i = 0; i < 10; ++i) {
336 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
337 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
338 builder.add_value(200 + i);
339 ASSERT_TRUE(msg.Send(builder.Finish()));
340 }
341 });
342
343 // Quit after 1 timing report, mid way through the next cycle.
344 {
345 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
346 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
347 end_timer->set_name("end");
348 }
349
350 loop1->OnRun([&test_timer, &loop1]() {
351 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
352 });
353
354 factory.Run();
355
356 // And, since we are here, check that the timing report makes sense.
357 // Start by looking for our event loop's timing.
358 FlatbufferDetachedBuffer<timing::Report> primary_report =
359 FlatbufferDetachedBuffer<timing::Report>::Empty();
360 while (report_fetcher.FetchNext()) {
361 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
362 if (report_fetcher->name()->string_view() == "primary") {
363 primary_report = CopyFlatBuffer(report_fetcher.get());
364 }
365 }
366
367 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700368 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800369
370 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
371
372 // Just the timing report timer.
373 ASSERT_NE(primary_report.message().timers(), nullptr);
374 EXPECT_EQ(primary_report.message().timers()->size(), 2);
375
376 // No phased loops
377 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
378
379 // And now confirm that the watcher received all 10 messages, and has latency.
380 ASSERT_NE(primary_report.message().watchers(), nullptr);
381 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
382 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
383 EXPECT_NEAR(
384 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
385 0.00005, 1e-9);
386 EXPECT_NEAR(
387 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
388 0.00005, 1e-9);
389 EXPECT_NEAR(
390 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
391 0.00005, 1e-9);
392 EXPECT_EQ(primary_report.message()
393 .watchers()
394 ->Get(0)
395 ->wakeup_latency()
396 ->standard_deviation(),
397 0.0);
398
399 EXPECT_EQ(
400 primary_report.message().watchers()->Get(0)->handler_time()->average(),
401 0.0);
402 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
403 0.0);
404 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
405 0.0);
406 EXPECT_EQ(primary_report.message()
407 .watchers()
408 ->Get(0)
409 ->handler_time()
410 ->standard_deviation(),
411 0.0);
412}
413
Austin Schuh89c9b812021-02-20 14:42:10 -0800414size_t CountAll(
415 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
416 &counters) {
417 size_t count = 0u;
418 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
419 counters) {
420 count += counter->count();
421 }
422 return count;
423}
424
Austin Schuh4c3b9702020-08-30 11:34:55 -0700425// Tests that ping and pong work when on 2 different nodes, and the message
426// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800427TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800428 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
429 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700430 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800431
432 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
433
434 std::unique_ptr<EventLoop> ping_event_loop =
435 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
436 Ping ping(ping_event_loop.get());
437
438 std::unique_ptr<EventLoop> pong_event_loop =
439 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
440 Pong pong(pong_event_loop.get());
441
442 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
443 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700444 MessageCounter<examples::Pong> pi2_pong_counter(
445 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700446 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
447 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
448 "/pi1/aos");
449 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
450 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800451
Austin Schuh4c3b9702020-08-30 11:34:55 -0700452 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
453 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800454
455 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
456 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700457 MessageCounter<examples::Pong> pi1_pong_counter(
458 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700459 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
460 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
461 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
462 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
463 "/aos");
464
Austin Schuh4c3b9702020-08-30 11:34:55 -0700465 // Count timestamps.
466 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
467 pi1_pong_counter_event_loop.get(), "/pi1/aos");
468 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
469 pi2_pong_counter_event_loop.get(), "/pi1/aos");
470 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
471 pi3_pong_counter_event_loop.get(), "/pi1/aos");
472 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
473 pi1_pong_counter_event_loop.get(), "/pi2/aos");
474 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
475 pi2_pong_counter_event_loop.get(), "/pi2/aos");
476 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
477 pi1_pong_counter_event_loop.get(), "/pi3/aos");
478 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
479 pi3_pong_counter_event_loop.get(), "/pi3/aos");
480
Austin Schuh2f8fd752020-09-01 22:38:28 -0700481 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800482 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
483 remote_timestamps_pi2_on_pi1 =
484 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
485 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
486 remote_timestamps_pi1_on_pi2 =
487 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700488
Austin Schuh4c3b9702020-08-30 11:34:55 -0700489 // Wait to let timestamp estimation start up before looking for the results.
490 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
491
Austin Schuh8fb315a2020-11-19 22:33:58 -0800492 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
493 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
494 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
495 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
496 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
497 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
498
Austin Schuh4c3b9702020-08-30 11:34:55 -0700499 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800500 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700501 "/pi1/aos", [&pi1_server_statistics_count](
502 const message_bridge::ServerStatistics &stats) {
503 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
504 EXPECT_EQ(stats.connections()->size(), 2u);
505 for (const message_bridge::ServerConnection *connection :
506 *stats.connections()) {
507 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800508 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700509 if (connection->node()->name()->string_view() == "pi2") {
510 EXPECT_GT(connection->sent_packets(), 50);
511 } else if (connection->node()->name()->string_view() == "pi3") {
512 EXPECT_GE(connection->sent_packets(), 5);
513 } else {
514 LOG(FATAL) << "Unknown connection";
515 }
516
517 EXPECT_TRUE(connection->has_monotonic_offset());
518 EXPECT_EQ(connection->monotonic_offset(), 0);
519 }
520 ++pi1_server_statistics_count;
521 });
522
523 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800524 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525 "/pi2/aos", [&pi2_server_statistics_count](
526 const message_bridge::ServerStatistics &stats) {
527 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
528 EXPECT_EQ(stats.connections()->size(), 1u);
529
530 const message_bridge::ServerConnection *connection =
531 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800532 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700533 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
534 EXPECT_GT(connection->sent_packets(), 50);
535 EXPECT_TRUE(connection->has_monotonic_offset());
536 EXPECT_EQ(connection->monotonic_offset(), 0);
537 ++pi2_server_statistics_count;
538 });
539
540 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800541 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700542 "/pi3/aos", [&pi3_server_statistics_count](
543 const message_bridge::ServerStatistics &stats) {
544 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
545 EXPECT_EQ(stats.connections()->size(), 1u);
546
547 const message_bridge::ServerConnection *connection =
548 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800549 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700550 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
551 EXPECT_GE(connection->sent_packets(), 5);
552 EXPECT_TRUE(connection->has_monotonic_offset());
553 EXPECT_EQ(connection->monotonic_offset(), 0);
554 ++pi3_server_statistics_count;
555 });
556
557 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800558 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700559 "/pi1/aos", [&pi1_client_statistics_count](
560 const message_bridge::ClientStatistics &stats) {
561 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
562 EXPECT_EQ(stats.connections()->size(), 2u);
563
564 for (const message_bridge::ClientConnection *connection :
565 *stats.connections()) {
566 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
567 if (connection->node()->name()->string_view() == "pi2") {
568 EXPECT_GT(connection->received_packets(), 50);
569 } else if (connection->node()->name()->string_view() == "pi3") {
570 EXPECT_GE(connection->received_packets(), 5);
571 } else {
572 LOG(FATAL) << "Unknown connection";
573 }
574
Austin Schuhe61d4382021-03-31 21:33:02 -0700575 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700576 EXPECT_TRUE(connection->has_monotonic_offset());
577 EXPECT_EQ(connection->monotonic_offset(), 150000);
578 }
579 ++pi1_client_statistics_count;
580 });
581
582 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800583 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700584 "/pi2/aos", [&pi2_client_statistics_count](
585 const message_bridge::ClientStatistics &stats) {
586 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
587 EXPECT_EQ(stats.connections()->size(), 1u);
588
589 const message_bridge::ClientConnection *connection =
590 stats.connections()->Get(0);
591 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
592 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700593 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700594 EXPECT_TRUE(connection->has_monotonic_offset());
595 EXPECT_EQ(connection->monotonic_offset(), 150000);
596 ++pi2_client_statistics_count;
597 });
598
599 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800600 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700601 "/pi3/aos", [&pi3_client_statistics_count](
602 const message_bridge::ClientStatistics &stats) {
603 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
604 EXPECT_EQ(stats.connections()->size(), 1u);
605
606 const message_bridge::ClientConnection *connection =
607 stats.connections()->Get(0);
608 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
609 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700610 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700611 EXPECT_TRUE(connection->has_monotonic_offset());
612 EXPECT_EQ(connection->monotonic_offset(), 150000);
613 ++pi3_client_statistics_count;
614 });
615
Austin Schuh2f8fd752020-09-01 22:38:28 -0700616 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
617 // channel.
618 const size_t pi1_timestamp_channel =
619 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
620 pi1_on_pi2_timestamp_fetcher.channel());
621 const size_t ping_timestamp_channel =
622 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
623 ping_on_pi2_fetcher.channel());
624
625 for (const Channel *channel :
626 *pi1_pong_counter_event_loop->configuration()->channels()) {
627 VLOG(1) << "Channel "
628 << configuration::ChannelIndex(
629 pi1_pong_counter_event_loop->configuration(), channel)
630 << " " << configuration::CleanedChannelToString(channel);
631 }
632
Austin Schuh8fb315a2020-11-19 22:33:58 -0800633 std::unique_ptr<EventLoop> pi1_remote_timestamp =
634 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
635
Austin Schuh89c9b812021-02-20 14:42:10 -0800636 for (std::pair<int, std::string> channel :
637 shared()
638 ? std::vector<std::pair<
639 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
640 : std::vector<std::pair<int, std::string>>{
641 {pi1_timestamp_channel,
642 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
643 "aos-message_bridge-Timestamp"},
644 {ping_timestamp_channel,
645 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
646 // For each remote timestamp we get back, confirm that it is either a ping
647 // message, or a timestamp we sent out. Also confirm that the timestamps
648 // are correct.
649 pi1_remote_timestamp->MakeWatcher(
650 channel.second,
651 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
652 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
653 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
654 channel_index = channel.first](const RemoteMessage &header) {
655 VLOG(1) << aos::FlatbufferToJson(&header);
656 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700657 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800658 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700659 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700660
Austin Schuh89c9b812021-02-20 14:42:10 -0800661 const aos::monotonic_clock::time_point header_monotonic_sent_time(
662 chrono::nanoseconds(header.monotonic_sent_time()));
663 const aos::realtime_clock::time_point header_realtime_sent_time(
664 chrono::nanoseconds(header.realtime_sent_time()));
665 const aos::monotonic_clock::time_point header_monotonic_remote_time(
666 chrono::nanoseconds(header.monotonic_remote_time()));
667 const aos::realtime_clock::time_point header_realtime_remote_time(
668 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700669
Austin Schuh89c9b812021-02-20 14:42:10 -0800670 if (channel_index != -1) {
671 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700672 }
673
Austin Schuh89c9b812021-02-20 14:42:10 -0800674 const Context *pi1_context = nullptr;
675 const Context *pi2_context = nullptr;
676
677 if (header.channel_index() == pi1_timestamp_channel) {
678 // Find the forwarded message.
679 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
680 header_monotonic_sent_time) {
681 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
682 }
683
684 // And the source message.
685 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
686 header_monotonic_remote_time) {
687 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
688 }
689
690 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
691 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
692 } else if (header.channel_index() == ping_timestamp_channel) {
693 // Find the forwarded message.
694 while (ping_on_pi2_fetcher.context().monotonic_event_time <
695 header_monotonic_sent_time) {
696 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
697 }
698
699 // And the source message.
700 while (ping_on_pi1_fetcher.context().monotonic_event_time <
701 header_monotonic_remote_time) {
702 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
703 }
704
705 pi1_context = &ping_on_pi1_fetcher.context();
706 pi2_context = &ping_on_pi2_fetcher.context();
707 } else {
708 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700709 }
710
Austin Schuh89c9b812021-02-20 14:42:10 -0800711 // Confirm the forwarded message has matching timestamps to the
712 // timestamps we got back.
713 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
714 EXPECT_EQ(pi2_context->remote_queue_index,
715 header.remote_queue_index());
716 EXPECT_EQ(pi2_context->monotonic_event_time,
717 header_monotonic_sent_time);
718 EXPECT_EQ(pi2_context->realtime_event_time,
719 header_realtime_sent_time);
720 EXPECT_EQ(pi2_context->realtime_remote_time,
721 header_realtime_remote_time);
722 EXPECT_EQ(pi2_context->monotonic_remote_time,
723 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700724
Austin Schuh89c9b812021-02-20 14:42:10 -0800725 // Confirm the forwarded message also matches the source message.
726 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
727 EXPECT_EQ(pi1_context->monotonic_event_time,
728 header_monotonic_remote_time);
729 EXPECT_EQ(pi1_context->realtime_event_time,
730 header_realtime_remote_time);
731 });
732 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700733
Austin Schuh4c3b9702020-08-30 11:34:55 -0700734 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
735 chrono::milliseconds(500) +
736 chrono::milliseconds(5));
737
738 EXPECT_EQ(pi1_pong_counter.count(), 1001);
739 EXPECT_EQ(pi2_pong_counter.count(), 1001);
740
741 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
742 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
743 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
744 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
745 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
746 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
747 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
748
Austin Schuh20ac95d2020-12-05 17:24:19 -0800749 EXPECT_EQ(pi1_server_statistics_count, 10);
750 EXPECT_EQ(pi2_server_statistics_count, 10);
751 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700752
753 EXPECT_EQ(pi1_client_statistics_count, 95);
754 EXPECT_EQ(pi2_client_statistics_count, 95);
755 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700756
757 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800758 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
759 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700760}
761
762// Tests that an offset between nodes can be recovered and shows up in
763// ServerStatistics correctly.
764TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
765 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700766 aos::configuration::ReadConfig(ArtifactPath(
767 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700768 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800769 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
770 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700771 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800772 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
773 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700774 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800775 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
776 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777
Austin Schuh87dd3832021-01-01 23:07:31 -0800778 message_bridge::TestingTimeConverter time(
779 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700780 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
781 NodeEventLoopFactory *pi2_factory =
782 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
Austin Schuh87dd3832021-01-01 23:07:31 -0800783 pi2_factory->SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700784
785 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800786 time.AddNextTimestamp(
787 distributed_clock::epoch(),
788 {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
789 monotonic_clock::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700790
791 std::unique_ptr<EventLoop> ping_event_loop =
792 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
793 Ping ping(ping_event_loop.get());
794
795 std::unique_ptr<EventLoop> pong_event_loop =
796 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
797 Pong pong(pong_event_loop.get());
798
Austin Schuh8fb315a2020-11-19 22:33:58 -0800799 // Wait to let timestamp estimation start up before looking for the results.
800 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
801
Austin Schuh87dd3832021-01-01 23:07:31 -0800802 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
803 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
804
Austin Schuh4c3b9702020-08-30 11:34:55 -0700805 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
806 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
807
808 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
809 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
810
Austin Schuh4c3b9702020-08-30 11:34:55 -0700811 // Confirm the offsets are being recovered correctly.
812 int pi1_server_statistics_count = 0;
813 pi1_pong_counter_event_loop->MakeWatcher(
814 "/pi1/aos", [&pi1_server_statistics_count,
815 kOffset](const message_bridge::ServerStatistics &stats) {
816 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
817 EXPECT_EQ(stats.connections()->size(), 2u);
818 for (const message_bridge::ServerConnection *connection :
819 *stats.connections()) {
820 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800821 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700822 if (connection->node()->name()->string_view() == "pi2") {
823 EXPECT_EQ(connection->monotonic_offset(),
824 chrono::nanoseconds(kOffset).count());
825 } else if (connection->node()->name()->string_view() == "pi3") {
826 EXPECT_EQ(connection->monotonic_offset(), 0);
827 } else {
828 LOG(FATAL) << "Unknown connection";
829 }
830
831 EXPECT_TRUE(connection->has_monotonic_offset());
832 }
833 ++pi1_server_statistics_count;
834 });
835
836 int pi2_server_statistics_count = 0;
837 pi2_pong_counter_event_loop->MakeWatcher(
838 "/pi2/aos", [&pi2_server_statistics_count,
839 kOffset](const message_bridge::ServerStatistics &stats) {
840 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
841 EXPECT_EQ(stats.connections()->size(), 1u);
842
843 const message_bridge::ServerConnection *connection =
844 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800845 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700846 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
847 EXPECT_TRUE(connection->has_monotonic_offset());
848 EXPECT_EQ(connection->monotonic_offset(),
849 -chrono::nanoseconds(kOffset).count());
850 ++pi2_server_statistics_count;
851 });
852
853 int pi3_server_statistics_count = 0;
854 pi3_pong_counter_event_loop->MakeWatcher(
855 "/pi3/aos", [&pi3_server_statistics_count](
856 const message_bridge::ServerStatistics &stats) {
857 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
858 EXPECT_EQ(stats.connections()->size(), 1u);
859
860 const message_bridge::ServerConnection *connection =
861 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800862 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700863 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
864 EXPECT_TRUE(connection->has_monotonic_offset());
865 EXPECT_EQ(connection->monotonic_offset(), 0);
866 ++pi3_server_statistics_count;
867 });
868
869 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
870 chrono::milliseconds(500) +
871 chrono::milliseconds(5));
872
Austin Schuh20ac95d2020-12-05 17:24:19 -0800873 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700874 EXPECT_EQ(pi2_server_statistics_count, 9);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800875 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700876}
877
878// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800879TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700880 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
881 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
882 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
883
884 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
885 simulated_event_loop_factory.DisableStatistics();
886
887 std::unique_ptr<EventLoop> ping_event_loop =
888 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
889 Ping ping(ping_event_loop.get());
890
891 std::unique_ptr<EventLoop> pong_event_loop =
892 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
893 Pong pong(pong_event_loop.get());
894
895 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
896 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
897
898 MessageCounter<examples::Pong> pi2_pong_counter(
899 pi2_pong_counter_event_loop.get(), "/test");
900
901 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
902 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
903
904 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
905 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
906
907 MessageCounter<examples::Pong> pi1_pong_counter(
908 pi1_pong_counter_event_loop.get(), "/test");
909
910 // Count timestamps.
911 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
912 pi1_pong_counter_event_loop.get(), "/pi1/aos");
913 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
914 pi2_pong_counter_event_loop.get(), "/pi1/aos");
915 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
916 pi3_pong_counter_event_loop.get(), "/pi1/aos");
917 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
918 pi1_pong_counter_event_loop.get(), "/pi2/aos");
919 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
920 pi2_pong_counter_event_loop.get(), "/pi2/aos");
921 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
922 pi1_pong_counter_event_loop.get(), "/pi3/aos");
923 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
924 pi3_pong_counter_event_loop.get(), "/pi3/aos");
925
Austin Schuh2f8fd752020-09-01 22:38:28 -0700926 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800927 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
928 remote_timestamps_pi2_on_pi1 =
929 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
930 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
931 remote_timestamps_pi1_on_pi2 =
932 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700933
Austin Schuh4c3b9702020-08-30 11:34:55 -0700934 MessageCounter<message_bridge::ServerStatistics>
935 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
936 "/pi1/aos");
937 MessageCounter<message_bridge::ServerStatistics>
938 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
939 "/pi2/aos");
940 MessageCounter<message_bridge::ServerStatistics>
941 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
942 "/pi3/aos");
943
944 MessageCounter<message_bridge::ClientStatistics>
945 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
946 "/pi1/aos");
947 MessageCounter<message_bridge::ClientStatistics>
948 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
949 "/pi2/aos");
950 MessageCounter<message_bridge::ClientStatistics>
951 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
952 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -0800953
954 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
955 chrono::milliseconds(5));
956
Austin Schuh4c3b9702020-08-30 11:34:55 -0700957 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
958 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
959
960 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
961 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
962 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
963 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
964 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
965 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
966 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
967
968 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
969 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
970 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
971
972 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
973 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
974 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700975
976 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800977 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
978 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -0800979}
980
Austin Schuhc0b0f722020-12-12 18:36:06 -0800981bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
982 for (const message_bridge::ServerConnection *connection :
983 *server_statistics->connections()) {
984 if (connection->state() != message_bridge::State::CONNECTED) {
985 return false;
986 }
987 }
988 return true;
989}
990
991bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
992 std::string_view target) {
993 for (const message_bridge::ServerConnection *connection :
994 *server_statistics->connections()) {
995 if (connection->node()->name()->string_view() == target) {
996 if (connection->state() == message_bridge::State::CONNECTED) {
997 return false;
998 }
999 } else {
1000 if (connection->state() != message_bridge::State::CONNECTED) {
1001 return false;
1002 }
1003 }
1004 }
1005 return true;
1006}
1007
1008bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1009 for (const message_bridge::ClientConnection *connection :
1010 *client_statistics->connections()) {
1011 if (connection->state() != message_bridge::State::CONNECTED) {
1012 return false;
1013 }
1014 }
1015 return true;
1016}
1017
1018bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1019 std::string_view target) {
1020 for (const message_bridge::ClientConnection *connection :
1021 *client_statistics->connections()) {
1022 if (connection->node()->name()->string_view() == target) {
1023 if (connection->state() == message_bridge::State::CONNECTED) {
1024 return false;
1025 }
1026 } else {
1027 if (connection->state() != message_bridge::State::CONNECTED) {
1028 return false;
1029 }
1030 }
1031 }
1032 return true;
1033}
1034
1035// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001036TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001037 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1038 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1039 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1040
1041 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1042
1043 std::unique_ptr<EventLoop> ping_event_loop =
1044 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1045 Ping ping(ping_event_loop.get());
1046
1047 std::unique_ptr<EventLoop> pong_event_loop =
1048 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1049 Pong pong(pong_event_loop.get());
1050
1051 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1052 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1053
1054 MessageCounter<examples::Pong> pi2_pong_counter(
1055 pi2_pong_counter_event_loop.get(), "/test");
1056
1057 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1058 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1059
1060 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1061 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1062
1063 MessageCounter<examples::Pong> pi1_pong_counter(
1064 pi1_pong_counter_event_loop.get(), "/test");
1065
1066 // Count timestamps.
1067 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1068 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1069 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1070 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1071 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1072 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1073 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1074 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1075 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1076 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1077 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1078 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1079 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1080 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1081
1082 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001083 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1084 remote_timestamps_pi2_on_pi1 =
1085 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1086 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1087 remote_timestamps_pi1_on_pi2 =
1088 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001089
1090 MessageCounter<message_bridge::ServerStatistics>
1091 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1092 "/pi1/aos");
1093 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1094 pi1_pong_counter_event_loop
1095 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1096 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1097 pi1_pong_counter_event_loop
1098 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1099
1100 MessageCounter<message_bridge::ServerStatistics>
1101 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1102 "/pi2/aos");
1103 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1104 pi2_pong_counter_event_loop
1105 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1106 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1107 pi2_pong_counter_event_loop
1108 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1109
1110 MessageCounter<message_bridge::ServerStatistics>
1111 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1112 "/pi3/aos");
1113 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1114 pi3_pong_counter_event_loop
1115 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1116 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1117 pi3_pong_counter_event_loop
1118 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1119
1120 MessageCounter<message_bridge::ClientStatistics>
1121 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1122 "/pi1/aos");
1123 MessageCounter<message_bridge::ClientStatistics>
1124 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1125 "/pi2/aos");
1126 MessageCounter<message_bridge::ClientStatistics>
1127 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1128 "/pi3/aos");
1129
1130 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1131 chrono::milliseconds(5));
1132
1133 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1134 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1135
1136 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1137 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1138 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1139 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1140 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1141 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1142 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1143
1144 EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
1145 EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
1146 EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
1147
1148 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1149 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1150 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1151
1152 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001153 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1154 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001155
1156 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1157 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1158 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1159 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1160 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1161 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1162 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1163 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1164 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1165 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1166 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1167 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1168 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1169 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1170 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1171 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1172 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1173 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1174
1175 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
1176
1177 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1178
1179 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1180 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1181
1182 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1183 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1184 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1185 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1186 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1187 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1188 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1189
1190 EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
1191 EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
1192 EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
1193
1194 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1195 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1196 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1197
1198 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001199 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1200 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201
1202 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1203 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1204 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1205 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1206 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1207 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1208 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1209 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1210 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1211 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1212 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1213 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1214 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1215 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1216 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1217 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1218 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1219 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1220
1221 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
1222
1223 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1224
1225 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1226 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1227
1228 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1229 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1230 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1231 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1232 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1233 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1234 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1235
1236 EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
1237 EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
1238 EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
1239
1240 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1241 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1242 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1243
1244 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001245 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1246 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001247
1248 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1249 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1250 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1251 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1252 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1253 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1254 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1255 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1256 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1257 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1258 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1259 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1260 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1261 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1262 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1263 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1264 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1265 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1266}
1267
Austin Schuh2febf0d2020-09-21 22:24:30 -07001268// Tests that the time offset having a slope doesn't break the world.
1269// SimulatedMessageBridge has enough self consistency CHECK statements to
1270// confirm, and we can also check a message in each direction to make sure
1271// it gets delivered as expected.
1272TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1273 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001274 aos::configuration::ReadConfig(ArtifactPath(
1275 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001276 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001277 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1278 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001279 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001280 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1281 ASSERT_EQ(pi2_index, 1u);
1282 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1283 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1284 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001285
Austin Schuh87dd3832021-01-01 23:07:31 -08001286 message_bridge::TestingTimeConverter time(
1287 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001288 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1289 NodeEventLoopFactory *pi2_factory =
1290 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
Austin Schuh87dd3832021-01-01 23:07:31 -08001291 pi2_factory->SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001292
Austin Schuh2febf0d2020-09-21 22:24:30 -07001293 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001294 time.AddNextTimestamp(
1295 distributed_clock::epoch(),
1296 {monotonic_clock::epoch(), monotonic_clock::epoch() + kOffset,
1297 monotonic_clock::epoch()});
1298 time.AddNextTimestamp(
1299 distributed_clock::epoch() + chrono::seconds(10),
1300 {monotonic_clock::epoch() + chrono::milliseconds(9999),
1301 monotonic_clock::epoch() + kOffset + chrono::seconds(10),
1302 monotonic_clock::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001303
1304 std::unique_ptr<EventLoop> ping_event_loop =
1305 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1306 Ping ping(ping_event_loop.get());
1307
1308 std::unique_ptr<EventLoop> pong_event_loop =
1309 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1310 Pong pong(pong_event_loop.get());
1311
1312 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1313 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1314 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1315 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1316
1317 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1318 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1319 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1320 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1321
1322 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1323 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1324 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1325 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1326
1327 // End after a pong message comes back. This will leave the latest messages
1328 // on all channels so we can look at timestamps easily and check they make
1329 // sense.
1330 std::unique_ptr<EventLoop> pi1_pong_ender =
1331 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1332 int count = 0;
1333 pi1_pong_ender->MakeWatcher(
1334 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1335 if (++count == 100) {
1336 simulated_event_loop_factory.Exit();
1337 }
1338 });
1339
1340 // Run enough that messages should be delivered.
1341 simulated_event_loop_factory.Run();
1342
1343 // Grab the latest messages.
1344 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1345 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1346 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1347 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1348
1349 // Compute their time on the global distributed clock so we can compute
1350 // distance betwen them.
1351 const distributed_clock::time_point pi1_ping_time =
1352 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1353 ->ToDistributedClock(
1354 ping_on_pi1_fetcher.context().monotonic_event_time);
1355 const distributed_clock::time_point pi2_ping_time =
1356 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1357 ->ToDistributedClock(
1358 ping_on_pi2_fetcher.context().monotonic_event_time);
1359 const distributed_clock::time_point pi1_pong_time =
1360 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1361 ->ToDistributedClock(
1362 pong_on_pi1_fetcher.context().monotonic_event_time);
1363 const distributed_clock::time_point pi2_pong_time =
1364 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1365 ->ToDistributedClock(
1366 pong_on_pi2_fetcher.context().monotonic_event_time);
1367
1368 // And confirm the delivery delay is just about exactly 150 uS for both
1369 // directions like expected. There will be a couple ns of rounding errors in
1370 // the conversion functions that aren't worth accounting for right now. This
1371 // will either be really close, or really far.
1372 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1373 pi1_ping_time);
1374 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1375 pi1_ping_time);
1376
1377 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1378 pi2_pong_time);
1379 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1380 pi2_pong_time);
1381}
1382
Austin Schuh4c570ea2020-11-19 23:13:24 -08001383void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1384 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1385 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1386 ping_builder.add_value(value);
1387 builder.Send(ping_builder.Finish());
1388}
1389
1390// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001391TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001392 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1393 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1394
1395 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1396
1397 std::unique_ptr<EventLoop> ping_event_loop =
1398 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1399 aos::Sender<examples::Ping> pi1_reliable_sender =
1400 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1401 aos::Sender<examples::Ping> pi1_unreliable_sender =
1402 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1403 SendPing(&pi1_reliable_sender, 1);
1404 SendPing(&pi1_unreliable_sender, 1);
1405
1406 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1407 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1408 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1409 "/reliable");
1410 MessageCounter<examples::Ping> pi2_unreliable_counter(
1411 pi2_pong_event_loop.get(), "/unreliable");
1412 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1413 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1414 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1415 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1416
1417 const size_t reliable_channel_index = configuration::ChannelIndex(
1418 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1419
1420 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1421 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1422
Austin Schuheeaa2022021-01-02 21:52:03 -08001423 const chrono::nanoseconds network_delay =
1424 simulated_event_loop_factory.network_delay();
1425
Austin Schuh4c570ea2020-11-19 23:13:24 -08001426 int reliable_timestamp_count = 0;
1427 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001428 shared() ? "/pi1/aos/remote_timestamps/pi2"
1429 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001430 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001431 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1432 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001433 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001434 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001435 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001436 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001437 VLOG(1) << aos::FlatbufferToJson(&header);
1438 if (header.channel_index() == reliable_channel_index) {
1439 ++reliable_timestamp_count;
1440 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001441
1442 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1443 chrono::nanoseconds(header.monotonic_sent_time()));
1444
1445 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1446 header_monotonic_sent_time + network_delay +
1447 (pi1_remote_timestamp->monotonic_now() -
1448 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001449 });
1450
1451 // Wait to let timestamp estimation start up before looking for the results.
1452 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1453
1454 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1455 // This one isn't reliable, but was sent before the start. It should *not* be
1456 // delivered.
1457 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1458 // Confirm we got a timestamp logged for the message that was forwarded.
1459 EXPECT_EQ(reliable_timestamp_count, 1u);
1460
1461 SendPing(&pi1_reliable_sender, 2);
1462 SendPing(&pi1_unreliable_sender, 2);
1463 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1464 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1465 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1466
1467 EXPECT_EQ(reliable_timestamp_count, 2u);
1468}
1469
Austin Schuh20ac95d2020-12-05 17:24:19 -08001470// Tests that rebooting a node changes the ServerStatistics message and the
1471// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001472TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001473 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1474 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1475
1476 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1477
1478 std::unique_ptr<EventLoop> ping_event_loop =
1479 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1480 Ping ping(ping_event_loop.get());
1481
1482 std::unique_ptr<EventLoop> pong_event_loop =
1483 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1484 Pong pong(pong_event_loop.get());
1485
1486 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1487 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
Austin Schuh8902fa52021-03-14 22:39:24 -07001488 UUID expected_boot_uuid =
1489 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001490
1491 int timestamp_count = 0;
1492 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001493 "/pi2/aos", [&expected_boot_uuid,
1494 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001495 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001496 expected_boot_uuid);
1497 });
1498 pi1_remote_timestamp->MakeWatcher(
1499 "/test",
1500 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001501 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001502 expected_boot_uuid);
1503 });
1504 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001505 shared() ? "/pi1/aos/remote_timestamps/pi2"
1506 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001507 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1508 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001509 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001510 VLOG(1) << aos::FlatbufferToJson(&header);
1511 ++timestamp_count;
1512 });
1513
1514 int pi1_server_statistics_count = 0;
1515 pi1_remote_timestamp->MakeWatcher(
1516 "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
1517 const message_bridge::ServerStatistics &stats) {
1518 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1519 for (const message_bridge::ServerConnection *connection :
1520 *stats.connections()) {
1521 EXPECT_TRUE(connection->has_boot_uuid());
1522 if (connection->node()->name()->string_view() == "pi2") {
1523 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001524 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001525 << " : Got " << aos::FlatbufferToJson(&stats);
1526 ++pi1_server_statistics_count;
1527 }
1528 }
1529 });
1530
1531 // Let a couple of ServerStatistics messages show up before rebooting.
1532 simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
1533
1534 EXPECT_GT(timestamp_count, 100);
1535 EXPECT_GE(pi1_server_statistics_count, 1u);
1536
1537 // Confirm that reboot changes the UUID.
1538 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
1539
Austin Schuh8902fa52021-03-14 22:39:24 -07001540 EXPECT_NE(
1541 expected_boot_uuid,
1542 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001543
1544 expected_boot_uuid =
Austin Schuh8902fa52021-03-14 22:39:24 -07001545 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001546 timestamp_count = 0;
1547 pi1_server_statistics_count = 0;
1548
1549 simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
1550 EXPECT_GT(timestamp_count, 100);
1551 EXPECT_GE(pi1_server_statistics_count, 1u);
1552}
1553
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001554INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001555 All, RemoteMessageSimulatedEventLoopTest,
1556 ::testing::Values(
1557 Param{"multinode_pingpong_test_combined_config.json", true},
1558 Param{"multinode_pingpong_test_split_config.json", false}));
1559
Neil Balchc8f41ed2018-01-20 22:06:53 -08001560} // namespace testing
1561} // namespace aos