blob: a83243d175ca1641780a5bee20a6800ffdaffc2c [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Alex Perrycb7da4b2019-08-28 19:35:56 -07007#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07009#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010#include "aos/events/ping_lib.h"
11#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080012#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070013#include "aos/network/message_bridge_client_generated.h"
14#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080015#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080016#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070017#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070018#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080019#include "gtest/gtest.h"
20
21namespace aos {
22namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
Austin Schuh89c9b812021-02-20 14:42:10 -080084// Parameters to run all the tests with.
85struct Param {
86 // The config file to use.
87 std::string config;
88 // If true, the RemoteMessage channel should be shared between all the remote
89 // channels. If false, there will be 1 RemoteMessage channel per remote
90 // channel.
91 bool shared;
92};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800142class FunctionEvent : public EventScheduler::Event {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700143 public:
144 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800145
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700146 void Handle() noexcept override { fn_(); }
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800147
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700148 private:
149 std::function<void()> fn_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800150};
151
Neil Balchc8f41ed2018-01-20 22:06:53 -0800152// Test that creating an event and running the scheduler runs the event.
153TEST(EventSchedulerTest, ScheduleEvent) {
154 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800155 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700156 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800157 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800158
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800159 FunctionEvent e([&counter]() { counter += 1; });
160 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800162 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800163 auto token =
164 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 EXPECT_EQ(counter, 1);
168}
169
170// Test that descheduling an already scheduled event doesn't run the event.
171TEST(EventSchedulerTest, DescheduleEvent) {
172 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700174 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800175 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800176
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800177 FunctionEvent e([&counter]() { counter += 1; });
178 auto token =
179 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800180 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800181 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800182 EXPECT_EQ(counter, 0);
183}
Austin Schuh44019f92019-05-19 19:58:27 -0700184
Austin Schuhe33c08d2022-02-03 18:15:21 -0800185// Test that TemporarilyStopAndRun respects and preserves running.
186TEST(EventSchedulerTest, TemporarilyStopAndRun) {
187 int counter = 0;
188 EventSchedulerScheduler scheduler_scheduler;
189 EventScheduler scheduler(0);
190 scheduler_scheduler.AddEventScheduler(&scheduler);
191
192 scheduler_scheduler.TemporarilyStopAndRun(
193 [&]() { CHECK(!scheduler_scheduler.is_running()); });
194 ASSERT_FALSE(scheduler_scheduler.is_running());
195
196 FunctionEvent e([&]() {
197 counter += 1;
198 CHECK(scheduler_scheduler.is_running());
199 scheduler_scheduler.TemporarilyStopAndRun(
200 [&]() { CHECK(!scheduler_scheduler.is_running()); });
201 CHECK(scheduler_scheduler.is_running());
202 });
203 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
204 scheduler_scheduler.Run();
205 EXPECT_EQ(counter, 1);
206}
207
Austin Schuh8fb315a2020-11-19 22:33:58 -0800208// Test that sending a message after running gets properly notified.
209TEST(SimulatedEventLoopTest, SendAfterRunFor) {
210 SimulatedEventLoopTestFactory factory;
211
212 SimulatedEventLoopFactory simulated_event_loop_factory(
213 factory.configuration());
214
215 ::std::unique_ptr<EventLoop> ping_event_loop =
216 simulated_event_loop_factory.MakeEventLoop("ping");
217 aos::Sender<TestMessage> test_message_sender =
218 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700219 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800220
221 std::unique_ptr<EventLoop> pong1_event_loop =
222 simulated_event_loop_factory.MakeEventLoop("pong");
223 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
224 "/test");
225
226 EXPECT_FALSE(ping_event_loop->is_running());
227
228 // Watchers start when you start running, so there should be nothing counted.
229 simulated_event_loop_factory.RunFor(chrono::seconds(1));
230 EXPECT_EQ(test_message_counter1.count(), 0u);
231
232 std::unique_ptr<EventLoop> pong2_event_loop =
233 simulated_event_loop_factory.MakeEventLoop("pong");
234 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
235 "/test");
236
237 // Pauses in the middle don't count though, so this should be counted.
238 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700239 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800240
241 EXPECT_EQ(test_message_counter1.count(), 0u);
242 EXPECT_EQ(test_message_counter2.count(), 0u);
243 simulated_event_loop_factory.RunFor(chrono::seconds(1));
244
245 EXPECT_EQ(test_message_counter1.count(), 1u);
246 EXPECT_EQ(test_message_counter2.count(), 0u);
247}
248
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700249void TestSentTooFastCheckEdgeCase(
250 const std::function<RawSender::Error(int, int)> expected_err,
251 const bool send_twice_at_end) {
252 SimulatedEventLoopTestFactory factory;
253
254 auto event_loop = factory.MakePrimary("primary");
255
256 auto sender = event_loop->MakeSender<TestMessage>("/test");
257
258 const int queue_size = TestChannelQueueSize(event_loop.get());
259 int msgs_sent = 0;
260 event_loop->AddPhasedLoop(
261 [&](int) {
262 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
263 msgs_sent++;
264
265 // If send_twice_at_end, send the last two messages (message
266 // queue_size and queue_size + 1) in the same iteration, meaning that
267 // we would be sending very slightly too fast. Otherwise, we will send
268 // message queue_size + 1 in the next iteration and we will continue
269 // to be sending exactly at the channel frequency.
270 if (send_twice_at_end && (msgs_sent == queue_size)) {
271 EXPECT_EQ(SendTestMessage(sender),
272 expected_err(msgs_sent, queue_size));
273 msgs_sent++;
274 }
275
276 if (msgs_sent > queue_size) {
277 factory.Exit();
278 }
279 },
280 std::chrono::duration_cast<std::chrono::nanoseconds>(
281 std::chrono::duration<double>(
282 1.0 / TestChannelFrequency(event_loop.get()))));
283
284 factory.Run();
285}
286
287// Tests that RawSender::Error::kMessagesSentTooFast is not returned
288// when messages are sent at the exact frequency of the channel.
289TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
290 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
291 false);
292}
293
294// Tests that RawSender::Error::kMessagesSentTooFast is returned
295// when sending exactly one more message than allowed in a channel storage
296// duration.
297TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
298 TestSentTooFastCheckEdgeCase(
299 [](const int msgs_sent, const int queue_size) {
300 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
301 : RawSender::Error::kOk);
302 },
303 true);
304}
305
Austin Schuh8fb315a2020-11-19 22:33:58 -0800306// Test that creating an event loop while running dies.
307TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
308 SimulatedEventLoopTestFactory factory;
309
310 SimulatedEventLoopFactory simulated_event_loop_factory(
311 factory.configuration());
312
313 ::std::unique_ptr<EventLoop> event_loop =
314 simulated_event_loop_factory.MakeEventLoop("ping");
315
316 auto timer = event_loop->AddTimer([&]() {
317 EXPECT_DEATH(
318 {
319 ::std::unique_ptr<EventLoop> event_loop2 =
320 simulated_event_loop_factory.MakeEventLoop("ping");
321 },
322 "event loop while running");
323 simulated_event_loop_factory.Exit();
324 });
325
326 event_loop->OnRun([&event_loop, &timer] {
327 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
328 });
329
330 simulated_event_loop_factory.Run();
331}
332
333// Test that creating a watcher after running dies.
334TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
335 SimulatedEventLoopTestFactory factory;
336
337 SimulatedEventLoopFactory simulated_event_loop_factory(
338 factory.configuration());
339
340 ::std::unique_ptr<EventLoop> event_loop =
341 simulated_event_loop_factory.MakeEventLoop("ping");
342
343 simulated_event_loop_factory.RunFor(chrono::seconds(1));
344
345 EXPECT_DEATH(
346 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
347 "Can't add a watcher after running");
348
349 ::std::unique_ptr<EventLoop> event_loop2 =
350 simulated_event_loop_factory.MakeEventLoop("ping");
351
352 simulated_event_loop_factory.RunFor(chrono::seconds(1));
353
354 EXPECT_DEATH(
355 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
356 "Can't add a watcher after running");
357}
358
Austin Schuh44019f92019-05-19 19:58:27 -0700359// Test that running for a time period with no handlers causes time to progress
360// correctly.
361TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800362 SimulatedEventLoopTestFactory factory;
363
364 SimulatedEventLoopFactory simulated_event_loop_factory(
365 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700366 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800367 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700368
369 simulated_event_loop_factory.RunFor(chrono::seconds(1));
370
371 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700372 event_loop->monotonic_now());
373}
374
375// Test that running for a time with a periodic handler causes time to end
376// correctly.
377TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800378 SimulatedEventLoopTestFactory factory;
379
380 SimulatedEventLoopFactory simulated_event_loop_factory(
381 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700382 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800383 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700384
385 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700386 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700387 event_loop->OnRun([&event_loop, &timer] {
388 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
389 chrono::milliseconds(100));
390 });
391
392 simulated_event_loop_factory.RunFor(chrono::seconds(1));
393
394 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700395 event_loop->monotonic_now());
396 EXPECT_EQ(counter, 10);
397}
398
Austin Schuh7d87b672019-12-01 20:23:49 -0800399// Tests that watchers have latency in simulation.
400TEST(SimulatedEventLoopTest, WatcherTimingReport) {
401 SimulatedEventLoopTestFactory factory;
402 factory.set_send_delay(std::chrono::microseconds(50));
403
404 FLAGS_timing_report_ms = 1000;
405 auto loop1 = factory.MakePrimary("primary");
406 loop1->MakeWatcher("/test", [](const TestMessage &) {});
407
408 auto loop2 = factory.Make("sender_loop");
409
410 auto loop3 = factory.Make("report_fetcher");
411
412 Fetcher<timing::Report> report_fetcher =
413 loop3->MakeFetcher<timing::Report>("/aos");
414 EXPECT_FALSE(report_fetcher.Fetch());
415
416 auto sender = loop2->MakeSender<TestMessage>("/test");
417
418 // Send 10 messages in the middle of a timing report period so we get
419 // something interesting back.
420 auto test_timer = loop2->AddTimer([&sender]() {
421 for (int i = 0; i < 10; ++i) {
422 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
423 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
424 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700425 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800426 }
427 });
428
429 // Quit after 1 timing report, mid way through the next cycle.
430 {
431 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
432 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
433 end_timer->set_name("end");
434 }
435
436 loop1->OnRun([&test_timer, &loop1]() {
437 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
438 });
439
440 factory.Run();
441
442 // And, since we are here, check that the timing report makes sense.
443 // Start by looking for our event loop's timing.
444 FlatbufferDetachedBuffer<timing::Report> primary_report =
445 FlatbufferDetachedBuffer<timing::Report>::Empty();
446 while (report_fetcher.FetchNext()) {
447 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
448 if (report_fetcher->name()->string_view() == "primary") {
449 primary_report = CopyFlatBuffer(report_fetcher.get());
450 }
451 }
452
453 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700454 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800455
456 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
457
458 // Just the timing report timer.
459 ASSERT_NE(primary_report.message().timers(), nullptr);
460 EXPECT_EQ(primary_report.message().timers()->size(), 2);
461
462 // No phased loops
463 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
464
465 // And now confirm that the watcher received all 10 messages, and has latency.
466 ASSERT_NE(primary_report.message().watchers(), nullptr);
467 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
468 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
469 EXPECT_NEAR(
470 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
471 0.00005, 1e-9);
472 EXPECT_NEAR(
473 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
474 0.00005, 1e-9);
475 EXPECT_NEAR(
476 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
477 0.00005, 1e-9);
478 EXPECT_EQ(primary_report.message()
479 .watchers()
480 ->Get(0)
481 ->wakeup_latency()
482 ->standard_deviation(),
483 0.0);
484
485 EXPECT_EQ(
486 primary_report.message().watchers()->Get(0)->handler_time()->average(),
487 0.0);
488 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
489 0.0);
490 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
491 0.0);
492 EXPECT_EQ(primary_report.message()
493 .watchers()
494 ->Get(0)
495 ->handler_time()
496 ->standard_deviation(),
497 0.0);
498}
499
Austin Schuh89c9b812021-02-20 14:42:10 -0800500size_t CountAll(
501 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
502 &counters) {
503 size_t count = 0u;
504 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
505 counters) {
506 count += counter->count();
507 }
508 return count;
509}
510
Austin Schuh4c3b9702020-08-30 11:34:55 -0700511// Tests that ping and pong work when on 2 different nodes, and the message
512// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800513TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800514 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
515 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700516 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800517
518 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
519
520 std::unique_ptr<EventLoop> ping_event_loop =
521 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
522 Ping ping(ping_event_loop.get());
523
524 std::unique_ptr<EventLoop> pong_event_loop =
525 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
526 Pong pong(pong_event_loop.get());
527
528 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
529 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700530 MessageCounter<examples::Pong> pi2_pong_counter(
531 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700532 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
533 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
534 "/pi1/aos");
535 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
536 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800537
Austin Schuh4c3b9702020-08-30 11:34:55 -0700538 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
539 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800540
541 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
542 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700543 MessageCounter<examples::Pong> pi1_pong_counter(
544 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700545 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
546 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
547 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
548 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
549 "/aos");
550
Austin Schuh4c3b9702020-08-30 11:34:55 -0700551 // Count timestamps.
552 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
553 pi1_pong_counter_event_loop.get(), "/pi1/aos");
554 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
555 pi2_pong_counter_event_loop.get(), "/pi1/aos");
556 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
557 pi3_pong_counter_event_loop.get(), "/pi1/aos");
558 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
559 pi1_pong_counter_event_loop.get(), "/pi2/aos");
560 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
561 pi2_pong_counter_event_loop.get(), "/pi2/aos");
562 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
563 pi1_pong_counter_event_loop.get(), "/pi3/aos");
564 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
565 pi3_pong_counter_event_loop.get(), "/pi3/aos");
566
Austin Schuh2f8fd752020-09-01 22:38:28 -0700567 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800568 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
569 remote_timestamps_pi2_on_pi1 =
570 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
571 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
572 remote_timestamps_pi1_on_pi2 =
573 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700574
Austin Schuh4c3b9702020-08-30 11:34:55 -0700575 // Wait to let timestamp estimation start up before looking for the results.
576 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
577
Austin Schuh8fb315a2020-11-19 22:33:58 -0800578 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
579 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
580 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
581 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
582 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
583 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
584
Austin Schuh4c3b9702020-08-30 11:34:55 -0700585 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800586 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700587 "/pi1/aos", [&pi1_server_statistics_count](
588 const message_bridge::ServerStatistics &stats) {
589 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
590 EXPECT_EQ(stats.connections()->size(), 2u);
591 for (const message_bridge::ServerConnection *connection :
592 *stats.connections()) {
593 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800594 EXPECT_EQ(connection->connection_count(), 1u);
595 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800596 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700597 if (connection->node()->name()->string_view() == "pi2") {
598 EXPECT_GT(connection->sent_packets(), 50);
599 } else if (connection->node()->name()->string_view() == "pi3") {
600 EXPECT_GE(connection->sent_packets(), 5);
601 } else {
602 LOG(FATAL) << "Unknown connection";
603 }
604
605 EXPECT_TRUE(connection->has_monotonic_offset());
606 EXPECT_EQ(connection->monotonic_offset(), 0);
607 }
608 ++pi1_server_statistics_count;
609 });
610
611 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800612 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700613 "/pi2/aos", [&pi2_server_statistics_count](
614 const message_bridge::ServerStatistics &stats) {
615 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
616 EXPECT_EQ(stats.connections()->size(), 1u);
617
618 const message_bridge::ServerConnection *connection =
619 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800620 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700621 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
622 EXPECT_GT(connection->sent_packets(), 50);
623 EXPECT_TRUE(connection->has_monotonic_offset());
624 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800625 EXPECT_EQ(connection->connection_count(), 1u);
626 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700627 ++pi2_server_statistics_count;
628 });
629
630 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800631 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700632 "/pi3/aos", [&pi3_server_statistics_count](
633 const message_bridge::ServerStatistics &stats) {
634 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
635 EXPECT_EQ(stats.connections()->size(), 1u);
636
637 const message_bridge::ServerConnection *connection =
638 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800639 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700640 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
641 EXPECT_GE(connection->sent_packets(), 5);
642 EXPECT_TRUE(connection->has_monotonic_offset());
643 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800644 EXPECT_EQ(connection->connection_count(), 1u);
645 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700646 ++pi3_server_statistics_count;
647 });
648
649 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800650 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700651 "/pi1/aos", [&pi1_client_statistics_count](
652 const message_bridge::ClientStatistics &stats) {
653 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
654 EXPECT_EQ(stats.connections()->size(), 2u);
655
656 for (const message_bridge::ClientConnection *connection :
657 *stats.connections()) {
658 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
659 if (connection->node()->name()->string_view() == "pi2") {
660 EXPECT_GT(connection->received_packets(), 50);
661 } else if (connection->node()->name()->string_view() == "pi3") {
662 EXPECT_GE(connection->received_packets(), 5);
663 } else {
664 LOG(FATAL) << "Unknown connection";
665 }
666
Austin Schuhe61d4382021-03-31 21:33:02 -0700667 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700668 EXPECT_TRUE(connection->has_monotonic_offset());
669 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800670 EXPECT_EQ(connection->connection_count(), 1u);
671 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700672 }
673 ++pi1_client_statistics_count;
674 });
675
676 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800677 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700678 "/pi2/aos", [&pi2_client_statistics_count](
679 const message_bridge::ClientStatistics &stats) {
680 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
681 EXPECT_EQ(stats.connections()->size(), 1u);
682
683 const message_bridge::ClientConnection *connection =
684 stats.connections()->Get(0);
685 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
686 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700687 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700688 EXPECT_TRUE(connection->has_monotonic_offset());
689 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800690 EXPECT_EQ(connection->connection_count(), 1u);
691 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700692 ++pi2_client_statistics_count;
693 });
694
695 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800696 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700697 "/pi3/aos", [&pi3_client_statistics_count](
698 const message_bridge::ClientStatistics &stats) {
699 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
700 EXPECT_EQ(stats.connections()->size(), 1u);
701
702 const message_bridge::ClientConnection *connection =
703 stats.connections()->Get(0);
704 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
705 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700706 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700707 EXPECT_TRUE(connection->has_monotonic_offset());
708 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800709 EXPECT_EQ(connection->connection_count(), 1u);
710 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700711 ++pi3_client_statistics_count;
712 });
713
Austin Schuh2f8fd752020-09-01 22:38:28 -0700714 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
715 // channel.
716 const size_t pi1_timestamp_channel =
717 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
718 pi1_on_pi2_timestamp_fetcher.channel());
719 const size_t ping_timestamp_channel =
720 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
721 ping_on_pi2_fetcher.channel());
722
723 for (const Channel *channel :
724 *pi1_pong_counter_event_loop->configuration()->channels()) {
725 VLOG(1) << "Channel "
726 << configuration::ChannelIndex(
727 pi1_pong_counter_event_loop->configuration(), channel)
728 << " " << configuration::CleanedChannelToString(channel);
729 }
730
Austin Schuh8fb315a2020-11-19 22:33:58 -0800731 std::unique_ptr<EventLoop> pi1_remote_timestamp =
732 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
733
Austin Schuh89c9b812021-02-20 14:42:10 -0800734 for (std::pair<int, std::string> channel :
735 shared()
736 ? std::vector<std::pair<
737 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
738 : std::vector<std::pair<int, std::string>>{
739 {pi1_timestamp_channel,
740 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
741 "aos-message_bridge-Timestamp"},
742 {ping_timestamp_channel,
743 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
744 // For each remote timestamp we get back, confirm that it is either a ping
745 // message, or a timestamp we sent out. Also confirm that the timestamps
746 // are correct.
747 pi1_remote_timestamp->MakeWatcher(
748 channel.second,
749 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
750 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
751 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
752 channel_index = channel.first](const RemoteMessage &header) {
753 VLOG(1) << aos::FlatbufferToJson(&header);
754 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700755 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800756 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700757 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700758
Austin Schuh89c9b812021-02-20 14:42:10 -0800759 const aos::monotonic_clock::time_point header_monotonic_sent_time(
760 chrono::nanoseconds(header.monotonic_sent_time()));
761 const aos::realtime_clock::time_point header_realtime_sent_time(
762 chrono::nanoseconds(header.realtime_sent_time()));
763 const aos::monotonic_clock::time_point header_monotonic_remote_time(
764 chrono::nanoseconds(header.monotonic_remote_time()));
765 const aos::realtime_clock::time_point header_realtime_remote_time(
766 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700767
Austin Schuh89c9b812021-02-20 14:42:10 -0800768 if (channel_index != -1) {
769 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700770 }
771
Austin Schuh89c9b812021-02-20 14:42:10 -0800772 const Context *pi1_context = nullptr;
773 const Context *pi2_context = nullptr;
774
775 if (header.channel_index() == pi1_timestamp_channel) {
776 // Find the forwarded message.
777 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
778 header_monotonic_sent_time) {
779 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
780 }
781
782 // And the source message.
783 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
784 header_monotonic_remote_time) {
785 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
786 }
787
788 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
789 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
790 } else if (header.channel_index() == ping_timestamp_channel) {
791 // Find the forwarded message.
792 while (ping_on_pi2_fetcher.context().monotonic_event_time <
793 header_monotonic_sent_time) {
794 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
795 }
796
797 // And the source message.
798 while (ping_on_pi1_fetcher.context().monotonic_event_time <
799 header_monotonic_remote_time) {
800 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
801 }
802
803 pi1_context = &ping_on_pi1_fetcher.context();
804 pi2_context = &ping_on_pi2_fetcher.context();
805 } else {
806 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700807 }
808
Austin Schuh89c9b812021-02-20 14:42:10 -0800809 // Confirm the forwarded message has matching timestamps to the
810 // timestamps we got back.
811 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
812 EXPECT_EQ(pi2_context->remote_queue_index,
813 header.remote_queue_index());
814 EXPECT_EQ(pi2_context->monotonic_event_time,
815 header_monotonic_sent_time);
816 EXPECT_EQ(pi2_context->realtime_event_time,
817 header_realtime_sent_time);
818 EXPECT_EQ(pi2_context->realtime_remote_time,
819 header_realtime_remote_time);
820 EXPECT_EQ(pi2_context->monotonic_remote_time,
821 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700822
Austin Schuh89c9b812021-02-20 14:42:10 -0800823 // Confirm the forwarded message also matches the source message.
824 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
825 EXPECT_EQ(pi1_context->monotonic_event_time,
826 header_monotonic_remote_time);
827 EXPECT_EQ(pi1_context->realtime_event_time,
828 header_realtime_remote_time);
829 });
830 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700831
Austin Schuh4c3b9702020-08-30 11:34:55 -0700832 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
833 chrono::milliseconds(500) +
834 chrono::milliseconds(5));
835
836 EXPECT_EQ(pi1_pong_counter.count(), 1001);
837 EXPECT_EQ(pi2_pong_counter.count(), 1001);
838
839 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
840 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
841 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
842 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
843 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
844 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
845 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
846
Austin Schuh20ac95d2020-12-05 17:24:19 -0800847 EXPECT_EQ(pi1_server_statistics_count, 10);
848 EXPECT_EQ(pi2_server_statistics_count, 10);
849 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700850
851 EXPECT_EQ(pi1_client_statistics_count, 95);
852 EXPECT_EQ(pi2_client_statistics_count, 95);
853 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700854
855 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800856 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
857 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700858}
859
860// Tests that an offset between nodes can be recovered and shows up in
861// ServerStatistics correctly.
862TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
863 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700864 aos::configuration::ReadConfig(ArtifactPath(
865 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700866 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800867 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
868 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700869 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800870 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
871 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700872 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800873 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
874 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700875
Austin Schuh87dd3832021-01-01 23:07:31 -0800876 message_bridge::TestingTimeConverter time(
877 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700878 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700879 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700880
881 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800882 time.AddNextTimestamp(
883 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700884 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
885 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700886
887 std::unique_ptr<EventLoop> ping_event_loop =
888 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
889 Ping ping(ping_event_loop.get());
890
891 std::unique_ptr<EventLoop> pong_event_loop =
892 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
893 Pong pong(pong_event_loop.get());
894
Austin Schuh8fb315a2020-11-19 22:33:58 -0800895 // Wait to let timestamp estimation start up before looking for the results.
896 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
897
Austin Schuh87dd3832021-01-01 23:07:31 -0800898 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
899 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
900
Austin Schuh4c3b9702020-08-30 11:34:55 -0700901 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
902 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
903
904 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
905 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
906
Austin Schuh4c3b9702020-08-30 11:34:55 -0700907 // Confirm the offsets are being recovered correctly.
908 int pi1_server_statistics_count = 0;
909 pi1_pong_counter_event_loop->MakeWatcher(
910 "/pi1/aos", [&pi1_server_statistics_count,
911 kOffset](const message_bridge::ServerStatistics &stats) {
912 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
913 EXPECT_EQ(stats.connections()->size(), 2u);
914 for (const message_bridge::ServerConnection *connection :
915 *stats.connections()) {
916 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800917 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700918 if (connection->node()->name()->string_view() == "pi2") {
919 EXPECT_EQ(connection->monotonic_offset(),
920 chrono::nanoseconds(kOffset).count());
921 } else if (connection->node()->name()->string_view() == "pi3") {
922 EXPECT_EQ(connection->monotonic_offset(), 0);
923 } else {
924 LOG(FATAL) << "Unknown connection";
925 }
926
927 EXPECT_TRUE(connection->has_monotonic_offset());
928 }
929 ++pi1_server_statistics_count;
930 });
931
932 int pi2_server_statistics_count = 0;
933 pi2_pong_counter_event_loop->MakeWatcher(
934 "/pi2/aos", [&pi2_server_statistics_count,
935 kOffset](const message_bridge::ServerStatistics &stats) {
936 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
937 EXPECT_EQ(stats.connections()->size(), 1u);
938
939 const message_bridge::ServerConnection *connection =
940 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800941 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700942 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
943 EXPECT_TRUE(connection->has_monotonic_offset());
944 EXPECT_EQ(connection->monotonic_offset(),
945 -chrono::nanoseconds(kOffset).count());
946 ++pi2_server_statistics_count;
947 });
948
949 int pi3_server_statistics_count = 0;
950 pi3_pong_counter_event_loop->MakeWatcher(
951 "/pi3/aos", [&pi3_server_statistics_count](
952 const message_bridge::ServerStatistics &stats) {
953 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
954 EXPECT_EQ(stats.connections()->size(), 1u);
955
956 const message_bridge::ServerConnection *connection =
957 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800958 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700959 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
960 EXPECT_TRUE(connection->has_monotonic_offset());
961 EXPECT_EQ(connection->monotonic_offset(), 0);
962 ++pi3_server_statistics_count;
963 });
964
965 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
966 chrono::milliseconds(500) +
967 chrono::milliseconds(5));
968
Austin Schuh20ac95d2020-12-05 17:24:19 -0800969 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700970 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800971 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700972}
973
974// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800975TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700976 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
977 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
978 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
979
980 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
981 simulated_event_loop_factory.DisableStatistics();
982
983 std::unique_ptr<EventLoop> ping_event_loop =
984 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
985 Ping ping(ping_event_loop.get());
986
987 std::unique_ptr<EventLoop> pong_event_loop =
988 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
989 Pong pong(pong_event_loop.get());
990
991 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
992 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
993
994 MessageCounter<examples::Pong> pi2_pong_counter(
995 pi2_pong_counter_event_loop.get(), "/test");
996
997 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
998 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
999
1000 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1001 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1002
1003 MessageCounter<examples::Pong> pi1_pong_counter(
1004 pi1_pong_counter_event_loop.get(), "/test");
1005
1006 // Count timestamps.
1007 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1008 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1009 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1010 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1011 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1012 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1013 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1014 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1015 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1016 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1017 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1018 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1019 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1020 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1021
Austin Schuh2f8fd752020-09-01 22:38:28 -07001022 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001023 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1024 remote_timestamps_pi2_on_pi1 =
1025 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1026 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1027 remote_timestamps_pi1_on_pi2 =
1028 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001029
Austin Schuh4c3b9702020-08-30 11:34:55 -07001030 MessageCounter<message_bridge::ServerStatistics>
1031 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1032 "/pi1/aos");
1033 MessageCounter<message_bridge::ServerStatistics>
1034 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1035 "/pi2/aos");
1036 MessageCounter<message_bridge::ServerStatistics>
1037 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1038 "/pi3/aos");
1039
1040 MessageCounter<message_bridge::ClientStatistics>
1041 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1042 "/pi1/aos");
1043 MessageCounter<message_bridge::ClientStatistics>
1044 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1045 "/pi2/aos");
1046 MessageCounter<message_bridge::ClientStatistics>
1047 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1048 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001049
1050 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1051 chrono::milliseconds(5));
1052
Austin Schuh4c3b9702020-08-30 11:34:55 -07001053 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1054 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1055
1056 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1057 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1058 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1059 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1060 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1061 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1062 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1063
1064 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1065 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1066 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1067
1068 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1069 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1070 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001071
1072 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001073 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1074 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001075}
1076
Austin Schuhc0b0f722020-12-12 18:36:06 -08001077bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1078 for (const message_bridge::ServerConnection *connection :
1079 *server_statistics->connections()) {
1080 if (connection->state() != message_bridge::State::CONNECTED) {
1081 return false;
1082 }
1083 }
1084 return true;
1085}
1086
1087bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1088 std::string_view target) {
1089 for (const message_bridge::ServerConnection *connection :
1090 *server_statistics->connections()) {
1091 if (connection->node()->name()->string_view() == target) {
1092 if (connection->state() == message_bridge::State::CONNECTED) {
1093 return false;
1094 }
1095 } else {
1096 if (connection->state() != message_bridge::State::CONNECTED) {
1097 return false;
1098 }
1099 }
1100 }
1101 return true;
1102}
1103
1104bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1105 for (const message_bridge::ClientConnection *connection :
1106 *client_statistics->connections()) {
1107 if (connection->state() != message_bridge::State::CONNECTED) {
1108 return false;
1109 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001110 EXPECT_TRUE(connection->has_boot_uuid());
1111 EXPECT_TRUE(connection->has_connected_since_time());
1112 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001113 }
1114 return true;
1115}
1116
1117bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1118 std::string_view target) {
1119 for (const message_bridge::ClientConnection *connection :
1120 *client_statistics->connections()) {
1121 if (connection->node()->name()->string_view() == target) {
1122 if (connection->state() == message_bridge::State::CONNECTED) {
1123 return false;
1124 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001125 EXPECT_FALSE(connection->has_boot_uuid());
1126 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001127 } else {
1128 if (connection->state() != message_bridge::State::CONNECTED) {
1129 return false;
1130 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001131 EXPECT_TRUE(connection->has_boot_uuid());
1132 EXPECT_TRUE(connection->has_connected_since_time());
1133 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001134 }
1135 }
1136 return true;
1137}
1138
Austin Schuh367a7f42021-11-23 23:04:36 -08001139int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1140 std::string_view target) {
1141 for (const message_bridge::ClientConnection *connection :
1142 *client_statistics->connections()) {
1143 if (connection->node()->name()->string_view() == target) {
1144 return connection->connection_count();
1145 }
1146 }
1147 return 0;
1148}
1149
1150int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1151 std::string_view target) {
1152 for (const message_bridge::ServerConnection *connection :
1153 *server_statistics->connections()) {
1154 if (connection->node()->name()->string_view() == target) {
1155 return connection->connection_count();
1156 }
1157 }
1158 return 0;
1159}
1160
Austin Schuhc0b0f722020-12-12 18:36:06 -08001161// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001162TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001163 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1164
Austin Schuh58646e22021-08-23 23:51:46 -07001165 NodeEventLoopFactory *pi1 =
1166 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1167 NodeEventLoopFactory *pi2 =
1168 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1169 NodeEventLoopFactory *pi3 =
1170 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1171
1172 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001173 Ping ping(ping_event_loop.get());
1174
Austin Schuh58646e22021-08-23 23:51:46 -07001175 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001176 Pong pong(pong_event_loop.get());
1177
1178 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001179 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001180
1181 MessageCounter<examples::Pong> pi2_pong_counter(
1182 pi2_pong_counter_event_loop.get(), "/test");
1183
1184 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001185 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001186
1187 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001188 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001189
1190 MessageCounter<examples::Pong> pi1_pong_counter(
1191 pi1_pong_counter_event_loop.get(), "/test");
1192
1193 // Count timestamps.
1194 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1195 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1196 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1197 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1198 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1199 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1200 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1201 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1202 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1203 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1204 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1205 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1206 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1207 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1208
1209 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001210 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1211 remote_timestamps_pi2_on_pi1 =
1212 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1213 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1214 remote_timestamps_pi1_on_pi2 =
1215 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001216
1217 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001218 *pi1_server_statistics_counter;
1219 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1220 pi1_server_statistics_counter =
1221 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1222 "pi1_server_statistics_counter", "/pi1/aos");
1223 });
1224
Austin Schuhc0b0f722020-12-12 18:36:06 -08001225 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1226 pi1_pong_counter_event_loop
1227 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1228 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1229 pi1_pong_counter_event_loop
1230 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1231
1232 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001233 *pi2_server_statistics_counter;
1234 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1235 pi2_server_statistics_counter =
1236 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1237 "pi2_server_statistics_counter", "/pi2/aos");
1238 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001239 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1240 pi2_pong_counter_event_loop
1241 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1242 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1243 pi2_pong_counter_event_loop
1244 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1245
1246 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001247 *pi3_server_statistics_counter;
1248 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1249 pi3_server_statistics_counter =
1250 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1251 "pi3_server_statistics_counter", "/pi3/aos");
1252 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001253 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1254 pi3_pong_counter_event_loop
1255 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1256 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1257 pi3_pong_counter_event_loop
1258 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1259
1260 MessageCounter<message_bridge::ClientStatistics>
1261 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1262 "/pi1/aos");
1263 MessageCounter<message_bridge::ClientStatistics>
1264 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1265 "/pi2/aos");
1266 MessageCounter<message_bridge::ClientStatistics>
1267 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1268 "/pi3/aos");
1269
1270 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1271 chrono::milliseconds(5));
1272
1273 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1274 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1275
1276 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1277 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1278 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1279 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1280 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1281 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1282 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1283
Austin Schuh58646e22021-08-23 23:51:46 -07001284 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1285 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1286 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001287
1288 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1289 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1290 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1291
1292 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001293 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1294 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001295
1296 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1297 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1298 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1299 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1300 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1301 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1302 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1303 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1304 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1305 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1306 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1307 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1308 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1309 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1310 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1311 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1312 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1313 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1314
Austin Schuh58646e22021-08-23 23:51:46 -07001315 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001316
1317 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1318
1319 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1320 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1321
1322 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1323 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1324 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1325 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1326 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1327 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1328 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1329
Austin Schuh58646e22021-08-23 23:51:46 -07001330 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1331 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1332 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001333
1334 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1335 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1336 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1337
1338 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001339 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1340 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001341
1342 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1343 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1344 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1345 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1346 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1347 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1348 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1349 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1350 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1351 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1352 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1353 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1354 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1355 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1356 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1357 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1358 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1359 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1360
Austin Schuh58646e22021-08-23 23:51:46 -07001361 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001362
1363 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1364
Austin Schuh367a7f42021-11-23 23:04:36 -08001365 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1366 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1367 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1368 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1369 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1370 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1371
1372 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1373 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1374 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1375 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1376 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1377 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1378 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1379 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1380
1381 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1382 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1383 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1384 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1385
1386 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1387 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1388 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1389 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1390
Austin Schuhc0b0f722020-12-12 18:36:06 -08001391 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1392 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1393
1394 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1395 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1396 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1397 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1398 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1399 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1400 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1401
Austin Schuh58646e22021-08-23 23:51:46 -07001402 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1403 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1404 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001405
1406 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1407 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1408 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1409
1410 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001411 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1412 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001413
Austin Schuhc0b0f722020-12-12 18:36:06 -08001414 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1415 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001416 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1417 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001418 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1419 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001420 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1421 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001422 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1423 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001424 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1425 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1426}
1427
Austin Schuh2febf0d2020-09-21 22:24:30 -07001428// Tests that the time offset having a slope doesn't break the world.
1429// SimulatedMessageBridge has enough self consistency CHECK statements to
1430// confirm, and we can can also check a message in each direction to make sure
1431// it gets delivered as expected.
1432TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1433 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001434 aos::configuration::ReadConfig(ArtifactPath(
1435 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001436 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001437 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1438 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001439 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001440 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1441 ASSERT_EQ(pi2_index, 1u);
1442 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1443 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1444 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001445
Austin Schuh87dd3832021-01-01 23:07:31 -08001446 message_bridge::TestingTimeConverter time(
1447 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001448 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001449 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001450
Austin Schuh2febf0d2020-09-21 22:24:30 -07001451 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001452 time.AddNextTimestamp(
1453 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001454 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1455 BootTimestamp::epoch()});
1456 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1457 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1458 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1459 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001460
1461 std::unique_ptr<EventLoop> ping_event_loop =
1462 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1463 Ping ping(ping_event_loop.get());
1464
1465 std::unique_ptr<EventLoop> pong_event_loop =
1466 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1467 Pong pong(pong_event_loop.get());
1468
1469 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1470 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1471 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1472 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1473
1474 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1475 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1476 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1477 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1478
1479 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1480 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1481 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1482 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1483
1484 // End after a pong message comes back. This will leave the latest messages
1485 // on all channels so we can look at timestamps easily and check they make
1486 // sense.
1487 std::unique_ptr<EventLoop> pi1_pong_ender =
1488 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1489 int count = 0;
1490 pi1_pong_ender->MakeWatcher(
1491 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1492 if (++count == 100) {
1493 simulated_event_loop_factory.Exit();
1494 }
1495 });
1496
1497 // Run enough that messages should be delivered.
1498 simulated_event_loop_factory.Run();
1499
1500 // Grab the latest messages.
1501 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1502 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1503 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1504 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1505
1506 // Compute their time on the global distributed clock so we can compute
1507 // distance betwen them.
1508 const distributed_clock::time_point pi1_ping_time =
1509 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1510 ->ToDistributedClock(
1511 ping_on_pi1_fetcher.context().monotonic_event_time);
1512 const distributed_clock::time_point pi2_ping_time =
1513 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1514 ->ToDistributedClock(
1515 ping_on_pi2_fetcher.context().monotonic_event_time);
1516 const distributed_clock::time_point pi1_pong_time =
1517 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1518 ->ToDistributedClock(
1519 pong_on_pi1_fetcher.context().monotonic_event_time);
1520 const distributed_clock::time_point pi2_pong_time =
1521 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1522 ->ToDistributedClock(
1523 pong_on_pi2_fetcher.context().monotonic_event_time);
1524
1525 // And confirm the delivery delay is just about exactly 150 uS for both
1526 // directions like expected. There will be a couple ns of rounding errors in
1527 // the conversion functions that aren't worth accounting for right now. This
1528 // will either be really close, or really far.
1529 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1530 pi1_ping_time);
1531 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1532 pi1_ping_time);
1533
1534 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1535 pi2_pong_time);
1536 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1537 pi2_pong_time);
1538}
1539
Austin Schuh4c570ea2020-11-19 23:13:24 -08001540void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1541 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1542 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1543 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001544 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001545}
1546
1547// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001548TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001549 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1550 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1551
1552 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1553
1554 std::unique_ptr<EventLoop> ping_event_loop =
1555 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1556 aos::Sender<examples::Ping> pi1_reliable_sender =
1557 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1558 aos::Sender<examples::Ping> pi1_unreliable_sender =
1559 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1560 SendPing(&pi1_reliable_sender, 1);
1561 SendPing(&pi1_unreliable_sender, 1);
1562
1563 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1564 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1565 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1566 "/reliable");
1567 MessageCounter<examples::Ping> pi2_unreliable_counter(
1568 pi2_pong_event_loop.get(), "/unreliable");
1569 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1570 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1571 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1572 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1573
1574 const size_t reliable_channel_index = configuration::ChannelIndex(
1575 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1576
1577 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1578 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1579
Austin Schuheeaa2022021-01-02 21:52:03 -08001580 const chrono::nanoseconds network_delay =
1581 simulated_event_loop_factory.network_delay();
1582
Austin Schuh4c570ea2020-11-19 23:13:24 -08001583 int reliable_timestamp_count = 0;
1584 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001585 shared() ? "/pi1/aos/remote_timestamps/pi2"
1586 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001587 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001588 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1589 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001590 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001591 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001592 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001593 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001594 VLOG(1) << aos::FlatbufferToJson(&header);
1595 if (header.channel_index() == reliable_channel_index) {
1596 ++reliable_timestamp_count;
1597 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001598
1599 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1600 chrono::nanoseconds(header.monotonic_sent_time()));
1601
1602 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1603 header_monotonic_sent_time + network_delay +
1604 (pi1_remote_timestamp->monotonic_now() -
1605 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001606 });
1607
1608 // Wait to let timestamp estimation start up before looking for the results.
1609 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1610
1611 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1612 // This one isn't reliable, but was sent before the start. It should *not* be
1613 // delivered.
1614 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1615 // Confirm we got a timestamp logged for the message that was forwarded.
1616 EXPECT_EQ(reliable_timestamp_count, 1u);
1617
1618 SendPing(&pi1_reliable_sender, 2);
1619 SendPing(&pi1_unreliable_sender, 2);
1620 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1621 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1622 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1623
1624 EXPECT_EQ(reliable_timestamp_count, 2u);
1625}
1626
Austin Schuh20ac95d2020-12-05 17:24:19 -08001627// Tests that rebooting a node changes the ServerStatistics message and the
1628// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001629TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001630 const UUID pi1_boot0 = UUID::Random();
1631 const UUID pi2_boot0 = UUID::Random();
1632 const UUID pi2_boot1 = UUID::Random();
1633 const UUID pi3_boot0 = UUID::Random();
1634 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001635
Austin Schuh58646e22021-08-23 23:51:46 -07001636 message_bridge::TestingTimeConverter time(
1637 configuration::NodesCount(&config.message()));
1638 SimulatedEventLoopFactory factory(&config.message());
1639 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001640
Austin Schuh58646e22021-08-23 23:51:46 -07001641 const size_t pi1_index =
1642 configuration::GetNodeIndex(&config.message(), "pi1");
1643 const size_t pi2_index =
1644 configuration::GetNodeIndex(&config.message(), "pi2");
1645 const size_t pi3_index =
1646 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001647
Austin Schuh58646e22021-08-23 23:51:46 -07001648 {
1649 time.AddNextTimestamp(distributed_clock::epoch(),
1650 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1651 BootTimestamp::epoch()});
1652
1653 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1654
1655 time.AddNextTimestamp(
1656 distributed_clock::epoch() + dt,
1657 {BootTimestamp::epoch() + dt,
1658 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1659 BootTimestamp::epoch() + dt});
1660
1661 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1662 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1663 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1664 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1665 }
1666
1667 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1668 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1669
1670 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1671 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001672
1673 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001674 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001675
1676 int timestamp_count = 0;
1677 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001678 "/pi2/aos", [&expected_boot_uuid,
1679 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001680 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001681 expected_boot_uuid);
1682 });
1683 pi1_remote_timestamp->MakeWatcher(
1684 "/test",
1685 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001686 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001687 expected_boot_uuid);
1688 });
1689 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001690 shared() ? "/pi1/aos/remote_timestamps/pi2"
1691 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001692 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1693 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001694 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001695 VLOG(1) << aos::FlatbufferToJson(&header);
1696 ++timestamp_count;
1697 });
1698
1699 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001700 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001701 int boot_number = 0;
1702 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001703 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001704 "/pi1/aos",
1705 [&pi1_server_statistics_count, &expected_boot_uuid,
1706 &expected_connection_time, &first_pi1_server_statistics,
1707 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001708 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1709 for (const message_bridge::ServerConnection *connection :
1710 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001711 if (connection->state() == message_bridge::State::CONNECTED) {
1712 ASSERT_TRUE(connection->has_boot_uuid());
1713 }
1714 if (!first_pi1_server_statistics) {
1715 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1716 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001717 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001718 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1719 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001720 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001721 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001722 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001723 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1724 connection->connected_since_time())),
1725 expected_connection_time);
1726 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001727 ++pi1_server_statistics_count;
1728 }
1729 }
Austin Schuh58646e22021-08-23 23:51:46 -07001730 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001731 });
1732
Austin Schuh58646e22021-08-23 23:51:46 -07001733 int pi1_client_statistics_count = 0;
1734 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001735 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1736 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001737 const message_bridge::ClientStatistics &stats) {
1738 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1739 for (const message_bridge::ClientConnection *connection :
1740 *stats.connections()) {
1741 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1742 if (connection->node()->name()->string_view() == "pi2") {
1743 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001744 EXPECT_EQ(expected_boot_uuid,
1745 UUID::FromString(connection->boot_uuid()))
1746 << " : Got " << aos::FlatbufferToJson(&stats);
1747 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1748 connection->connected_since_time())),
1749 expected_connection_time);
1750 EXPECT_EQ(boot_number + 1, connection->connection_count());
1751 } else {
1752 EXPECT_EQ(connection->connected_since_time(), 0);
1753 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001754 }
1755 }
1756 });
1757
1758 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001759 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1760 pi1, pi2, pi2_boot1]() {
1761 expected_boot_uuid = pi2_boot1;
1762 ++boot_number;
1763 LOG(INFO) << "OnShutdown triggered for pi2";
1764 pi2->OnStartup(
1765 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1766 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1767 expected_connection_time = pi1->monotonic_now();
1768 });
1769 });
Austin Schuh58646e22021-08-23 23:51:46 -07001770
Austin Schuh20ac95d2020-12-05 17:24:19 -08001771 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001772 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001773
1774 EXPECT_GT(timestamp_count, 100);
1775 EXPECT_GE(pi1_server_statistics_count, 1u);
1776
Austin Schuh20ac95d2020-12-05 17:24:19 -08001777 timestamp_count = 0;
1778 pi1_server_statistics_count = 0;
1779
Austin Schuh58646e22021-08-23 23:51:46 -07001780 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001781 EXPECT_GT(timestamp_count, 100);
1782 EXPECT_GE(pi1_server_statistics_count, 1u);
1783}
1784
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001785INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001786 All, RemoteMessageSimulatedEventLoopTest,
1787 ::testing::Values(
1788 Param{"multinode_pingpong_test_combined_config.json", true},
1789 Param{"multinode_pingpong_test_split_config.json", false}));
1790
Austin Schuh58646e22021-08-23 23:51:46 -07001791// Tests that Startup and Shutdown do reasonable things.
1792TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1793 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1794 aos::configuration::ReadConfig(
1795 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1796
Austin Schuh72e65682021-09-02 11:37:05 -07001797 size_t pi1_shutdown_counter = 0;
1798 size_t pi2_shutdown_counter = 0;
1799 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1800 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1801
Austin Schuh58646e22021-08-23 23:51:46 -07001802 message_bridge::TestingTimeConverter time(
1803 configuration::NodesCount(&config.message()));
1804 SimulatedEventLoopFactory factory(&config.message());
1805 factory.SetTimeConverter(&time);
1806 time.AddNextTimestamp(
1807 distributed_clock::epoch(),
1808 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1809
1810 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1811
1812 time.AddNextTimestamp(
1813 distributed_clock::epoch() + dt,
1814 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1815 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1816 BootTimestamp::epoch() + dt});
1817
1818 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1819 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1820
1821 // Configure startup to start Ping and Pong, and count.
1822 size_t pi1_startup_counter = 0;
1823 size_t pi2_startup_counter = 0;
1824 pi1->OnStartup([pi1]() {
1825 LOG(INFO) << "Made ping";
1826 pi1->AlwaysStart<Ping>("ping");
1827 });
1828 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1829 pi2->OnStartup([pi2]() {
1830 LOG(INFO) << "Made pong";
1831 pi2->AlwaysStart<Pong>("pong");
1832 });
1833 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1834
1835 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001836 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1837 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1838
Austin Schuh58646e22021-08-23 23:51:46 -07001839 // Automatically make counters on startup.
1840 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1841 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1842 "pi1_pong_counter", "/test");
1843 });
1844 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1845 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1846 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1847 "pi2_ping_counter", "/test");
1848 });
1849 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1850
1851 EXPECT_EQ(pi2_ping_counter, nullptr);
1852 EXPECT_EQ(pi1_pong_counter, nullptr);
1853
1854 EXPECT_EQ(pi1_startup_counter, 0u);
1855 EXPECT_EQ(pi2_startup_counter, 0u);
1856 EXPECT_EQ(pi1_shutdown_counter, 0u);
1857 EXPECT_EQ(pi2_shutdown_counter, 0u);
1858
1859 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1860 EXPECT_EQ(pi1_startup_counter, 1u);
1861 EXPECT_EQ(pi2_startup_counter, 1u);
1862 EXPECT_EQ(pi1_shutdown_counter, 0u);
1863 EXPECT_EQ(pi2_shutdown_counter, 0u);
1864 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1865 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1866
1867 LOG(INFO) << pi1->monotonic_now();
1868 LOG(INFO) << pi2->monotonic_now();
1869
1870 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1871
1872 EXPECT_EQ(pi1_startup_counter, 2u);
1873 EXPECT_EQ(pi2_startup_counter, 2u);
1874 EXPECT_EQ(pi1_shutdown_counter, 1u);
1875 EXPECT_EQ(pi2_shutdown_counter, 1u);
1876 EXPECT_EQ(pi2_ping_counter->count(), 501);
1877 EXPECT_EQ(pi1_pong_counter->count(), 501);
1878}
1879
1880// Tests that OnStartup handlers can be added after running and get called, and
1881// can't be called when running.
1882TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1883 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1884 aos::configuration::ReadConfig(
1885 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1886
1887 // Test that we can add startup handlers as long as we aren't running, and
1888 // they get run when Run gets called again.
1889 // Test that adding a startup handler when running fails.
1890 //
1891 // Test shutdown handlers get called on destruction.
1892 SimulatedEventLoopFactory factory(&config.message());
1893
1894 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1895
1896 int startup_count0 = 0;
1897 int startup_count1 = 0;
1898
1899 pi1->OnStartup([&]() { ++startup_count0; });
1900 EXPECT_EQ(startup_count0, 0);
1901 EXPECT_EQ(startup_count1, 0);
1902
1903 factory.RunFor(chrono::nanoseconds(1));
1904 EXPECT_EQ(startup_count0, 1);
1905 EXPECT_EQ(startup_count1, 0);
1906
1907 pi1->OnStartup([&]() { ++startup_count1; });
1908 EXPECT_EQ(startup_count0, 1);
1909 EXPECT_EQ(startup_count1, 0);
1910
1911 factory.RunFor(chrono::nanoseconds(1));
1912 EXPECT_EQ(startup_count0, 1);
1913 EXPECT_EQ(startup_count1, 1);
1914
1915 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1916 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1917
1918 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1919 "Can only register OnStartup handlers when not running.");
1920}
1921
1922// Tests that OnStartup handlers can be added after running and get called, and
1923// all the handlers get called on reboot. Shutdown handlers are tested the same
1924// way.
1925TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1926 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1927 aos::configuration::ReadConfig(
1928 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1929
Austin Schuh72e65682021-09-02 11:37:05 -07001930 int startup_count0 = 0;
1931 int shutdown_count0 = 0;
1932 int startup_count1 = 0;
1933 int shutdown_count1 = 0;
1934
Austin Schuh58646e22021-08-23 23:51:46 -07001935 message_bridge::TestingTimeConverter time(
1936 configuration::NodesCount(&config.message()));
1937 SimulatedEventLoopFactory factory(&config.message());
1938 factory.SetTimeConverter(&time);
1939 time.StartEqual();
1940
1941 const chrono::nanoseconds dt = chrono::seconds(10);
1942 time.RebootAt(0, distributed_clock::epoch() + dt);
1943 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1944
1945 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1946
Austin Schuh58646e22021-08-23 23:51:46 -07001947 pi1->OnStartup([&]() { ++startup_count0; });
1948 pi1->OnShutdown([&]() { ++shutdown_count0; });
1949 EXPECT_EQ(startup_count0, 0);
1950 EXPECT_EQ(startup_count1, 0);
1951 EXPECT_EQ(shutdown_count0, 0);
1952 EXPECT_EQ(shutdown_count1, 0);
1953
1954 factory.RunFor(chrono::nanoseconds(1));
1955 EXPECT_EQ(startup_count0, 1);
1956 EXPECT_EQ(startup_count1, 0);
1957 EXPECT_EQ(shutdown_count0, 0);
1958 EXPECT_EQ(shutdown_count1, 0);
1959
1960 pi1->OnStartup([&]() { ++startup_count1; });
1961 EXPECT_EQ(startup_count0, 1);
1962 EXPECT_EQ(startup_count1, 0);
1963 EXPECT_EQ(shutdown_count0, 0);
1964 EXPECT_EQ(shutdown_count1, 0);
1965
1966 factory.RunFor(chrono::nanoseconds(1));
1967 EXPECT_EQ(startup_count0, 1);
1968 EXPECT_EQ(startup_count1, 1);
1969 EXPECT_EQ(shutdown_count0, 0);
1970 EXPECT_EQ(shutdown_count1, 0);
1971
1972 factory.RunFor(chrono::seconds(15));
1973
1974 EXPECT_EQ(startup_count0, 2);
1975 EXPECT_EQ(startup_count1, 2);
1976 EXPECT_EQ(shutdown_count0, 1);
1977 EXPECT_EQ(shutdown_count1, 0);
1978
1979 pi1->OnShutdown([&]() { ++shutdown_count1; });
1980 factory.RunFor(chrono::seconds(10));
1981
1982 EXPECT_EQ(startup_count0, 3);
1983 EXPECT_EQ(startup_count1, 3);
1984 EXPECT_EQ(shutdown_count0, 2);
1985 EXPECT_EQ(shutdown_count1, 1);
1986}
1987
1988// Tests that event loops which outlive shutdown crash.
1989TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
1990 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1991 aos::configuration::ReadConfig(
1992 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1993
1994 message_bridge::TestingTimeConverter time(
1995 configuration::NodesCount(&config.message()));
1996 SimulatedEventLoopFactory factory(&config.message());
1997 factory.SetTimeConverter(&time);
1998 time.StartEqual();
1999
2000 const chrono::nanoseconds dt = chrono::seconds(10);
2001 time.RebootAt(0, distributed_clock::epoch() + dt);
2002
2003 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2004
2005 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2006
2007 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2008}
2009
2010// Tests that messages don't survive a reboot of a node.
2011TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2012 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2013 aos::configuration::ReadConfig(
2014 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2015
2016 message_bridge::TestingTimeConverter time(
2017 configuration::NodesCount(&config.message()));
2018 SimulatedEventLoopFactory factory(&config.message());
2019 factory.SetTimeConverter(&time);
2020 time.StartEqual();
2021
2022 const chrono::nanoseconds dt = chrono::seconds(10);
2023 time.RebootAt(0, distributed_clock::epoch() + dt);
2024
2025 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2026
2027 const UUID boot_uuid = pi1->boot_uuid();
2028 EXPECT_NE(boot_uuid, UUID::Zero());
2029
2030 {
2031 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2032 aos::Sender<examples::Ping> test_message_sender =
2033 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2034 SendPing(&test_message_sender, 1);
2035 }
2036
2037 factory.RunFor(chrono::seconds(5));
2038
2039 {
2040 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2041 aos::Fetcher<examples::Ping> fetcher =
2042 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2043 EXPECT_TRUE(fetcher.Fetch());
2044 }
2045
2046 factory.RunFor(chrono::seconds(10));
2047
2048 {
2049 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2050 aos::Fetcher<examples::Ping> fetcher =
2051 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2052 EXPECT_FALSE(fetcher.Fetch());
2053 }
2054 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2055}
2056
2057// Tests that reliable messages get resent on reboot.
2058TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2059 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2060 aos::configuration::ReadConfig(
2061 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2062
2063 message_bridge::TestingTimeConverter time(
2064 configuration::NodesCount(&config.message()));
2065 SimulatedEventLoopFactory factory(&config.message());
2066 factory.SetTimeConverter(&time);
2067 time.StartEqual();
2068
2069 const chrono::nanoseconds dt = chrono::seconds(1);
2070 time.RebootAt(1, distributed_clock::epoch() + dt);
2071
2072 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2073 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2074
2075 const UUID pi1_boot_uuid = pi1->boot_uuid();
2076 const UUID pi2_boot_uuid = pi2->boot_uuid();
2077 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2078 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2079
2080 {
2081 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2082 aos::Sender<examples::Ping> test_message_sender =
2083 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2084 SendPing(&test_message_sender, 1);
2085 }
2086
2087 factory.RunFor(chrono::milliseconds(500));
2088
2089 {
2090 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2091 aos::Fetcher<examples::Ping> fetcher =
2092 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2093 EXPECT_TRUE(fetcher.Fetch());
2094 }
2095
2096 factory.RunFor(chrono::seconds(1));
2097
2098 {
2099 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2100 aos::Fetcher<examples::Ping> fetcher =
2101 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2102 EXPECT_TRUE(fetcher.Fetch());
2103 }
2104 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2105}
2106
Austin Schuh48205e62021-11-12 14:13:18 -08002107class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2108 public:
2109 SimulatedEventLoopDisconnectTest()
2110 : config(aos::configuration::ReadConfig(ArtifactPath(
2111 "aos/events/multinode_pingpong_test_split_config.json"))),
2112 time(configuration::NodesCount(&config.message())),
2113 factory(&config.message()) {
2114 factory.SetTimeConverter(&time);
2115 }
2116
2117 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2118 const monotonic_clock::time_point allowable_message_time,
2119 std::set<const aos::Node *> empty_nodes) {
2120 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2121 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2122 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2123 pi1->MakeEventLoop("fetcher");
2124 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2125 pi2->MakeEventLoop("fetcher");
2126 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2127 if (configuration::ChannelIsReadableOnNode(channel,
2128 pi1_event_loop->node())) {
2129 std::unique_ptr<aos::RawFetcher> fetcher =
2130 pi1_event_loop->MakeRawFetcher(channel);
2131 if (statistics_channels.find(channel) == statistics_channels.end() ||
2132 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2133 EXPECT_FALSE(fetcher->Fetch() &&
2134 fetcher->context().monotonic_event_time >
2135 allowable_message_time)
2136 << ": Found recent message on channel "
2137 << configuration::CleanedChannelToString(channel) << " and time "
2138 << fetcher->context().monotonic_event_time << " > "
2139 << allowable_message_time << " on pi1";
2140 } else {
2141 EXPECT_TRUE(fetcher->Fetch() &&
2142 fetcher->context().monotonic_event_time >=
2143 allowable_message_time)
2144 << ": Didn't find recent message on channel "
2145 << configuration::CleanedChannelToString(channel) << " on pi1";
2146 }
2147 }
2148 if (configuration::ChannelIsReadableOnNode(channel,
2149 pi2_event_loop->node())) {
2150 std::unique_ptr<aos::RawFetcher> fetcher =
2151 pi2_event_loop->MakeRawFetcher(channel);
2152 if (statistics_channels.find(channel) == statistics_channels.end() ||
2153 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2154 EXPECT_FALSE(fetcher->Fetch() &&
2155 fetcher->context().monotonic_event_time >
2156 allowable_message_time)
2157 << ": Found message on channel "
2158 << configuration::CleanedChannelToString(channel) << " and time "
2159 << fetcher->context().monotonic_event_time << " > "
2160 << allowable_message_time << " on pi2";
2161 } else {
2162 EXPECT_TRUE(fetcher->Fetch() &&
2163 fetcher->context().monotonic_event_time >=
2164 allowable_message_time)
2165 << ": Didn't find message on channel "
2166 << configuration::CleanedChannelToString(channel) << " on pi2";
2167 }
2168 }
2169 }
2170 }
2171
2172 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2173
2174 message_bridge::TestingTimeConverter time;
2175 SimulatedEventLoopFactory factory;
2176};
2177
2178// Tests that if we have message bridge client/server disabled, and timing
2179// reports disabled, no messages are sent. Also tests that we can disconnect a
2180// node and disable statistics on it and it actually fully disconnects.
2181TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2182 time.StartEqual();
2183 factory.SkipTimingReport();
2184 factory.DisableStatistics();
2185
2186 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2187 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2188
2189 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2190 pi1->MakeEventLoop("fetcher");
2191 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2192 pi2->MakeEventLoop("fetcher");
2193
2194 factory.RunFor(chrono::milliseconds(100000));
2195
2196 // Confirm no messages are sent if we've configured them all off.
2197 VerifyChannels({}, monotonic_clock::min_time, {});
2198
2199 // Now, confirm that all the message_bridge channels come back when we
2200 // re-enable.
2201 factory.EnableStatistics();
2202
2203 factory.RunFor(chrono::milliseconds(10050));
2204
2205 // Build up the list of all the messages we expect when we come back.
2206 {
2207 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002208 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002209 std::vector<std::pair<std::string_view, const Node *>>{
2210 {"/pi1/aos", pi1->node()},
2211 {"/pi2/aos", pi1->node()},
2212 {"/pi3/aos", pi1->node()}}) {
2213 statistics_channels.insert(configuration::GetChannel(
2214 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2215 pi.second));
2216 statistics_channels.insert(configuration::GetChannel(
2217 factory.configuration(), pi.first,
2218 "aos.message_bridge.ServerStatistics", "", pi.second));
2219 statistics_channels.insert(configuration::GetChannel(
2220 factory.configuration(), pi.first,
2221 "aos.message_bridge.ClientStatistics", "", pi.second));
2222 }
2223
2224 statistics_channels.insert(configuration::GetChannel(
2225 factory.configuration(),
2226 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2227 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2228 statistics_channels.insert(configuration::GetChannel(
2229 factory.configuration(),
2230 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2231 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2232 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2233 }
2234
2235 // Now test that we can disable the messages for a single node
2236 pi2->DisableStatistics();
2237 const aos::monotonic_clock::time_point statistics_disable_time =
2238 pi2->monotonic_now();
2239 factory.RunFor(chrono::milliseconds(10000));
2240
2241 // We should see a much smaller set of messages, but should still see messages
2242 // forwarded, mainly the timestamp message.
2243 {
2244 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002245 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002246 std::vector<std::pair<std::string_view, const Node *>>{
2247 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2248 statistics_channels.insert(configuration::GetChannel(
2249 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2250 pi.second));
2251 statistics_channels.insert(configuration::GetChannel(
2252 factory.configuration(), pi.first,
2253 "aos.message_bridge.ServerStatistics", "", pi.second));
2254 statistics_channels.insert(configuration::GetChannel(
2255 factory.configuration(), pi.first,
2256 "aos.message_bridge.ClientStatistics", "", pi.second));
2257 }
2258
2259 statistics_channels.insert(configuration::GetChannel(
2260 factory.configuration(),
2261 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2262 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2263 VerifyChannels(statistics_channels, statistics_disable_time, {});
2264 }
2265
2266 // Now, fully disconnect the node. This will completely quiet down pi2.
2267 pi1->Disconnect(pi2->node());
2268 pi2->Disconnect(pi1->node());
2269
2270 const aos::monotonic_clock::time_point disconnect_disable_time =
2271 pi2->monotonic_now();
2272 factory.RunFor(chrono::milliseconds(10000));
2273
2274 {
2275 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002276 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002277 std::vector<std::pair<std::string_view, const Node *>>{
2278 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2279 statistics_channels.insert(configuration::GetChannel(
2280 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2281 pi.second));
2282 statistics_channels.insert(configuration::GetChannel(
2283 factory.configuration(), pi.first,
2284 "aos.message_bridge.ServerStatistics", "", pi.second));
2285 statistics_channels.insert(configuration::GetChannel(
2286 factory.configuration(), pi.first,
2287 "aos.message_bridge.ClientStatistics", "", pi.second));
2288 }
2289
2290 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2291 }
2292}
2293
Neil Balchc8f41ed2018-01-20 22:06:53 -08002294} // namespace testing
2295} // namespace aos