blob: a46cc1ce40309f5eb6a5b09467ec6ce95b6093b3 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
#include <chrono>
#include <functional>
#include <string_view>
#include <utility>
6
Alex Perrycb7da4b2019-08-28 19:35:56 -07007#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07009#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010#include "aos/events/ping_lib.h"
11#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080012#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070013#include "aos/network/message_bridge_client_generated.h"
14#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080015#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080016#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070017#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070018#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080019#include "gtest/gtest.h"
20
21namespace aos {
22namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
// Parameters to run all the tests with.
struct Param {
  // The config file to use.
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.  Defaults to false so a default-constructed Param never holds an
  // indeterminate value.
  bool shared = false;
};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800142class FunctionEvent : public EventScheduler::Event {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700143 public:
144 FunctionEvent(std::function<void()> fn) : fn_(fn) {}
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800145
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700146 void Handle() noexcept override { fn_(); }
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800147
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700148 private:
149 std::function<void()> fn_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800150};
151
Neil Balchc8f41ed2018-01-20 22:06:53 -0800152// Test that creating an event and running the scheduler runs the event.
153TEST(EventSchedulerTest, ScheduleEvent) {
154 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800155 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700156 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800157 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800158
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800159 FunctionEvent e([&counter]() { counter += 1; });
160 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800162 EXPECT_EQ(counter, 1);
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800163 auto token =
164 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800165 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800167 EXPECT_EQ(counter, 1);
168}
169
170// Test that descheduling an already scheduled event doesn't run the event.
171TEST(EventSchedulerTest, DescheduleEvent) {
172 int counter = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 EventSchedulerScheduler scheduler_scheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700174 EventScheduler scheduler(0);
Austin Schuh8bd96322020-02-13 21:18:22 -0800175 scheduler_scheduler.AddEventScheduler(&scheduler);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800176
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800177 FunctionEvent e([&counter]() { counter += 1; });
178 auto token =
179 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800180 scheduler.Deschedule(token);
Austin Schuh8bd96322020-02-13 21:18:22 -0800181 scheduler_scheduler.Run();
Neil Balchc8f41ed2018-01-20 22:06:53 -0800182 EXPECT_EQ(counter, 0);
183}
Austin Schuh44019f92019-05-19 19:58:27 -0700184
Austin Schuhe33c08d2022-02-03 18:15:21 -0800185// Test that TemporarilyStopAndRun respects and preserves running.
186TEST(EventSchedulerTest, TemporarilyStopAndRun) {
187 int counter = 0;
188 EventSchedulerScheduler scheduler_scheduler;
189 EventScheduler scheduler(0);
190 scheduler_scheduler.AddEventScheduler(&scheduler);
191
192 scheduler_scheduler.TemporarilyStopAndRun(
193 [&]() { CHECK(!scheduler_scheduler.is_running()); });
194 ASSERT_FALSE(scheduler_scheduler.is_running());
195
196 FunctionEvent e([&]() {
197 counter += 1;
198 CHECK(scheduler_scheduler.is_running());
199 scheduler_scheduler.TemporarilyStopAndRun(
200 [&]() { CHECK(!scheduler_scheduler.is_running()); });
201 CHECK(scheduler_scheduler.is_running());
202 });
203 scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
204 scheduler_scheduler.Run();
205 EXPECT_EQ(counter, 1);
206}
207
Austin Schuh8fb315a2020-11-19 22:33:58 -0800208// Test that sending a message after running gets properly notified.
209TEST(SimulatedEventLoopTest, SendAfterRunFor) {
210 SimulatedEventLoopTestFactory factory;
211
212 SimulatedEventLoopFactory simulated_event_loop_factory(
213 factory.configuration());
214
215 ::std::unique_ptr<EventLoop> ping_event_loop =
216 simulated_event_loop_factory.MakeEventLoop("ping");
217 aos::Sender<TestMessage> test_message_sender =
218 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700219 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800220
221 std::unique_ptr<EventLoop> pong1_event_loop =
222 simulated_event_loop_factory.MakeEventLoop("pong");
223 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
224 "/test");
225
226 EXPECT_FALSE(ping_event_loop->is_running());
227
228 // Watchers start when you start running, so there should be nothing counted.
229 simulated_event_loop_factory.RunFor(chrono::seconds(1));
230 EXPECT_EQ(test_message_counter1.count(), 0u);
231
232 std::unique_ptr<EventLoop> pong2_event_loop =
233 simulated_event_loop_factory.MakeEventLoop("pong");
234 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
235 "/test");
236
237 // Pauses in the middle don't count though, so this should be counted.
238 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700239 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800240
241 EXPECT_EQ(test_message_counter1.count(), 0u);
242 EXPECT_EQ(test_message_counter2.count(), 0u);
243 simulated_event_loop_factory.RunFor(chrono::seconds(1));
244
245 EXPECT_EQ(test_message_counter1.count(), 1u);
246 EXPECT_EQ(test_message_counter2.count(), 0u);
247}
248
Austin Schuh60e77942022-05-16 17:48:24 -0700249// Test that if we configure an event loop to be able to send too fast that we
250// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700251TEST(SimulatedEventLoopTest, AllowSendTooFast) {
252 SimulatedEventLoopTestFactory factory;
253
254 SimulatedEventLoopFactory simulated_event_loop_factory(
255 factory.configuration());
256
257 // Create two event loops: One will be allowed to send too fast, one won't. We
258 // will then test to ensure that the one that is allowed to send too fast can
259 // indeed send too fast, but that it then makes it so that the second event
260 // loop can no longer send anything because *it* is still limited.
261 ::std::unique_ptr<EventLoop> too_fast_event_loop =
262 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
263 ->MakeEventLoop("too_fast_sender",
264 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700265 NodeEventLoopFactory::ExclusiveSenders::kNo,
266 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700267 aos::Sender<TestMessage> too_fast_message_sender =
268 too_fast_event_loop->MakeSender<TestMessage>("/test");
269
270 ::std::unique_ptr<EventLoop> limited_event_loop =
271 simulated_event_loop_factory.MakeEventLoop("limited_sender");
272 aos::Sender<TestMessage> limited_message_sender =
273 limited_event_loop->MakeSender<TestMessage>("/test");
274
275 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
276 for (int ii = 0; ii < queue_size; ++ii) {
277 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
278 }
279 // And now we should start being in the sending-too-fast phase.
280 for (int ii = 0; ii < queue_size; ++ii) {
281 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700282 ASSERT_EQ(SendTestMessage(limited_message_sender),
283 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700284 }
285}
286
287// Test that if we setup an exclusive sender that it is indeed exclusive.
288TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
289 SimulatedEventLoopTestFactory factory;
290
291 SimulatedEventLoopFactory simulated_event_loop_factory(
292 factory.configuration());
293
294 ::std::unique_ptr<EventLoop> exclusive_event_loop =
295 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700296 ->MakeEventLoop(
297 "too_fast_sender",
298 {NodeEventLoopFactory::CheckSentTooFast::kYes,
299 NodeEventLoopFactory::ExclusiveSenders::kYes,
300 {{configuration::GetChannel(factory.configuration(), "/test1",
301 "aos.TestMessage", "", nullptr),
302 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700303 exclusive_event_loop->SkipAosLog();
304 exclusive_event_loop->SkipTimingReport();
305 ::std::unique_ptr<EventLoop> normal_event_loop =
306 simulated_event_loop_factory.MakeEventLoop("limited_sender");
307 // Set things up to have the exclusive sender be destroyed so we can test
308 // recovery.
309 {
310 aos::Sender<TestMessage> exclusive_sender =
311 exclusive_event_loop->MakeSender<TestMessage>("/test");
312
313 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
314 "TestMessage");
315 }
316 // This one should succeed now that the exclusive channel is removed.
317 aos::Sender<TestMessage> normal_sender =
318 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700319 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
320 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700321
322 // And check an explicitly exempted channel:
323 aos::Sender<TestMessage> non_exclusive_sender =
324 exclusive_event_loop->MakeSender<TestMessage>("/test1");
325 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
326 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700327}
328
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700329void TestSentTooFastCheckEdgeCase(
330 const std::function<RawSender::Error(int, int)> expected_err,
331 const bool send_twice_at_end) {
332 SimulatedEventLoopTestFactory factory;
333
334 auto event_loop = factory.MakePrimary("primary");
335
336 auto sender = event_loop->MakeSender<TestMessage>("/test");
337
338 const int queue_size = TestChannelQueueSize(event_loop.get());
339 int msgs_sent = 0;
340 event_loop->AddPhasedLoop(
341 [&](int) {
342 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
343 msgs_sent++;
344
345 // If send_twice_at_end, send the last two messages (message
346 // queue_size and queue_size + 1) in the same iteration, meaning that
347 // we would be sending very slightly too fast. Otherwise, we will send
348 // message queue_size + 1 in the next iteration and we will continue
349 // to be sending exactly at the channel frequency.
350 if (send_twice_at_end && (msgs_sent == queue_size)) {
351 EXPECT_EQ(SendTestMessage(sender),
352 expected_err(msgs_sent, queue_size));
353 msgs_sent++;
354 }
355
356 if (msgs_sent > queue_size) {
357 factory.Exit();
358 }
359 },
360 std::chrono::duration_cast<std::chrono::nanoseconds>(
361 std::chrono::duration<double>(
362 1.0 / TestChannelFrequency(event_loop.get()))));
363
364 factory.Run();
365}
366
367// Tests that RawSender::Error::kMessagesSentTooFast is not returned
368// when messages are sent at the exact frequency of the channel.
369TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
370 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
371 false);
372}
373
374// Tests that RawSender::Error::kMessagesSentTooFast is returned
375// when sending exactly one more message than allowed in a channel storage
376// duration.
377TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
378 TestSentTooFastCheckEdgeCase(
379 [](const int msgs_sent, const int queue_size) {
380 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
381 : RawSender::Error::kOk);
382 },
383 true);
384}
385
Austin Schuh8fb315a2020-11-19 22:33:58 -0800386// Test that creating an event loop while running dies.
387TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
388 SimulatedEventLoopTestFactory factory;
389
390 SimulatedEventLoopFactory simulated_event_loop_factory(
391 factory.configuration());
392
393 ::std::unique_ptr<EventLoop> event_loop =
394 simulated_event_loop_factory.MakeEventLoop("ping");
395
396 auto timer = event_loop->AddTimer([&]() {
397 EXPECT_DEATH(
398 {
399 ::std::unique_ptr<EventLoop> event_loop2 =
400 simulated_event_loop_factory.MakeEventLoop("ping");
401 },
402 "event loop while running");
403 simulated_event_loop_factory.Exit();
404 });
405
406 event_loop->OnRun([&event_loop, &timer] {
407 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
408 });
409
410 simulated_event_loop_factory.Run();
411}
412
413// Test that creating a watcher after running dies.
414TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
415 SimulatedEventLoopTestFactory factory;
416
417 SimulatedEventLoopFactory simulated_event_loop_factory(
418 factory.configuration());
419
420 ::std::unique_ptr<EventLoop> event_loop =
421 simulated_event_loop_factory.MakeEventLoop("ping");
422
423 simulated_event_loop_factory.RunFor(chrono::seconds(1));
424
425 EXPECT_DEATH(
426 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
427 "Can't add a watcher after running");
428
429 ::std::unique_ptr<EventLoop> event_loop2 =
430 simulated_event_loop_factory.MakeEventLoop("ping");
431
432 simulated_event_loop_factory.RunFor(chrono::seconds(1));
433
434 EXPECT_DEATH(
435 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
436 "Can't add a watcher after running");
437}
438
Austin Schuh44019f92019-05-19 19:58:27 -0700439// Test that running for a time period with no handlers causes time to progress
440// correctly.
441TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800442 SimulatedEventLoopTestFactory factory;
443
444 SimulatedEventLoopFactory simulated_event_loop_factory(
445 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700446 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800447 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700448
449 simulated_event_loop_factory.RunFor(chrono::seconds(1));
450
451 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700452 event_loop->monotonic_now());
453}
454
455// Test that running for a time with a periodic handler causes time to end
456// correctly.
457TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800458 SimulatedEventLoopTestFactory factory;
459
460 SimulatedEventLoopFactory simulated_event_loop_factory(
461 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700462 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800463 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700464
465 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700466 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700467 event_loop->OnRun([&event_loop, &timer] {
468 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
469 chrono::milliseconds(100));
470 });
471
472 simulated_event_loop_factory.RunFor(chrono::seconds(1));
473
474 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700475 event_loop->monotonic_now());
476 EXPECT_EQ(counter, 10);
477}
478
Austin Schuh7d87b672019-12-01 20:23:49 -0800479// Tests that watchers have latency in simulation.
480TEST(SimulatedEventLoopTest, WatcherTimingReport) {
481 SimulatedEventLoopTestFactory factory;
482 factory.set_send_delay(std::chrono::microseconds(50));
483
484 FLAGS_timing_report_ms = 1000;
485 auto loop1 = factory.MakePrimary("primary");
486 loop1->MakeWatcher("/test", [](const TestMessage &) {});
487
488 auto loop2 = factory.Make("sender_loop");
489
490 auto loop3 = factory.Make("report_fetcher");
491
492 Fetcher<timing::Report> report_fetcher =
493 loop3->MakeFetcher<timing::Report>("/aos");
494 EXPECT_FALSE(report_fetcher.Fetch());
495
496 auto sender = loop2->MakeSender<TestMessage>("/test");
497
498 // Send 10 messages in the middle of a timing report period so we get
499 // something interesting back.
500 auto test_timer = loop2->AddTimer([&sender]() {
501 for (int i = 0; i < 10; ++i) {
502 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
503 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
504 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700505 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800506 }
507 });
508
509 // Quit after 1 timing report, mid way through the next cycle.
510 {
511 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
512 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
513 end_timer->set_name("end");
514 }
515
516 loop1->OnRun([&test_timer, &loop1]() {
517 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
518 });
519
520 factory.Run();
521
522 // And, since we are here, check that the timing report makes sense.
523 // Start by looking for our event loop's timing.
524 FlatbufferDetachedBuffer<timing::Report> primary_report =
525 FlatbufferDetachedBuffer<timing::Report>::Empty();
526 while (report_fetcher.FetchNext()) {
527 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
528 if (report_fetcher->name()->string_view() == "primary") {
529 primary_report = CopyFlatBuffer(report_fetcher.get());
530 }
531 }
532
533 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700534 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800535
536 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
537
538 // Just the timing report timer.
539 ASSERT_NE(primary_report.message().timers(), nullptr);
540 EXPECT_EQ(primary_report.message().timers()->size(), 2);
541
542 // No phased loops
543 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
544
545 // And now confirm that the watcher received all 10 messages, and has latency.
546 ASSERT_NE(primary_report.message().watchers(), nullptr);
547 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
548 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
549 EXPECT_NEAR(
550 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
551 0.00005, 1e-9);
552 EXPECT_NEAR(
553 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
554 0.00005, 1e-9);
555 EXPECT_NEAR(
556 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
557 0.00005, 1e-9);
558 EXPECT_EQ(primary_report.message()
559 .watchers()
560 ->Get(0)
561 ->wakeup_latency()
562 ->standard_deviation(),
563 0.0);
564
565 EXPECT_EQ(
566 primary_report.message().watchers()->Get(0)->handler_time()->average(),
567 0.0);
568 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
569 0.0);
570 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
571 0.0);
572 EXPECT_EQ(primary_report.message()
573 .watchers()
574 ->Get(0)
575 ->handler_time()
576 ->standard_deviation(),
577 0.0);
578}
579
Austin Schuh89c9b812021-02-20 14:42:10 -0800580size_t CountAll(
581 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
582 &counters) {
583 size_t count = 0u;
584 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
585 counters) {
586 count += counter->count();
587 }
588 return count;
589}
590
Austin Schuh4c3b9702020-08-30 11:34:55 -0700591// Tests that ping and pong work when on 2 different nodes, and the message
592// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800593TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800594 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
595 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700596 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800597
598 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
599
600 std::unique_ptr<EventLoop> ping_event_loop =
601 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
602 Ping ping(ping_event_loop.get());
603
604 std::unique_ptr<EventLoop> pong_event_loop =
605 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
606 Pong pong(pong_event_loop.get());
607
608 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
609 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700610 MessageCounter<examples::Pong> pi2_pong_counter(
611 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700612 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
613 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
614 "/pi1/aos");
615 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
616 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800617
Austin Schuh4c3b9702020-08-30 11:34:55 -0700618 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
619 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800620
621 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
622 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700623 MessageCounter<examples::Pong> pi1_pong_counter(
624 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700625 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
626 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
627 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
628 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
629 "/aos");
630
Austin Schuh4c3b9702020-08-30 11:34:55 -0700631 // Count timestamps.
632 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
633 pi1_pong_counter_event_loop.get(), "/pi1/aos");
634 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
635 pi2_pong_counter_event_loop.get(), "/pi1/aos");
636 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
637 pi3_pong_counter_event_loop.get(), "/pi1/aos");
638 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
639 pi1_pong_counter_event_loop.get(), "/pi2/aos");
640 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
641 pi2_pong_counter_event_loop.get(), "/pi2/aos");
642 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
643 pi1_pong_counter_event_loop.get(), "/pi3/aos");
644 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
645 pi3_pong_counter_event_loop.get(), "/pi3/aos");
646
Austin Schuh2f8fd752020-09-01 22:38:28 -0700647 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800648 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
649 remote_timestamps_pi2_on_pi1 =
650 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
651 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
652 remote_timestamps_pi1_on_pi2 =
653 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700654
Austin Schuh4c3b9702020-08-30 11:34:55 -0700655 // Wait to let timestamp estimation start up before looking for the results.
656 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
657
Austin Schuh8fb315a2020-11-19 22:33:58 -0800658 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
659 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
660 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
661 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
662 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
663 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
664
Austin Schuh4c3b9702020-08-30 11:34:55 -0700665 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800666 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700667 "/pi1/aos", [&pi1_server_statistics_count](
668 const message_bridge::ServerStatistics &stats) {
669 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
670 EXPECT_EQ(stats.connections()->size(), 2u);
671 for (const message_bridge::ServerConnection *connection :
672 *stats.connections()) {
673 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800674 EXPECT_EQ(connection->connection_count(), 1u);
675 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800676 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700677 if (connection->node()->name()->string_view() == "pi2") {
678 EXPECT_GT(connection->sent_packets(), 50);
679 } else if (connection->node()->name()->string_view() == "pi3") {
680 EXPECT_GE(connection->sent_packets(), 5);
681 } else {
682 LOG(FATAL) << "Unknown connection";
683 }
684
685 EXPECT_TRUE(connection->has_monotonic_offset());
686 EXPECT_EQ(connection->monotonic_offset(), 0);
687 }
688 ++pi1_server_statistics_count;
689 });
690
691 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800692 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700693 "/pi2/aos", [&pi2_server_statistics_count](
694 const message_bridge::ServerStatistics &stats) {
695 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
696 EXPECT_EQ(stats.connections()->size(), 1u);
697
698 const message_bridge::ServerConnection *connection =
699 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800700 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700701 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
702 EXPECT_GT(connection->sent_packets(), 50);
703 EXPECT_TRUE(connection->has_monotonic_offset());
704 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800705 EXPECT_EQ(connection->connection_count(), 1u);
706 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700707 ++pi2_server_statistics_count;
708 });
709
710 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800711 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700712 "/pi3/aos", [&pi3_server_statistics_count](
713 const message_bridge::ServerStatistics &stats) {
714 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
715 EXPECT_EQ(stats.connections()->size(), 1u);
716
717 const message_bridge::ServerConnection *connection =
718 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800719 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700720 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
721 EXPECT_GE(connection->sent_packets(), 5);
722 EXPECT_TRUE(connection->has_monotonic_offset());
723 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800724 EXPECT_EQ(connection->connection_count(), 1u);
725 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700726 ++pi3_server_statistics_count;
727 });
728
729 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800730 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700731 "/pi1/aos", [&pi1_client_statistics_count](
732 const message_bridge::ClientStatistics &stats) {
733 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
734 EXPECT_EQ(stats.connections()->size(), 2u);
735
736 for (const message_bridge::ClientConnection *connection :
737 *stats.connections()) {
738 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
739 if (connection->node()->name()->string_view() == "pi2") {
740 EXPECT_GT(connection->received_packets(), 50);
741 } else if (connection->node()->name()->string_view() == "pi3") {
742 EXPECT_GE(connection->received_packets(), 5);
743 } else {
744 LOG(FATAL) << "Unknown connection";
745 }
746
Austin Schuhe61d4382021-03-31 21:33:02 -0700747 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700748 EXPECT_TRUE(connection->has_monotonic_offset());
749 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800750 EXPECT_EQ(connection->connection_count(), 1u);
751 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700752 }
753 ++pi1_client_statistics_count;
754 });
755
756 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800757 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700758 "/pi2/aos", [&pi2_client_statistics_count](
759 const message_bridge::ClientStatistics &stats) {
760 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
761 EXPECT_EQ(stats.connections()->size(), 1u);
762
763 const message_bridge::ClientConnection *connection =
764 stats.connections()->Get(0);
765 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
766 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700767 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700768 EXPECT_TRUE(connection->has_monotonic_offset());
769 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800770 EXPECT_EQ(connection->connection_count(), 1u);
771 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700772 ++pi2_client_statistics_count;
773 });
774
775 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800776 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700777 "/pi3/aos", [&pi3_client_statistics_count](
778 const message_bridge::ClientStatistics &stats) {
779 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
780 EXPECT_EQ(stats.connections()->size(), 1u);
781
782 const message_bridge::ClientConnection *connection =
783 stats.connections()->Get(0);
784 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
785 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700786 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700787 EXPECT_TRUE(connection->has_monotonic_offset());
788 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800789 EXPECT_EQ(connection->connection_count(), 1u);
790 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700791 ++pi3_client_statistics_count;
792 });
793
Austin Schuh2f8fd752020-09-01 22:38:28 -0700794 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
795 // channel.
796 const size_t pi1_timestamp_channel =
797 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
798 pi1_on_pi2_timestamp_fetcher.channel());
799 const size_t ping_timestamp_channel =
800 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
801 ping_on_pi2_fetcher.channel());
802
803 for (const Channel *channel :
804 *pi1_pong_counter_event_loop->configuration()->channels()) {
805 VLOG(1) << "Channel "
806 << configuration::ChannelIndex(
807 pi1_pong_counter_event_loop->configuration(), channel)
808 << " " << configuration::CleanedChannelToString(channel);
809 }
810
Austin Schuh8fb315a2020-11-19 22:33:58 -0800811 std::unique_ptr<EventLoop> pi1_remote_timestamp =
812 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
813
Austin Schuh89c9b812021-02-20 14:42:10 -0800814 for (std::pair<int, std::string> channel :
815 shared()
816 ? std::vector<std::pair<
817 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
818 : std::vector<std::pair<int, std::string>>{
819 {pi1_timestamp_channel,
820 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
821 "aos-message_bridge-Timestamp"},
822 {ping_timestamp_channel,
823 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
824 // For each remote timestamp we get back, confirm that it is either a ping
825 // message, or a timestamp we sent out. Also confirm that the timestamps
826 // are correct.
827 pi1_remote_timestamp->MakeWatcher(
828 channel.second,
829 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
830 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
831 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
832 channel_index = channel.first](const RemoteMessage &header) {
833 VLOG(1) << aos::FlatbufferToJson(&header);
834 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700835 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800836 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700837 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700838
Austin Schuh89c9b812021-02-20 14:42:10 -0800839 const aos::monotonic_clock::time_point header_monotonic_sent_time(
840 chrono::nanoseconds(header.monotonic_sent_time()));
841 const aos::realtime_clock::time_point header_realtime_sent_time(
842 chrono::nanoseconds(header.realtime_sent_time()));
843 const aos::monotonic_clock::time_point header_monotonic_remote_time(
844 chrono::nanoseconds(header.monotonic_remote_time()));
845 const aos::realtime_clock::time_point header_realtime_remote_time(
846 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700847
Austin Schuh89c9b812021-02-20 14:42:10 -0800848 if (channel_index != -1) {
849 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700850 }
851
Austin Schuh89c9b812021-02-20 14:42:10 -0800852 const Context *pi1_context = nullptr;
853 const Context *pi2_context = nullptr;
854
855 if (header.channel_index() == pi1_timestamp_channel) {
856 // Find the forwarded message.
857 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
858 header_monotonic_sent_time) {
859 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
860 }
861
862 // And the source message.
863 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
864 header_monotonic_remote_time) {
865 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
866 }
867
868 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
869 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
870 } else if (header.channel_index() == ping_timestamp_channel) {
871 // Find the forwarded message.
872 while (ping_on_pi2_fetcher.context().monotonic_event_time <
873 header_monotonic_sent_time) {
874 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
875 }
876
877 // And the source message.
878 while (ping_on_pi1_fetcher.context().monotonic_event_time <
879 header_monotonic_remote_time) {
880 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
881 }
882
883 pi1_context = &ping_on_pi1_fetcher.context();
884 pi2_context = &ping_on_pi2_fetcher.context();
885 } else {
886 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700887 }
888
Austin Schuh89c9b812021-02-20 14:42:10 -0800889 // Confirm the forwarded message has matching timestamps to the
890 // timestamps we got back.
891 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
892 EXPECT_EQ(pi2_context->remote_queue_index,
893 header.remote_queue_index());
894 EXPECT_EQ(pi2_context->monotonic_event_time,
895 header_monotonic_sent_time);
896 EXPECT_EQ(pi2_context->realtime_event_time,
897 header_realtime_sent_time);
898 EXPECT_EQ(pi2_context->realtime_remote_time,
899 header_realtime_remote_time);
900 EXPECT_EQ(pi2_context->monotonic_remote_time,
901 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700902
Austin Schuh89c9b812021-02-20 14:42:10 -0800903 // Confirm the forwarded message also matches the source message.
904 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
905 EXPECT_EQ(pi1_context->monotonic_event_time,
906 header_monotonic_remote_time);
907 EXPECT_EQ(pi1_context->realtime_event_time,
908 header_realtime_remote_time);
909 });
910 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700911
Austin Schuh4c3b9702020-08-30 11:34:55 -0700912 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
913 chrono::milliseconds(500) +
914 chrono::milliseconds(5));
915
916 EXPECT_EQ(pi1_pong_counter.count(), 1001);
917 EXPECT_EQ(pi2_pong_counter.count(), 1001);
918
919 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
920 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
921 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
922 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
923 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
924 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
925 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
926
Austin Schuh20ac95d2020-12-05 17:24:19 -0800927 EXPECT_EQ(pi1_server_statistics_count, 10);
928 EXPECT_EQ(pi2_server_statistics_count, 10);
929 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700930
931 EXPECT_EQ(pi1_client_statistics_count, 95);
932 EXPECT_EQ(pi2_client_statistics_count, 95);
933 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700934
935 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800936 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
937 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700938}
939
940// Tests that an offset between nodes can be recovered and shows up in
941// ServerStatistics correctly.
942TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
943 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700944 aos::configuration::ReadConfig(ArtifactPath(
945 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700946 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800947 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
948 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700949 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800950 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
951 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700952 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800953 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
954 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700955
Austin Schuh87dd3832021-01-01 23:07:31 -0800956 message_bridge::TestingTimeConverter time(
957 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700958 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700959 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700960
961 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800962 time.AddNextTimestamp(
963 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700964 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
965 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700966
967 std::unique_ptr<EventLoop> ping_event_loop =
968 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
969 Ping ping(ping_event_loop.get());
970
971 std::unique_ptr<EventLoop> pong_event_loop =
972 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
973 Pong pong(pong_event_loop.get());
974
Austin Schuh8fb315a2020-11-19 22:33:58 -0800975 // Wait to let timestamp estimation start up before looking for the results.
976 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
977
Austin Schuh87dd3832021-01-01 23:07:31 -0800978 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
979 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
980
Austin Schuh4c3b9702020-08-30 11:34:55 -0700981 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
982 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
983
984 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
985 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
986
Austin Schuh4c3b9702020-08-30 11:34:55 -0700987 // Confirm the offsets are being recovered correctly.
988 int pi1_server_statistics_count = 0;
989 pi1_pong_counter_event_loop->MakeWatcher(
990 "/pi1/aos", [&pi1_server_statistics_count,
991 kOffset](const message_bridge::ServerStatistics &stats) {
992 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
993 EXPECT_EQ(stats.connections()->size(), 2u);
994 for (const message_bridge::ServerConnection *connection :
995 *stats.connections()) {
996 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800997 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700998 if (connection->node()->name()->string_view() == "pi2") {
999 EXPECT_EQ(connection->monotonic_offset(),
1000 chrono::nanoseconds(kOffset).count());
1001 } else if (connection->node()->name()->string_view() == "pi3") {
1002 EXPECT_EQ(connection->monotonic_offset(), 0);
1003 } else {
1004 LOG(FATAL) << "Unknown connection";
1005 }
1006
1007 EXPECT_TRUE(connection->has_monotonic_offset());
1008 }
1009 ++pi1_server_statistics_count;
1010 });
1011
1012 int pi2_server_statistics_count = 0;
1013 pi2_pong_counter_event_loop->MakeWatcher(
1014 "/pi2/aos", [&pi2_server_statistics_count,
1015 kOffset](const message_bridge::ServerStatistics &stats) {
1016 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1017 EXPECT_EQ(stats.connections()->size(), 1u);
1018
1019 const message_bridge::ServerConnection *connection =
1020 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001021 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001022 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1023 EXPECT_TRUE(connection->has_monotonic_offset());
1024 EXPECT_EQ(connection->monotonic_offset(),
1025 -chrono::nanoseconds(kOffset).count());
1026 ++pi2_server_statistics_count;
1027 });
1028
1029 int pi3_server_statistics_count = 0;
1030 pi3_pong_counter_event_loop->MakeWatcher(
1031 "/pi3/aos", [&pi3_server_statistics_count](
1032 const message_bridge::ServerStatistics &stats) {
1033 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1034 EXPECT_EQ(stats.connections()->size(), 1u);
1035
1036 const message_bridge::ServerConnection *connection =
1037 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001038 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001039 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1040 EXPECT_TRUE(connection->has_monotonic_offset());
1041 EXPECT_EQ(connection->monotonic_offset(), 0);
1042 ++pi3_server_statistics_count;
1043 });
1044
1045 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1046 chrono::milliseconds(500) +
1047 chrono::milliseconds(5));
1048
Austin Schuh20ac95d2020-12-05 17:24:19 -08001049 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001050 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001051 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001052}
1053
1054// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001055TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001056 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1057 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1058 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1059
1060 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1061 simulated_event_loop_factory.DisableStatistics();
1062
1063 std::unique_ptr<EventLoop> ping_event_loop =
1064 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1065 Ping ping(ping_event_loop.get());
1066
1067 std::unique_ptr<EventLoop> pong_event_loop =
1068 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1069 Pong pong(pong_event_loop.get());
1070
1071 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1072 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1073
1074 MessageCounter<examples::Pong> pi2_pong_counter(
1075 pi2_pong_counter_event_loop.get(), "/test");
1076
1077 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1078 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1079
1080 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1081 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1082
1083 MessageCounter<examples::Pong> pi1_pong_counter(
1084 pi1_pong_counter_event_loop.get(), "/test");
1085
1086 // Count timestamps.
1087 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1088 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1089 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1090 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1091 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1092 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1093 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1094 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1095 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1096 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1097 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1098 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1099 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1100 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1101
Austin Schuh2f8fd752020-09-01 22:38:28 -07001102 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001103 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1104 remote_timestamps_pi2_on_pi1 =
1105 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1107 remote_timestamps_pi1_on_pi2 =
1108 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001109
Austin Schuh4c3b9702020-08-30 11:34:55 -07001110 MessageCounter<message_bridge::ServerStatistics>
1111 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1112 "/pi1/aos");
1113 MessageCounter<message_bridge::ServerStatistics>
1114 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1115 "/pi2/aos");
1116 MessageCounter<message_bridge::ServerStatistics>
1117 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1118 "/pi3/aos");
1119
1120 MessageCounter<message_bridge::ClientStatistics>
1121 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1122 "/pi1/aos");
1123 MessageCounter<message_bridge::ClientStatistics>
1124 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1125 "/pi2/aos");
1126 MessageCounter<message_bridge::ClientStatistics>
1127 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1128 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001129
1130 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1131 chrono::milliseconds(5));
1132
Austin Schuh4c3b9702020-08-30 11:34:55 -07001133 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1134 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1135
1136 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1137 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1138 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1139 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1140 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1141 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1142 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1143
1144 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1145 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1146 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1147
1148 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1149 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1150 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001151
1152 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001153 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1154 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001155}
1156
Austin Schuhc0b0f722020-12-12 18:36:06 -08001157bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1158 for (const message_bridge::ServerConnection *connection :
1159 *server_statistics->connections()) {
1160 if (connection->state() != message_bridge::State::CONNECTED) {
1161 return false;
1162 }
1163 }
1164 return true;
1165}
1166
1167bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1168 std::string_view target) {
1169 for (const message_bridge::ServerConnection *connection :
1170 *server_statistics->connections()) {
1171 if (connection->node()->name()->string_view() == target) {
1172 if (connection->state() == message_bridge::State::CONNECTED) {
1173 return false;
1174 }
1175 } else {
1176 if (connection->state() != message_bridge::State::CONNECTED) {
1177 return false;
1178 }
1179 }
1180 }
1181 return true;
1182}
1183
1184bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1185 for (const message_bridge::ClientConnection *connection :
1186 *client_statistics->connections()) {
1187 if (connection->state() != message_bridge::State::CONNECTED) {
1188 return false;
1189 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001190 EXPECT_TRUE(connection->has_boot_uuid());
1191 EXPECT_TRUE(connection->has_connected_since_time());
1192 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001193 }
1194 return true;
1195}
1196
1197bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1198 std::string_view target) {
1199 for (const message_bridge::ClientConnection *connection :
1200 *client_statistics->connections()) {
1201 if (connection->node()->name()->string_view() == target) {
1202 if (connection->state() == message_bridge::State::CONNECTED) {
1203 return false;
1204 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001205 EXPECT_FALSE(connection->has_boot_uuid());
1206 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001207 } else {
1208 if (connection->state() != message_bridge::State::CONNECTED) {
1209 return false;
1210 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001211 EXPECT_TRUE(connection->has_boot_uuid());
1212 EXPECT_TRUE(connection->has_connected_since_time());
1213 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001214 }
1215 }
1216 return true;
1217}
1218
Austin Schuh367a7f42021-11-23 23:04:36 -08001219int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1220 std::string_view target) {
1221 for (const message_bridge::ClientConnection *connection :
1222 *client_statistics->connections()) {
1223 if (connection->node()->name()->string_view() == target) {
1224 return connection->connection_count();
1225 }
1226 }
1227 return 0;
1228}
1229
1230int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1231 std::string_view target) {
1232 for (const message_bridge::ServerConnection *connection :
1233 *server_statistics->connections()) {
1234 if (connection->node()->name()->string_view() == target) {
1235 return connection->connection_count();
1236 }
1237 }
1238 return 0;
1239}
1240
Austin Schuhc0b0f722020-12-12 18:36:06 -08001241// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001242TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001243 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1244
Austin Schuh58646e22021-08-23 23:51:46 -07001245 NodeEventLoopFactory *pi1 =
1246 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1247 NodeEventLoopFactory *pi2 =
1248 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1249 NodeEventLoopFactory *pi3 =
1250 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1251
1252 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001253 Ping ping(ping_event_loop.get());
1254
Austin Schuh58646e22021-08-23 23:51:46 -07001255 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001256 Pong pong(pong_event_loop.get());
1257
1258 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001259 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001260
1261 MessageCounter<examples::Pong> pi2_pong_counter(
1262 pi2_pong_counter_event_loop.get(), "/test");
1263
1264 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001265 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001266
1267 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001268 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001269
1270 MessageCounter<examples::Pong> pi1_pong_counter(
1271 pi1_pong_counter_event_loop.get(), "/test");
1272
1273 // Count timestamps.
1274 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1275 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1276 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1277 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1278 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1279 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1280 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1281 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1282 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1283 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1284 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1285 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1286 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1287 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1288
1289 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001290 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1291 remote_timestamps_pi2_on_pi1 =
1292 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1293 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1294 remote_timestamps_pi1_on_pi2 =
1295 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001296
1297 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001298 *pi1_server_statistics_counter;
1299 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1300 pi1_server_statistics_counter =
1301 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1302 "pi1_server_statistics_counter", "/pi1/aos");
1303 });
1304
Austin Schuhc0b0f722020-12-12 18:36:06 -08001305 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1306 pi1_pong_counter_event_loop
1307 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1308 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1309 pi1_pong_counter_event_loop
1310 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1311
1312 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001313 *pi2_server_statistics_counter;
1314 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1315 pi2_server_statistics_counter =
1316 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1317 "pi2_server_statistics_counter", "/pi2/aos");
1318 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001319 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1320 pi2_pong_counter_event_loop
1321 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1322 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1323 pi2_pong_counter_event_loop
1324 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1325
1326 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001327 *pi3_server_statistics_counter;
1328 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1329 pi3_server_statistics_counter =
1330 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1331 "pi3_server_statistics_counter", "/pi3/aos");
1332 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001333 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1334 pi3_pong_counter_event_loop
1335 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1336 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1337 pi3_pong_counter_event_loop
1338 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1339
1340 MessageCounter<message_bridge::ClientStatistics>
1341 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1342 "/pi1/aos");
1343 MessageCounter<message_bridge::ClientStatistics>
1344 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1345 "/pi2/aos");
1346 MessageCounter<message_bridge::ClientStatistics>
1347 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1348 "/pi3/aos");
1349
1350 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1351 chrono::milliseconds(5));
1352
1353 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1354 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1355
1356 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1357 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1358 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1359 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1360 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1361 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1362 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1363
Austin Schuh58646e22021-08-23 23:51:46 -07001364 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1365 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1366 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001367
1368 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1369 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1370 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1371
1372 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001373 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1374 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001375
1376 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1377 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1378 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1379 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1380 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1381 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1382 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1383 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1384 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1385 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1386 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1387 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1388 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1389 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1390 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1391 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1392 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1393 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1394
Austin Schuh58646e22021-08-23 23:51:46 -07001395 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001396
1397 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1398
1399 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1400 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1401
1402 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1403 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1404 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1405 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1406 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1407 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1408 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1409
Austin Schuh58646e22021-08-23 23:51:46 -07001410 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1411 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1412 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001413
1414 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1415 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1416 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1417
1418 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001419 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1420 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001421
1422 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1423 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1424 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1425 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1426 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1427 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1428 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1429 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1430 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1431 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1432 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1433 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1434 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1435 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1436 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1437 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1438 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1439 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1440
Austin Schuh58646e22021-08-23 23:51:46 -07001441 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001442
1443 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1444
Austin Schuh367a7f42021-11-23 23:04:36 -08001445 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1446 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1447 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1448 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1449 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1450 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1451
1452 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1453 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1454 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1455 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1456 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1457 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1458 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1459 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1460
1461 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1462 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1463 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1464 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1465
1466 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1467 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1468 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1469 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1470
Austin Schuhc0b0f722020-12-12 18:36:06 -08001471 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1472 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1473
1474 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1475 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1476 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1477 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1478 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1479 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1480 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1481
Austin Schuh58646e22021-08-23 23:51:46 -07001482 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1483 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1484 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001485
1486 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1487 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1488 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1489
1490 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001491 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1492 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001493
Austin Schuhc0b0f722020-12-12 18:36:06 -08001494 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1495 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001496 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1497 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001498 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1499 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001500 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1501 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001502 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1503 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001504 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1506}
1507
Austin Schuh2febf0d2020-09-21 22:24:30 -07001508// Tests that the time offset having a slope doesn't break the world.
1509// SimulatedMessageBridge has enough self consistency CHECK statements to
1510// confirm, and we can can also check a message in each direction to make sure
1511// it gets delivered as expected.
1512TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1513 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001514 aos::configuration::ReadConfig(ArtifactPath(
1515 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001516 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001517 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1518 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001519 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001520 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1521 ASSERT_EQ(pi2_index, 1u);
1522 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1523 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1524 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001525
Austin Schuh87dd3832021-01-01 23:07:31 -08001526 message_bridge::TestingTimeConverter time(
1527 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001528 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001529 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001530
Austin Schuh2febf0d2020-09-21 22:24:30 -07001531 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001532 time.AddNextTimestamp(
1533 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001534 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1535 BootTimestamp::epoch()});
1536 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1537 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1538 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1539 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001540
1541 std::unique_ptr<EventLoop> ping_event_loop =
1542 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1543 Ping ping(ping_event_loop.get());
1544
1545 std::unique_ptr<EventLoop> pong_event_loop =
1546 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1547 Pong pong(pong_event_loop.get());
1548
1549 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1550 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1551 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1552 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1553
1554 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1555 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1556 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1557 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1558
1559 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1560 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1561 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1562 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1563
1564 // End after a pong message comes back. This will leave the latest messages
1565 // on all channels so we can look at timestamps easily and check they make
1566 // sense.
1567 std::unique_ptr<EventLoop> pi1_pong_ender =
1568 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1569 int count = 0;
1570 pi1_pong_ender->MakeWatcher(
1571 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1572 if (++count == 100) {
1573 simulated_event_loop_factory.Exit();
1574 }
1575 });
1576
1577 // Run enough that messages should be delivered.
1578 simulated_event_loop_factory.Run();
1579
1580 // Grab the latest messages.
1581 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1582 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1583 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1584 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1585
1586 // Compute their time on the global distributed clock so we can compute
1587 // distance betwen them.
1588 const distributed_clock::time_point pi1_ping_time =
1589 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1590 ->ToDistributedClock(
1591 ping_on_pi1_fetcher.context().monotonic_event_time);
1592 const distributed_clock::time_point pi2_ping_time =
1593 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1594 ->ToDistributedClock(
1595 ping_on_pi2_fetcher.context().monotonic_event_time);
1596 const distributed_clock::time_point pi1_pong_time =
1597 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1598 ->ToDistributedClock(
1599 pong_on_pi1_fetcher.context().monotonic_event_time);
1600 const distributed_clock::time_point pi2_pong_time =
1601 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1602 ->ToDistributedClock(
1603 pong_on_pi2_fetcher.context().monotonic_event_time);
1604
1605 // And confirm the delivery delay is just about exactly 150 uS for both
1606 // directions like expected. There will be a couple ns of rounding errors in
1607 // the conversion functions that aren't worth accounting for right now. This
1608 // will either be really close, or really far.
1609 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1610 pi1_ping_time);
1611 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1612 pi1_ping_time);
1613
1614 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1615 pi2_pong_time);
1616 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1617 pi2_pong_time);
1618}
1619
Austin Schuh4c570ea2020-11-19 23:13:24 -08001620void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1621 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1622 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1623 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001624 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001625}
1626
1627// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001628TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001629 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1630 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1631
1632 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1633
1634 std::unique_ptr<EventLoop> ping_event_loop =
1635 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1636 aos::Sender<examples::Ping> pi1_reliable_sender =
1637 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1638 aos::Sender<examples::Ping> pi1_unreliable_sender =
1639 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1640 SendPing(&pi1_reliable_sender, 1);
1641 SendPing(&pi1_unreliable_sender, 1);
1642
1643 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1644 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1645 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1646 "/reliable");
1647 MessageCounter<examples::Ping> pi2_unreliable_counter(
1648 pi2_pong_event_loop.get(), "/unreliable");
1649 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1650 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1651 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1652 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1653
1654 const size_t reliable_channel_index = configuration::ChannelIndex(
1655 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1656
1657 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1658 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1659
Austin Schuheeaa2022021-01-02 21:52:03 -08001660 const chrono::nanoseconds network_delay =
1661 simulated_event_loop_factory.network_delay();
1662
Austin Schuh4c570ea2020-11-19 23:13:24 -08001663 int reliable_timestamp_count = 0;
1664 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001665 shared() ? "/pi1/aos/remote_timestamps/pi2"
1666 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001667 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001668 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1669 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001670 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001671 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001672 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001673 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001674 VLOG(1) << aos::FlatbufferToJson(&header);
1675 if (header.channel_index() == reliable_channel_index) {
1676 ++reliable_timestamp_count;
1677 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001678
1679 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1680 chrono::nanoseconds(header.monotonic_sent_time()));
1681
1682 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1683 header_monotonic_sent_time + network_delay +
1684 (pi1_remote_timestamp->monotonic_now() -
1685 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001686 });
1687
1688 // Wait to let timestamp estimation start up before looking for the results.
1689 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1690
1691 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1692 // This one isn't reliable, but was sent before the start. It should *not* be
1693 // delivered.
1694 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1695 // Confirm we got a timestamp logged for the message that was forwarded.
1696 EXPECT_EQ(reliable_timestamp_count, 1u);
1697
1698 SendPing(&pi1_reliable_sender, 2);
1699 SendPing(&pi1_unreliable_sender, 2);
1700 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1701 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
1702 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1703
1704 EXPECT_EQ(reliable_timestamp_count, 2u);
1705}
1706
Austin Schuh20ac95d2020-12-05 17:24:19 -08001707// Tests that rebooting a node changes the ServerStatistics message and the
1708// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001709TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001710 const UUID pi1_boot0 = UUID::Random();
1711 const UUID pi2_boot0 = UUID::Random();
1712 const UUID pi2_boot1 = UUID::Random();
1713 const UUID pi3_boot0 = UUID::Random();
1714 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001715
Austin Schuh58646e22021-08-23 23:51:46 -07001716 message_bridge::TestingTimeConverter time(
1717 configuration::NodesCount(&config.message()));
1718 SimulatedEventLoopFactory factory(&config.message());
1719 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001720
Austin Schuh58646e22021-08-23 23:51:46 -07001721 const size_t pi1_index =
1722 configuration::GetNodeIndex(&config.message(), "pi1");
1723 const size_t pi2_index =
1724 configuration::GetNodeIndex(&config.message(), "pi2");
1725 const size_t pi3_index =
1726 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001727
Austin Schuh58646e22021-08-23 23:51:46 -07001728 {
1729 time.AddNextTimestamp(distributed_clock::epoch(),
1730 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1731 BootTimestamp::epoch()});
1732
1733 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1734
1735 time.AddNextTimestamp(
1736 distributed_clock::epoch() + dt,
1737 {BootTimestamp::epoch() + dt,
1738 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1739 BootTimestamp::epoch() + dt});
1740
1741 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1742 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1743 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1744 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1745 }
1746
1747 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1748 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1749
1750 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1751 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001752
1753 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001754 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001755
1756 int timestamp_count = 0;
1757 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001758 "/pi2/aos", [&expected_boot_uuid,
1759 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001760 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001761 expected_boot_uuid);
1762 });
1763 pi1_remote_timestamp->MakeWatcher(
1764 "/test",
1765 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001766 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001767 expected_boot_uuid);
1768 });
1769 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001770 shared() ? "/pi1/aos/remote_timestamps/pi2"
1771 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001772 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1773 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001774 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001775 VLOG(1) << aos::FlatbufferToJson(&header);
1776 ++timestamp_count;
1777 });
1778
1779 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001780 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001781 int boot_number = 0;
1782 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001783 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001784 "/pi1/aos",
1785 [&pi1_server_statistics_count, &expected_boot_uuid,
1786 &expected_connection_time, &first_pi1_server_statistics,
1787 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001788 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1789 for (const message_bridge::ServerConnection *connection :
1790 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001791 if (connection->state() == message_bridge::State::CONNECTED) {
1792 ASSERT_TRUE(connection->has_boot_uuid());
1793 }
1794 if (!first_pi1_server_statistics) {
1795 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1796 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001797 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001798 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1799 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001800 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001801 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001802 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001803 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1804 connection->connected_since_time())),
1805 expected_connection_time);
1806 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001807 ++pi1_server_statistics_count;
1808 }
1809 }
Austin Schuh58646e22021-08-23 23:51:46 -07001810 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001811 });
1812
Austin Schuh58646e22021-08-23 23:51:46 -07001813 int pi1_client_statistics_count = 0;
1814 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001815 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1816 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001817 const message_bridge::ClientStatistics &stats) {
1818 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1819 for (const message_bridge::ClientConnection *connection :
1820 *stats.connections()) {
1821 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1822 if (connection->node()->name()->string_view() == "pi2") {
1823 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001824 EXPECT_EQ(expected_boot_uuid,
1825 UUID::FromString(connection->boot_uuid()))
1826 << " : Got " << aos::FlatbufferToJson(&stats);
1827 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1828 connection->connected_since_time())),
1829 expected_connection_time);
1830 EXPECT_EQ(boot_number + 1, connection->connection_count());
1831 } else {
1832 EXPECT_EQ(connection->connected_since_time(), 0);
1833 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001834 }
1835 }
1836 });
1837
1838 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001839 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1840 pi1, pi2, pi2_boot1]() {
1841 expected_boot_uuid = pi2_boot1;
1842 ++boot_number;
1843 LOG(INFO) << "OnShutdown triggered for pi2";
1844 pi2->OnStartup(
1845 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1846 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1847 expected_connection_time = pi1->monotonic_now();
1848 });
1849 });
Austin Schuh58646e22021-08-23 23:51:46 -07001850
Austin Schuh20ac95d2020-12-05 17:24:19 -08001851 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001852 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001853
1854 EXPECT_GT(timestamp_count, 100);
1855 EXPECT_GE(pi1_server_statistics_count, 1u);
1856
Austin Schuh20ac95d2020-12-05 17:24:19 -08001857 timestamp_count = 0;
1858 pi1_server_statistics_count = 0;
1859
Austin Schuh58646e22021-08-23 23:51:46 -07001860 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001861 EXPECT_GT(timestamp_count, 100);
1862 EXPECT_GE(pi1_server_statistics_count, 1u);
1863}
1864
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001865INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001866 All, RemoteMessageSimulatedEventLoopTest,
1867 ::testing::Values(
1868 Param{"multinode_pingpong_test_combined_config.json", true},
1869 Param{"multinode_pingpong_test_split_config.json", false}));
1870
Austin Schuh58646e22021-08-23 23:51:46 -07001871// Tests that Startup and Shutdown do reasonable things.
1872TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1873 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1874 aos::configuration::ReadConfig(
1875 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1876
Austin Schuh72e65682021-09-02 11:37:05 -07001877 size_t pi1_shutdown_counter = 0;
1878 size_t pi2_shutdown_counter = 0;
1879 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1880 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1881
Austin Schuh58646e22021-08-23 23:51:46 -07001882 message_bridge::TestingTimeConverter time(
1883 configuration::NodesCount(&config.message()));
1884 SimulatedEventLoopFactory factory(&config.message());
1885 factory.SetTimeConverter(&time);
1886 time.AddNextTimestamp(
1887 distributed_clock::epoch(),
1888 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1889
1890 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1891
1892 time.AddNextTimestamp(
1893 distributed_clock::epoch() + dt,
1894 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1895 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1896 BootTimestamp::epoch() + dt});
1897
1898 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1899 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1900
1901 // Configure startup to start Ping and Pong, and count.
1902 size_t pi1_startup_counter = 0;
1903 size_t pi2_startup_counter = 0;
1904 pi1->OnStartup([pi1]() {
1905 LOG(INFO) << "Made ping";
1906 pi1->AlwaysStart<Ping>("ping");
1907 });
1908 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1909 pi2->OnStartup([pi2]() {
1910 LOG(INFO) << "Made pong";
1911 pi2->AlwaysStart<Pong>("pong");
1912 });
1913 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1914
1915 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001916 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1917 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1918
Austin Schuh58646e22021-08-23 23:51:46 -07001919 // Automatically make counters on startup.
1920 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1921 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1922 "pi1_pong_counter", "/test");
1923 });
1924 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1925 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1926 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1927 "pi2_ping_counter", "/test");
1928 });
1929 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1930
1931 EXPECT_EQ(pi2_ping_counter, nullptr);
1932 EXPECT_EQ(pi1_pong_counter, nullptr);
1933
1934 EXPECT_EQ(pi1_startup_counter, 0u);
1935 EXPECT_EQ(pi2_startup_counter, 0u);
1936 EXPECT_EQ(pi1_shutdown_counter, 0u);
1937 EXPECT_EQ(pi2_shutdown_counter, 0u);
1938
1939 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1940 EXPECT_EQ(pi1_startup_counter, 1u);
1941 EXPECT_EQ(pi2_startup_counter, 1u);
1942 EXPECT_EQ(pi1_shutdown_counter, 0u);
1943 EXPECT_EQ(pi2_shutdown_counter, 0u);
1944 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1945 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1946
1947 LOG(INFO) << pi1->monotonic_now();
1948 LOG(INFO) << pi2->monotonic_now();
1949
1950 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1951
1952 EXPECT_EQ(pi1_startup_counter, 2u);
1953 EXPECT_EQ(pi2_startup_counter, 2u);
1954 EXPECT_EQ(pi1_shutdown_counter, 1u);
1955 EXPECT_EQ(pi2_shutdown_counter, 1u);
1956 EXPECT_EQ(pi2_ping_counter->count(), 501);
1957 EXPECT_EQ(pi1_pong_counter->count(), 501);
1958}
1959
1960// Tests that OnStartup handlers can be added after running and get called, and
1961// can't be called when running.
1962TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1963 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1964 aos::configuration::ReadConfig(
1965 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1966
1967 // Test that we can add startup handlers as long as we aren't running, and
1968 // they get run when Run gets called again.
1969 // Test that adding a startup handler when running fails.
1970 //
1971 // Test shutdown handlers get called on destruction.
1972 SimulatedEventLoopFactory factory(&config.message());
1973
1974 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1975
1976 int startup_count0 = 0;
1977 int startup_count1 = 0;
1978
1979 pi1->OnStartup([&]() { ++startup_count0; });
1980 EXPECT_EQ(startup_count0, 0);
1981 EXPECT_EQ(startup_count1, 0);
1982
1983 factory.RunFor(chrono::nanoseconds(1));
1984 EXPECT_EQ(startup_count0, 1);
1985 EXPECT_EQ(startup_count1, 0);
1986
1987 pi1->OnStartup([&]() { ++startup_count1; });
1988 EXPECT_EQ(startup_count0, 1);
1989 EXPECT_EQ(startup_count1, 0);
1990
1991 factory.RunFor(chrono::nanoseconds(1));
1992 EXPECT_EQ(startup_count0, 1);
1993 EXPECT_EQ(startup_count1, 1);
1994
1995 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1996 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1997
1998 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1999 "Can only register OnStartup handlers when not running.");
2000}
2001
2002// Tests that OnStartup handlers can be added after running and get called, and
2003// all the handlers get called on reboot. Shutdown handlers are tested the same
2004// way.
2005TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2006 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2007 aos::configuration::ReadConfig(
2008 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2009
Austin Schuh72e65682021-09-02 11:37:05 -07002010 int startup_count0 = 0;
2011 int shutdown_count0 = 0;
2012 int startup_count1 = 0;
2013 int shutdown_count1 = 0;
2014
Austin Schuh58646e22021-08-23 23:51:46 -07002015 message_bridge::TestingTimeConverter time(
2016 configuration::NodesCount(&config.message()));
2017 SimulatedEventLoopFactory factory(&config.message());
2018 factory.SetTimeConverter(&time);
2019 time.StartEqual();
2020
2021 const chrono::nanoseconds dt = chrono::seconds(10);
2022 time.RebootAt(0, distributed_clock::epoch() + dt);
2023 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2024
2025 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2026
Austin Schuh58646e22021-08-23 23:51:46 -07002027 pi1->OnStartup([&]() { ++startup_count0; });
2028 pi1->OnShutdown([&]() { ++shutdown_count0; });
2029 EXPECT_EQ(startup_count0, 0);
2030 EXPECT_EQ(startup_count1, 0);
2031 EXPECT_EQ(shutdown_count0, 0);
2032 EXPECT_EQ(shutdown_count1, 0);
2033
2034 factory.RunFor(chrono::nanoseconds(1));
2035 EXPECT_EQ(startup_count0, 1);
2036 EXPECT_EQ(startup_count1, 0);
2037 EXPECT_EQ(shutdown_count0, 0);
2038 EXPECT_EQ(shutdown_count1, 0);
2039
2040 pi1->OnStartup([&]() { ++startup_count1; });
2041 EXPECT_EQ(startup_count0, 1);
2042 EXPECT_EQ(startup_count1, 0);
2043 EXPECT_EQ(shutdown_count0, 0);
2044 EXPECT_EQ(shutdown_count1, 0);
2045
2046 factory.RunFor(chrono::nanoseconds(1));
2047 EXPECT_EQ(startup_count0, 1);
2048 EXPECT_EQ(startup_count1, 1);
2049 EXPECT_EQ(shutdown_count0, 0);
2050 EXPECT_EQ(shutdown_count1, 0);
2051
2052 factory.RunFor(chrono::seconds(15));
2053
2054 EXPECT_EQ(startup_count0, 2);
2055 EXPECT_EQ(startup_count1, 2);
2056 EXPECT_EQ(shutdown_count0, 1);
2057 EXPECT_EQ(shutdown_count1, 0);
2058
2059 pi1->OnShutdown([&]() { ++shutdown_count1; });
2060 factory.RunFor(chrono::seconds(10));
2061
2062 EXPECT_EQ(startup_count0, 3);
2063 EXPECT_EQ(startup_count1, 3);
2064 EXPECT_EQ(shutdown_count0, 2);
2065 EXPECT_EQ(shutdown_count1, 1);
2066}
2067
2068// Tests that event loops which outlive shutdown crash.
2069TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2070 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2071 aos::configuration::ReadConfig(
2072 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2073
2074 message_bridge::TestingTimeConverter time(
2075 configuration::NodesCount(&config.message()));
2076 SimulatedEventLoopFactory factory(&config.message());
2077 factory.SetTimeConverter(&time);
2078 time.StartEqual();
2079
2080 const chrono::nanoseconds dt = chrono::seconds(10);
2081 time.RebootAt(0, distributed_clock::epoch() + dt);
2082
2083 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2084
2085 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2086
2087 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2088}
2089
2090// Tests that messages don't survive a reboot of a node.
2091TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2092 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2093 aos::configuration::ReadConfig(
2094 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2095
2096 message_bridge::TestingTimeConverter time(
2097 configuration::NodesCount(&config.message()));
2098 SimulatedEventLoopFactory factory(&config.message());
2099 factory.SetTimeConverter(&time);
2100 time.StartEqual();
2101
2102 const chrono::nanoseconds dt = chrono::seconds(10);
2103 time.RebootAt(0, distributed_clock::epoch() + dt);
2104
2105 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2106
2107 const UUID boot_uuid = pi1->boot_uuid();
2108 EXPECT_NE(boot_uuid, UUID::Zero());
2109
2110 {
2111 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2112 aos::Sender<examples::Ping> test_message_sender =
2113 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2114 SendPing(&test_message_sender, 1);
2115 }
2116
2117 factory.RunFor(chrono::seconds(5));
2118
2119 {
2120 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2121 aos::Fetcher<examples::Ping> fetcher =
2122 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2123 EXPECT_TRUE(fetcher.Fetch());
2124 }
2125
2126 factory.RunFor(chrono::seconds(10));
2127
2128 {
2129 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2130 aos::Fetcher<examples::Ping> fetcher =
2131 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2132 EXPECT_FALSE(fetcher.Fetch());
2133 }
2134 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2135}
2136
2137// Tests that reliable messages get resent on reboot.
2138TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2139 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2140 aos::configuration::ReadConfig(
2141 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2142
2143 message_bridge::TestingTimeConverter time(
2144 configuration::NodesCount(&config.message()));
2145 SimulatedEventLoopFactory factory(&config.message());
2146 factory.SetTimeConverter(&time);
2147 time.StartEqual();
2148
2149 const chrono::nanoseconds dt = chrono::seconds(1);
2150 time.RebootAt(1, distributed_clock::epoch() + dt);
2151
2152 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2153 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2154
2155 const UUID pi1_boot_uuid = pi1->boot_uuid();
2156 const UUID pi2_boot_uuid = pi2->boot_uuid();
2157 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2158 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2159
2160 {
2161 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2162 aos::Sender<examples::Ping> test_message_sender =
2163 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2164 SendPing(&test_message_sender, 1);
2165 }
2166
2167 factory.RunFor(chrono::milliseconds(500));
2168
2169 {
2170 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2171 aos::Fetcher<examples::Ping> fetcher =
2172 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2173 EXPECT_TRUE(fetcher.Fetch());
2174 }
2175
2176 factory.RunFor(chrono::seconds(1));
2177
2178 {
2179 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2180 aos::Fetcher<examples::Ping> fetcher =
2181 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2182 EXPECT_TRUE(fetcher.Fetch());
2183 }
2184 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2185}
2186
Austin Schuh48205e62021-11-12 14:13:18 -08002187class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2188 public:
2189 SimulatedEventLoopDisconnectTest()
2190 : config(aos::configuration::ReadConfig(ArtifactPath(
2191 "aos/events/multinode_pingpong_test_split_config.json"))),
2192 time(configuration::NodesCount(&config.message())),
2193 factory(&config.message()) {
2194 factory.SetTimeConverter(&time);
2195 }
2196
2197 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2198 const monotonic_clock::time_point allowable_message_time,
2199 std::set<const aos::Node *> empty_nodes) {
2200 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2201 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2202 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2203 pi1->MakeEventLoop("fetcher");
2204 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2205 pi2->MakeEventLoop("fetcher");
2206 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2207 if (configuration::ChannelIsReadableOnNode(channel,
2208 pi1_event_loop->node())) {
2209 std::unique_ptr<aos::RawFetcher> fetcher =
2210 pi1_event_loop->MakeRawFetcher(channel);
2211 if (statistics_channels.find(channel) == statistics_channels.end() ||
2212 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2213 EXPECT_FALSE(fetcher->Fetch() &&
2214 fetcher->context().monotonic_event_time >
2215 allowable_message_time)
2216 << ": Found recent message on channel "
2217 << configuration::CleanedChannelToString(channel) << " and time "
2218 << fetcher->context().monotonic_event_time << " > "
2219 << allowable_message_time << " on pi1";
2220 } else {
2221 EXPECT_TRUE(fetcher->Fetch() &&
2222 fetcher->context().monotonic_event_time >=
2223 allowable_message_time)
2224 << ": Didn't find recent message on channel "
2225 << configuration::CleanedChannelToString(channel) << " on pi1";
2226 }
2227 }
2228 if (configuration::ChannelIsReadableOnNode(channel,
2229 pi2_event_loop->node())) {
2230 std::unique_ptr<aos::RawFetcher> fetcher =
2231 pi2_event_loop->MakeRawFetcher(channel);
2232 if (statistics_channels.find(channel) == statistics_channels.end() ||
2233 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2234 EXPECT_FALSE(fetcher->Fetch() &&
2235 fetcher->context().monotonic_event_time >
2236 allowable_message_time)
2237 << ": Found message on channel "
2238 << configuration::CleanedChannelToString(channel) << " and time "
2239 << fetcher->context().monotonic_event_time << " > "
2240 << allowable_message_time << " on pi2";
2241 } else {
2242 EXPECT_TRUE(fetcher->Fetch() &&
2243 fetcher->context().monotonic_event_time >=
2244 allowable_message_time)
2245 << ": Didn't find message on channel "
2246 << configuration::CleanedChannelToString(channel) << " on pi2";
2247 }
2248 }
2249 }
2250 }
2251
2252 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2253
2254 message_bridge::TestingTimeConverter time;
2255 SimulatedEventLoopFactory factory;
2256};
2257
2258// Tests that if we have message bridge client/server disabled, and timing
2259// reports disabled, no messages are sent. Also tests that we can disconnect a
2260// node and disable statistics on it and it actually fully disconnects.
2261TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2262 time.StartEqual();
2263 factory.SkipTimingReport();
2264 factory.DisableStatistics();
2265
2266 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2267 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2268
2269 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2270 pi1->MakeEventLoop("fetcher");
2271 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2272 pi2->MakeEventLoop("fetcher");
2273
2274 factory.RunFor(chrono::milliseconds(100000));
2275
2276 // Confirm no messages are sent if we've configured them all off.
2277 VerifyChannels({}, monotonic_clock::min_time, {});
2278
2279 // Now, confirm that all the message_bridge channels come back when we
2280 // re-enable.
2281 factory.EnableStatistics();
2282
2283 factory.RunFor(chrono::milliseconds(10050));
2284
2285 // Build up the list of all the messages we expect when we come back.
2286 {
2287 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002288 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002289 std::vector<std::pair<std::string_view, const Node *>>{
2290 {"/pi1/aos", pi1->node()},
2291 {"/pi2/aos", pi1->node()},
2292 {"/pi3/aos", pi1->node()}}) {
2293 statistics_channels.insert(configuration::GetChannel(
2294 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2295 pi.second));
2296 statistics_channels.insert(configuration::GetChannel(
2297 factory.configuration(), pi.first,
2298 "aos.message_bridge.ServerStatistics", "", pi.second));
2299 statistics_channels.insert(configuration::GetChannel(
2300 factory.configuration(), pi.first,
2301 "aos.message_bridge.ClientStatistics", "", pi.second));
2302 }
2303
2304 statistics_channels.insert(configuration::GetChannel(
2305 factory.configuration(),
2306 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2307 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2308 statistics_channels.insert(configuration::GetChannel(
2309 factory.configuration(),
2310 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2311 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2312 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2313 }
2314
2315 // Now test that we can disable the messages for a single node
2316 pi2->DisableStatistics();
2317 const aos::monotonic_clock::time_point statistics_disable_time =
2318 pi2->monotonic_now();
2319 factory.RunFor(chrono::milliseconds(10000));
2320
2321 // We should see a much smaller set of messages, but should still see messages
2322 // forwarded, mainly the timestamp message.
2323 {
2324 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002325 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002326 std::vector<std::pair<std::string_view, const Node *>>{
2327 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2328 statistics_channels.insert(configuration::GetChannel(
2329 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2330 pi.second));
2331 statistics_channels.insert(configuration::GetChannel(
2332 factory.configuration(), pi.first,
2333 "aos.message_bridge.ServerStatistics", "", pi.second));
2334 statistics_channels.insert(configuration::GetChannel(
2335 factory.configuration(), pi.first,
2336 "aos.message_bridge.ClientStatistics", "", pi.second));
2337 }
2338
2339 statistics_channels.insert(configuration::GetChannel(
2340 factory.configuration(),
2341 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2342 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2343 VerifyChannels(statistics_channels, statistics_disable_time, {});
2344 }
2345
2346 // Now, fully disconnect the node. This will completely quiet down pi2.
2347 pi1->Disconnect(pi2->node());
2348 pi2->Disconnect(pi1->node());
2349
2350 const aos::monotonic_clock::time_point disconnect_disable_time =
2351 pi2->monotonic_now();
2352 factory.RunFor(chrono::milliseconds(10000));
2353
2354 {
2355 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002356 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002357 std::vector<std::pair<std::string_view, const Node *>>{
2358 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2359 statistics_channels.insert(configuration::GetChannel(
2360 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2361 pi.second));
2362 statistics_channels.insert(configuration::GetChannel(
2363 factory.configuration(), pi.first,
2364 "aos.message_bridge.ServerStatistics", "", pi.second));
2365 statistics_channels.insert(configuration::GetChannel(
2366 factory.configuration(), pi.first,
2367 "aos.message_bridge.ClientStatistics", "", pi.second));
2368 }
2369
2370 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2371 }
2372}
2373
Neil Balchc8f41ed2018-01-20 22:06:53 -08002374} // namespace testing
2375} // namespace aos