blob: 4b155cefcc23d1c284387c9387d2c51426eeda7e [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Alex Perrycb7da4b2019-08-28 19:35:56 -07007#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07009#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010#include "aos/events/ping_lib.h"
11#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080012#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070013#include "aos/network/message_bridge_client_generated.h"
14#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080015#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080016#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070017#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070018#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080019#include "gtest/gtest.h"
20
21namespace aos {
22namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
// Test parameters for RemoteMessageSimulatedEventLoopTest.
struct Param {
  // Name of the config file to load, relative to aos/events/.
  std::string config;
  // True when every remote channel shares a single RemoteMessage channel;
  // false when each remote channel gets its own RemoteMessage channel.
  bool shared;
};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800142class FunctionEvent : public EventScheduler::Event {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700143 public:
144 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800145
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700146 void Handle() noexcept override { fn_(); }
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800147
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700148 private:
149 std::function<void()> fn_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800150};
151
Neil Balchc8f41ed2018-01-20 22:06:53 -0800152// Test that creating an event and running the scheduler runs the event.
153TEST(EventSchedulerTest, ScheduleEvent) {
154 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800155 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700156 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800157 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800158
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800159 FunctionEvent e([&counter]() { counter += 1; });
160 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800162 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800163 auto token =
164 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 EXPECT_EQ(counter, 1);
168}
169
170// Test that descheduling an already scheduled event doesn't run the event.
171TEST(EventSchedulerTest, DescheduleEvent) {
172 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700174 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800175 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800176
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800177 FunctionEvent e([&counter]() { counter += 1; });
178 auto token =
179 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800180 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800181 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800182 EXPECT_EQ(counter, 0);
183}
Austin Schuh44019f92019-05-19 19:58:27 -0700184
Austin Schuhe33c08d2022-02-03 18:15:21 -0800185// Test that TemporarilyStopAndRun respects and preserves running.
186TEST(EventSchedulerTest, TemporarilyStopAndRun) {
187 int counter = 0;
188 EventSchedulerScheduler scheduler_scheduler;
189 EventScheduler scheduler(0);
190 scheduler_scheduler.AddEventScheduler(&scheduler);
191
192 scheduler_scheduler.TemporarilyStopAndRun(
193 [&]() { CHECK(!scheduler_scheduler.is_running()); });
194 ASSERT_FALSE(scheduler_scheduler.is_running());
195
196 FunctionEvent e([&]() {
197 counter += 1;
198 CHECK(scheduler_scheduler.is_running());
199 scheduler_scheduler.TemporarilyStopAndRun(
200 [&]() { CHECK(!scheduler_scheduler.is_running()); });
201 CHECK(scheduler_scheduler.is_running());
202 });
203 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
204 scheduler_scheduler.Run();
205 EXPECT_EQ(counter, 1);
206}
207
Austin Schuh8fb315a2020-11-19 22:33:58 -0800208// Test that sending a message after running gets properly notified.
209TEST(SimulatedEventLoopTest, SendAfterRunFor) {
210 SimulatedEventLoopTestFactory factory;
211
212 SimulatedEventLoopFactory simulated_event_loop_factory(
213 factory.configuration());
214
215 ::std::unique_ptr<EventLoop> ping_event_loop =
216 simulated_event_loop_factory.MakeEventLoop("ping");
217 aos::Sender<TestMessage> test_message_sender =
218 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700219 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800220
221 std::unique_ptr<EventLoop> pong1_event_loop =
222 simulated_event_loop_factory.MakeEventLoop("pong");
223 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
224 "/test");
225
226 EXPECT_FALSE(ping_event_loop->is_running());
227
228 // Watchers start when you start running, so there should be nothing counted.
229 simulated_event_loop_factory.RunFor(chrono::seconds(1));
230 EXPECT_EQ(test_message_counter1.count(), 0u);
231
232 std::unique_ptr<EventLoop> pong2_event_loop =
233 simulated_event_loop_factory.MakeEventLoop("pong");
234 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
235 "/test");
236
237 // Pauses in the middle don't count though, so this should be counted.
238 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700239 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800240
241 EXPECT_EQ(test_message_counter1.count(), 0u);
242 EXPECT_EQ(test_message_counter2.count(), 0u);
243 simulated_event_loop_factory.RunFor(chrono::seconds(1));
244
245 EXPECT_EQ(test_message_counter1.count(), 1u);
246 EXPECT_EQ(test_message_counter2.count(), 0u);
247}
248
James Kuszmaul890c2492022-04-06 14:59:31 -0700249// Test that if we configure an event loop to be able to send too fast that we do allow it to do so.
250TEST(SimulatedEventLoopTest, AllowSendTooFast) {
251 SimulatedEventLoopTestFactory factory;
252
253 SimulatedEventLoopFactory simulated_event_loop_factory(
254 factory.configuration());
255
256 // Create two event loops: One will be allowed to send too fast, one won't. We
257 // will then test to ensure that the one that is allowed to send too fast can
258 // indeed send too fast, but that it then makes it so that the second event
259 // loop can no longer send anything because *it* is still limited.
260 ::std::unique_ptr<EventLoop> too_fast_event_loop =
261 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
262 ->MakeEventLoop("too_fast_sender",
263 {NodeEventLoopFactory::CheckSentTooFast::kNo,
264 NodeEventLoopFactory::ExclusiveSenders::kNo});
265 aos::Sender<TestMessage> too_fast_message_sender =
266 too_fast_event_loop->MakeSender<TestMessage>("/test");
267
268 ::std::unique_ptr<EventLoop> limited_event_loop =
269 simulated_event_loop_factory.MakeEventLoop("limited_sender");
270 aos::Sender<TestMessage> limited_message_sender =
271 limited_event_loop->MakeSender<TestMessage>("/test");
272
273 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
274 for (int ii = 0; ii < queue_size; ++ii) {
275 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
276 }
277 // And now we should start being in the sending-too-fast phase.
278 for (int ii = 0; ii < queue_size; ++ii) {
279 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
280 ASSERT_EQ(SendTestMessage(limited_message_sender), RawSender::Error::kMessagesSentTooFast);
281 }
282}
283
284// Test that if we setup an exclusive sender that it is indeed exclusive.
285TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
286 SimulatedEventLoopTestFactory factory;
287
288 SimulatedEventLoopFactory simulated_event_loop_factory(
289 factory.configuration());
290
291 ::std::unique_ptr<EventLoop> exclusive_event_loop =
292 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
293 ->MakeEventLoop("too_fast_sender",
294 {NodeEventLoopFactory::CheckSentTooFast::kYes,
295 NodeEventLoopFactory::ExclusiveSenders::kYes});
296 exclusive_event_loop->SkipAosLog();
297 exclusive_event_loop->SkipTimingReport();
298 ::std::unique_ptr<EventLoop> normal_event_loop =
299 simulated_event_loop_factory.MakeEventLoop("limited_sender");
300 // Set things up to have the exclusive sender be destroyed so we can test
301 // recovery.
302 {
303 aos::Sender<TestMessage> exclusive_sender =
304 exclusive_event_loop->MakeSender<TestMessage>("/test");
305
306 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
307 "TestMessage");
308 }
309 // This one should succeed now that the exclusive channel is removed.
310 aos::Sender<TestMessage> normal_sender =
311 normal_event_loop->MakeSender<TestMessage>("/test");
312 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"), "TestMessage");
313}
314
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700315void TestSentTooFastCheckEdgeCase(
316 const std::function<RawSender::Error(int, int)> expected_err,
317 const bool send_twice_at_end) {
318 SimulatedEventLoopTestFactory factory;
319
320 auto event_loop = factory.MakePrimary("primary");
321
322 auto sender = event_loop->MakeSender<TestMessage>("/test");
323
324 const int queue_size = TestChannelQueueSize(event_loop.get());
325 int msgs_sent = 0;
326 event_loop->AddPhasedLoop(
327 [&](int) {
328 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
329 msgs_sent++;
330
331 // If send_twice_at_end, send the last two messages (message
332 // queue_size and queue_size + 1) in the same iteration, meaning that
333 // we would be sending very slightly too fast. Otherwise, we will send
334 // message queue_size + 1 in the next iteration and we will continue
335 // to be sending exactly at the channel frequency.
336 if (send_twice_at_end && (msgs_sent == queue_size)) {
337 EXPECT_EQ(SendTestMessage(sender),
338 expected_err(msgs_sent, queue_size));
339 msgs_sent++;
340 }
341
342 if (msgs_sent > queue_size) {
343 factory.Exit();
344 }
345 },
346 std::chrono::duration_cast<std::chrono::nanoseconds>(
347 std::chrono::duration<double>(
348 1.0 / TestChannelFrequency(event_loop.get()))));
349
350 factory.Run();
351}
352
353// Tests that RawSender::Error::kMessagesSentTooFast is not returned
354// when messages are sent at the exact frequency of the channel.
355TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
356 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
357 false);
358}
359
360// Tests that RawSender::Error::kMessagesSentTooFast is returned
361// when sending exactly one more message than allowed in a channel storage
362// duration.
363TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
364 TestSentTooFastCheckEdgeCase(
365 [](const int msgs_sent, const int queue_size) {
366 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
367 : RawSender::Error::kOk);
368 },
369 true);
370}
371
Austin Schuh8fb315a2020-11-19 22:33:58 -0800372// Test that creating an event loop while running dies.
373TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
374 SimulatedEventLoopTestFactory factory;
375
376 SimulatedEventLoopFactory simulated_event_loop_factory(
377 factory.configuration());
378
379 ::std::unique_ptr<EventLoop> event_loop =
380 simulated_event_loop_factory.MakeEventLoop("ping");
381
382 auto timer = event_loop->AddTimer([&]() {
383 EXPECT_DEATH(
384 {
385 ::std::unique_ptr<EventLoop> event_loop2 =
386 simulated_event_loop_factory.MakeEventLoop("ping");
387 },
388 "event loop while running");
389 simulated_event_loop_factory.Exit();
390 });
391
392 event_loop->OnRun([&event_loop, &timer] {
393 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
394 });
395
396 simulated_event_loop_factory.Run();
397}
398
399// Test that creating a watcher after running dies.
400TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
401 SimulatedEventLoopTestFactory factory;
402
403 SimulatedEventLoopFactory simulated_event_loop_factory(
404 factory.configuration());
405
406 ::std::unique_ptr<EventLoop> event_loop =
407 simulated_event_loop_factory.MakeEventLoop("ping");
408
409 simulated_event_loop_factory.RunFor(chrono::seconds(1));
410
411 EXPECT_DEATH(
412 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
413 "Can't add a watcher after running");
414
415 ::std::unique_ptr<EventLoop> event_loop2 =
416 simulated_event_loop_factory.MakeEventLoop("ping");
417
418 simulated_event_loop_factory.RunFor(chrono::seconds(1));
419
420 EXPECT_DEATH(
421 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
422 "Can't add a watcher after running");
423}
424
Austin Schuh44019f92019-05-19 19:58:27 -0700425// Test that running for a time period with no handlers causes time to progress
426// correctly.
427TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800428 SimulatedEventLoopTestFactory factory;
429
430 SimulatedEventLoopFactory simulated_event_loop_factory(
431 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700432 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800433 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700434
435 simulated_event_loop_factory.RunFor(chrono::seconds(1));
436
437 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700438 event_loop->monotonic_now());
439}
440
441// Test that running for a time with a periodic handler causes time to end
442// correctly.
443TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800444 SimulatedEventLoopTestFactory factory;
445
446 SimulatedEventLoopFactory simulated_event_loop_factory(
447 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700448 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800449 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700450
451 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700452 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700453 event_loop->OnRun([&event_loop, &timer] {
454 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
455 chrono::milliseconds(100));
456 });
457
458 simulated_event_loop_factory.RunFor(chrono::seconds(1));
459
460 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700461 event_loop->monotonic_now());
462 EXPECT_EQ(counter, 10);
463}
464
Austin Schuh7d87b672019-12-01 20:23:49 -0800465// Tests that watchers have latency in simulation.
466TEST(SimulatedEventLoopTest, WatcherTimingReport) {
467 SimulatedEventLoopTestFactory factory;
468 factory.set_send_delay(std::chrono::microseconds(50));
469
470 FLAGS_timing_report_ms = 1000;
471 auto loop1 = factory.MakePrimary("primary");
472 loop1->MakeWatcher("/test", [](const TestMessage &) {});
473
474 auto loop2 = factory.Make("sender_loop");
475
476 auto loop3 = factory.Make("report_fetcher");
477
478 Fetcher<timing::Report> report_fetcher =
479 loop3->MakeFetcher<timing::Report>("/aos");
480 EXPECT_FALSE(report_fetcher.Fetch());
481
482 auto sender = loop2->MakeSender<TestMessage>("/test");
483
484 // Send 10 messages in the middle of a timing report period so we get
485 // something interesting back.
486 auto test_timer = loop2->AddTimer([&sender]() {
487 for (int i = 0; i < 10; ++i) {
488 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
489 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
490 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700491 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800492 }
493 });
494
495 // Quit after 1 timing report, mid way through the next cycle.
496 {
497 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
498 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
499 end_timer->set_name("end");
500 }
501
502 loop1->OnRun([&test_timer, &loop1]() {
503 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
504 });
505
506 factory.Run();
507
508 // And, since we are here, check that the timing report makes sense.
509 // Start by looking for our event loop's timing.
510 FlatbufferDetachedBuffer<timing::Report> primary_report =
511 FlatbufferDetachedBuffer<timing::Report>::Empty();
512 while (report_fetcher.FetchNext()) {
513 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
514 if (report_fetcher->name()->string_view() == "primary") {
515 primary_report = CopyFlatBuffer(report_fetcher.get());
516 }
517 }
518
519 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700520 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800521
522 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
523
524 // Just the timing report timer.
525 ASSERT_NE(primary_report.message().timers(), nullptr);
526 EXPECT_EQ(primary_report.message().timers()->size(), 2);
527
528 // No phased loops
529 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
530
531 // And now confirm that the watcher received all 10 messages, and has latency.
532 ASSERT_NE(primary_report.message().watchers(), nullptr);
533 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
534 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
535 EXPECT_NEAR(
536 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
537 0.00005, 1e-9);
538 EXPECT_NEAR(
539 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
540 0.00005, 1e-9);
541 EXPECT_NEAR(
542 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
543 0.00005, 1e-9);
544 EXPECT_EQ(primary_report.message()
545 .watchers()
546 ->Get(0)
547 ->wakeup_latency()
548 ->standard_deviation(),
549 0.0);
550
551 EXPECT_EQ(
552 primary_report.message().watchers()->Get(0)->handler_time()->average(),
553 0.0);
554 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
555 0.0);
556 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
557 0.0);
558 EXPECT_EQ(primary_report.message()
559 .watchers()
560 ->Get(0)
561 ->handler_time()
562 ->standard_deviation(),
563 0.0);
564}
565
Austin Schuh89c9b812021-02-20 14:42:10 -0800566size_t CountAll(
567 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
568 &counters) {
569 size_t count = 0u;
570 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
571 counters) {
572 count += counter->count();
573 }
574 return count;
575}
576
Austin Schuh4c3b9702020-08-30 11:34:55 -0700577// Tests that ping and pong work when on 2 different nodes, and the message
578// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800579TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800580 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
581 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700582 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800583
584 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
585
586 std::unique_ptr<EventLoop> ping_event_loop =
587 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
588 Ping ping(ping_event_loop.get());
589
590 std::unique_ptr<EventLoop> pong_event_loop =
591 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
592 Pong pong(pong_event_loop.get());
593
594 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
595 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700596 MessageCounter<examples::Pong> pi2_pong_counter(
597 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700598 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
599 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
600 "/pi1/aos");
601 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
602 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800603
Austin Schuh4c3b9702020-08-30 11:34:55 -0700604 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
605 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800606
607 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
608 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700609 MessageCounter<examples::Pong> pi1_pong_counter(
610 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700611 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
612 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
613 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
614 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
615 "/aos");
616
Austin Schuh4c3b9702020-08-30 11:34:55 -0700617 // Count timestamps.
618 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
619 pi1_pong_counter_event_loop.get(), "/pi1/aos");
620 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
621 pi2_pong_counter_event_loop.get(), "/pi1/aos");
622 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
623 pi3_pong_counter_event_loop.get(), "/pi1/aos");
624 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
625 pi1_pong_counter_event_loop.get(), "/pi2/aos");
626 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
627 pi2_pong_counter_event_loop.get(), "/pi2/aos");
628 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
629 pi1_pong_counter_event_loop.get(), "/pi3/aos");
630 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
631 pi3_pong_counter_event_loop.get(), "/pi3/aos");
632
Austin Schuh2f8fd752020-09-01 22:38:28 -0700633 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800634 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
635 remote_timestamps_pi2_on_pi1 =
636 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
637 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
638 remote_timestamps_pi1_on_pi2 =
639 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700640
Austin Schuh4c3b9702020-08-30 11:34:55 -0700641 // Wait to let timestamp estimation start up before looking for the results.
642 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
643
Austin Schuh8fb315a2020-11-19 22:33:58 -0800644 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
645 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
646 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
647 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
648 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
649 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
650
Austin Schuh4c3b9702020-08-30 11:34:55 -0700651 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800652 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700653 "/pi1/aos", [&pi1_server_statistics_count](
654 const message_bridge::ServerStatistics &stats) {
655 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
656 EXPECT_EQ(stats.connections()->size(), 2u);
657 for (const message_bridge::ServerConnection *connection :
658 *stats.connections()) {
659 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800660 EXPECT_EQ(connection->connection_count(), 1u);
661 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800662 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700663 if (connection->node()->name()->string_view() == "pi2") {
664 EXPECT_GT(connection->sent_packets(), 50);
665 } else if (connection->node()->name()->string_view() == "pi3") {
666 EXPECT_GE(connection->sent_packets(), 5);
667 } else {
668 LOG(FATAL) << "Unknown connection";
669 }
670
671 EXPECT_TRUE(connection->has_monotonic_offset());
672 EXPECT_EQ(connection->monotonic_offset(), 0);
673 }
674 ++pi1_server_statistics_count;
675 });
676
677 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800678 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700679 "/pi2/aos", [&pi2_server_statistics_count](
680 const message_bridge::ServerStatistics &stats) {
681 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
682 EXPECT_EQ(stats.connections()->size(), 1u);
683
684 const message_bridge::ServerConnection *connection =
685 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800686 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700687 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
688 EXPECT_GT(connection->sent_packets(), 50);
689 EXPECT_TRUE(connection->has_monotonic_offset());
690 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800691 EXPECT_EQ(connection->connection_count(), 1u);
692 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700693 ++pi2_server_statistics_count;
694 });
695
696 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800697 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700698 "/pi3/aos", [&pi3_server_statistics_count](
699 const message_bridge::ServerStatistics &stats) {
700 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
701 EXPECT_EQ(stats.connections()->size(), 1u);
702
703 const message_bridge::ServerConnection *connection =
704 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800705 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700706 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
707 EXPECT_GE(connection->sent_packets(), 5);
708 EXPECT_TRUE(connection->has_monotonic_offset());
709 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800710 EXPECT_EQ(connection->connection_count(), 1u);
711 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700712 ++pi3_server_statistics_count;
713 });
714
715 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800716 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700717 "/pi1/aos", [&pi1_client_statistics_count](
718 const message_bridge::ClientStatistics &stats) {
719 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
720 EXPECT_EQ(stats.connections()->size(), 2u);
721
722 for (const message_bridge::ClientConnection *connection :
723 *stats.connections()) {
724 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
725 if (connection->node()->name()->string_view() == "pi2") {
726 EXPECT_GT(connection->received_packets(), 50);
727 } else if (connection->node()->name()->string_view() == "pi3") {
728 EXPECT_GE(connection->received_packets(), 5);
729 } else {
730 LOG(FATAL) << "Unknown connection";
731 }
732
Austin Schuhe61d4382021-03-31 21:33:02 -0700733 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700734 EXPECT_TRUE(connection->has_monotonic_offset());
735 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800736 EXPECT_EQ(connection->connection_count(), 1u);
737 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700738 }
739 ++pi1_client_statistics_count;
740 });
741
742 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800743 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700744 "/pi2/aos", [&pi2_client_statistics_count](
745 const message_bridge::ClientStatistics &stats) {
746 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
747 EXPECT_EQ(stats.connections()->size(), 1u);
748
749 const message_bridge::ClientConnection *connection =
750 stats.connections()->Get(0);
751 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
752 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700753 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700754 EXPECT_TRUE(connection->has_monotonic_offset());
755 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800756 EXPECT_EQ(connection->connection_count(), 1u);
757 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700758 ++pi2_client_statistics_count;
759 });
760
761 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800762 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700763 "/pi3/aos", [&pi3_client_statistics_count](
764 const message_bridge::ClientStatistics &stats) {
765 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
766 EXPECT_EQ(stats.connections()->size(), 1u);
767
768 const message_bridge::ClientConnection *connection =
769 stats.connections()->Get(0);
770 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
771 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700772 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700773 EXPECT_TRUE(connection->has_monotonic_offset());
774 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800775 EXPECT_EQ(connection->connection_count(), 1u);
776 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777 ++pi3_client_statistics_count;
778 });
779
Austin Schuh2f8fd752020-09-01 22:38:28 -0700780 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
781 // channel.
782 const size_t pi1_timestamp_channel =
783 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
784 pi1_on_pi2_timestamp_fetcher.channel());
785 const size_t ping_timestamp_channel =
786 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
787 ping_on_pi2_fetcher.channel());
788
789 for (const Channel *channel :
790 *pi1_pong_counter_event_loop->configuration()->channels()) {
791 VLOG(1) << "Channel "
792 << configuration::ChannelIndex(
793 pi1_pong_counter_event_loop->configuration(), channel)
794 << " " << configuration::CleanedChannelToString(channel);
795 }
796
Austin Schuh8fb315a2020-11-19 22:33:58 -0800797 std::unique_ptr<EventLoop> pi1_remote_timestamp =
798 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
799
Austin Schuh89c9b812021-02-20 14:42:10 -0800800 for (std::pair<int, std::string> channel :
801 shared()
802 ? std::vector<std::pair<
803 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
804 : std::vector<std::pair<int, std::string>>{
805 {pi1_timestamp_channel,
806 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
807 "aos-message_bridge-Timestamp"},
808 {ping_timestamp_channel,
809 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
810 // For each remote timestamp we get back, confirm that it is either a ping
811 // message, or a timestamp we sent out. Also confirm that the timestamps
812 // are correct.
813 pi1_remote_timestamp->MakeWatcher(
814 channel.second,
815 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
816 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
817 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
818 channel_index = channel.first](const RemoteMessage &header) {
819 VLOG(1) << aos::FlatbufferToJson(&header);
820 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700821 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800822 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700823 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700824
Austin Schuh89c9b812021-02-20 14:42:10 -0800825 const aos::monotonic_clock::time_point header_monotonic_sent_time(
826 chrono::nanoseconds(header.monotonic_sent_time()));
827 const aos::realtime_clock::time_point header_realtime_sent_time(
828 chrono::nanoseconds(header.realtime_sent_time()));
829 const aos::monotonic_clock::time_point header_monotonic_remote_time(
830 chrono::nanoseconds(header.monotonic_remote_time()));
831 const aos::realtime_clock::time_point header_realtime_remote_time(
832 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700833
Austin Schuh89c9b812021-02-20 14:42:10 -0800834 if (channel_index != -1) {
835 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700836 }
837
Austin Schuh89c9b812021-02-20 14:42:10 -0800838 const Context *pi1_context = nullptr;
839 const Context *pi2_context = nullptr;
840
841 if (header.channel_index() == pi1_timestamp_channel) {
842 // Find the forwarded message.
843 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
844 header_monotonic_sent_time) {
845 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
846 }
847
848 // And the source message.
849 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
850 header_monotonic_remote_time) {
851 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
852 }
853
854 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
855 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
856 } else if (header.channel_index() == ping_timestamp_channel) {
857 // Find the forwarded message.
858 while (ping_on_pi2_fetcher.context().monotonic_event_time <
859 header_monotonic_sent_time) {
860 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
861 }
862
863 // And the source message.
864 while (ping_on_pi1_fetcher.context().monotonic_event_time <
865 header_monotonic_remote_time) {
866 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
867 }
868
869 pi1_context = &ping_on_pi1_fetcher.context();
870 pi2_context = &ping_on_pi2_fetcher.context();
871 } else {
872 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700873 }
874
Austin Schuh89c9b812021-02-20 14:42:10 -0800875 // Confirm the forwarded message has matching timestamps to the
876 // timestamps we got back.
877 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
878 EXPECT_EQ(pi2_context->remote_queue_index,
879 header.remote_queue_index());
880 EXPECT_EQ(pi2_context->monotonic_event_time,
881 header_monotonic_sent_time);
882 EXPECT_EQ(pi2_context->realtime_event_time,
883 header_realtime_sent_time);
884 EXPECT_EQ(pi2_context->realtime_remote_time,
885 header_realtime_remote_time);
886 EXPECT_EQ(pi2_context->monotonic_remote_time,
887 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700888
Austin Schuh89c9b812021-02-20 14:42:10 -0800889 // Confirm the forwarded message also matches the source message.
890 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
891 EXPECT_EQ(pi1_context->monotonic_event_time,
892 header_monotonic_remote_time);
893 EXPECT_EQ(pi1_context->realtime_event_time,
894 header_realtime_remote_time);
895 });
896 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700897
Austin Schuh4c3b9702020-08-30 11:34:55 -0700898 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
899 chrono::milliseconds(500) +
900 chrono::milliseconds(5));
901
902 EXPECT_EQ(pi1_pong_counter.count(), 1001);
903 EXPECT_EQ(pi2_pong_counter.count(), 1001);
904
905 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
906 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
907 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
908 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
909 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
910 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
911 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
912
Austin Schuh20ac95d2020-12-05 17:24:19 -0800913 EXPECT_EQ(pi1_server_statistics_count, 10);
914 EXPECT_EQ(pi2_server_statistics_count, 10);
915 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700916
917 EXPECT_EQ(pi1_client_statistics_count, 95);
918 EXPECT_EQ(pi2_client_statistics_count, 95);
919 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700920
921 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800922 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
923 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700924}
925
926// Tests that an offset between nodes can be recovered and shows up in
927// ServerStatistics correctly.
928TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
929 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700930 aos::configuration::ReadConfig(ArtifactPath(
931 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700932 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800933 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
934 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700935 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800936 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
937 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700938 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800939 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
940 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700941
Austin Schuh87dd3832021-01-01 23:07:31 -0800942 message_bridge::TestingTimeConverter time(
943 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700944 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700945 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700946
947 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800948 time.AddNextTimestamp(
949 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700950 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
951 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700952
953 std::unique_ptr<EventLoop> ping_event_loop =
954 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
955 Ping ping(ping_event_loop.get());
956
957 std::unique_ptr<EventLoop> pong_event_loop =
958 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
959 Pong pong(pong_event_loop.get());
960
Austin Schuh8fb315a2020-11-19 22:33:58 -0800961 // Wait to let timestamp estimation start up before looking for the results.
962 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
963
Austin Schuh87dd3832021-01-01 23:07:31 -0800964 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
965 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
966
Austin Schuh4c3b9702020-08-30 11:34:55 -0700967 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
968 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
969
970 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
971 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
972
Austin Schuh4c3b9702020-08-30 11:34:55 -0700973 // Confirm the offsets are being recovered correctly.
974 int pi1_server_statistics_count = 0;
975 pi1_pong_counter_event_loop->MakeWatcher(
976 "/pi1/aos", [&pi1_server_statistics_count,
977 kOffset](const message_bridge::ServerStatistics &stats) {
978 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
979 EXPECT_EQ(stats.connections()->size(), 2u);
980 for (const message_bridge::ServerConnection *connection :
981 *stats.connections()) {
982 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800983 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700984 if (connection->node()->name()->string_view() == "pi2") {
985 EXPECT_EQ(connection->monotonic_offset(),
986 chrono::nanoseconds(kOffset).count());
987 } else if (connection->node()->name()->string_view() == "pi3") {
988 EXPECT_EQ(connection->monotonic_offset(), 0);
989 } else {
990 LOG(FATAL) << "Unknown connection";
991 }
992
993 EXPECT_TRUE(connection->has_monotonic_offset());
994 }
995 ++pi1_server_statistics_count;
996 });
997
998 int pi2_server_statistics_count = 0;
999 pi2_pong_counter_event_loop->MakeWatcher(
1000 "/pi2/aos", [&pi2_server_statistics_count,
1001 kOffset](const message_bridge::ServerStatistics &stats) {
1002 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1003 EXPECT_EQ(stats.connections()->size(), 1u);
1004
1005 const message_bridge::ServerConnection *connection =
1006 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001007 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001008 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1009 EXPECT_TRUE(connection->has_monotonic_offset());
1010 EXPECT_EQ(connection->monotonic_offset(),
1011 -chrono::nanoseconds(kOffset).count());
1012 ++pi2_server_statistics_count;
1013 });
1014
1015 int pi3_server_statistics_count = 0;
1016 pi3_pong_counter_event_loop->MakeWatcher(
1017 "/pi3/aos", [&pi3_server_statistics_count](
1018 const message_bridge::ServerStatistics &stats) {
1019 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1020 EXPECT_EQ(stats.connections()->size(), 1u);
1021
1022 const message_bridge::ServerConnection *connection =
1023 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001024 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001025 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1026 EXPECT_TRUE(connection->has_monotonic_offset());
1027 EXPECT_EQ(connection->monotonic_offset(), 0);
1028 ++pi3_server_statistics_count;
1029 });
1030
1031 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1032 chrono::milliseconds(500) +
1033 chrono::milliseconds(5));
1034
Austin Schuh20ac95d2020-12-05 17:24:19 -08001035 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001036 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001037 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001038}
1039
1040// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001041TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001042 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1043 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1044 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1045
1046 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1047 simulated_event_loop_factory.DisableStatistics();
1048
1049 std::unique_ptr<EventLoop> ping_event_loop =
1050 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1051 Ping ping(ping_event_loop.get());
1052
1053 std::unique_ptr<EventLoop> pong_event_loop =
1054 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1055 Pong pong(pong_event_loop.get());
1056
1057 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1058 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1059
1060 MessageCounter<examples::Pong> pi2_pong_counter(
1061 pi2_pong_counter_event_loop.get(), "/test");
1062
1063 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1064 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1065
1066 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1067 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1068
1069 MessageCounter<examples::Pong> pi1_pong_counter(
1070 pi1_pong_counter_event_loop.get(), "/test");
1071
1072 // Count timestamps.
1073 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1074 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1075 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1076 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1077 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1078 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1079 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1080 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1081 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1082 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1083 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1084 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1085 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1086 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1087
Austin Schuh2f8fd752020-09-01 22:38:28 -07001088 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001089 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1090 remote_timestamps_pi2_on_pi1 =
1091 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1092 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1093 remote_timestamps_pi1_on_pi2 =
1094 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001095
Austin Schuh4c3b9702020-08-30 11:34:55 -07001096 MessageCounter<message_bridge::ServerStatistics>
1097 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1098 "/pi1/aos");
1099 MessageCounter<message_bridge::ServerStatistics>
1100 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1101 "/pi2/aos");
1102 MessageCounter<message_bridge::ServerStatistics>
1103 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1104 "/pi3/aos");
1105
1106 MessageCounter<message_bridge::ClientStatistics>
1107 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1108 "/pi1/aos");
1109 MessageCounter<message_bridge::ClientStatistics>
1110 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1111 "/pi2/aos");
1112 MessageCounter<message_bridge::ClientStatistics>
1113 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1114 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001115
1116 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1117 chrono::milliseconds(5));
1118
Austin Schuh4c3b9702020-08-30 11:34:55 -07001119 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1120 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1121
1122 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1123 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1124 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1125 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1126 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1127 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1128 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1129
1130 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1131 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1132 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1133
1134 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1135 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1136 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001137
1138 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001139 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1140 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001141}
1142
Austin Schuhc0b0f722020-12-12 18:36:06 -08001143bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1144 for (const message_bridge::ServerConnection *connection :
1145 *server_statistics->connections()) {
1146 if (connection->state() != message_bridge::State::CONNECTED) {
1147 return false;
1148 }
1149 }
1150 return true;
1151}
1152
1153bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1154 std::string_view target) {
1155 for (const message_bridge::ServerConnection *connection :
1156 *server_statistics->connections()) {
1157 if (connection->node()->name()->string_view() == target) {
1158 if (connection->state() == message_bridge::State::CONNECTED) {
1159 return false;
1160 }
1161 } else {
1162 if (connection->state() != message_bridge::State::CONNECTED) {
1163 return false;
1164 }
1165 }
1166 }
1167 return true;
1168}
1169
1170bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1171 for (const message_bridge::ClientConnection *connection :
1172 *client_statistics->connections()) {
1173 if (connection->state() != message_bridge::State::CONNECTED) {
1174 return false;
1175 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001176 EXPECT_TRUE(connection->has_boot_uuid());
1177 EXPECT_TRUE(connection->has_connected_since_time());
1178 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001179 }
1180 return true;
1181}
1182
1183bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1184 std::string_view target) {
1185 for (const message_bridge::ClientConnection *connection :
1186 *client_statistics->connections()) {
1187 if (connection->node()->name()->string_view() == target) {
1188 if (connection->state() == message_bridge::State::CONNECTED) {
1189 return false;
1190 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001191 EXPECT_FALSE(connection->has_boot_uuid());
1192 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001193 } else {
1194 if (connection->state() != message_bridge::State::CONNECTED) {
1195 return false;
1196 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001197 EXPECT_TRUE(connection->has_boot_uuid());
1198 EXPECT_TRUE(connection->has_connected_since_time());
1199 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001200 }
1201 }
1202 return true;
1203}
1204
Austin Schuh367a7f42021-11-23 23:04:36 -08001205int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1206 std::string_view target) {
1207 for (const message_bridge::ClientConnection *connection :
1208 *client_statistics->connections()) {
1209 if (connection->node()->name()->string_view() == target) {
1210 return connection->connection_count();
1211 }
1212 }
1213 return 0;
1214}
1215
1216int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1217 std::string_view target) {
1218 for (const message_bridge::ServerConnection *connection :
1219 *server_statistics->connections()) {
1220 if (connection->node()->name()->string_view() == target) {
1221 return connection->connection_count();
1222 }
1223 }
1224 return 0;
1225}
1226
Austin Schuhc0b0f722020-12-12 18:36:06 -08001227// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001228TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001229 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1230
Austin Schuh58646e22021-08-23 23:51:46 -07001231 NodeEventLoopFactory *pi1 =
1232 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1233 NodeEventLoopFactory *pi2 =
1234 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1235 NodeEventLoopFactory *pi3 =
1236 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1237
1238 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001239 Ping ping(ping_event_loop.get());
1240
Austin Schuh58646e22021-08-23 23:51:46 -07001241 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001242 Pong pong(pong_event_loop.get());
1243
1244 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001245 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001246
1247 MessageCounter<examples::Pong> pi2_pong_counter(
1248 pi2_pong_counter_event_loop.get(), "/test");
1249
1250 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001251 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001252
1253 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001254 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001255
1256 MessageCounter<examples::Pong> pi1_pong_counter(
1257 pi1_pong_counter_event_loop.get(), "/test");
1258
1259 // Count timestamps.
1260 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1261 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1262 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1263 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1264 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1265 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1266 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1267 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1268 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1269 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1270 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1271 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1272 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1273 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1274
1275 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001276 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1277 remote_timestamps_pi2_on_pi1 =
1278 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1279 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1280 remote_timestamps_pi1_on_pi2 =
1281 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001282
1283 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001284 *pi1_server_statistics_counter;
1285 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1286 pi1_server_statistics_counter =
1287 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1288 "pi1_server_statistics_counter", "/pi1/aos");
1289 });
1290
Austin Schuhc0b0f722020-12-12 18:36:06 -08001291 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1292 pi1_pong_counter_event_loop
1293 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1294 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1295 pi1_pong_counter_event_loop
1296 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1297
1298 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001299 *pi2_server_statistics_counter;
1300 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1301 pi2_server_statistics_counter =
1302 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1303 "pi2_server_statistics_counter", "/pi2/aos");
1304 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001305 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1306 pi2_pong_counter_event_loop
1307 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1308 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1309 pi2_pong_counter_event_loop
1310 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1311
1312 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001313 *pi3_server_statistics_counter;
1314 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1315 pi3_server_statistics_counter =
1316 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1317 "pi3_server_statistics_counter", "/pi3/aos");
1318 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001319 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1320 pi3_pong_counter_event_loop
1321 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1322 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1323 pi3_pong_counter_event_loop
1324 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1325
1326 MessageCounter<message_bridge::ClientStatistics>
1327 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1328 "/pi1/aos");
1329 MessageCounter<message_bridge::ClientStatistics>
1330 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1331 "/pi2/aos");
1332 MessageCounter<message_bridge::ClientStatistics>
1333 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1334 "/pi3/aos");
1335
1336 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1337 chrono::milliseconds(5));
1338
1339 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1340 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1341
1342 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1343 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1344 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1345 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1346 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1347 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1348 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1349
Austin Schuh58646e22021-08-23 23:51:46 -07001350 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1351 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1352 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001353
1354 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1355 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1356 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1357
1358 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001359 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1360 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001361
1362 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1363 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1364 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1365 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1366 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1367 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1368 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1369 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1370 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1371 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1372 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1373 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1374 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1375 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1376 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1377 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1378 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1379 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1380
Austin Schuh58646e22021-08-23 23:51:46 -07001381 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001382
1383 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1384
1385 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1386 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1387
1388 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1389 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1390 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1391 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1392 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1393 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1394 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1395
Austin Schuh58646e22021-08-23 23:51:46 -07001396 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1397 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1398 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001399
1400 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1401 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1402 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1403
1404 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001405 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1406 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001407
1408 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1409 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1410 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1411 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1412 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1413 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1414 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1415 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1416 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1417 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1418 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1419 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1420 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1421 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1422 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1423 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1424 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1425 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1426
Austin Schuh58646e22021-08-23 23:51:46 -07001427 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001428
1429 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1430
Austin Schuh367a7f42021-11-23 23:04:36 -08001431 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1432 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1433 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1434 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1435 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1436 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1437
1438 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1439 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1440 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1441 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1442 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1443 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1444 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1445 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1446
1447 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1448 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1449 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1450 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1451
1452 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1453 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1454 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1455 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1456
Austin Schuhc0b0f722020-12-12 18:36:06 -08001457 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1458 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1459
1460 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1461 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1462 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1463 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1464 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1465 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1466 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1467
Austin Schuh58646e22021-08-23 23:51:46 -07001468 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1469 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1470 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001471
1472 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1473 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1474 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1475
1476 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001477 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1478 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001479
Austin Schuhc0b0f722020-12-12 18:36:06 -08001480 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1481 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001482 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1483 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001484 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1485 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001486 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1487 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001488 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1489 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001490 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1491 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1492}
1493
Austin Schuh2febf0d2020-09-21 22:24:30 -07001494// Tests that the time offset having a slope doesn't break the world.
1495// SimulatedMessageBridge has enough self consistency CHECK statements to
1496// confirm, and we can can also check a message in each direction to make sure
1497// it gets delivered as expected.
1498TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1499 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001500 aos::configuration::ReadConfig(ArtifactPath(
1501 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001502 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001503 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1504 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001505 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001506 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1507 ASSERT_EQ(pi2_index, 1u);
1508 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1509 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1510 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001511
Austin Schuh87dd3832021-01-01 23:07:31 -08001512 message_bridge::TestingTimeConverter time(
1513 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001514 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001515 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001516
Austin Schuh2febf0d2020-09-21 22:24:30 -07001517 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001518 time.AddNextTimestamp(
1519 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001520 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1521 BootTimestamp::epoch()});
1522 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1523 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1524 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1525 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001526
1527 std::unique_ptr<EventLoop> ping_event_loop =
1528 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1529 Ping ping(ping_event_loop.get());
1530
1531 std::unique_ptr<EventLoop> pong_event_loop =
1532 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1533 Pong pong(pong_event_loop.get());
1534
1535 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1536 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1537 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1538 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1539
1540 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1541 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1542 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1543 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1544
1545 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1546 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1547 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1548 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1549
1550 // End after a pong message comes back. This will leave the latest messages
1551 // on all channels so we can look at timestamps easily and check they make
1552 // sense.
1553 std::unique_ptr<EventLoop> pi1_pong_ender =
1554 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1555 int count = 0;
1556 pi1_pong_ender->MakeWatcher(
1557 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1558 if (++count == 100) {
1559 simulated_event_loop_factory.Exit();
1560 }
1561 });
1562
1563 // Run enough that messages should be delivered.
1564 simulated_event_loop_factory.Run();
1565
1566 // Grab the latest messages.
1567 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1568 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1569 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1570 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1571
1572 // Compute their time on the global distributed clock so we can compute
1573 // distance betwen them.
1574 const distributed_clock::time_point pi1_ping_time =
1575 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1576 ->ToDistributedClock(
1577 ping_on_pi1_fetcher.context().monotonic_event_time);
1578 const distributed_clock::time_point pi2_ping_time =
1579 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1580 ->ToDistributedClock(
1581 ping_on_pi2_fetcher.context().monotonic_event_time);
1582 const distributed_clock::time_point pi1_pong_time =
1583 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1584 ->ToDistributedClock(
1585 pong_on_pi1_fetcher.context().monotonic_event_time);
1586 const distributed_clock::time_point pi2_pong_time =
1587 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1588 ->ToDistributedClock(
1589 pong_on_pi2_fetcher.context().monotonic_event_time);
1590
1591 // And confirm the delivery delay is just about exactly 150 uS for both
1592 // directions like expected. There will be a couple ns of rounding errors in
1593 // the conversion functions that aren't worth accounting for right now. This
1594 // will either be really close, or really far.
1595 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1596 pi1_ping_time);
1597 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1598 pi1_ping_time);
1599
1600 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1601 pi2_pong_time);
1602 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1603 pi2_pong_time);
1604}
1605
Austin Schuh4c570ea2020-11-19 23:13:24 -08001606void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1607 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1608 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1609 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001610 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001611}
1612
1613// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001614TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001615 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1616 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1617
1618 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1619
1620 std::unique_ptr<EventLoop> ping_event_loop =
1621 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1622 aos::Sender<examples::Ping> pi1_reliable_sender =
1623 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1624 aos::Sender<examples::Ping> pi1_unreliable_sender =
1625 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1626 SendPing(&pi1_reliable_sender, 1);
1627 SendPing(&pi1_unreliable_sender, 1);
1628
1629 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1630 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1631 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1632 "/reliable");
1633 MessageCounter<examples::Ping> pi2_unreliable_counter(
1634 pi2_pong_event_loop.get(), "/unreliable");
1635 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1636 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1637 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1638 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1639
1640 const size_t reliable_channel_index = configuration::ChannelIndex(
1641 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1642
1643 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1644 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1645
Austin Schuheeaa2022021-01-02 21:52:03 -08001646 const chrono::nanoseconds network_delay =
1647 simulated_event_loop_factory.network_delay();
1648
Austin Schuh4c570ea2020-11-19 23:13:24 -08001649 int reliable_timestamp_count = 0;
1650 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001651 shared() ? "/pi1/aos/remote_timestamps/pi2"
1652 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001653 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001654 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1655 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001656 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001657 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001658 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001659 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001660 VLOG(1) << aos::FlatbufferToJson(&header);
1661 if (header.channel_index() == reliable_channel_index) {
1662 ++reliable_timestamp_count;
1663 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001664
1665 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1666 chrono::nanoseconds(header.monotonic_sent_time()));
1667
1668 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1669 header_monotonic_sent_time + network_delay +
1670 (pi1_remote_timestamp->monotonic_now() -
1671 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001672 });
1673
1674 // Wait to let timestamp estimation start up before looking for the results.
1675 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1676
1677 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1678 // This one isn't reliable, but was sent before the start. It should *not* be
1679 // delivered.
1680 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1681 // Confirm we got a timestamp logged for the message that was forwarded.
1682 EXPECT_EQ(reliable_timestamp_count, 1u);
1683
1684 SendPing(&pi1_reliable_sender, 2);
1685 SendPing(&pi1_unreliable_sender, 2);
1686 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1687 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1688 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1689
1690 EXPECT_EQ(reliable_timestamp_count, 2u);
1691}
1692
Austin Schuh20ac95d2020-12-05 17:24:19 -08001693// Tests that rebooting a node changes the ServerStatistics message and the
1694// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001695TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001696 const UUID pi1_boot0 = UUID::Random();
1697 const UUID pi2_boot0 = UUID::Random();
1698 const UUID pi2_boot1 = UUID::Random();
1699 const UUID pi3_boot0 = UUID::Random();
1700 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001701
Austin Schuh58646e22021-08-23 23:51:46 -07001702 message_bridge::TestingTimeConverter time(
1703 configuration::NodesCount(&config.message()));
1704 SimulatedEventLoopFactory factory(&config.message());
1705 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001706
Austin Schuh58646e22021-08-23 23:51:46 -07001707 const size_t pi1_index =
1708 configuration::GetNodeIndex(&config.message(), "pi1");
1709 const size_t pi2_index =
1710 configuration::GetNodeIndex(&config.message(), "pi2");
1711 const size_t pi3_index =
1712 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001713
Austin Schuh58646e22021-08-23 23:51:46 -07001714 {
1715 time.AddNextTimestamp(distributed_clock::epoch(),
1716 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1717 BootTimestamp::epoch()});
1718
1719 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1720
1721 time.AddNextTimestamp(
1722 distributed_clock::epoch() + dt,
1723 {BootTimestamp::epoch() + dt,
1724 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1725 BootTimestamp::epoch() + dt});
1726
1727 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1728 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1729 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1730 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1731 }
1732
1733 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1734 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1735
1736 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1737 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001738
1739 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001740 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741
1742 int timestamp_count = 0;
1743 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001744 "/pi2/aos", [&expected_boot_uuid,
1745 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001746 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001747 expected_boot_uuid);
1748 });
1749 pi1_remote_timestamp->MakeWatcher(
1750 "/test",
1751 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001752 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001753 expected_boot_uuid);
1754 });
1755 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001756 shared() ? "/pi1/aos/remote_timestamps/pi2"
1757 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001758 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1759 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001760 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001761 VLOG(1) << aos::FlatbufferToJson(&header);
1762 ++timestamp_count;
1763 });
1764
1765 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001766 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001767 int boot_number = 0;
1768 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001769 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001770 "/pi1/aos",
1771 [&pi1_server_statistics_count, &expected_boot_uuid,
1772 &expected_connection_time, &first_pi1_server_statistics,
1773 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001774 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1775 for (const message_bridge::ServerConnection *connection :
1776 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001777 if (connection->state() == message_bridge::State::CONNECTED) {
1778 ASSERT_TRUE(connection->has_boot_uuid());
1779 }
1780 if (!first_pi1_server_statistics) {
1781 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1782 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001783 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001784 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1785 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001786 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001787 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001788 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001789 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1790 connection->connected_since_time())),
1791 expected_connection_time);
1792 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001793 ++pi1_server_statistics_count;
1794 }
1795 }
Austin Schuh58646e22021-08-23 23:51:46 -07001796 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001797 });
1798
Austin Schuh58646e22021-08-23 23:51:46 -07001799 int pi1_client_statistics_count = 0;
1800 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001801 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1802 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001803 const message_bridge::ClientStatistics &stats) {
1804 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1805 for (const message_bridge::ClientConnection *connection :
1806 *stats.connections()) {
1807 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1808 if (connection->node()->name()->string_view() == "pi2") {
1809 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001810 EXPECT_EQ(expected_boot_uuid,
1811 UUID::FromString(connection->boot_uuid()))
1812 << " : Got " << aos::FlatbufferToJson(&stats);
1813 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1814 connection->connected_since_time())),
1815 expected_connection_time);
1816 EXPECT_EQ(boot_number + 1, connection->connection_count());
1817 } else {
1818 EXPECT_EQ(connection->connected_since_time(), 0);
1819 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001820 }
1821 }
1822 });
1823
1824 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001825 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1826 pi1, pi2, pi2_boot1]() {
1827 expected_boot_uuid = pi2_boot1;
1828 ++boot_number;
1829 LOG(INFO) << "OnShutdown triggered for pi2";
1830 pi2->OnStartup(
1831 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1832 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1833 expected_connection_time = pi1->monotonic_now();
1834 });
1835 });
Austin Schuh58646e22021-08-23 23:51:46 -07001836
Austin Schuh20ac95d2020-12-05 17:24:19 -08001837 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001838 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001839
1840 EXPECT_GT(timestamp_count, 100);
1841 EXPECT_GE(pi1_server_statistics_count, 1u);
1842
Austin Schuh20ac95d2020-12-05 17:24:19 -08001843 timestamp_count = 0;
1844 pi1_server_statistics_count = 0;
1845
Austin Schuh58646e22021-08-23 23:51:46 -07001846 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001847 EXPECT_GT(timestamp_count, 100);
1848 EXPECT_GE(pi1_server_statistics_count, 1u);
1849}
1850
// Instantiates the RemoteMessageSimulatedEventLoopTest parameterized suite
// under the prefix "All" with two parameter sets: the combined config paired
// with true, and the split config paired with false.
// NOTE(review): the boolean presumably drives the shared() helper the TEST_P
// bodies use to pick the remote timestamp channel name -- confirm against the
// definition of Param.
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001851INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001852 All, RemoteMessageSimulatedEventLoopTest,
1853 ::testing::Values(
1854 Param{"multinode_pingpong_test_combined_config.json", true},
1855 Param{"multinode_pingpong_test_split_config.json", false}));
1856
Austin Schuh58646e22021-08-23 23:51:46 -07001857// Tests that Startup and Shutdown do reasonable things.
1858TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1859 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1860 aos::configuration::ReadConfig(
1861 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1862
Austin Schuh72e65682021-09-02 11:37:05 -07001863 size_t pi1_shutdown_counter = 0;
1864 size_t pi2_shutdown_counter = 0;
1865 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1866 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1867
Austin Schuh58646e22021-08-23 23:51:46 -07001868 message_bridge::TestingTimeConverter time(
1869 configuration::NodesCount(&config.message()));
1870 SimulatedEventLoopFactory factory(&config.message());
1871 factory.SetTimeConverter(&time);
1872 time.AddNextTimestamp(
1873 distributed_clock::epoch(),
1874 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1875
1876 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1877
1878 time.AddNextTimestamp(
1879 distributed_clock::epoch() + dt,
1880 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1881 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1882 BootTimestamp::epoch() + dt});
1883
1884 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1885 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1886
1887 // Configure startup to start Ping and Pong, and count.
1888 size_t pi1_startup_counter = 0;
1889 size_t pi2_startup_counter = 0;
1890 pi1->OnStartup([pi1]() {
1891 LOG(INFO) << "Made ping";
1892 pi1->AlwaysStart<Ping>("ping");
1893 });
1894 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1895 pi2->OnStartup([pi2]() {
1896 LOG(INFO) << "Made pong";
1897 pi2->AlwaysStart<Pong>("pong");
1898 });
1899 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1900
1901 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001902 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1903 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1904
Austin Schuh58646e22021-08-23 23:51:46 -07001905 // Automatically make counters on startup.
1906 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1907 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1908 "pi1_pong_counter", "/test");
1909 });
1910 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1911 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1912 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1913 "pi2_ping_counter", "/test");
1914 });
1915 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1916
1917 EXPECT_EQ(pi2_ping_counter, nullptr);
1918 EXPECT_EQ(pi1_pong_counter, nullptr);
1919
1920 EXPECT_EQ(pi1_startup_counter, 0u);
1921 EXPECT_EQ(pi2_startup_counter, 0u);
1922 EXPECT_EQ(pi1_shutdown_counter, 0u);
1923 EXPECT_EQ(pi2_shutdown_counter, 0u);
1924
1925 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1926 EXPECT_EQ(pi1_startup_counter, 1u);
1927 EXPECT_EQ(pi2_startup_counter, 1u);
1928 EXPECT_EQ(pi1_shutdown_counter, 0u);
1929 EXPECT_EQ(pi2_shutdown_counter, 0u);
1930 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1931 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1932
1933 LOG(INFO) << pi1->monotonic_now();
1934 LOG(INFO) << pi2->monotonic_now();
1935
1936 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1937
1938 EXPECT_EQ(pi1_startup_counter, 2u);
1939 EXPECT_EQ(pi2_startup_counter, 2u);
1940 EXPECT_EQ(pi1_shutdown_counter, 1u);
1941 EXPECT_EQ(pi2_shutdown_counter, 1u);
1942 EXPECT_EQ(pi2_ping_counter->count(), 501);
1943 EXPECT_EQ(pi1_pong_counter->count(), 501);
1944}
1945
1946// Tests that OnStartup handlers can be added after running and get called, and
1947// can't be called when running.
1948TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1949 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1950 aos::configuration::ReadConfig(
1951 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1952
1953 // Test that we can add startup handlers as long as we aren't running, and
1954 // they get run when Run gets called again.
1955 // Test that adding a startup handler when running fails.
1956 //
1957 // Test shutdown handlers get called on destruction.
1958 SimulatedEventLoopFactory factory(&config.message());
1959
1960 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1961
1962 int startup_count0 = 0;
1963 int startup_count1 = 0;
1964
1965 pi1->OnStartup([&]() { ++startup_count0; });
1966 EXPECT_EQ(startup_count0, 0);
1967 EXPECT_EQ(startup_count1, 0);
1968
1969 factory.RunFor(chrono::nanoseconds(1));
1970 EXPECT_EQ(startup_count0, 1);
1971 EXPECT_EQ(startup_count1, 0);
1972
1973 pi1->OnStartup([&]() { ++startup_count1; });
1974 EXPECT_EQ(startup_count0, 1);
1975 EXPECT_EQ(startup_count1, 0);
1976
1977 factory.RunFor(chrono::nanoseconds(1));
1978 EXPECT_EQ(startup_count0, 1);
1979 EXPECT_EQ(startup_count1, 1);
1980
1981 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1982 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1983
1984 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1985 "Can only register OnStartup handlers when not running.");
1986}
1987
1988// Tests that OnStartup handlers can be added after running and get called, and
1989// all the handlers get called on reboot. Shutdown handlers are tested the same
1990// way.
1991TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1992 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1993 aos::configuration::ReadConfig(
1994 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1995
Austin Schuh72e65682021-09-02 11:37:05 -07001996 int startup_count0 = 0;
1997 int shutdown_count0 = 0;
1998 int startup_count1 = 0;
1999 int shutdown_count1 = 0;
2000
Austin Schuh58646e22021-08-23 23:51:46 -07002001 message_bridge::TestingTimeConverter time(
2002 configuration::NodesCount(&config.message()));
2003 SimulatedEventLoopFactory factory(&config.message());
2004 factory.SetTimeConverter(&time);
2005 time.StartEqual();
2006
2007 const chrono::nanoseconds dt = chrono::seconds(10);
2008 time.RebootAt(0, distributed_clock::epoch() + dt);
2009 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2010
2011 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2012
Austin Schuh58646e22021-08-23 23:51:46 -07002013 pi1->OnStartup([&]() { ++startup_count0; });
2014 pi1->OnShutdown([&]() { ++shutdown_count0; });
2015 EXPECT_EQ(startup_count0, 0);
2016 EXPECT_EQ(startup_count1, 0);
2017 EXPECT_EQ(shutdown_count0, 0);
2018 EXPECT_EQ(shutdown_count1, 0);
2019
2020 factory.RunFor(chrono::nanoseconds(1));
2021 EXPECT_EQ(startup_count0, 1);
2022 EXPECT_EQ(startup_count1, 0);
2023 EXPECT_EQ(shutdown_count0, 0);
2024 EXPECT_EQ(shutdown_count1, 0);
2025
2026 pi1->OnStartup([&]() { ++startup_count1; });
2027 EXPECT_EQ(startup_count0, 1);
2028 EXPECT_EQ(startup_count1, 0);
2029 EXPECT_EQ(shutdown_count0, 0);
2030 EXPECT_EQ(shutdown_count1, 0);
2031
2032 factory.RunFor(chrono::nanoseconds(1));
2033 EXPECT_EQ(startup_count0, 1);
2034 EXPECT_EQ(startup_count1, 1);
2035 EXPECT_EQ(shutdown_count0, 0);
2036 EXPECT_EQ(shutdown_count1, 0);
2037
2038 factory.RunFor(chrono::seconds(15));
2039
2040 EXPECT_EQ(startup_count0, 2);
2041 EXPECT_EQ(startup_count1, 2);
2042 EXPECT_EQ(shutdown_count0, 1);
2043 EXPECT_EQ(shutdown_count1, 0);
2044
2045 pi1->OnShutdown([&]() { ++shutdown_count1; });
2046 factory.RunFor(chrono::seconds(10));
2047
2048 EXPECT_EQ(startup_count0, 3);
2049 EXPECT_EQ(startup_count1, 3);
2050 EXPECT_EQ(shutdown_count0, 2);
2051 EXPECT_EQ(shutdown_count1, 1);
2052}
2053
2054// Tests that event loops which outlive shutdown crash.
2055TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2056 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2057 aos::configuration::ReadConfig(
2058 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2059
2060 message_bridge::TestingTimeConverter time(
2061 configuration::NodesCount(&config.message()));
2062 SimulatedEventLoopFactory factory(&config.message());
2063 factory.SetTimeConverter(&time);
2064 time.StartEqual();
2065
2066 const chrono::nanoseconds dt = chrono::seconds(10);
2067 time.RebootAt(0, distributed_clock::epoch() + dt);
2068
2069 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2070
2071 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2072
2073 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2074}
2075
2076// Tests that messages don't survive a reboot of a node.
2077TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2078 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2079 aos::configuration::ReadConfig(
2080 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2081
2082 message_bridge::TestingTimeConverter time(
2083 configuration::NodesCount(&config.message()));
2084 SimulatedEventLoopFactory factory(&config.message());
2085 factory.SetTimeConverter(&time);
2086 time.StartEqual();
2087
2088 const chrono::nanoseconds dt = chrono::seconds(10);
2089 time.RebootAt(0, distributed_clock::epoch() + dt);
2090
2091 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2092
2093 const UUID boot_uuid = pi1->boot_uuid();
2094 EXPECT_NE(boot_uuid, UUID::Zero());
2095
2096 {
2097 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2098 aos::Sender<examples::Ping> test_message_sender =
2099 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2100 SendPing(&test_message_sender, 1);
2101 }
2102
2103 factory.RunFor(chrono::seconds(5));
2104
2105 {
2106 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2107 aos::Fetcher<examples::Ping> fetcher =
2108 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2109 EXPECT_TRUE(fetcher.Fetch());
2110 }
2111
2112 factory.RunFor(chrono::seconds(10));
2113
2114 {
2115 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2116 aos::Fetcher<examples::Ping> fetcher =
2117 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2118 EXPECT_FALSE(fetcher.Fetch());
2119 }
2120 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2121}
2122
2123// Tests that reliable messages get resent on reboot.
2124TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2125 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2126 aos::configuration::ReadConfig(
2127 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2128
2129 message_bridge::TestingTimeConverter time(
2130 configuration::NodesCount(&config.message()));
2131 SimulatedEventLoopFactory factory(&config.message());
2132 factory.SetTimeConverter(&time);
2133 time.StartEqual();
2134
2135 const chrono::nanoseconds dt = chrono::seconds(1);
2136 time.RebootAt(1, distributed_clock::epoch() + dt);
2137
2138 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2139 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2140
2141 const UUID pi1_boot_uuid = pi1->boot_uuid();
2142 const UUID pi2_boot_uuid = pi2->boot_uuid();
2143 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2144 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2145
2146 {
2147 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2148 aos::Sender<examples::Ping> test_message_sender =
2149 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2150 SendPing(&test_message_sender, 1);
2151 }
2152
2153 factory.RunFor(chrono::milliseconds(500));
2154
2155 {
2156 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2157 aos::Fetcher<examples::Ping> fetcher =
2158 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2159 EXPECT_TRUE(fetcher.Fetch());
2160 }
2161
2162 factory.RunFor(chrono::seconds(1));
2163
2164 {
2165 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2166 aos::Fetcher<examples::Ping> fetcher =
2167 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2168 EXPECT_TRUE(fetcher.Fetch());
2169 }
2170 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2171}
2172
Austin Schuh48205e62021-11-12 14:13:18 -08002173class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2174 public:
2175 SimulatedEventLoopDisconnectTest()
2176 : config(aos::configuration::ReadConfig(ArtifactPath(
2177 "aos/events/multinode_pingpong_test_split_config.json"))),
2178 time(configuration::NodesCount(&config.message())),
2179 factory(&config.message()) {
2180 factory.SetTimeConverter(&time);
2181 }
2182
2183 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2184 const monotonic_clock::time_point allowable_message_time,
2185 std::set<const aos::Node *> empty_nodes) {
2186 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2187 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2188 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2189 pi1->MakeEventLoop("fetcher");
2190 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2191 pi2->MakeEventLoop("fetcher");
2192 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2193 if (configuration::ChannelIsReadableOnNode(channel,
2194 pi1_event_loop->node())) {
2195 std::unique_ptr<aos::RawFetcher> fetcher =
2196 pi1_event_loop->MakeRawFetcher(channel);
2197 if (statistics_channels.find(channel) == statistics_channels.end() ||
2198 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2199 EXPECT_FALSE(fetcher->Fetch() &&
2200 fetcher->context().monotonic_event_time >
2201 allowable_message_time)
2202 << ": Found recent message on channel "
2203 << configuration::CleanedChannelToString(channel) << " and time "
2204 << fetcher->context().monotonic_event_time << " > "
2205 << allowable_message_time << " on pi1";
2206 } else {
2207 EXPECT_TRUE(fetcher->Fetch() &&
2208 fetcher->context().monotonic_event_time >=
2209 allowable_message_time)
2210 << ": Didn't find recent message on channel "
2211 << configuration::CleanedChannelToString(channel) << " on pi1";
2212 }
2213 }
2214 if (configuration::ChannelIsReadableOnNode(channel,
2215 pi2_event_loop->node())) {
2216 std::unique_ptr<aos::RawFetcher> fetcher =
2217 pi2_event_loop->MakeRawFetcher(channel);
2218 if (statistics_channels.find(channel) == statistics_channels.end() ||
2219 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2220 EXPECT_FALSE(fetcher->Fetch() &&
2221 fetcher->context().monotonic_event_time >
2222 allowable_message_time)
2223 << ": Found message on channel "
2224 << configuration::CleanedChannelToString(channel) << " and time "
2225 << fetcher->context().monotonic_event_time << " > "
2226 << allowable_message_time << " on pi2";
2227 } else {
2228 EXPECT_TRUE(fetcher->Fetch() &&
2229 fetcher->context().monotonic_event_time >=
2230 allowable_message_time)
2231 << ": Didn't find message on channel "
2232 << configuration::CleanedChannelToString(channel) << " on pi2";
2233 }
2234 }
2235 }
2236 }
2237
2238 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2239
2240 message_bridge::TestingTimeConverter time;
2241 SimulatedEventLoopFactory factory;
2242};
2243
2244// Tests that if we have message bridge client/server disabled, and timing
2245// reports disabled, no messages are sent. Also tests that we can disconnect a
2246// node and disable statistics on it and it actually fully disconnects.
2247TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2248 time.StartEqual();
2249 factory.SkipTimingReport();
2250 factory.DisableStatistics();
2251
2252 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2253 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2254
2255 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2256 pi1->MakeEventLoop("fetcher");
2257 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2258 pi2->MakeEventLoop("fetcher");
2259
2260 factory.RunFor(chrono::milliseconds(100000));
2261
2262 // Confirm no messages are sent if we've configured them all off.
2263 VerifyChannels({}, monotonic_clock::min_time, {});
2264
2265 // Now, confirm that all the message_bridge channels come back when we
2266 // re-enable.
2267 factory.EnableStatistics();
2268
2269 factory.RunFor(chrono::milliseconds(10050));
2270
2271 // Build up the list of all the messages we expect when we come back.
2272 {
2273 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002274 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002275 std::vector<std::pair<std::string_view, const Node *>>{
2276 {"/pi1/aos", pi1->node()},
2277 {"/pi2/aos", pi1->node()},
2278 {"/pi3/aos", pi1->node()}}) {
2279 statistics_channels.insert(configuration::GetChannel(
2280 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2281 pi.second));
2282 statistics_channels.insert(configuration::GetChannel(
2283 factory.configuration(), pi.first,
2284 "aos.message_bridge.ServerStatistics", "", pi.second));
2285 statistics_channels.insert(configuration::GetChannel(
2286 factory.configuration(), pi.first,
2287 "aos.message_bridge.ClientStatistics", "", pi.second));
2288 }
2289
2290 statistics_channels.insert(configuration::GetChannel(
2291 factory.configuration(),
2292 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2293 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2294 statistics_channels.insert(configuration::GetChannel(
2295 factory.configuration(),
2296 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2297 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2298 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2299 }
2300
2301 // Now test that we can disable the messages for a single node
2302 pi2->DisableStatistics();
2303 const aos::monotonic_clock::time_point statistics_disable_time =
2304 pi2->monotonic_now();
2305 factory.RunFor(chrono::milliseconds(10000));
2306
2307 // We should see a much smaller set of messages, but should still see messages
2308 // forwarded, mainly the timestamp message.
2309 {
2310 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002311 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002312 std::vector<std::pair<std::string_view, const Node *>>{
2313 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2314 statistics_channels.insert(configuration::GetChannel(
2315 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2316 pi.second));
2317 statistics_channels.insert(configuration::GetChannel(
2318 factory.configuration(), pi.first,
2319 "aos.message_bridge.ServerStatistics", "", pi.second));
2320 statistics_channels.insert(configuration::GetChannel(
2321 factory.configuration(), pi.first,
2322 "aos.message_bridge.ClientStatistics", "", pi.second));
2323 }
2324
2325 statistics_channels.insert(configuration::GetChannel(
2326 factory.configuration(),
2327 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2328 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2329 VerifyChannels(statistics_channels, statistics_disable_time, {});
2330 }
2331
2332 // Now, fully disconnect the node. This will completely quiet down pi2.
2333 pi1->Disconnect(pi2->node());
2334 pi2->Disconnect(pi1->node());
2335
2336 const aos::monotonic_clock::time_point disconnect_disable_time =
2337 pi2->monotonic_now();
2338 factory.RunFor(chrono::milliseconds(10000));
2339
2340 {
2341 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002342 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002343 std::vector<std::pair<std::string_view, const Node *>>{
2344 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2345 statistics_channels.insert(configuration::GetChannel(
2346 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2347 pi.second));
2348 statistics_channels.insert(configuration::GetChannel(
2349 factory.configuration(), pi.first,
2350 "aos.message_bridge.ServerStatistics", "", pi.second));
2351 statistics_channels.insert(configuration::GetChannel(
2352 factory.configuration(), pi.first,
2353 "aos.message_bridge.ClientStatistics", "", pi.second));
2354 }
2355
2356 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2357 }
2358}
2359
Neil Balchc8f41ed2018-01-20 22:06:53 -08002360} // namespace testing
2361} // namespace aos