blob: 1588edefb3199b4fe1cc5776e76f3c16dc97dd22 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuhbf610b72024-04-04 20:04:55 -070010#include "aos/events/function_scheduler.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070012#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080013#include "aos/events/ping_lib.h"
14#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080015#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/message_bridge_client_generated.h"
17#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080018#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080019#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070020#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070021#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080022
Stephan Pleinesf63bde82024-01-13 15:59:33 -080023namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
James Kuszmaul9f998082024-05-23 15:37:35 -070045 Result<void> Run() override { return event_loop_factory_->Run(); }
46
47 std::unique_ptr<ExitHandle> MakeExitHandle() override {
48 MaybeMake();
49 return event_loop_factory_->MakeExitHandle();
50 }
51
Austin Schuh217a9782019-12-21 23:02:50 -080052 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070053
Austin Schuh52d325c2019-06-23 18:59:06 -070054 // TODO(austin): Implement this. It's used currently for a phased loop test.
55 // I'm not sure how much that matters.
56 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
57
Austin Schuh7d87b672019-12-01 20:23:49 -080058 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080059 MaybeMake();
60 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080061 }
62
Neil Balchc8f41ed2018-01-20 22:06:53 -080063 private:
Austin Schuh217a9782019-12-21 23:02:50 -080064 void MaybeMake() {
65 if (!event_loop_factory_) {
66 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080067 event_loop_factory_ =
68 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080069 } else {
70 event_loop_factory_ =
71 std::make_unique<SimulatedEventLoopFactory>(configuration());
72 }
73 }
74 }
75 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080076};
77
Austin Schuh6bae8252021-02-07 22:01:49 -080078auto CommonParameters() {
79 return ::testing::Combine(
80 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
81 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
82 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
83}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070084
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070085INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070086 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070087
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070088INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070089 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080090
// Test parameters for RemoteMessageSimulatedEventLoopTest.
struct Param {
  // Name of the config file to load.
  std::string config;
  // True when a single RemoteMessage channel is shared between all the remote
  // channels; false when each remote channel has its own RemoteMessage
  // channel.
  bool shared;
};
100
101class RemoteMessageSimulatedEventLoopTest
102 : public ::testing::TestWithParam<struct Param> {
103 public:
104 RemoteMessageSimulatedEventLoopTest()
105 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700106 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800107 LOG(INFO) << "Config " << GetParam().config;
108 }
109
110 bool shared() const { return GetParam().shared; }
111
112 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
113 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
114 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
115 if (shared()) {
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2"));
118 } else {
119 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
120 event_loop,
121 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
122 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
123 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
124 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
125 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
126 }
127 return counters;
128 }
129
130 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
131 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
132 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
133 if (shared()) {
134 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
135 event_loop, "/aos/remote_timestamps/pi1"));
136 } else {
137 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
138 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
139 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
140 event_loop,
141 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
142 }
143 return counters;
144 }
145
146 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
147};
148
Austin Schuh8fb315a2020-11-19 22:33:58 -0800149// Test that sending a message after running gets properly notified.
150TEST(SimulatedEventLoopTest, SendAfterRunFor) {
151 SimulatedEventLoopTestFactory factory;
152
153 SimulatedEventLoopFactory simulated_event_loop_factory(
154 factory.configuration());
155
156 ::std::unique_ptr<EventLoop> ping_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("ping");
158 aos::Sender<TestMessage> test_message_sender =
159 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700160 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800161
162 std::unique_ptr<EventLoop> pong1_event_loop =
163 simulated_event_loop_factory.MakeEventLoop("pong");
164 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
165 "/test");
166
167 EXPECT_FALSE(ping_event_loop->is_running());
168
169 // Watchers start when you start running, so there should be nothing counted.
170 simulated_event_loop_factory.RunFor(chrono::seconds(1));
171 EXPECT_EQ(test_message_counter1.count(), 0u);
172
173 std::unique_ptr<EventLoop> pong2_event_loop =
174 simulated_event_loop_factory.MakeEventLoop("pong");
175 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
176 "/test");
177
178 // Pauses in the middle don't count though, so this should be counted.
179 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700180 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800181
182 EXPECT_EQ(test_message_counter1.count(), 0u);
183 EXPECT_EQ(test_message_counter2.count(), 0u);
184 simulated_event_loop_factory.RunFor(chrono::seconds(1));
185
186 EXPECT_EQ(test_message_counter1.count(), 1u);
187 EXPECT_EQ(test_message_counter2.count(), 0u);
188}
189
Austin Schuhd027bf52024-05-12 22:24:12 -0700190// Test that OnRun callbacks get deleted if the event loop gets deleted.
191TEST(SimulatedEventLoopTest, DestructEventLoopBeforeOnRun) {
192 SimulatedEventLoopTestFactory factory;
193
194 SimulatedEventLoopFactory simulated_event_loop_factory(
195 factory.configuration());
196
197 {
198 ::std::unique_ptr<EventLoop> test_event_loop =
199 simulated_event_loop_factory.MakeEventLoop("test");
200 test_event_loop->OnRun([]() { LOG(FATAL) << "Don't run this"; });
201 }
202
203 simulated_event_loop_factory.RunFor(chrono::seconds(1));
204}
205
206// Tests that the order event loops are created is the order that the OnRun
207// callbacks are run.
208TEST(SimulatedEventLoopTest, OnRunOrderFollowsConstructionOrder) {
209 SimulatedEventLoopTestFactory factory;
210
211 SimulatedEventLoopFactory simulated_event_loop_factory(
212 factory.configuration());
213
214 int count = 0;
215
216 std::unique_ptr<EventLoop> test1_event_loop =
217 simulated_event_loop_factory.MakeEventLoop("test1");
218 std::unique_ptr<EventLoop> test2_event_loop =
219 simulated_event_loop_factory.MakeEventLoop("test2");
220 test2_event_loop->OnRun([&count]() {
221 EXPECT_EQ(count, 1u);
222 ++count;
223 });
224 test1_event_loop->OnRun([&count]() {
225 EXPECT_EQ(count, 0u);
226 ++count;
227 });
228
229 simulated_event_loop_factory.RunFor(chrono::seconds(1));
230
231 EXPECT_EQ(count, 2u);
232}
233
234// Test that we can't register OnRun callbacks after starting.
235TEST(SimulatedEventLoopDeathTest, OnRunAfterRunning) {
236 SimulatedEventLoopTestFactory factory;
237
238 SimulatedEventLoopFactory simulated_event_loop_factory(
239 factory.configuration());
240
241 std::unique_ptr<EventLoop> test_event_loop =
242 simulated_event_loop_factory.MakeEventLoop("test");
243 test_event_loop->OnRun([]() {});
244
245 simulated_event_loop_factory.RunFor(chrono::seconds(1));
246
247 EXPECT_DEATH(test_event_loop->OnRun([]() {}), "OnRun");
248}
249
Austin Schuh60e77942022-05-16 17:48:24 -0700250// Test that if we configure an event loop to be able to send too fast that we
251// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700252TEST(SimulatedEventLoopTest, AllowSendTooFast) {
253 SimulatedEventLoopTestFactory factory;
254
255 SimulatedEventLoopFactory simulated_event_loop_factory(
256 factory.configuration());
257
258 // Create two event loops: One will be allowed to send too fast, one won't. We
259 // will then test to ensure that the one that is allowed to send too fast can
260 // indeed send too fast, but that it then makes it so that the second event
261 // loop can no longer send anything because *it* is still limited.
262 ::std::unique_ptr<EventLoop> too_fast_event_loop =
263 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
264 ->MakeEventLoop("too_fast_sender",
265 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700266 NodeEventLoopFactory::ExclusiveSenders::kNo,
267 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700268 aos::Sender<TestMessage> too_fast_message_sender =
269 too_fast_event_loop->MakeSender<TestMessage>("/test");
270
271 ::std::unique_ptr<EventLoop> limited_event_loop =
272 simulated_event_loop_factory.MakeEventLoop("limited_sender");
273 aos::Sender<TestMessage> limited_message_sender =
274 limited_event_loop->MakeSender<TestMessage>("/test");
275
276 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
277 for (int ii = 0; ii < queue_size; ++ii) {
278 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
279 }
280 // And now we should start being in the sending-too-fast phase.
281 for (int ii = 0; ii < queue_size; ++ii) {
282 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700283 ASSERT_EQ(SendTestMessage(limited_message_sender),
284 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700285 }
286}
287
288// Test that if we setup an exclusive sender that it is indeed exclusive.
289TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
290 SimulatedEventLoopTestFactory factory;
291
292 SimulatedEventLoopFactory simulated_event_loop_factory(
293 factory.configuration());
294
295 ::std::unique_ptr<EventLoop> exclusive_event_loop =
296 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700297 ->MakeEventLoop(
298 "too_fast_sender",
299 {NodeEventLoopFactory::CheckSentTooFast::kYes,
300 NodeEventLoopFactory::ExclusiveSenders::kYes,
301 {{configuration::GetChannel(factory.configuration(), "/test1",
302 "aos.TestMessage", "", nullptr),
303 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700304 exclusive_event_loop->SkipAosLog();
305 exclusive_event_loop->SkipTimingReport();
306 ::std::unique_ptr<EventLoop> normal_event_loop =
307 simulated_event_loop_factory.MakeEventLoop("limited_sender");
308 // Set things up to have the exclusive sender be destroyed so we can test
309 // recovery.
310 {
311 aos::Sender<TestMessage> exclusive_sender =
312 exclusive_event_loop->MakeSender<TestMessage>("/test");
313
314 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
315 "TestMessage");
316 }
317 // This one should succeed now that the exclusive channel is removed.
318 aos::Sender<TestMessage> normal_sender =
319 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700320 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
321 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700322
323 // And check an explicitly exempted channel:
324 aos::Sender<TestMessage> non_exclusive_sender =
325 exclusive_event_loop->MakeSender<TestMessage>("/test1");
326 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
327 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700328}
329
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700330void TestSentTooFastCheckEdgeCase(
331 const std::function<RawSender::Error(int, int)> expected_err,
332 const bool send_twice_at_end) {
333 SimulatedEventLoopTestFactory factory;
334
335 auto event_loop = factory.MakePrimary("primary");
336
337 auto sender = event_loop->MakeSender<TestMessage>("/test");
338
339 const int queue_size = TestChannelQueueSize(event_loop.get());
340 int msgs_sent = 0;
341 event_loop->AddPhasedLoop(
342 [&](int) {
343 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
344 msgs_sent++;
345
346 // If send_twice_at_end, send the last two messages (message
347 // queue_size and queue_size + 1) in the same iteration, meaning that
348 // we would be sending very slightly too fast. Otherwise, we will send
349 // message queue_size + 1 in the next iteration and we will continue
350 // to be sending exactly at the channel frequency.
351 if (send_twice_at_end && (msgs_sent == queue_size)) {
352 EXPECT_EQ(SendTestMessage(sender),
353 expected_err(msgs_sent, queue_size));
354 msgs_sent++;
355 }
356
357 if (msgs_sent > queue_size) {
358 factory.Exit();
359 }
360 },
361 std::chrono::duration_cast<std::chrono::nanoseconds>(
362 std::chrono::duration<double>(
363 1.0 / TestChannelFrequency(event_loop.get()))));
364
365 factory.Run();
366}
367
368// Tests that RawSender::Error::kMessagesSentTooFast is not returned
369// when messages are sent at the exact frequency of the channel.
370TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
371 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
372 false);
373}
374
375// Tests that RawSender::Error::kMessagesSentTooFast is returned
376// when sending exactly one more message than allowed in a channel storage
377// duration.
378TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
379 TestSentTooFastCheckEdgeCase(
380 [](const int msgs_sent, const int queue_size) {
381 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
382 : RawSender::Error::kOk);
383 },
384 true);
385}
386
Austin Schuh8fb315a2020-11-19 22:33:58 -0800387// Test that creating an event loop while running dies.
388TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
389 SimulatedEventLoopTestFactory factory;
390
391 SimulatedEventLoopFactory simulated_event_loop_factory(
392 factory.configuration());
393
394 ::std::unique_ptr<EventLoop> event_loop =
395 simulated_event_loop_factory.MakeEventLoop("ping");
396
397 auto timer = event_loop->AddTimer([&]() {
398 EXPECT_DEATH(
399 {
400 ::std::unique_ptr<EventLoop> event_loop2 =
401 simulated_event_loop_factory.MakeEventLoop("ping");
402 },
403 "event loop while running");
404 simulated_event_loop_factory.Exit();
405 });
406
407 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700408 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800409 });
410
411 simulated_event_loop_factory.Run();
412}
413
414// Test that creating a watcher after running dies.
415TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
416 SimulatedEventLoopTestFactory factory;
417
418 SimulatedEventLoopFactory simulated_event_loop_factory(
419 factory.configuration());
420
421 ::std::unique_ptr<EventLoop> event_loop =
422 simulated_event_loop_factory.MakeEventLoop("ping");
423
424 simulated_event_loop_factory.RunFor(chrono::seconds(1));
425
426 EXPECT_DEATH(
427 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
428 "Can't add a watcher after running");
429
430 ::std::unique_ptr<EventLoop> event_loop2 =
431 simulated_event_loop_factory.MakeEventLoop("ping");
432
433 simulated_event_loop_factory.RunFor(chrono::seconds(1));
434
435 EXPECT_DEATH(
436 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
437 "Can't add a watcher after running");
438}
439
Austin Schuh44019f92019-05-19 19:58:27 -0700440// Test that running for a time period with no handlers causes time to progress
441// correctly.
442TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800443 SimulatedEventLoopTestFactory factory;
444
445 SimulatedEventLoopFactory simulated_event_loop_factory(
446 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700447 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800448 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700449
450 simulated_event_loop_factory.RunFor(chrono::seconds(1));
451
452 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700453 event_loop->monotonic_now());
454}
455
456// Test that running for a time with a periodic handler causes time to end
457// correctly.
458TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800459 SimulatedEventLoopTestFactory factory;
460
461 SimulatedEventLoopFactory simulated_event_loop_factory(
462 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700463 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800464 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700465
466 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700467 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700468 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700469 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
470 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700471 });
472
473 simulated_event_loop_factory.RunFor(chrono::seconds(1));
474
475 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700476 event_loop->monotonic_now());
477 EXPECT_EQ(counter, 10);
478}
479
Austin Schuh7d87b672019-12-01 20:23:49 -0800480// Tests that watchers have latency in simulation.
481TEST(SimulatedEventLoopTest, WatcherTimingReport) {
482 SimulatedEventLoopTestFactory factory;
483 factory.set_send_delay(std::chrono::microseconds(50));
484
485 FLAGS_timing_report_ms = 1000;
486 auto loop1 = factory.MakePrimary("primary");
487 loop1->MakeWatcher("/test", [](const TestMessage &) {});
488
489 auto loop2 = factory.Make("sender_loop");
490
491 auto loop3 = factory.Make("report_fetcher");
492
493 Fetcher<timing::Report> report_fetcher =
494 loop3->MakeFetcher<timing::Report>("/aos");
495 EXPECT_FALSE(report_fetcher.Fetch());
496
497 auto sender = loop2->MakeSender<TestMessage>("/test");
498
499 // Send 10 messages in the middle of a timing report period so we get
500 // something interesting back.
501 auto test_timer = loop2->AddTimer([&sender]() {
502 for (int i = 0; i < 10; ++i) {
503 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
504 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
505 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700506 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800507 }
508 });
509
510 // Quit after 1 timing report, mid way through the next cycle.
511 {
512 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700513 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800514 end_timer->set_name("end");
515 }
516
517 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700518 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800519 });
520
521 factory.Run();
522
523 // And, since we are here, check that the timing report makes sense.
524 // Start by looking for our event loop's timing.
525 FlatbufferDetachedBuffer<timing::Report> primary_report =
526 FlatbufferDetachedBuffer<timing::Report>::Empty();
527 while (report_fetcher.FetchNext()) {
528 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
529 if (report_fetcher->name()->string_view() == "primary") {
530 primary_report = CopyFlatBuffer(report_fetcher.get());
531 }
532 }
533
534 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700535 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800536
537 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
538
539 // Just the timing report timer.
540 ASSERT_NE(primary_report.message().timers(), nullptr);
541 EXPECT_EQ(primary_report.message().timers()->size(), 2);
542
543 // No phased loops
544 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
545
546 // And now confirm that the watcher received all 10 messages, and has latency.
547 ASSERT_NE(primary_report.message().watchers(), nullptr);
548 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
549 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
550 EXPECT_NEAR(
551 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
552 0.00005, 1e-9);
553 EXPECT_NEAR(
554 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
555 0.00005, 1e-9);
556 EXPECT_NEAR(
557 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
558 0.00005, 1e-9);
559 EXPECT_EQ(primary_report.message()
560 .watchers()
561 ->Get(0)
562 ->wakeup_latency()
563 ->standard_deviation(),
564 0.0);
565
566 EXPECT_EQ(
567 primary_report.message().watchers()->Get(0)->handler_time()->average(),
568 0.0);
569 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
570 0.0);
571 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
572 0.0);
573 EXPECT_EQ(primary_report.message()
574 .watchers()
575 ->Get(0)
576 ->handler_time()
577 ->standard_deviation(),
578 0.0);
579}
580
Austin Schuh89c9b812021-02-20 14:42:10 -0800581size_t CountAll(
582 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
583 &counters) {
584 size_t count = 0u;
585 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
586 counters) {
587 count += counter->count();
588 }
589 return count;
590}
591
Austin Schuh4c3b9702020-08-30 11:34:55 -0700592// Tests that ping and pong work when on 2 different nodes, and the message
593// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800594TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800595 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
596 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700597 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800598
599 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
600
601 std::unique_ptr<EventLoop> ping_event_loop =
602 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
603 Ping ping(ping_event_loop.get());
604
605 std::unique_ptr<EventLoop> pong_event_loop =
606 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
607 Pong pong(pong_event_loop.get());
608
609 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
610 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700611 MessageCounter<examples::Pong> pi2_pong_counter(
612 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700613 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
614 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
615 "/pi1/aos");
616 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
617 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800618
Austin Schuh4c3b9702020-08-30 11:34:55 -0700619 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
620 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800621
622 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
623 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700624 MessageCounter<examples::Pong> pi1_pong_counter(
625 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700626 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
627 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
628 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
629 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
630 "/aos");
631
Austin Schuh4c3b9702020-08-30 11:34:55 -0700632 // Count timestamps.
633 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
634 pi1_pong_counter_event_loop.get(), "/pi1/aos");
635 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
636 pi2_pong_counter_event_loop.get(), "/pi1/aos");
637 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
638 pi3_pong_counter_event_loop.get(), "/pi1/aos");
639 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
640 pi1_pong_counter_event_loop.get(), "/pi2/aos");
641 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
642 pi2_pong_counter_event_loop.get(), "/pi2/aos");
643 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
644 pi1_pong_counter_event_loop.get(), "/pi3/aos");
645 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
646 pi3_pong_counter_event_loop.get(), "/pi3/aos");
647
Austin Schuh2f8fd752020-09-01 22:38:28 -0700648 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800649 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
650 remote_timestamps_pi2_on_pi1 =
651 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
652 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
653 remote_timestamps_pi1_on_pi2 =
654 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700655
Austin Schuh4c3b9702020-08-30 11:34:55 -0700656 // Wait to let timestamp estimation start up before looking for the results.
657 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
658
Austin Schuh8fb315a2020-11-19 22:33:58 -0800659 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
660 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
661 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
662 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
663 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
664 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
665
Austin Schuh4c3b9702020-08-30 11:34:55 -0700666 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800667 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700668 "/pi1/aos", [&pi1_server_statistics_count](
669 const message_bridge::ServerStatistics &stats) {
670 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
671 EXPECT_EQ(stats.connections()->size(), 2u);
672 for (const message_bridge::ServerConnection *connection :
673 *stats.connections()) {
674 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800675 EXPECT_EQ(connection->connection_count(), 1u);
676 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800677 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700678 if (connection->node()->name()->string_view() == "pi2") {
679 EXPECT_GT(connection->sent_packets(), 50);
680 } else if (connection->node()->name()->string_view() == "pi3") {
681 EXPECT_GE(connection->sent_packets(), 5);
682 } else {
683 LOG(FATAL) << "Unknown connection";
684 }
685
686 EXPECT_TRUE(connection->has_monotonic_offset());
687 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700688
689 EXPECT_TRUE(connection->has_channels());
690 int accumulated_sent_count = 0;
691 int accumulated_dropped_count = 0;
692 for (const message_bridge::ServerChannelStatistics *channel :
693 *connection->channels()) {
694 accumulated_sent_count += channel->sent_packets();
695 accumulated_dropped_count += channel->dropped_packets();
696 }
697 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
698 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700699 }
700 ++pi1_server_statistics_count;
701 });
702
703 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800704 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700705 "/pi2/aos", [&pi2_server_statistics_count](
706 const message_bridge::ServerStatistics &stats) {
707 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
708 EXPECT_EQ(stats.connections()->size(), 1u);
709
710 const message_bridge::ServerConnection *connection =
711 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800712 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700713 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
714 EXPECT_GT(connection->sent_packets(), 50);
715 EXPECT_TRUE(connection->has_monotonic_offset());
716 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800717 EXPECT_EQ(connection->connection_count(), 1u);
718 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700719
720 EXPECT_TRUE(connection->has_channels());
721 int accumulated_sent_count = 0;
722 int accumulated_dropped_count = 0;
723 for (const message_bridge::ServerChannelStatistics *channel :
724 *connection->channels()) {
725 accumulated_sent_count += channel->sent_packets();
726 accumulated_dropped_count += channel->dropped_packets();
727 }
728 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
729 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
730
Austin Schuh4c3b9702020-08-30 11:34:55 -0700731 ++pi2_server_statistics_count;
732 });
733
734 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800735 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700736 "/pi3/aos", [&pi3_server_statistics_count](
737 const message_bridge::ServerStatistics &stats) {
738 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
739 EXPECT_EQ(stats.connections()->size(), 1u);
740
741 const message_bridge::ServerConnection *connection =
742 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800743 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700744 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
745 EXPECT_GE(connection->sent_packets(), 5);
746 EXPECT_TRUE(connection->has_monotonic_offset());
747 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800748 EXPECT_EQ(connection->connection_count(), 1u);
749 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700750
751 EXPECT_TRUE(connection->has_channels());
752 int accumulated_sent_count = 0;
753 int accumulated_dropped_count = 0;
754 for (const message_bridge::ServerChannelStatistics *channel :
755 *connection->channels()) {
756 accumulated_sent_count += channel->sent_packets();
757 accumulated_dropped_count += channel->dropped_packets();
758 }
759 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
760 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
761
Austin Schuh4c3b9702020-08-30 11:34:55 -0700762 ++pi3_server_statistics_count;
763 });
764
765 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800766 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700767 "/pi1/aos", [&pi1_client_statistics_count](
768 const message_bridge::ClientStatistics &stats) {
769 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
770 EXPECT_EQ(stats.connections()->size(), 2u);
771
772 for (const message_bridge::ClientConnection *connection :
773 *stats.connections()) {
774 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
775 if (connection->node()->name()->string_view() == "pi2") {
776 EXPECT_GT(connection->received_packets(), 50);
777 } else if (connection->node()->name()->string_view() == "pi3") {
778 EXPECT_GE(connection->received_packets(), 5);
779 } else {
780 LOG(FATAL) << "Unknown connection";
781 }
782
Austin Schuhe61d4382021-03-31 21:33:02 -0700783 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700784 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700785 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800786 EXPECT_EQ(connection->connection_count(), 1u);
787 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700788 }
789 ++pi1_client_statistics_count;
790 });
791
792 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800793 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700794 "/pi2/aos", [&pi2_client_statistics_count](
795 const message_bridge::ClientStatistics &stats) {
796 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
797 EXPECT_EQ(stats.connections()->size(), 1u);
798
799 const message_bridge::ClientConnection *connection =
800 stats.connections()->Get(0);
801 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
802 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700803 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700804 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700805 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800806 EXPECT_EQ(connection->connection_count(), 1u);
807 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700808 ++pi2_client_statistics_count;
809 });
810
811 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800812 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700813 "/pi3/aos", [&pi3_client_statistics_count](
814 const message_bridge::ClientStatistics &stats) {
815 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
816 EXPECT_EQ(stats.connections()->size(), 1u);
817
818 const message_bridge::ClientConnection *connection =
819 stats.connections()->Get(0);
820 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
821 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700822 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700823 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700824 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800825 EXPECT_EQ(connection->connection_count(), 1u);
826 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700827 ++pi3_client_statistics_count;
828 });
829
Austin Schuh2f8fd752020-09-01 22:38:28 -0700830 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
831 // channel.
832 const size_t pi1_timestamp_channel =
833 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
834 pi1_on_pi2_timestamp_fetcher.channel());
835 const size_t ping_timestamp_channel =
836 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
837 ping_on_pi2_fetcher.channel());
838
839 for (const Channel *channel :
840 *pi1_pong_counter_event_loop->configuration()->channels()) {
841 VLOG(1) << "Channel "
842 << configuration::ChannelIndex(
843 pi1_pong_counter_event_loop->configuration(), channel)
844 << " " << configuration::CleanedChannelToString(channel);
845 }
846
Austin Schuh8fb315a2020-11-19 22:33:58 -0800847 std::unique_ptr<EventLoop> pi1_remote_timestamp =
848 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
849
Austin Schuh89c9b812021-02-20 14:42:10 -0800850 for (std::pair<int, std::string> channel :
851 shared()
852 ? std::vector<std::pair<
853 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
854 : std::vector<std::pair<int, std::string>>{
855 {pi1_timestamp_channel,
856 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
857 "aos-message_bridge-Timestamp"},
858 {ping_timestamp_channel,
859 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
860 // For each remote timestamp we get back, confirm that it is either a ping
861 // message, or a timestamp we sent out. Also confirm that the timestamps
862 // are correct.
863 pi1_remote_timestamp->MakeWatcher(
864 channel.second,
865 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
866 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
867 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700868 channel_index = channel.first,
869 channel_name = channel.second](const RemoteMessage &header) {
870 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
871 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800872 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700873 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800874 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700875 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700876
Austin Schuh89c9b812021-02-20 14:42:10 -0800877 const aos::monotonic_clock::time_point header_monotonic_sent_time(
878 chrono::nanoseconds(header.monotonic_sent_time()));
879 const aos::realtime_clock::time_point header_realtime_sent_time(
880 chrono::nanoseconds(header.realtime_sent_time()));
881 const aos::monotonic_clock::time_point header_monotonic_remote_time(
882 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700883 const aos::monotonic_clock::time_point
884 header_monotonic_remote_transmit_time(
885 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800886 const aos::realtime_clock::time_point header_realtime_remote_time(
887 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700888
Austin Schuh89c9b812021-02-20 14:42:10 -0800889 if (channel_index != -1) {
890 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700891 }
892
Austin Schuh89c9b812021-02-20 14:42:10 -0800893 const Context *pi1_context = nullptr;
894 const Context *pi2_context = nullptr;
895
896 if (header.channel_index() == pi1_timestamp_channel) {
897 // Find the forwarded message.
898 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
899 header_monotonic_sent_time) {
900 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
901 }
902
903 // And the source message.
904 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
905 header_monotonic_remote_time) {
906 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
907 }
908
909 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
910 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700911
912 EXPECT_EQ(header_monotonic_remote_transmit_time,
913 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800914 } else if (header.channel_index() == ping_timestamp_channel) {
915 // Find the forwarded message.
916 while (ping_on_pi2_fetcher.context().monotonic_event_time <
917 header_monotonic_sent_time) {
918 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
919 }
920
921 // And the source message.
922 while (ping_on_pi1_fetcher.context().monotonic_event_time <
923 header_monotonic_remote_time) {
924 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
925 }
926
927 pi1_context = &ping_on_pi1_fetcher.context();
928 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700929
930 EXPECT_EQ(header_monotonic_remote_transmit_time,
931 pi2_context->monotonic_event_time -
932 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800933 } else {
934 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700935 }
936
Austin Schuh89c9b812021-02-20 14:42:10 -0800937 // Confirm the forwarded message has matching timestamps to the
938 // timestamps we got back.
939 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
940 EXPECT_EQ(pi2_context->remote_queue_index,
941 header.remote_queue_index());
942 EXPECT_EQ(pi2_context->monotonic_event_time,
943 header_monotonic_sent_time);
944 EXPECT_EQ(pi2_context->realtime_event_time,
945 header_realtime_sent_time);
946 EXPECT_EQ(pi2_context->realtime_remote_time,
947 header_realtime_remote_time);
948 EXPECT_EQ(pi2_context->monotonic_remote_time,
949 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700950 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
951 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700952
Austin Schuh89c9b812021-02-20 14:42:10 -0800953 // Confirm the forwarded message also matches the source message.
954 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
955 EXPECT_EQ(pi1_context->monotonic_event_time,
956 header_monotonic_remote_time);
957 EXPECT_EQ(pi1_context->realtime_event_time,
958 header_realtime_remote_time);
959 });
960 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700961
Austin Schuh4c3b9702020-08-30 11:34:55 -0700962 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
963 chrono::milliseconds(500) +
964 chrono::milliseconds(5));
965
966 EXPECT_EQ(pi1_pong_counter.count(), 1001);
967 EXPECT_EQ(pi2_pong_counter.count(), 1001);
968
969 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
970 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
971 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
972 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
973 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
974 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
975 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
976
Austin Schuh20ac95d2020-12-05 17:24:19 -0800977 EXPECT_EQ(pi1_server_statistics_count, 10);
978 EXPECT_EQ(pi2_server_statistics_count, 10);
979 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700980
981 EXPECT_EQ(pi1_client_statistics_count, 95);
982 EXPECT_EQ(pi2_client_statistics_count, 95);
983 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700984
985 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800986 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
987 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700988}
989
990// Tests that an offset between nodes can be recovered and shows up in
991// ServerStatistics correctly.
992TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
993 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700994 aos::configuration::ReadConfig(ArtifactPath(
995 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700996 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800997 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
998 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700999 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001000 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1001 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001002 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -08001003 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1004 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001005
Austin Schuh87dd3832021-01-01 23:07:31 -08001006 message_bridge::TestingTimeConverter time(
1007 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -07001008 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001009 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001010
1011 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -08001012 time.AddNextTimestamp(
1013 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001014 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1015 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -07001016
1017 std::unique_ptr<EventLoop> ping_event_loop =
1018 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1019 Ping ping(ping_event_loop.get());
1020
1021 std::unique_ptr<EventLoop> pong_event_loop =
1022 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1023 Pong pong(pong_event_loop.get());
1024
Austin Schuh8fb315a2020-11-19 22:33:58 -08001025 // Wait to let timestamp estimation start up before looking for the results.
1026 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1027
Austin Schuh87dd3832021-01-01 23:07:31 -08001028 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1029 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1030
Austin Schuh4c3b9702020-08-30 11:34:55 -07001031 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1032 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1033
1034 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1035 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1036
Austin Schuh4c3b9702020-08-30 11:34:55 -07001037 // Confirm the offsets are being recovered correctly.
1038 int pi1_server_statistics_count = 0;
1039 pi1_pong_counter_event_loop->MakeWatcher(
1040 "/pi1/aos", [&pi1_server_statistics_count,
1041 kOffset](const message_bridge::ServerStatistics &stats) {
1042 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1043 EXPECT_EQ(stats.connections()->size(), 2u);
1044 for (const message_bridge::ServerConnection *connection :
1045 *stats.connections()) {
1046 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001047 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001048 if (connection->node()->name()->string_view() == "pi2") {
1049 EXPECT_EQ(connection->monotonic_offset(),
1050 chrono::nanoseconds(kOffset).count());
1051 } else if (connection->node()->name()->string_view() == "pi3") {
1052 EXPECT_EQ(connection->monotonic_offset(), 0);
1053 } else {
1054 LOG(FATAL) << "Unknown connection";
1055 }
1056
1057 EXPECT_TRUE(connection->has_monotonic_offset());
1058 }
1059 ++pi1_server_statistics_count;
1060 });
1061
1062 int pi2_server_statistics_count = 0;
1063 pi2_pong_counter_event_loop->MakeWatcher(
1064 "/pi2/aos", [&pi2_server_statistics_count,
1065 kOffset](const message_bridge::ServerStatistics &stats) {
1066 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1067 EXPECT_EQ(stats.connections()->size(), 1u);
1068
1069 const message_bridge::ServerConnection *connection =
1070 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001071 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001072 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1073 EXPECT_TRUE(connection->has_monotonic_offset());
1074 EXPECT_EQ(connection->monotonic_offset(),
1075 -chrono::nanoseconds(kOffset).count());
1076 ++pi2_server_statistics_count;
1077 });
1078
1079 int pi3_server_statistics_count = 0;
1080 pi3_pong_counter_event_loop->MakeWatcher(
1081 "/pi3/aos", [&pi3_server_statistics_count](
1082 const message_bridge::ServerStatistics &stats) {
1083 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1084 EXPECT_EQ(stats.connections()->size(), 1u);
1085
1086 const message_bridge::ServerConnection *connection =
1087 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001088 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001089 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1090 EXPECT_TRUE(connection->has_monotonic_offset());
1091 EXPECT_EQ(connection->monotonic_offset(), 0);
1092 ++pi3_server_statistics_count;
1093 });
1094
1095 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1096 chrono::milliseconds(500) +
1097 chrono::milliseconds(5));
1098
Austin Schuh20ac95d2020-12-05 17:24:19 -08001099 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001100 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001101 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001102}
1103
1104// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001105TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001106 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1107 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1108 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1109
1110 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1111 simulated_event_loop_factory.DisableStatistics();
1112
1113 std::unique_ptr<EventLoop> ping_event_loop =
1114 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1115 Ping ping(ping_event_loop.get());
1116
1117 std::unique_ptr<EventLoop> pong_event_loop =
1118 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1119 Pong pong(pong_event_loop.get());
1120
1121 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1122 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1123
1124 MessageCounter<examples::Pong> pi2_pong_counter(
1125 pi2_pong_counter_event_loop.get(), "/test");
1126
1127 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1128 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1129
1130 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1131 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1132
1133 MessageCounter<examples::Pong> pi1_pong_counter(
1134 pi1_pong_counter_event_loop.get(), "/test");
1135
1136 // Count timestamps.
1137 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1138 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1139 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1140 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1141 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1142 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1143 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1144 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1145 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1146 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1147 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1148 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1149 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1150 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1151
Austin Schuh2f8fd752020-09-01 22:38:28 -07001152 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001153 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1154 remote_timestamps_pi2_on_pi1 =
1155 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1156 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1157 remote_timestamps_pi1_on_pi2 =
1158 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001159
Austin Schuh4c3b9702020-08-30 11:34:55 -07001160 MessageCounter<message_bridge::ServerStatistics>
1161 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1162 "/pi1/aos");
1163 MessageCounter<message_bridge::ServerStatistics>
1164 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1165 "/pi2/aos");
1166 MessageCounter<message_bridge::ServerStatistics>
1167 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1168 "/pi3/aos");
1169
1170 MessageCounter<message_bridge::ClientStatistics>
1171 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1172 "/pi1/aos");
1173 MessageCounter<message_bridge::ClientStatistics>
1174 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1175 "/pi2/aos");
1176 MessageCounter<message_bridge::ClientStatistics>
1177 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1178 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001179
1180 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1181 chrono::milliseconds(5));
1182
Austin Schuh4c3b9702020-08-30 11:34:55 -07001183 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1184 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1185
1186 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1187 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1188 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1189 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1190 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1191 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1192 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1193
1194 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1195 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1196 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1197
1198 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1199 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1200 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001201
1202 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001203 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1204 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001205}
1206
Austin Schuhc0b0f722020-12-12 18:36:06 -08001207bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1208 for (const message_bridge::ServerConnection *connection :
1209 *server_statistics->connections()) {
1210 if (connection->state() != message_bridge::State::CONNECTED) {
1211 return false;
1212 }
1213 }
1214 return true;
1215}
1216
1217bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1218 std::string_view target) {
1219 for (const message_bridge::ServerConnection *connection :
1220 *server_statistics->connections()) {
1221 if (connection->node()->name()->string_view() == target) {
1222 if (connection->state() == message_bridge::State::CONNECTED) {
1223 return false;
1224 }
1225 } else {
1226 if (connection->state() != message_bridge::State::CONNECTED) {
1227 return false;
1228 }
1229 }
1230 }
1231 return true;
1232}
1233
1234bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1235 for (const message_bridge::ClientConnection *connection :
1236 *client_statistics->connections()) {
1237 if (connection->state() != message_bridge::State::CONNECTED) {
1238 return false;
1239 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001240 EXPECT_TRUE(connection->has_boot_uuid());
1241 EXPECT_TRUE(connection->has_connected_since_time());
1242 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001243 }
1244 return true;
1245}
1246
1247bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1248 std::string_view target) {
1249 for (const message_bridge::ClientConnection *connection :
1250 *client_statistics->connections()) {
1251 if (connection->node()->name()->string_view() == target) {
1252 if (connection->state() == message_bridge::State::CONNECTED) {
1253 return false;
1254 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001255 EXPECT_FALSE(connection->has_boot_uuid());
1256 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001257 } else {
1258 if (connection->state() != message_bridge::State::CONNECTED) {
1259 return false;
1260 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001261 EXPECT_TRUE(connection->has_boot_uuid());
1262 EXPECT_TRUE(connection->has_connected_since_time());
1263 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001264 }
1265 }
1266 return true;
1267}
1268
Austin Schuh367a7f42021-11-23 23:04:36 -08001269int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1270 std::string_view target) {
1271 for (const message_bridge::ClientConnection *connection :
1272 *client_statistics->connections()) {
1273 if (connection->node()->name()->string_view() == target) {
1274 return connection->connection_count();
1275 }
1276 }
1277 return 0;
1278}
1279
1280int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1281 std::string_view target) {
1282 for (const message_bridge::ServerConnection *connection :
1283 *server_statistics->connections()) {
1284 if (connection->node()->name()->string_view() == target) {
1285 return connection->connection_count();
1286 }
1287 }
1288 return 0;
1289}
1290
Austin Schuhc0b0f722020-12-12 18:36:06 -08001291// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001292TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001293 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1294
Austin Schuh58646e22021-08-23 23:51:46 -07001295 NodeEventLoopFactory *pi1 =
1296 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1297 NodeEventLoopFactory *pi2 =
1298 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1299 NodeEventLoopFactory *pi3 =
1300 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1301
1302 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001303 Ping ping(ping_event_loop.get());
1304
Austin Schuh58646e22021-08-23 23:51:46 -07001305 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001306 Pong pong(pong_event_loop.get());
1307
1308 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001309 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001310
1311 MessageCounter<examples::Pong> pi2_pong_counter(
1312 pi2_pong_counter_event_loop.get(), "/test");
1313
1314 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001315 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001316
1317 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001318 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001319
1320 MessageCounter<examples::Pong> pi1_pong_counter(
1321 pi1_pong_counter_event_loop.get(), "/test");
1322
1323 // Count timestamps.
1324 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1325 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1326 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1327 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1328 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1329 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1330 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1331 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1332 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1333 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1334 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1335 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1336 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1337 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1338
1339 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001340 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1341 remote_timestamps_pi2_on_pi1 =
1342 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1343 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1344 remote_timestamps_pi1_on_pi2 =
1345 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001346
1347 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001348 *pi1_server_statistics_counter;
1349 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1350 pi1_server_statistics_counter =
1351 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1352 "pi1_server_statistics_counter", "/pi1/aos");
1353 });
1354
Austin Schuhc0b0f722020-12-12 18:36:06 -08001355 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1356 pi1_pong_counter_event_loop
1357 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1358 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1359 pi1_pong_counter_event_loop
1360 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1361
1362 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001363 *pi2_server_statistics_counter;
1364 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1365 pi2_server_statistics_counter =
1366 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1367 "pi2_server_statistics_counter", "/pi2/aos");
1368 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001369 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1370 pi2_pong_counter_event_loop
1371 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1372 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1373 pi2_pong_counter_event_loop
1374 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1375
1376 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001377 *pi3_server_statistics_counter;
1378 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1379 pi3_server_statistics_counter =
1380 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1381 "pi3_server_statistics_counter", "/pi3/aos");
1382 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001383 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1384 pi3_pong_counter_event_loop
1385 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1386 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1387 pi3_pong_counter_event_loop
1388 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1389
1390 MessageCounter<message_bridge::ClientStatistics>
1391 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1392 "/pi1/aos");
1393 MessageCounter<message_bridge::ClientStatistics>
1394 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1395 "/pi2/aos");
1396 MessageCounter<message_bridge::ClientStatistics>
1397 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1398 "/pi3/aos");
1399
James Kuszmaul86e86c32022-07-21 17:39:47 -07001400 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1401 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1402 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1403 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1404 // The currenct contract is that, if all nodes boot simultaneously in
1405 // simulation, that they should all act as if they area already connected,
1406 // without ever observing the transition from disconnected to connected (note
1407 // that on a real system the ServerStatistics message will get resent for each
1408 // and every new connection, even if the new connections happen
1409 // "simultaneously"--in simulation, we are essentially acting as if we are
1410 // starting execution in an already running system, rather than observing the
1411 // boot process).
1412 for (auto &event_loop : statistics_watcher_loops) {
1413 event_loop->MakeWatcher(
1414 "/aos", [](const message_bridge::ServerStatistics &msg) {
1415 for (const message_bridge::ServerConnection *connection :
1416 *msg.connections()) {
1417 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1418 << connection->node()->name()->string_view();
1419 }
1420 });
1421 }
1422
Austin Schuhc0b0f722020-12-12 18:36:06 -08001423 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1424 chrono::milliseconds(5));
1425
James Kuszmaul86e86c32022-07-21 17:39:47 -07001426 statistics_watcher_loops.clear();
1427
Austin Schuhc0b0f722020-12-12 18:36:06 -08001428 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1429 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1430
1431 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1432 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1433 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1434 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1435 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1436 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1437 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1438
Austin Schuh58646e22021-08-23 23:51:46 -07001439 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1440 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1441 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001442
1443 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1444 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1445 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1446
1447 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001448 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1449 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001450
1451 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1452 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1453 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1454 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1456 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1457 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1459 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1460 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1461 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1462 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1463 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1464 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1465 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1466 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1467 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1468 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1469
Austin Schuh58646e22021-08-23 23:51:46 -07001470 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001471
1472 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1473
1474 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1475 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1476
1477 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1478 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1479 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1480 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1481 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1482 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1483 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1484
Austin Schuh58646e22021-08-23 23:51:46 -07001485 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1486 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1487 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001488
1489 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1490 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1491 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1492
1493 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001494 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1495 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001496
1497 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1498 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1499 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1500 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1501 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1502 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1503 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1504 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1506 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1507 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1508 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1509 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1510 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1511 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1512 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1513 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1514 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1515
Austin Schuh58646e22021-08-23 23:51:46 -07001516 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001517
1518 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1519
Austin Schuh367a7f42021-11-23 23:04:36 -08001520 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1521 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1522 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1523 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1524 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1525 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1526
1527 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1528 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1529 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1530 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1531 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1532 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1533 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1534 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1535
1536 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1537 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1538 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1539 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1540
1541 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1542 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1543 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1544 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1545
Austin Schuhc0b0f722020-12-12 18:36:06 -08001546 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1547 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1548
1549 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1550 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1551 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1552 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1553 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1554 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1555 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1556
Austin Schuh58646e22021-08-23 23:51:46 -07001557 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1558 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1559 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001560
1561 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1562 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1563 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1564
1565 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001566 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1567 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001568
Austin Schuhc0b0f722020-12-12 18:36:06 -08001569 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1570 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001571 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1572 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001573 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1574 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001575 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1576 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001577 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1578 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001579 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1580 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1581}
1582
Austin Schuh2febf0d2020-09-21 22:24:30 -07001583// Tests that the time offset having a slope doesn't break the world.
1584// SimulatedMessageBridge has enough self consistency CHECK statements to
1585// confirm, and we can can also check a message in each direction to make sure
1586// it gets delivered as expected.
1587TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1588 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001589 aos::configuration::ReadConfig(ArtifactPath(
1590 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001591 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001592 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1593 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001594 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001595 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1596 ASSERT_EQ(pi2_index, 1u);
1597 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1598 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1599 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001600
Austin Schuh87dd3832021-01-01 23:07:31 -08001601 message_bridge::TestingTimeConverter time(
1602 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001603 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001604 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001605
Austin Schuh2febf0d2020-09-21 22:24:30 -07001606 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001607 time.AddNextTimestamp(
1608 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001609 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1610 BootTimestamp::epoch()});
1611 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1612 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1613 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1614 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001615
1616 std::unique_ptr<EventLoop> ping_event_loop =
1617 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1618 Ping ping(ping_event_loop.get());
1619
1620 std::unique_ptr<EventLoop> pong_event_loop =
1621 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1622 Pong pong(pong_event_loop.get());
1623
1624 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1625 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1626 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1627 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1628
1629 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1630 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1631 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1632 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1633
1634 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1635 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1636 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1637 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1638
1639 // End after a pong message comes back. This will leave the latest messages
1640 // on all channels so we can look at timestamps easily and check they make
1641 // sense.
1642 std::unique_ptr<EventLoop> pi1_pong_ender =
1643 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1644 int count = 0;
1645 pi1_pong_ender->MakeWatcher(
1646 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1647 if (++count == 100) {
1648 simulated_event_loop_factory.Exit();
1649 }
1650 });
1651
1652 // Run enough that messages should be delivered.
1653 simulated_event_loop_factory.Run();
1654
1655 // Grab the latest messages.
1656 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1657 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1658 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1659 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1660
1661 // Compute their time on the global distributed clock so we can compute
1662 // distance betwen them.
1663 const distributed_clock::time_point pi1_ping_time =
1664 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1665 ->ToDistributedClock(
1666 ping_on_pi1_fetcher.context().monotonic_event_time);
1667 const distributed_clock::time_point pi2_ping_time =
1668 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1669 ->ToDistributedClock(
1670 ping_on_pi2_fetcher.context().monotonic_event_time);
1671 const distributed_clock::time_point pi1_pong_time =
1672 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1673 ->ToDistributedClock(
1674 pong_on_pi1_fetcher.context().monotonic_event_time);
1675 const distributed_clock::time_point pi2_pong_time =
1676 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1677 ->ToDistributedClock(
1678 pong_on_pi2_fetcher.context().monotonic_event_time);
1679
1680 // And confirm the delivery delay is just about exactly 150 uS for both
1681 // directions like expected. There will be a couple ns of rounding errors in
1682 // the conversion functions that aren't worth accounting for right now. This
1683 // will either be really close, or really far.
1684 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1685 pi1_ping_time);
1686 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1687 pi1_ping_time);
1688
1689 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1690 pi2_pong_time);
1691 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1692 pi2_pong_time);
1693}
1694
Austin Schuh4c570ea2020-11-19 23:13:24 -08001695void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1696 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1697 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1698 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001699 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001700}
1701
1702// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001703TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001704 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1705 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1706
1707 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1708
1709 std::unique_ptr<EventLoop> ping_event_loop =
1710 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1711 aos::Sender<examples::Ping> pi1_reliable_sender =
1712 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1713 aos::Sender<examples::Ping> pi1_unreliable_sender =
1714 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1715 SendPing(&pi1_reliable_sender, 1);
1716 SendPing(&pi1_unreliable_sender, 1);
1717
1718 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1719 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001720 aos::Sender<examples::Ping> pi2_reliable_sender =
1721 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1722 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001723 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1724 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001725 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1726 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001727 MessageCounter<examples::Ping> pi2_unreliable_counter(
1728 pi2_pong_event_loop.get(), "/unreliable");
1729 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1730 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1731 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1732 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1733
1734 const size_t reliable_channel_index = configuration::ChannelIndex(
1735 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1736
1737 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1738 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1739
Austin Schuheeaa2022021-01-02 21:52:03 -08001740 const chrono::nanoseconds network_delay =
1741 simulated_event_loop_factory.network_delay();
1742
Austin Schuh4c570ea2020-11-19 23:13:24 -08001743 int reliable_timestamp_count = 0;
1744 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001745 shared() ? "/pi1/aos/remote_timestamps/pi2"
1746 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001747 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001748 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1749 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001750 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001751 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001752 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001753 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001754 VLOG(1) << aos::FlatbufferToJson(&header);
1755 if (header.channel_index() == reliable_channel_index) {
1756 ++reliable_timestamp_count;
1757 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001758
1759 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1760 chrono::nanoseconds(header.monotonic_sent_time()));
1761
1762 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1763 header_monotonic_sent_time + network_delay +
1764 (pi1_remote_timestamp->monotonic_now() -
1765 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001766 });
1767
1768 // Wait to let timestamp estimation start up before looking for the results.
1769 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1770
1771 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1772 // This one isn't reliable, but was sent before the start. It should *not* be
1773 // delivered.
1774 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1775 // Confirm we got a timestamp logged for the message that was forwarded.
1776 EXPECT_EQ(reliable_timestamp_count, 1u);
1777
1778 SendPing(&pi1_reliable_sender, 2);
1779 SendPing(&pi1_unreliable_sender, 2);
1780 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1781 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001782 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001783 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1784
1785 EXPECT_EQ(reliable_timestamp_count, 2u);
1786}
1787
Austin Schuh20ac95d2020-12-05 17:24:19 -08001788// Tests that rebooting a node changes the ServerStatistics message and the
1789// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001790TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001791 const UUID pi1_boot0 = UUID::Random();
1792 const UUID pi2_boot0 = UUID::Random();
1793 const UUID pi2_boot1 = UUID::Random();
1794 const UUID pi3_boot0 = UUID::Random();
1795 UUID expected_boot_uuid = pi2_boot0;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001796 int boot_number = 0;
1797 monotonic_clock::time_point expected_connection_time;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001798
Austin Schuh58646e22021-08-23 23:51:46 -07001799 message_bridge::TestingTimeConverter time(
1800 configuration::NodesCount(&config.message()));
1801 SimulatedEventLoopFactory factory(&config.message());
1802 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001803
Austin Schuh58646e22021-08-23 23:51:46 -07001804 const size_t pi1_index =
1805 configuration::GetNodeIndex(&config.message(), "pi1");
1806 const size_t pi2_index =
1807 configuration::GetNodeIndex(&config.message(), "pi2");
1808 const size_t pi3_index =
1809 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001810
Austin Schuh58646e22021-08-23 23:51:46 -07001811 {
1812 time.AddNextTimestamp(distributed_clock::epoch(),
1813 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1814 BootTimestamp::epoch()});
1815
1816 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1817
1818 time.AddNextTimestamp(
1819 distributed_clock::epoch() + dt,
1820 {BootTimestamp::epoch() + dt,
1821 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1822 BootTimestamp::epoch() + dt});
1823
1824 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1825 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1826 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1827 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1828 }
1829
1830 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1831 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1832
1833 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1834 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001835
1836 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001837 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001838
1839 int timestamp_count = 0;
1840 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001841 "/pi2/aos", [&expected_boot_uuid,
1842 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001843 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001844 expected_boot_uuid);
1845 });
1846 pi1_remote_timestamp->MakeWatcher(
1847 "/test",
1848 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001849 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001850 expected_boot_uuid);
1851 });
1852 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001853 shared() ? "/pi1/aos/remote_timestamps/pi2"
1854 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001855 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1856 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001857 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001858 VLOG(1) << aos::FlatbufferToJson(&header);
1859 ++timestamp_count;
1860 });
1861
1862 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001863 bool first_pi1_server_statistics = true;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001864 expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001865 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001866 "/pi1/aos",
1867 [&pi1_server_statistics_count, &expected_boot_uuid,
1868 &expected_connection_time, &first_pi1_server_statistics,
1869 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001870 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1871 for (const message_bridge::ServerConnection *connection :
1872 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001873 if (connection->state() == message_bridge::State::CONNECTED) {
1874 ASSERT_TRUE(connection->has_boot_uuid());
1875 }
1876 if (!first_pi1_server_statistics) {
1877 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1878 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001879 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001880 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1881 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001882 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001883 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001884 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001885 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1886 connection->connected_since_time())),
1887 expected_connection_time);
1888 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001889 ++pi1_server_statistics_count;
1890 }
1891 }
Austin Schuh58646e22021-08-23 23:51:46 -07001892 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001893 });
1894
Austin Schuh58646e22021-08-23 23:51:46 -07001895 int pi1_client_statistics_count = 0;
1896 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001897 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1898 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001899 const message_bridge::ClientStatistics &stats) {
1900 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1901 for (const message_bridge::ClientConnection *connection :
1902 *stats.connections()) {
1903 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1904 if (connection->node()->name()->string_view() == "pi2") {
1905 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001906 EXPECT_EQ(expected_boot_uuid,
1907 UUID::FromString(connection->boot_uuid()))
1908 << " : Got " << aos::FlatbufferToJson(&stats);
1909 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1910 connection->connected_since_time())),
1911 expected_connection_time);
1912 EXPECT_EQ(boot_number + 1, connection->connection_count());
1913 } else {
1914 EXPECT_EQ(connection->connected_since_time(), 0);
1915 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001916 }
1917 }
1918 });
1919
1920 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001921 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1922 pi1, pi2, pi2_boot1]() {
1923 expected_boot_uuid = pi2_boot1;
1924 ++boot_number;
1925 LOG(INFO) << "OnShutdown triggered for pi2";
1926 pi2->OnStartup(
1927 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1928 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1929 expected_connection_time = pi1->monotonic_now();
1930 });
1931 });
Austin Schuh58646e22021-08-23 23:51:46 -07001932
Austin Schuh20ac95d2020-12-05 17:24:19 -08001933 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001934 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001935
1936 EXPECT_GT(timestamp_count, 100);
1937 EXPECT_GE(pi1_server_statistics_count, 1u);
1938
Austin Schuh20ac95d2020-12-05 17:24:19 -08001939 timestamp_count = 0;
1940 pi1_server_statistics_count = 0;
1941
Austin Schuh58646e22021-08-23 23:51:46 -07001942 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001943 EXPECT_GT(timestamp_count, 100);
1944 EXPECT_GE(pi1_server_statistics_count, 1u);
1945}
1946
// Instantiates RemoteMessageSimulatedEventLoopTest once per config file below.
// NOTE(review): the bool in each Param is presumably the flag returned by the
// fixture's shared() (true for the combined config, false for the split one)
// -- confirm against the Param definition.
INSTANTIATE_TEST_SUITE_P(
    All, RemoteMessageSimulatedEventLoopTest,
    ::testing::Values(
        Param{"multinode_pingpong_test_combined_config.json", true},
        Param{"multinode_pingpong_test_split_config.json", false}));
1952
Austin Schuh58646e22021-08-23 23:51:46 -07001953// Tests that Startup and Shutdown do reasonable things.
1954TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1955 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1956 aos::configuration::ReadConfig(
1957 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1958
Austin Schuh72e65682021-09-02 11:37:05 -07001959 size_t pi1_shutdown_counter = 0;
1960 size_t pi2_shutdown_counter = 0;
1961 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1962 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1963
Austin Schuh58646e22021-08-23 23:51:46 -07001964 message_bridge::TestingTimeConverter time(
1965 configuration::NodesCount(&config.message()));
1966 SimulatedEventLoopFactory factory(&config.message());
1967 factory.SetTimeConverter(&time);
1968 time.AddNextTimestamp(
1969 distributed_clock::epoch(),
1970 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1971
1972 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1973
1974 time.AddNextTimestamp(
1975 distributed_clock::epoch() + dt,
1976 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1977 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1978 BootTimestamp::epoch() + dt});
1979
1980 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1981 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1982
1983 // Configure startup to start Ping and Pong, and count.
1984 size_t pi1_startup_counter = 0;
1985 size_t pi2_startup_counter = 0;
1986 pi1->OnStartup([pi1]() {
1987 LOG(INFO) << "Made ping";
1988 pi1->AlwaysStart<Ping>("ping");
1989 });
1990 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1991 pi2->OnStartup([pi2]() {
1992 LOG(INFO) << "Made pong";
1993 pi2->AlwaysStart<Pong>("pong");
1994 });
1995 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1996
1997 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001998 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1999 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
2000
Austin Schuh58646e22021-08-23 23:51:46 -07002001 // Automatically make counters on startup.
2002 pi1->OnStartup([&pi1_pong_counter, pi1]() {
2003 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
2004 "pi1_pong_counter", "/test");
2005 });
2006 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
2007 pi2->OnStartup([&pi2_ping_counter, pi2]() {
2008 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
2009 "pi2_ping_counter", "/test");
2010 });
2011 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
2012
2013 EXPECT_EQ(pi2_ping_counter, nullptr);
2014 EXPECT_EQ(pi1_pong_counter, nullptr);
2015
2016 EXPECT_EQ(pi1_startup_counter, 0u);
2017 EXPECT_EQ(pi2_startup_counter, 0u);
2018 EXPECT_EQ(pi1_shutdown_counter, 0u);
2019 EXPECT_EQ(pi2_shutdown_counter, 0u);
2020
2021 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
2022 EXPECT_EQ(pi1_startup_counter, 1u);
2023 EXPECT_EQ(pi2_startup_counter, 1u);
2024 EXPECT_EQ(pi1_shutdown_counter, 0u);
2025 EXPECT_EQ(pi2_shutdown_counter, 0u);
2026 EXPECT_EQ(pi2_ping_counter->count(), 1001);
2027 EXPECT_EQ(pi1_pong_counter->count(), 1001);
2028
2029 LOG(INFO) << pi1->monotonic_now();
2030 LOG(INFO) << pi2->monotonic_now();
2031
2032 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
2033
2034 EXPECT_EQ(pi1_startup_counter, 2u);
2035 EXPECT_EQ(pi2_startup_counter, 2u);
2036 EXPECT_EQ(pi1_shutdown_counter, 1u);
2037 EXPECT_EQ(pi2_shutdown_counter, 1u);
2038 EXPECT_EQ(pi2_ping_counter->count(), 501);
2039 EXPECT_EQ(pi1_pong_counter->count(), 501);
2040}
2041
2042// Tests that OnStartup handlers can be added after running and get called, and
2043// can't be called when running.
2044TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
2045 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2046 aos::configuration::ReadConfig(
2047 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2048
2049 // Test that we can add startup handlers as long as we aren't running, and
2050 // they get run when Run gets called again.
2051 // Test that adding a startup handler when running fails.
2052 //
2053 // Test shutdown handlers get called on destruction.
2054 SimulatedEventLoopFactory factory(&config.message());
2055
2056 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2057
2058 int startup_count0 = 0;
2059 int startup_count1 = 0;
2060
2061 pi1->OnStartup([&]() { ++startup_count0; });
2062 EXPECT_EQ(startup_count0, 0);
2063 EXPECT_EQ(startup_count1, 0);
2064
2065 factory.RunFor(chrono::nanoseconds(1));
2066 EXPECT_EQ(startup_count0, 1);
2067 EXPECT_EQ(startup_count1, 0);
2068
2069 pi1->OnStartup([&]() { ++startup_count1; });
2070 EXPECT_EQ(startup_count0, 1);
2071 EXPECT_EQ(startup_count1, 0);
2072
2073 factory.RunFor(chrono::nanoseconds(1));
2074 EXPECT_EQ(startup_count0, 1);
2075 EXPECT_EQ(startup_count1, 1);
2076
2077 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2078 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2079
2080 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2081 "Can only register OnStartup handlers when not running.");
2082}
2083
2084// Tests that OnStartup handlers can be added after running and get called, and
2085// all the handlers get called on reboot. Shutdown handlers are tested the same
2086// way.
2087TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2088 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2089 aos::configuration::ReadConfig(
2090 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2091
Austin Schuh72e65682021-09-02 11:37:05 -07002092 int startup_count0 = 0;
2093 int shutdown_count0 = 0;
2094 int startup_count1 = 0;
2095 int shutdown_count1 = 0;
2096
Austin Schuh58646e22021-08-23 23:51:46 -07002097 message_bridge::TestingTimeConverter time(
2098 configuration::NodesCount(&config.message()));
2099 SimulatedEventLoopFactory factory(&config.message());
2100 factory.SetTimeConverter(&time);
2101 time.StartEqual();
2102
2103 const chrono::nanoseconds dt = chrono::seconds(10);
2104 time.RebootAt(0, distributed_clock::epoch() + dt);
2105 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2106
2107 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2108
Austin Schuh58646e22021-08-23 23:51:46 -07002109 pi1->OnStartup([&]() { ++startup_count0; });
2110 pi1->OnShutdown([&]() { ++shutdown_count0; });
2111 EXPECT_EQ(startup_count0, 0);
2112 EXPECT_EQ(startup_count1, 0);
2113 EXPECT_EQ(shutdown_count0, 0);
2114 EXPECT_EQ(shutdown_count1, 0);
2115
2116 factory.RunFor(chrono::nanoseconds(1));
2117 EXPECT_EQ(startup_count0, 1);
2118 EXPECT_EQ(startup_count1, 0);
2119 EXPECT_EQ(shutdown_count0, 0);
2120 EXPECT_EQ(shutdown_count1, 0);
2121
2122 pi1->OnStartup([&]() { ++startup_count1; });
2123 EXPECT_EQ(startup_count0, 1);
2124 EXPECT_EQ(startup_count1, 0);
2125 EXPECT_EQ(shutdown_count0, 0);
2126 EXPECT_EQ(shutdown_count1, 0);
2127
2128 factory.RunFor(chrono::nanoseconds(1));
2129 EXPECT_EQ(startup_count0, 1);
2130 EXPECT_EQ(startup_count1, 1);
2131 EXPECT_EQ(shutdown_count0, 0);
2132 EXPECT_EQ(shutdown_count1, 0);
2133
2134 factory.RunFor(chrono::seconds(15));
2135
2136 EXPECT_EQ(startup_count0, 2);
2137 EXPECT_EQ(startup_count1, 2);
2138 EXPECT_EQ(shutdown_count0, 1);
2139 EXPECT_EQ(shutdown_count1, 0);
2140
2141 pi1->OnShutdown([&]() { ++shutdown_count1; });
2142 factory.RunFor(chrono::seconds(10));
2143
2144 EXPECT_EQ(startup_count0, 3);
2145 EXPECT_EQ(startup_count1, 3);
2146 EXPECT_EQ(shutdown_count0, 2);
2147 EXPECT_EQ(shutdown_count1, 1);
2148}
2149
2150// Tests that event loops which outlive shutdown crash.
2151TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2152 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2153 aos::configuration::ReadConfig(
2154 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2155
2156 message_bridge::TestingTimeConverter time(
2157 configuration::NodesCount(&config.message()));
2158 SimulatedEventLoopFactory factory(&config.message());
2159 factory.SetTimeConverter(&time);
2160 time.StartEqual();
2161
2162 const chrono::nanoseconds dt = chrono::seconds(10);
2163 time.RebootAt(0, distributed_clock::epoch() + dt);
2164
2165 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2166
2167 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2168
2169 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2170}
2171
Brian Silvermane1fe2512022-08-14 23:18:50 -07002172// Test that an ExitHandle outliving its factory is caught.
2173TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2174 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2175 aos::configuration::ReadConfig(
2176 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2177 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2178 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2179 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2180 auto exit_handle = factory->MakeExitHandle();
2181 EXPECT_DEATH(factory.reset(),
2182 "All ExitHandles must be destroyed before the factory");
2183}
2184
Austin Schuh3e31f912023-08-21 21:29:10 -07002185// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2186TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2187 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2188 aos::configuration::ReadConfig(
2189 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2190 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2191 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2192 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2193 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2194 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2195}
2196
Austin Schuh58646e22021-08-23 23:51:46 -07002197// Tests that messages don't survive a reboot of a node.
2198TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2199 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2200 aos::configuration::ReadConfig(
2201 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2202
2203 message_bridge::TestingTimeConverter time(
2204 configuration::NodesCount(&config.message()));
2205 SimulatedEventLoopFactory factory(&config.message());
2206 factory.SetTimeConverter(&time);
2207 time.StartEqual();
2208
2209 const chrono::nanoseconds dt = chrono::seconds(10);
2210 time.RebootAt(0, distributed_clock::epoch() + dt);
2211
2212 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2213
2214 const UUID boot_uuid = pi1->boot_uuid();
2215 EXPECT_NE(boot_uuid, UUID::Zero());
2216
2217 {
2218 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2219 aos::Sender<examples::Ping> test_message_sender =
2220 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2221 SendPing(&test_message_sender, 1);
2222 }
2223
2224 factory.RunFor(chrono::seconds(5));
2225
2226 {
2227 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2228 aos::Fetcher<examples::Ping> fetcher =
2229 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2230 EXPECT_TRUE(fetcher.Fetch());
2231 }
2232
2233 factory.RunFor(chrono::seconds(10));
2234
2235 {
2236 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2237 aos::Fetcher<examples::Ping> fetcher =
2238 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2239 EXPECT_FALSE(fetcher.Fetch());
2240 }
2241 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2242}
2243
2244// Tests that reliable messages get resent on reboot.
2245TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2246 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2247 aos::configuration::ReadConfig(
2248 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2249
2250 message_bridge::TestingTimeConverter time(
2251 configuration::NodesCount(&config.message()));
2252 SimulatedEventLoopFactory factory(&config.message());
2253 factory.SetTimeConverter(&time);
2254 time.StartEqual();
2255
2256 const chrono::nanoseconds dt = chrono::seconds(1);
2257 time.RebootAt(1, distributed_clock::epoch() + dt);
2258
2259 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2260 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2261
2262 const UUID pi1_boot_uuid = pi1->boot_uuid();
2263 const UUID pi2_boot_uuid = pi2->boot_uuid();
2264 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2265 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2266
2267 {
2268 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2269 aos::Sender<examples::Ping> test_message_sender =
2270 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2271 SendPing(&test_message_sender, 1);
2272 }
2273
2274 factory.RunFor(chrono::milliseconds(500));
2275
2276 {
2277 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2278 aos::Fetcher<examples::Ping> fetcher =
2279 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002280 ASSERT_TRUE(fetcher.Fetch());
2281 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2282 monotonic_clock::epoch());
2283 // Message bridge picks up the Ping message immediately on reboot.
2284 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2285 monotonic_clock::epoch());
2286 EXPECT_EQ(fetcher.context().monotonic_event_time,
2287 monotonic_clock::epoch() + factory.network_delay());
2288 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002289 }
2290
2291 factory.RunFor(chrono::seconds(1));
2292
2293 {
2294 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2295 aos::Fetcher<examples::Ping> fetcher =
2296 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002297 ASSERT_TRUE(fetcher.Fetch());
2298 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2299 monotonic_clock::epoch());
2300 // Message bridge picks up the Ping message immediately on reboot.
2301 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2302 monotonic_clock::epoch() + chrono::seconds(1));
2303 EXPECT_EQ(fetcher.context().monotonic_event_time,
2304 monotonic_clock::epoch() + factory.network_delay());
2305 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002306 }
2307 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2308}
2309
James Kuszmaul86e86c32022-07-21 17:39:47 -07002310TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2311 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2312 aos::configuration::ReadConfig(
2313 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2314
2315 message_bridge::TestingTimeConverter time(
2316 configuration::NodesCount(&config.message()));
2317 time.AddNextTimestamp(
2318 distributed_clock::epoch(),
2319 {BootTimestamp{0, monotonic_clock::epoch()},
2320 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2321 BootTimestamp{0, monotonic_clock::epoch()}});
2322 SimulatedEventLoopFactory factory(&config.message());
2323 factory.SetTimeConverter(&time);
2324
2325 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2326 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2327
2328 const UUID pi1_boot_uuid = pi1->boot_uuid();
2329 const UUID pi2_boot_uuid = pi2->boot_uuid();
2330 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2331 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2332
2333 {
2334 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2335 aos::Sender<examples::Ping> pi1_sender =
2336 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2337 SendPing(&pi1_sender, 1);
2338 }
2339 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2340 aos::Sender<examples::Ping> pi2_sender =
2341 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2342 SendPing(&pi2_sender, 1);
2343 // Verify that we staggered the OnRun callback correctly.
2344 pi2_event_loop->OnRun([pi1, pi2]() {
2345 EXPECT_EQ(pi1->monotonic_now(),
2346 monotonic_clock::epoch() + std::chrono::seconds(1));
2347 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2348 });
2349
2350 factory.RunFor(chrono::seconds(2));
2351
2352 {
2353 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2354 aos::Fetcher<examples::Ping> fetcher =
2355 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2356 ASSERT_TRUE(fetcher.Fetch());
2357 EXPECT_EQ(fetcher.context().monotonic_event_time,
2358 monotonic_clock::epoch() + factory.network_delay());
2359 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2360 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002361 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2362 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002363 }
2364 {
2365 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2366 aos::Fetcher<examples::Ping> fetcher =
2367 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2368 ASSERT_TRUE(fetcher.Fetch());
2369 EXPECT_EQ(fetcher.context().monotonic_event_time,
2370 monotonic_clock::epoch() + std::chrono::seconds(1) +
2371 factory.network_delay());
2372 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2373 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002374 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2375 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002376 }
2377}
2378
Austin Schuh48205e62021-11-12 14:13:18 -08002379class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2380 public:
2381 SimulatedEventLoopDisconnectTest()
2382 : config(aos::configuration::ReadConfig(ArtifactPath(
2383 "aos/events/multinode_pingpong_test_split_config.json"))),
2384 time(configuration::NodesCount(&config.message())),
2385 factory(&config.message()) {
2386 factory.SetTimeConverter(&time);
2387 }
2388
2389 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2390 const monotonic_clock::time_point allowable_message_time,
2391 std::set<const aos::Node *> empty_nodes) {
2392 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2393 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2394 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2395 pi1->MakeEventLoop("fetcher");
2396 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2397 pi2->MakeEventLoop("fetcher");
2398 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2399 if (configuration::ChannelIsReadableOnNode(channel,
2400 pi1_event_loop->node())) {
2401 std::unique_ptr<aos::RawFetcher> fetcher =
2402 pi1_event_loop->MakeRawFetcher(channel);
2403 if (statistics_channels.find(channel) == statistics_channels.end() ||
2404 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2405 EXPECT_FALSE(fetcher->Fetch() &&
2406 fetcher->context().monotonic_event_time >
2407 allowable_message_time)
2408 << ": Found recent message on channel "
2409 << configuration::CleanedChannelToString(channel) << " and time "
2410 << fetcher->context().monotonic_event_time << " > "
2411 << allowable_message_time << " on pi1";
2412 } else {
2413 EXPECT_TRUE(fetcher->Fetch() &&
2414 fetcher->context().monotonic_event_time >=
2415 allowable_message_time)
2416 << ": Didn't find recent message on channel "
2417 << configuration::CleanedChannelToString(channel) << " on pi1";
2418 }
2419 }
2420 if (configuration::ChannelIsReadableOnNode(channel,
2421 pi2_event_loop->node())) {
2422 std::unique_ptr<aos::RawFetcher> fetcher =
2423 pi2_event_loop->MakeRawFetcher(channel);
2424 if (statistics_channels.find(channel) == statistics_channels.end() ||
2425 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2426 EXPECT_FALSE(fetcher->Fetch() &&
2427 fetcher->context().monotonic_event_time >
2428 allowable_message_time)
2429 << ": Found message on channel "
2430 << configuration::CleanedChannelToString(channel) << " and time "
2431 << fetcher->context().monotonic_event_time << " > "
2432 << allowable_message_time << " on pi2";
2433 } else {
2434 EXPECT_TRUE(fetcher->Fetch() &&
2435 fetcher->context().monotonic_event_time >=
2436 allowable_message_time)
2437 << ": Didn't find message on channel "
2438 << configuration::CleanedChannelToString(channel) << " on pi2";
2439 }
2440 }
2441 }
2442 }
2443
2444 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2445
2446 message_bridge::TestingTimeConverter time;
2447 SimulatedEventLoopFactory factory;
2448};
2449
2450// Tests that if we have message bridge client/server disabled, and timing
2451// reports disabled, no messages are sent. Also tests that we can disconnect a
2452// node and disable statistics on it and it actually fully disconnects.
2453TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2454 time.StartEqual();
2455 factory.SkipTimingReport();
2456 factory.DisableStatistics();
2457
2458 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2459 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2460
2461 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2462 pi1->MakeEventLoop("fetcher");
2463 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2464 pi2->MakeEventLoop("fetcher");
2465
2466 factory.RunFor(chrono::milliseconds(100000));
2467
2468 // Confirm no messages are sent if we've configured them all off.
2469 VerifyChannels({}, monotonic_clock::min_time, {});
2470
2471 // Now, confirm that all the message_bridge channels come back when we
2472 // re-enable.
2473 factory.EnableStatistics();
2474
2475 factory.RunFor(chrono::milliseconds(10050));
2476
2477 // Build up the list of all the messages we expect when we come back.
2478 {
2479 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002480 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002481 std::vector<std::pair<std::string_view, const Node *>>{
2482 {"/pi1/aos", pi1->node()},
2483 {"/pi2/aos", pi1->node()},
2484 {"/pi3/aos", pi1->node()}}) {
2485 statistics_channels.insert(configuration::GetChannel(
2486 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2487 pi.second));
2488 statistics_channels.insert(configuration::GetChannel(
2489 factory.configuration(), pi.first,
2490 "aos.message_bridge.ServerStatistics", "", pi.second));
2491 statistics_channels.insert(configuration::GetChannel(
2492 factory.configuration(), pi.first,
2493 "aos.message_bridge.ClientStatistics", "", pi.second));
2494 }
2495
2496 statistics_channels.insert(configuration::GetChannel(
2497 factory.configuration(),
2498 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2499 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2500 statistics_channels.insert(configuration::GetChannel(
2501 factory.configuration(),
2502 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2503 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2504 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2505 }
2506
2507 // Now test that we can disable the messages for a single node
2508 pi2->DisableStatistics();
2509 const aos::monotonic_clock::time_point statistics_disable_time =
2510 pi2->monotonic_now();
2511 factory.RunFor(chrono::milliseconds(10000));
2512
2513 // We should see a much smaller set of messages, but should still see messages
2514 // forwarded, mainly the timestamp message.
2515 {
2516 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002517 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002518 std::vector<std::pair<std::string_view, const Node *>>{
2519 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2520 statistics_channels.insert(configuration::GetChannel(
2521 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2522 pi.second));
2523 statistics_channels.insert(configuration::GetChannel(
2524 factory.configuration(), pi.first,
2525 "aos.message_bridge.ServerStatistics", "", pi.second));
2526 statistics_channels.insert(configuration::GetChannel(
2527 factory.configuration(), pi.first,
2528 "aos.message_bridge.ClientStatistics", "", pi.second));
2529 }
2530
2531 statistics_channels.insert(configuration::GetChannel(
2532 factory.configuration(),
2533 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2534 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2535 VerifyChannels(statistics_channels, statistics_disable_time, {});
2536 }
2537
2538 // Now, fully disconnect the node. This will completely quiet down pi2.
2539 pi1->Disconnect(pi2->node());
2540 pi2->Disconnect(pi1->node());
2541
2542 const aos::monotonic_clock::time_point disconnect_disable_time =
2543 pi2->monotonic_now();
2544 factory.RunFor(chrono::milliseconds(10000));
2545
2546 {
2547 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002548 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002549 std::vector<std::pair<std::string_view, const Node *>>{
2550 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2551 statistics_channels.insert(configuration::GetChannel(
2552 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2553 pi.second));
2554 statistics_channels.insert(configuration::GetChannel(
2555 factory.configuration(), pi.first,
2556 "aos.message_bridge.ServerStatistics", "", pi.second));
2557 statistics_channels.insert(configuration::GetChannel(
2558 factory.configuration(), pi.first,
2559 "aos.message_bridge.ClientStatistics", "", pi.second));
2560 }
2561
2562 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2563 }
2564}
2565
Austin Schuh9cce6842024-04-02 18:55:44 -07002566// Struct to capture the expected time a message should be received (and it's
2567// value). This is from the perspective of the node receiving the message.
2568struct ExpectedTimestamps {
2569 // The time that the message was published on the sending node's monotonic
2570 // clock.
2571 monotonic_clock::time_point remote_time;
2572 // The time that the message was virtually transmitted over the virtual
2573 // network on the sending node's monotonic clock.
2574 monotonic_clock::time_point remote_transmit_time;
2575 // The time that the message was received on the receiving node's clock.
2576 monotonic_clock::time_point event_time;
2577 // The value inside the message.
2578 int value;
2579};
2580
Austin Schuhac6d89e2024-03-27 14:56:09 -07002581// Tests that rapidly sent messages get timestamped correctly.
2582TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2583 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2584 aos::configuration::ReadConfig(
2585 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2586
2587 message_bridge::TestingTimeConverter time(
2588 configuration::NodesCount(&config.message()));
2589 SimulatedEventLoopFactory factory(&config.message());
2590 factory.SetTimeConverter(&time);
2591 time.StartEqual();
2592
2593 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2594 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2595
2596 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2597 aos::Fetcher<examples::Ping> fetcher =
2598 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2599 EXPECT_FALSE(fetcher.Fetch());
2600
2601 {
2602 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuh9cce6842024-04-02 18:55:44 -07002603 FunctionScheduler run_at(ping_event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002604 aos::Sender<examples::Ping> test_message_sender =
2605 ping_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002606 aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002607 for (const std::chrono::nanoseconds dt :
2608 {chrono::microseconds(5000), chrono::microseconds(1),
2609 chrono::microseconds(2), chrono::microseconds(70),
Austin Schuh9cce6842024-04-02 18:55:44 -07002610 chrono::microseconds(63), chrono::microseconds(140)}) {
2611 now += dt;
2612 run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002613 }
2614
Austin Schuh9cce6842024-04-02 18:55:44 -07002615 now += chrono::milliseconds(10);
2616
2617 factory.RunFor(now - ping_event_loop->monotonic_now());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002618 }
2619
Austin Schuh9cce6842024-04-02 18:55:44 -07002620 const monotonic_clock::time_point e = monotonic_clock::epoch();
2621 const chrono::nanoseconds send_delay = factory.send_delay();
2622 const chrono::nanoseconds network_delay = factory.network_delay();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002623
Austin Schuh9cce6842024-04-02 18:55:44 -07002624 const std::vector<ExpectedTimestamps> expected_values = {
2625 // First message shows up after wakeup + network delay as expected.
2626 ExpectedTimestamps{
2627 .remote_time = e + chrono::microseconds(5000),
2628 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2629 .event_time =
2630 e + chrono::microseconds(5000) + send_delay + network_delay,
2631 .value = 1,
2632 },
2633 // Next message is close enough that it gets picked up at the same wakeup.
2634 ExpectedTimestamps{
2635 .remote_time = e + chrono::microseconds(5001),
2636 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2637 .event_time =
2638 e + chrono::microseconds(5000) + send_delay + network_delay,
2639 .value = 1,
2640 },
2641 // Same for the third.
2642 ExpectedTimestamps{
2643 .remote_time = e + chrono::microseconds(5003),
2644 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2645 .event_time =
2646 e + chrono::microseconds(5000) + send_delay + network_delay,
2647 .value = 1,
2648 },
2649 // Fourth waits long enough to do the right thing.
2650 ExpectedTimestamps{
2651 .remote_time = e + chrono::microseconds(5073),
2652 .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
2653 .event_time =
2654 e + chrono::microseconds(5073) + send_delay + network_delay,
2655 .value = 1,
2656 },
2657 // Fifth waits long enough to do the right thing as well (but kicks off
2658 // while the fourth is in flight over the network).
2659 ExpectedTimestamps{
2660 .remote_time = e + chrono::microseconds(5136),
2661 .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
2662 .event_time =
2663 e + chrono::microseconds(5136) + send_delay + network_delay,
2664 .value = 1,
2665 },
2666 // Sixth waits long enough to do the right thing as well (but kicks off
2667 // while the fifth is in flight over the network and has almost landed).
2668 // The timer wakeup for the Timestamp message coming back will find the
2669 // sixth message a little bit early.
2670 ExpectedTimestamps{
2671 .remote_time = e + chrono::microseconds(5276),
2672 .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
2673 .event_time =
2674 e + chrono::microseconds(5273) + send_delay + network_delay,
2675 .value = 1,
2676 },
2677 };
Austin Schuhac6d89e2024-03-27 14:56:09 -07002678
Austin Schuh9cce6842024-04-02 18:55:44 -07002679 for (const ExpectedTimestamps value : expected_values) {
2680 ASSERT_TRUE(fetcher.FetchNext());
2681 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2682 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2683 value.remote_transmit_time);
2684 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2685 EXPECT_EQ(fetcher->value(), value.value);
2686 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002687
2688 ASSERT_FALSE(fetcher.FetchNext());
2689}
2690
2691// Tests that a reliable message gets forwarded if it was sent originally when
2692// nodes were disconnected.
2693TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2694 time.StartEqual();
2695 factory.SkipTimingReport();
2696 factory.DisableStatistics();
2697
2698 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2699 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2700
2701 // Fully disconnect the nodes.
2702 pi1->Disconnect(pi2->node());
2703 pi2->Disconnect(pi1->node());
2704
Austin Schuhac6d89e2024-03-27 14:56:09 -07002705 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2706 pi2->MakeEventLoop("fetcher");
2707 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2708 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2709
2710 factory.RunFor(chrono::milliseconds(100));
2711
2712 {
Austin Schuheeb86fc2024-04-04 20:12:39 -07002713 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2714 pi1->MakeEventLoop("sender");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002715 aos::Sender<examples::Ping> pi1_reliable_sender =
2716 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002717 FunctionScheduler run_at(pi1_event_loop.get());
2718 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002719 for (int i = 0; i < 100; ++i) {
Austin Schuh9cce6842024-04-02 18:55:44 -07002720 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
2721 now);
2722 now += chrono::milliseconds(100);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002723 }
Austin Schuh9cce6842024-04-02 18:55:44 -07002724 now += chrono::milliseconds(50);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002725
Austin Schuh9cce6842024-04-02 18:55:44 -07002726 factory.RunFor(now - pi1_event_loop->monotonic_now());
2727 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002728
2729 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2730
2731 pi1->Connect(pi2->node());
2732 pi2->Connect(pi1->node());
2733
2734 factory.RunFor(chrono::milliseconds(1));
2735
2736 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2737 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2738 monotonic_clock::epoch() + chrono::milliseconds(10000));
2739 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2740 monotonic_clock::epoch() + chrono::milliseconds(10150));
2741 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2742 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2743 factory.network_delay());
2744 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2745
Austin Schuh9cce6842024-04-02 18:55:44 -07002746 // TODO(austin): Verify that the dropped packet count increases.
2747
Austin Schuhac6d89e2024-03-27 14:56:09 -07002748 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2749}
2750
Austin Schuh9cce6842024-04-02 18:55:44 -07002751// Tests that if we disconnect while a message is in various states of being
2752// queued, it gets either dropped or sent as expected.
2753TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
2754 time.StartEqual();
2755 factory.SkipTimingReport();
2756 factory.DisableStatistics();
2757
2758 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2759 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2760
2761 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2762
2763 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2764 pi2->MakeEventLoop("fetcher");
2765 aos::Fetcher<examples::Ping> fetcher =
2766 pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
2767
2768 ASSERT_FALSE(fetcher.Fetch());
2769
2770 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2771 {
2772 FunctionScheduler run_at(pi1_event_loop.get());
2773 aos::Sender<examples::Ping> pi1_sender =
2774 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2775
2776 int i = 0;
2777 for (const std::chrono::nanoseconds dt :
2778 {chrono::microseconds(5000), chrono::microseconds(1),
2779 chrono::microseconds(2), chrono::microseconds(70),
2780 chrono::microseconds(63), chrono::microseconds(140),
2781 chrono::microseconds(160)}) {
2782 run_at.ScheduleAt(
2783 [&]() {
2784 pi1->Connect(pi2->node());
2785 pi2->Connect(pi1->node());
2786 },
2787 now);
2788
2789 now += chrono::milliseconds(100);
2790
2791 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
2792
2793 now += dt;
2794
2795 run_at.ScheduleAt(
2796 [&]() {
2797 // Fully disconnect the nodes.
2798 pi1->Disconnect(pi2->node());
2799 pi2->Disconnect(pi1->node());
2800 },
2801 now);
2802
2803 now += chrono::milliseconds(100) - dt;
2804 ++i;
2805 }
2806
2807 factory.RunFor(now - pi1_event_loop->monotonic_now());
2808 }
2809
2810 const monotonic_clock::time_point e = monotonic_clock::epoch();
2811 const chrono::nanoseconds send_delay = factory.send_delay();
2812 const chrono::nanoseconds network_delay = factory.network_delay();
2813
2814 const std::vector<ExpectedTimestamps> expected_values = {
2815 ExpectedTimestamps{
2816 .remote_time = e + chrono::milliseconds(100),
2817 .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
2818 .event_time =
2819 e + chrono::milliseconds(100) + send_delay + network_delay,
2820 .value = 0,
2821 },
2822 ExpectedTimestamps{
2823 .remote_time = e + chrono::milliseconds(1300),
2824 .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
2825 .event_time =
2826 e + chrono::milliseconds(1300) + send_delay + network_delay,
2827 .value = 6,
2828 },
2829 };
2830
2831 for (const ExpectedTimestamps value : expected_values) {
2832 ASSERT_TRUE(fetcher.FetchNext());
2833 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2834 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2835 value.remote_transmit_time);
2836 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2837 EXPECT_EQ(fetcher->value(), value.value);
2838 }
2839
2840 // TODO(austin): Verify that the dropped packet count increases.
2841
2842 ASSERT_FALSE(fetcher.Fetch());
2843}
2844
2845class PingLogger {
2846 public:
2847 PingLogger(aos::EventLoop *event_loop, std::string_view channel,
2848 std::vector<std::pair<aos::Context, int>> *msgs)
2849 : event_loop_(event_loop),
2850 fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
2851 msgs_(msgs) {
2852 event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
2853 }
2854
2855 ~PingLogger() {
2856 while (fetcher_.FetchNext()) {
2857 msgs_->emplace_back(fetcher_.context(), fetcher_->value());
2858 }
2859 }
2860
2861 private:
2862 aos::EventLoop *event_loop_;
2863 aos::Fetcher<examples::Ping> fetcher_;
2864 std::vector<std::pair<aos::Context, int>> *msgs_;
2865};
2866
2867// Tests that rebooting while a message is in flight works as expected.
2868TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
2869 time.StartEqual();
2870 for (int i = 0; i < 8; ++i) {
2871 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2872 }
2873
2874 factory.SkipTimingReport();
2875 factory.DisableStatistics();
2876
2877 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2878 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2879
2880 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2881
2882 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2883 FunctionScheduler run_at(pi1_event_loop.get());
2884 aos::Sender<examples::Ping> pi1_sender =
2885 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2886
2887 int i = 0;
2888 for (const std::chrono::nanoseconds dt :
2889 {chrono::microseconds(5000), chrono::microseconds(1),
2890 chrono::microseconds(2), chrono::microseconds(70),
2891 chrono::microseconds(63), chrono::microseconds(140),
2892 chrono::microseconds(160)}) {
2893 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2894 now + chrono::seconds(10) - dt);
2895
2896 now += chrono::seconds(10);
2897 ++i;
2898 }
2899
2900 std::vector<std::pair<aos::Context, int>> msgs;
2901
2902 pi2->OnStartup([pi2, &msgs]() {
2903 pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
2904 });
2905
2906 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2907
2908 const monotonic_clock::time_point e = monotonic_clock::epoch();
2909 const chrono::nanoseconds send_delay = factory.send_delay();
2910 const chrono::nanoseconds network_delay = factory.network_delay();
2911
2912 const std::vector<ExpectedTimestamps> expected_values = {
2913 ExpectedTimestamps{
2914 .remote_time = e + chrono::microseconds(9995000),
2915 .remote_transmit_time =
2916 e + chrono::microseconds(9995000) + send_delay,
2917 .event_time =
2918 e + chrono::microseconds(9995000) + send_delay + network_delay,
2919 .value = 0,
2920 },
2921 ExpectedTimestamps{
2922 .remote_time = e + chrono::microseconds(19999999),
2923 .remote_transmit_time =
2924 e + chrono::microseconds(19999999) + send_delay,
2925 .event_time =
2926 e + chrono::microseconds(-1) + send_delay + network_delay,
2927 .value = 1,
2928 },
2929 ExpectedTimestamps{
2930 .remote_time = e + chrono::microseconds(29999998),
2931 .remote_transmit_time =
2932 e + chrono::microseconds(29999998) + send_delay,
2933 .event_time =
2934 e + chrono::microseconds(-2) + send_delay + network_delay,
2935 .value = 2,
2936 },
2937 ExpectedTimestamps{
2938 .remote_time = e + chrono::microseconds(69999840),
2939 .remote_transmit_time =
2940 e + chrono::microseconds(69999840) + send_delay,
2941 .event_time =
2942 e + chrono::microseconds(9999840) + send_delay + network_delay,
2943 .value = 6,
2944 },
2945 };
2946
2947 ASSERT_EQ(msgs.size(), expected_values.size());
2948
2949 for (size_t i = 0; i < msgs.size(); ++i) {
2950 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
2951 expected_values[i].remote_time);
2952 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
2953 expected_values[i].remote_transmit_time);
2954 EXPECT_EQ(msgs[i].first.monotonic_event_time,
2955 expected_values[i].event_time);
2956 EXPECT_EQ(msgs[i].second, expected_values[i].value);
2957 }
2958
2959 // TODO(austin): Verify that the dropped packet count increases.
2960}
2961
2962// Tests that rebooting while a message is in flight works as expected.
2963TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
2964 time.StartEqual();
2965 for (int i = 0; i < 8; ++i) {
2966 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2967 }
2968
2969 factory.SkipTimingReport();
2970 factory.DisableStatistics();
2971
2972 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2973 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2974
2975 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2976
2977 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2978 FunctionScheduler run_at(pi1_event_loop.get());
2979 aos::Sender<examples::Ping> pi1_sender =
2980 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2981
2982 int i = 0;
2983 for (const std::chrono::nanoseconds dt :
2984 {chrono::microseconds(5000), chrono::microseconds(1),
2985 chrono::microseconds(2), chrono::microseconds(70),
2986 chrono::microseconds(63), chrono::microseconds(140),
2987 chrono::microseconds(160)}) {
2988 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2989 now + chrono::seconds(10) - dt);
2990
2991 now += chrono::seconds(10);
2992 ++i;
2993 }
2994
2995 std::vector<std::pair<aos::Context, int>> msgs;
2996
2997 PingLogger *logger;
2998 pi2->OnStartup([pi2, &msgs, &logger]() {
2999 logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
3000 });
3001
3002 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
3003
3004 // Stop the logger to flush the last boot of data.
3005 pi2->Stop(logger);
3006
3007 const monotonic_clock::time_point e = monotonic_clock::epoch();
3008 const chrono::nanoseconds send_delay = factory.send_delay();
3009 const chrono::nanoseconds network_delay = factory.network_delay();
3010
3011 // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
3012 // event times to confirm what should have been forwarded when.
3013 const std::vector<ExpectedTimestamps> expected_values = {
3014 ExpectedTimestamps{
3015 .remote_time = e + chrono::microseconds(9995000),
3016 .remote_transmit_time =
3017 e + chrono::microseconds(9995000) + send_delay,
3018 .event_time =
3019 e + chrono::microseconds(9995000) + send_delay + network_delay,
3020 .value = 0,
3021 },
3022 ExpectedTimestamps{
3023 .remote_time = e + chrono::microseconds(9995000),
3024 .remote_transmit_time = e + chrono::microseconds(10000000),
3025 .event_time = e + network_delay,
3026 .value = 0,
3027 },
3028 ExpectedTimestamps{
3029 .remote_time = e + chrono::microseconds(19999999),
3030 .remote_transmit_time = e + chrono::microseconds(20000000),
3031 .event_time = e + network_delay,
3032 .value = 1,
3033 },
3034 ExpectedTimestamps{
3035 .remote_time = e + chrono::microseconds(29999998),
3036 .remote_transmit_time = e + chrono::microseconds(30000000),
3037 .event_time = e + network_delay,
3038 .value = 2,
3039 },
3040 ExpectedTimestamps{
3041 .remote_time = e + chrono::microseconds(39999930),
3042 .remote_transmit_time = e + chrono::microseconds(40000000),
3043 .event_time = e + network_delay,
3044 .value = 3,
3045 },
3046 ExpectedTimestamps{
3047 .remote_time = e + chrono::microseconds(49999937),
3048 .remote_transmit_time = e + chrono::microseconds(50000000),
3049 .event_time = e + network_delay,
3050 .value = 4,
3051 },
3052 ExpectedTimestamps{
3053 .remote_time = e + chrono::microseconds(59999860),
3054 .remote_transmit_time = e + chrono::microseconds(60000000),
3055 .event_time = e + network_delay,
3056 .value = 5,
3057 },
3058 ExpectedTimestamps{
3059 .remote_time = e + chrono::microseconds(69999840),
3060 .remote_transmit_time = e + chrono::microseconds(69999890),
3061 .event_time = e + chrono::microseconds(9999890) + network_delay,
3062 .value = 6,
3063 },
3064 ExpectedTimestamps{
3065 .remote_time = e + chrono::microseconds(69999840),
3066 .remote_transmit_time = e + chrono::microseconds(70000000),
3067 .event_time = e + network_delay,
3068 .value = 6,
3069 },
3070 };
3071
3072 ASSERT_EQ(msgs.size(), expected_values.size());
3073
3074 for (size_t i = 0; i < msgs.size(); ++i) {
3075 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
3076 expected_values[i].remote_time);
3077 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
3078 expected_values[i].remote_transmit_time);
3079 EXPECT_EQ(msgs[i].first.monotonic_event_time,
3080 expected_values[i].event_time);
3081 EXPECT_EQ(msgs[i].second, expected_values[i].value);
3082 }
3083
3084 // TODO(austin): Verify that the dropped packet count increases.
3085}
3086
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003087} // namespace aos::testing