blob: a317db814064003f114a1aa095a9f41906d6b1a5 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuhbf610b72024-04-04 20:04:55 -070010#include "aos/events/function_scheduler.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070012#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080013#include "aos/events/ping_lib.h"
14#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080015#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/message_bridge_client_generated.h"
17#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080018#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080019#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070020#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070021#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080022
Stephan Pleinesf63bde82024-01-13 15:59:33 -080023namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
Austin Schuh217a9782019-12-21 23:02:50 -080045 void Run() override { event_loop_factory_->Run(); }
46 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070047
Austin Schuh52d325c2019-06-23 18:59:06 -070048 // TODO(austin): Implement this. It's used currently for a phased loop test.
49 // I'm not sure how much that matters.
50 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
51
Austin Schuh7d87b672019-12-01 20:23:49 -080052 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080053 MaybeMake();
54 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080055 }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
Austin Schuh217a9782019-12-21 23:02:50 -080058 void MaybeMake() {
59 if (!event_loop_factory_) {
60 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080061 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080063 } else {
64 event_loop_factory_ =
65 std::make_unique<SimulatedEventLoopFactory>(configuration());
66 }
67 }
68 }
69 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080070};
71
Austin Schuh6bae8252021-02-07 22:01:49 -080072auto CommonParameters() {
73 return ::testing::Combine(
74 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
75 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
76 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
77}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070081
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070082INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070083 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080084
// Parameter bundle for the RemoteMessage multi-node tests.
struct Param {
  // Name of the config file to load, relative to aos/events/.
  std::string config;
  // True when a single RemoteMessage channel is shared between all the remote
  // channels; false when every remote channel has its own RemoteMessage
  // channel.
  bool shared;
};
94
95class RemoteMessageSimulatedEventLoopTest
96 : public ::testing::TestWithParam<struct Param> {
97 public:
98 RemoteMessageSimulatedEventLoopTest()
99 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700100 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800101 LOG(INFO) << "Config " << GetParam().config;
102 }
103
104 bool shared() const { return GetParam().shared; }
105
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
107 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
108 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
109 if (shared()) {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop, "/aos/remote_timestamps/pi2"));
112 } else {
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop,
115 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
120 }
121 return counters;
122 }
123
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
125 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
126 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
127 if (shared()) {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1"));
130 } else {
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
133 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
134 event_loop,
135 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
136 }
137 return counters;
138 }
139
140 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
141};
142
Austin Schuh8fb315a2020-11-19 22:33:58 -0800143// Test that sending a message after running gets properly notified.
144TEST(SimulatedEventLoopTest, SendAfterRunFor) {
145 SimulatedEventLoopTestFactory factory;
146
147 SimulatedEventLoopFactory simulated_event_loop_factory(
148 factory.configuration());
149
150 ::std::unique_ptr<EventLoop> ping_event_loop =
151 simulated_event_loop_factory.MakeEventLoop("ping");
152 aos::Sender<TestMessage> test_message_sender =
153 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800155
156 std::unique_ptr<EventLoop> pong1_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("pong");
158 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
159 "/test");
160
161 EXPECT_FALSE(ping_event_loop->is_running());
162
163 // Watchers start when you start running, so there should be nothing counted.
164 simulated_event_loop_factory.RunFor(chrono::seconds(1));
165 EXPECT_EQ(test_message_counter1.count(), 0u);
166
167 std::unique_ptr<EventLoop> pong2_event_loop =
168 simulated_event_loop_factory.MakeEventLoop("pong");
169 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
170 "/test");
171
172 // Pauses in the middle don't count though, so this should be counted.
173 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700174 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800175
176 EXPECT_EQ(test_message_counter1.count(), 0u);
177 EXPECT_EQ(test_message_counter2.count(), 0u);
178 simulated_event_loop_factory.RunFor(chrono::seconds(1));
179
180 EXPECT_EQ(test_message_counter1.count(), 1u);
181 EXPECT_EQ(test_message_counter2.count(), 0u);
182}
183
Austin Schuhd027bf52024-05-12 22:24:12 -0700184// Test that OnRun callbacks get deleted if the event loop gets deleted.
185TEST(SimulatedEventLoopTest, DestructEventLoopBeforeOnRun) {
186 SimulatedEventLoopTestFactory factory;
187
188 SimulatedEventLoopFactory simulated_event_loop_factory(
189 factory.configuration());
190
191 {
192 ::std::unique_ptr<EventLoop> test_event_loop =
193 simulated_event_loop_factory.MakeEventLoop("test");
194 test_event_loop->OnRun([]() { LOG(FATAL) << "Don't run this"; });
195 }
196
197 simulated_event_loop_factory.RunFor(chrono::seconds(1));
198}
199
200// Tests that the order event loops are created is the order that the OnRun
201// callbacks are run.
202TEST(SimulatedEventLoopTest, OnRunOrderFollowsConstructionOrder) {
203 SimulatedEventLoopTestFactory factory;
204
205 SimulatedEventLoopFactory simulated_event_loop_factory(
206 factory.configuration());
207
208 int count = 0;
209
210 std::unique_ptr<EventLoop> test1_event_loop =
211 simulated_event_loop_factory.MakeEventLoop("test1");
212 std::unique_ptr<EventLoop> test2_event_loop =
213 simulated_event_loop_factory.MakeEventLoop("test2");
214 test2_event_loop->OnRun([&count]() {
215 EXPECT_EQ(count, 1u);
216 ++count;
217 });
218 test1_event_loop->OnRun([&count]() {
219 EXPECT_EQ(count, 0u);
220 ++count;
221 });
222
223 simulated_event_loop_factory.RunFor(chrono::seconds(1));
224
225 EXPECT_EQ(count, 2u);
226}
227
228// Test that we can't register OnRun callbacks after starting.
229TEST(SimulatedEventLoopDeathTest, OnRunAfterRunning) {
230 SimulatedEventLoopTestFactory factory;
231
232 SimulatedEventLoopFactory simulated_event_loop_factory(
233 factory.configuration());
234
235 std::unique_ptr<EventLoop> test_event_loop =
236 simulated_event_loop_factory.MakeEventLoop("test");
237 test_event_loop->OnRun([]() {});
238
239 simulated_event_loop_factory.RunFor(chrono::seconds(1));
240
241 EXPECT_DEATH(test_event_loop->OnRun([]() {}), "OnRun");
242}
243
Austin Schuh60e77942022-05-16 17:48:24 -0700244// Test that if we configure an event loop to be able to send too fast that we
245// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700246TEST(SimulatedEventLoopTest, AllowSendTooFast) {
247 SimulatedEventLoopTestFactory factory;
248
249 SimulatedEventLoopFactory simulated_event_loop_factory(
250 factory.configuration());
251
252 // Create two event loops: One will be allowed to send too fast, one won't. We
253 // will then test to ensure that the one that is allowed to send too fast can
254 // indeed send too fast, but that it then makes it so that the second event
255 // loop can no longer send anything because *it* is still limited.
256 ::std::unique_ptr<EventLoop> too_fast_event_loop =
257 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
258 ->MakeEventLoop("too_fast_sender",
259 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700260 NodeEventLoopFactory::ExclusiveSenders::kNo,
261 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700262 aos::Sender<TestMessage> too_fast_message_sender =
263 too_fast_event_loop->MakeSender<TestMessage>("/test");
264
265 ::std::unique_ptr<EventLoop> limited_event_loop =
266 simulated_event_loop_factory.MakeEventLoop("limited_sender");
267 aos::Sender<TestMessage> limited_message_sender =
268 limited_event_loop->MakeSender<TestMessage>("/test");
269
270 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
271 for (int ii = 0; ii < queue_size; ++ii) {
272 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
273 }
274 // And now we should start being in the sending-too-fast phase.
275 for (int ii = 0; ii < queue_size; ++ii) {
276 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700277 ASSERT_EQ(SendTestMessage(limited_message_sender),
278 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700279 }
280}
281
282// Test that if we setup an exclusive sender that it is indeed exclusive.
283TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
284 SimulatedEventLoopTestFactory factory;
285
286 SimulatedEventLoopFactory simulated_event_loop_factory(
287 factory.configuration());
288
289 ::std::unique_ptr<EventLoop> exclusive_event_loop =
290 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700291 ->MakeEventLoop(
292 "too_fast_sender",
293 {NodeEventLoopFactory::CheckSentTooFast::kYes,
294 NodeEventLoopFactory::ExclusiveSenders::kYes,
295 {{configuration::GetChannel(factory.configuration(), "/test1",
296 "aos.TestMessage", "", nullptr),
297 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700298 exclusive_event_loop->SkipAosLog();
299 exclusive_event_loop->SkipTimingReport();
300 ::std::unique_ptr<EventLoop> normal_event_loop =
301 simulated_event_loop_factory.MakeEventLoop("limited_sender");
302 // Set things up to have the exclusive sender be destroyed so we can test
303 // recovery.
304 {
305 aos::Sender<TestMessage> exclusive_sender =
306 exclusive_event_loop->MakeSender<TestMessage>("/test");
307
308 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
309 "TestMessage");
310 }
311 // This one should succeed now that the exclusive channel is removed.
312 aos::Sender<TestMessage> normal_sender =
313 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700314 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
315 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700316
317 // And check an explicitly exempted channel:
318 aos::Sender<TestMessage> non_exclusive_sender =
319 exclusive_event_loop->MakeSender<TestMessage>("/test1");
320 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
321 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700322}
323
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700324void TestSentTooFastCheckEdgeCase(
325 const std::function<RawSender::Error(int, int)> expected_err,
326 const bool send_twice_at_end) {
327 SimulatedEventLoopTestFactory factory;
328
329 auto event_loop = factory.MakePrimary("primary");
330
331 auto sender = event_loop->MakeSender<TestMessage>("/test");
332
333 const int queue_size = TestChannelQueueSize(event_loop.get());
334 int msgs_sent = 0;
335 event_loop->AddPhasedLoop(
336 [&](int) {
337 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
338 msgs_sent++;
339
340 // If send_twice_at_end, send the last two messages (message
341 // queue_size and queue_size + 1) in the same iteration, meaning that
342 // we would be sending very slightly too fast. Otherwise, we will send
343 // message queue_size + 1 in the next iteration and we will continue
344 // to be sending exactly at the channel frequency.
345 if (send_twice_at_end && (msgs_sent == queue_size)) {
346 EXPECT_EQ(SendTestMessage(sender),
347 expected_err(msgs_sent, queue_size));
348 msgs_sent++;
349 }
350
351 if (msgs_sent > queue_size) {
352 factory.Exit();
353 }
354 },
355 std::chrono::duration_cast<std::chrono::nanoseconds>(
356 std::chrono::duration<double>(
357 1.0 / TestChannelFrequency(event_loop.get()))));
358
359 factory.Run();
360}
361
362// Tests that RawSender::Error::kMessagesSentTooFast is not returned
363// when messages are sent at the exact frequency of the channel.
364TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
365 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
366 false);
367}
368
369// Tests that RawSender::Error::kMessagesSentTooFast is returned
370// when sending exactly one more message than allowed in a channel storage
371// duration.
372TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
373 TestSentTooFastCheckEdgeCase(
374 [](const int msgs_sent, const int queue_size) {
375 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
376 : RawSender::Error::kOk);
377 },
378 true);
379}
380
Austin Schuh8fb315a2020-11-19 22:33:58 -0800381// Test that creating an event loop while running dies.
382TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
383 SimulatedEventLoopTestFactory factory;
384
385 SimulatedEventLoopFactory simulated_event_loop_factory(
386 factory.configuration());
387
388 ::std::unique_ptr<EventLoop> event_loop =
389 simulated_event_loop_factory.MakeEventLoop("ping");
390
391 auto timer = event_loop->AddTimer([&]() {
392 EXPECT_DEATH(
393 {
394 ::std::unique_ptr<EventLoop> event_loop2 =
395 simulated_event_loop_factory.MakeEventLoop("ping");
396 },
397 "event loop while running");
398 simulated_event_loop_factory.Exit();
399 });
400
401 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700402 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800403 });
404
405 simulated_event_loop_factory.Run();
406}
407
408// Test that creating a watcher after running dies.
409TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
410 SimulatedEventLoopTestFactory factory;
411
412 SimulatedEventLoopFactory simulated_event_loop_factory(
413 factory.configuration());
414
415 ::std::unique_ptr<EventLoop> event_loop =
416 simulated_event_loop_factory.MakeEventLoop("ping");
417
418 simulated_event_loop_factory.RunFor(chrono::seconds(1));
419
420 EXPECT_DEATH(
421 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
422 "Can't add a watcher after running");
423
424 ::std::unique_ptr<EventLoop> event_loop2 =
425 simulated_event_loop_factory.MakeEventLoop("ping");
426
427 simulated_event_loop_factory.RunFor(chrono::seconds(1));
428
429 EXPECT_DEATH(
430 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
431 "Can't add a watcher after running");
432}
433
Austin Schuh44019f92019-05-19 19:58:27 -0700434// Test that running for a time period with no handlers causes time to progress
435// correctly.
436TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800437 SimulatedEventLoopTestFactory factory;
438
439 SimulatedEventLoopFactory simulated_event_loop_factory(
440 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700441 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800442 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700443
444 simulated_event_loop_factory.RunFor(chrono::seconds(1));
445
446 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700447 event_loop->monotonic_now());
448}
449
450// Test that running for a time with a periodic handler causes time to end
451// correctly.
452TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800453 SimulatedEventLoopTestFactory factory;
454
455 SimulatedEventLoopFactory simulated_event_loop_factory(
456 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700457 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800458 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700459
460 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700461 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700462 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700463 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
464 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700465 });
466
467 simulated_event_loop_factory.RunFor(chrono::seconds(1));
468
469 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700470 event_loop->monotonic_now());
471 EXPECT_EQ(counter, 10);
472}
473
Austin Schuh7d87b672019-12-01 20:23:49 -0800474// Tests that watchers have latency in simulation.
475TEST(SimulatedEventLoopTest, WatcherTimingReport) {
476 SimulatedEventLoopTestFactory factory;
477 factory.set_send_delay(std::chrono::microseconds(50));
478
479 FLAGS_timing_report_ms = 1000;
480 auto loop1 = factory.MakePrimary("primary");
481 loop1->MakeWatcher("/test", [](const TestMessage &) {});
482
483 auto loop2 = factory.Make("sender_loop");
484
485 auto loop3 = factory.Make("report_fetcher");
486
487 Fetcher<timing::Report> report_fetcher =
488 loop3->MakeFetcher<timing::Report>("/aos");
489 EXPECT_FALSE(report_fetcher.Fetch());
490
491 auto sender = loop2->MakeSender<TestMessage>("/test");
492
493 // Send 10 messages in the middle of a timing report period so we get
494 // something interesting back.
495 auto test_timer = loop2->AddTimer([&sender]() {
496 for (int i = 0; i < 10; ++i) {
497 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
498 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
499 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700500 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800501 }
502 });
503
504 // Quit after 1 timing report, mid way through the next cycle.
505 {
506 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700507 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800508 end_timer->set_name("end");
509 }
510
511 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700512 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800513 });
514
515 factory.Run();
516
517 // And, since we are here, check that the timing report makes sense.
518 // Start by looking for our event loop's timing.
519 FlatbufferDetachedBuffer<timing::Report> primary_report =
520 FlatbufferDetachedBuffer<timing::Report>::Empty();
521 while (report_fetcher.FetchNext()) {
522 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
523 if (report_fetcher->name()->string_view() == "primary") {
524 primary_report = CopyFlatBuffer(report_fetcher.get());
525 }
526 }
527
528 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700529 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800530
531 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
532
533 // Just the timing report timer.
534 ASSERT_NE(primary_report.message().timers(), nullptr);
535 EXPECT_EQ(primary_report.message().timers()->size(), 2);
536
537 // No phased loops
538 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
539
540 // And now confirm that the watcher received all 10 messages, and has latency.
541 ASSERT_NE(primary_report.message().watchers(), nullptr);
542 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
543 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
544 EXPECT_NEAR(
545 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
546 0.00005, 1e-9);
547 EXPECT_NEAR(
548 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
549 0.00005, 1e-9);
550 EXPECT_NEAR(
551 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
552 0.00005, 1e-9);
553 EXPECT_EQ(primary_report.message()
554 .watchers()
555 ->Get(0)
556 ->wakeup_latency()
557 ->standard_deviation(),
558 0.0);
559
560 EXPECT_EQ(
561 primary_report.message().watchers()->Get(0)->handler_time()->average(),
562 0.0);
563 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
564 0.0);
565 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
566 0.0);
567 EXPECT_EQ(primary_report.message()
568 .watchers()
569 ->Get(0)
570 ->handler_time()
571 ->standard_deviation(),
572 0.0);
573}
574
Austin Schuh89c9b812021-02-20 14:42:10 -0800575size_t CountAll(
576 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
577 &counters) {
578 size_t count = 0u;
579 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
580 counters) {
581 count += counter->count();
582 }
583 return count;
584}
585
Austin Schuh4c3b9702020-08-30 11:34:55 -0700586// Tests that ping and pong work when on 2 different nodes, and the message
587// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800588TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800589 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
590 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700591 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800592
593 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
594
595 std::unique_ptr<EventLoop> ping_event_loop =
596 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
597 Ping ping(ping_event_loop.get());
598
599 std::unique_ptr<EventLoop> pong_event_loop =
600 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
601 Pong pong(pong_event_loop.get());
602
603 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
604 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700605 MessageCounter<examples::Pong> pi2_pong_counter(
606 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700607 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
608 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
609 "/pi1/aos");
610 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
611 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800612
Austin Schuh4c3b9702020-08-30 11:34:55 -0700613 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
614 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800615
616 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
617 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700618 MessageCounter<examples::Pong> pi1_pong_counter(
619 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700620 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
621 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
622 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
623 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
624 "/aos");
625
Austin Schuh4c3b9702020-08-30 11:34:55 -0700626 // Count timestamps.
627 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
628 pi1_pong_counter_event_loop.get(), "/pi1/aos");
629 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
630 pi2_pong_counter_event_loop.get(), "/pi1/aos");
631 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
632 pi3_pong_counter_event_loop.get(), "/pi1/aos");
633 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
634 pi1_pong_counter_event_loop.get(), "/pi2/aos");
635 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
636 pi2_pong_counter_event_loop.get(), "/pi2/aos");
637 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
638 pi1_pong_counter_event_loop.get(), "/pi3/aos");
639 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
640 pi3_pong_counter_event_loop.get(), "/pi3/aos");
641
Austin Schuh2f8fd752020-09-01 22:38:28 -0700642 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800643 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
644 remote_timestamps_pi2_on_pi1 =
645 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
646 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
647 remote_timestamps_pi1_on_pi2 =
648 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700649
Austin Schuh4c3b9702020-08-30 11:34:55 -0700650 // Wait to let timestamp estimation start up before looking for the results.
651 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
652
Austin Schuh8fb315a2020-11-19 22:33:58 -0800653 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
654 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
655 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
656 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
657 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
658 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
659
Austin Schuh4c3b9702020-08-30 11:34:55 -0700660 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800661 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700662 "/pi1/aos", [&pi1_server_statistics_count](
663 const message_bridge::ServerStatistics &stats) {
664 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
665 EXPECT_EQ(stats.connections()->size(), 2u);
666 for (const message_bridge::ServerConnection *connection :
667 *stats.connections()) {
668 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800669 EXPECT_EQ(connection->connection_count(), 1u);
670 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800671 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700672 if (connection->node()->name()->string_view() == "pi2") {
673 EXPECT_GT(connection->sent_packets(), 50);
674 } else if (connection->node()->name()->string_view() == "pi3") {
675 EXPECT_GE(connection->sent_packets(), 5);
676 } else {
677 LOG(FATAL) << "Unknown connection";
678 }
679
680 EXPECT_TRUE(connection->has_monotonic_offset());
681 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700682
683 EXPECT_TRUE(connection->has_channels());
684 int accumulated_sent_count = 0;
685 int accumulated_dropped_count = 0;
686 for (const message_bridge::ServerChannelStatistics *channel :
687 *connection->channels()) {
688 accumulated_sent_count += channel->sent_packets();
689 accumulated_dropped_count += channel->dropped_packets();
690 }
691 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
692 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700693 }
694 ++pi1_server_statistics_count;
695 });
696
697 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800698 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700699 "/pi2/aos", [&pi2_server_statistics_count](
700 const message_bridge::ServerStatistics &stats) {
701 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
702 EXPECT_EQ(stats.connections()->size(), 1u);
703
704 const message_bridge::ServerConnection *connection =
705 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800706 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700707 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
708 EXPECT_GT(connection->sent_packets(), 50);
709 EXPECT_TRUE(connection->has_monotonic_offset());
710 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800711 EXPECT_EQ(connection->connection_count(), 1u);
712 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700713
714 EXPECT_TRUE(connection->has_channels());
715 int accumulated_sent_count = 0;
716 int accumulated_dropped_count = 0;
717 for (const message_bridge::ServerChannelStatistics *channel :
718 *connection->channels()) {
719 accumulated_sent_count += channel->sent_packets();
720 accumulated_dropped_count += channel->dropped_packets();
721 }
722 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
723 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
724
Austin Schuh4c3b9702020-08-30 11:34:55 -0700725 ++pi2_server_statistics_count;
726 });
727
728 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800729 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700730 "/pi3/aos", [&pi3_server_statistics_count](
731 const message_bridge::ServerStatistics &stats) {
732 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
733 EXPECT_EQ(stats.connections()->size(), 1u);
734
735 const message_bridge::ServerConnection *connection =
736 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800737 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700738 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
739 EXPECT_GE(connection->sent_packets(), 5);
740 EXPECT_TRUE(connection->has_monotonic_offset());
741 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800742 EXPECT_EQ(connection->connection_count(), 1u);
743 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700744
745 EXPECT_TRUE(connection->has_channels());
746 int accumulated_sent_count = 0;
747 int accumulated_dropped_count = 0;
748 for (const message_bridge::ServerChannelStatistics *channel :
749 *connection->channels()) {
750 accumulated_sent_count += channel->sent_packets();
751 accumulated_dropped_count += channel->dropped_packets();
752 }
753 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
754 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
755
Austin Schuh4c3b9702020-08-30 11:34:55 -0700756 ++pi3_server_statistics_count;
757 });
758
759 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800760 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761 "/pi1/aos", [&pi1_client_statistics_count](
762 const message_bridge::ClientStatistics &stats) {
763 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
764 EXPECT_EQ(stats.connections()->size(), 2u);
765
766 for (const message_bridge::ClientConnection *connection :
767 *stats.connections()) {
768 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
769 if (connection->node()->name()->string_view() == "pi2") {
770 EXPECT_GT(connection->received_packets(), 50);
771 } else if (connection->node()->name()->string_view() == "pi3") {
772 EXPECT_GE(connection->received_packets(), 5);
773 } else {
774 LOG(FATAL) << "Unknown connection";
775 }
776
Austin Schuhe61d4382021-03-31 21:33:02 -0700777 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700778 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700779 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800780 EXPECT_EQ(connection->connection_count(), 1u);
781 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700782 }
783 ++pi1_client_statistics_count;
784 });
785
786 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800787 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700788 "/pi2/aos", [&pi2_client_statistics_count](
789 const message_bridge::ClientStatistics &stats) {
790 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
791 EXPECT_EQ(stats.connections()->size(), 1u);
792
793 const message_bridge::ClientConnection *connection =
794 stats.connections()->Get(0);
795 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
796 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700797 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700798 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700799 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800800 EXPECT_EQ(connection->connection_count(), 1u);
801 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700802 ++pi2_client_statistics_count;
803 });
804
805 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800806 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700807 "/pi3/aos", [&pi3_client_statistics_count](
808 const message_bridge::ClientStatistics &stats) {
809 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
810 EXPECT_EQ(stats.connections()->size(), 1u);
811
812 const message_bridge::ClientConnection *connection =
813 stats.connections()->Get(0);
814 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
815 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700816 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700817 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700818 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800819 EXPECT_EQ(connection->connection_count(), 1u);
820 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700821 ++pi3_client_statistics_count;
822 });
823
Austin Schuh2f8fd752020-09-01 22:38:28 -0700824 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
825 // channel.
826 const size_t pi1_timestamp_channel =
827 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
828 pi1_on_pi2_timestamp_fetcher.channel());
829 const size_t ping_timestamp_channel =
830 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
831 ping_on_pi2_fetcher.channel());
832
833 for (const Channel *channel :
834 *pi1_pong_counter_event_loop->configuration()->channels()) {
835 VLOG(1) << "Channel "
836 << configuration::ChannelIndex(
837 pi1_pong_counter_event_loop->configuration(), channel)
838 << " " << configuration::CleanedChannelToString(channel);
839 }
840
Austin Schuh8fb315a2020-11-19 22:33:58 -0800841 std::unique_ptr<EventLoop> pi1_remote_timestamp =
842 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
843
Austin Schuh89c9b812021-02-20 14:42:10 -0800844 for (std::pair<int, std::string> channel :
845 shared()
846 ? std::vector<std::pair<
847 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
848 : std::vector<std::pair<int, std::string>>{
849 {pi1_timestamp_channel,
850 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
851 "aos-message_bridge-Timestamp"},
852 {ping_timestamp_channel,
853 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
854 // For each remote timestamp we get back, confirm that it is either a ping
855 // message, or a timestamp we sent out. Also confirm that the timestamps
856 // are correct.
857 pi1_remote_timestamp->MakeWatcher(
858 channel.second,
859 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
860 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
861 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700862 channel_index = channel.first,
863 channel_name = channel.second](const RemoteMessage &header) {
864 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
865 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800866 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700867 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800868 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700869 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700870
Austin Schuh89c9b812021-02-20 14:42:10 -0800871 const aos::monotonic_clock::time_point header_monotonic_sent_time(
872 chrono::nanoseconds(header.monotonic_sent_time()));
873 const aos::realtime_clock::time_point header_realtime_sent_time(
874 chrono::nanoseconds(header.realtime_sent_time()));
875 const aos::monotonic_clock::time_point header_monotonic_remote_time(
876 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700877 const aos::monotonic_clock::time_point
878 header_monotonic_remote_transmit_time(
879 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800880 const aos::realtime_clock::time_point header_realtime_remote_time(
881 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700882
Austin Schuh89c9b812021-02-20 14:42:10 -0800883 if (channel_index != -1) {
884 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700885 }
886
Austin Schuh89c9b812021-02-20 14:42:10 -0800887 const Context *pi1_context = nullptr;
888 const Context *pi2_context = nullptr;
889
890 if (header.channel_index() == pi1_timestamp_channel) {
891 // Find the forwarded message.
892 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
893 header_monotonic_sent_time) {
894 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
895 }
896
897 // And the source message.
898 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
899 header_monotonic_remote_time) {
900 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
901 }
902
903 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
904 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700905
906 EXPECT_EQ(header_monotonic_remote_transmit_time,
907 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800908 } else if (header.channel_index() == ping_timestamp_channel) {
909 // Find the forwarded message.
910 while (ping_on_pi2_fetcher.context().monotonic_event_time <
911 header_monotonic_sent_time) {
912 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
913 }
914
915 // And the source message.
916 while (ping_on_pi1_fetcher.context().monotonic_event_time <
917 header_monotonic_remote_time) {
918 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
919 }
920
921 pi1_context = &ping_on_pi1_fetcher.context();
922 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700923
924 EXPECT_EQ(header_monotonic_remote_transmit_time,
925 pi2_context->monotonic_event_time -
926 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800927 } else {
928 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700929 }
930
Austin Schuh89c9b812021-02-20 14:42:10 -0800931 // Confirm the forwarded message has matching timestamps to the
932 // timestamps we got back.
933 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
934 EXPECT_EQ(pi2_context->remote_queue_index,
935 header.remote_queue_index());
936 EXPECT_EQ(pi2_context->monotonic_event_time,
937 header_monotonic_sent_time);
938 EXPECT_EQ(pi2_context->realtime_event_time,
939 header_realtime_sent_time);
940 EXPECT_EQ(pi2_context->realtime_remote_time,
941 header_realtime_remote_time);
942 EXPECT_EQ(pi2_context->monotonic_remote_time,
943 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700944 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
945 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700946
Austin Schuh89c9b812021-02-20 14:42:10 -0800947 // Confirm the forwarded message also matches the source message.
948 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
949 EXPECT_EQ(pi1_context->monotonic_event_time,
950 header_monotonic_remote_time);
951 EXPECT_EQ(pi1_context->realtime_event_time,
952 header_realtime_remote_time);
953 });
954 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700955
Austin Schuh4c3b9702020-08-30 11:34:55 -0700956 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
957 chrono::milliseconds(500) +
958 chrono::milliseconds(5));
959
960 EXPECT_EQ(pi1_pong_counter.count(), 1001);
961 EXPECT_EQ(pi2_pong_counter.count(), 1001);
962
963 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
964 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
965 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
966 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
967 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
968 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
969 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
970
Austin Schuh20ac95d2020-12-05 17:24:19 -0800971 EXPECT_EQ(pi1_server_statistics_count, 10);
972 EXPECT_EQ(pi2_server_statistics_count, 10);
973 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700974
975 EXPECT_EQ(pi1_client_statistics_count, 95);
976 EXPECT_EQ(pi2_client_statistics_count, 95);
977 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700978
979 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800980 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
981 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700982}
983
984// Tests that an offset between nodes can be recovered and shows up in
985// ServerStatistics correctly.
986TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
987 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700988 aos::configuration::ReadConfig(ArtifactPath(
989 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700990 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800991 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
992 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700993 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800994 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
995 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700996 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800997 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
998 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700999
Austin Schuh87dd3832021-01-01 23:07:31 -08001000 message_bridge::TestingTimeConverter time(
1001 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -07001002 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001003 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001004
1005 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -08001006 time.AddNextTimestamp(
1007 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001008 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1009 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -07001010
1011 std::unique_ptr<EventLoop> ping_event_loop =
1012 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1013 Ping ping(ping_event_loop.get());
1014
1015 std::unique_ptr<EventLoop> pong_event_loop =
1016 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1017 Pong pong(pong_event_loop.get());
1018
Austin Schuh8fb315a2020-11-19 22:33:58 -08001019 // Wait to let timestamp estimation start up before looking for the results.
1020 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1021
Austin Schuh87dd3832021-01-01 23:07:31 -08001022 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1023 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1024
Austin Schuh4c3b9702020-08-30 11:34:55 -07001025 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1026 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1027
1028 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1029 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1030
Austin Schuh4c3b9702020-08-30 11:34:55 -07001031 // Confirm the offsets are being recovered correctly.
1032 int pi1_server_statistics_count = 0;
1033 pi1_pong_counter_event_loop->MakeWatcher(
1034 "/pi1/aos", [&pi1_server_statistics_count,
1035 kOffset](const message_bridge::ServerStatistics &stats) {
1036 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1037 EXPECT_EQ(stats.connections()->size(), 2u);
1038 for (const message_bridge::ServerConnection *connection :
1039 *stats.connections()) {
1040 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001041 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001042 if (connection->node()->name()->string_view() == "pi2") {
1043 EXPECT_EQ(connection->monotonic_offset(),
1044 chrono::nanoseconds(kOffset).count());
1045 } else if (connection->node()->name()->string_view() == "pi3") {
1046 EXPECT_EQ(connection->monotonic_offset(), 0);
1047 } else {
1048 LOG(FATAL) << "Unknown connection";
1049 }
1050
1051 EXPECT_TRUE(connection->has_monotonic_offset());
1052 }
1053 ++pi1_server_statistics_count;
1054 });
1055
1056 int pi2_server_statistics_count = 0;
1057 pi2_pong_counter_event_loop->MakeWatcher(
1058 "/pi2/aos", [&pi2_server_statistics_count,
1059 kOffset](const message_bridge::ServerStatistics &stats) {
1060 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1061 EXPECT_EQ(stats.connections()->size(), 1u);
1062
1063 const message_bridge::ServerConnection *connection =
1064 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001065 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001066 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1067 EXPECT_TRUE(connection->has_monotonic_offset());
1068 EXPECT_EQ(connection->monotonic_offset(),
1069 -chrono::nanoseconds(kOffset).count());
1070 ++pi2_server_statistics_count;
1071 });
1072
1073 int pi3_server_statistics_count = 0;
1074 pi3_pong_counter_event_loop->MakeWatcher(
1075 "/pi3/aos", [&pi3_server_statistics_count](
1076 const message_bridge::ServerStatistics &stats) {
1077 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1078 EXPECT_EQ(stats.connections()->size(), 1u);
1079
1080 const message_bridge::ServerConnection *connection =
1081 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001082 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001083 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1084 EXPECT_TRUE(connection->has_monotonic_offset());
1085 EXPECT_EQ(connection->monotonic_offset(), 0);
1086 ++pi3_server_statistics_count;
1087 });
1088
1089 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1090 chrono::milliseconds(500) +
1091 chrono::milliseconds(5));
1092
Austin Schuh20ac95d2020-12-05 17:24:19 -08001093 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001094 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001095 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001096}
1097
1098// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001099TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001100 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1101 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1102 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1103
1104 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1105 simulated_event_loop_factory.DisableStatistics();
1106
1107 std::unique_ptr<EventLoop> ping_event_loop =
1108 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1109 Ping ping(ping_event_loop.get());
1110
1111 std::unique_ptr<EventLoop> pong_event_loop =
1112 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1113 Pong pong(pong_event_loop.get());
1114
1115 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1116 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1117
1118 MessageCounter<examples::Pong> pi2_pong_counter(
1119 pi2_pong_counter_event_loop.get(), "/test");
1120
1121 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1122 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1123
1124 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1125 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1126
1127 MessageCounter<examples::Pong> pi1_pong_counter(
1128 pi1_pong_counter_event_loop.get(), "/test");
1129
1130 // Count timestamps.
1131 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1132 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1133 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1134 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1135 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1136 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1137 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1138 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1139 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1140 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1141 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1142 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1143 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1144 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1145
Austin Schuh2f8fd752020-09-01 22:38:28 -07001146 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001147 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1148 remote_timestamps_pi2_on_pi1 =
1149 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1150 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1151 remote_timestamps_pi1_on_pi2 =
1152 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001153
Austin Schuh4c3b9702020-08-30 11:34:55 -07001154 MessageCounter<message_bridge::ServerStatistics>
1155 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1156 "/pi1/aos");
1157 MessageCounter<message_bridge::ServerStatistics>
1158 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1159 "/pi2/aos");
1160 MessageCounter<message_bridge::ServerStatistics>
1161 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1162 "/pi3/aos");
1163
1164 MessageCounter<message_bridge::ClientStatistics>
1165 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1166 "/pi1/aos");
1167 MessageCounter<message_bridge::ClientStatistics>
1168 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1169 "/pi2/aos");
1170 MessageCounter<message_bridge::ClientStatistics>
1171 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1172 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001173
1174 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1175 chrono::milliseconds(5));
1176
Austin Schuh4c3b9702020-08-30 11:34:55 -07001177 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1178 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1179
1180 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1181 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1182 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1183 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1184 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1185 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1186 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1187
1188 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1189 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1190 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1191
1192 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1193 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1194 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001195
1196 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001197 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1198 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001199}
1200
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1202 for (const message_bridge::ServerConnection *connection :
1203 *server_statistics->connections()) {
1204 if (connection->state() != message_bridge::State::CONNECTED) {
1205 return false;
1206 }
1207 }
1208 return true;
1209}
1210
1211bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1212 std::string_view target) {
1213 for (const message_bridge::ServerConnection *connection :
1214 *server_statistics->connections()) {
1215 if (connection->node()->name()->string_view() == target) {
1216 if (connection->state() == message_bridge::State::CONNECTED) {
1217 return false;
1218 }
1219 } else {
1220 if (connection->state() != message_bridge::State::CONNECTED) {
1221 return false;
1222 }
1223 }
1224 }
1225 return true;
1226}
1227
1228bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1229 for (const message_bridge::ClientConnection *connection :
1230 *client_statistics->connections()) {
1231 if (connection->state() != message_bridge::State::CONNECTED) {
1232 return false;
1233 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001234 EXPECT_TRUE(connection->has_boot_uuid());
1235 EXPECT_TRUE(connection->has_connected_since_time());
1236 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001237 }
1238 return true;
1239}
1240
1241bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1242 std::string_view target) {
1243 for (const message_bridge::ClientConnection *connection :
1244 *client_statistics->connections()) {
1245 if (connection->node()->name()->string_view() == target) {
1246 if (connection->state() == message_bridge::State::CONNECTED) {
1247 return false;
1248 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001249 EXPECT_FALSE(connection->has_boot_uuid());
1250 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001251 } else {
1252 if (connection->state() != message_bridge::State::CONNECTED) {
1253 return false;
1254 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001255 EXPECT_TRUE(connection->has_boot_uuid());
1256 EXPECT_TRUE(connection->has_connected_since_time());
1257 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001258 }
1259 }
1260 return true;
1261}
1262
Austin Schuh367a7f42021-11-23 23:04:36 -08001263int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1264 std::string_view target) {
1265 for (const message_bridge::ClientConnection *connection :
1266 *client_statistics->connections()) {
1267 if (connection->node()->name()->string_view() == target) {
1268 return connection->connection_count();
1269 }
1270 }
1271 return 0;
1272}
1273
1274int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1275 std::string_view target) {
1276 for (const message_bridge::ServerConnection *connection :
1277 *server_statistics->connections()) {
1278 if (connection->node()->name()->string_view() == target) {
1279 return connection->connection_count();
1280 }
1281 }
1282 return 0;
1283}
1284
Austin Schuhc0b0f722020-12-12 18:36:06 -08001285// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001286TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001287 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1288
Austin Schuh58646e22021-08-23 23:51:46 -07001289 NodeEventLoopFactory *pi1 =
1290 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1291 NodeEventLoopFactory *pi2 =
1292 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1293 NodeEventLoopFactory *pi3 =
1294 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1295
1296 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001297 Ping ping(ping_event_loop.get());
1298
Austin Schuh58646e22021-08-23 23:51:46 -07001299 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001300 Pong pong(pong_event_loop.get());
1301
1302 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001303 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001304
1305 MessageCounter<examples::Pong> pi2_pong_counter(
1306 pi2_pong_counter_event_loop.get(), "/test");
1307
1308 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001309 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001310
1311 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001312 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001313
1314 MessageCounter<examples::Pong> pi1_pong_counter(
1315 pi1_pong_counter_event_loop.get(), "/test");
1316
1317 // Count timestamps.
1318 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1319 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1320 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1321 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1322 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1323 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1324 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1325 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1326 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1327 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1328 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1329 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1330 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1331 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1332
1333 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001334 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1335 remote_timestamps_pi2_on_pi1 =
1336 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1337 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1338 remote_timestamps_pi1_on_pi2 =
1339 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001340
1341 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001342 *pi1_server_statistics_counter;
1343 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1344 pi1_server_statistics_counter =
1345 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1346 "pi1_server_statistics_counter", "/pi1/aos");
1347 });
1348
Austin Schuhc0b0f722020-12-12 18:36:06 -08001349 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1350 pi1_pong_counter_event_loop
1351 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1352 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1353 pi1_pong_counter_event_loop
1354 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1355
1356 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001357 *pi2_server_statistics_counter;
1358 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1359 pi2_server_statistics_counter =
1360 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1361 "pi2_server_statistics_counter", "/pi2/aos");
1362 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001363 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1364 pi2_pong_counter_event_loop
1365 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1366 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1367 pi2_pong_counter_event_loop
1368 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1369
1370 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001371 *pi3_server_statistics_counter;
1372 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1373 pi3_server_statistics_counter =
1374 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1375 "pi3_server_statistics_counter", "/pi3/aos");
1376 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001377 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1378 pi3_pong_counter_event_loop
1379 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1380 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1381 pi3_pong_counter_event_loop
1382 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1383
1384 MessageCounter<message_bridge::ClientStatistics>
1385 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1386 "/pi1/aos");
1387 MessageCounter<message_bridge::ClientStatistics>
1388 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1389 "/pi2/aos");
1390 MessageCounter<message_bridge::ClientStatistics>
1391 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1392 "/pi3/aos");
1393
James Kuszmaul86e86c32022-07-21 17:39:47 -07001394 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1395 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1396 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1397 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1398 // The currenct contract is that, if all nodes boot simultaneously in
1399 // simulation, that they should all act as if they area already connected,
1400 // without ever observing the transition from disconnected to connected (note
1401 // that on a real system the ServerStatistics message will get resent for each
1402 // and every new connection, even if the new connections happen
1403 // "simultaneously"--in simulation, we are essentially acting as if we are
1404 // starting execution in an already running system, rather than observing the
1405 // boot process).
1406 for (auto &event_loop : statistics_watcher_loops) {
1407 event_loop->MakeWatcher(
1408 "/aos", [](const message_bridge::ServerStatistics &msg) {
1409 for (const message_bridge::ServerConnection *connection :
1410 *msg.connections()) {
1411 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1412 << connection->node()->name()->string_view();
1413 }
1414 });
1415 }
1416
Austin Schuhc0b0f722020-12-12 18:36:06 -08001417 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1418 chrono::milliseconds(5));
1419
James Kuszmaul86e86c32022-07-21 17:39:47 -07001420 statistics_watcher_loops.clear();
1421
Austin Schuhc0b0f722020-12-12 18:36:06 -08001422 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1423 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1424
1425 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1426 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1427 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1428 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1429 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1430 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1431 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1432
Austin Schuh58646e22021-08-23 23:51:46 -07001433 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1434 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1435 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001436
1437 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1438 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1439 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1440
1441 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001442 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1443 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001444
1445 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1446 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1447 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1448 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1449 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1450 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1451 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1452 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1453 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1454 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1456 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1457 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1459 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1460 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1461 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1462 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1463
Austin Schuh58646e22021-08-23 23:51:46 -07001464 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001465
1466 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1467
1468 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1469 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1470
1471 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1472 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1473 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1474 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1475 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1476 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1477 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1478
Austin Schuh58646e22021-08-23 23:51:46 -07001479 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1480 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1481 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001482
1483 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1484 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1485 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1486
1487 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001488 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1489 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001490
1491 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1492 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1493 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1494 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1495 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1496 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1497 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1498 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1499 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1500 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1501 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1502 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1503 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1504 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1506 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1507 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1508 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1509
Austin Schuh58646e22021-08-23 23:51:46 -07001510 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001511
1512 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1513
Austin Schuh367a7f42021-11-23 23:04:36 -08001514 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1515 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1516 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1517 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1518 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1519 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1520
1521 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1522 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1523 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1524 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1525 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1526 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1527 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1528 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1529
1530 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1531 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1532 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1533 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1534
1535 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1536 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1537 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1538 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1539
Austin Schuhc0b0f722020-12-12 18:36:06 -08001540 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1541 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1542
1543 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1544 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1545 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1546 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1547 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1548 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1549 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1550
Austin Schuh58646e22021-08-23 23:51:46 -07001551 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1552 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1553 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001554
1555 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1556 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1557 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1558
1559 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001560 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1561 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001562
Austin Schuhc0b0f722020-12-12 18:36:06 -08001563 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1564 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001565 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1566 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001567 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1568 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001569 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1570 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001571 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1572 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001573 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1574 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1575}
1576
Austin Schuh2febf0d2020-09-21 22:24:30 -07001577// Tests that the time offset having a slope doesn't break the world.
1578// SimulatedMessageBridge has enough self consistency CHECK statements to
1579// confirm, and we can can also check a message in each direction to make sure
1580// it gets delivered as expected.
1581TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1582 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001583 aos::configuration::ReadConfig(ArtifactPath(
1584 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001585 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001586 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1587 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001588 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001589 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1590 ASSERT_EQ(pi2_index, 1u);
1591 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1592 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1593 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001594
Austin Schuh87dd3832021-01-01 23:07:31 -08001595 message_bridge::TestingTimeConverter time(
1596 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001597 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001598 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001599
Austin Schuh2febf0d2020-09-21 22:24:30 -07001600 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001601 time.AddNextTimestamp(
1602 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001603 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1604 BootTimestamp::epoch()});
1605 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1606 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1607 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1608 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001609
1610 std::unique_ptr<EventLoop> ping_event_loop =
1611 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1612 Ping ping(ping_event_loop.get());
1613
1614 std::unique_ptr<EventLoop> pong_event_loop =
1615 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1616 Pong pong(pong_event_loop.get());
1617
1618 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1619 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1620 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1621 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1622
1623 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1624 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1625 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1626 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1627
1628 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1629 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1630 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1631 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1632
1633 // End after a pong message comes back. This will leave the latest messages
1634 // on all channels so we can look at timestamps easily and check they make
1635 // sense.
1636 std::unique_ptr<EventLoop> pi1_pong_ender =
1637 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1638 int count = 0;
1639 pi1_pong_ender->MakeWatcher(
1640 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1641 if (++count == 100) {
1642 simulated_event_loop_factory.Exit();
1643 }
1644 });
1645
1646 // Run enough that messages should be delivered.
1647 simulated_event_loop_factory.Run();
1648
1649 // Grab the latest messages.
1650 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1651 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1652 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1653 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1654
1655 // Compute their time on the global distributed clock so we can compute
1656 // distance betwen them.
1657 const distributed_clock::time_point pi1_ping_time =
1658 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1659 ->ToDistributedClock(
1660 ping_on_pi1_fetcher.context().monotonic_event_time);
1661 const distributed_clock::time_point pi2_ping_time =
1662 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1663 ->ToDistributedClock(
1664 ping_on_pi2_fetcher.context().monotonic_event_time);
1665 const distributed_clock::time_point pi1_pong_time =
1666 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1667 ->ToDistributedClock(
1668 pong_on_pi1_fetcher.context().monotonic_event_time);
1669 const distributed_clock::time_point pi2_pong_time =
1670 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1671 ->ToDistributedClock(
1672 pong_on_pi2_fetcher.context().monotonic_event_time);
1673
1674 // And confirm the delivery delay is just about exactly 150 uS for both
1675 // directions like expected. There will be a couple ns of rounding errors in
1676 // the conversion functions that aren't worth accounting for right now. This
1677 // will either be really close, or really far.
1678 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1679 pi1_ping_time);
1680 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1681 pi1_ping_time);
1682
1683 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1684 pi2_pong_time);
1685 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1686 pi2_pong_time);
1687}
1688
Austin Schuh4c570ea2020-11-19 23:13:24 -08001689void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1690 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1691 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1692 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001693 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001694}
1695
1696// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001697TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001698 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1699 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1700
1701 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1702
1703 std::unique_ptr<EventLoop> ping_event_loop =
1704 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1705 aos::Sender<examples::Ping> pi1_reliable_sender =
1706 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1707 aos::Sender<examples::Ping> pi1_unreliable_sender =
1708 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1709 SendPing(&pi1_reliable_sender, 1);
1710 SendPing(&pi1_unreliable_sender, 1);
1711
1712 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1713 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001714 aos::Sender<examples::Ping> pi2_reliable_sender =
1715 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1716 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001717 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1718 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001719 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1720 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001721 MessageCounter<examples::Ping> pi2_unreliable_counter(
1722 pi2_pong_event_loop.get(), "/unreliable");
1723 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1724 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1725 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1726 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1727
1728 const size_t reliable_channel_index = configuration::ChannelIndex(
1729 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1730
1731 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1732 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1733
Austin Schuheeaa2022021-01-02 21:52:03 -08001734 const chrono::nanoseconds network_delay =
1735 simulated_event_loop_factory.network_delay();
1736
Austin Schuh4c570ea2020-11-19 23:13:24 -08001737 int reliable_timestamp_count = 0;
1738 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001739 shared() ? "/pi1/aos/remote_timestamps/pi2"
1740 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001742 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1743 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001744 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001745 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001746 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001747 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001748 VLOG(1) << aos::FlatbufferToJson(&header);
1749 if (header.channel_index() == reliable_channel_index) {
1750 ++reliable_timestamp_count;
1751 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001752
1753 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1754 chrono::nanoseconds(header.monotonic_sent_time()));
1755
1756 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1757 header_monotonic_sent_time + network_delay +
1758 (pi1_remote_timestamp->monotonic_now() -
1759 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001760 });
1761
1762 // Wait to let timestamp estimation start up before looking for the results.
1763 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1764
1765 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1766 // This one isn't reliable, but was sent before the start. It should *not* be
1767 // delivered.
1768 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1769 // Confirm we got a timestamp logged for the message that was forwarded.
1770 EXPECT_EQ(reliable_timestamp_count, 1u);
1771
1772 SendPing(&pi1_reliable_sender, 2);
1773 SendPing(&pi1_unreliable_sender, 2);
1774 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1775 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001776 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001777 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1778
1779 EXPECT_EQ(reliable_timestamp_count, 2u);
1780}
1781
Austin Schuh20ac95d2020-12-05 17:24:19 -08001782// Tests that rebooting a node changes the ServerStatistics message and the
1783// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001784TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001785 const UUID pi1_boot0 = UUID::Random();
1786 const UUID pi2_boot0 = UUID::Random();
1787 const UUID pi2_boot1 = UUID::Random();
1788 const UUID pi3_boot0 = UUID::Random();
1789 UUID expected_boot_uuid = pi2_boot0;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001790 int boot_number = 0;
1791 monotonic_clock::time_point expected_connection_time;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001792
Austin Schuh58646e22021-08-23 23:51:46 -07001793 message_bridge::TestingTimeConverter time(
1794 configuration::NodesCount(&config.message()));
1795 SimulatedEventLoopFactory factory(&config.message());
1796 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001797
Austin Schuh58646e22021-08-23 23:51:46 -07001798 const size_t pi1_index =
1799 configuration::GetNodeIndex(&config.message(), "pi1");
1800 const size_t pi2_index =
1801 configuration::GetNodeIndex(&config.message(), "pi2");
1802 const size_t pi3_index =
1803 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001804
Austin Schuh58646e22021-08-23 23:51:46 -07001805 {
1806 time.AddNextTimestamp(distributed_clock::epoch(),
1807 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1808 BootTimestamp::epoch()});
1809
1810 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1811
1812 time.AddNextTimestamp(
1813 distributed_clock::epoch() + dt,
1814 {BootTimestamp::epoch() + dt,
1815 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1816 BootTimestamp::epoch() + dt});
1817
1818 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1819 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1820 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1821 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1822 }
1823
1824 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1825 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1826
1827 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1828 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001829
1830 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001831 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001832
1833 int timestamp_count = 0;
1834 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001835 "/pi2/aos", [&expected_boot_uuid,
1836 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001837 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001838 expected_boot_uuid);
1839 });
1840 pi1_remote_timestamp->MakeWatcher(
1841 "/test",
1842 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001843 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001844 expected_boot_uuid);
1845 });
1846 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001847 shared() ? "/pi1/aos/remote_timestamps/pi2"
1848 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001849 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1850 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001851 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001852 VLOG(1) << aos::FlatbufferToJson(&header);
1853 ++timestamp_count;
1854 });
1855
1856 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001857 bool first_pi1_server_statistics = true;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001858 expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001859 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001860 "/pi1/aos",
1861 [&pi1_server_statistics_count, &expected_boot_uuid,
1862 &expected_connection_time, &first_pi1_server_statistics,
1863 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001864 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1865 for (const message_bridge::ServerConnection *connection :
1866 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001867 if (connection->state() == message_bridge::State::CONNECTED) {
1868 ASSERT_TRUE(connection->has_boot_uuid());
1869 }
1870 if (!first_pi1_server_statistics) {
1871 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1872 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001873 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001874 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1875 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001876 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001877 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001878 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001879 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1880 connection->connected_since_time())),
1881 expected_connection_time);
1882 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001883 ++pi1_server_statistics_count;
1884 }
1885 }
Austin Schuh58646e22021-08-23 23:51:46 -07001886 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001887 });
1888
Austin Schuh58646e22021-08-23 23:51:46 -07001889 int pi1_client_statistics_count = 0;
1890 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001891 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1892 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001893 const message_bridge::ClientStatistics &stats) {
1894 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1895 for (const message_bridge::ClientConnection *connection :
1896 *stats.connections()) {
1897 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1898 if (connection->node()->name()->string_view() == "pi2") {
1899 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001900 EXPECT_EQ(expected_boot_uuid,
1901 UUID::FromString(connection->boot_uuid()))
1902 << " : Got " << aos::FlatbufferToJson(&stats);
1903 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1904 connection->connected_since_time())),
1905 expected_connection_time);
1906 EXPECT_EQ(boot_number + 1, connection->connection_count());
1907 } else {
1908 EXPECT_EQ(connection->connected_since_time(), 0);
1909 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001910 }
1911 }
1912 });
1913
1914 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001915 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1916 pi1, pi2, pi2_boot1]() {
1917 expected_boot_uuid = pi2_boot1;
1918 ++boot_number;
1919 LOG(INFO) << "OnShutdown triggered for pi2";
1920 pi2->OnStartup(
1921 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1922 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1923 expected_connection_time = pi1->monotonic_now();
1924 });
1925 });
Austin Schuh58646e22021-08-23 23:51:46 -07001926
Austin Schuh20ac95d2020-12-05 17:24:19 -08001927 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001928 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001929
1930 EXPECT_GT(timestamp_count, 100);
1931 EXPECT_GE(pi1_server_statistics_count, 1u);
1932
Austin Schuh20ac95d2020-12-05 17:24:19 -08001933 timestamp_count = 0;
1934 pi1_server_statistics_count = 0;
1935
Austin Schuh58646e22021-08-23 23:51:46 -07001936 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001937 EXPECT_GT(timestamp_count, 100);
1938 EXPECT_GE(pi1_server_statistics_count, 1u);
1939}
1940
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001941INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001942 All, RemoteMessageSimulatedEventLoopTest,
1943 ::testing::Values(
1944 Param{"multinode_pingpong_test_combined_config.json", true},
1945 Param{"multinode_pingpong_test_split_config.json", false}));
1946
Austin Schuh58646e22021-08-23 23:51:46 -07001947// Tests that Startup and Shutdown do reasonable things.
1948TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1949 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1950 aos::configuration::ReadConfig(
1951 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1952
Austin Schuh72e65682021-09-02 11:37:05 -07001953 size_t pi1_shutdown_counter = 0;
1954 size_t pi2_shutdown_counter = 0;
1955 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1956 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1957
Austin Schuh58646e22021-08-23 23:51:46 -07001958 message_bridge::TestingTimeConverter time(
1959 configuration::NodesCount(&config.message()));
1960 SimulatedEventLoopFactory factory(&config.message());
1961 factory.SetTimeConverter(&time);
1962 time.AddNextTimestamp(
1963 distributed_clock::epoch(),
1964 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1965
1966 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1967
1968 time.AddNextTimestamp(
1969 distributed_clock::epoch() + dt,
1970 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1971 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1972 BootTimestamp::epoch() + dt});
1973
1974 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1975 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1976
1977 // Configure startup to start Ping and Pong, and count.
1978 size_t pi1_startup_counter = 0;
1979 size_t pi2_startup_counter = 0;
1980 pi1->OnStartup([pi1]() {
1981 LOG(INFO) << "Made ping";
1982 pi1->AlwaysStart<Ping>("ping");
1983 });
1984 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1985 pi2->OnStartup([pi2]() {
1986 LOG(INFO) << "Made pong";
1987 pi2->AlwaysStart<Pong>("pong");
1988 });
1989 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1990
1991 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001992 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1993 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1994
Austin Schuh58646e22021-08-23 23:51:46 -07001995 // Automatically make counters on startup.
1996 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1997 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1998 "pi1_pong_counter", "/test");
1999 });
2000 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
2001 pi2->OnStartup([&pi2_ping_counter, pi2]() {
2002 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
2003 "pi2_ping_counter", "/test");
2004 });
2005 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
2006
2007 EXPECT_EQ(pi2_ping_counter, nullptr);
2008 EXPECT_EQ(pi1_pong_counter, nullptr);
2009
2010 EXPECT_EQ(pi1_startup_counter, 0u);
2011 EXPECT_EQ(pi2_startup_counter, 0u);
2012 EXPECT_EQ(pi1_shutdown_counter, 0u);
2013 EXPECT_EQ(pi2_shutdown_counter, 0u);
2014
2015 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
2016 EXPECT_EQ(pi1_startup_counter, 1u);
2017 EXPECT_EQ(pi2_startup_counter, 1u);
2018 EXPECT_EQ(pi1_shutdown_counter, 0u);
2019 EXPECT_EQ(pi2_shutdown_counter, 0u);
2020 EXPECT_EQ(pi2_ping_counter->count(), 1001);
2021 EXPECT_EQ(pi1_pong_counter->count(), 1001);
2022
2023 LOG(INFO) << pi1->monotonic_now();
2024 LOG(INFO) << pi2->monotonic_now();
2025
2026 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
2027
2028 EXPECT_EQ(pi1_startup_counter, 2u);
2029 EXPECT_EQ(pi2_startup_counter, 2u);
2030 EXPECT_EQ(pi1_shutdown_counter, 1u);
2031 EXPECT_EQ(pi2_shutdown_counter, 1u);
2032 EXPECT_EQ(pi2_ping_counter->count(), 501);
2033 EXPECT_EQ(pi1_pong_counter->count(), 501);
2034}
2035
2036// Tests that OnStartup handlers can be added after running and get called, and
2037// can't be called when running.
2038TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
2039 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2040 aos::configuration::ReadConfig(
2041 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2042
2043 // Test that we can add startup handlers as long as we aren't running, and
2044 // they get run when Run gets called again.
2045 // Test that adding a startup handler when running fails.
2046 //
2047 // Test shutdown handlers get called on destruction.
2048 SimulatedEventLoopFactory factory(&config.message());
2049
2050 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2051
2052 int startup_count0 = 0;
2053 int startup_count1 = 0;
2054
2055 pi1->OnStartup([&]() { ++startup_count0; });
2056 EXPECT_EQ(startup_count0, 0);
2057 EXPECT_EQ(startup_count1, 0);
2058
2059 factory.RunFor(chrono::nanoseconds(1));
2060 EXPECT_EQ(startup_count0, 1);
2061 EXPECT_EQ(startup_count1, 0);
2062
2063 pi1->OnStartup([&]() { ++startup_count1; });
2064 EXPECT_EQ(startup_count0, 1);
2065 EXPECT_EQ(startup_count1, 0);
2066
2067 factory.RunFor(chrono::nanoseconds(1));
2068 EXPECT_EQ(startup_count0, 1);
2069 EXPECT_EQ(startup_count1, 1);
2070
2071 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2072 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2073
2074 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2075 "Can only register OnStartup handlers when not running.");
2076}
2077
2078// Tests that OnStartup handlers can be added after running and get called, and
2079// all the handlers get called on reboot. Shutdown handlers are tested the same
2080// way.
2081TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2082 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2083 aos::configuration::ReadConfig(
2084 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2085
Austin Schuh72e65682021-09-02 11:37:05 -07002086 int startup_count0 = 0;
2087 int shutdown_count0 = 0;
2088 int startup_count1 = 0;
2089 int shutdown_count1 = 0;
2090
Austin Schuh58646e22021-08-23 23:51:46 -07002091 message_bridge::TestingTimeConverter time(
2092 configuration::NodesCount(&config.message()));
2093 SimulatedEventLoopFactory factory(&config.message());
2094 factory.SetTimeConverter(&time);
2095 time.StartEqual();
2096
2097 const chrono::nanoseconds dt = chrono::seconds(10);
2098 time.RebootAt(0, distributed_clock::epoch() + dt);
2099 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2100
2101 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2102
Austin Schuh58646e22021-08-23 23:51:46 -07002103 pi1->OnStartup([&]() { ++startup_count0; });
2104 pi1->OnShutdown([&]() { ++shutdown_count0; });
2105 EXPECT_EQ(startup_count0, 0);
2106 EXPECT_EQ(startup_count1, 0);
2107 EXPECT_EQ(shutdown_count0, 0);
2108 EXPECT_EQ(shutdown_count1, 0);
2109
2110 factory.RunFor(chrono::nanoseconds(1));
2111 EXPECT_EQ(startup_count0, 1);
2112 EXPECT_EQ(startup_count1, 0);
2113 EXPECT_EQ(shutdown_count0, 0);
2114 EXPECT_EQ(shutdown_count1, 0);
2115
2116 pi1->OnStartup([&]() { ++startup_count1; });
2117 EXPECT_EQ(startup_count0, 1);
2118 EXPECT_EQ(startup_count1, 0);
2119 EXPECT_EQ(shutdown_count0, 0);
2120 EXPECT_EQ(shutdown_count1, 0);
2121
2122 factory.RunFor(chrono::nanoseconds(1));
2123 EXPECT_EQ(startup_count0, 1);
2124 EXPECT_EQ(startup_count1, 1);
2125 EXPECT_EQ(shutdown_count0, 0);
2126 EXPECT_EQ(shutdown_count1, 0);
2127
2128 factory.RunFor(chrono::seconds(15));
2129
2130 EXPECT_EQ(startup_count0, 2);
2131 EXPECT_EQ(startup_count1, 2);
2132 EXPECT_EQ(shutdown_count0, 1);
2133 EXPECT_EQ(shutdown_count1, 0);
2134
2135 pi1->OnShutdown([&]() { ++shutdown_count1; });
2136 factory.RunFor(chrono::seconds(10));
2137
2138 EXPECT_EQ(startup_count0, 3);
2139 EXPECT_EQ(startup_count1, 3);
2140 EXPECT_EQ(shutdown_count0, 2);
2141 EXPECT_EQ(shutdown_count1, 1);
2142}
2143
2144// Tests that event loops which outlive shutdown crash.
2145TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2146 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2147 aos::configuration::ReadConfig(
2148 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2149
2150 message_bridge::TestingTimeConverter time(
2151 configuration::NodesCount(&config.message()));
2152 SimulatedEventLoopFactory factory(&config.message());
2153 factory.SetTimeConverter(&time);
2154 time.StartEqual();
2155
2156 const chrono::nanoseconds dt = chrono::seconds(10);
2157 time.RebootAt(0, distributed_clock::epoch() + dt);
2158
2159 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2160
2161 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2162
2163 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2164}
2165
Brian Silvermane1fe2512022-08-14 23:18:50 -07002166// Test that an ExitHandle outliving its factory is caught.
2167TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2168 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2169 aos::configuration::ReadConfig(
2170 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2171 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2172 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2173 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2174 auto exit_handle = factory->MakeExitHandle();
2175 EXPECT_DEATH(factory.reset(),
2176 "All ExitHandles must be destroyed before the factory");
2177}
2178
Austin Schuh3e31f912023-08-21 21:29:10 -07002179// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2180TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2181 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2182 aos::configuration::ReadConfig(
2183 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2184 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2185 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2186 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2187 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2188 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2189}
2190
Austin Schuh58646e22021-08-23 23:51:46 -07002191// Tests that messages don't survive a reboot of a node.
2192TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2193 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2194 aos::configuration::ReadConfig(
2195 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2196
2197 message_bridge::TestingTimeConverter time(
2198 configuration::NodesCount(&config.message()));
2199 SimulatedEventLoopFactory factory(&config.message());
2200 factory.SetTimeConverter(&time);
2201 time.StartEqual();
2202
2203 const chrono::nanoseconds dt = chrono::seconds(10);
2204 time.RebootAt(0, distributed_clock::epoch() + dt);
2205
2206 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2207
2208 const UUID boot_uuid = pi1->boot_uuid();
2209 EXPECT_NE(boot_uuid, UUID::Zero());
2210
2211 {
2212 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2213 aos::Sender<examples::Ping> test_message_sender =
2214 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2215 SendPing(&test_message_sender, 1);
2216 }
2217
2218 factory.RunFor(chrono::seconds(5));
2219
2220 {
2221 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2222 aos::Fetcher<examples::Ping> fetcher =
2223 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2224 EXPECT_TRUE(fetcher.Fetch());
2225 }
2226
2227 factory.RunFor(chrono::seconds(10));
2228
2229 {
2230 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2231 aos::Fetcher<examples::Ping> fetcher =
2232 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2233 EXPECT_FALSE(fetcher.Fetch());
2234 }
2235 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2236}
2237
2238// Tests that reliable messages get resent on reboot.
2239TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2240 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2241 aos::configuration::ReadConfig(
2242 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2243
2244 message_bridge::TestingTimeConverter time(
2245 configuration::NodesCount(&config.message()));
2246 SimulatedEventLoopFactory factory(&config.message());
2247 factory.SetTimeConverter(&time);
2248 time.StartEqual();
2249
2250 const chrono::nanoseconds dt = chrono::seconds(1);
2251 time.RebootAt(1, distributed_clock::epoch() + dt);
2252
2253 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2254 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2255
2256 const UUID pi1_boot_uuid = pi1->boot_uuid();
2257 const UUID pi2_boot_uuid = pi2->boot_uuid();
2258 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2259 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2260
2261 {
2262 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2263 aos::Sender<examples::Ping> test_message_sender =
2264 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2265 SendPing(&test_message_sender, 1);
2266 }
2267
2268 factory.RunFor(chrono::milliseconds(500));
2269
2270 {
2271 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2272 aos::Fetcher<examples::Ping> fetcher =
2273 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002274 ASSERT_TRUE(fetcher.Fetch());
2275 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2276 monotonic_clock::epoch());
2277 // Message bridge picks up the Ping message immediately on reboot.
2278 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2279 monotonic_clock::epoch());
2280 EXPECT_EQ(fetcher.context().monotonic_event_time,
2281 monotonic_clock::epoch() + factory.network_delay());
2282 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002283 }
2284
2285 factory.RunFor(chrono::seconds(1));
2286
2287 {
2288 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2289 aos::Fetcher<examples::Ping> fetcher =
2290 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002291 ASSERT_TRUE(fetcher.Fetch());
2292 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2293 monotonic_clock::epoch());
2294 // Message bridge picks up the Ping message immediately on reboot.
2295 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2296 monotonic_clock::epoch() + chrono::seconds(1));
2297 EXPECT_EQ(fetcher.context().monotonic_event_time,
2298 monotonic_clock::epoch() + factory.network_delay());
2299 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002300 }
2301 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2302}
2303
James Kuszmaul86e86c32022-07-21 17:39:47 -07002304TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2305 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2306 aos::configuration::ReadConfig(
2307 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2308
2309 message_bridge::TestingTimeConverter time(
2310 configuration::NodesCount(&config.message()));
2311 time.AddNextTimestamp(
2312 distributed_clock::epoch(),
2313 {BootTimestamp{0, monotonic_clock::epoch()},
2314 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2315 BootTimestamp{0, monotonic_clock::epoch()}});
2316 SimulatedEventLoopFactory factory(&config.message());
2317 factory.SetTimeConverter(&time);
2318
2319 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2320 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2321
2322 const UUID pi1_boot_uuid = pi1->boot_uuid();
2323 const UUID pi2_boot_uuid = pi2->boot_uuid();
2324 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2325 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2326
2327 {
2328 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2329 aos::Sender<examples::Ping> pi1_sender =
2330 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2331 SendPing(&pi1_sender, 1);
2332 }
2333 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2334 aos::Sender<examples::Ping> pi2_sender =
2335 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2336 SendPing(&pi2_sender, 1);
2337 // Verify that we staggered the OnRun callback correctly.
2338 pi2_event_loop->OnRun([pi1, pi2]() {
2339 EXPECT_EQ(pi1->monotonic_now(),
2340 monotonic_clock::epoch() + std::chrono::seconds(1));
2341 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2342 });
2343
2344 factory.RunFor(chrono::seconds(2));
2345
2346 {
2347 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2348 aos::Fetcher<examples::Ping> fetcher =
2349 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2350 ASSERT_TRUE(fetcher.Fetch());
2351 EXPECT_EQ(fetcher.context().monotonic_event_time,
2352 monotonic_clock::epoch() + factory.network_delay());
2353 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2354 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002355 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2356 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002357 }
2358 {
2359 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2360 aos::Fetcher<examples::Ping> fetcher =
2361 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2362 ASSERT_TRUE(fetcher.Fetch());
2363 EXPECT_EQ(fetcher.context().monotonic_event_time,
2364 monotonic_clock::epoch() + std::chrono::seconds(1) +
2365 factory.network_delay());
2366 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2367 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002368 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2369 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002370 }
2371}
2372
Austin Schuh48205e62021-11-12 14:13:18 -08002373class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2374 public:
2375 SimulatedEventLoopDisconnectTest()
2376 : config(aos::configuration::ReadConfig(ArtifactPath(
2377 "aos/events/multinode_pingpong_test_split_config.json"))),
2378 time(configuration::NodesCount(&config.message())),
2379 factory(&config.message()) {
2380 factory.SetTimeConverter(&time);
2381 }
2382
2383 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2384 const monotonic_clock::time_point allowable_message_time,
2385 std::set<const aos::Node *> empty_nodes) {
2386 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2387 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2388 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2389 pi1->MakeEventLoop("fetcher");
2390 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2391 pi2->MakeEventLoop("fetcher");
2392 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2393 if (configuration::ChannelIsReadableOnNode(channel,
2394 pi1_event_loop->node())) {
2395 std::unique_ptr<aos::RawFetcher> fetcher =
2396 pi1_event_loop->MakeRawFetcher(channel);
2397 if (statistics_channels.find(channel) == statistics_channels.end() ||
2398 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2399 EXPECT_FALSE(fetcher->Fetch() &&
2400 fetcher->context().monotonic_event_time >
2401 allowable_message_time)
2402 << ": Found recent message on channel "
2403 << configuration::CleanedChannelToString(channel) << " and time "
2404 << fetcher->context().monotonic_event_time << " > "
2405 << allowable_message_time << " on pi1";
2406 } else {
2407 EXPECT_TRUE(fetcher->Fetch() &&
2408 fetcher->context().monotonic_event_time >=
2409 allowable_message_time)
2410 << ": Didn't find recent message on channel "
2411 << configuration::CleanedChannelToString(channel) << " on pi1";
2412 }
2413 }
2414 if (configuration::ChannelIsReadableOnNode(channel,
2415 pi2_event_loop->node())) {
2416 std::unique_ptr<aos::RawFetcher> fetcher =
2417 pi2_event_loop->MakeRawFetcher(channel);
2418 if (statistics_channels.find(channel) == statistics_channels.end() ||
2419 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2420 EXPECT_FALSE(fetcher->Fetch() &&
2421 fetcher->context().monotonic_event_time >
2422 allowable_message_time)
2423 << ": Found message on channel "
2424 << configuration::CleanedChannelToString(channel) << " and time "
2425 << fetcher->context().monotonic_event_time << " > "
2426 << allowable_message_time << " on pi2";
2427 } else {
2428 EXPECT_TRUE(fetcher->Fetch() &&
2429 fetcher->context().monotonic_event_time >=
2430 allowable_message_time)
2431 << ": Didn't find message on channel "
2432 << configuration::CleanedChannelToString(channel) << " on pi2";
2433 }
2434 }
2435 }
2436 }
2437
2438 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2439
2440 message_bridge::TestingTimeConverter time;
2441 SimulatedEventLoopFactory factory;
2442};
2443
2444// Tests that if we have message bridge client/server disabled, and timing
2445// reports disabled, no messages are sent. Also tests that we can disconnect a
2446// node and disable statistics on it and it actually fully disconnects.
2447TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2448 time.StartEqual();
2449 factory.SkipTimingReport();
2450 factory.DisableStatistics();
2451
2452 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2453 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2454
2455 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2456 pi1->MakeEventLoop("fetcher");
2457 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2458 pi2->MakeEventLoop("fetcher");
2459
2460 factory.RunFor(chrono::milliseconds(100000));
2461
2462 // Confirm no messages are sent if we've configured them all off.
2463 VerifyChannels({}, monotonic_clock::min_time, {});
2464
2465 // Now, confirm that all the message_bridge channels come back when we
2466 // re-enable.
2467 factory.EnableStatistics();
2468
2469 factory.RunFor(chrono::milliseconds(10050));
2470
2471 // Build up the list of all the messages we expect when we come back.
2472 {
2473 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002474 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002475 std::vector<std::pair<std::string_view, const Node *>>{
2476 {"/pi1/aos", pi1->node()},
2477 {"/pi2/aos", pi1->node()},
2478 {"/pi3/aos", pi1->node()}}) {
2479 statistics_channels.insert(configuration::GetChannel(
2480 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2481 pi.second));
2482 statistics_channels.insert(configuration::GetChannel(
2483 factory.configuration(), pi.first,
2484 "aos.message_bridge.ServerStatistics", "", pi.second));
2485 statistics_channels.insert(configuration::GetChannel(
2486 factory.configuration(), pi.first,
2487 "aos.message_bridge.ClientStatistics", "", pi.second));
2488 }
2489
2490 statistics_channels.insert(configuration::GetChannel(
2491 factory.configuration(),
2492 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2493 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2494 statistics_channels.insert(configuration::GetChannel(
2495 factory.configuration(),
2496 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2497 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2498 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2499 }
2500
2501 // Now test that we can disable the messages for a single node
2502 pi2->DisableStatistics();
2503 const aos::monotonic_clock::time_point statistics_disable_time =
2504 pi2->monotonic_now();
2505 factory.RunFor(chrono::milliseconds(10000));
2506
2507 // We should see a much smaller set of messages, but should still see messages
2508 // forwarded, mainly the timestamp message.
2509 {
2510 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002511 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002512 std::vector<std::pair<std::string_view, const Node *>>{
2513 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2514 statistics_channels.insert(configuration::GetChannel(
2515 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2516 pi.second));
2517 statistics_channels.insert(configuration::GetChannel(
2518 factory.configuration(), pi.first,
2519 "aos.message_bridge.ServerStatistics", "", pi.second));
2520 statistics_channels.insert(configuration::GetChannel(
2521 factory.configuration(), pi.first,
2522 "aos.message_bridge.ClientStatistics", "", pi.second));
2523 }
2524
2525 statistics_channels.insert(configuration::GetChannel(
2526 factory.configuration(),
2527 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2528 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2529 VerifyChannels(statistics_channels, statistics_disable_time, {});
2530 }
2531
2532 // Now, fully disconnect the node. This will completely quiet down pi2.
2533 pi1->Disconnect(pi2->node());
2534 pi2->Disconnect(pi1->node());
2535
2536 const aos::monotonic_clock::time_point disconnect_disable_time =
2537 pi2->monotonic_now();
2538 factory.RunFor(chrono::milliseconds(10000));
2539
2540 {
2541 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002542 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002543 std::vector<std::pair<std::string_view, const Node *>>{
2544 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2545 statistics_channels.insert(configuration::GetChannel(
2546 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2547 pi.second));
2548 statistics_channels.insert(configuration::GetChannel(
2549 factory.configuration(), pi.first,
2550 "aos.message_bridge.ServerStatistics", "", pi.second));
2551 statistics_channels.insert(configuration::GetChannel(
2552 factory.configuration(), pi.first,
2553 "aos.message_bridge.ClientStatistics", "", pi.second));
2554 }
2555
2556 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2557 }
2558}
2559
Austin Schuh9cce6842024-04-02 18:55:44 -07002560// Struct to capture the expected time a message should be received (and it's
2561// value). This is from the perspective of the node receiving the message.
2562struct ExpectedTimestamps {
2563 // The time that the message was published on the sending node's monotonic
2564 // clock.
2565 monotonic_clock::time_point remote_time;
2566 // The time that the message was virtually transmitted over the virtual
2567 // network on the sending node's monotonic clock.
2568 monotonic_clock::time_point remote_transmit_time;
2569 // The time that the message was received on the receiving node's clock.
2570 monotonic_clock::time_point event_time;
2571 // The value inside the message.
2572 int value;
2573};
2574
Austin Schuhac6d89e2024-03-27 14:56:09 -07002575// Tests that rapidly sent messages get timestamped correctly.
2576TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2577 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2578 aos::configuration::ReadConfig(
2579 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2580
2581 message_bridge::TestingTimeConverter time(
2582 configuration::NodesCount(&config.message()));
2583 SimulatedEventLoopFactory factory(&config.message());
2584 factory.SetTimeConverter(&time);
2585 time.StartEqual();
2586
2587 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2588 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2589
2590 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2591 aos::Fetcher<examples::Ping> fetcher =
2592 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2593 EXPECT_FALSE(fetcher.Fetch());
2594
2595 {
2596 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuh9cce6842024-04-02 18:55:44 -07002597 FunctionScheduler run_at(ping_event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002598 aos::Sender<examples::Ping> test_message_sender =
2599 ping_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002600 aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002601 for (const std::chrono::nanoseconds dt :
2602 {chrono::microseconds(5000), chrono::microseconds(1),
2603 chrono::microseconds(2), chrono::microseconds(70),
Austin Schuh9cce6842024-04-02 18:55:44 -07002604 chrono::microseconds(63), chrono::microseconds(140)}) {
2605 now += dt;
2606 run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002607 }
2608
Austin Schuh9cce6842024-04-02 18:55:44 -07002609 now += chrono::milliseconds(10);
2610
2611 factory.RunFor(now - ping_event_loop->monotonic_now());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002612 }
2613
Austin Schuh9cce6842024-04-02 18:55:44 -07002614 const monotonic_clock::time_point e = monotonic_clock::epoch();
2615 const chrono::nanoseconds send_delay = factory.send_delay();
2616 const chrono::nanoseconds network_delay = factory.network_delay();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002617
Austin Schuh9cce6842024-04-02 18:55:44 -07002618 const std::vector<ExpectedTimestamps> expected_values = {
2619 // First message shows up after wakeup + network delay as expected.
2620 ExpectedTimestamps{
2621 .remote_time = e + chrono::microseconds(5000),
2622 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2623 .event_time =
2624 e + chrono::microseconds(5000) + send_delay + network_delay,
2625 .value = 1,
2626 },
2627 // Next message is close enough that it gets picked up at the same wakeup.
2628 ExpectedTimestamps{
2629 .remote_time = e + chrono::microseconds(5001),
2630 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2631 .event_time =
2632 e + chrono::microseconds(5000) + send_delay + network_delay,
2633 .value = 1,
2634 },
2635 // Same for the third.
2636 ExpectedTimestamps{
2637 .remote_time = e + chrono::microseconds(5003),
2638 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2639 .event_time =
2640 e + chrono::microseconds(5000) + send_delay + network_delay,
2641 .value = 1,
2642 },
2643 // Fourth waits long enough to do the right thing.
2644 ExpectedTimestamps{
2645 .remote_time = e + chrono::microseconds(5073),
2646 .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
2647 .event_time =
2648 e + chrono::microseconds(5073) + send_delay + network_delay,
2649 .value = 1,
2650 },
2651 // Fifth waits long enough to do the right thing as well (but kicks off
2652 // while the fourth is in flight over the network).
2653 ExpectedTimestamps{
2654 .remote_time = e + chrono::microseconds(5136),
2655 .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
2656 .event_time =
2657 e + chrono::microseconds(5136) + send_delay + network_delay,
2658 .value = 1,
2659 },
2660 // Sixth waits long enough to do the right thing as well (but kicks off
2661 // while the fifth is in flight over the network and has almost landed).
2662 // The timer wakeup for the Timestamp message coming back will find the
2663 // sixth message a little bit early.
2664 ExpectedTimestamps{
2665 .remote_time = e + chrono::microseconds(5276),
2666 .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
2667 .event_time =
2668 e + chrono::microseconds(5273) + send_delay + network_delay,
2669 .value = 1,
2670 },
2671 };
Austin Schuhac6d89e2024-03-27 14:56:09 -07002672
Austin Schuh9cce6842024-04-02 18:55:44 -07002673 for (const ExpectedTimestamps value : expected_values) {
2674 ASSERT_TRUE(fetcher.FetchNext());
2675 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2676 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2677 value.remote_transmit_time);
2678 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2679 EXPECT_EQ(fetcher->value(), value.value);
2680 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002681
2682 ASSERT_FALSE(fetcher.FetchNext());
2683}
2684
2685// Tests that a reliable message gets forwarded if it was sent originally when
2686// nodes were disconnected.
2687TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2688 time.StartEqual();
2689 factory.SkipTimingReport();
2690 factory.DisableStatistics();
2691
2692 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2693 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2694
2695 // Fully disconnect the nodes.
2696 pi1->Disconnect(pi2->node());
2697 pi2->Disconnect(pi1->node());
2698
Austin Schuhac6d89e2024-03-27 14:56:09 -07002699 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2700 pi2->MakeEventLoop("fetcher");
2701 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2702 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2703
2704 factory.RunFor(chrono::milliseconds(100));
2705
2706 {
Austin Schuheeb86fc2024-04-04 20:12:39 -07002707 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2708 pi1->MakeEventLoop("sender");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002709 aos::Sender<examples::Ping> pi1_reliable_sender =
2710 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002711 FunctionScheduler run_at(pi1_event_loop.get());
2712 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002713 for (int i = 0; i < 100; ++i) {
Austin Schuh9cce6842024-04-02 18:55:44 -07002714 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
2715 now);
2716 now += chrono::milliseconds(100);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002717 }
Austin Schuh9cce6842024-04-02 18:55:44 -07002718 now += chrono::milliseconds(50);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002719
Austin Schuh9cce6842024-04-02 18:55:44 -07002720 factory.RunFor(now - pi1_event_loop->monotonic_now());
2721 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002722
2723 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2724
2725 pi1->Connect(pi2->node());
2726 pi2->Connect(pi1->node());
2727
2728 factory.RunFor(chrono::milliseconds(1));
2729
2730 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2731 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2732 monotonic_clock::epoch() + chrono::milliseconds(10000));
2733 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2734 monotonic_clock::epoch() + chrono::milliseconds(10150));
2735 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2736 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2737 factory.network_delay());
2738 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2739
Austin Schuh9cce6842024-04-02 18:55:44 -07002740 // TODO(austin): Verify that the dropped packet count increases.
2741
Austin Schuhac6d89e2024-03-27 14:56:09 -07002742 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2743}
2744
Austin Schuh9cce6842024-04-02 18:55:44 -07002745// Tests that if we disconnect while a message is in various states of being
2746// queued, it gets either dropped or sent as expected.
2747TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
2748 time.StartEqual();
2749 factory.SkipTimingReport();
2750 factory.DisableStatistics();
2751
2752 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2753 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2754
2755 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2756
2757 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2758 pi2->MakeEventLoop("fetcher");
2759 aos::Fetcher<examples::Ping> fetcher =
2760 pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
2761
2762 ASSERT_FALSE(fetcher.Fetch());
2763
2764 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2765 {
2766 FunctionScheduler run_at(pi1_event_loop.get());
2767 aos::Sender<examples::Ping> pi1_sender =
2768 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2769
2770 int i = 0;
2771 for (const std::chrono::nanoseconds dt :
2772 {chrono::microseconds(5000), chrono::microseconds(1),
2773 chrono::microseconds(2), chrono::microseconds(70),
2774 chrono::microseconds(63), chrono::microseconds(140),
2775 chrono::microseconds(160)}) {
2776 run_at.ScheduleAt(
2777 [&]() {
2778 pi1->Connect(pi2->node());
2779 pi2->Connect(pi1->node());
2780 },
2781 now);
2782
2783 now += chrono::milliseconds(100);
2784
2785 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
2786
2787 now += dt;
2788
2789 run_at.ScheduleAt(
2790 [&]() {
2791 // Fully disconnect the nodes.
2792 pi1->Disconnect(pi2->node());
2793 pi2->Disconnect(pi1->node());
2794 },
2795 now);
2796
2797 now += chrono::milliseconds(100) - dt;
2798 ++i;
2799 }
2800
2801 factory.RunFor(now - pi1_event_loop->monotonic_now());
2802 }
2803
2804 const monotonic_clock::time_point e = monotonic_clock::epoch();
2805 const chrono::nanoseconds send_delay = factory.send_delay();
2806 const chrono::nanoseconds network_delay = factory.network_delay();
2807
2808 const std::vector<ExpectedTimestamps> expected_values = {
2809 ExpectedTimestamps{
2810 .remote_time = e + chrono::milliseconds(100),
2811 .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
2812 .event_time =
2813 e + chrono::milliseconds(100) + send_delay + network_delay,
2814 .value = 0,
2815 },
2816 ExpectedTimestamps{
2817 .remote_time = e + chrono::milliseconds(1300),
2818 .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
2819 .event_time =
2820 e + chrono::milliseconds(1300) + send_delay + network_delay,
2821 .value = 6,
2822 },
2823 };
2824
2825 for (const ExpectedTimestamps value : expected_values) {
2826 ASSERT_TRUE(fetcher.FetchNext());
2827 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2828 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2829 value.remote_transmit_time);
2830 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2831 EXPECT_EQ(fetcher->value(), value.value);
2832 }
2833
2834 // TODO(austin): Verify that the dropped packet count increases.
2835
2836 ASSERT_FALSE(fetcher.Fetch());
2837}
2838
2839class PingLogger {
2840 public:
2841 PingLogger(aos::EventLoop *event_loop, std::string_view channel,
2842 std::vector<std::pair<aos::Context, int>> *msgs)
2843 : event_loop_(event_loop),
2844 fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
2845 msgs_(msgs) {
2846 event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
2847 }
2848
2849 ~PingLogger() {
2850 while (fetcher_.FetchNext()) {
2851 msgs_->emplace_back(fetcher_.context(), fetcher_->value());
2852 }
2853 }
2854
2855 private:
2856 aos::EventLoop *event_loop_;
2857 aos::Fetcher<examples::Ping> fetcher_;
2858 std::vector<std::pair<aos::Context, int>> *msgs_;
2859};
2860
2861// Tests that rebooting while a message is in flight works as expected.
2862TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
2863 time.StartEqual();
2864 for (int i = 0; i < 8; ++i) {
2865 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2866 }
2867
2868 factory.SkipTimingReport();
2869 factory.DisableStatistics();
2870
2871 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2872 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2873
2874 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2875
2876 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2877 FunctionScheduler run_at(pi1_event_loop.get());
2878 aos::Sender<examples::Ping> pi1_sender =
2879 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2880
2881 int i = 0;
2882 for (const std::chrono::nanoseconds dt :
2883 {chrono::microseconds(5000), chrono::microseconds(1),
2884 chrono::microseconds(2), chrono::microseconds(70),
2885 chrono::microseconds(63), chrono::microseconds(140),
2886 chrono::microseconds(160)}) {
2887 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2888 now + chrono::seconds(10) - dt);
2889
2890 now += chrono::seconds(10);
2891 ++i;
2892 }
2893
2894 std::vector<std::pair<aos::Context, int>> msgs;
2895
2896 pi2->OnStartup([pi2, &msgs]() {
2897 pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
2898 });
2899
2900 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2901
2902 const monotonic_clock::time_point e = monotonic_clock::epoch();
2903 const chrono::nanoseconds send_delay = factory.send_delay();
2904 const chrono::nanoseconds network_delay = factory.network_delay();
2905
2906 const std::vector<ExpectedTimestamps> expected_values = {
2907 ExpectedTimestamps{
2908 .remote_time = e + chrono::microseconds(9995000),
2909 .remote_transmit_time =
2910 e + chrono::microseconds(9995000) + send_delay,
2911 .event_time =
2912 e + chrono::microseconds(9995000) + send_delay + network_delay,
2913 .value = 0,
2914 },
2915 ExpectedTimestamps{
2916 .remote_time = e + chrono::microseconds(19999999),
2917 .remote_transmit_time =
2918 e + chrono::microseconds(19999999) + send_delay,
2919 .event_time =
2920 e + chrono::microseconds(-1) + send_delay + network_delay,
2921 .value = 1,
2922 },
2923 ExpectedTimestamps{
2924 .remote_time = e + chrono::microseconds(29999998),
2925 .remote_transmit_time =
2926 e + chrono::microseconds(29999998) + send_delay,
2927 .event_time =
2928 e + chrono::microseconds(-2) + send_delay + network_delay,
2929 .value = 2,
2930 },
2931 ExpectedTimestamps{
2932 .remote_time = e + chrono::microseconds(69999840),
2933 .remote_transmit_time =
2934 e + chrono::microseconds(69999840) + send_delay,
2935 .event_time =
2936 e + chrono::microseconds(9999840) + send_delay + network_delay,
2937 .value = 6,
2938 },
2939 };
2940
2941 ASSERT_EQ(msgs.size(), expected_values.size());
2942
2943 for (size_t i = 0; i < msgs.size(); ++i) {
2944 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
2945 expected_values[i].remote_time);
2946 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
2947 expected_values[i].remote_transmit_time);
2948 EXPECT_EQ(msgs[i].first.monotonic_event_time,
2949 expected_values[i].event_time);
2950 EXPECT_EQ(msgs[i].second, expected_values[i].value);
2951 }
2952
2953 // TODO(austin): Verify that the dropped packet count increases.
2954}
2955
2956// Tests that rebooting while a message is in flight works as expected.
2957TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
2958 time.StartEqual();
2959 for (int i = 0; i < 8; ++i) {
2960 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2961 }
2962
2963 factory.SkipTimingReport();
2964 factory.DisableStatistics();
2965
2966 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2967 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2968
2969 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2970
2971 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2972 FunctionScheduler run_at(pi1_event_loop.get());
2973 aos::Sender<examples::Ping> pi1_sender =
2974 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2975
2976 int i = 0;
2977 for (const std::chrono::nanoseconds dt :
2978 {chrono::microseconds(5000), chrono::microseconds(1),
2979 chrono::microseconds(2), chrono::microseconds(70),
2980 chrono::microseconds(63), chrono::microseconds(140),
2981 chrono::microseconds(160)}) {
2982 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2983 now + chrono::seconds(10) - dt);
2984
2985 now += chrono::seconds(10);
2986 ++i;
2987 }
2988
2989 std::vector<std::pair<aos::Context, int>> msgs;
2990
2991 PingLogger *logger;
2992 pi2->OnStartup([pi2, &msgs, &logger]() {
2993 logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
2994 });
2995
2996 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2997
2998 // Stop the logger to flush the last boot of data.
2999 pi2->Stop(logger);
3000
3001 const monotonic_clock::time_point e = monotonic_clock::epoch();
3002 const chrono::nanoseconds send_delay = factory.send_delay();
3003 const chrono::nanoseconds network_delay = factory.network_delay();
3004
3005 // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
3006 // event times to confirm what should have been forwarded when.
3007 const std::vector<ExpectedTimestamps> expected_values = {
3008 ExpectedTimestamps{
3009 .remote_time = e + chrono::microseconds(9995000),
3010 .remote_transmit_time =
3011 e + chrono::microseconds(9995000) + send_delay,
3012 .event_time =
3013 e + chrono::microseconds(9995000) + send_delay + network_delay,
3014 .value = 0,
3015 },
3016 ExpectedTimestamps{
3017 .remote_time = e + chrono::microseconds(9995000),
3018 .remote_transmit_time = e + chrono::microseconds(10000000),
3019 .event_time = e + network_delay,
3020 .value = 0,
3021 },
3022 ExpectedTimestamps{
3023 .remote_time = e + chrono::microseconds(19999999),
3024 .remote_transmit_time = e + chrono::microseconds(20000000),
3025 .event_time = e + network_delay,
3026 .value = 1,
3027 },
3028 ExpectedTimestamps{
3029 .remote_time = e + chrono::microseconds(29999998),
3030 .remote_transmit_time = e + chrono::microseconds(30000000),
3031 .event_time = e + network_delay,
3032 .value = 2,
3033 },
3034 ExpectedTimestamps{
3035 .remote_time = e + chrono::microseconds(39999930),
3036 .remote_transmit_time = e + chrono::microseconds(40000000),
3037 .event_time = e + network_delay,
3038 .value = 3,
3039 },
3040 ExpectedTimestamps{
3041 .remote_time = e + chrono::microseconds(49999937),
3042 .remote_transmit_time = e + chrono::microseconds(50000000),
3043 .event_time = e + network_delay,
3044 .value = 4,
3045 },
3046 ExpectedTimestamps{
3047 .remote_time = e + chrono::microseconds(59999860),
3048 .remote_transmit_time = e + chrono::microseconds(60000000),
3049 .event_time = e + network_delay,
3050 .value = 5,
3051 },
3052 ExpectedTimestamps{
3053 .remote_time = e + chrono::microseconds(69999840),
3054 .remote_transmit_time = e + chrono::microseconds(69999890),
3055 .event_time = e + chrono::microseconds(9999890) + network_delay,
3056 .value = 6,
3057 },
3058 ExpectedTimestamps{
3059 .remote_time = e + chrono::microseconds(69999840),
3060 .remote_transmit_time = e + chrono::microseconds(70000000),
3061 .event_time = e + network_delay,
3062 .value = 6,
3063 },
3064 };
3065
3066 ASSERT_EQ(msgs.size(), expected_values.size());
3067
3068 for (size_t i = 0; i < msgs.size(); ++i) {
3069 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
3070 expected_values[i].remote_time);
3071 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
3072 expected_values[i].remote_transmit_time);
3073 EXPECT_EQ(msgs[i].first.monotonic_event_time,
3074 expected_values[i].event_time);
3075 EXPECT_EQ(msgs[i].second, expected_values[i].value);
3076 }
3077
3078 // TODO(austin): Verify that the dropped packet count increases.
3079}
3080
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003081} // namespace aos::testing