blob: eddacb18dbd04262575c43efa5565ac6673a9fa1 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Alex Perrycb7da4b2019-08-28 19:35:56 -07007#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07009#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010#include "aos/events/ping_lib.h"
11#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080012#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070013#include "aos/network/message_bridge_client_generated.h"
14#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080015#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080016#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070017#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070018#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080019#include "gtest/gtest.h"
20
21namespace aos {
22namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
Austin Schuh89c9b812021-02-20 14:42:10 -080084// Parameters to run all the tests with.
85struct Param {
86 // The config file to use.
87 std::string config;
88 // If true, the RemoteMessage channel should be shared between all the remote
89 // channels. If false, there will be 1 RemoteMessage channel per remote
90 // channel.
91 bool shared;
92};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800142class FunctionEvent : public EventScheduler::Event {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700143 public:
144 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800145
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700146 void Handle() noexcept override { fn_(); }
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800147
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700148 private:
149 std::function<void()> fn_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800150};
151
Neil Balchc8f41ed2018-01-20 22:06:53 -0800152// Test that creating an event and running the scheduler runs the event.
153TEST(EventSchedulerTest, ScheduleEvent) {
154 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800155 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700156 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800157 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800158
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800159 FunctionEvent e([&counter]() { counter += 1; });
160 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800162 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800163 auto token =
164 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 EXPECT_EQ(counter, 1);
168}
169
170// Test that descheduling an already scheduled event doesn't run the event.
171TEST(EventSchedulerTest, DescheduleEvent) {
172 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700174 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800175 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800176
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800177 FunctionEvent e([&counter]() { counter += 1; });
178 auto token =
179 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800180 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800181 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800182 EXPECT_EQ(counter, 0);
183}
Austin Schuh44019f92019-05-19 19:58:27 -0700184
Austin Schuhe33c08d2022-02-03 18:15:21 -0800185// Test that TemporarilyStopAndRun respects and preserves running.
186TEST(EventSchedulerTest, TemporarilyStopAndRun) {
187 int counter = 0;
188 EventSchedulerScheduler scheduler_scheduler;
189 EventScheduler scheduler(0);
190 scheduler_scheduler.AddEventScheduler(&scheduler);
191
192 scheduler_scheduler.TemporarilyStopAndRun(
193 [&]() { CHECK(!scheduler_scheduler.is_running()); });
194 ASSERT_FALSE(scheduler_scheduler.is_running());
195
196 FunctionEvent e([&]() {
197 counter += 1;
198 CHECK(scheduler_scheduler.is_running());
199 scheduler_scheduler.TemporarilyStopAndRun(
200 [&]() { CHECK(!scheduler_scheduler.is_running()); });
201 CHECK(scheduler_scheduler.is_running());
202 });
203 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
204 scheduler_scheduler.Run();
205 EXPECT_EQ(counter, 1);
206}
207
Austin Schuh8fb315a2020-11-19 22:33:58 -0800208// Test that sending a message after running gets properly notified.
209TEST(SimulatedEventLoopTest, SendAfterRunFor) {
210 SimulatedEventLoopTestFactory factory;
211
212 SimulatedEventLoopFactory simulated_event_loop_factory(
213 factory.configuration());
214
215 ::std::unique_ptr<EventLoop> ping_event_loop =
216 simulated_event_loop_factory.MakeEventLoop("ping");
217 aos::Sender<TestMessage> test_message_sender =
218 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700219 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800220
221 std::unique_ptr<EventLoop> pong1_event_loop =
222 simulated_event_loop_factory.MakeEventLoop("pong");
223 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
224 "/test");
225
226 EXPECT_FALSE(ping_event_loop->is_running());
227
228 // Watchers start when you start running, so there should be nothing counted.
229 simulated_event_loop_factory.RunFor(chrono::seconds(1));
230 EXPECT_EQ(test_message_counter1.count(), 0u);
231
232 std::unique_ptr<EventLoop> pong2_event_loop =
233 simulated_event_loop_factory.MakeEventLoop("pong");
234 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
235 "/test");
236
237 // Pauses in the middle don't count though, so this should be counted.
238 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700239 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800240
241 EXPECT_EQ(test_message_counter1.count(), 0u);
242 EXPECT_EQ(test_message_counter2.count(), 0u);
243 simulated_event_loop_factory.RunFor(chrono::seconds(1));
244
245 EXPECT_EQ(test_message_counter1.count(), 1u);
246 EXPECT_EQ(test_message_counter2.count(), 0u);
247}
248
Austin Schuh60e77942022-05-16 17:48:24 -0700249// Test that if we configure an event loop to be able to send too fast that we
250// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700251TEST(SimulatedEventLoopTest, AllowSendTooFast) {
252 SimulatedEventLoopTestFactory factory;
253
254 SimulatedEventLoopFactory simulated_event_loop_factory(
255 factory.configuration());
256
257 // Create two event loops: One will be allowed to send too fast, one won't. We
258 // will then test to ensure that the one that is allowed to send too fast can
259 // indeed send too fast, but that it then makes it so that the second event
260 // loop can no longer send anything because *it* is still limited.
261 ::std::unique_ptr<EventLoop> too_fast_event_loop =
262 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
263 ->MakeEventLoop("too_fast_sender",
264 {NodeEventLoopFactory::CheckSentTooFast::kNo,
265 NodeEventLoopFactory::ExclusiveSenders::kNo});
266 aos::Sender<TestMessage> too_fast_message_sender =
267 too_fast_event_loop->MakeSender<TestMessage>("/test");
268
269 ::std::unique_ptr<EventLoop> limited_event_loop =
270 simulated_event_loop_factory.MakeEventLoop("limited_sender");
271 aos::Sender<TestMessage> limited_message_sender =
272 limited_event_loop->MakeSender<TestMessage>("/test");
273
274 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
275 for (int ii = 0; ii < queue_size; ++ii) {
276 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
277 }
278 // And now we should start being in the sending-too-fast phase.
279 for (int ii = 0; ii < queue_size; ++ii) {
280 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700281 ASSERT_EQ(SendTestMessage(limited_message_sender),
282 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700283 }
284}
285
286// Test that if we setup an exclusive sender that it is indeed exclusive.
287TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
288 SimulatedEventLoopTestFactory factory;
289
290 SimulatedEventLoopFactory simulated_event_loop_factory(
291 factory.configuration());
292
293 ::std::unique_ptr<EventLoop> exclusive_event_loop =
294 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
295 ->MakeEventLoop("too_fast_sender",
296 {NodeEventLoopFactory::CheckSentTooFast::kYes,
297 NodeEventLoopFactory::ExclusiveSenders::kYes});
298 exclusive_event_loop->SkipAosLog();
299 exclusive_event_loop->SkipTimingReport();
300 ::std::unique_ptr<EventLoop> normal_event_loop =
301 simulated_event_loop_factory.MakeEventLoop("limited_sender");
302 // Set things up to have the exclusive sender be destroyed so we can test
303 // recovery.
304 {
305 aos::Sender<TestMessage> exclusive_sender =
306 exclusive_event_loop->MakeSender<TestMessage>("/test");
307
308 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
309 "TestMessage");
310 }
311 // This one should succeed now that the exclusive channel is removed.
312 aos::Sender<TestMessage> normal_sender =
313 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700314 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
315 "TestMessage");
James Kuszmaul890c2492022-04-06 14:59:31 -0700316}
317
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700318void TestSentTooFastCheckEdgeCase(
319 const std::function<RawSender::Error(int, int)> expected_err,
320 const bool send_twice_at_end) {
321 SimulatedEventLoopTestFactory factory;
322
323 auto event_loop = factory.MakePrimary("primary");
324
325 auto sender = event_loop->MakeSender<TestMessage>("/test");
326
327 const int queue_size = TestChannelQueueSize(event_loop.get());
328 int msgs_sent = 0;
329 event_loop->AddPhasedLoop(
330 [&](int) {
331 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
332 msgs_sent++;
333
334 // If send_twice_at_end, send the last two messages (message
335 // queue_size and queue_size + 1) in the same iteration, meaning that
336 // we would be sending very slightly too fast. Otherwise, we will send
337 // message queue_size + 1 in the next iteration and we will continue
338 // to be sending exactly at the channel frequency.
339 if (send_twice_at_end && (msgs_sent == queue_size)) {
340 EXPECT_EQ(SendTestMessage(sender),
341 expected_err(msgs_sent, queue_size));
342 msgs_sent++;
343 }
344
345 if (msgs_sent > queue_size) {
346 factory.Exit();
347 }
348 },
349 std::chrono::duration_cast<std::chrono::nanoseconds>(
350 std::chrono::duration<double>(
351 1.0 / TestChannelFrequency(event_loop.get()))));
352
353 factory.Run();
354}
355
356// Tests that RawSender::Error::kMessagesSentTooFast is not returned
357// when messages are sent at the exact frequency of the channel.
358TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
359 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
360 false);
361}
362
363// Tests that RawSender::Error::kMessagesSentTooFast is returned
364// when sending exactly one more message than allowed in a channel storage
365// duration.
366TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
367 TestSentTooFastCheckEdgeCase(
368 [](const int msgs_sent, const int queue_size) {
369 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
370 : RawSender::Error::kOk);
371 },
372 true);
373}
374
Austin Schuh8fb315a2020-11-19 22:33:58 -0800375// Test that creating an event loop while running dies.
376TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
377 SimulatedEventLoopTestFactory factory;
378
379 SimulatedEventLoopFactory simulated_event_loop_factory(
380 factory.configuration());
381
382 ::std::unique_ptr<EventLoop> event_loop =
383 simulated_event_loop_factory.MakeEventLoop("ping");
384
385 auto timer = event_loop->AddTimer([&]() {
386 EXPECT_DEATH(
387 {
388 ::std::unique_ptr<EventLoop> event_loop2 =
389 simulated_event_loop_factory.MakeEventLoop("ping");
390 },
391 "event loop while running");
392 simulated_event_loop_factory.Exit();
393 });
394
395 event_loop->OnRun([&event_loop, &timer] {
396 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
397 });
398
399 simulated_event_loop_factory.Run();
400}
401
402// Test that creating a watcher after running dies.
403TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
404 SimulatedEventLoopTestFactory factory;
405
406 SimulatedEventLoopFactory simulated_event_loop_factory(
407 factory.configuration());
408
409 ::std::unique_ptr<EventLoop> event_loop =
410 simulated_event_loop_factory.MakeEventLoop("ping");
411
412 simulated_event_loop_factory.RunFor(chrono::seconds(1));
413
414 EXPECT_DEATH(
415 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
416 "Can't add a watcher after running");
417
418 ::std::unique_ptr<EventLoop> event_loop2 =
419 simulated_event_loop_factory.MakeEventLoop("ping");
420
421 simulated_event_loop_factory.RunFor(chrono::seconds(1));
422
423 EXPECT_DEATH(
424 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
425 "Can't add a watcher after running");
426}
427
Austin Schuh44019f92019-05-19 19:58:27 -0700428// Test that running for a time period with no handlers causes time to progress
429// correctly.
430TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800431 SimulatedEventLoopTestFactory factory;
432
433 SimulatedEventLoopFactory simulated_event_loop_factory(
434 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700435 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800436 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700437
438 simulated_event_loop_factory.RunFor(chrono::seconds(1));
439
440 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700441 event_loop->monotonic_now());
442}
443
444// Test that running for a time with a periodic handler causes time to end
445// correctly.
446TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800447 SimulatedEventLoopTestFactory factory;
448
449 SimulatedEventLoopFactory simulated_event_loop_factory(
450 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700451 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800452 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700453
454 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700455 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700456 event_loop->OnRun([&event_loop, &timer] {
457 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
458 chrono::milliseconds(100));
459 });
460
461 simulated_event_loop_factory.RunFor(chrono::seconds(1));
462
463 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700464 event_loop->monotonic_now());
465 EXPECT_EQ(counter, 10);
466}
467
Austin Schuh7d87b672019-12-01 20:23:49 -0800468// Tests that watchers have latency in simulation.
469TEST(SimulatedEventLoopTest, WatcherTimingReport) {
470 SimulatedEventLoopTestFactory factory;
471 factory.set_send_delay(std::chrono::microseconds(50));
472
473 FLAGS_timing_report_ms = 1000;
474 auto loop1 = factory.MakePrimary("primary");
475 loop1->MakeWatcher("/test", [](const TestMessage &) {});
476
477 auto loop2 = factory.Make("sender_loop");
478
479 auto loop3 = factory.Make("report_fetcher");
480
481 Fetcher<timing::Report> report_fetcher =
482 loop3->MakeFetcher<timing::Report>("/aos");
483 EXPECT_FALSE(report_fetcher.Fetch());
484
485 auto sender = loop2->MakeSender<TestMessage>("/test");
486
487 // Send 10 messages in the middle of a timing report period so we get
488 // something interesting back.
489 auto test_timer = loop2->AddTimer([&sender]() {
490 for (int i = 0; i < 10; ++i) {
491 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
492 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
493 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700494 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800495 }
496 });
497
498 // Quit after 1 timing report, mid way through the next cycle.
499 {
500 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
501 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
502 end_timer->set_name("end");
503 }
504
505 loop1->OnRun([&test_timer, &loop1]() {
506 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
507 });
508
509 factory.Run();
510
511 // And, since we are here, check that the timing report makes sense.
512 // Start by looking for our event loop's timing.
513 FlatbufferDetachedBuffer<timing::Report> primary_report =
514 FlatbufferDetachedBuffer<timing::Report>::Empty();
515 while (report_fetcher.FetchNext()) {
516 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
517 if (report_fetcher->name()->string_view() == "primary") {
518 primary_report = CopyFlatBuffer(report_fetcher.get());
519 }
520 }
521
522 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700523 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800524
525 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
526
527 // Just the timing report timer.
528 ASSERT_NE(primary_report.message().timers(), nullptr);
529 EXPECT_EQ(primary_report.message().timers()->size(), 2);
530
531 // No phased loops
532 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
533
534 // And now confirm that the watcher received all 10 messages, and has latency.
535 ASSERT_NE(primary_report.message().watchers(), nullptr);
536 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
537 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
538 EXPECT_NEAR(
539 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
540 0.00005, 1e-9);
541 EXPECT_NEAR(
542 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
543 0.00005, 1e-9);
544 EXPECT_NEAR(
545 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
546 0.00005, 1e-9);
547 EXPECT_EQ(primary_report.message()
548 .watchers()
549 ->Get(0)
550 ->wakeup_latency()
551 ->standard_deviation(),
552 0.0);
553
554 EXPECT_EQ(
555 primary_report.message().watchers()->Get(0)->handler_time()->average(),
556 0.0);
557 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
558 0.0);
559 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
560 0.0);
561 EXPECT_EQ(primary_report.message()
562 .watchers()
563 ->Get(0)
564 ->handler_time()
565 ->standard_deviation(),
566 0.0);
567}
568
Austin Schuh89c9b812021-02-20 14:42:10 -0800569size_t CountAll(
570 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
571 &counters) {
572 size_t count = 0u;
573 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
574 counters) {
575 count += counter->count();
576 }
577 return count;
578}
579
Austin Schuh4c3b9702020-08-30 11:34:55 -0700580// Tests that ping and pong work when on 2 different nodes, and the message
581// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800582TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800583 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
584 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700585 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800586
587 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
588
589 std::unique_ptr<EventLoop> ping_event_loop =
590 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
591 Ping ping(ping_event_loop.get());
592
593 std::unique_ptr<EventLoop> pong_event_loop =
594 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
595 Pong pong(pong_event_loop.get());
596
597 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
598 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700599 MessageCounter<examples::Pong> pi2_pong_counter(
600 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700601 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
602 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
603 "/pi1/aos");
604 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
605 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800606
Austin Schuh4c3b9702020-08-30 11:34:55 -0700607 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
608 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800609
610 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
611 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700612 MessageCounter<examples::Pong> pi1_pong_counter(
613 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700614 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
615 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
616 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
617 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
618 "/aos");
619
Austin Schuh4c3b9702020-08-30 11:34:55 -0700620 // Count timestamps.
621 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
622 pi1_pong_counter_event_loop.get(), "/pi1/aos");
623 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
624 pi2_pong_counter_event_loop.get(), "/pi1/aos");
625 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
626 pi3_pong_counter_event_loop.get(), "/pi1/aos");
627 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
628 pi1_pong_counter_event_loop.get(), "/pi2/aos");
629 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
630 pi2_pong_counter_event_loop.get(), "/pi2/aos");
631 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
632 pi1_pong_counter_event_loop.get(), "/pi3/aos");
633 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
634 pi3_pong_counter_event_loop.get(), "/pi3/aos");
635
Austin Schuh2f8fd752020-09-01 22:38:28 -0700636 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800637 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
638 remote_timestamps_pi2_on_pi1 =
639 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
640 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
641 remote_timestamps_pi1_on_pi2 =
642 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700643
Austin Schuh4c3b9702020-08-30 11:34:55 -0700644 // Wait to let timestamp estimation start up before looking for the results.
645 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
646
Austin Schuh8fb315a2020-11-19 22:33:58 -0800647 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
648 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
649 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
650 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
651 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
652 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
653
Austin Schuh4c3b9702020-08-30 11:34:55 -0700654 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800655 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700656 "/pi1/aos", [&pi1_server_statistics_count](
657 const message_bridge::ServerStatistics &stats) {
658 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
659 EXPECT_EQ(stats.connections()->size(), 2u);
660 for (const message_bridge::ServerConnection *connection :
661 *stats.connections()) {
662 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800663 EXPECT_EQ(connection->connection_count(), 1u);
664 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800665 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700666 if (connection->node()->name()->string_view() == "pi2") {
667 EXPECT_GT(connection->sent_packets(), 50);
668 } else if (connection->node()->name()->string_view() == "pi3") {
669 EXPECT_GE(connection->sent_packets(), 5);
670 } else {
671 LOG(FATAL) << "Unknown connection";
672 }
673
674 EXPECT_TRUE(connection->has_monotonic_offset());
675 EXPECT_EQ(connection->monotonic_offset(), 0);
676 }
677 ++pi1_server_statistics_count;
678 });
679
680 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800681 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700682 "/pi2/aos", [&pi2_server_statistics_count](
683 const message_bridge::ServerStatistics &stats) {
684 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
685 EXPECT_EQ(stats.connections()->size(), 1u);
686
687 const message_bridge::ServerConnection *connection =
688 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800689 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700690 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
691 EXPECT_GT(connection->sent_packets(), 50);
692 EXPECT_TRUE(connection->has_monotonic_offset());
693 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800694 EXPECT_EQ(connection->connection_count(), 1u);
695 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700696 ++pi2_server_statistics_count;
697 });
698
699 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800700 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700701 "/pi3/aos", [&pi3_server_statistics_count](
702 const message_bridge::ServerStatistics &stats) {
703 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
704 EXPECT_EQ(stats.connections()->size(), 1u);
705
706 const message_bridge::ServerConnection *connection =
707 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800708 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700709 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
710 EXPECT_GE(connection->sent_packets(), 5);
711 EXPECT_TRUE(connection->has_monotonic_offset());
712 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800713 EXPECT_EQ(connection->connection_count(), 1u);
714 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700715 ++pi3_server_statistics_count;
716 });
717
718 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800719 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700720 "/pi1/aos", [&pi1_client_statistics_count](
721 const message_bridge::ClientStatistics &stats) {
722 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
723 EXPECT_EQ(stats.connections()->size(), 2u);
724
725 for (const message_bridge::ClientConnection *connection :
726 *stats.connections()) {
727 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
728 if (connection->node()->name()->string_view() == "pi2") {
729 EXPECT_GT(connection->received_packets(), 50);
730 } else if (connection->node()->name()->string_view() == "pi3") {
731 EXPECT_GE(connection->received_packets(), 5);
732 } else {
733 LOG(FATAL) << "Unknown connection";
734 }
735
Austin Schuhe61d4382021-03-31 21:33:02 -0700736 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700737 EXPECT_TRUE(connection->has_monotonic_offset());
738 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800739 EXPECT_EQ(connection->connection_count(), 1u);
740 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700741 }
742 ++pi1_client_statistics_count;
743 });
744
745 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800746 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700747 "/pi2/aos", [&pi2_client_statistics_count](
748 const message_bridge::ClientStatistics &stats) {
749 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
750 EXPECT_EQ(stats.connections()->size(), 1u);
751
752 const message_bridge::ClientConnection *connection =
753 stats.connections()->Get(0);
754 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
755 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700756 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700757 EXPECT_TRUE(connection->has_monotonic_offset());
758 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800759 EXPECT_EQ(connection->connection_count(), 1u);
760 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761 ++pi2_client_statistics_count;
762 });
763
764 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800765 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700766 "/pi3/aos", [&pi3_client_statistics_count](
767 const message_bridge::ClientStatistics &stats) {
768 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
769 EXPECT_EQ(stats.connections()->size(), 1u);
770
771 const message_bridge::ClientConnection *connection =
772 stats.connections()->Get(0);
773 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
774 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700775 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700776 EXPECT_TRUE(connection->has_monotonic_offset());
777 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800778 EXPECT_EQ(connection->connection_count(), 1u);
779 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700780 ++pi3_client_statistics_count;
781 });
782
Austin Schuh2f8fd752020-09-01 22:38:28 -0700783 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
784 // channel.
785 const size_t pi1_timestamp_channel =
786 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
787 pi1_on_pi2_timestamp_fetcher.channel());
788 const size_t ping_timestamp_channel =
789 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
790 ping_on_pi2_fetcher.channel());
791
792 for (const Channel *channel :
793 *pi1_pong_counter_event_loop->configuration()->channels()) {
794 VLOG(1) << "Channel "
795 << configuration::ChannelIndex(
796 pi1_pong_counter_event_loop->configuration(), channel)
797 << " " << configuration::CleanedChannelToString(channel);
798 }
799
Austin Schuh8fb315a2020-11-19 22:33:58 -0800800 std::unique_ptr<EventLoop> pi1_remote_timestamp =
801 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
802
Austin Schuh89c9b812021-02-20 14:42:10 -0800803 for (std::pair<int, std::string> channel :
804 shared()
805 ? std::vector<std::pair<
806 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
807 : std::vector<std::pair<int, std::string>>{
808 {pi1_timestamp_channel,
809 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
810 "aos-message_bridge-Timestamp"},
811 {ping_timestamp_channel,
812 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
813 // For each remote timestamp we get back, confirm that it is either a ping
814 // message, or a timestamp we sent out. Also confirm that the timestamps
815 // are correct.
816 pi1_remote_timestamp->MakeWatcher(
817 channel.second,
818 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
819 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
820 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
821 channel_index = channel.first](const RemoteMessage &header) {
822 VLOG(1) << aos::FlatbufferToJson(&header);
823 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700824 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800825 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700826 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700827
Austin Schuh89c9b812021-02-20 14:42:10 -0800828 const aos::monotonic_clock::time_point header_monotonic_sent_time(
829 chrono::nanoseconds(header.monotonic_sent_time()));
830 const aos::realtime_clock::time_point header_realtime_sent_time(
831 chrono::nanoseconds(header.realtime_sent_time()));
832 const aos::monotonic_clock::time_point header_monotonic_remote_time(
833 chrono::nanoseconds(header.monotonic_remote_time()));
834 const aos::realtime_clock::time_point header_realtime_remote_time(
835 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700836
Austin Schuh89c9b812021-02-20 14:42:10 -0800837 if (channel_index != -1) {
838 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700839 }
840
Austin Schuh89c9b812021-02-20 14:42:10 -0800841 const Context *pi1_context = nullptr;
842 const Context *pi2_context = nullptr;
843
844 if (header.channel_index() == pi1_timestamp_channel) {
845 // Find the forwarded message.
846 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
847 header_monotonic_sent_time) {
848 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
849 }
850
851 // And the source message.
852 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
853 header_monotonic_remote_time) {
854 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
855 }
856
857 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
858 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
859 } else if (header.channel_index() == ping_timestamp_channel) {
860 // Find the forwarded message.
861 while (ping_on_pi2_fetcher.context().monotonic_event_time <
862 header_monotonic_sent_time) {
863 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
864 }
865
866 // And the source message.
867 while (ping_on_pi1_fetcher.context().monotonic_event_time <
868 header_monotonic_remote_time) {
869 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
870 }
871
872 pi1_context = &ping_on_pi1_fetcher.context();
873 pi2_context = &ping_on_pi2_fetcher.context();
874 } else {
875 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700876 }
877
Austin Schuh89c9b812021-02-20 14:42:10 -0800878 // Confirm the forwarded message has matching timestamps to the
879 // timestamps we got back.
880 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
881 EXPECT_EQ(pi2_context->remote_queue_index,
882 header.remote_queue_index());
883 EXPECT_EQ(pi2_context->monotonic_event_time,
884 header_monotonic_sent_time);
885 EXPECT_EQ(pi2_context->realtime_event_time,
886 header_realtime_sent_time);
887 EXPECT_EQ(pi2_context->realtime_remote_time,
888 header_realtime_remote_time);
889 EXPECT_EQ(pi2_context->monotonic_remote_time,
890 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700891
Austin Schuh89c9b812021-02-20 14:42:10 -0800892 // Confirm the forwarded message also matches the source message.
893 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
894 EXPECT_EQ(pi1_context->monotonic_event_time,
895 header_monotonic_remote_time);
896 EXPECT_EQ(pi1_context->realtime_event_time,
897 header_realtime_remote_time);
898 });
899 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700900
Austin Schuh4c3b9702020-08-30 11:34:55 -0700901 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
902 chrono::milliseconds(500) +
903 chrono::milliseconds(5));
904
905 EXPECT_EQ(pi1_pong_counter.count(), 1001);
906 EXPECT_EQ(pi2_pong_counter.count(), 1001);
907
908 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
909 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
910 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
911 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
912 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
913 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
914 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
915
Austin Schuh20ac95d2020-12-05 17:24:19 -0800916 EXPECT_EQ(pi1_server_statistics_count, 10);
917 EXPECT_EQ(pi2_server_statistics_count, 10);
918 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700919
920 EXPECT_EQ(pi1_client_statistics_count, 95);
921 EXPECT_EQ(pi2_client_statistics_count, 95);
922 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700923
924 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800925 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
926 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700927}
928
929// Tests that an offset between nodes can be recovered and shows up in
930// ServerStatistics correctly.
931TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
932 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700933 aos::configuration::ReadConfig(ArtifactPath(
934 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700935 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800936 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
937 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700938 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800939 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
940 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700941 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800942 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
943 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700944
Austin Schuh87dd3832021-01-01 23:07:31 -0800945 message_bridge::TestingTimeConverter time(
946 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700947 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700948 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700949
950 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800951 time.AddNextTimestamp(
952 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700953 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
954 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700955
956 std::unique_ptr<EventLoop> ping_event_loop =
957 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
958 Ping ping(ping_event_loop.get());
959
960 std::unique_ptr<EventLoop> pong_event_loop =
961 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
962 Pong pong(pong_event_loop.get());
963
Austin Schuh8fb315a2020-11-19 22:33:58 -0800964 // Wait to let timestamp estimation start up before looking for the results.
965 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
966
Austin Schuh87dd3832021-01-01 23:07:31 -0800967 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
968 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
969
Austin Schuh4c3b9702020-08-30 11:34:55 -0700970 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
971 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
972
973 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
974 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
975
Austin Schuh4c3b9702020-08-30 11:34:55 -0700976 // Confirm the offsets are being recovered correctly.
977 int pi1_server_statistics_count = 0;
978 pi1_pong_counter_event_loop->MakeWatcher(
979 "/pi1/aos", [&pi1_server_statistics_count,
980 kOffset](const message_bridge::ServerStatistics &stats) {
981 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
982 EXPECT_EQ(stats.connections()->size(), 2u);
983 for (const message_bridge::ServerConnection *connection :
984 *stats.connections()) {
985 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800986 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700987 if (connection->node()->name()->string_view() == "pi2") {
988 EXPECT_EQ(connection->monotonic_offset(),
989 chrono::nanoseconds(kOffset).count());
990 } else if (connection->node()->name()->string_view() == "pi3") {
991 EXPECT_EQ(connection->monotonic_offset(), 0);
992 } else {
993 LOG(FATAL) << "Unknown connection";
994 }
995
996 EXPECT_TRUE(connection->has_monotonic_offset());
997 }
998 ++pi1_server_statistics_count;
999 });
1000
1001 int pi2_server_statistics_count = 0;
1002 pi2_pong_counter_event_loop->MakeWatcher(
1003 "/pi2/aos", [&pi2_server_statistics_count,
1004 kOffset](const message_bridge::ServerStatistics &stats) {
1005 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1006 EXPECT_EQ(stats.connections()->size(), 1u);
1007
1008 const message_bridge::ServerConnection *connection =
1009 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001010 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001011 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1012 EXPECT_TRUE(connection->has_monotonic_offset());
1013 EXPECT_EQ(connection->monotonic_offset(),
1014 -chrono::nanoseconds(kOffset).count());
1015 ++pi2_server_statistics_count;
1016 });
1017
1018 int pi3_server_statistics_count = 0;
1019 pi3_pong_counter_event_loop->MakeWatcher(
1020 "/pi3/aos", [&pi3_server_statistics_count](
1021 const message_bridge::ServerStatistics &stats) {
1022 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1023 EXPECT_EQ(stats.connections()->size(), 1u);
1024
1025 const message_bridge::ServerConnection *connection =
1026 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001027 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001028 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1029 EXPECT_TRUE(connection->has_monotonic_offset());
1030 EXPECT_EQ(connection->monotonic_offset(), 0);
1031 ++pi3_server_statistics_count;
1032 });
1033
1034 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1035 chrono::milliseconds(500) +
1036 chrono::milliseconds(5));
1037
Austin Schuh20ac95d2020-12-05 17:24:19 -08001038 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001039 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001040 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001041}
1042
1043// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001044TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001045 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1046 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1047 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1048
1049 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1050 simulated_event_loop_factory.DisableStatistics();
1051
1052 std::unique_ptr<EventLoop> ping_event_loop =
1053 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1054 Ping ping(ping_event_loop.get());
1055
1056 std::unique_ptr<EventLoop> pong_event_loop =
1057 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1058 Pong pong(pong_event_loop.get());
1059
1060 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1061 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1062
1063 MessageCounter<examples::Pong> pi2_pong_counter(
1064 pi2_pong_counter_event_loop.get(), "/test");
1065
1066 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1067 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1068
1069 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1070 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1071
1072 MessageCounter<examples::Pong> pi1_pong_counter(
1073 pi1_pong_counter_event_loop.get(), "/test");
1074
1075 // Count timestamps.
1076 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1077 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1078 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1079 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1080 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1081 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1082 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1083 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1084 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1085 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1086 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1087 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1088 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1089 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1090
Austin Schuh2f8fd752020-09-01 22:38:28 -07001091 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001092 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1093 remote_timestamps_pi2_on_pi1 =
1094 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1095 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1096 remote_timestamps_pi1_on_pi2 =
1097 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001098
Austin Schuh4c3b9702020-08-30 11:34:55 -07001099 MessageCounter<message_bridge::ServerStatistics>
1100 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1101 "/pi1/aos");
1102 MessageCounter<message_bridge::ServerStatistics>
1103 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1104 "/pi2/aos");
1105 MessageCounter<message_bridge::ServerStatistics>
1106 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1107 "/pi3/aos");
1108
1109 MessageCounter<message_bridge::ClientStatistics>
1110 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1111 "/pi1/aos");
1112 MessageCounter<message_bridge::ClientStatistics>
1113 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1114 "/pi2/aos");
1115 MessageCounter<message_bridge::ClientStatistics>
1116 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1117 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001118
1119 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1120 chrono::milliseconds(5));
1121
Austin Schuh4c3b9702020-08-30 11:34:55 -07001122 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1123 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1124
1125 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1126 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1127 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1128 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1129 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1130 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1131 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1132
1133 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1134 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1135 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1136
1137 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1138 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1139 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001140
1141 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001142 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1143 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001144}
1145
Austin Schuhc0b0f722020-12-12 18:36:06 -08001146bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1147 for (const message_bridge::ServerConnection *connection :
1148 *server_statistics->connections()) {
1149 if (connection->state() != message_bridge::State::CONNECTED) {
1150 return false;
1151 }
1152 }
1153 return true;
1154}
1155
1156bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1157 std::string_view target) {
1158 for (const message_bridge::ServerConnection *connection :
1159 *server_statistics->connections()) {
1160 if (connection->node()->name()->string_view() == target) {
1161 if (connection->state() == message_bridge::State::CONNECTED) {
1162 return false;
1163 }
1164 } else {
1165 if (connection->state() != message_bridge::State::CONNECTED) {
1166 return false;
1167 }
1168 }
1169 }
1170 return true;
1171}
1172
1173bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1174 for (const message_bridge::ClientConnection *connection :
1175 *client_statistics->connections()) {
1176 if (connection->state() != message_bridge::State::CONNECTED) {
1177 return false;
1178 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001179 EXPECT_TRUE(connection->has_boot_uuid());
1180 EXPECT_TRUE(connection->has_connected_since_time());
1181 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001182 }
1183 return true;
1184}
1185
1186bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1187 std::string_view target) {
1188 for (const message_bridge::ClientConnection *connection :
1189 *client_statistics->connections()) {
1190 if (connection->node()->name()->string_view() == target) {
1191 if (connection->state() == message_bridge::State::CONNECTED) {
1192 return false;
1193 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001194 EXPECT_FALSE(connection->has_boot_uuid());
1195 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001196 } else {
1197 if (connection->state() != message_bridge::State::CONNECTED) {
1198 return false;
1199 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001200 EXPECT_TRUE(connection->has_boot_uuid());
1201 EXPECT_TRUE(connection->has_connected_since_time());
1202 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001203 }
1204 }
1205 return true;
1206}
1207
Austin Schuh367a7f42021-11-23 23:04:36 -08001208int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1209 std::string_view target) {
1210 for (const message_bridge::ClientConnection *connection :
1211 *client_statistics->connections()) {
1212 if (connection->node()->name()->string_view() == target) {
1213 return connection->connection_count();
1214 }
1215 }
1216 return 0;
1217}
1218
1219int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1220 std::string_view target) {
1221 for (const message_bridge::ServerConnection *connection :
1222 *server_statistics->connections()) {
1223 if (connection->node()->name()->string_view() == target) {
1224 return connection->connection_count();
1225 }
1226 }
1227 return 0;
1228}
1229
Austin Schuhc0b0f722020-12-12 18:36:06 -08001230// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001231TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001232 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1233
Austin Schuh58646e22021-08-23 23:51:46 -07001234 NodeEventLoopFactory *pi1 =
1235 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1236 NodeEventLoopFactory *pi2 =
1237 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1238 NodeEventLoopFactory *pi3 =
1239 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1240
1241 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001242 Ping ping(ping_event_loop.get());
1243
Austin Schuh58646e22021-08-23 23:51:46 -07001244 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001245 Pong pong(pong_event_loop.get());
1246
1247 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001248 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001249
1250 MessageCounter<examples::Pong> pi2_pong_counter(
1251 pi2_pong_counter_event_loop.get(), "/test");
1252
1253 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001254 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001255
1256 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001257 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001258
1259 MessageCounter<examples::Pong> pi1_pong_counter(
1260 pi1_pong_counter_event_loop.get(), "/test");
1261
1262 // Count timestamps.
1263 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1264 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1265 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1266 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1267 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1268 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1269 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1270 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1271 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1272 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1273 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1274 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1275 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1276 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1277
1278 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001279 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1280 remote_timestamps_pi2_on_pi1 =
1281 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1282 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1283 remote_timestamps_pi1_on_pi2 =
1284 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001285
1286 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001287 *pi1_server_statistics_counter;
1288 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1289 pi1_server_statistics_counter =
1290 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1291 "pi1_server_statistics_counter", "/pi1/aos");
1292 });
1293
Austin Schuhc0b0f722020-12-12 18:36:06 -08001294 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1295 pi1_pong_counter_event_loop
1296 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1297 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1298 pi1_pong_counter_event_loop
1299 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1300
1301 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001302 *pi2_server_statistics_counter;
1303 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1304 pi2_server_statistics_counter =
1305 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1306 "pi2_server_statistics_counter", "/pi2/aos");
1307 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001308 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1309 pi2_pong_counter_event_loop
1310 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1311 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1312 pi2_pong_counter_event_loop
1313 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1314
1315 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001316 *pi3_server_statistics_counter;
1317 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1318 pi3_server_statistics_counter =
1319 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1320 "pi3_server_statistics_counter", "/pi3/aos");
1321 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001322 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1323 pi3_pong_counter_event_loop
1324 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1325 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1326 pi3_pong_counter_event_loop
1327 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1328
1329 MessageCounter<message_bridge::ClientStatistics>
1330 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1331 "/pi1/aos");
1332 MessageCounter<message_bridge::ClientStatistics>
1333 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1334 "/pi2/aos");
1335 MessageCounter<message_bridge::ClientStatistics>
1336 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1337 "/pi3/aos");
1338
1339 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1340 chrono::milliseconds(5));
1341
1342 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1343 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1344
1345 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1346 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1347 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1348 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1349 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1350 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1351 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1352
Austin Schuh58646e22021-08-23 23:51:46 -07001353 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1354 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1355 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001356
1357 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1358 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1359 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1360
1361 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001362 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1363 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001364
1365 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1366 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1367 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1368 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1369 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1370 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1371 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1372 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1373 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1374 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1375 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1376 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1377 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1378 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1379 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1380 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1381 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1382 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1383
Austin Schuh58646e22021-08-23 23:51:46 -07001384 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001385
1386 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1387
1388 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1389 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1390
1391 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1392 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1393 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1394 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1395 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1396 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1397 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1398
Austin Schuh58646e22021-08-23 23:51:46 -07001399 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1400 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1401 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001402
1403 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1404 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1405 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1406
1407 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001408 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1409 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001410
1411 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1412 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1413 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1414 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1415 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1416 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1417 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1418 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1419 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1420 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1421 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1422 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1423 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1424 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1425 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1426 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1427 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1428 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1429
Austin Schuh58646e22021-08-23 23:51:46 -07001430 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001431
1432 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1433
Austin Schuh367a7f42021-11-23 23:04:36 -08001434 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1435 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1436 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1437 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1438 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1439 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1440
1441 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1442 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1443 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1444 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1445 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1446 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1447 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1448 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1449
1450 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1451 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1452 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1453 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1454
1455 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1456 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1457 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1458 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1459
Austin Schuhc0b0f722020-12-12 18:36:06 -08001460 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1461 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1462
1463 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1464 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1465 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1466 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1467 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1468 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1469 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1470
Austin Schuh58646e22021-08-23 23:51:46 -07001471 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1472 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1473 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001474
1475 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1476 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1477 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1478
1479 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001480 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1481 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001482
Austin Schuhc0b0f722020-12-12 18:36:06 -08001483 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1484 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001485 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1486 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001487 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1488 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001489 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1490 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001491 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1492 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001493 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1494 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1495}
1496
Austin Schuh2febf0d2020-09-21 22:24:30 -07001497// Tests that the time offset having a slope doesn't break the world.
1498// SimulatedMessageBridge has enough self consistency CHECK statements to
1499// confirm, and we can can also check a message in each direction to make sure
1500// it gets delivered as expected.
1501TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1502 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001503 aos::configuration::ReadConfig(ArtifactPath(
1504 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001505 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001506 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1507 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001508 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001509 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1510 ASSERT_EQ(pi2_index, 1u);
1511 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1512 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1513 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001514
Austin Schuh87dd3832021-01-01 23:07:31 -08001515 message_bridge::TestingTimeConverter time(
1516 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001517 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001518 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001519
Austin Schuh2febf0d2020-09-21 22:24:30 -07001520 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001521 time.AddNextTimestamp(
1522 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001523 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1524 BootTimestamp::epoch()});
1525 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1526 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1527 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1528 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001529
1530 std::unique_ptr<EventLoop> ping_event_loop =
1531 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1532 Ping ping(ping_event_loop.get());
1533
1534 std::unique_ptr<EventLoop> pong_event_loop =
1535 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1536 Pong pong(pong_event_loop.get());
1537
1538 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1539 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1540 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1541 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1542
1543 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1544 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1545 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1546 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1547
1548 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1549 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1550 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1551 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1552
1553 // End after a pong message comes back. This will leave the latest messages
1554 // on all channels so we can look at timestamps easily and check they make
1555 // sense.
1556 std::unique_ptr<EventLoop> pi1_pong_ender =
1557 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1558 int count = 0;
1559 pi1_pong_ender->MakeWatcher(
1560 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1561 if (++count == 100) {
1562 simulated_event_loop_factory.Exit();
1563 }
1564 });
1565
1566 // Run enough that messages should be delivered.
1567 simulated_event_loop_factory.Run();
1568
1569 // Grab the latest messages.
1570 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1571 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1572 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1573 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1574
1575 // Compute their time on the global distributed clock so we can compute
1576 // distance betwen them.
1577 const distributed_clock::time_point pi1_ping_time =
1578 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1579 ->ToDistributedClock(
1580 ping_on_pi1_fetcher.context().monotonic_event_time);
1581 const distributed_clock::time_point pi2_ping_time =
1582 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1583 ->ToDistributedClock(
1584 ping_on_pi2_fetcher.context().monotonic_event_time);
1585 const distributed_clock::time_point pi1_pong_time =
1586 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1587 ->ToDistributedClock(
1588 pong_on_pi1_fetcher.context().monotonic_event_time);
1589 const distributed_clock::time_point pi2_pong_time =
1590 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1591 ->ToDistributedClock(
1592 pong_on_pi2_fetcher.context().monotonic_event_time);
1593
1594 // And confirm the delivery delay is just about exactly 150 uS for both
1595 // directions like expected. There will be a couple ns of rounding errors in
1596 // the conversion functions that aren't worth accounting for right now. This
1597 // will either be really close, or really far.
1598 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1599 pi1_ping_time);
1600 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1601 pi1_ping_time);
1602
1603 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1604 pi2_pong_time);
1605 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1606 pi2_pong_time);
1607}
1608
Austin Schuh4c570ea2020-11-19 23:13:24 -08001609void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1610 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1611 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1612 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001613 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001614}
1615
1616// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001617TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001618 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1619 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1620
1621 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1622
1623 std::unique_ptr<EventLoop> ping_event_loop =
1624 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1625 aos::Sender<examples::Ping> pi1_reliable_sender =
1626 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1627 aos::Sender<examples::Ping> pi1_unreliable_sender =
1628 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1629 SendPing(&pi1_reliable_sender, 1);
1630 SendPing(&pi1_unreliable_sender, 1);
1631
1632 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1633 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1634 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1635 "/reliable");
1636 MessageCounter<examples::Ping> pi2_unreliable_counter(
1637 pi2_pong_event_loop.get(), "/unreliable");
1638 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1639 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1640 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1641 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1642
1643 const size_t reliable_channel_index = configuration::ChannelIndex(
1644 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1645
1646 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1647 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1648
Austin Schuheeaa2022021-01-02 21:52:03 -08001649 const chrono::nanoseconds network_delay =
1650 simulated_event_loop_factory.network_delay();
1651
Austin Schuh4c570ea2020-11-19 23:13:24 -08001652 int reliable_timestamp_count = 0;
1653 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001654 shared() ? "/pi1/aos/remote_timestamps/pi2"
1655 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001656 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001657 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1658 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001659 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001660 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001661 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001662 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001663 VLOG(1) << aos::FlatbufferToJson(&header);
1664 if (header.channel_index() == reliable_channel_index) {
1665 ++reliable_timestamp_count;
1666 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001667
1668 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1669 chrono::nanoseconds(header.monotonic_sent_time()));
1670
1671 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1672 header_monotonic_sent_time + network_delay +
1673 (pi1_remote_timestamp->monotonic_now() -
1674 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001675 });
1676
1677 // Wait to let timestamp estimation start up before looking for the results.
1678 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1679
1680 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1681 // This one isn't reliable, but was sent before the start. It should *not* be
1682 // delivered.
1683 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1684 // Confirm we got a timestamp logged for the message that was forwarded.
1685 EXPECT_EQ(reliable_timestamp_count, 1u);
1686
1687 SendPing(&pi1_reliable_sender, 2);
1688 SendPing(&pi1_unreliable_sender, 2);
1689 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1690 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1691 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1692
1693 EXPECT_EQ(reliable_timestamp_count, 2u);
1694}
1695
Austin Schuh20ac95d2020-12-05 17:24:19 -08001696// Tests that rebooting a node changes the ServerStatistics message and the
1697// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001698TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001699 const UUID pi1_boot0 = UUID::Random();
1700 const UUID pi2_boot0 = UUID::Random();
1701 const UUID pi2_boot1 = UUID::Random();
1702 const UUID pi3_boot0 = UUID::Random();
1703 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001704
Austin Schuh58646e22021-08-23 23:51:46 -07001705 message_bridge::TestingTimeConverter time(
1706 configuration::NodesCount(&config.message()));
1707 SimulatedEventLoopFactory factory(&config.message());
1708 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001709
Austin Schuh58646e22021-08-23 23:51:46 -07001710 const size_t pi1_index =
1711 configuration::GetNodeIndex(&config.message(), "pi1");
1712 const size_t pi2_index =
1713 configuration::GetNodeIndex(&config.message(), "pi2");
1714 const size_t pi3_index =
1715 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001716
Austin Schuh58646e22021-08-23 23:51:46 -07001717 {
1718 time.AddNextTimestamp(distributed_clock::epoch(),
1719 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1720 BootTimestamp::epoch()});
1721
1722 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1723
1724 time.AddNextTimestamp(
1725 distributed_clock::epoch() + dt,
1726 {BootTimestamp::epoch() + dt,
1727 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1728 BootTimestamp::epoch() + dt});
1729
1730 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1731 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1732 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1733 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1734 }
1735
1736 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1737 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1738
1739 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1740 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741
1742 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001743 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001744
1745 int timestamp_count = 0;
1746 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001747 "/pi2/aos", [&expected_boot_uuid,
1748 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001749 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001750 expected_boot_uuid);
1751 });
1752 pi1_remote_timestamp->MakeWatcher(
1753 "/test",
1754 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001755 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001756 expected_boot_uuid);
1757 });
1758 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001759 shared() ? "/pi1/aos/remote_timestamps/pi2"
1760 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001761 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1762 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001763 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001764 VLOG(1) << aos::FlatbufferToJson(&header);
1765 ++timestamp_count;
1766 });
1767
1768 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001769 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001770 int boot_number = 0;
1771 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001772 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001773 "/pi1/aos",
1774 [&pi1_server_statistics_count, &expected_boot_uuid,
1775 &expected_connection_time, &first_pi1_server_statistics,
1776 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001777 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1778 for (const message_bridge::ServerConnection *connection :
1779 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001780 if (connection->state() == message_bridge::State::CONNECTED) {
1781 ASSERT_TRUE(connection->has_boot_uuid());
1782 }
1783 if (!first_pi1_server_statistics) {
1784 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1785 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001786 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001787 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1788 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001789 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001790 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001791 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001792 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1793 connection->connected_since_time())),
1794 expected_connection_time);
1795 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001796 ++pi1_server_statistics_count;
1797 }
1798 }
Austin Schuh58646e22021-08-23 23:51:46 -07001799 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001800 });
1801
Austin Schuh58646e22021-08-23 23:51:46 -07001802 int pi1_client_statistics_count = 0;
1803 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001804 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1805 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001806 const message_bridge::ClientStatistics &stats) {
1807 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1808 for (const message_bridge::ClientConnection *connection :
1809 *stats.connections()) {
1810 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1811 if (connection->node()->name()->string_view() == "pi2") {
1812 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001813 EXPECT_EQ(expected_boot_uuid,
1814 UUID::FromString(connection->boot_uuid()))
1815 << " : Got " << aos::FlatbufferToJson(&stats);
1816 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1817 connection->connected_since_time())),
1818 expected_connection_time);
1819 EXPECT_EQ(boot_number + 1, connection->connection_count());
1820 } else {
1821 EXPECT_EQ(connection->connected_since_time(), 0);
1822 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001823 }
1824 }
1825 });
1826
1827 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001828 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1829 pi1, pi2, pi2_boot1]() {
1830 expected_boot_uuid = pi2_boot1;
1831 ++boot_number;
1832 LOG(INFO) << "OnShutdown triggered for pi2";
1833 pi2->OnStartup(
1834 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1835 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1836 expected_connection_time = pi1->monotonic_now();
1837 });
1838 });
Austin Schuh58646e22021-08-23 23:51:46 -07001839
Austin Schuh20ac95d2020-12-05 17:24:19 -08001840 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001841 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001842
1843 EXPECT_GT(timestamp_count, 100);
1844 EXPECT_GE(pi1_server_statistics_count, 1u);
1845
Austin Schuh20ac95d2020-12-05 17:24:19 -08001846 timestamp_count = 0;
1847 pi1_server_statistics_count = 0;
1848
Austin Schuh58646e22021-08-23 23:51:46 -07001849 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001850 EXPECT_GT(timestamp_count, 100);
1851 EXPECT_GE(pi1_server_statistics_count, 1u);
1852}
1853
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001854INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001855 All, RemoteMessageSimulatedEventLoopTest,
1856 ::testing::Values(
1857 Param{"multinode_pingpong_test_combined_config.json", true},
1858 Param{"multinode_pingpong_test_split_config.json", false}));
1859
Austin Schuh58646e22021-08-23 23:51:46 -07001860// Tests that Startup and Shutdown do reasonable things.
1861TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1862 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1863 aos::configuration::ReadConfig(
1864 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1865
Austin Schuh72e65682021-09-02 11:37:05 -07001866 size_t pi1_shutdown_counter = 0;
1867 size_t pi2_shutdown_counter = 0;
1868 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1869 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1870
Austin Schuh58646e22021-08-23 23:51:46 -07001871 message_bridge::TestingTimeConverter time(
1872 configuration::NodesCount(&config.message()));
1873 SimulatedEventLoopFactory factory(&config.message());
1874 factory.SetTimeConverter(&time);
1875 time.AddNextTimestamp(
1876 distributed_clock::epoch(),
1877 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1878
1879 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1880
1881 time.AddNextTimestamp(
1882 distributed_clock::epoch() + dt,
1883 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1884 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1885 BootTimestamp::epoch() + dt});
1886
1887 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1888 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1889
1890 // Configure startup to start Ping and Pong, and count.
1891 size_t pi1_startup_counter = 0;
1892 size_t pi2_startup_counter = 0;
1893 pi1->OnStartup([pi1]() {
1894 LOG(INFO) << "Made ping";
1895 pi1->AlwaysStart<Ping>("ping");
1896 });
1897 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1898 pi2->OnStartup([pi2]() {
1899 LOG(INFO) << "Made pong";
1900 pi2->AlwaysStart<Pong>("pong");
1901 });
1902 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1903
1904 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001905 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1906 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1907
Austin Schuh58646e22021-08-23 23:51:46 -07001908 // Automatically make counters on startup.
1909 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1910 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1911 "pi1_pong_counter", "/test");
1912 });
1913 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1914 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1915 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1916 "pi2_ping_counter", "/test");
1917 });
1918 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1919
1920 EXPECT_EQ(pi2_ping_counter, nullptr);
1921 EXPECT_EQ(pi1_pong_counter, nullptr);
1922
1923 EXPECT_EQ(pi1_startup_counter, 0u);
1924 EXPECT_EQ(pi2_startup_counter, 0u);
1925 EXPECT_EQ(pi1_shutdown_counter, 0u);
1926 EXPECT_EQ(pi2_shutdown_counter, 0u);
1927
1928 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1929 EXPECT_EQ(pi1_startup_counter, 1u);
1930 EXPECT_EQ(pi2_startup_counter, 1u);
1931 EXPECT_EQ(pi1_shutdown_counter, 0u);
1932 EXPECT_EQ(pi2_shutdown_counter, 0u);
1933 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1934 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1935
1936 LOG(INFO) << pi1->monotonic_now();
1937 LOG(INFO) << pi2->monotonic_now();
1938
1939 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1940
1941 EXPECT_EQ(pi1_startup_counter, 2u);
1942 EXPECT_EQ(pi2_startup_counter, 2u);
1943 EXPECT_EQ(pi1_shutdown_counter, 1u);
1944 EXPECT_EQ(pi2_shutdown_counter, 1u);
1945 EXPECT_EQ(pi2_ping_counter->count(), 501);
1946 EXPECT_EQ(pi1_pong_counter->count(), 501);
1947}
1948
1949// Tests that OnStartup handlers can be added after running and get called, and
1950// can't be called when running.
1951TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1952 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1953 aos::configuration::ReadConfig(
1954 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1955
1956 // Test that we can add startup handlers as long as we aren't running, and
1957 // they get run when Run gets called again.
1958 // Test that adding a startup handler when running fails.
1959 //
1960 // Test shutdown handlers get called on destruction.
1961 SimulatedEventLoopFactory factory(&config.message());
1962
1963 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1964
1965 int startup_count0 = 0;
1966 int startup_count1 = 0;
1967
1968 pi1->OnStartup([&]() { ++startup_count0; });
1969 EXPECT_EQ(startup_count0, 0);
1970 EXPECT_EQ(startup_count1, 0);
1971
1972 factory.RunFor(chrono::nanoseconds(1));
1973 EXPECT_EQ(startup_count0, 1);
1974 EXPECT_EQ(startup_count1, 0);
1975
1976 pi1->OnStartup([&]() { ++startup_count1; });
1977 EXPECT_EQ(startup_count0, 1);
1978 EXPECT_EQ(startup_count1, 0);
1979
1980 factory.RunFor(chrono::nanoseconds(1));
1981 EXPECT_EQ(startup_count0, 1);
1982 EXPECT_EQ(startup_count1, 1);
1983
1984 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1985 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1986
1987 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1988 "Can only register OnStartup handlers when not running.");
1989}
1990
1991// Tests that OnStartup handlers can be added after running and get called, and
1992// all the handlers get called on reboot. Shutdown handlers are tested the same
1993// way.
1994TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1995 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1996 aos::configuration::ReadConfig(
1997 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1998
Austin Schuh72e65682021-09-02 11:37:05 -07001999 int startup_count0 = 0;
2000 int shutdown_count0 = 0;
2001 int startup_count1 = 0;
2002 int shutdown_count1 = 0;
2003
Austin Schuh58646e22021-08-23 23:51:46 -07002004 message_bridge::TestingTimeConverter time(
2005 configuration::NodesCount(&config.message()));
2006 SimulatedEventLoopFactory factory(&config.message());
2007 factory.SetTimeConverter(&time);
2008 time.StartEqual();
2009
2010 const chrono::nanoseconds dt = chrono::seconds(10);
2011 time.RebootAt(0, distributed_clock::epoch() + dt);
2012 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2013
2014 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2015
Austin Schuh58646e22021-08-23 23:51:46 -07002016 pi1->OnStartup([&]() { ++startup_count0; });
2017 pi1->OnShutdown([&]() { ++shutdown_count0; });
2018 EXPECT_EQ(startup_count0, 0);
2019 EXPECT_EQ(startup_count1, 0);
2020 EXPECT_EQ(shutdown_count0, 0);
2021 EXPECT_EQ(shutdown_count1, 0);
2022
2023 factory.RunFor(chrono::nanoseconds(1));
2024 EXPECT_EQ(startup_count0, 1);
2025 EXPECT_EQ(startup_count1, 0);
2026 EXPECT_EQ(shutdown_count0, 0);
2027 EXPECT_EQ(shutdown_count1, 0);
2028
2029 pi1->OnStartup([&]() { ++startup_count1; });
2030 EXPECT_EQ(startup_count0, 1);
2031 EXPECT_EQ(startup_count1, 0);
2032 EXPECT_EQ(shutdown_count0, 0);
2033 EXPECT_EQ(shutdown_count1, 0);
2034
2035 factory.RunFor(chrono::nanoseconds(1));
2036 EXPECT_EQ(startup_count0, 1);
2037 EXPECT_EQ(startup_count1, 1);
2038 EXPECT_EQ(shutdown_count0, 0);
2039 EXPECT_EQ(shutdown_count1, 0);
2040
2041 factory.RunFor(chrono::seconds(15));
2042
2043 EXPECT_EQ(startup_count0, 2);
2044 EXPECT_EQ(startup_count1, 2);
2045 EXPECT_EQ(shutdown_count0, 1);
2046 EXPECT_EQ(shutdown_count1, 0);
2047
2048 pi1->OnShutdown([&]() { ++shutdown_count1; });
2049 factory.RunFor(chrono::seconds(10));
2050
2051 EXPECT_EQ(startup_count0, 3);
2052 EXPECT_EQ(startup_count1, 3);
2053 EXPECT_EQ(shutdown_count0, 2);
2054 EXPECT_EQ(shutdown_count1, 1);
2055}
2056
2057// Tests that event loops which outlive shutdown crash.
2058TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2059 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2060 aos::configuration::ReadConfig(
2061 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2062
2063 message_bridge::TestingTimeConverter time(
2064 configuration::NodesCount(&config.message()));
2065 SimulatedEventLoopFactory factory(&config.message());
2066 factory.SetTimeConverter(&time);
2067 time.StartEqual();
2068
2069 const chrono::nanoseconds dt = chrono::seconds(10);
2070 time.RebootAt(0, distributed_clock::epoch() + dt);
2071
2072 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2073
2074 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2075
2076 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2077}
2078
Brian Silvermane1fe2512022-08-14 23:18:50 -07002079// Test that an ExitHandle outliving its factory is caught.
2080TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2081 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2082 aos::configuration::ReadConfig(
2083 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2084 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2085 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2086 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2087 auto exit_handle = factory->MakeExitHandle();
2088 EXPECT_DEATH(factory.reset(),
2089 "All ExitHandles must be destroyed before the factory");
2090}
2091
Austin Schuh58646e22021-08-23 23:51:46 -07002092// Tests that messages don't survive a reboot of a node.
2093TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2094 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2095 aos::configuration::ReadConfig(
2096 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2097
2098 message_bridge::TestingTimeConverter time(
2099 configuration::NodesCount(&config.message()));
2100 SimulatedEventLoopFactory factory(&config.message());
2101 factory.SetTimeConverter(&time);
2102 time.StartEqual();
2103
2104 const chrono::nanoseconds dt = chrono::seconds(10);
2105 time.RebootAt(0, distributed_clock::epoch() + dt);
2106
2107 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2108
2109 const UUID boot_uuid = pi1->boot_uuid();
2110 EXPECT_NE(boot_uuid, UUID::Zero());
2111
2112 {
2113 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2114 aos::Sender<examples::Ping> test_message_sender =
2115 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2116 SendPing(&test_message_sender, 1);
2117 }
2118
2119 factory.RunFor(chrono::seconds(5));
2120
2121 {
2122 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2123 aos::Fetcher<examples::Ping> fetcher =
2124 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2125 EXPECT_TRUE(fetcher.Fetch());
2126 }
2127
2128 factory.RunFor(chrono::seconds(10));
2129
2130 {
2131 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2132 aos::Fetcher<examples::Ping> fetcher =
2133 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2134 EXPECT_FALSE(fetcher.Fetch());
2135 }
2136 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2137}
2138
2139// Tests that reliable messages get resent on reboot.
2140TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2141 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2142 aos::configuration::ReadConfig(
2143 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2144
2145 message_bridge::TestingTimeConverter time(
2146 configuration::NodesCount(&config.message()));
2147 SimulatedEventLoopFactory factory(&config.message());
2148 factory.SetTimeConverter(&time);
2149 time.StartEqual();
2150
2151 const chrono::nanoseconds dt = chrono::seconds(1);
2152 time.RebootAt(1, distributed_clock::epoch() + dt);
2153
2154 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2155 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2156
2157 const UUID pi1_boot_uuid = pi1->boot_uuid();
2158 const UUID pi2_boot_uuid = pi2->boot_uuid();
2159 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2160 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2161
2162 {
2163 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2164 aos::Sender<examples::Ping> test_message_sender =
2165 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2166 SendPing(&test_message_sender, 1);
2167 }
2168
2169 factory.RunFor(chrono::milliseconds(500));
2170
2171 {
2172 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2173 aos::Fetcher<examples::Ping> fetcher =
2174 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2175 EXPECT_TRUE(fetcher.Fetch());
2176 }
2177
2178 factory.RunFor(chrono::seconds(1));
2179
2180 {
2181 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2182 aos::Fetcher<examples::Ping> fetcher =
2183 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2184 EXPECT_TRUE(fetcher.Fetch());
2185 }
2186 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2187}
2188
Austin Schuh48205e62021-11-12 14:13:18 -08002189class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2190 public:
2191 SimulatedEventLoopDisconnectTest()
2192 : config(aos::configuration::ReadConfig(ArtifactPath(
2193 "aos/events/multinode_pingpong_test_split_config.json"))),
2194 time(configuration::NodesCount(&config.message())),
2195 factory(&config.message()) {
2196 factory.SetTimeConverter(&time);
2197 }
2198
2199 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2200 const monotonic_clock::time_point allowable_message_time,
2201 std::set<const aos::Node *> empty_nodes) {
2202 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2203 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2204 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2205 pi1->MakeEventLoop("fetcher");
2206 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2207 pi2->MakeEventLoop("fetcher");
2208 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2209 if (configuration::ChannelIsReadableOnNode(channel,
2210 pi1_event_loop->node())) {
2211 std::unique_ptr<aos::RawFetcher> fetcher =
2212 pi1_event_loop->MakeRawFetcher(channel);
2213 if (statistics_channels.find(channel) == statistics_channels.end() ||
2214 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2215 EXPECT_FALSE(fetcher->Fetch() &&
2216 fetcher->context().monotonic_event_time >
2217 allowable_message_time)
2218 << ": Found recent message on channel "
2219 << configuration::CleanedChannelToString(channel) << " and time "
2220 << fetcher->context().monotonic_event_time << " > "
2221 << allowable_message_time << " on pi1";
2222 } else {
2223 EXPECT_TRUE(fetcher->Fetch() &&
2224 fetcher->context().monotonic_event_time >=
2225 allowable_message_time)
2226 << ": Didn't find recent message on channel "
2227 << configuration::CleanedChannelToString(channel) << " on pi1";
2228 }
2229 }
2230 if (configuration::ChannelIsReadableOnNode(channel,
2231 pi2_event_loop->node())) {
2232 std::unique_ptr<aos::RawFetcher> fetcher =
2233 pi2_event_loop->MakeRawFetcher(channel);
2234 if (statistics_channels.find(channel) == statistics_channels.end() ||
2235 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2236 EXPECT_FALSE(fetcher->Fetch() &&
2237 fetcher->context().monotonic_event_time >
2238 allowable_message_time)
2239 << ": Found message on channel "
2240 << configuration::CleanedChannelToString(channel) << " and time "
2241 << fetcher->context().monotonic_event_time << " > "
2242 << allowable_message_time << " on pi2";
2243 } else {
2244 EXPECT_TRUE(fetcher->Fetch() &&
2245 fetcher->context().monotonic_event_time >=
2246 allowable_message_time)
2247 << ": Didn't find message on channel "
2248 << configuration::CleanedChannelToString(channel) << " on pi2";
2249 }
2250 }
2251 }
2252 }
2253
2254 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2255
2256 message_bridge::TestingTimeConverter time;
2257 SimulatedEventLoopFactory factory;
2258};
2259
2260// Tests that if we have message bridge client/server disabled, and timing
2261// reports disabled, no messages are sent. Also tests that we can disconnect a
2262// node and disable statistics on it and it actually fully disconnects.
2263TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2264 time.StartEqual();
2265 factory.SkipTimingReport();
2266 factory.DisableStatistics();
2267
2268 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2269 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2270
2271 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2272 pi1->MakeEventLoop("fetcher");
2273 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2274 pi2->MakeEventLoop("fetcher");
2275
2276 factory.RunFor(chrono::milliseconds(100000));
2277
2278 // Confirm no messages are sent if we've configured them all off.
2279 VerifyChannels({}, monotonic_clock::min_time, {});
2280
2281 // Now, confirm that all the message_bridge channels come back when we
2282 // re-enable.
2283 factory.EnableStatistics();
2284
2285 factory.RunFor(chrono::milliseconds(10050));
2286
2287 // Build up the list of all the messages we expect when we come back.
2288 {
2289 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002290 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002291 std::vector<std::pair<std::string_view, const Node *>>{
2292 {"/pi1/aos", pi1->node()},
2293 {"/pi2/aos", pi1->node()},
2294 {"/pi3/aos", pi1->node()}}) {
2295 statistics_channels.insert(configuration::GetChannel(
2296 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2297 pi.second));
2298 statistics_channels.insert(configuration::GetChannel(
2299 factory.configuration(), pi.first,
2300 "aos.message_bridge.ServerStatistics", "", pi.second));
2301 statistics_channels.insert(configuration::GetChannel(
2302 factory.configuration(), pi.first,
2303 "aos.message_bridge.ClientStatistics", "", pi.second));
2304 }
2305
2306 statistics_channels.insert(configuration::GetChannel(
2307 factory.configuration(),
2308 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2309 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2310 statistics_channels.insert(configuration::GetChannel(
2311 factory.configuration(),
2312 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2313 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2314 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2315 }
2316
2317 // Now test that we can disable the messages for a single node
2318 pi2->DisableStatistics();
2319 const aos::monotonic_clock::time_point statistics_disable_time =
2320 pi2->monotonic_now();
2321 factory.RunFor(chrono::milliseconds(10000));
2322
2323 // We should see a much smaller set of messages, but should still see messages
2324 // forwarded, mainly the timestamp message.
2325 {
2326 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002327 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002328 std::vector<std::pair<std::string_view, const Node *>>{
2329 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2330 statistics_channels.insert(configuration::GetChannel(
2331 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2332 pi.second));
2333 statistics_channels.insert(configuration::GetChannel(
2334 factory.configuration(), pi.first,
2335 "aos.message_bridge.ServerStatistics", "", pi.second));
2336 statistics_channels.insert(configuration::GetChannel(
2337 factory.configuration(), pi.first,
2338 "aos.message_bridge.ClientStatistics", "", pi.second));
2339 }
2340
2341 statistics_channels.insert(configuration::GetChannel(
2342 factory.configuration(),
2343 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2344 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2345 VerifyChannels(statistics_channels, statistics_disable_time, {});
2346 }
2347
2348 // Now, fully disconnect the node. This will completely quiet down pi2.
2349 pi1->Disconnect(pi2->node());
2350 pi2->Disconnect(pi1->node());
2351
2352 const aos::monotonic_clock::time_point disconnect_disable_time =
2353 pi2->monotonic_now();
2354 factory.RunFor(chrono::milliseconds(10000));
2355
2356 {
2357 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002358 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002359 std::vector<std::pair<std::string_view, const Node *>>{
2360 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2361 statistics_channels.insert(configuration::GetChannel(
2362 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2363 pi.second));
2364 statistics_channels.insert(configuration::GetChannel(
2365 factory.configuration(), pi.first,
2366 "aos.message_bridge.ServerStatistics", "", pi.second));
2367 statistics_channels.insert(configuration::GetChannel(
2368 factory.configuration(), pi.first,
2369 "aos.message_bridge.ClientStatistics", "", pi.second));
2370 }
2371
2372 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2373 }
2374}
2375
Neil Balchc8f41ed2018-01-20 22:06:53 -08002376} // namespace testing
2377} // namespace aos