blob: 429d39ce3b93e3227e418bd7548a09d9637dd64e [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuhbf610b72024-04-04 20:04:55 -070010#include "aos/events/function_scheduler.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070012#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080013#include "aos/events/ping_lib.h"
14#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080015#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/message_bridge_client_generated.h"
17#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080018#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080019#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070020#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070021#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080022
Stephan Pleinesf63bde82024-01-13 15:59:33 -080023namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
Austin Schuh217a9782019-12-21 23:02:50 -080045 void Run() override { event_loop_factory_->Run(); }
46 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070047
Austin Schuh52d325c2019-06-23 18:59:06 -070048 // TODO(austin): Implement this. It's used currently for a phased loop test.
49 // I'm not sure how much that matters.
50 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
51
Austin Schuh7d87b672019-12-01 20:23:49 -080052 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080053 MaybeMake();
54 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080055 }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
Austin Schuh217a9782019-12-21 23:02:50 -080058 void MaybeMake() {
59 if (!event_loop_factory_) {
60 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080061 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080063 } else {
64 event_loop_factory_ =
65 std::make_unique<SimulatedEventLoopFactory>(configuration());
66 }
67 }
68 }
69 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080070};
71
Austin Schuh6bae8252021-02-07 22:01:49 -080072auto CommonParameters() {
73 return ::testing::Combine(
74 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
75 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
76 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
77}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070081
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070082INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070083 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080084
// Parameter bundle for the RemoteMessage tests.
struct Param {
  // Name of the config file to load.
  std::string config;
  // True when a single RemoteMessage channel is shared between all the remote
  // channels; false when every remote channel has its own RemoteMessage
  // channel.
  bool shared;
};
94
95class RemoteMessageSimulatedEventLoopTest
96 : public ::testing::TestWithParam<struct Param> {
97 public:
98 RemoteMessageSimulatedEventLoopTest()
99 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700100 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800101 LOG(INFO) << "Config " << GetParam().config;
102 }
103
104 bool shared() const { return GetParam().shared; }
105
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
107 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
108 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
109 if (shared()) {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop, "/aos/remote_timestamps/pi2"));
112 } else {
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop,
115 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
120 }
121 return counters;
122 }
123
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
125 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
126 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
127 if (shared()) {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1"));
130 } else {
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
133 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
134 event_loop,
135 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
136 }
137 return counters;
138 }
139
140 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
141};
142
Austin Schuh8fb315a2020-11-19 22:33:58 -0800143// Test that sending a message after running gets properly notified.
144TEST(SimulatedEventLoopTest, SendAfterRunFor) {
145 SimulatedEventLoopTestFactory factory;
146
147 SimulatedEventLoopFactory simulated_event_loop_factory(
148 factory.configuration());
149
150 ::std::unique_ptr<EventLoop> ping_event_loop =
151 simulated_event_loop_factory.MakeEventLoop("ping");
152 aos::Sender<TestMessage> test_message_sender =
153 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800155
156 std::unique_ptr<EventLoop> pong1_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("pong");
158 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
159 "/test");
160
161 EXPECT_FALSE(ping_event_loop->is_running());
162
163 // Watchers start when you start running, so there should be nothing counted.
164 simulated_event_loop_factory.RunFor(chrono::seconds(1));
165 EXPECT_EQ(test_message_counter1.count(), 0u);
166
167 std::unique_ptr<EventLoop> pong2_event_loop =
168 simulated_event_loop_factory.MakeEventLoop("pong");
169 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
170 "/test");
171
172 // Pauses in the middle don't count though, so this should be counted.
173 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700174 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800175
176 EXPECT_EQ(test_message_counter1.count(), 0u);
177 EXPECT_EQ(test_message_counter2.count(), 0u);
178 simulated_event_loop_factory.RunFor(chrono::seconds(1));
179
180 EXPECT_EQ(test_message_counter1.count(), 1u);
181 EXPECT_EQ(test_message_counter2.count(), 0u);
182}
183
Austin Schuhd027bf52024-05-12 22:24:12 -0700184// Test that OnRun callbacks get deleted if the event loop gets deleted.
185TEST(SimulatedEventLoopTest, DestructEventLoopBeforeOnRun) {
186 SimulatedEventLoopTestFactory factory;
187
188 SimulatedEventLoopFactory simulated_event_loop_factory(
189 factory.configuration());
190
191 {
192 ::std::unique_ptr<EventLoop> test_event_loop =
193 simulated_event_loop_factory.MakeEventLoop("test");
194 test_event_loop->OnRun([]() { LOG(FATAL) << "Don't run this"; });
195 }
196
197 simulated_event_loop_factory.RunFor(chrono::seconds(1));
198}
199
200// Tests that the order event loops are created is the order that the OnRun
201// callbacks are run.
202TEST(SimulatedEventLoopTest, OnRunOrderFollowsConstructionOrder) {
203 SimulatedEventLoopTestFactory factory;
204
205 SimulatedEventLoopFactory simulated_event_loop_factory(
206 factory.configuration());
207
208 int count = 0;
209
210 std::unique_ptr<EventLoop> test1_event_loop =
211 simulated_event_loop_factory.MakeEventLoop("test1");
212 std::unique_ptr<EventLoop> test2_event_loop =
213 simulated_event_loop_factory.MakeEventLoop("test2");
214 test2_event_loop->OnRun([&count]() {
215 EXPECT_EQ(count, 1u);
216 ++count;
217 });
218 test1_event_loop->OnRun([&count]() {
219 EXPECT_EQ(count, 0u);
220 ++count;
221 });
222
223 simulated_event_loop_factory.RunFor(chrono::seconds(1));
224
225 EXPECT_EQ(count, 2u);
226}
227
228// Test that we can't register OnRun callbacks after starting.
229TEST(SimulatedEventLoopDeathTest, OnRunAfterRunning) {
230 SimulatedEventLoopTestFactory factory;
231
232 SimulatedEventLoopFactory simulated_event_loop_factory(
233 factory.configuration());
234
235 std::unique_ptr<EventLoop> test_event_loop =
236 simulated_event_loop_factory.MakeEventLoop("test");
237 test_event_loop->OnRun([]() {});
238
239 simulated_event_loop_factory.RunFor(chrono::seconds(1));
240
241 EXPECT_DEATH(test_event_loop->OnRun([]() {}), "OnRun");
242}
243
Austin Schuh60e77942022-05-16 17:48:24 -0700244// Test that if we configure an event loop to be able to send too fast that we
245// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700246TEST(SimulatedEventLoopTest, AllowSendTooFast) {
247 SimulatedEventLoopTestFactory factory;
248
249 SimulatedEventLoopFactory simulated_event_loop_factory(
250 factory.configuration());
251
252 // Create two event loops: One will be allowed to send too fast, one won't. We
253 // will then test to ensure that the one that is allowed to send too fast can
254 // indeed send too fast, but that it then makes it so that the second event
255 // loop can no longer send anything because *it* is still limited.
256 ::std::unique_ptr<EventLoop> too_fast_event_loop =
257 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
258 ->MakeEventLoop("too_fast_sender",
259 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700260 NodeEventLoopFactory::ExclusiveSenders::kNo,
261 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700262 aos::Sender<TestMessage> too_fast_message_sender =
263 too_fast_event_loop->MakeSender<TestMessage>("/test");
264
265 ::std::unique_ptr<EventLoop> limited_event_loop =
266 simulated_event_loop_factory.MakeEventLoop("limited_sender");
267 aos::Sender<TestMessage> limited_message_sender =
268 limited_event_loop->MakeSender<TestMessage>("/test");
269
270 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
271 for (int ii = 0; ii < queue_size; ++ii) {
272 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
273 }
274 // And now we should start being in the sending-too-fast phase.
275 for (int ii = 0; ii < queue_size; ++ii) {
276 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700277 ASSERT_EQ(SendTestMessage(limited_message_sender),
278 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700279 }
280}
281
282// Test that if we setup an exclusive sender that it is indeed exclusive.
283TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
284 SimulatedEventLoopTestFactory factory;
285
286 SimulatedEventLoopFactory simulated_event_loop_factory(
287 factory.configuration());
288
289 ::std::unique_ptr<EventLoop> exclusive_event_loop =
290 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700291 ->MakeEventLoop(
292 "too_fast_sender",
293 {NodeEventLoopFactory::CheckSentTooFast::kYes,
294 NodeEventLoopFactory::ExclusiveSenders::kYes,
295 {{configuration::GetChannel(factory.configuration(), "/test1",
296 "aos.TestMessage", "", nullptr),
297 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700298 exclusive_event_loop->SkipAosLog();
299 exclusive_event_loop->SkipTimingReport();
300 ::std::unique_ptr<EventLoop> normal_event_loop =
301 simulated_event_loop_factory.MakeEventLoop("limited_sender");
302 // Set things up to have the exclusive sender be destroyed so we can test
303 // recovery.
304 {
305 aos::Sender<TestMessage> exclusive_sender =
306 exclusive_event_loop->MakeSender<TestMessage>("/test");
307
308 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
309 "TestMessage");
310 }
311 // This one should succeed now that the exclusive channel is removed.
312 aos::Sender<TestMessage> normal_sender =
313 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700314 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
315 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700316
317 // And check an explicitly exempted channel:
318 aos::Sender<TestMessage> non_exclusive_sender =
319 exclusive_event_loop->MakeSender<TestMessage>("/test1");
320 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
321 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700322}
323
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700324void TestSentTooFastCheckEdgeCase(
325 const std::function<RawSender::Error(int, int)> expected_err,
326 const bool send_twice_at_end) {
327 SimulatedEventLoopTestFactory factory;
328
329 auto event_loop = factory.MakePrimary("primary");
330
331 auto sender = event_loop->MakeSender<TestMessage>("/test");
332
333 const int queue_size = TestChannelQueueSize(event_loop.get());
334 int msgs_sent = 0;
335 event_loop->AddPhasedLoop(
336 [&](int) {
337 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
338 msgs_sent++;
339
340 // If send_twice_at_end, send the last two messages (message
341 // queue_size and queue_size + 1) in the same iteration, meaning that
342 // we would be sending very slightly too fast. Otherwise, we will send
343 // message queue_size + 1 in the next iteration and we will continue
344 // to be sending exactly at the channel frequency.
345 if (send_twice_at_end && (msgs_sent == queue_size)) {
346 EXPECT_EQ(SendTestMessage(sender),
347 expected_err(msgs_sent, queue_size));
348 msgs_sent++;
349 }
350
351 if (msgs_sent > queue_size) {
352 factory.Exit();
353 }
354 },
355 std::chrono::duration_cast<std::chrono::nanoseconds>(
356 std::chrono::duration<double>(
357 1.0 / TestChannelFrequency(event_loop.get()))));
358
359 factory.Run();
360}
361
362// Tests that RawSender::Error::kMessagesSentTooFast is not returned
363// when messages are sent at the exact frequency of the channel.
364TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
365 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
366 false);
367}
368
369// Tests that RawSender::Error::kMessagesSentTooFast is returned
370// when sending exactly one more message than allowed in a channel storage
371// duration.
372TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
373 TestSentTooFastCheckEdgeCase(
374 [](const int msgs_sent, const int queue_size) {
375 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
376 : RawSender::Error::kOk);
377 },
378 true);
379}
380
Austin Schuh8fb315a2020-11-19 22:33:58 -0800381// Test that creating an event loop while running dies.
382TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
383 SimulatedEventLoopTestFactory factory;
384
385 SimulatedEventLoopFactory simulated_event_loop_factory(
386 factory.configuration());
387
388 ::std::unique_ptr<EventLoop> event_loop =
389 simulated_event_loop_factory.MakeEventLoop("ping");
390
391 auto timer = event_loop->AddTimer([&]() {
392 EXPECT_DEATH(
393 {
394 ::std::unique_ptr<EventLoop> event_loop2 =
395 simulated_event_loop_factory.MakeEventLoop("ping");
396 },
397 "event loop while running");
398 simulated_event_loop_factory.Exit();
399 });
400
401 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700402 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800403 });
404
405 simulated_event_loop_factory.Run();
406}
407
408// Test that creating a watcher after running dies.
409TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
410 SimulatedEventLoopTestFactory factory;
411
412 SimulatedEventLoopFactory simulated_event_loop_factory(
413 factory.configuration());
414
415 ::std::unique_ptr<EventLoop> event_loop =
416 simulated_event_loop_factory.MakeEventLoop("ping");
417
418 simulated_event_loop_factory.RunFor(chrono::seconds(1));
419
420 EXPECT_DEATH(
421 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
422 "Can't add a watcher after running");
423
424 ::std::unique_ptr<EventLoop> event_loop2 =
425 simulated_event_loop_factory.MakeEventLoop("ping");
426
427 simulated_event_loop_factory.RunFor(chrono::seconds(1));
428
429 EXPECT_DEATH(
430 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
431 "Can't add a watcher after running");
432}
433
Austin Schuh44019f92019-05-19 19:58:27 -0700434// Test that running for a time period with no handlers causes time to progress
435// correctly.
436TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800437 SimulatedEventLoopTestFactory factory;
438
439 SimulatedEventLoopFactory simulated_event_loop_factory(
440 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700441 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800442 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700443
444 simulated_event_loop_factory.RunFor(chrono::seconds(1));
445
446 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700447 event_loop->monotonic_now());
448}
449
450// Test that running for a time with a periodic handler causes time to end
451// correctly.
452TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800453 SimulatedEventLoopTestFactory factory;
454
455 SimulatedEventLoopFactory simulated_event_loop_factory(
456 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700457 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800458 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700459
460 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700461 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700462 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700463 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
464 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700465 });
466
467 simulated_event_loop_factory.RunFor(chrono::seconds(1));
468
469 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700470 event_loop->monotonic_now());
471 EXPECT_EQ(counter, 10);
472}
473
Austin Schuh7d87b672019-12-01 20:23:49 -0800474// Tests that watchers have latency in simulation.
475TEST(SimulatedEventLoopTest, WatcherTimingReport) {
476 SimulatedEventLoopTestFactory factory;
477 factory.set_send_delay(std::chrono::microseconds(50));
478
479 FLAGS_timing_report_ms = 1000;
480 auto loop1 = factory.MakePrimary("primary");
481 loop1->MakeWatcher("/test", [](const TestMessage &) {});
482
483 auto loop2 = factory.Make("sender_loop");
484
485 auto loop3 = factory.Make("report_fetcher");
486
487 Fetcher<timing::Report> report_fetcher =
488 loop3->MakeFetcher<timing::Report>("/aos");
489 EXPECT_FALSE(report_fetcher.Fetch());
490
491 auto sender = loop2->MakeSender<TestMessage>("/test");
492
493 // Send 10 messages in the middle of a timing report period so we get
494 // something interesting back.
495 auto test_timer = loop2->AddTimer([&sender]() {
496 for (int i = 0; i < 10; ++i) {
497 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
498 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
499 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700500 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800501 }
502 });
503
504 // Quit after 1 timing report, mid way through the next cycle.
505 {
506 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700507 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800508 end_timer->set_name("end");
509 }
510
511 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700512 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800513 });
514
515 factory.Run();
516
517 // And, since we are here, check that the timing report makes sense.
518 // Start by looking for our event loop's timing.
519 FlatbufferDetachedBuffer<timing::Report> primary_report =
520 FlatbufferDetachedBuffer<timing::Report>::Empty();
521 while (report_fetcher.FetchNext()) {
522 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
523 if (report_fetcher->name()->string_view() == "primary") {
524 primary_report = CopyFlatBuffer(report_fetcher.get());
525 }
526 }
527
528 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700529 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800530
531 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
532
533 // Just the timing report timer.
534 ASSERT_NE(primary_report.message().timers(), nullptr);
535 EXPECT_EQ(primary_report.message().timers()->size(), 2);
536
537 // No phased loops
538 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
539
540 // And now confirm that the watcher received all 10 messages, and has latency.
541 ASSERT_NE(primary_report.message().watchers(), nullptr);
542 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
543 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
544 EXPECT_NEAR(
545 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
546 0.00005, 1e-9);
547 EXPECT_NEAR(
548 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
549 0.00005, 1e-9);
550 EXPECT_NEAR(
551 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
552 0.00005, 1e-9);
553 EXPECT_EQ(primary_report.message()
554 .watchers()
555 ->Get(0)
556 ->wakeup_latency()
557 ->standard_deviation(),
558 0.0);
559
560 EXPECT_EQ(
561 primary_report.message().watchers()->Get(0)->handler_time()->average(),
562 0.0);
563 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
564 0.0);
565 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
566 0.0);
567 EXPECT_EQ(primary_report.message()
568 .watchers()
569 ->Get(0)
570 ->handler_time()
571 ->standard_deviation(),
572 0.0);
573}
574
Austin Schuh89c9b812021-02-20 14:42:10 -0800575size_t CountAll(
576 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
577 &counters) {
578 size_t count = 0u;
579 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
580 counters) {
581 count += counter->count();
582 }
583 return count;
584}
585
Austin Schuh4c3b9702020-08-30 11:34:55 -0700586// Tests that ping and pong work when on 2 different nodes, and the message
587// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800588TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800589 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
590 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700591 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800592
593 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
594
595 std::unique_ptr<EventLoop> ping_event_loop =
596 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
597 Ping ping(ping_event_loop.get());
598
599 std::unique_ptr<EventLoop> pong_event_loop =
600 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
601 Pong pong(pong_event_loop.get());
602
603 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
604 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700605 MessageCounter<examples::Pong> pi2_pong_counter(
606 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700607 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
608 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
609 "/pi1/aos");
610 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
611 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800612
Austin Schuh4c3b9702020-08-30 11:34:55 -0700613 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
614 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800615
616 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
617 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700618 MessageCounter<examples::Pong> pi1_pong_counter(
619 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700620 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
621 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
622 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
623 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
624 "/aos");
625
Austin Schuh4c3b9702020-08-30 11:34:55 -0700626 // Count timestamps.
627 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
628 pi1_pong_counter_event_loop.get(), "/pi1/aos");
629 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
630 pi2_pong_counter_event_loop.get(), "/pi1/aos");
631 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
632 pi3_pong_counter_event_loop.get(), "/pi1/aos");
633 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
634 pi1_pong_counter_event_loop.get(), "/pi2/aos");
635 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
636 pi2_pong_counter_event_loop.get(), "/pi2/aos");
637 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
638 pi1_pong_counter_event_loop.get(), "/pi3/aos");
639 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
640 pi3_pong_counter_event_loop.get(), "/pi3/aos");
641
Austin Schuh2f8fd752020-09-01 22:38:28 -0700642 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800643 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
644 remote_timestamps_pi2_on_pi1 =
645 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
646 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
647 remote_timestamps_pi1_on_pi2 =
648 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700649
Austin Schuh4c3b9702020-08-30 11:34:55 -0700650 // Wait to let timestamp estimation start up before looking for the results.
651 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
652
Austin Schuh8fb315a2020-11-19 22:33:58 -0800653 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
654 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
655 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
656 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
657 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
658 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
659
Austin Schuh4c3b9702020-08-30 11:34:55 -0700660 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800661 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700662 "/pi1/aos", [&pi1_server_statistics_count](
663 const message_bridge::ServerStatistics &stats) {
664 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
665 EXPECT_EQ(stats.connections()->size(), 2u);
666 for (const message_bridge::ServerConnection *connection :
667 *stats.connections()) {
668 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800669 EXPECT_EQ(connection->connection_count(), 1u);
670 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800671 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700672 if (connection->node()->name()->string_view() == "pi2") {
673 EXPECT_GT(connection->sent_packets(), 50);
674 } else if (connection->node()->name()->string_view() == "pi3") {
675 EXPECT_GE(connection->sent_packets(), 5);
676 } else {
677 LOG(FATAL) << "Unknown connection";
678 }
679
680 EXPECT_TRUE(connection->has_monotonic_offset());
681 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700682
683 EXPECT_TRUE(connection->has_channels());
684 int accumulated_sent_count = 0;
685 int accumulated_dropped_count = 0;
686 for (const message_bridge::ServerChannelStatistics *channel :
687 *connection->channels()) {
688 accumulated_sent_count += channel->sent_packets();
689 accumulated_dropped_count += channel->dropped_packets();
690 }
691 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
692 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700693 }
694 ++pi1_server_statistics_count;
695 });
696
697 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800698 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700699 "/pi2/aos", [&pi2_server_statistics_count](
700 const message_bridge::ServerStatistics &stats) {
701 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
702 EXPECT_EQ(stats.connections()->size(), 1u);
703
704 const message_bridge::ServerConnection *connection =
705 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800706 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700707 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
708 EXPECT_GT(connection->sent_packets(), 50);
709 EXPECT_TRUE(connection->has_monotonic_offset());
710 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800711 EXPECT_EQ(connection->connection_count(), 1u);
712 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700713
714 EXPECT_TRUE(connection->has_channels());
715 int accumulated_sent_count = 0;
716 int accumulated_dropped_count = 0;
717 for (const message_bridge::ServerChannelStatistics *channel :
718 *connection->channels()) {
719 accumulated_sent_count += channel->sent_packets();
720 accumulated_dropped_count += channel->dropped_packets();
721 }
722 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
723 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
724
Austin Schuh4c3b9702020-08-30 11:34:55 -0700725 ++pi2_server_statistics_count;
726 });
727
728 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800729 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700730 "/pi3/aos", [&pi3_server_statistics_count](
731 const message_bridge::ServerStatistics &stats) {
732 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
733 EXPECT_EQ(stats.connections()->size(), 1u);
734
735 const message_bridge::ServerConnection *connection =
736 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800737 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700738 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
739 EXPECT_GE(connection->sent_packets(), 5);
740 EXPECT_TRUE(connection->has_monotonic_offset());
741 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800742 EXPECT_EQ(connection->connection_count(), 1u);
743 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700744
745 EXPECT_TRUE(connection->has_channels());
746 int accumulated_sent_count = 0;
747 int accumulated_dropped_count = 0;
748 for (const message_bridge::ServerChannelStatistics *channel :
749 *connection->channels()) {
750 accumulated_sent_count += channel->sent_packets();
751 accumulated_dropped_count += channel->dropped_packets();
752 }
753 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
754 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
755
Austin Schuh4c3b9702020-08-30 11:34:55 -0700756 ++pi3_server_statistics_count;
757 });
758
759 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800760 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761 "/pi1/aos", [&pi1_client_statistics_count](
762 const message_bridge::ClientStatistics &stats) {
763 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
764 EXPECT_EQ(stats.connections()->size(), 2u);
765
766 for (const message_bridge::ClientConnection *connection :
767 *stats.connections()) {
768 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
769 if (connection->node()->name()->string_view() == "pi2") {
770 EXPECT_GT(connection->received_packets(), 50);
771 } else if (connection->node()->name()->string_view() == "pi3") {
772 EXPECT_GE(connection->received_packets(), 5);
773 } else {
774 LOG(FATAL) << "Unknown connection";
775 }
776
Austin Schuhe61d4382021-03-31 21:33:02 -0700777 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700778 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700779 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800780 EXPECT_EQ(connection->connection_count(), 1u);
781 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700782 }
783 ++pi1_client_statistics_count;
784 });
785
786 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800787 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700788 "/pi2/aos", [&pi2_client_statistics_count](
789 const message_bridge::ClientStatistics &stats) {
790 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
791 EXPECT_EQ(stats.connections()->size(), 1u);
792
793 const message_bridge::ClientConnection *connection =
794 stats.connections()->Get(0);
795 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
796 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700797 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700798 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700799 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800800 EXPECT_EQ(connection->connection_count(), 1u);
801 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700802 ++pi2_client_statistics_count;
803 });
804
805 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800806 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700807 "/pi3/aos", [&pi3_client_statistics_count](
808 const message_bridge::ClientStatistics &stats) {
809 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
810 EXPECT_EQ(stats.connections()->size(), 1u);
811
812 const message_bridge::ClientConnection *connection =
813 stats.connections()->Get(0);
814 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
815 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700816 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700817 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700818 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800819 EXPECT_EQ(connection->connection_count(), 1u);
820 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700821 ++pi3_client_statistics_count;
822 });
823
Austin Schuh2f8fd752020-09-01 22:38:28 -0700824 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
825 // channel.
826 const size_t pi1_timestamp_channel =
827 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
828 pi1_on_pi2_timestamp_fetcher.channel());
829 const size_t ping_timestamp_channel =
830 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
831 ping_on_pi2_fetcher.channel());
832
833 for (const Channel *channel :
834 *pi1_pong_counter_event_loop->configuration()->channels()) {
835 VLOG(1) << "Channel "
836 << configuration::ChannelIndex(
837 pi1_pong_counter_event_loop->configuration(), channel)
838 << " " << configuration::CleanedChannelToString(channel);
839 }
840
Austin Schuh8fb315a2020-11-19 22:33:58 -0800841 std::unique_ptr<EventLoop> pi1_remote_timestamp =
842 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
843
Austin Schuh89c9b812021-02-20 14:42:10 -0800844 for (std::pair<int, std::string> channel :
845 shared()
846 ? std::vector<std::pair<
847 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
848 : std::vector<std::pair<int, std::string>>{
849 {pi1_timestamp_channel,
850 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
851 "aos-message_bridge-Timestamp"},
852 {ping_timestamp_channel,
853 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
854 // For each remote timestamp we get back, confirm that it is either a ping
855 // message, or a timestamp we sent out. Also confirm that the timestamps
856 // are correct.
857 pi1_remote_timestamp->MakeWatcher(
858 channel.second,
859 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
860 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
861 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700862 channel_index = channel.first,
863 channel_name = channel.second](const RemoteMessage &header) {
864 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
865 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800866 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700867 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800868 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700869 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700870
Austin Schuh89c9b812021-02-20 14:42:10 -0800871 const aos::monotonic_clock::time_point header_monotonic_sent_time(
872 chrono::nanoseconds(header.monotonic_sent_time()));
873 const aos::realtime_clock::time_point header_realtime_sent_time(
874 chrono::nanoseconds(header.realtime_sent_time()));
875 const aos::monotonic_clock::time_point header_monotonic_remote_time(
876 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700877 const aos::monotonic_clock::time_point
878 header_monotonic_remote_transmit_time(
879 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800880 const aos::realtime_clock::time_point header_realtime_remote_time(
881 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700882
Austin Schuh89c9b812021-02-20 14:42:10 -0800883 if (channel_index != -1) {
884 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700885 }
886
Austin Schuh89c9b812021-02-20 14:42:10 -0800887 const Context *pi1_context = nullptr;
888 const Context *pi2_context = nullptr;
889
890 if (header.channel_index() == pi1_timestamp_channel) {
891 // Find the forwarded message.
892 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
893 header_monotonic_sent_time) {
894 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
895 }
896
897 // And the source message.
898 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
899 header_monotonic_remote_time) {
900 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
901 }
902
903 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
904 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700905
906 EXPECT_EQ(header_monotonic_remote_transmit_time,
907 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800908 } else if (header.channel_index() == ping_timestamp_channel) {
909 // Find the forwarded message.
910 while (ping_on_pi2_fetcher.context().monotonic_event_time <
911 header_monotonic_sent_time) {
912 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
913 }
914
915 // And the source message.
916 while (ping_on_pi1_fetcher.context().monotonic_event_time <
917 header_monotonic_remote_time) {
918 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
919 }
920
921 pi1_context = &ping_on_pi1_fetcher.context();
922 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700923
924 EXPECT_EQ(header_monotonic_remote_transmit_time,
925 pi2_context->monotonic_event_time -
926 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800927 } else {
928 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700929 }
930
Austin Schuh89c9b812021-02-20 14:42:10 -0800931 // Confirm the forwarded message has matching timestamps to the
932 // timestamps we got back.
933 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
934 EXPECT_EQ(pi2_context->remote_queue_index,
935 header.remote_queue_index());
936 EXPECT_EQ(pi2_context->monotonic_event_time,
937 header_monotonic_sent_time);
938 EXPECT_EQ(pi2_context->realtime_event_time,
939 header_realtime_sent_time);
940 EXPECT_EQ(pi2_context->realtime_remote_time,
941 header_realtime_remote_time);
942 EXPECT_EQ(pi2_context->monotonic_remote_time,
943 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700944 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
945 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700946
Austin Schuh89c9b812021-02-20 14:42:10 -0800947 // Confirm the forwarded message also matches the source message.
948 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
949 EXPECT_EQ(pi1_context->monotonic_event_time,
950 header_monotonic_remote_time);
951 EXPECT_EQ(pi1_context->realtime_event_time,
952 header_realtime_remote_time);
953 });
954 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700955
Austin Schuh4c3b9702020-08-30 11:34:55 -0700956 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
957 chrono::milliseconds(500) +
958 chrono::milliseconds(5));
959
960 EXPECT_EQ(pi1_pong_counter.count(), 1001);
961 EXPECT_EQ(pi2_pong_counter.count(), 1001);
962
963 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
964 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
965 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
966 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
967 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
968 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
969 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
970
Austin Schuh20ac95d2020-12-05 17:24:19 -0800971 EXPECT_EQ(pi1_server_statistics_count, 10);
972 EXPECT_EQ(pi2_server_statistics_count, 10);
973 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700974
975 EXPECT_EQ(pi1_client_statistics_count, 95);
976 EXPECT_EQ(pi2_client_statistics_count, 95);
977 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700978
979 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800980 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
981 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700982}
983
984// Tests that an offset between nodes can be recovered and shows up in
985// ServerStatistics correctly.
986TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
987 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700988 aos::configuration::ReadConfig(ArtifactPath(
989 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700990 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800991 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
992 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700993 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800994 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
995 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700996 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800997 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
998 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700999
Austin Schuh87dd3832021-01-01 23:07:31 -08001000 message_bridge::TestingTimeConverter time(
1001 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -07001002 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001003 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001004
1005 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -08001006 time.AddNextTimestamp(
1007 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001008 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1009 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -07001010
1011 std::unique_ptr<EventLoop> ping_event_loop =
1012 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1013 Ping ping(ping_event_loop.get());
1014
1015 std::unique_ptr<EventLoop> pong_event_loop =
1016 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1017 Pong pong(pong_event_loop.get());
1018
Austin Schuh8fb315a2020-11-19 22:33:58 -08001019 // Wait to let timestamp estimation start up before looking for the results.
1020 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1021
Austin Schuh87dd3832021-01-01 23:07:31 -08001022 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1023 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1024
Austin Schuh4c3b9702020-08-30 11:34:55 -07001025 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1026 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1027
1028 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1029 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1030
Austin Schuh4c3b9702020-08-30 11:34:55 -07001031 // Confirm the offsets are being recovered correctly.
1032 int pi1_server_statistics_count = 0;
1033 pi1_pong_counter_event_loop->MakeWatcher(
1034 "/pi1/aos", [&pi1_server_statistics_count,
1035 kOffset](const message_bridge::ServerStatistics &stats) {
1036 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1037 EXPECT_EQ(stats.connections()->size(), 2u);
1038 for (const message_bridge::ServerConnection *connection :
1039 *stats.connections()) {
1040 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001041 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001042 if (connection->node()->name()->string_view() == "pi2") {
1043 EXPECT_EQ(connection->monotonic_offset(),
1044 chrono::nanoseconds(kOffset).count());
1045 } else if (connection->node()->name()->string_view() == "pi3") {
1046 EXPECT_EQ(connection->monotonic_offset(), 0);
1047 } else {
1048 LOG(FATAL) << "Unknown connection";
1049 }
1050
1051 EXPECT_TRUE(connection->has_monotonic_offset());
1052 }
1053 ++pi1_server_statistics_count;
1054 });
1055
1056 int pi2_server_statistics_count = 0;
1057 pi2_pong_counter_event_loop->MakeWatcher(
1058 "/pi2/aos", [&pi2_server_statistics_count,
1059 kOffset](const message_bridge::ServerStatistics &stats) {
1060 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1061 EXPECT_EQ(stats.connections()->size(), 1u);
1062
1063 const message_bridge::ServerConnection *connection =
1064 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001065 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001066 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1067 EXPECT_TRUE(connection->has_monotonic_offset());
1068 EXPECT_EQ(connection->monotonic_offset(),
1069 -chrono::nanoseconds(kOffset).count());
1070 ++pi2_server_statistics_count;
1071 });
1072
1073 int pi3_server_statistics_count = 0;
1074 pi3_pong_counter_event_loop->MakeWatcher(
1075 "/pi3/aos", [&pi3_server_statistics_count](
1076 const message_bridge::ServerStatistics &stats) {
1077 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1078 EXPECT_EQ(stats.connections()->size(), 1u);
1079
1080 const message_bridge::ServerConnection *connection =
1081 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001082 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001083 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1084 EXPECT_TRUE(connection->has_monotonic_offset());
1085 EXPECT_EQ(connection->monotonic_offset(), 0);
1086 ++pi3_server_statistics_count;
1087 });
1088
1089 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1090 chrono::milliseconds(500) +
1091 chrono::milliseconds(5));
1092
Austin Schuh20ac95d2020-12-05 17:24:19 -08001093 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001094 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001095 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001096}
1097
1098// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001099TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001100 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1101 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1102 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1103
1104 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1105 simulated_event_loop_factory.DisableStatistics();
1106
1107 std::unique_ptr<EventLoop> ping_event_loop =
1108 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1109 Ping ping(ping_event_loop.get());
1110
1111 std::unique_ptr<EventLoop> pong_event_loop =
1112 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1113 Pong pong(pong_event_loop.get());
1114
1115 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1116 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1117
1118 MessageCounter<examples::Pong> pi2_pong_counter(
1119 pi2_pong_counter_event_loop.get(), "/test");
1120
1121 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1122 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1123
1124 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1125 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1126
1127 MessageCounter<examples::Pong> pi1_pong_counter(
1128 pi1_pong_counter_event_loop.get(), "/test");
1129
1130 // Count timestamps.
1131 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1132 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1133 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1134 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1135 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1136 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1137 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1138 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1139 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1140 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1141 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1142 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1143 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1144 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1145
Austin Schuh2f8fd752020-09-01 22:38:28 -07001146 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001147 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1148 remote_timestamps_pi2_on_pi1 =
1149 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1150 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1151 remote_timestamps_pi1_on_pi2 =
1152 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001153
Austin Schuh4c3b9702020-08-30 11:34:55 -07001154 MessageCounter<message_bridge::ServerStatistics>
1155 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1156 "/pi1/aos");
1157 MessageCounter<message_bridge::ServerStatistics>
1158 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1159 "/pi2/aos");
1160 MessageCounter<message_bridge::ServerStatistics>
1161 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1162 "/pi3/aos");
1163
1164 MessageCounter<message_bridge::ClientStatistics>
1165 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1166 "/pi1/aos");
1167 MessageCounter<message_bridge::ClientStatistics>
1168 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1169 "/pi2/aos");
1170 MessageCounter<message_bridge::ClientStatistics>
1171 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1172 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001173
1174 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1175 chrono::milliseconds(5));
1176
Austin Schuh4c3b9702020-08-30 11:34:55 -07001177 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1178 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1179
1180 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1181 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1182 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1183 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1184 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1185 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1186 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1187
1188 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1189 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1190 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1191
1192 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1193 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1194 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001195
1196 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001197 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1198 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001199}
1200
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1202 for (const message_bridge::ServerConnection *connection :
1203 *server_statistics->connections()) {
1204 if (connection->state() != message_bridge::State::CONNECTED) {
1205 return false;
1206 }
1207 }
1208 return true;
1209}
1210
1211bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1212 std::string_view target) {
1213 for (const message_bridge::ServerConnection *connection :
1214 *server_statistics->connections()) {
1215 if (connection->node()->name()->string_view() == target) {
1216 if (connection->state() == message_bridge::State::CONNECTED) {
1217 return false;
1218 }
1219 } else {
1220 if (connection->state() != message_bridge::State::CONNECTED) {
1221 return false;
1222 }
1223 }
1224 }
1225 return true;
1226}
1227
1228bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1229 for (const message_bridge::ClientConnection *connection :
1230 *client_statistics->connections()) {
1231 if (connection->state() != message_bridge::State::CONNECTED) {
1232 return false;
1233 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001234 EXPECT_TRUE(connection->has_boot_uuid());
1235 EXPECT_TRUE(connection->has_connected_since_time());
1236 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001237 }
1238 return true;
1239}
1240
1241bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1242 std::string_view target) {
1243 for (const message_bridge::ClientConnection *connection :
1244 *client_statistics->connections()) {
1245 if (connection->node()->name()->string_view() == target) {
1246 if (connection->state() == message_bridge::State::CONNECTED) {
1247 return false;
1248 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001249 EXPECT_FALSE(connection->has_boot_uuid());
1250 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001251 } else {
1252 if (connection->state() != message_bridge::State::CONNECTED) {
1253 return false;
1254 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001255 EXPECT_TRUE(connection->has_boot_uuid());
1256 EXPECT_TRUE(connection->has_connected_since_time());
1257 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001258 }
1259 }
1260 return true;
1261}
1262
Austin Schuh367a7f42021-11-23 23:04:36 -08001263int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1264 std::string_view target) {
1265 for (const message_bridge::ClientConnection *connection :
1266 *client_statistics->connections()) {
1267 if (connection->node()->name()->string_view() == target) {
1268 return connection->connection_count();
1269 }
1270 }
1271 return 0;
1272}
1273
1274int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1275 std::string_view target) {
1276 for (const message_bridge::ServerConnection *connection :
1277 *server_statistics->connections()) {
1278 if (connection->node()->name()->string_view() == target) {
1279 return connection->connection_count();
1280 }
1281 }
1282 return 0;
1283}
1284
Austin Schuhc0b0f722020-12-12 18:36:06 -08001285// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001286TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001287 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1288
Austin Schuh58646e22021-08-23 23:51:46 -07001289 NodeEventLoopFactory *pi1 =
1290 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1291 NodeEventLoopFactory *pi2 =
1292 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1293 NodeEventLoopFactory *pi3 =
1294 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1295
1296 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001297 Ping ping(ping_event_loop.get());
1298
Austin Schuh58646e22021-08-23 23:51:46 -07001299 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001300 Pong pong(pong_event_loop.get());
1301
1302 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001303 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001304
1305 MessageCounter<examples::Pong> pi2_pong_counter(
1306 pi2_pong_counter_event_loop.get(), "/test");
1307
1308 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001309 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001310
1311 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001312 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001313
1314 MessageCounter<examples::Pong> pi1_pong_counter(
1315 pi1_pong_counter_event_loop.get(), "/test");
1316
1317 // Count timestamps.
1318 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1319 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1320 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1321 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1322 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1323 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1324 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1325 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1326 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1327 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1328 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1329 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1330 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1331 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1332
1333 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001334 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1335 remote_timestamps_pi2_on_pi1 =
1336 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1337 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1338 remote_timestamps_pi1_on_pi2 =
1339 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001340
1341 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001342 *pi1_server_statistics_counter;
1343 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1344 pi1_server_statistics_counter =
1345 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1346 "pi1_server_statistics_counter", "/pi1/aos");
1347 });
1348
Austin Schuhc0b0f722020-12-12 18:36:06 -08001349 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1350 pi1_pong_counter_event_loop
1351 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1352 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1353 pi1_pong_counter_event_loop
1354 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1355
1356 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001357 *pi2_server_statistics_counter;
1358 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1359 pi2_server_statistics_counter =
1360 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1361 "pi2_server_statistics_counter", "/pi2/aos");
1362 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001363 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1364 pi2_pong_counter_event_loop
1365 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1366 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1367 pi2_pong_counter_event_loop
1368 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1369
1370 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001371 *pi3_server_statistics_counter;
1372 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1373 pi3_server_statistics_counter =
1374 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1375 "pi3_server_statistics_counter", "/pi3/aos");
1376 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001377 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1378 pi3_pong_counter_event_loop
1379 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1380 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1381 pi3_pong_counter_event_loop
1382 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1383
1384 MessageCounter<message_bridge::ClientStatistics>
1385 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1386 "/pi1/aos");
1387 MessageCounter<message_bridge::ClientStatistics>
1388 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1389 "/pi2/aos");
1390 MessageCounter<message_bridge::ClientStatistics>
1391 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1392 "/pi3/aos");
1393
James Kuszmaul86e86c32022-07-21 17:39:47 -07001394 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1395 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1396 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1397 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1398 // The currenct contract is that, if all nodes boot simultaneously in
1399 // simulation, that they should all act as if they area already connected,
1400 // without ever observing the transition from disconnected to connected (note
1401 // that on a real system the ServerStatistics message will get resent for each
1402 // and every new connection, even if the new connections happen
1403 // "simultaneously"--in simulation, we are essentially acting as if we are
1404 // starting execution in an already running system, rather than observing the
1405 // boot process).
1406 for (auto &event_loop : statistics_watcher_loops) {
1407 event_loop->MakeWatcher(
1408 "/aos", [](const message_bridge::ServerStatistics &msg) {
1409 for (const message_bridge::ServerConnection *connection :
1410 *msg.connections()) {
1411 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1412 << connection->node()->name()->string_view();
1413 }
1414 });
1415 }
1416
Austin Schuhc0b0f722020-12-12 18:36:06 -08001417 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1418 chrono::milliseconds(5));
1419
James Kuszmaul86e86c32022-07-21 17:39:47 -07001420 statistics_watcher_loops.clear();
1421
Austin Schuhc0b0f722020-12-12 18:36:06 -08001422 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1423 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1424
1425 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1426 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1427 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1428 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1429 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1430 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1431 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1432
Austin Schuh58646e22021-08-23 23:51:46 -07001433 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1434 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1435 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001436
1437 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1438 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1439 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1440
1441 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001442 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1443 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001444
1445 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1446 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1447 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1448 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1449 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1450 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1451 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1452 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1453 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1454 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1456 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1457 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1459 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1460 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1461 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1462 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1463
Austin Schuh58646e22021-08-23 23:51:46 -07001464 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001465
1466 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1467
1468 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1469 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1470
1471 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1472 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1473 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1474 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1475 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1476 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1477 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1478
Austin Schuh58646e22021-08-23 23:51:46 -07001479 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1480 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1481 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001482
1483 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1484 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1485 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1486
1487 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001488 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1489 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001490
1491 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1492 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1493 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1494 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1495 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1496 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1497 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1498 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1499 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1500 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1501 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1502 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1503 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1504 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1506 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1507 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1508 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1509
Austin Schuh58646e22021-08-23 23:51:46 -07001510 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001511
1512 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1513
Austin Schuh367a7f42021-11-23 23:04:36 -08001514 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1515 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1516 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1517 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1518 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1519 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1520
1521 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1522 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1523 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1524 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1525 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1526 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1527 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1528 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1529
1530 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1531 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1532 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1533 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1534
1535 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1536 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1537 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1538 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1539
Austin Schuhc0b0f722020-12-12 18:36:06 -08001540 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1541 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1542
1543 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1544 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1545 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1546 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1547 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1548 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1549 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1550
Austin Schuh58646e22021-08-23 23:51:46 -07001551 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1552 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1553 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001554
1555 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1556 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1557 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1558
1559 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001560 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1561 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001562
Austin Schuhc0b0f722020-12-12 18:36:06 -08001563 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1564 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001565 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1566 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001567 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1568 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001569 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1570 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001571 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1572 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001573 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1574 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1575}
1576
Austin Schuh2febf0d2020-09-21 22:24:30 -07001577// Tests that the time offset having a slope doesn't break the world.
1578// SimulatedMessageBridge has enough self consistency CHECK statements to
1579// confirm, and we can can also check a message in each direction to make sure
1580// it gets delivered as expected.
1581TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1582 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001583 aos::configuration::ReadConfig(ArtifactPath(
1584 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001585 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001586 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1587 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001588 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001589 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1590 ASSERT_EQ(pi2_index, 1u);
1591 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1592 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1593 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001594
Austin Schuh87dd3832021-01-01 23:07:31 -08001595 message_bridge::TestingTimeConverter time(
1596 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001597 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001598 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001599
Austin Schuh2febf0d2020-09-21 22:24:30 -07001600 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001601 time.AddNextTimestamp(
1602 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001603 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1604 BootTimestamp::epoch()});
1605 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1606 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1607 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1608 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001609
1610 std::unique_ptr<EventLoop> ping_event_loop =
1611 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1612 Ping ping(ping_event_loop.get());
1613
1614 std::unique_ptr<EventLoop> pong_event_loop =
1615 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1616 Pong pong(pong_event_loop.get());
1617
1618 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1619 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1620 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1621 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1622
1623 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1624 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1625 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1626 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1627
1628 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1629 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1630 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1631 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1632
1633 // End after a pong message comes back. This will leave the latest messages
1634 // on all channels so we can look at timestamps easily and check they make
1635 // sense.
1636 std::unique_ptr<EventLoop> pi1_pong_ender =
1637 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1638 int count = 0;
1639 pi1_pong_ender->MakeWatcher(
1640 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1641 if (++count == 100) {
1642 simulated_event_loop_factory.Exit();
1643 }
1644 });
1645
1646 // Run enough that messages should be delivered.
1647 simulated_event_loop_factory.Run();
1648
1649 // Grab the latest messages.
1650 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1651 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1652 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1653 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1654
1655 // Compute their time on the global distributed clock so we can compute
1656 // distance betwen them.
1657 const distributed_clock::time_point pi1_ping_time =
1658 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1659 ->ToDistributedClock(
1660 ping_on_pi1_fetcher.context().monotonic_event_time);
1661 const distributed_clock::time_point pi2_ping_time =
1662 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1663 ->ToDistributedClock(
1664 ping_on_pi2_fetcher.context().monotonic_event_time);
1665 const distributed_clock::time_point pi1_pong_time =
1666 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1667 ->ToDistributedClock(
1668 pong_on_pi1_fetcher.context().monotonic_event_time);
1669 const distributed_clock::time_point pi2_pong_time =
1670 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1671 ->ToDistributedClock(
1672 pong_on_pi2_fetcher.context().monotonic_event_time);
1673
1674 // And confirm the delivery delay is just about exactly 150 uS for both
1675 // directions like expected. There will be a couple ns of rounding errors in
1676 // the conversion functions that aren't worth accounting for right now. This
1677 // will either be really close, or really far.
1678 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1679 pi1_ping_time);
1680 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1681 pi1_ping_time);
1682
1683 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1684 pi2_pong_time);
1685 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1686 pi2_pong_time);
1687}
1688
Austin Schuh4c570ea2020-11-19 23:13:24 -08001689void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1690 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1691 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1692 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001693 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001694}
1695
1696// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001697TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001698 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1699 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1700
1701 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1702
1703 std::unique_ptr<EventLoop> ping_event_loop =
1704 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1705 aos::Sender<examples::Ping> pi1_reliable_sender =
1706 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1707 aos::Sender<examples::Ping> pi1_unreliable_sender =
1708 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1709 SendPing(&pi1_reliable_sender, 1);
1710 SendPing(&pi1_unreliable_sender, 1);
1711
1712 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1713 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001714 aos::Sender<examples::Ping> pi2_reliable_sender =
1715 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1716 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001717 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1718 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001719 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1720 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001721 MessageCounter<examples::Ping> pi2_unreliable_counter(
1722 pi2_pong_event_loop.get(), "/unreliable");
1723 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1724 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1725 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1726 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1727
1728 const size_t reliable_channel_index = configuration::ChannelIndex(
1729 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1730
1731 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1732 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1733
Austin Schuheeaa2022021-01-02 21:52:03 -08001734 const chrono::nanoseconds network_delay =
1735 simulated_event_loop_factory.network_delay();
1736
Austin Schuh4c570ea2020-11-19 23:13:24 -08001737 int reliable_timestamp_count = 0;
1738 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001739 shared() ? "/pi1/aos/remote_timestamps/pi2"
1740 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001742 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1743 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001744 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001745 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001746 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001747 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001748 VLOG(1) << aos::FlatbufferToJson(&header);
1749 if (header.channel_index() == reliable_channel_index) {
1750 ++reliable_timestamp_count;
1751 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001752
1753 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1754 chrono::nanoseconds(header.monotonic_sent_time()));
1755
1756 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1757 header_monotonic_sent_time + network_delay +
1758 (pi1_remote_timestamp->monotonic_now() -
1759 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001760 });
1761
1762 // Wait to let timestamp estimation start up before looking for the results.
1763 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1764
1765 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1766 // This one isn't reliable, but was sent before the start. It should *not* be
1767 // delivered.
1768 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1769 // Confirm we got a timestamp logged for the message that was forwarded.
1770 EXPECT_EQ(reliable_timestamp_count, 1u);
1771
1772 SendPing(&pi1_reliable_sender, 2);
1773 SendPing(&pi1_unreliable_sender, 2);
1774 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1775 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001776 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001777 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1778
1779 EXPECT_EQ(reliable_timestamp_count, 2u);
1780}
1781
Austin Schuh20ac95d2020-12-05 17:24:19 -08001782// Tests that rebooting a node changes the ServerStatistics message and the
1783// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001784TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001785 const UUID pi1_boot0 = UUID::Random();
1786 const UUID pi2_boot0 = UUID::Random();
1787 const UUID pi2_boot1 = UUID::Random();
1788 const UUID pi3_boot0 = UUID::Random();
1789 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001790
Austin Schuh58646e22021-08-23 23:51:46 -07001791 message_bridge::TestingTimeConverter time(
1792 configuration::NodesCount(&config.message()));
1793 SimulatedEventLoopFactory factory(&config.message());
1794 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001795
Austin Schuh58646e22021-08-23 23:51:46 -07001796 const size_t pi1_index =
1797 configuration::GetNodeIndex(&config.message(), "pi1");
1798 const size_t pi2_index =
1799 configuration::GetNodeIndex(&config.message(), "pi2");
1800 const size_t pi3_index =
1801 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001802
Austin Schuh58646e22021-08-23 23:51:46 -07001803 {
1804 time.AddNextTimestamp(distributed_clock::epoch(),
1805 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1806 BootTimestamp::epoch()});
1807
1808 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1809
1810 time.AddNextTimestamp(
1811 distributed_clock::epoch() + dt,
1812 {BootTimestamp::epoch() + dt,
1813 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1814 BootTimestamp::epoch() + dt});
1815
1816 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1817 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1818 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1819 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1820 }
1821
1822 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1823 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1824
1825 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1826 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001827
1828 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001829 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001830
1831 int timestamp_count = 0;
1832 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001833 "/pi2/aos", [&expected_boot_uuid,
1834 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001835 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001836 expected_boot_uuid);
1837 });
1838 pi1_remote_timestamp->MakeWatcher(
1839 "/test",
1840 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001841 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001842 expected_boot_uuid);
1843 });
1844 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001845 shared() ? "/pi1/aos/remote_timestamps/pi2"
1846 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001847 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1848 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001849 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001850 VLOG(1) << aos::FlatbufferToJson(&header);
1851 ++timestamp_count;
1852 });
1853
1854 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001855 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001856 int boot_number = 0;
1857 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001858 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001859 "/pi1/aos",
1860 [&pi1_server_statistics_count, &expected_boot_uuid,
1861 &expected_connection_time, &first_pi1_server_statistics,
1862 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001863 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1864 for (const message_bridge::ServerConnection *connection :
1865 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001866 if (connection->state() == message_bridge::State::CONNECTED) {
1867 ASSERT_TRUE(connection->has_boot_uuid());
1868 }
1869 if (!first_pi1_server_statistics) {
1870 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1871 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001872 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001873 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1874 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001875 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001876 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001877 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001878 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1879 connection->connected_since_time())),
1880 expected_connection_time);
1881 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001882 ++pi1_server_statistics_count;
1883 }
1884 }
Austin Schuh58646e22021-08-23 23:51:46 -07001885 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001886 });
1887
Austin Schuh58646e22021-08-23 23:51:46 -07001888 int pi1_client_statistics_count = 0;
1889 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001890 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1891 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001892 const message_bridge::ClientStatistics &stats) {
1893 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1894 for (const message_bridge::ClientConnection *connection :
1895 *stats.connections()) {
1896 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1897 if (connection->node()->name()->string_view() == "pi2") {
1898 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001899 EXPECT_EQ(expected_boot_uuid,
1900 UUID::FromString(connection->boot_uuid()))
1901 << " : Got " << aos::FlatbufferToJson(&stats);
1902 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1903 connection->connected_since_time())),
1904 expected_connection_time);
1905 EXPECT_EQ(boot_number + 1, connection->connection_count());
1906 } else {
1907 EXPECT_EQ(connection->connected_since_time(), 0);
1908 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001909 }
1910 }
1911 });
1912
1913 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001914 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1915 pi1, pi2, pi2_boot1]() {
1916 expected_boot_uuid = pi2_boot1;
1917 ++boot_number;
1918 LOG(INFO) << "OnShutdown triggered for pi2";
1919 pi2->OnStartup(
1920 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1921 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1922 expected_connection_time = pi1->monotonic_now();
1923 });
1924 });
Austin Schuh58646e22021-08-23 23:51:46 -07001925
Austin Schuh20ac95d2020-12-05 17:24:19 -08001926 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001927 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001928
1929 EXPECT_GT(timestamp_count, 100);
1930 EXPECT_GE(pi1_server_statistics_count, 1u);
1931
Austin Schuh20ac95d2020-12-05 17:24:19 -08001932 timestamp_count = 0;
1933 pi1_server_statistics_count = 0;
1934
Austin Schuh58646e22021-08-23 23:51:46 -07001935 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001936 EXPECT_GT(timestamp_count, 100);
1937 EXPECT_GE(pi1_server_statistics_count, 1u);
1938}
1939
// Instantiates RemoteMessageSimulatedEventLoopTest once per Param listed
// below: the combined config paired with true, and the split config paired
// with false.
// NOTE(review): Param is declared outside this chunk; presumably the bool
// marks the combined/shared channel layout -- confirm against its definition.
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001940INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001941 All, RemoteMessageSimulatedEventLoopTest,
1942 ::testing::Values(
1943 Param{"multinode_pingpong_test_combined_config.json", true},
1944 Param{"multinode_pingpong_test_split_config.json", false}));
1945
Austin Schuh58646e22021-08-23 23:51:46 -07001946// Tests that Startup and Shutdown do reasonable things.
1947TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1948 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1949 aos::configuration::ReadConfig(
1950 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1951
Austin Schuh72e65682021-09-02 11:37:05 -07001952 size_t pi1_shutdown_counter = 0;
1953 size_t pi2_shutdown_counter = 0;
1954 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1955 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1956
Austin Schuh58646e22021-08-23 23:51:46 -07001957 message_bridge::TestingTimeConverter time(
1958 configuration::NodesCount(&config.message()));
1959 SimulatedEventLoopFactory factory(&config.message());
1960 factory.SetTimeConverter(&time);
1961 time.AddNextTimestamp(
1962 distributed_clock::epoch(),
1963 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1964
1965 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1966
1967 time.AddNextTimestamp(
1968 distributed_clock::epoch() + dt,
1969 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1970 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1971 BootTimestamp::epoch() + dt});
1972
1973 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1974 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1975
1976 // Configure startup to start Ping and Pong, and count.
1977 size_t pi1_startup_counter = 0;
1978 size_t pi2_startup_counter = 0;
1979 pi1->OnStartup([pi1]() {
1980 LOG(INFO) << "Made ping";
1981 pi1->AlwaysStart<Ping>("ping");
1982 });
1983 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1984 pi2->OnStartup([pi2]() {
1985 LOG(INFO) << "Made pong";
1986 pi2->AlwaysStart<Pong>("pong");
1987 });
1988 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1989
1990 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001991 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1992 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1993
Austin Schuh58646e22021-08-23 23:51:46 -07001994 // Automatically make counters on startup.
1995 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1996 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1997 "pi1_pong_counter", "/test");
1998 });
1999 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
2000 pi2->OnStartup([&pi2_ping_counter, pi2]() {
2001 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
2002 "pi2_ping_counter", "/test");
2003 });
2004 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
2005
2006 EXPECT_EQ(pi2_ping_counter, nullptr);
2007 EXPECT_EQ(pi1_pong_counter, nullptr);
2008
2009 EXPECT_EQ(pi1_startup_counter, 0u);
2010 EXPECT_EQ(pi2_startup_counter, 0u);
2011 EXPECT_EQ(pi1_shutdown_counter, 0u);
2012 EXPECT_EQ(pi2_shutdown_counter, 0u);
2013
2014 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
2015 EXPECT_EQ(pi1_startup_counter, 1u);
2016 EXPECT_EQ(pi2_startup_counter, 1u);
2017 EXPECT_EQ(pi1_shutdown_counter, 0u);
2018 EXPECT_EQ(pi2_shutdown_counter, 0u);
2019 EXPECT_EQ(pi2_ping_counter->count(), 1001);
2020 EXPECT_EQ(pi1_pong_counter->count(), 1001);
2021
2022 LOG(INFO) << pi1->monotonic_now();
2023 LOG(INFO) << pi2->monotonic_now();
2024
2025 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
2026
2027 EXPECT_EQ(pi1_startup_counter, 2u);
2028 EXPECT_EQ(pi2_startup_counter, 2u);
2029 EXPECT_EQ(pi1_shutdown_counter, 1u);
2030 EXPECT_EQ(pi2_shutdown_counter, 1u);
2031 EXPECT_EQ(pi2_ping_counter->count(), 501);
2032 EXPECT_EQ(pi1_pong_counter->count(), 501);
2033}
2034
2035// Tests that OnStartup handlers can be added after running and get called, and
2036// can't be called when running.
2037TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
2038 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2039 aos::configuration::ReadConfig(
2040 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2041
2042 // Test that we can add startup handlers as long as we aren't running, and
2043 // they get run when Run gets called again.
2044 // Test that adding a startup handler when running fails.
2045 //
2046 // Test shutdown handlers get called on destruction.
2047 SimulatedEventLoopFactory factory(&config.message());
2048
2049 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2050
2051 int startup_count0 = 0;
2052 int startup_count1 = 0;
2053
2054 pi1->OnStartup([&]() { ++startup_count0; });
2055 EXPECT_EQ(startup_count0, 0);
2056 EXPECT_EQ(startup_count1, 0);
2057
2058 factory.RunFor(chrono::nanoseconds(1));
2059 EXPECT_EQ(startup_count0, 1);
2060 EXPECT_EQ(startup_count1, 0);
2061
2062 pi1->OnStartup([&]() { ++startup_count1; });
2063 EXPECT_EQ(startup_count0, 1);
2064 EXPECT_EQ(startup_count1, 0);
2065
2066 factory.RunFor(chrono::nanoseconds(1));
2067 EXPECT_EQ(startup_count0, 1);
2068 EXPECT_EQ(startup_count1, 1);
2069
2070 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2071 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2072
2073 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2074 "Can only register OnStartup handlers when not running.");
2075}
2076
2077// Tests that OnStartup handlers can be added after running and get called, and
2078// all the handlers get called on reboot. Shutdown handlers are tested the same
2079// way.
2080TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2081 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2082 aos::configuration::ReadConfig(
2083 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2084
Austin Schuh72e65682021-09-02 11:37:05 -07002085 int startup_count0 = 0;
2086 int shutdown_count0 = 0;
2087 int startup_count1 = 0;
2088 int shutdown_count1 = 0;
2089
Austin Schuh58646e22021-08-23 23:51:46 -07002090 message_bridge::TestingTimeConverter time(
2091 configuration::NodesCount(&config.message()));
2092 SimulatedEventLoopFactory factory(&config.message());
2093 factory.SetTimeConverter(&time);
2094 time.StartEqual();
2095
2096 const chrono::nanoseconds dt = chrono::seconds(10);
2097 time.RebootAt(0, distributed_clock::epoch() + dt);
2098 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2099
2100 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2101
Austin Schuh58646e22021-08-23 23:51:46 -07002102 pi1->OnStartup([&]() { ++startup_count0; });
2103 pi1->OnShutdown([&]() { ++shutdown_count0; });
2104 EXPECT_EQ(startup_count0, 0);
2105 EXPECT_EQ(startup_count1, 0);
2106 EXPECT_EQ(shutdown_count0, 0);
2107 EXPECT_EQ(shutdown_count1, 0);
2108
2109 factory.RunFor(chrono::nanoseconds(1));
2110 EXPECT_EQ(startup_count0, 1);
2111 EXPECT_EQ(startup_count1, 0);
2112 EXPECT_EQ(shutdown_count0, 0);
2113 EXPECT_EQ(shutdown_count1, 0);
2114
2115 pi1->OnStartup([&]() { ++startup_count1; });
2116 EXPECT_EQ(startup_count0, 1);
2117 EXPECT_EQ(startup_count1, 0);
2118 EXPECT_EQ(shutdown_count0, 0);
2119 EXPECT_EQ(shutdown_count1, 0);
2120
2121 factory.RunFor(chrono::nanoseconds(1));
2122 EXPECT_EQ(startup_count0, 1);
2123 EXPECT_EQ(startup_count1, 1);
2124 EXPECT_EQ(shutdown_count0, 0);
2125 EXPECT_EQ(shutdown_count1, 0);
2126
2127 factory.RunFor(chrono::seconds(15));
2128
2129 EXPECT_EQ(startup_count0, 2);
2130 EXPECT_EQ(startup_count1, 2);
2131 EXPECT_EQ(shutdown_count0, 1);
2132 EXPECT_EQ(shutdown_count1, 0);
2133
2134 pi1->OnShutdown([&]() { ++shutdown_count1; });
2135 factory.RunFor(chrono::seconds(10));
2136
2137 EXPECT_EQ(startup_count0, 3);
2138 EXPECT_EQ(startup_count1, 3);
2139 EXPECT_EQ(shutdown_count0, 2);
2140 EXPECT_EQ(shutdown_count1, 1);
2141}
2142
2143// Tests that event loops which outlive shutdown crash.
2144TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2145 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2146 aos::configuration::ReadConfig(
2147 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2148
2149 message_bridge::TestingTimeConverter time(
2150 configuration::NodesCount(&config.message()));
2151 SimulatedEventLoopFactory factory(&config.message());
2152 factory.SetTimeConverter(&time);
2153 time.StartEqual();
2154
2155 const chrono::nanoseconds dt = chrono::seconds(10);
2156 time.RebootAt(0, distributed_clock::epoch() + dt);
2157
2158 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2159
2160 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2161
2162 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2163}
2164
Brian Silvermane1fe2512022-08-14 23:18:50 -07002165// Test that an ExitHandle outliving its factory is caught.
2166TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2167 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2168 aos::configuration::ReadConfig(
2169 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2170 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2171 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2172 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2173 auto exit_handle = factory->MakeExitHandle();
2174 EXPECT_DEATH(factory.reset(),
2175 "All ExitHandles must be destroyed before the factory");
2176}
2177
Austin Schuh3e31f912023-08-21 21:29:10 -07002178// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2179TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2180 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2181 aos::configuration::ReadConfig(
2182 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2183 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2184 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2185 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2186 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2187 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2188}
2189
Austin Schuh58646e22021-08-23 23:51:46 -07002190// Tests that messages don't survive a reboot of a node.
2191TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2192 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2193 aos::configuration::ReadConfig(
2194 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2195
2196 message_bridge::TestingTimeConverter time(
2197 configuration::NodesCount(&config.message()));
2198 SimulatedEventLoopFactory factory(&config.message());
2199 factory.SetTimeConverter(&time);
2200 time.StartEqual();
2201
2202 const chrono::nanoseconds dt = chrono::seconds(10);
2203 time.RebootAt(0, distributed_clock::epoch() + dt);
2204
2205 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2206
2207 const UUID boot_uuid = pi1->boot_uuid();
2208 EXPECT_NE(boot_uuid, UUID::Zero());
2209
2210 {
2211 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2212 aos::Sender<examples::Ping> test_message_sender =
2213 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2214 SendPing(&test_message_sender, 1);
2215 }
2216
2217 factory.RunFor(chrono::seconds(5));
2218
2219 {
2220 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2221 aos::Fetcher<examples::Ping> fetcher =
2222 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2223 EXPECT_TRUE(fetcher.Fetch());
2224 }
2225
2226 factory.RunFor(chrono::seconds(10));
2227
2228 {
2229 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2230 aos::Fetcher<examples::Ping> fetcher =
2231 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2232 EXPECT_FALSE(fetcher.Fetch());
2233 }
2234 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2235}
2236
2237// Tests that reliable messages get resent on reboot.
2238TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2239 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2240 aos::configuration::ReadConfig(
2241 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2242
2243 message_bridge::TestingTimeConverter time(
2244 configuration::NodesCount(&config.message()));
2245 SimulatedEventLoopFactory factory(&config.message());
2246 factory.SetTimeConverter(&time);
2247 time.StartEqual();
2248
2249 const chrono::nanoseconds dt = chrono::seconds(1);
2250 time.RebootAt(1, distributed_clock::epoch() + dt);
2251
2252 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2253 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2254
2255 const UUID pi1_boot_uuid = pi1->boot_uuid();
2256 const UUID pi2_boot_uuid = pi2->boot_uuid();
2257 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2258 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2259
2260 {
2261 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2262 aos::Sender<examples::Ping> test_message_sender =
2263 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2264 SendPing(&test_message_sender, 1);
2265 }
2266
2267 factory.RunFor(chrono::milliseconds(500));
2268
2269 {
2270 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2271 aos::Fetcher<examples::Ping> fetcher =
2272 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002273 ASSERT_TRUE(fetcher.Fetch());
2274 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2275 monotonic_clock::epoch());
2276 // Message bridge picks up the Ping message immediately on reboot.
2277 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2278 monotonic_clock::epoch());
2279 EXPECT_EQ(fetcher.context().monotonic_event_time,
2280 monotonic_clock::epoch() + factory.network_delay());
2281 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002282 }
2283
2284 factory.RunFor(chrono::seconds(1));
2285
2286 {
2287 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2288 aos::Fetcher<examples::Ping> fetcher =
2289 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002290 ASSERT_TRUE(fetcher.Fetch());
2291 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2292 monotonic_clock::epoch());
2293 // Message bridge picks up the Ping message immediately on reboot.
2294 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2295 monotonic_clock::epoch() + chrono::seconds(1));
2296 EXPECT_EQ(fetcher.context().monotonic_event_time,
2297 monotonic_clock::epoch() + factory.network_delay());
2298 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002299 }
2300 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2301}
2302
James Kuszmaul86e86c32022-07-21 17:39:47 -07002303TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2304 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2305 aos::configuration::ReadConfig(
2306 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2307
2308 message_bridge::TestingTimeConverter time(
2309 configuration::NodesCount(&config.message()));
2310 time.AddNextTimestamp(
2311 distributed_clock::epoch(),
2312 {BootTimestamp{0, monotonic_clock::epoch()},
2313 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2314 BootTimestamp{0, monotonic_clock::epoch()}});
2315 SimulatedEventLoopFactory factory(&config.message());
2316 factory.SetTimeConverter(&time);
2317
2318 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2319 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2320
2321 const UUID pi1_boot_uuid = pi1->boot_uuid();
2322 const UUID pi2_boot_uuid = pi2->boot_uuid();
2323 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2324 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2325
2326 {
2327 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2328 aos::Sender<examples::Ping> pi1_sender =
2329 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2330 SendPing(&pi1_sender, 1);
2331 }
2332 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2333 aos::Sender<examples::Ping> pi2_sender =
2334 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2335 SendPing(&pi2_sender, 1);
2336 // Verify that we staggered the OnRun callback correctly.
2337 pi2_event_loop->OnRun([pi1, pi2]() {
2338 EXPECT_EQ(pi1->monotonic_now(),
2339 monotonic_clock::epoch() + std::chrono::seconds(1));
2340 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2341 });
2342
2343 factory.RunFor(chrono::seconds(2));
2344
2345 {
2346 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2347 aos::Fetcher<examples::Ping> fetcher =
2348 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2349 ASSERT_TRUE(fetcher.Fetch());
2350 EXPECT_EQ(fetcher.context().monotonic_event_time,
2351 monotonic_clock::epoch() + factory.network_delay());
2352 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2353 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002354 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2355 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002356 }
2357 {
2358 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2359 aos::Fetcher<examples::Ping> fetcher =
2360 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2361 ASSERT_TRUE(fetcher.Fetch());
2362 EXPECT_EQ(fetcher.context().monotonic_event_time,
2363 monotonic_clock::epoch() + std::chrono::seconds(1) +
2364 factory.network_delay());
2365 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2366 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002367 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2368 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002369 }
2370}
2371
Austin Schuh48205e62021-11-12 14:13:18 -08002372class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2373 public:
2374 SimulatedEventLoopDisconnectTest()
2375 : config(aos::configuration::ReadConfig(ArtifactPath(
2376 "aos/events/multinode_pingpong_test_split_config.json"))),
2377 time(configuration::NodesCount(&config.message())),
2378 factory(&config.message()) {
2379 factory.SetTimeConverter(&time);
2380 }
2381
2382 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2383 const monotonic_clock::time_point allowable_message_time,
2384 std::set<const aos::Node *> empty_nodes) {
2385 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2386 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2387 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2388 pi1->MakeEventLoop("fetcher");
2389 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2390 pi2->MakeEventLoop("fetcher");
2391 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2392 if (configuration::ChannelIsReadableOnNode(channel,
2393 pi1_event_loop->node())) {
2394 std::unique_ptr<aos::RawFetcher> fetcher =
2395 pi1_event_loop->MakeRawFetcher(channel);
2396 if (statistics_channels.find(channel) == statistics_channels.end() ||
2397 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2398 EXPECT_FALSE(fetcher->Fetch() &&
2399 fetcher->context().monotonic_event_time >
2400 allowable_message_time)
2401 << ": Found recent message on channel "
2402 << configuration::CleanedChannelToString(channel) << " and time "
2403 << fetcher->context().monotonic_event_time << " > "
2404 << allowable_message_time << " on pi1";
2405 } else {
2406 EXPECT_TRUE(fetcher->Fetch() &&
2407 fetcher->context().monotonic_event_time >=
2408 allowable_message_time)
2409 << ": Didn't find recent message on channel "
2410 << configuration::CleanedChannelToString(channel) << " on pi1";
2411 }
2412 }
2413 if (configuration::ChannelIsReadableOnNode(channel,
2414 pi2_event_loop->node())) {
2415 std::unique_ptr<aos::RawFetcher> fetcher =
2416 pi2_event_loop->MakeRawFetcher(channel);
2417 if (statistics_channels.find(channel) == statistics_channels.end() ||
2418 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2419 EXPECT_FALSE(fetcher->Fetch() &&
2420 fetcher->context().monotonic_event_time >
2421 allowable_message_time)
2422 << ": Found message on channel "
2423 << configuration::CleanedChannelToString(channel) << " and time "
2424 << fetcher->context().monotonic_event_time << " > "
2425 << allowable_message_time << " on pi2";
2426 } else {
2427 EXPECT_TRUE(fetcher->Fetch() &&
2428 fetcher->context().monotonic_event_time >=
2429 allowable_message_time)
2430 << ": Didn't find message on channel "
2431 << configuration::CleanedChannelToString(channel) << " on pi2";
2432 }
2433 }
2434 }
2435 }
2436
2437 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2438
2439 message_bridge::TestingTimeConverter time;
2440 SimulatedEventLoopFactory factory;
2441};
2442
2443// Tests that if we have message bridge client/server disabled, and timing
2444// reports disabled, no messages are sent. Also tests that we can disconnect a
2445// node and disable statistics on it and it actually fully disconnects.
2446TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2447 time.StartEqual();
2448 factory.SkipTimingReport();
2449 factory.DisableStatistics();
2450
2451 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2452 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2453
2454 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2455 pi1->MakeEventLoop("fetcher");
2456 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2457 pi2->MakeEventLoop("fetcher");
2458
2459 factory.RunFor(chrono::milliseconds(100000));
2460
2461 // Confirm no messages are sent if we've configured them all off.
2462 VerifyChannels({}, monotonic_clock::min_time, {});
2463
2464 // Now, confirm that all the message_bridge channels come back when we
2465 // re-enable.
2466 factory.EnableStatistics();
2467
2468 factory.RunFor(chrono::milliseconds(10050));
2469
2470 // Build up the list of all the messages we expect when we come back.
2471 {
2472 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002473 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002474 std::vector<std::pair<std::string_view, const Node *>>{
2475 {"/pi1/aos", pi1->node()},
2476 {"/pi2/aos", pi1->node()},
2477 {"/pi3/aos", pi1->node()}}) {
2478 statistics_channels.insert(configuration::GetChannel(
2479 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2480 pi.second));
2481 statistics_channels.insert(configuration::GetChannel(
2482 factory.configuration(), pi.first,
2483 "aos.message_bridge.ServerStatistics", "", pi.second));
2484 statistics_channels.insert(configuration::GetChannel(
2485 factory.configuration(), pi.first,
2486 "aos.message_bridge.ClientStatistics", "", pi.second));
2487 }
2488
2489 statistics_channels.insert(configuration::GetChannel(
2490 factory.configuration(),
2491 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2492 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2493 statistics_channels.insert(configuration::GetChannel(
2494 factory.configuration(),
2495 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2496 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2497 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2498 }
2499
2500 // Now test that we can disable the messages for a single node
2501 pi2->DisableStatistics();
2502 const aos::monotonic_clock::time_point statistics_disable_time =
2503 pi2->monotonic_now();
2504 factory.RunFor(chrono::milliseconds(10000));
2505
2506 // We should see a much smaller set of messages, but should still see messages
2507 // forwarded, mainly the timestamp message.
2508 {
2509 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002510 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002511 std::vector<std::pair<std::string_view, const Node *>>{
2512 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2513 statistics_channels.insert(configuration::GetChannel(
2514 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2515 pi.second));
2516 statistics_channels.insert(configuration::GetChannel(
2517 factory.configuration(), pi.first,
2518 "aos.message_bridge.ServerStatistics", "", pi.second));
2519 statistics_channels.insert(configuration::GetChannel(
2520 factory.configuration(), pi.first,
2521 "aos.message_bridge.ClientStatistics", "", pi.second));
2522 }
2523
2524 statistics_channels.insert(configuration::GetChannel(
2525 factory.configuration(),
2526 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2527 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2528 VerifyChannels(statistics_channels, statistics_disable_time, {});
2529 }
2530
2531 // Now, fully disconnect the node. This will completely quiet down pi2.
2532 pi1->Disconnect(pi2->node());
2533 pi2->Disconnect(pi1->node());
2534
2535 const aos::monotonic_clock::time_point disconnect_disable_time =
2536 pi2->monotonic_now();
2537 factory.RunFor(chrono::milliseconds(10000));
2538
2539 {
2540 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002541 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002542 std::vector<std::pair<std::string_view, const Node *>>{
2543 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2544 statistics_channels.insert(configuration::GetChannel(
2545 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2546 pi.second));
2547 statistics_channels.insert(configuration::GetChannel(
2548 factory.configuration(), pi.first,
2549 "aos.message_bridge.ServerStatistics", "", pi.second));
2550 statistics_channels.insert(configuration::GetChannel(
2551 factory.configuration(), pi.first,
2552 "aos.message_bridge.ClientStatistics", "", pi.second));
2553 }
2554
2555 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2556 }
2557}
2558
Austin Schuh9cce6842024-04-02 18:55:44 -07002559// Struct to capture the expected time a message should be received (and its
2560// value). This is from the perspective of the node receiving the message.
2561struct ExpectedTimestamps {
2562 // The time that the message was published on the sending node's monotonic
2563 // clock.
2564 monotonic_clock::time_point remote_time;
2565 // The time that the message was virtually transmitted over the virtual
2566 // network on the sending node's monotonic clock.
2567 monotonic_clock::time_point remote_transmit_time;
2568 // The time that the message was received on the receiving node's clock.
2569 monotonic_clock::time_point event_time;
2570 // The value inside the message.
2571 int value;
2572};
2573
Austin Schuhac6d89e2024-03-27 14:56:09 -07002574// Tests that rapidly sent messages get timestamped correctly.
2575TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2576 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2577 aos::configuration::ReadConfig(
2578 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2579
2580 message_bridge::TestingTimeConverter time(
2581 configuration::NodesCount(&config.message()));
2582 SimulatedEventLoopFactory factory(&config.message());
2583 factory.SetTimeConverter(&time);
2584 time.StartEqual();
2585
2586 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2587 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2588
2589 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2590 aos::Fetcher<examples::Ping> fetcher =
2591 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2592 EXPECT_FALSE(fetcher.Fetch());
2593
2594 {
2595 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuh9cce6842024-04-02 18:55:44 -07002596 FunctionScheduler run_at(ping_event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002597 aos::Sender<examples::Ping> test_message_sender =
2598 ping_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002599 aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002600 for (const std::chrono::nanoseconds dt :
2601 {chrono::microseconds(5000), chrono::microseconds(1),
2602 chrono::microseconds(2), chrono::microseconds(70),
Austin Schuh9cce6842024-04-02 18:55:44 -07002603 chrono::microseconds(63), chrono::microseconds(140)}) {
2604 now += dt;
2605 run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002606 }
2607
Austin Schuh9cce6842024-04-02 18:55:44 -07002608 now += chrono::milliseconds(10);
2609
2610 factory.RunFor(now - ping_event_loop->monotonic_now());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002611 }
2612
Austin Schuh9cce6842024-04-02 18:55:44 -07002613 const monotonic_clock::time_point e = monotonic_clock::epoch();
2614 const chrono::nanoseconds send_delay = factory.send_delay();
2615 const chrono::nanoseconds network_delay = factory.network_delay();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002616
Austin Schuh9cce6842024-04-02 18:55:44 -07002617 const std::vector<ExpectedTimestamps> expected_values = {
2618 // First message shows up after wakeup + network delay as expected.
2619 ExpectedTimestamps{
2620 .remote_time = e + chrono::microseconds(5000),
2621 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2622 .event_time =
2623 e + chrono::microseconds(5000) + send_delay + network_delay,
2624 .value = 1,
2625 },
2626 // Next message is close enough that it gets picked up at the same wakeup.
2627 ExpectedTimestamps{
2628 .remote_time = e + chrono::microseconds(5001),
2629 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2630 .event_time =
2631 e + chrono::microseconds(5000) + send_delay + network_delay,
2632 .value = 1,
2633 },
2634 // Same for the third.
2635 ExpectedTimestamps{
2636 .remote_time = e + chrono::microseconds(5003),
2637 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2638 .event_time =
2639 e + chrono::microseconds(5000) + send_delay + network_delay,
2640 .value = 1,
2641 },
2642 // Fourth waits long enough to do the right thing.
2643 ExpectedTimestamps{
2644 .remote_time = e + chrono::microseconds(5073),
2645 .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
2646 .event_time =
2647 e + chrono::microseconds(5073) + send_delay + network_delay,
2648 .value = 1,
2649 },
2650 // Fifth waits long enough to do the right thing as well (but kicks off
2651 // while the fourth is in flight over the network).
2652 ExpectedTimestamps{
2653 .remote_time = e + chrono::microseconds(5136),
2654 .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
2655 .event_time =
2656 e + chrono::microseconds(5136) + send_delay + network_delay,
2657 .value = 1,
2658 },
2659 // Sixth waits long enough to do the right thing as well (but kicks off
2660 // while the fifth is in flight over the network and has almost landed).
2661 // The timer wakeup for the Timestamp message coming back will find the
2662 // sixth message a little bit early.
2663 ExpectedTimestamps{
2664 .remote_time = e + chrono::microseconds(5276),
2665 .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
2666 .event_time =
2667 e + chrono::microseconds(5273) + send_delay + network_delay,
2668 .value = 1,
2669 },
2670 };
Austin Schuhac6d89e2024-03-27 14:56:09 -07002671
Austin Schuh9cce6842024-04-02 18:55:44 -07002672 for (const ExpectedTimestamps value : expected_values) {
2673 ASSERT_TRUE(fetcher.FetchNext());
2674 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2675 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2676 value.remote_transmit_time);
2677 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2678 EXPECT_EQ(fetcher->value(), value.value);
2679 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002680
2681 ASSERT_FALSE(fetcher.FetchNext());
2682}
2683
2684// Tests that a reliable message gets forwarded if it was sent originally when
2685// nodes were disconnected.
2686TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2687 time.StartEqual();
2688 factory.SkipTimingReport();
2689 factory.DisableStatistics();
2690
2691 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2692 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2693
2694 // Fully disconnect the nodes.
2695 pi1->Disconnect(pi2->node());
2696 pi2->Disconnect(pi1->node());
2697
Austin Schuhac6d89e2024-03-27 14:56:09 -07002698 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2699 pi2->MakeEventLoop("fetcher");
2700 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2701 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2702
2703 factory.RunFor(chrono::milliseconds(100));
2704
2705 {
Austin Schuheeb86fc2024-04-04 20:12:39 -07002706 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2707 pi1->MakeEventLoop("sender");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002708 aos::Sender<examples::Ping> pi1_reliable_sender =
2709 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002710 FunctionScheduler run_at(pi1_event_loop.get());
2711 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002712 for (int i = 0; i < 100; ++i) {
Austin Schuh9cce6842024-04-02 18:55:44 -07002713 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
2714 now);
2715 now += chrono::milliseconds(100);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002716 }
Austin Schuh9cce6842024-04-02 18:55:44 -07002717 now += chrono::milliseconds(50);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002718
Austin Schuh9cce6842024-04-02 18:55:44 -07002719 factory.RunFor(now - pi1_event_loop->monotonic_now());
2720 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002721
2722 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2723
2724 pi1->Connect(pi2->node());
2725 pi2->Connect(pi1->node());
2726
2727 factory.RunFor(chrono::milliseconds(1));
2728
2729 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2730 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2731 monotonic_clock::epoch() + chrono::milliseconds(10000));
2732 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2733 monotonic_clock::epoch() + chrono::milliseconds(10150));
2734 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2735 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2736 factory.network_delay());
2737 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2738
Austin Schuh9cce6842024-04-02 18:55:44 -07002739 // TODO(austin): Verify that the dropped packet count increases.
2740
Austin Schuhac6d89e2024-03-27 14:56:09 -07002741 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2742}
2743
Austin Schuh9cce6842024-04-02 18:55:44 -07002744// Tests that if we disconnect while a message is in various states of being
2745// queued, it gets either dropped or sent as expected.
2746TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
2747 time.StartEqual();
2748 factory.SkipTimingReport();
2749 factory.DisableStatistics();
2750
2751 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2752 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2753
2754 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2755
2756 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2757 pi2->MakeEventLoop("fetcher");
2758 aos::Fetcher<examples::Ping> fetcher =
2759 pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
2760
2761 ASSERT_FALSE(fetcher.Fetch());
2762
2763 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2764 {
2765 FunctionScheduler run_at(pi1_event_loop.get());
2766 aos::Sender<examples::Ping> pi1_sender =
2767 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2768
2769 int i = 0;
2770 for (const std::chrono::nanoseconds dt :
2771 {chrono::microseconds(5000), chrono::microseconds(1),
2772 chrono::microseconds(2), chrono::microseconds(70),
2773 chrono::microseconds(63), chrono::microseconds(140),
2774 chrono::microseconds(160)}) {
2775 run_at.ScheduleAt(
2776 [&]() {
2777 pi1->Connect(pi2->node());
2778 pi2->Connect(pi1->node());
2779 },
2780 now);
2781
2782 now += chrono::milliseconds(100);
2783
2784 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
2785
2786 now += dt;
2787
2788 run_at.ScheduleAt(
2789 [&]() {
2790 // Fully disconnect the nodes.
2791 pi1->Disconnect(pi2->node());
2792 pi2->Disconnect(pi1->node());
2793 },
2794 now);
2795
2796 now += chrono::milliseconds(100) - dt;
2797 ++i;
2798 }
2799
2800 factory.RunFor(now - pi1_event_loop->monotonic_now());
2801 }
2802
2803 const monotonic_clock::time_point e = monotonic_clock::epoch();
2804 const chrono::nanoseconds send_delay = factory.send_delay();
2805 const chrono::nanoseconds network_delay = factory.network_delay();
2806
2807 const std::vector<ExpectedTimestamps> expected_values = {
2808 ExpectedTimestamps{
2809 .remote_time = e + chrono::milliseconds(100),
2810 .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
2811 .event_time =
2812 e + chrono::milliseconds(100) + send_delay + network_delay,
2813 .value = 0,
2814 },
2815 ExpectedTimestamps{
2816 .remote_time = e + chrono::milliseconds(1300),
2817 .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
2818 .event_time =
2819 e + chrono::milliseconds(1300) + send_delay + network_delay,
2820 .value = 6,
2821 },
2822 };
2823
2824 for (const ExpectedTimestamps value : expected_values) {
2825 ASSERT_TRUE(fetcher.FetchNext());
2826 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2827 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2828 value.remote_transmit_time);
2829 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2830 EXPECT_EQ(fetcher->value(), value.value);
2831 }
2832
2833 // TODO(austin): Verify that the dropped packet count increases.
2834
2835 ASSERT_FALSE(fetcher.Fetch());
2836}
2837
2838class PingLogger {
2839 public:
2840 PingLogger(aos::EventLoop *event_loop, std::string_view channel,
2841 std::vector<std::pair<aos::Context, int>> *msgs)
2842 : event_loop_(event_loop),
2843 fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
2844 msgs_(msgs) {
2845 event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
2846 }
2847
2848 ~PingLogger() {
2849 while (fetcher_.FetchNext()) {
2850 msgs_->emplace_back(fetcher_.context(), fetcher_->value());
2851 }
2852 }
2853
2854 private:
2855 aos::EventLoop *event_loop_;
2856 aos::Fetcher<examples::Ping> fetcher_;
2857 std::vector<std::pair<aos::Context, int>> *msgs_;
2858};
2859
2860// Tests that rebooting while a message is in flight works as expected.
2861TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
2862 time.StartEqual();
2863 for (int i = 0; i < 8; ++i) {
2864 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2865 }
2866
2867 factory.SkipTimingReport();
2868 factory.DisableStatistics();
2869
2870 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2871 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2872
2873 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2874
2875 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2876 FunctionScheduler run_at(pi1_event_loop.get());
2877 aos::Sender<examples::Ping> pi1_sender =
2878 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2879
2880 int i = 0;
2881 for (const std::chrono::nanoseconds dt :
2882 {chrono::microseconds(5000), chrono::microseconds(1),
2883 chrono::microseconds(2), chrono::microseconds(70),
2884 chrono::microseconds(63), chrono::microseconds(140),
2885 chrono::microseconds(160)}) {
2886 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2887 now + chrono::seconds(10) - dt);
2888
2889 now += chrono::seconds(10);
2890 ++i;
2891 }
2892
2893 std::vector<std::pair<aos::Context, int>> msgs;
2894
2895 pi2->OnStartup([pi2, &msgs]() {
2896 pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
2897 });
2898
2899 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2900
2901 const monotonic_clock::time_point e = monotonic_clock::epoch();
2902 const chrono::nanoseconds send_delay = factory.send_delay();
2903 const chrono::nanoseconds network_delay = factory.network_delay();
2904
2905 const std::vector<ExpectedTimestamps> expected_values = {
2906 ExpectedTimestamps{
2907 .remote_time = e + chrono::microseconds(9995000),
2908 .remote_transmit_time =
2909 e + chrono::microseconds(9995000) + send_delay,
2910 .event_time =
2911 e + chrono::microseconds(9995000) + send_delay + network_delay,
2912 .value = 0,
2913 },
2914 ExpectedTimestamps{
2915 .remote_time = e + chrono::microseconds(19999999),
2916 .remote_transmit_time =
2917 e + chrono::microseconds(19999999) + send_delay,
2918 .event_time =
2919 e + chrono::microseconds(-1) + send_delay + network_delay,
2920 .value = 1,
2921 },
2922 ExpectedTimestamps{
2923 .remote_time = e + chrono::microseconds(29999998),
2924 .remote_transmit_time =
2925 e + chrono::microseconds(29999998) + send_delay,
2926 .event_time =
2927 e + chrono::microseconds(-2) + send_delay + network_delay,
2928 .value = 2,
2929 },
2930 ExpectedTimestamps{
2931 .remote_time = e + chrono::microseconds(69999840),
2932 .remote_transmit_time =
2933 e + chrono::microseconds(69999840) + send_delay,
2934 .event_time =
2935 e + chrono::microseconds(9999840) + send_delay + network_delay,
2936 .value = 6,
2937 },
2938 };
2939
2940 ASSERT_EQ(msgs.size(), expected_values.size());
2941
2942 for (size_t i = 0; i < msgs.size(); ++i) {
2943 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
2944 expected_values[i].remote_time);
2945 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
2946 expected_values[i].remote_transmit_time);
2947 EXPECT_EQ(msgs[i].first.monotonic_event_time,
2948 expected_values[i].event_time);
2949 EXPECT_EQ(msgs[i].second, expected_values[i].value);
2950 }
2951
2952 // TODO(austin): Verify that the dropped packet count increases.
2953}
2954
2955// Tests that rebooting while a message is in flight works as expected.
2956TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
2957 time.StartEqual();
2958 for (int i = 0; i < 8; ++i) {
2959 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2960 }
2961
2962 factory.SkipTimingReport();
2963 factory.DisableStatistics();
2964
2965 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2966 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2967
2968 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2969
2970 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2971 FunctionScheduler run_at(pi1_event_loop.get());
2972 aos::Sender<examples::Ping> pi1_sender =
2973 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2974
2975 int i = 0;
2976 for (const std::chrono::nanoseconds dt :
2977 {chrono::microseconds(5000), chrono::microseconds(1),
2978 chrono::microseconds(2), chrono::microseconds(70),
2979 chrono::microseconds(63), chrono::microseconds(140),
2980 chrono::microseconds(160)}) {
2981 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2982 now + chrono::seconds(10) - dt);
2983
2984 now += chrono::seconds(10);
2985 ++i;
2986 }
2987
2988 std::vector<std::pair<aos::Context, int>> msgs;
2989
2990 PingLogger *logger;
2991 pi2->OnStartup([pi2, &msgs, &logger]() {
2992 logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
2993 });
2994
2995 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2996
2997 // Stop the logger to flush the last boot of data.
2998 pi2->Stop(logger);
2999
3000 const monotonic_clock::time_point e = monotonic_clock::epoch();
3001 const chrono::nanoseconds send_delay = factory.send_delay();
3002 const chrono::nanoseconds network_delay = factory.network_delay();
3003
3004 // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
3005 // event times to confirm what should have been forwarded when.
3006 const std::vector<ExpectedTimestamps> expected_values = {
3007 ExpectedTimestamps{
3008 .remote_time = e + chrono::microseconds(9995000),
3009 .remote_transmit_time =
3010 e + chrono::microseconds(9995000) + send_delay,
3011 .event_time =
3012 e + chrono::microseconds(9995000) + send_delay + network_delay,
3013 .value = 0,
3014 },
3015 ExpectedTimestamps{
3016 .remote_time = e + chrono::microseconds(9995000),
3017 .remote_transmit_time = e + chrono::microseconds(10000000),
3018 .event_time = e + network_delay,
3019 .value = 0,
3020 },
3021 ExpectedTimestamps{
3022 .remote_time = e + chrono::microseconds(19999999),
3023 .remote_transmit_time = e + chrono::microseconds(20000000),
3024 .event_time = e + network_delay,
3025 .value = 1,
3026 },
3027 ExpectedTimestamps{
3028 .remote_time = e + chrono::microseconds(29999998),
3029 .remote_transmit_time = e + chrono::microseconds(30000000),
3030 .event_time = e + network_delay,
3031 .value = 2,
3032 },
3033 ExpectedTimestamps{
3034 .remote_time = e + chrono::microseconds(39999930),
3035 .remote_transmit_time = e + chrono::microseconds(40000000),
3036 .event_time = e + network_delay,
3037 .value = 3,
3038 },
3039 ExpectedTimestamps{
3040 .remote_time = e + chrono::microseconds(49999937),
3041 .remote_transmit_time = e + chrono::microseconds(50000000),
3042 .event_time = e + network_delay,
3043 .value = 4,
3044 },
3045 ExpectedTimestamps{
3046 .remote_time = e + chrono::microseconds(59999860),
3047 .remote_transmit_time = e + chrono::microseconds(60000000),
3048 .event_time = e + network_delay,
3049 .value = 5,
3050 },
3051 ExpectedTimestamps{
3052 .remote_time = e + chrono::microseconds(69999840),
3053 .remote_transmit_time = e + chrono::microseconds(69999890),
3054 .event_time = e + chrono::microseconds(9999890) + network_delay,
3055 .value = 6,
3056 },
3057 ExpectedTimestamps{
3058 .remote_time = e + chrono::microseconds(69999840),
3059 .remote_transmit_time = e + chrono::microseconds(70000000),
3060 .event_time = e + network_delay,
3061 .value = 6,
3062 },
3063 };
3064
3065 ASSERT_EQ(msgs.size(), expected_values.size());
3066
3067 for (size_t i = 0; i < msgs.size(); ++i) {
3068 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
3069 expected_values[i].remote_time);
3070 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
3071 expected_values[i].remote_transmit_time);
3072 EXPECT_EQ(msgs[i].first.monotonic_event_time,
3073 expected_values[i].event_time);
3074 EXPECT_EQ(msgs[i].second, expected_values[i].value);
3075 }
3076
3077 // TODO(austin): Verify that the dropped packet count increases.
3078}
3079
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003080} // namespace aos::testing