blob: 9a1b9ccceef65299bbc1e2d79a181207a135671d [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Austin Schuh99f7c6a2024-06-25 22:07:44 -07007#include "absl/flags/flag.h"
8#include "absl/flags/reflection.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "gtest/gtest.h"
10
Alex Perrycb7da4b2019-08-28 19:35:56 -070011#include "aos/events/event_loop_param_test.h"
Austin Schuhbf610b72024-04-04 20:04:55 -070012#include "aos/events/function_scheduler.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070013#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070014#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080015#include "aos/events/ping_lib.h"
16#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080017#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070018#include "aos/network/message_bridge_client_generated.h"
19#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080020#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080021#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070022#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070023#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080024
Stephan Pleinesf63bde82024-01-13 15:59:33 -080025namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070026namespace {
27
Austin Schuh373f1762021-06-02 21:07:09 -070028using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070029
Austin Schuh58646e22021-08-23 23:51:46 -070030using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080031using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070032namespace chrono = ::std::chrono;
33
Austin Schuh0de30f32020-12-06 12:44:28 -080034} // namespace
35
Neil Balchc8f41ed2018-01-20 22:06:53 -080036class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
37 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080038 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080039 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080040 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080041 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080042 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080043 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080044 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070045 }
46
James Kuszmaul9f998082024-05-23 15:37:35 -070047 Result<void> Run() override { return event_loop_factory_->Run(); }
48
49 std::unique_ptr<ExitHandle> MakeExitHandle() override {
50 MaybeMake();
51 return event_loop_factory_->MakeExitHandle();
52 }
53
Austin Schuh217a9782019-12-21 23:02:50 -080054 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070055
Austin Schuh52d325c2019-06-23 18:59:06 -070056 // TODO(austin): Implement this. It's used currently for a phased loop test.
57 // I'm not sure how much that matters.
58 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
59
Austin Schuh7d87b672019-12-01 20:23:49 -080060 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080061 MaybeMake();
62 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080063 }
64
Neil Balchc8f41ed2018-01-20 22:06:53 -080065 private:
Austin Schuh217a9782019-12-21 23:02:50 -080066 void MaybeMake() {
67 if (!event_loop_factory_) {
68 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080069 event_loop_factory_ =
70 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080071 } else {
72 event_loop_factory_ =
73 std::make_unique<SimulatedEventLoopFactory>(configuration());
74 }
75 }
76 }
77 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080078};
79
Austin Schuh6bae8252021-02-07 22:01:49 -080080auto CommonParameters() {
81 return ::testing::Combine(
82 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
83 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
84 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
85}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070086
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070087INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070088 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070089
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070090INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070091 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080092
Austin Schuh89c9b812021-02-20 14:42:10 -080093// Parameters to run all the tests with.
94struct Param {
95 // The config file to use.
96 std::string config;
97 // If true, the RemoteMessage channel should be shared between all the remote
98 // channels. If false, there will be 1 RemoteMessage channel per remote
99 // channel.
100 bool shared;
101};
102
103class RemoteMessageSimulatedEventLoopTest
104 : public ::testing::TestWithParam<struct Param> {
105 public:
106 RemoteMessageSimulatedEventLoopTest()
107 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700108 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800109 LOG(INFO) << "Config " << GetParam().config;
110 }
111
112 bool shared() const { return GetParam().shared; }
113
114 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
115 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
116 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
117 if (shared()) {
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2"));
120 } else {
121 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
122 event_loop,
123 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
124 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
125 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
126 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
127 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
128 }
129 return counters;
130 }
131
132 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
133 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
134 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
135 if (shared()) {
136 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
137 event_loop, "/aos/remote_timestamps/pi1"));
138 } else {
139 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
140 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
141 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
142 event_loop,
143 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
144 }
145 return counters;
146 }
147
148 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
149};
150
Austin Schuh8fb315a2020-11-19 22:33:58 -0800151// Test that sending a message after running gets properly notified.
152TEST(SimulatedEventLoopTest, SendAfterRunFor) {
153 SimulatedEventLoopTestFactory factory;
154
155 SimulatedEventLoopFactory simulated_event_loop_factory(
156 factory.configuration());
157
158 ::std::unique_ptr<EventLoop> ping_event_loop =
159 simulated_event_loop_factory.MakeEventLoop("ping");
160 aos::Sender<TestMessage> test_message_sender =
161 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700162 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800163
164 std::unique_ptr<EventLoop> pong1_event_loop =
165 simulated_event_loop_factory.MakeEventLoop("pong");
166 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
167 "/test");
168
169 EXPECT_FALSE(ping_event_loop->is_running());
170
171 // Watchers start when you start running, so there should be nothing counted.
172 simulated_event_loop_factory.RunFor(chrono::seconds(1));
173 EXPECT_EQ(test_message_counter1.count(), 0u);
174
175 std::unique_ptr<EventLoop> pong2_event_loop =
176 simulated_event_loop_factory.MakeEventLoop("pong");
177 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
178 "/test");
179
180 // Pauses in the middle don't count though, so this should be counted.
181 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700182 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800183
184 EXPECT_EQ(test_message_counter1.count(), 0u);
185 EXPECT_EQ(test_message_counter2.count(), 0u);
186 simulated_event_loop_factory.RunFor(chrono::seconds(1));
187
188 EXPECT_EQ(test_message_counter1.count(), 1u);
189 EXPECT_EQ(test_message_counter2.count(), 0u);
190}
191
Austin Schuhd027bf52024-05-12 22:24:12 -0700192// Test that OnRun callbacks get deleted if the event loop gets deleted.
193TEST(SimulatedEventLoopTest, DestructEventLoopBeforeOnRun) {
194 SimulatedEventLoopTestFactory factory;
195
196 SimulatedEventLoopFactory simulated_event_loop_factory(
197 factory.configuration());
198
199 {
200 ::std::unique_ptr<EventLoop> test_event_loop =
201 simulated_event_loop_factory.MakeEventLoop("test");
202 test_event_loop->OnRun([]() { LOG(FATAL) << "Don't run this"; });
203 }
204
205 simulated_event_loop_factory.RunFor(chrono::seconds(1));
206}
207
208// Tests that the order event loops are created is the order that the OnRun
209// callbacks are run.
210TEST(SimulatedEventLoopTest, OnRunOrderFollowsConstructionOrder) {
211 SimulatedEventLoopTestFactory factory;
212
213 SimulatedEventLoopFactory simulated_event_loop_factory(
214 factory.configuration());
215
216 int count = 0;
217
218 std::unique_ptr<EventLoop> test1_event_loop =
219 simulated_event_loop_factory.MakeEventLoop("test1");
220 std::unique_ptr<EventLoop> test2_event_loop =
221 simulated_event_loop_factory.MakeEventLoop("test2");
222 test2_event_loop->OnRun([&count]() {
223 EXPECT_EQ(count, 1u);
224 ++count;
225 });
226 test1_event_loop->OnRun([&count]() {
227 EXPECT_EQ(count, 0u);
228 ++count;
229 });
230
231 simulated_event_loop_factory.RunFor(chrono::seconds(1));
232
233 EXPECT_EQ(count, 2u);
234}
235
236// Test that we can't register OnRun callbacks after starting.
237TEST(SimulatedEventLoopDeathTest, OnRunAfterRunning) {
238 SimulatedEventLoopTestFactory factory;
239
240 SimulatedEventLoopFactory simulated_event_loop_factory(
241 factory.configuration());
242
243 std::unique_ptr<EventLoop> test_event_loop =
244 simulated_event_loop_factory.MakeEventLoop("test");
245 test_event_loop->OnRun([]() {});
246
247 simulated_event_loop_factory.RunFor(chrono::seconds(1));
248
249 EXPECT_DEATH(test_event_loop->OnRun([]() {}), "OnRun");
250}
251
Austin Schuh60e77942022-05-16 17:48:24 -0700252// Test that if we configure an event loop to be able to send too fast that we
253// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700254TEST(SimulatedEventLoopTest, AllowSendTooFast) {
255 SimulatedEventLoopTestFactory factory;
256
257 SimulatedEventLoopFactory simulated_event_loop_factory(
258 factory.configuration());
259
260 // Create two event loops: One will be allowed to send too fast, one won't. We
261 // will then test to ensure that the one that is allowed to send too fast can
262 // indeed send too fast, but that it then makes it so that the second event
263 // loop can no longer send anything because *it* is still limited.
264 ::std::unique_ptr<EventLoop> too_fast_event_loop =
265 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
266 ->MakeEventLoop("too_fast_sender",
267 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700268 NodeEventLoopFactory::ExclusiveSenders::kNo,
269 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700270 aos::Sender<TestMessage> too_fast_message_sender =
271 too_fast_event_loop->MakeSender<TestMessage>("/test");
272
273 ::std::unique_ptr<EventLoop> limited_event_loop =
274 simulated_event_loop_factory.MakeEventLoop("limited_sender");
275 aos::Sender<TestMessage> limited_message_sender =
276 limited_event_loop->MakeSender<TestMessage>("/test");
277
278 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
279 for (int ii = 0; ii < queue_size; ++ii) {
280 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
281 }
282 // And now we should start being in the sending-too-fast phase.
283 for (int ii = 0; ii < queue_size; ++ii) {
284 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700285 ASSERT_EQ(SendTestMessage(limited_message_sender),
286 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700287 }
288}
289
290// Test that if we setup an exclusive sender that it is indeed exclusive.
291TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
292 SimulatedEventLoopTestFactory factory;
293
294 SimulatedEventLoopFactory simulated_event_loop_factory(
295 factory.configuration());
296
297 ::std::unique_ptr<EventLoop> exclusive_event_loop =
298 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700299 ->MakeEventLoop(
300 "too_fast_sender",
301 {NodeEventLoopFactory::CheckSentTooFast::kYes,
302 NodeEventLoopFactory::ExclusiveSenders::kYes,
303 {{configuration::GetChannel(factory.configuration(), "/test1",
304 "aos.TestMessage", "", nullptr),
305 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700306 exclusive_event_loop->SkipAosLog();
307 exclusive_event_loop->SkipTimingReport();
308 ::std::unique_ptr<EventLoop> normal_event_loop =
309 simulated_event_loop_factory.MakeEventLoop("limited_sender");
310 // Set things up to have the exclusive sender be destroyed so we can test
311 // recovery.
312 {
313 aos::Sender<TestMessage> exclusive_sender =
314 exclusive_event_loop->MakeSender<TestMessage>("/test");
315
316 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
317 "TestMessage");
318 }
319 // This one should succeed now that the exclusive channel is removed.
320 aos::Sender<TestMessage> normal_sender =
321 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700322 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
323 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700324
325 // And check an explicitly exempted channel:
326 aos::Sender<TestMessage> non_exclusive_sender =
327 exclusive_event_loop->MakeSender<TestMessage>("/test1");
328 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
329 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700330}
331
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700332void TestSentTooFastCheckEdgeCase(
333 const std::function<RawSender::Error(int, int)> expected_err,
334 const bool send_twice_at_end) {
335 SimulatedEventLoopTestFactory factory;
336
337 auto event_loop = factory.MakePrimary("primary");
338
339 auto sender = event_loop->MakeSender<TestMessage>("/test");
340
341 const int queue_size = TestChannelQueueSize(event_loop.get());
342 int msgs_sent = 0;
343 event_loop->AddPhasedLoop(
344 [&](int) {
345 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
346 msgs_sent++;
347
348 // If send_twice_at_end, send the last two messages (message
349 // queue_size and queue_size + 1) in the same iteration, meaning that
350 // we would be sending very slightly too fast. Otherwise, we will send
351 // message queue_size + 1 in the next iteration and we will continue
352 // to be sending exactly at the channel frequency.
353 if (send_twice_at_end && (msgs_sent == queue_size)) {
354 EXPECT_EQ(SendTestMessage(sender),
355 expected_err(msgs_sent, queue_size));
356 msgs_sent++;
357 }
358
359 if (msgs_sent > queue_size) {
360 factory.Exit();
361 }
362 },
363 std::chrono::duration_cast<std::chrono::nanoseconds>(
364 std::chrono::duration<double>(
365 1.0 / TestChannelFrequency(event_loop.get()))));
366
367 factory.Run();
368}
369
370// Tests that RawSender::Error::kMessagesSentTooFast is not returned
371// when messages are sent at the exact frequency of the channel.
372TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
373 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
374 false);
375}
376
377// Tests that RawSender::Error::kMessagesSentTooFast is returned
378// when sending exactly one more message than allowed in a channel storage
379// duration.
380TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
381 TestSentTooFastCheckEdgeCase(
382 [](const int msgs_sent, const int queue_size) {
383 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
384 : RawSender::Error::kOk);
385 },
386 true);
387}
388
Austin Schuh8fb315a2020-11-19 22:33:58 -0800389// Test that creating an event loop while running dies.
390TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
391 SimulatedEventLoopTestFactory factory;
392
393 SimulatedEventLoopFactory simulated_event_loop_factory(
394 factory.configuration());
395
396 ::std::unique_ptr<EventLoop> event_loop =
397 simulated_event_loop_factory.MakeEventLoop("ping");
398
399 auto timer = event_loop->AddTimer([&]() {
400 EXPECT_DEATH(
401 {
402 ::std::unique_ptr<EventLoop> event_loop2 =
403 simulated_event_loop_factory.MakeEventLoop("ping");
404 },
405 "event loop while running");
406 simulated_event_loop_factory.Exit();
407 });
408
409 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700410 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800411 });
412
413 simulated_event_loop_factory.Run();
414}
415
416// Test that creating a watcher after running dies.
417TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
418 SimulatedEventLoopTestFactory factory;
419
420 SimulatedEventLoopFactory simulated_event_loop_factory(
421 factory.configuration());
422
423 ::std::unique_ptr<EventLoop> event_loop =
424 simulated_event_loop_factory.MakeEventLoop("ping");
425
426 simulated_event_loop_factory.RunFor(chrono::seconds(1));
427
428 EXPECT_DEATH(
429 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
430 "Can't add a watcher after running");
431
432 ::std::unique_ptr<EventLoop> event_loop2 =
433 simulated_event_loop_factory.MakeEventLoop("ping");
434
435 simulated_event_loop_factory.RunFor(chrono::seconds(1));
436
437 EXPECT_DEATH(
438 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
439 "Can't add a watcher after running");
440}
441
Austin Schuh44019f92019-05-19 19:58:27 -0700442// Test that running for a time period with no handlers causes time to progress
443// correctly.
444TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800445 SimulatedEventLoopTestFactory factory;
446
447 SimulatedEventLoopFactory simulated_event_loop_factory(
448 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700449 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800450 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700451
452 simulated_event_loop_factory.RunFor(chrono::seconds(1));
453
454 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700455 event_loop->monotonic_now());
456}
457
458// Test that running for a time with a periodic handler causes time to end
459// correctly.
460TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800461 SimulatedEventLoopTestFactory factory;
462
463 SimulatedEventLoopFactory simulated_event_loop_factory(
464 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700465 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800466 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700467
468 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700469 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700470 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700471 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
472 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700473 });
474
475 simulated_event_loop_factory.RunFor(chrono::seconds(1));
476
477 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700478 event_loop->monotonic_now());
479 EXPECT_EQ(counter, 10);
480}
481
Austin Schuh7d87b672019-12-01 20:23:49 -0800482// Tests that watchers have latency in simulation.
483TEST(SimulatedEventLoopTest, WatcherTimingReport) {
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700484 absl::FlagSaver flag_saver;
Austin Schuh7d87b672019-12-01 20:23:49 -0800485 SimulatedEventLoopTestFactory factory;
486 factory.set_send_delay(std::chrono::microseconds(50));
487
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700488 absl::SetFlag(&FLAGS_timing_report_ms, 1000);
Austin Schuh7d87b672019-12-01 20:23:49 -0800489 auto loop1 = factory.MakePrimary("primary");
490 loop1->MakeWatcher("/test", [](const TestMessage &) {});
491
492 auto loop2 = factory.Make("sender_loop");
493
494 auto loop3 = factory.Make("report_fetcher");
495
496 Fetcher<timing::Report> report_fetcher =
497 loop3->MakeFetcher<timing::Report>("/aos");
498 EXPECT_FALSE(report_fetcher.Fetch());
499
500 auto sender = loop2->MakeSender<TestMessage>("/test");
501
502 // Send 10 messages in the middle of a timing report period so we get
503 // something interesting back.
504 auto test_timer = loop2->AddTimer([&sender]() {
505 for (int i = 0; i < 10; ++i) {
506 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
507 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
508 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700509 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800510 }
511 });
512
513 // Quit after 1 timing report, mid way through the next cycle.
514 {
515 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700516 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800517 end_timer->set_name("end");
518 }
519
520 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700521 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800522 });
523
524 factory.Run();
525
526 // And, since we are here, check that the timing report makes sense.
527 // Start by looking for our event loop's timing.
528 FlatbufferDetachedBuffer<timing::Report> primary_report =
529 FlatbufferDetachedBuffer<timing::Report>::Empty();
530 while (report_fetcher.FetchNext()) {
531 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
532 if (report_fetcher->name()->string_view() == "primary") {
533 primary_report = CopyFlatBuffer(report_fetcher.get());
534 }
535 }
536
537 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700538 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800539
540 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
541
542 // Just the timing report timer.
543 ASSERT_NE(primary_report.message().timers(), nullptr);
544 EXPECT_EQ(primary_report.message().timers()->size(), 2);
545
546 // No phased loops
547 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
548
549 // And now confirm that the watcher received all 10 messages, and has latency.
550 ASSERT_NE(primary_report.message().watchers(), nullptr);
551 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
552 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
553 EXPECT_NEAR(
554 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
555 0.00005, 1e-9);
556 EXPECT_NEAR(
557 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
558 0.00005, 1e-9);
559 EXPECT_NEAR(
560 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
561 0.00005, 1e-9);
562 EXPECT_EQ(primary_report.message()
563 .watchers()
564 ->Get(0)
565 ->wakeup_latency()
566 ->standard_deviation(),
567 0.0);
568
569 EXPECT_EQ(
570 primary_report.message().watchers()->Get(0)->handler_time()->average(),
571 0.0);
572 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
573 0.0);
574 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
575 0.0);
576 EXPECT_EQ(primary_report.message()
577 .watchers()
578 ->Get(0)
579 ->handler_time()
580 ->standard_deviation(),
581 0.0);
582}
583
Austin Schuh89c9b812021-02-20 14:42:10 -0800584size_t CountAll(
585 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
586 &counters) {
587 size_t count = 0u;
588 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
589 counters) {
590 count += counter->count();
591 }
592 return count;
593}
594
Austin Schuh4c3b9702020-08-30 11:34:55 -0700595// Tests that ping and pong work when on 2 different nodes, and the message
596// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800597TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800598 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
599 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700600 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800601
602 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
603
604 std::unique_ptr<EventLoop> ping_event_loop =
605 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
606 Ping ping(ping_event_loop.get());
607
608 std::unique_ptr<EventLoop> pong_event_loop =
609 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
610 Pong pong(pong_event_loop.get());
611
612 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
613 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700614 MessageCounter<examples::Pong> pi2_pong_counter(
615 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700616 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
617 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
618 "/pi1/aos");
619 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
620 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800621
Austin Schuh4c3b9702020-08-30 11:34:55 -0700622 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
623 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800624
625 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
626 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700627 MessageCounter<examples::Pong> pi1_pong_counter(
628 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700629 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
630 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
631 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
632 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
633 "/aos");
634
Austin Schuh4c3b9702020-08-30 11:34:55 -0700635 // Count timestamps.
636 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
637 pi1_pong_counter_event_loop.get(), "/pi1/aos");
638 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
639 pi2_pong_counter_event_loop.get(), "/pi1/aos");
640 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
641 pi3_pong_counter_event_loop.get(), "/pi1/aos");
642 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
643 pi1_pong_counter_event_loop.get(), "/pi2/aos");
644 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
645 pi2_pong_counter_event_loop.get(), "/pi2/aos");
646 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
647 pi1_pong_counter_event_loop.get(), "/pi3/aos");
648 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
649 pi3_pong_counter_event_loop.get(), "/pi3/aos");
650
Austin Schuh2f8fd752020-09-01 22:38:28 -0700651 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800652 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
653 remote_timestamps_pi2_on_pi1 =
654 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
655 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
656 remote_timestamps_pi1_on_pi2 =
657 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700658
Austin Schuh4c3b9702020-08-30 11:34:55 -0700659 // Wait to let timestamp estimation start up before looking for the results.
660 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
661
Austin Schuh8fb315a2020-11-19 22:33:58 -0800662 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
663 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
664 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
665 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
666 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
667 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
668
Austin Schuh4c3b9702020-08-30 11:34:55 -0700669 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800670 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700671 "/pi1/aos", [&pi1_server_statistics_count](
672 const message_bridge::ServerStatistics &stats) {
673 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
674 EXPECT_EQ(stats.connections()->size(), 2u);
675 for (const message_bridge::ServerConnection *connection :
676 *stats.connections()) {
677 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800678 EXPECT_EQ(connection->connection_count(), 1u);
679 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800680 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700681 if (connection->node()->name()->string_view() == "pi2") {
682 EXPECT_GT(connection->sent_packets(), 50);
683 } else if (connection->node()->name()->string_view() == "pi3") {
684 EXPECT_GE(connection->sent_packets(), 5);
685 } else {
686 LOG(FATAL) << "Unknown connection";
687 }
688
689 EXPECT_TRUE(connection->has_monotonic_offset());
690 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700691
692 EXPECT_TRUE(connection->has_channels());
693 int accumulated_sent_count = 0;
694 int accumulated_dropped_count = 0;
695 for (const message_bridge::ServerChannelStatistics *channel :
696 *connection->channels()) {
697 accumulated_sent_count += channel->sent_packets();
698 accumulated_dropped_count += channel->dropped_packets();
699 }
700 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
701 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700702 }
703 ++pi1_server_statistics_count;
704 });
705
706 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800707 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700708 "/pi2/aos", [&pi2_server_statistics_count](
709 const message_bridge::ServerStatistics &stats) {
710 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
711 EXPECT_EQ(stats.connections()->size(), 1u);
712
713 const message_bridge::ServerConnection *connection =
714 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800715 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700716 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
717 EXPECT_GT(connection->sent_packets(), 50);
718 EXPECT_TRUE(connection->has_monotonic_offset());
719 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800720 EXPECT_EQ(connection->connection_count(), 1u);
721 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700722
723 EXPECT_TRUE(connection->has_channels());
724 int accumulated_sent_count = 0;
725 int accumulated_dropped_count = 0;
726 for (const message_bridge::ServerChannelStatistics *channel :
727 *connection->channels()) {
728 accumulated_sent_count += channel->sent_packets();
729 accumulated_dropped_count += channel->dropped_packets();
730 }
731 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
732 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
733
Austin Schuh4c3b9702020-08-30 11:34:55 -0700734 ++pi2_server_statistics_count;
735 });
736
737 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800738 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700739 "/pi3/aos", [&pi3_server_statistics_count](
740 const message_bridge::ServerStatistics &stats) {
741 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
742 EXPECT_EQ(stats.connections()->size(), 1u);
743
744 const message_bridge::ServerConnection *connection =
745 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800746 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700747 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
748 EXPECT_GE(connection->sent_packets(), 5);
749 EXPECT_TRUE(connection->has_monotonic_offset());
750 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800751 EXPECT_EQ(connection->connection_count(), 1u);
752 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700753
754 EXPECT_TRUE(connection->has_channels());
755 int accumulated_sent_count = 0;
756 int accumulated_dropped_count = 0;
757 for (const message_bridge::ServerChannelStatistics *channel :
758 *connection->channels()) {
759 accumulated_sent_count += channel->sent_packets();
760 accumulated_dropped_count += channel->dropped_packets();
761 }
762 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
763 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
764
Austin Schuh4c3b9702020-08-30 11:34:55 -0700765 ++pi3_server_statistics_count;
766 });
767
768 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800769 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700770 "/pi1/aos", [&pi1_client_statistics_count](
771 const message_bridge::ClientStatistics &stats) {
772 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
773 EXPECT_EQ(stats.connections()->size(), 2u);
774
775 for (const message_bridge::ClientConnection *connection :
776 *stats.connections()) {
777 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
778 if (connection->node()->name()->string_view() == "pi2") {
779 EXPECT_GT(connection->received_packets(), 50);
780 } else if (connection->node()->name()->string_view() == "pi3") {
781 EXPECT_GE(connection->received_packets(), 5);
782 } else {
783 LOG(FATAL) << "Unknown connection";
784 }
785
Austin Schuhe61d4382021-03-31 21:33:02 -0700786 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700787 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700788 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800789 EXPECT_EQ(connection->connection_count(), 1u);
790 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700791 }
792 ++pi1_client_statistics_count;
793 });
794
795 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800796 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700797 "/pi2/aos", [&pi2_client_statistics_count](
798 const message_bridge::ClientStatistics &stats) {
799 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
800 EXPECT_EQ(stats.connections()->size(), 1u);
801
802 const message_bridge::ClientConnection *connection =
803 stats.connections()->Get(0);
804 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
805 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700806 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700807 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700808 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800809 EXPECT_EQ(connection->connection_count(), 1u);
810 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700811 ++pi2_client_statistics_count;
812 });
813
814 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800815 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700816 "/pi3/aos", [&pi3_client_statistics_count](
817 const message_bridge::ClientStatistics &stats) {
818 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
819 EXPECT_EQ(stats.connections()->size(), 1u);
820
821 const message_bridge::ClientConnection *connection =
822 stats.connections()->Get(0);
823 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
824 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700825 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700826 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700827 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800828 EXPECT_EQ(connection->connection_count(), 1u);
829 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700830 ++pi3_client_statistics_count;
831 });
832
Austin Schuh2f8fd752020-09-01 22:38:28 -0700833 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
834 // channel.
835 const size_t pi1_timestamp_channel =
836 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
837 pi1_on_pi2_timestamp_fetcher.channel());
838 const size_t ping_timestamp_channel =
839 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
840 ping_on_pi2_fetcher.channel());
841
842 for (const Channel *channel :
843 *pi1_pong_counter_event_loop->configuration()->channels()) {
844 VLOG(1) << "Channel "
845 << configuration::ChannelIndex(
846 pi1_pong_counter_event_loop->configuration(), channel)
847 << " " << configuration::CleanedChannelToString(channel);
848 }
849
Austin Schuh8fb315a2020-11-19 22:33:58 -0800850 std::unique_ptr<EventLoop> pi1_remote_timestamp =
851 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
852
Austin Schuh89c9b812021-02-20 14:42:10 -0800853 for (std::pair<int, std::string> channel :
854 shared()
855 ? std::vector<std::pair<
856 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
857 : std::vector<std::pair<int, std::string>>{
858 {pi1_timestamp_channel,
859 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
860 "aos-message_bridge-Timestamp"},
861 {ping_timestamp_channel,
862 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
863 // For each remote timestamp we get back, confirm that it is either a ping
864 // message, or a timestamp we sent out. Also confirm that the timestamps
865 // are correct.
866 pi1_remote_timestamp->MakeWatcher(
867 channel.second,
868 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
869 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
870 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700871 channel_index = channel.first,
872 channel_name = channel.second](const RemoteMessage &header) {
873 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
874 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800875 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700876 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800877 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700878 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700879
Austin Schuh89c9b812021-02-20 14:42:10 -0800880 const aos::monotonic_clock::time_point header_monotonic_sent_time(
881 chrono::nanoseconds(header.monotonic_sent_time()));
882 const aos::realtime_clock::time_point header_realtime_sent_time(
883 chrono::nanoseconds(header.realtime_sent_time()));
884 const aos::monotonic_clock::time_point header_monotonic_remote_time(
885 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700886 const aos::monotonic_clock::time_point
887 header_monotonic_remote_transmit_time(
888 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800889 const aos::realtime_clock::time_point header_realtime_remote_time(
890 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700891
Austin Schuh89c9b812021-02-20 14:42:10 -0800892 if (channel_index != -1) {
893 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700894 }
895
Austin Schuh89c9b812021-02-20 14:42:10 -0800896 const Context *pi1_context = nullptr;
897 const Context *pi2_context = nullptr;
898
899 if (header.channel_index() == pi1_timestamp_channel) {
900 // Find the forwarded message.
901 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
902 header_monotonic_sent_time) {
903 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
904 }
905
906 // And the source message.
907 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
908 header_monotonic_remote_time) {
909 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
910 }
911
912 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
913 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700914
915 EXPECT_EQ(header_monotonic_remote_transmit_time,
916 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800917 } else if (header.channel_index() == ping_timestamp_channel) {
918 // Find the forwarded message.
919 while (ping_on_pi2_fetcher.context().monotonic_event_time <
920 header_monotonic_sent_time) {
921 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
922 }
923
924 // And the source message.
925 while (ping_on_pi1_fetcher.context().monotonic_event_time <
926 header_monotonic_remote_time) {
927 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
928 }
929
930 pi1_context = &ping_on_pi1_fetcher.context();
931 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700932
933 EXPECT_EQ(header_monotonic_remote_transmit_time,
934 pi2_context->monotonic_event_time -
935 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800936 } else {
937 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700938 }
939
Austin Schuh89c9b812021-02-20 14:42:10 -0800940 // Confirm the forwarded message has matching timestamps to the
941 // timestamps we got back.
942 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
943 EXPECT_EQ(pi2_context->remote_queue_index,
944 header.remote_queue_index());
945 EXPECT_EQ(pi2_context->monotonic_event_time,
946 header_monotonic_sent_time);
947 EXPECT_EQ(pi2_context->realtime_event_time,
948 header_realtime_sent_time);
949 EXPECT_EQ(pi2_context->realtime_remote_time,
950 header_realtime_remote_time);
951 EXPECT_EQ(pi2_context->monotonic_remote_time,
952 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700953 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
954 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700955
Austin Schuh89c9b812021-02-20 14:42:10 -0800956 // Confirm the forwarded message also matches the source message.
957 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
958 EXPECT_EQ(pi1_context->monotonic_event_time,
959 header_monotonic_remote_time);
960 EXPECT_EQ(pi1_context->realtime_event_time,
961 header_realtime_remote_time);
962 });
963 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700964
Austin Schuh4c3b9702020-08-30 11:34:55 -0700965 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
966 chrono::milliseconds(500) +
967 chrono::milliseconds(5));
968
969 EXPECT_EQ(pi1_pong_counter.count(), 1001);
970 EXPECT_EQ(pi2_pong_counter.count(), 1001);
971
972 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
973 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
974 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
975 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
976 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
977 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
978 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
979
Austin Schuh20ac95d2020-12-05 17:24:19 -0800980 EXPECT_EQ(pi1_server_statistics_count, 10);
981 EXPECT_EQ(pi2_server_statistics_count, 10);
982 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700983
984 EXPECT_EQ(pi1_client_statistics_count, 95);
985 EXPECT_EQ(pi2_client_statistics_count, 95);
986 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700987
988 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800989 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
990 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700991}
992
993// Tests that an offset between nodes can be recovered and shows up in
994// ServerStatistics correctly.
995TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
996 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700997 aos::configuration::ReadConfig(ArtifactPath(
998 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700999 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001000 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1001 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001002 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001003 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1004 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001005 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -08001006 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1007 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001008
Austin Schuh87dd3832021-01-01 23:07:31 -08001009 message_bridge::TestingTimeConverter time(
1010 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -07001011 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001012 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001013
1014 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -08001015 time.AddNextTimestamp(
1016 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001017 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1018 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -07001019
1020 std::unique_ptr<EventLoop> ping_event_loop =
1021 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1022 Ping ping(ping_event_loop.get());
1023
1024 std::unique_ptr<EventLoop> pong_event_loop =
1025 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1026 Pong pong(pong_event_loop.get());
1027
Austin Schuh8fb315a2020-11-19 22:33:58 -08001028 // Wait to let timestamp estimation start up before looking for the results.
1029 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1030
Austin Schuh87dd3832021-01-01 23:07:31 -08001031 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1032 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1033
Austin Schuh4c3b9702020-08-30 11:34:55 -07001034 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1035 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1036
1037 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1038 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1039
Austin Schuh4c3b9702020-08-30 11:34:55 -07001040 // Confirm the offsets are being recovered correctly.
1041 int pi1_server_statistics_count = 0;
1042 pi1_pong_counter_event_loop->MakeWatcher(
1043 "/pi1/aos", [&pi1_server_statistics_count,
1044 kOffset](const message_bridge::ServerStatistics &stats) {
1045 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1046 EXPECT_EQ(stats.connections()->size(), 2u);
1047 for (const message_bridge::ServerConnection *connection :
1048 *stats.connections()) {
1049 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001050 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001051 if (connection->node()->name()->string_view() == "pi2") {
1052 EXPECT_EQ(connection->monotonic_offset(),
1053 chrono::nanoseconds(kOffset).count());
1054 } else if (connection->node()->name()->string_view() == "pi3") {
1055 EXPECT_EQ(connection->monotonic_offset(), 0);
1056 } else {
1057 LOG(FATAL) << "Unknown connection";
1058 }
1059
1060 EXPECT_TRUE(connection->has_monotonic_offset());
1061 }
1062 ++pi1_server_statistics_count;
1063 });
1064
1065 int pi2_server_statistics_count = 0;
1066 pi2_pong_counter_event_loop->MakeWatcher(
1067 "/pi2/aos", [&pi2_server_statistics_count,
1068 kOffset](const message_bridge::ServerStatistics &stats) {
1069 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1070 EXPECT_EQ(stats.connections()->size(), 1u);
1071
1072 const message_bridge::ServerConnection *connection =
1073 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001074 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001075 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1076 EXPECT_TRUE(connection->has_monotonic_offset());
1077 EXPECT_EQ(connection->monotonic_offset(),
1078 -chrono::nanoseconds(kOffset).count());
1079 ++pi2_server_statistics_count;
1080 });
1081
1082 int pi3_server_statistics_count = 0;
1083 pi3_pong_counter_event_loop->MakeWatcher(
1084 "/pi3/aos", [&pi3_server_statistics_count](
1085 const message_bridge::ServerStatistics &stats) {
1086 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1087 EXPECT_EQ(stats.connections()->size(), 1u);
1088
1089 const message_bridge::ServerConnection *connection =
1090 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001091 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001092 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1093 EXPECT_TRUE(connection->has_monotonic_offset());
1094 EXPECT_EQ(connection->monotonic_offset(), 0);
1095 ++pi3_server_statistics_count;
1096 });
1097
1098 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1099 chrono::milliseconds(500) +
1100 chrono::milliseconds(5));
1101
Austin Schuh20ac95d2020-12-05 17:24:19 -08001102 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001103 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001104 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001105}
1106
1107// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001108TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001109 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1110 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1111 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1112
1113 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1114 simulated_event_loop_factory.DisableStatistics();
1115
1116 std::unique_ptr<EventLoop> ping_event_loop =
1117 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1118 Ping ping(ping_event_loop.get());
1119
1120 std::unique_ptr<EventLoop> pong_event_loop =
1121 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1122 Pong pong(pong_event_loop.get());
1123
1124 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1125 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1126
1127 MessageCounter<examples::Pong> pi2_pong_counter(
1128 pi2_pong_counter_event_loop.get(), "/test");
1129
1130 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1131 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1132
1133 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1134 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1135
1136 MessageCounter<examples::Pong> pi1_pong_counter(
1137 pi1_pong_counter_event_loop.get(), "/test");
1138
1139 // Count timestamps.
1140 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1141 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1142 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1143 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1144 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1145 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1146 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1147 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1148 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1149 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1150 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1151 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1152 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1153 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1154
Austin Schuh2f8fd752020-09-01 22:38:28 -07001155 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001156 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1157 remote_timestamps_pi2_on_pi1 =
1158 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1159 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1160 remote_timestamps_pi1_on_pi2 =
1161 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001162
Austin Schuh4c3b9702020-08-30 11:34:55 -07001163 MessageCounter<message_bridge::ServerStatistics>
1164 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1165 "/pi1/aos");
1166 MessageCounter<message_bridge::ServerStatistics>
1167 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1168 "/pi2/aos");
1169 MessageCounter<message_bridge::ServerStatistics>
1170 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1171 "/pi3/aos");
1172
1173 MessageCounter<message_bridge::ClientStatistics>
1174 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1175 "/pi1/aos");
1176 MessageCounter<message_bridge::ClientStatistics>
1177 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1178 "/pi2/aos");
1179 MessageCounter<message_bridge::ClientStatistics>
1180 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1181 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001182
1183 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1184 chrono::milliseconds(5));
1185
Austin Schuh4c3b9702020-08-30 11:34:55 -07001186 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1187 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1188
1189 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1190 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1191 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1192 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1193 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1194 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1195 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1196
1197 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1198 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1199 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1200
1201 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1202 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1203 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001204
1205 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001206 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1207 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001208}
1209
Austin Schuhc0b0f722020-12-12 18:36:06 -08001210bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1211 for (const message_bridge::ServerConnection *connection :
1212 *server_statistics->connections()) {
1213 if (connection->state() != message_bridge::State::CONNECTED) {
1214 return false;
1215 }
1216 }
1217 return true;
1218}
1219
1220bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1221 std::string_view target) {
1222 for (const message_bridge::ServerConnection *connection :
1223 *server_statistics->connections()) {
1224 if (connection->node()->name()->string_view() == target) {
1225 if (connection->state() == message_bridge::State::CONNECTED) {
1226 return false;
1227 }
1228 } else {
1229 if (connection->state() != message_bridge::State::CONNECTED) {
1230 return false;
1231 }
1232 }
1233 }
1234 return true;
1235}
1236
1237bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1238 for (const message_bridge::ClientConnection *connection :
1239 *client_statistics->connections()) {
1240 if (connection->state() != message_bridge::State::CONNECTED) {
1241 return false;
1242 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001243 EXPECT_TRUE(connection->has_boot_uuid());
1244 EXPECT_TRUE(connection->has_connected_since_time());
1245 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001246 }
1247 return true;
1248}
1249
1250bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1251 std::string_view target) {
1252 for (const message_bridge::ClientConnection *connection :
1253 *client_statistics->connections()) {
1254 if (connection->node()->name()->string_view() == target) {
1255 if (connection->state() == message_bridge::State::CONNECTED) {
1256 return false;
1257 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001258 EXPECT_FALSE(connection->has_boot_uuid());
1259 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001260 } else {
1261 if (connection->state() != message_bridge::State::CONNECTED) {
1262 return false;
1263 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001264 EXPECT_TRUE(connection->has_boot_uuid());
1265 EXPECT_TRUE(connection->has_connected_since_time());
1266 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001267 }
1268 }
1269 return true;
1270}
1271
Austin Schuh367a7f42021-11-23 23:04:36 -08001272int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1273 std::string_view target) {
1274 for (const message_bridge::ClientConnection *connection :
1275 *client_statistics->connections()) {
1276 if (connection->node()->name()->string_view() == target) {
1277 return connection->connection_count();
1278 }
1279 }
1280 return 0;
1281}
1282
1283int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1284 std::string_view target) {
1285 for (const message_bridge::ServerConnection *connection :
1286 *server_statistics->connections()) {
1287 if (connection->node()->name()->string_view() == target) {
1288 return connection->connection_count();
1289 }
1290 }
1291 return 0;
1292}
1293
Austin Schuhc0b0f722020-12-12 18:36:06 -08001294// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001295TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001296 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1297
Austin Schuh58646e22021-08-23 23:51:46 -07001298 NodeEventLoopFactory *pi1 =
1299 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1300 NodeEventLoopFactory *pi2 =
1301 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1302 NodeEventLoopFactory *pi3 =
1303 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1304
1305 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001306 Ping ping(ping_event_loop.get());
1307
Austin Schuh58646e22021-08-23 23:51:46 -07001308 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001309 Pong pong(pong_event_loop.get());
1310
1311 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001312 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001313
1314 MessageCounter<examples::Pong> pi2_pong_counter(
1315 pi2_pong_counter_event_loop.get(), "/test");
1316
1317 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001318 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001319
1320 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001321 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001322
1323 MessageCounter<examples::Pong> pi1_pong_counter(
1324 pi1_pong_counter_event_loop.get(), "/test");
1325
1326 // Count timestamps.
1327 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1328 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1329 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1330 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1331 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1332 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1333 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1334 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1335 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1336 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1337 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1338 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1339 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1340 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1341
1342 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001343 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1344 remote_timestamps_pi2_on_pi1 =
1345 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1346 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1347 remote_timestamps_pi1_on_pi2 =
1348 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001349
1350 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001351 *pi1_server_statistics_counter;
1352 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1353 pi1_server_statistics_counter =
1354 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1355 "pi1_server_statistics_counter", "/pi1/aos");
1356 });
1357
Austin Schuhc0b0f722020-12-12 18:36:06 -08001358 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1359 pi1_pong_counter_event_loop
1360 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1361 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1362 pi1_pong_counter_event_loop
1363 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1364
1365 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001366 *pi2_server_statistics_counter;
1367 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1368 pi2_server_statistics_counter =
1369 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1370 "pi2_server_statistics_counter", "/pi2/aos");
1371 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001372 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1373 pi2_pong_counter_event_loop
1374 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1375 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1376 pi2_pong_counter_event_loop
1377 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1378
1379 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001380 *pi3_server_statistics_counter;
1381 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1382 pi3_server_statistics_counter =
1383 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1384 "pi3_server_statistics_counter", "/pi3/aos");
1385 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001386 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1387 pi3_pong_counter_event_loop
1388 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1389 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1390 pi3_pong_counter_event_loop
1391 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1392
1393 MessageCounter<message_bridge::ClientStatistics>
1394 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1395 "/pi1/aos");
1396 MessageCounter<message_bridge::ClientStatistics>
1397 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1398 "/pi2/aos");
1399 MessageCounter<message_bridge::ClientStatistics>
1400 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1401 "/pi3/aos");
1402
James Kuszmaul86e86c32022-07-21 17:39:47 -07001403 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1404 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1405 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1406 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1407 // The currenct contract is that, if all nodes boot simultaneously in
1408 // simulation, that they should all act as if they area already connected,
1409 // without ever observing the transition from disconnected to connected (note
1410 // that on a real system the ServerStatistics message will get resent for each
1411 // and every new connection, even if the new connections happen
1412 // "simultaneously"--in simulation, we are essentially acting as if we are
1413 // starting execution in an already running system, rather than observing the
1414 // boot process).
1415 for (auto &event_loop : statistics_watcher_loops) {
1416 event_loop->MakeWatcher(
1417 "/aos", [](const message_bridge::ServerStatistics &msg) {
1418 for (const message_bridge::ServerConnection *connection :
1419 *msg.connections()) {
1420 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1421 << connection->node()->name()->string_view();
1422 }
1423 });
1424 }
1425
Austin Schuhc0b0f722020-12-12 18:36:06 -08001426 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1427 chrono::milliseconds(5));
1428
James Kuszmaul86e86c32022-07-21 17:39:47 -07001429 statistics_watcher_loops.clear();
1430
Austin Schuhc0b0f722020-12-12 18:36:06 -08001431 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1432 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1433
1434 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1435 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1436 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1437 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1438 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1439 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1440 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1441
Austin Schuh58646e22021-08-23 23:51:46 -07001442 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1443 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1444 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001445
1446 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1447 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1448 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1449
1450 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001451 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1452 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001453
1454 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1456 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1457 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1459 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1460 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1461 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1462 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1463 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1464 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1465 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1466 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1467 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1468 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1469 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1470 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1471 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1472
Austin Schuh58646e22021-08-23 23:51:46 -07001473 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001474
1475 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1476
1477 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1478 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1479
1480 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1481 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1482 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1483 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1484 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1485 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1486 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1487
Austin Schuh58646e22021-08-23 23:51:46 -07001488 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1489 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1490 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001491
1492 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1493 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1494 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1495
1496 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001497 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1498 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001499
1500 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1501 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1502 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1503 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1504 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1506 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1507 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1508 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1509 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1510 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1511 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1512 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1513 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1514 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1515 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1516 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1517 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1518
Austin Schuh58646e22021-08-23 23:51:46 -07001519 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001520
1521 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1522
Austin Schuh367a7f42021-11-23 23:04:36 -08001523 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1524 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1525 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1526 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1527 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1528 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1529
1530 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1531 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1532 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1533 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1534 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1535 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1536 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1537 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1538
1539 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1540 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1541 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1542 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1543
1544 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1545 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1546 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1547 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1548
Austin Schuhc0b0f722020-12-12 18:36:06 -08001549 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1550 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1551
1552 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1553 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1554 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1555 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1556 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1557 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1558 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1559
Austin Schuh58646e22021-08-23 23:51:46 -07001560 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1561 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1562 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001563
1564 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1565 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1566 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1567
1568 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001569 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1570 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001571
Austin Schuhc0b0f722020-12-12 18:36:06 -08001572 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1573 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001574 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1575 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001576 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1577 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001578 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1579 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001580 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1581 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001582 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1583 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1584}
1585
Austin Schuh2febf0d2020-09-21 22:24:30 -07001586// Tests that the time offset having a slope doesn't break the world.
1587// SimulatedMessageBridge has enough self consistency CHECK statements to
1588// confirm, and we can can also check a message in each direction to make sure
1589// it gets delivered as expected.
1590TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1591 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001592 aos::configuration::ReadConfig(ArtifactPath(
1593 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001594 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001595 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1596 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001597 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001598 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1599 ASSERT_EQ(pi2_index, 1u);
1600 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1601 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1602 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001603
Austin Schuh87dd3832021-01-01 23:07:31 -08001604 message_bridge::TestingTimeConverter time(
1605 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001606 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001607 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001608
Austin Schuh2febf0d2020-09-21 22:24:30 -07001609 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001610 time.AddNextTimestamp(
1611 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001612 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1613 BootTimestamp::epoch()});
1614 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1615 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1616 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1617 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001618
1619 std::unique_ptr<EventLoop> ping_event_loop =
1620 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1621 Ping ping(ping_event_loop.get());
1622
1623 std::unique_ptr<EventLoop> pong_event_loop =
1624 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1625 Pong pong(pong_event_loop.get());
1626
1627 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1628 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1629 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1630 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1631
1632 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1633 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1634 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1635 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1636
1637 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1638 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1639 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1640 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1641
1642 // End after a pong message comes back. This will leave the latest messages
1643 // on all channels so we can look at timestamps easily and check they make
1644 // sense.
1645 std::unique_ptr<EventLoop> pi1_pong_ender =
1646 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1647 int count = 0;
1648 pi1_pong_ender->MakeWatcher(
1649 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1650 if (++count == 100) {
1651 simulated_event_loop_factory.Exit();
1652 }
1653 });
1654
1655 // Run enough that messages should be delivered.
1656 simulated_event_loop_factory.Run();
1657
1658 // Grab the latest messages.
1659 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1660 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1661 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1662 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1663
1664 // Compute their time on the global distributed clock so we can compute
1665 // distance betwen them.
1666 const distributed_clock::time_point pi1_ping_time =
1667 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1668 ->ToDistributedClock(
1669 ping_on_pi1_fetcher.context().monotonic_event_time);
1670 const distributed_clock::time_point pi2_ping_time =
1671 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1672 ->ToDistributedClock(
1673 ping_on_pi2_fetcher.context().monotonic_event_time);
1674 const distributed_clock::time_point pi1_pong_time =
1675 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1676 ->ToDistributedClock(
1677 pong_on_pi1_fetcher.context().monotonic_event_time);
1678 const distributed_clock::time_point pi2_pong_time =
1679 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1680 ->ToDistributedClock(
1681 pong_on_pi2_fetcher.context().monotonic_event_time);
1682
1683 // And confirm the delivery delay is just about exactly 150 uS for both
1684 // directions like expected. There will be a couple ns of rounding errors in
1685 // the conversion functions that aren't worth accounting for right now. This
1686 // will either be really close, or really far.
1687 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1688 pi1_ping_time);
1689 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1690 pi1_ping_time);
1691
1692 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1693 pi2_pong_time);
1694 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1695 pi2_pong_time);
1696}
1697
Austin Schuh4c570ea2020-11-19 23:13:24 -08001698void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1699 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1700 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1701 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001702 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001703}
1704
1705// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001706TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001707 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1708 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1709
1710 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1711
1712 std::unique_ptr<EventLoop> ping_event_loop =
1713 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1714 aos::Sender<examples::Ping> pi1_reliable_sender =
1715 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1716 aos::Sender<examples::Ping> pi1_unreliable_sender =
1717 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1718 SendPing(&pi1_reliable_sender, 1);
1719 SendPing(&pi1_unreliable_sender, 1);
1720
1721 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1722 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001723 aos::Sender<examples::Ping> pi2_reliable_sender =
1724 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1725 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001726 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1727 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001728 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1729 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001730 MessageCounter<examples::Ping> pi2_unreliable_counter(
1731 pi2_pong_event_loop.get(), "/unreliable");
1732 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1733 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1734 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1735 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1736
1737 const size_t reliable_channel_index = configuration::ChannelIndex(
1738 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1739
1740 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1741 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1742
Austin Schuheeaa2022021-01-02 21:52:03 -08001743 const chrono::nanoseconds network_delay =
1744 simulated_event_loop_factory.network_delay();
1745
Austin Schuh4c570ea2020-11-19 23:13:24 -08001746 int reliable_timestamp_count = 0;
1747 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001748 shared() ? "/pi1/aos/remote_timestamps/pi2"
1749 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001750 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001751 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1752 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001753 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001754 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001755 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001756 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001757 VLOG(1) << aos::FlatbufferToJson(&header);
1758 if (header.channel_index() == reliable_channel_index) {
1759 ++reliable_timestamp_count;
1760 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001761
1762 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1763 chrono::nanoseconds(header.monotonic_sent_time()));
1764
1765 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1766 header_monotonic_sent_time + network_delay +
1767 (pi1_remote_timestamp->monotonic_now() -
1768 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001769 });
1770
1771 // Wait to let timestamp estimation start up before looking for the results.
1772 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1773
1774 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1775 // This one isn't reliable, but was sent before the start. It should *not* be
1776 // delivered.
1777 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1778 // Confirm we got a timestamp logged for the message that was forwarded.
1779 EXPECT_EQ(reliable_timestamp_count, 1u);
1780
1781 SendPing(&pi1_reliable_sender, 2);
1782 SendPing(&pi1_unreliable_sender, 2);
1783 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1784 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001785 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001786 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1787
1788 EXPECT_EQ(reliable_timestamp_count, 2u);
1789}
1790
Austin Schuh20ac95d2020-12-05 17:24:19 -08001791// Tests that rebooting a node changes the ServerStatistics message and the
1792// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001793TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001794 const UUID pi1_boot0 = UUID::Random();
1795 const UUID pi2_boot0 = UUID::Random();
1796 const UUID pi2_boot1 = UUID::Random();
1797 const UUID pi3_boot0 = UUID::Random();
1798 UUID expected_boot_uuid = pi2_boot0;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001799 int boot_number = 0;
1800 monotonic_clock::time_point expected_connection_time;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001801
Austin Schuh58646e22021-08-23 23:51:46 -07001802 message_bridge::TestingTimeConverter time(
1803 configuration::NodesCount(&config.message()));
1804 SimulatedEventLoopFactory factory(&config.message());
1805 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001806
Austin Schuh58646e22021-08-23 23:51:46 -07001807 const size_t pi1_index =
1808 configuration::GetNodeIndex(&config.message(), "pi1");
1809 const size_t pi2_index =
1810 configuration::GetNodeIndex(&config.message(), "pi2");
1811 const size_t pi3_index =
1812 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001813
Austin Schuh58646e22021-08-23 23:51:46 -07001814 {
1815 time.AddNextTimestamp(distributed_clock::epoch(),
1816 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1817 BootTimestamp::epoch()});
1818
1819 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1820
1821 time.AddNextTimestamp(
1822 distributed_clock::epoch() + dt,
1823 {BootTimestamp::epoch() + dt,
1824 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1825 BootTimestamp::epoch() + dt});
1826
1827 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1828 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1829 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1830 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1831 }
1832
1833 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1834 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1835
1836 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1837 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001838
1839 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001840 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001841
1842 int timestamp_count = 0;
1843 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001844 "/pi2/aos", [&expected_boot_uuid,
1845 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001846 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001847 expected_boot_uuid);
1848 });
1849 pi1_remote_timestamp->MakeWatcher(
1850 "/test",
1851 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001852 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001853 expected_boot_uuid);
1854 });
1855 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001856 shared() ? "/pi1/aos/remote_timestamps/pi2"
1857 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001858 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1859 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001860 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001861 VLOG(1) << aos::FlatbufferToJson(&header);
1862 ++timestamp_count;
1863 });
1864
1865 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001866 bool first_pi1_server_statistics = true;
James Kuszmaul6d6b2282024-05-22 09:51:15 -07001867 expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001868 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001869 "/pi1/aos",
1870 [&pi1_server_statistics_count, &expected_boot_uuid,
1871 &expected_connection_time, &first_pi1_server_statistics,
1872 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001873 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1874 for (const message_bridge::ServerConnection *connection :
1875 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001876 if (connection->state() == message_bridge::State::CONNECTED) {
1877 ASSERT_TRUE(connection->has_boot_uuid());
1878 }
1879 if (!first_pi1_server_statistics) {
1880 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1881 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001882 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001883 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1884 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001885 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001886 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001887 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001888 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1889 connection->connected_since_time())),
1890 expected_connection_time);
1891 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001892 ++pi1_server_statistics_count;
1893 }
1894 }
Austin Schuh58646e22021-08-23 23:51:46 -07001895 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001896 });
1897
Austin Schuh58646e22021-08-23 23:51:46 -07001898 int pi1_client_statistics_count = 0;
1899 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001900 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1901 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001902 const message_bridge::ClientStatistics &stats) {
1903 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1904 for (const message_bridge::ClientConnection *connection :
1905 *stats.connections()) {
1906 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1907 if (connection->node()->name()->string_view() == "pi2") {
1908 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001909 EXPECT_EQ(expected_boot_uuid,
1910 UUID::FromString(connection->boot_uuid()))
1911 << " : Got " << aos::FlatbufferToJson(&stats);
1912 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1913 connection->connected_since_time())),
1914 expected_connection_time);
1915 EXPECT_EQ(boot_number + 1, connection->connection_count());
1916 } else {
1917 EXPECT_EQ(connection->connected_since_time(), 0);
1918 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001919 }
1920 }
1921 });
1922
1923 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001924 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1925 pi1, pi2, pi2_boot1]() {
1926 expected_boot_uuid = pi2_boot1;
1927 ++boot_number;
1928 LOG(INFO) << "OnShutdown triggered for pi2";
1929 pi2->OnStartup(
1930 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1931 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1932 expected_connection_time = pi1->monotonic_now();
1933 });
1934 });
Austin Schuh58646e22021-08-23 23:51:46 -07001935
Austin Schuh20ac95d2020-12-05 17:24:19 -08001936 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001937 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001938
1939 EXPECT_GT(timestamp_count, 100);
1940 EXPECT_GE(pi1_server_statistics_count, 1u);
1941
Austin Schuh20ac95d2020-12-05 17:24:19 -08001942 timestamp_count = 0;
1943 pi1_server_statistics_count = 0;
1944
Austin Schuh58646e22021-08-23 23:51:46 -07001945 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001946 EXPECT_GT(timestamp_count, 100);
1947 EXPECT_GE(pi1_server_statistics_count, 1u);
1948}
1949
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001950INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001951 All, RemoteMessageSimulatedEventLoopTest,
1952 ::testing::Values(
1953 Param{"multinode_pingpong_test_combined_config.json", true},
1954 Param{"multinode_pingpong_test_split_config.json", false}));
1955
Austin Schuh58646e22021-08-23 23:51:46 -07001956// Tests that Startup and Shutdown do reasonable things.
1957TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1958 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1959 aos::configuration::ReadConfig(
1960 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1961
Austin Schuh72e65682021-09-02 11:37:05 -07001962 size_t pi1_shutdown_counter = 0;
1963 size_t pi2_shutdown_counter = 0;
1964 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1965 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1966
Austin Schuh58646e22021-08-23 23:51:46 -07001967 message_bridge::TestingTimeConverter time(
1968 configuration::NodesCount(&config.message()));
1969 SimulatedEventLoopFactory factory(&config.message());
1970 factory.SetTimeConverter(&time);
1971 time.AddNextTimestamp(
1972 distributed_clock::epoch(),
1973 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1974
1975 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1976
1977 time.AddNextTimestamp(
1978 distributed_clock::epoch() + dt,
1979 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1980 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1981 BootTimestamp::epoch() + dt});
1982
1983 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1984 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1985
1986 // Configure startup to start Ping and Pong, and count.
1987 size_t pi1_startup_counter = 0;
1988 size_t pi2_startup_counter = 0;
1989 pi1->OnStartup([pi1]() {
1990 LOG(INFO) << "Made ping";
1991 pi1->AlwaysStart<Ping>("ping");
1992 });
1993 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1994 pi2->OnStartup([pi2]() {
1995 LOG(INFO) << "Made pong";
1996 pi2->AlwaysStart<Pong>("pong");
1997 });
1998 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1999
2000 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07002001 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
2002 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
2003
Austin Schuh58646e22021-08-23 23:51:46 -07002004 // Automatically make counters on startup.
2005 pi1->OnStartup([&pi1_pong_counter, pi1]() {
2006 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
2007 "pi1_pong_counter", "/test");
2008 });
2009 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
2010 pi2->OnStartup([&pi2_ping_counter, pi2]() {
2011 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
2012 "pi2_ping_counter", "/test");
2013 });
2014 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
2015
2016 EXPECT_EQ(pi2_ping_counter, nullptr);
2017 EXPECT_EQ(pi1_pong_counter, nullptr);
2018
2019 EXPECT_EQ(pi1_startup_counter, 0u);
2020 EXPECT_EQ(pi2_startup_counter, 0u);
2021 EXPECT_EQ(pi1_shutdown_counter, 0u);
2022 EXPECT_EQ(pi2_shutdown_counter, 0u);
2023
2024 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
2025 EXPECT_EQ(pi1_startup_counter, 1u);
2026 EXPECT_EQ(pi2_startup_counter, 1u);
2027 EXPECT_EQ(pi1_shutdown_counter, 0u);
2028 EXPECT_EQ(pi2_shutdown_counter, 0u);
2029 EXPECT_EQ(pi2_ping_counter->count(), 1001);
2030 EXPECT_EQ(pi1_pong_counter->count(), 1001);
2031
2032 LOG(INFO) << pi1->monotonic_now();
2033 LOG(INFO) << pi2->monotonic_now();
2034
2035 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
2036
2037 EXPECT_EQ(pi1_startup_counter, 2u);
2038 EXPECT_EQ(pi2_startup_counter, 2u);
2039 EXPECT_EQ(pi1_shutdown_counter, 1u);
2040 EXPECT_EQ(pi2_shutdown_counter, 1u);
2041 EXPECT_EQ(pi2_ping_counter->count(), 501);
2042 EXPECT_EQ(pi1_pong_counter->count(), 501);
2043}
2044
2045// Tests that OnStartup handlers can be added after running and get called, and
2046// can't be called when running.
2047TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
2048 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2049 aos::configuration::ReadConfig(
2050 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2051
2052 // Test that we can add startup handlers as long as we aren't running, and
2053 // they get run when Run gets called again.
2054 // Test that adding a startup handler when running fails.
2055 //
2056 // Test shutdown handlers get called on destruction.
2057 SimulatedEventLoopFactory factory(&config.message());
2058
2059 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2060
2061 int startup_count0 = 0;
2062 int startup_count1 = 0;
2063
2064 pi1->OnStartup([&]() { ++startup_count0; });
2065 EXPECT_EQ(startup_count0, 0);
2066 EXPECT_EQ(startup_count1, 0);
2067
2068 factory.RunFor(chrono::nanoseconds(1));
2069 EXPECT_EQ(startup_count0, 1);
2070 EXPECT_EQ(startup_count1, 0);
2071
2072 pi1->OnStartup([&]() { ++startup_count1; });
2073 EXPECT_EQ(startup_count0, 1);
2074 EXPECT_EQ(startup_count1, 0);
2075
2076 factory.RunFor(chrono::nanoseconds(1));
2077 EXPECT_EQ(startup_count0, 1);
2078 EXPECT_EQ(startup_count1, 1);
2079
2080 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2081 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2082
2083 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2084 "Can only register OnStartup handlers when not running.");
2085}
2086
2087// Tests that OnStartup handlers can be added after running and get called, and
2088// all the handlers get called on reboot. Shutdown handlers are tested the same
2089// way.
2090TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2091 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2092 aos::configuration::ReadConfig(
2093 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2094
Austin Schuh72e65682021-09-02 11:37:05 -07002095 int startup_count0 = 0;
2096 int shutdown_count0 = 0;
2097 int startup_count1 = 0;
2098 int shutdown_count1 = 0;
2099
Austin Schuh58646e22021-08-23 23:51:46 -07002100 message_bridge::TestingTimeConverter time(
2101 configuration::NodesCount(&config.message()));
2102 SimulatedEventLoopFactory factory(&config.message());
2103 factory.SetTimeConverter(&time);
2104 time.StartEqual();
2105
2106 const chrono::nanoseconds dt = chrono::seconds(10);
2107 time.RebootAt(0, distributed_clock::epoch() + dt);
2108 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2109
2110 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2111
Austin Schuh58646e22021-08-23 23:51:46 -07002112 pi1->OnStartup([&]() { ++startup_count0; });
2113 pi1->OnShutdown([&]() { ++shutdown_count0; });
2114 EXPECT_EQ(startup_count0, 0);
2115 EXPECT_EQ(startup_count1, 0);
2116 EXPECT_EQ(shutdown_count0, 0);
2117 EXPECT_EQ(shutdown_count1, 0);
2118
2119 factory.RunFor(chrono::nanoseconds(1));
2120 EXPECT_EQ(startup_count0, 1);
2121 EXPECT_EQ(startup_count1, 0);
2122 EXPECT_EQ(shutdown_count0, 0);
2123 EXPECT_EQ(shutdown_count1, 0);
2124
2125 pi1->OnStartup([&]() { ++startup_count1; });
2126 EXPECT_EQ(startup_count0, 1);
2127 EXPECT_EQ(startup_count1, 0);
2128 EXPECT_EQ(shutdown_count0, 0);
2129 EXPECT_EQ(shutdown_count1, 0);
2130
2131 factory.RunFor(chrono::nanoseconds(1));
2132 EXPECT_EQ(startup_count0, 1);
2133 EXPECT_EQ(startup_count1, 1);
2134 EXPECT_EQ(shutdown_count0, 0);
2135 EXPECT_EQ(shutdown_count1, 0);
2136
2137 factory.RunFor(chrono::seconds(15));
2138
2139 EXPECT_EQ(startup_count0, 2);
2140 EXPECT_EQ(startup_count1, 2);
2141 EXPECT_EQ(shutdown_count0, 1);
2142 EXPECT_EQ(shutdown_count1, 0);
2143
2144 pi1->OnShutdown([&]() { ++shutdown_count1; });
2145 factory.RunFor(chrono::seconds(10));
2146
2147 EXPECT_EQ(startup_count0, 3);
2148 EXPECT_EQ(startup_count1, 3);
2149 EXPECT_EQ(shutdown_count0, 2);
2150 EXPECT_EQ(shutdown_count1, 1);
2151}
2152
2153// Tests that event loops which outlive shutdown crash.
2154TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2155 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2156 aos::configuration::ReadConfig(
2157 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2158
2159 message_bridge::TestingTimeConverter time(
2160 configuration::NodesCount(&config.message()));
2161 SimulatedEventLoopFactory factory(&config.message());
2162 factory.SetTimeConverter(&time);
2163 time.StartEqual();
2164
2165 const chrono::nanoseconds dt = chrono::seconds(10);
2166 time.RebootAt(0, distributed_clock::epoch() + dt);
2167
2168 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2169
2170 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2171
2172 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2173}
2174
Brian Silvermane1fe2512022-08-14 23:18:50 -07002175// Test that an ExitHandle outliving its factory is caught.
2176TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2177 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2178 aos::configuration::ReadConfig(
2179 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2180 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2181 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2182 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2183 auto exit_handle = factory->MakeExitHandle();
2184 EXPECT_DEATH(factory.reset(),
2185 "All ExitHandles must be destroyed before the factory");
2186}
2187
Austin Schuh3e31f912023-08-21 21:29:10 -07002188// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2189TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2190 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2191 aos::configuration::ReadConfig(
2192 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2193 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2194 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2195 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2196 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2197 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2198}
2199
Austin Schuh58646e22021-08-23 23:51:46 -07002200// Tests that messages don't survive a reboot of a node.
2201TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2202 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2203 aos::configuration::ReadConfig(
2204 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2205
2206 message_bridge::TestingTimeConverter time(
2207 configuration::NodesCount(&config.message()));
2208 SimulatedEventLoopFactory factory(&config.message());
2209 factory.SetTimeConverter(&time);
2210 time.StartEqual();
2211
2212 const chrono::nanoseconds dt = chrono::seconds(10);
2213 time.RebootAt(0, distributed_clock::epoch() + dt);
2214
2215 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2216
2217 const UUID boot_uuid = pi1->boot_uuid();
2218 EXPECT_NE(boot_uuid, UUID::Zero());
2219
2220 {
2221 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2222 aos::Sender<examples::Ping> test_message_sender =
2223 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2224 SendPing(&test_message_sender, 1);
2225 }
2226
2227 factory.RunFor(chrono::seconds(5));
2228
2229 {
2230 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2231 aos::Fetcher<examples::Ping> fetcher =
2232 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2233 EXPECT_TRUE(fetcher.Fetch());
2234 }
2235
2236 factory.RunFor(chrono::seconds(10));
2237
2238 {
2239 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2240 aos::Fetcher<examples::Ping> fetcher =
2241 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2242 EXPECT_FALSE(fetcher.Fetch());
2243 }
2244 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2245}
2246
2247// Tests that reliable messages get resent on reboot.
2248TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2249 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2250 aos::configuration::ReadConfig(
2251 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2252
2253 message_bridge::TestingTimeConverter time(
2254 configuration::NodesCount(&config.message()));
2255 SimulatedEventLoopFactory factory(&config.message());
2256 factory.SetTimeConverter(&time);
2257 time.StartEqual();
2258
2259 const chrono::nanoseconds dt = chrono::seconds(1);
2260 time.RebootAt(1, distributed_clock::epoch() + dt);
2261
2262 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2263 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2264
2265 const UUID pi1_boot_uuid = pi1->boot_uuid();
2266 const UUID pi2_boot_uuid = pi2->boot_uuid();
2267 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2268 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2269
2270 {
2271 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2272 aos::Sender<examples::Ping> test_message_sender =
2273 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2274 SendPing(&test_message_sender, 1);
2275 }
2276
2277 factory.RunFor(chrono::milliseconds(500));
2278
2279 {
2280 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2281 aos::Fetcher<examples::Ping> fetcher =
2282 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002283 ASSERT_TRUE(fetcher.Fetch());
2284 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2285 monotonic_clock::epoch());
2286 // Message bridge picks up the Ping message immediately on reboot.
2287 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2288 monotonic_clock::epoch());
2289 EXPECT_EQ(fetcher.context().monotonic_event_time,
2290 monotonic_clock::epoch() + factory.network_delay());
2291 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002292 }
2293
2294 factory.RunFor(chrono::seconds(1));
2295
2296 {
2297 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2298 aos::Fetcher<examples::Ping> fetcher =
2299 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002300 ASSERT_TRUE(fetcher.Fetch());
2301 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2302 monotonic_clock::epoch());
2303 // Message bridge picks up the Ping message immediately on reboot.
2304 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2305 monotonic_clock::epoch() + chrono::seconds(1));
2306 EXPECT_EQ(fetcher.context().monotonic_event_time,
2307 monotonic_clock::epoch() + factory.network_delay());
2308 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002309 }
2310 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2311}
2312
James Kuszmaul86e86c32022-07-21 17:39:47 -07002313TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2314 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2315 aos::configuration::ReadConfig(
2316 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2317
2318 message_bridge::TestingTimeConverter time(
2319 configuration::NodesCount(&config.message()));
2320 time.AddNextTimestamp(
2321 distributed_clock::epoch(),
2322 {BootTimestamp{0, monotonic_clock::epoch()},
2323 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2324 BootTimestamp{0, monotonic_clock::epoch()}});
2325 SimulatedEventLoopFactory factory(&config.message());
2326 factory.SetTimeConverter(&time);
2327
2328 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2329 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2330
2331 const UUID pi1_boot_uuid = pi1->boot_uuid();
2332 const UUID pi2_boot_uuid = pi2->boot_uuid();
2333 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2334 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2335
2336 {
2337 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2338 aos::Sender<examples::Ping> pi1_sender =
2339 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2340 SendPing(&pi1_sender, 1);
2341 }
2342 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2343 aos::Sender<examples::Ping> pi2_sender =
2344 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2345 SendPing(&pi2_sender, 1);
2346 // Verify that we staggered the OnRun callback correctly.
2347 pi2_event_loop->OnRun([pi1, pi2]() {
2348 EXPECT_EQ(pi1->monotonic_now(),
2349 monotonic_clock::epoch() + std::chrono::seconds(1));
2350 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2351 });
2352
2353 factory.RunFor(chrono::seconds(2));
2354
2355 {
2356 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2357 aos::Fetcher<examples::Ping> fetcher =
2358 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2359 ASSERT_TRUE(fetcher.Fetch());
2360 EXPECT_EQ(fetcher.context().monotonic_event_time,
2361 monotonic_clock::epoch() + factory.network_delay());
2362 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2363 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002364 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2365 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002366 }
2367 {
2368 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2369 aos::Fetcher<examples::Ping> fetcher =
2370 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2371 ASSERT_TRUE(fetcher.Fetch());
2372 EXPECT_EQ(fetcher.context().monotonic_event_time,
2373 monotonic_clock::epoch() + std::chrono::seconds(1) +
2374 factory.network_delay());
2375 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2376 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002377 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2378 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002379 }
2380}
2381
Austin Schuh48205e62021-11-12 14:13:18 -08002382class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2383 public:
2384 SimulatedEventLoopDisconnectTest()
2385 : config(aos::configuration::ReadConfig(ArtifactPath(
2386 "aos/events/multinode_pingpong_test_split_config.json"))),
2387 time(configuration::NodesCount(&config.message())),
2388 factory(&config.message()) {
2389 factory.SetTimeConverter(&time);
2390 }
2391
2392 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2393 const monotonic_clock::time_point allowable_message_time,
2394 std::set<const aos::Node *> empty_nodes) {
2395 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2396 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2397 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2398 pi1->MakeEventLoop("fetcher");
2399 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2400 pi2->MakeEventLoop("fetcher");
2401 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2402 if (configuration::ChannelIsReadableOnNode(channel,
2403 pi1_event_loop->node())) {
2404 std::unique_ptr<aos::RawFetcher> fetcher =
2405 pi1_event_loop->MakeRawFetcher(channel);
2406 if (statistics_channels.find(channel) == statistics_channels.end() ||
2407 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2408 EXPECT_FALSE(fetcher->Fetch() &&
2409 fetcher->context().monotonic_event_time >
2410 allowable_message_time)
2411 << ": Found recent message on channel "
2412 << configuration::CleanedChannelToString(channel) << " and time "
2413 << fetcher->context().monotonic_event_time << " > "
2414 << allowable_message_time << " on pi1";
2415 } else {
2416 EXPECT_TRUE(fetcher->Fetch() &&
2417 fetcher->context().monotonic_event_time >=
2418 allowable_message_time)
2419 << ": Didn't find recent message on channel "
2420 << configuration::CleanedChannelToString(channel) << " on pi1";
2421 }
2422 }
2423 if (configuration::ChannelIsReadableOnNode(channel,
2424 pi2_event_loop->node())) {
2425 std::unique_ptr<aos::RawFetcher> fetcher =
2426 pi2_event_loop->MakeRawFetcher(channel);
2427 if (statistics_channels.find(channel) == statistics_channels.end() ||
2428 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2429 EXPECT_FALSE(fetcher->Fetch() &&
2430 fetcher->context().monotonic_event_time >
2431 allowable_message_time)
2432 << ": Found message on channel "
2433 << configuration::CleanedChannelToString(channel) << " and time "
2434 << fetcher->context().monotonic_event_time << " > "
2435 << allowable_message_time << " on pi2";
2436 } else {
2437 EXPECT_TRUE(fetcher->Fetch() &&
2438 fetcher->context().monotonic_event_time >=
2439 allowable_message_time)
2440 << ": Didn't find message on channel "
2441 << configuration::CleanedChannelToString(channel) << " on pi2";
2442 }
2443 }
2444 }
2445 }
2446
2447 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2448
2449 message_bridge::TestingTimeConverter time;
2450 SimulatedEventLoopFactory factory;
2451};
2452
2453// Tests that if we have message bridge client/server disabled, and timing
2454// reports disabled, no messages are sent. Also tests that we can disconnect a
2455// node and disable statistics on it and it actually fully disconnects.
2456TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2457 time.StartEqual();
2458 factory.SkipTimingReport();
2459 factory.DisableStatistics();
2460
2461 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2462 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2463
2464 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2465 pi1->MakeEventLoop("fetcher");
2466 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2467 pi2->MakeEventLoop("fetcher");
2468
2469 factory.RunFor(chrono::milliseconds(100000));
2470
2471 // Confirm no messages are sent if we've configured them all off.
2472 VerifyChannels({}, monotonic_clock::min_time, {});
2473
2474 // Now, confirm that all the message_bridge channels come back when we
2475 // re-enable.
2476 factory.EnableStatistics();
2477
2478 factory.RunFor(chrono::milliseconds(10050));
2479
2480 // Build up the list of all the messages we expect when we come back.
2481 {
2482 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002483 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002484 std::vector<std::pair<std::string_view, const Node *>>{
2485 {"/pi1/aos", pi1->node()},
2486 {"/pi2/aos", pi1->node()},
2487 {"/pi3/aos", pi1->node()}}) {
2488 statistics_channels.insert(configuration::GetChannel(
2489 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2490 pi.second));
2491 statistics_channels.insert(configuration::GetChannel(
2492 factory.configuration(), pi.first,
2493 "aos.message_bridge.ServerStatistics", "", pi.second));
2494 statistics_channels.insert(configuration::GetChannel(
2495 factory.configuration(), pi.first,
2496 "aos.message_bridge.ClientStatistics", "", pi.second));
2497 }
2498
2499 statistics_channels.insert(configuration::GetChannel(
2500 factory.configuration(),
2501 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2502 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2503 statistics_channels.insert(configuration::GetChannel(
2504 factory.configuration(),
2505 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2506 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2507 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2508 }
2509
2510 // Now test that we can disable the messages for a single node
2511 pi2->DisableStatistics();
2512 const aos::monotonic_clock::time_point statistics_disable_time =
2513 pi2->monotonic_now();
2514 factory.RunFor(chrono::milliseconds(10000));
2515
2516 // We should see a much smaller set of messages, but should still see messages
2517 // forwarded, mainly the timestamp message.
2518 {
2519 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002520 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002521 std::vector<std::pair<std::string_view, const Node *>>{
2522 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2523 statistics_channels.insert(configuration::GetChannel(
2524 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2525 pi.second));
2526 statistics_channels.insert(configuration::GetChannel(
2527 factory.configuration(), pi.first,
2528 "aos.message_bridge.ServerStatistics", "", pi.second));
2529 statistics_channels.insert(configuration::GetChannel(
2530 factory.configuration(), pi.first,
2531 "aos.message_bridge.ClientStatistics", "", pi.second));
2532 }
2533
2534 statistics_channels.insert(configuration::GetChannel(
2535 factory.configuration(),
2536 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2537 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2538 VerifyChannels(statistics_channels, statistics_disable_time, {});
2539 }
2540
2541 // Now, fully disconnect the node. This will completely quiet down pi2.
2542 pi1->Disconnect(pi2->node());
2543 pi2->Disconnect(pi1->node());
2544
2545 const aos::monotonic_clock::time_point disconnect_disable_time =
2546 pi2->monotonic_now();
2547 factory.RunFor(chrono::milliseconds(10000));
2548
2549 {
2550 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002551 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002552 std::vector<std::pair<std::string_view, const Node *>>{
2553 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2554 statistics_channels.insert(configuration::GetChannel(
2555 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2556 pi.second));
2557 statistics_channels.insert(configuration::GetChannel(
2558 factory.configuration(), pi.first,
2559 "aos.message_bridge.ServerStatistics", "", pi.second));
2560 statistics_channels.insert(configuration::GetChannel(
2561 factory.configuration(), pi.first,
2562 "aos.message_bridge.ClientStatistics", "", pi.second));
2563 }
2564
2565 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2566 }
2567}
2568
Austin Schuh9cce6842024-04-02 18:55:44 -07002569// Struct to capture the expected time a message should be received (and it's
2570// value). This is from the perspective of the node receiving the message.
2571struct ExpectedTimestamps {
2572 // The time that the message was published on the sending node's monotonic
2573 // clock.
2574 monotonic_clock::time_point remote_time;
2575 // The time that the message was virtually transmitted over the virtual
2576 // network on the sending node's monotonic clock.
2577 monotonic_clock::time_point remote_transmit_time;
2578 // The time that the message was received on the receiving node's clock.
2579 monotonic_clock::time_point event_time;
2580 // The value inside the message.
2581 int value;
2582};
2583
Austin Schuhac6d89e2024-03-27 14:56:09 -07002584// Tests that rapidly sent messages get timestamped correctly.
2585TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2586 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2587 aos::configuration::ReadConfig(
2588 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2589
2590 message_bridge::TestingTimeConverter time(
2591 configuration::NodesCount(&config.message()));
2592 SimulatedEventLoopFactory factory(&config.message());
2593 factory.SetTimeConverter(&time);
2594 time.StartEqual();
2595
2596 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2597 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2598
2599 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2600 aos::Fetcher<examples::Ping> fetcher =
2601 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2602 EXPECT_FALSE(fetcher.Fetch());
2603
2604 {
2605 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuh9cce6842024-04-02 18:55:44 -07002606 FunctionScheduler run_at(ping_event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002607 aos::Sender<examples::Ping> test_message_sender =
2608 ping_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002609 aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002610 for (const std::chrono::nanoseconds dt :
2611 {chrono::microseconds(5000), chrono::microseconds(1),
2612 chrono::microseconds(2), chrono::microseconds(70),
Austin Schuh9cce6842024-04-02 18:55:44 -07002613 chrono::microseconds(63), chrono::microseconds(140)}) {
2614 now += dt;
2615 run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002616 }
2617
Austin Schuh9cce6842024-04-02 18:55:44 -07002618 now += chrono::milliseconds(10);
2619
2620 factory.RunFor(now - ping_event_loop->monotonic_now());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002621 }
2622
Austin Schuh9cce6842024-04-02 18:55:44 -07002623 const monotonic_clock::time_point e = monotonic_clock::epoch();
2624 const chrono::nanoseconds send_delay = factory.send_delay();
2625 const chrono::nanoseconds network_delay = factory.network_delay();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002626
Austin Schuh9cce6842024-04-02 18:55:44 -07002627 const std::vector<ExpectedTimestamps> expected_values = {
2628 // First message shows up after wakeup + network delay as expected.
2629 ExpectedTimestamps{
2630 .remote_time = e + chrono::microseconds(5000),
2631 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2632 .event_time =
2633 e + chrono::microseconds(5000) + send_delay + network_delay,
2634 .value = 1,
2635 },
2636 // Next message is close enough that it gets picked up at the same wakeup.
2637 ExpectedTimestamps{
2638 .remote_time = e + chrono::microseconds(5001),
2639 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2640 .event_time =
2641 e + chrono::microseconds(5000) + send_delay + network_delay,
2642 .value = 1,
2643 },
2644 // Same for the third.
2645 ExpectedTimestamps{
2646 .remote_time = e + chrono::microseconds(5003),
2647 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2648 .event_time =
2649 e + chrono::microseconds(5000) + send_delay + network_delay,
2650 .value = 1,
2651 },
2652 // Fourth waits long enough to do the right thing.
2653 ExpectedTimestamps{
2654 .remote_time = e + chrono::microseconds(5073),
2655 .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
2656 .event_time =
2657 e + chrono::microseconds(5073) + send_delay + network_delay,
2658 .value = 1,
2659 },
2660 // Fifth waits long enough to do the right thing as well (but kicks off
2661 // while the fourth is in flight over the network).
2662 ExpectedTimestamps{
2663 .remote_time = e + chrono::microseconds(5136),
2664 .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
2665 .event_time =
2666 e + chrono::microseconds(5136) + send_delay + network_delay,
2667 .value = 1,
2668 },
2669 // Sixth waits long enough to do the right thing as well (but kicks off
2670 // while the fifth is in flight over the network and has almost landed).
2671 // The timer wakeup for the Timestamp message coming back will find the
2672 // sixth message a little bit early.
2673 ExpectedTimestamps{
2674 .remote_time = e + chrono::microseconds(5276),
2675 .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
2676 .event_time =
2677 e + chrono::microseconds(5273) + send_delay + network_delay,
2678 .value = 1,
2679 },
2680 };
Austin Schuhac6d89e2024-03-27 14:56:09 -07002681
Austin Schuh9cce6842024-04-02 18:55:44 -07002682 for (const ExpectedTimestamps value : expected_values) {
2683 ASSERT_TRUE(fetcher.FetchNext());
2684 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2685 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2686 value.remote_transmit_time);
2687 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2688 EXPECT_EQ(fetcher->value(), value.value);
2689 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002690
2691 ASSERT_FALSE(fetcher.FetchNext());
2692}
2693
2694// Tests that a reliable message gets forwarded if it was sent originally when
2695// nodes were disconnected.
2696TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2697 time.StartEqual();
2698 factory.SkipTimingReport();
2699 factory.DisableStatistics();
2700
2701 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2702 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2703
2704 // Fully disconnect the nodes.
2705 pi1->Disconnect(pi2->node());
2706 pi2->Disconnect(pi1->node());
2707
Austin Schuhac6d89e2024-03-27 14:56:09 -07002708 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2709 pi2->MakeEventLoop("fetcher");
2710 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2711 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2712
2713 factory.RunFor(chrono::milliseconds(100));
2714
2715 {
Austin Schuheeb86fc2024-04-04 20:12:39 -07002716 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2717 pi1->MakeEventLoop("sender");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002718 aos::Sender<examples::Ping> pi1_reliable_sender =
2719 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002720 FunctionScheduler run_at(pi1_event_loop.get());
2721 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002722 for (int i = 0; i < 100; ++i) {
Austin Schuh9cce6842024-04-02 18:55:44 -07002723 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
2724 now);
2725 now += chrono::milliseconds(100);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002726 }
Austin Schuh9cce6842024-04-02 18:55:44 -07002727 now += chrono::milliseconds(50);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002728
Austin Schuh9cce6842024-04-02 18:55:44 -07002729 factory.RunFor(now - pi1_event_loop->monotonic_now());
2730 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002731
2732 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2733
2734 pi1->Connect(pi2->node());
2735 pi2->Connect(pi1->node());
2736
2737 factory.RunFor(chrono::milliseconds(1));
2738
2739 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2740 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2741 monotonic_clock::epoch() + chrono::milliseconds(10000));
2742 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2743 monotonic_clock::epoch() + chrono::milliseconds(10150));
2744 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2745 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2746 factory.network_delay());
2747 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2748
Austin Schuh9cce6842024-04-02 18:55:44 -07002749 // TODO(austin): Verify that the dropped packet count increases.
2750
Austin Schuhac6d89e2024-03-27 14:56:09 -07002751 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2752}
2753
Austin Schuh9cce6842024-04-02 18:55:44 -07002754// Tests that if we disconnect while a message is in various states of being
2755// queued, it gets either dropped or sent as expected.
2756TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
2757 time.StartEqual();
2758 factory.SkipTimingReport();
2759 factory.DisableStatistics();
2760
2761 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2762 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2763
2764 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2765
2766 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2767 pi2->MakeEventLoop("fetcher");
2768 aos::Fetcher<examples::Ping> fetcher =
2769 pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
2770
2771 ASSERT_FALSE(fetcher.Fetch());
2772
2773 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2774 {
2775 FunctionScheduler run_at(pi1_event_loop.get());
2776 aos::Sender<examples::Ping> pi1_sender =
2777 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2778
2779 int i = 0;
2780 for (const std::chrono::nanoseconds dt :
2781 {chrono::microseconds(5000), chrono::microseconds(1),
2782 chrono::microseconds(2), chrono::microseconds(70),
2783 chrono::microseconds(63), chrono::microseconds(140),
2784 chrono::microseconds(160)}) {
2785 run_at.ScheduleAt(
2786 [&]() {
2787 pi1->Connect(pi2->node());
2788 pi2->Connect(pi1->node());
2789 },
2790 now);
2791
2792 now += chrono::milliseconds(100);
2793
2794 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
2795
2796 now += dt;
2797
2798 run_at.ScheduleAt(
2799 [&]() {
2800 // Fully disconnect the nodes.
2801 pi1->Disconnect(pi2->node());
2802 pi2->Disconnect(pi1->node());
2803 },
2804 now);
2805
2806 now += chrono::milliseconds(100) - dt;
2807 ++i;
2808 }
2809
2810 factory.RunFor(now - pi1_event_loop->monotonic_now());
2811 }
2812
2813 const monotonic_clock::time_point e = monotonic_clock::epoch();
2814 const chrono::nanoseconds send_delay = factory.send_delay();
2815 const chrono::nanoseconds network_delay = factory.network_delay();
2816
2817 const std::vector<ExpectedTimestamps> expected_values = {
2818 ExpectedTimestamps{
2819 .remote_time = e + chrono::milliseconds(100),
2820 .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
2821 .event_time =
2822 e + chrono::milliseconds(100) + send_delay + network_delay,
2823 .value = 0,
2824 },
2825 ExpectedTimestamps{
2826 .remote_time = e + chrono::milliseconds(1300),
2827 .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
2828 .event_time =
2829 e + chrono::milliseconds(1300) + send_delay + network_delay,
2830 .value = 6,
2831 },
2832 };
2833
2834 for (const ExpectedTimestamps value : expected_values) {
2835 ASSERT_TRUE(fetcher.FetchNext());
2836 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2837 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2838 value.remote_transmit_time);
2839 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2840 EXPECT_EQ(fetcher->value(), value.value);
2841 }
2842
2843 // TODO(austin): Verify that the dropped packet count increases.
2844
2845 ASSERT_FALSE(fetcher.Fetch());
2846}
2847
2848class PingLogger {
2849 public:
2850 PingLogger(aos::EventLoop *event_loop, std::string_view channel,
2851 std::vector<std::pair<aos::Context, int>> *msgs)
2852 : event_loop_(event_loop),
2853 fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
2854 msgs_(msgs) {
2855 event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
2856 }
2857
2858 ~PingLogger() {
2859 while (fetcher_.FetchNext()) {
2860 msgs_->emplace_back(fetcher_.context(), fetcher_->value());
2861 }
2862 }
2863
2864 private:
2865 aos::EventLoop *event_loop_;
2866 aos::Fetcher<examples::Ping> fetcher_;
2867 std::vector<std::pair<aos::Context, int>> *msgs_;
2868};
2869
2870// Tests that rebooting while a message is in flight works as expected.
2871TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
2872 time.StartEqual();
2873 for (int i = 0; i < 8; ++i) {
2874 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2875 }
2876
2877 factory.SkipTimingReport();
2878 factory.DisableStatistics();
2879
2880 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2881 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2882
2883 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2884
2885 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2886 FunctionScheduler run_at(pi1_event_loop.get());
2887 aos::Sender<examples::Ping> pi1_sender =
2888 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2889
2890 int i = 0;
2891 for (const std::chrono::nanoseconds dt :
2892 {chrono::microseconds(5000), chrono::microseconds(1),
2893 chrono::microseconds(2), chrono::microseconds(70),
2894 chrono::microseconds(63), chrono::microseconds(140),
2895 chrono::microseconds(160)}) {
2896 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2897 now + chrono::seconds(10) - dt);
2898
2899 now += chrono::seconds(10);
2900 ++i;
2901 }
2902
2903 std::vector<std::pair<aos::Context, int>> msgs;
2904
2905 pi2->OnStartup([pi2, &msgs]() {
2906 pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
2907 });
2908
2909 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2910
2911 const monotonic_clock::time_point e = monotonic_clock::epoch();
2912 const chrono::nanoseconds send_delay = factory.send_delay();
2913 const chrono::nanoseconds network_delay = factory.network_delay();
2914
2915 const std::vector<ExpectedTimestamps> expected_values = {
2916 ExpectedTimestamps{
2917 .remote_time = e + chrono::microseconds(9995000),
2918 .remote_transmit_time =
2919 e + chrono::microseconds(9995000) + send_delay,
2920 .event_time =
2921 e + chrono::microseconds(9995000) + send_delay + network_delay,
2922 .value = 0,
2923 },
2924 ExpectedTimestamps{
2925 .remote_time = e + chrono::microseconds(19999999),
2926 .remote_transmit_time =
2927 e + chrono::microseconds(19999999) + send_delay,
2928 .event_time =
2929 e + chrono::microseconds(-1) + send_delay + network_delay,
2930 .value = 1,
2931 },
2932 ExpectedTimestamps{
2933 .remote_time = e + chrono::microseconds(29999998),
2934 .remote_transmit_time =
2935 e + chrono::microseconds(29999998) + send_delay,
2936 .event_time =
2937 e + chrono::microseconds(-2) + send_delay + network_delay,
2938 .value = 2,
2939 },
2940 ExpectedTimestamps{
2941 .remote_time = e + chrono::microseconds(69999840),
2942 .remote_transmit_time =
2943 e + chrono::microseconds(69999840) + send_delay,
2944 .event_time =
2945 e + chrono::microseconds(9999840) + send_delay + network_delay,
2946 .value = 6,
2947 },
2948 };
2949
2950 ASSERT_EQ(msgs.size(), expected_values.size());
2951
2952 for (size_t i = 0; i < msgs.size(); ++i) {
2953 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
2954 expected_values[i].remote_time);
2955 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
2956 expected_values[i].remote_transmit_time);
2957 EXPECT_EQ(msgs[i].first.monotonic_event_time,
2958 expected_values[i].event_time);
2959 EXPECT_EQ(msgs[i].second, expected_values[i].value);
2960 }
2961
2962 // TODO(austin): Verify that the dropped packet count increases.
2963}
2964
2965// Tests that rebooting while a message is in flight works as expected.
2966TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
2967 time.StartEqual();
2968 for (int i = 0; i < 8; ++i) {
2969 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2970 }
2971
2972 factory.SkipTimingReport();
2973 factory.DisableStatistics();
2974
2975 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2976 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2977
2978 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2979
2980 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2981 FunctionScheduler run_at(pi1_event_loop.get());
2982 aos::Sender<examples::Ping> pi1_sender =
2983 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2984
2985 int i = 0;
2986 for (const std::chrono::nanoseconds dt :
2987 {chrono::microseconds(5000), chrono::microseconds(1),
2988 chrono::microseconds(2), chrono::microseconds(70),
2989 chrono::microseconds(63), chrono::microseconds(140),
2990 chrono::microseconds(160)}) {
2991 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2992 now + chrono::seconds(10) - dt);
2993
2994 now += chrono::seconds(10);
2995 ++i;
2996 }
2997
2998 std::vector<std::pair<aos::Context, int>> msgs;
2999
3000 PingLogger *logger;
3001 pi2->OnStartup([pi2, &msgs, &logger]() {
3002 logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
3003 });
3004
3005 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
3006
3007 // Stop the logger to flush the last boot of data.
3008 pi2->Stop(logger);
3009
3010 const monotonic_clock::time_point e = monotonic_clock::epoch();
3011 const chrono::nanoseconds send_delay = factory.send_delay();
3012 const chrono::nanoseconds network_delay = factory.network_delay();
3013
3014 // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
3015 // event times to confirm what should have been forwarded when.
3016 const std::vector<ExpectedTimestamps> expected_values = {
3017 ExpectedTimestamps{
3018 .remote_time = e + chrono::microseconds(9995000),
3019 .remote_transmit_time =
3020 e + chrono::microseconds(9995000) + send_delay,
3021 .event_time =
3022 e + chrono::microseconds(9995000) + send_delay + network_delay,
3023 .value = 0,
3024 },
3025 ExpectedTimestamps{
3026 .remote_time = e + chrono::microseconds(9995000),
3027 .remote_transmit_time = e + chrono::microseconds(10000000),
3028 .event_time = e + network_delay,
3029 .value = 0,
3030 },
3031 ExpectedTimestamps{
3032 .remote_time = e + chrono::microseconds(19999999),
3033 .remote_transmit_time = e + chrono::microseconds(20000000),
3034 .event_time = e + network_delay,
3035 .value = 1,
3036 },
3037 ExpectedTimestamps{
3038 .remote_time = e + chrono::microseconds(29999998),
3039 .remote_transmit_time = e + chrono::microseconds(30000000),
3040 .event_time = e + network_delay,
3041 .value = 2,
3042 },
3043 ExpectedTimestamps{
3044 .remote_time = e + chrono::microseconds(39999930),
3045 .remote_transmit_time = e + chrono::microseconds(40000000),
3046 .event_time = e + network_delay,
3047 .value = 3,
3048 },
3049 ExpectedTimestamps{
3050 .remote_time = e + chrono::microseconds(49999937),
3051 .remote_transmit_time = e + chrono::microseconds(50000000),
3052 .event_time = e + network_delay,
3053 .value = 4,
3054 },
3055 ExpectedTimestamps{
3056 .remote_time = e + chrono::microseconds(59999860),
3057 .remote_transmit_time = e + chrono::microseconds(60000000),
3058 .event_time = e + network_delay,
3059 .value = 5,
3060 },
3061 ExpectedTimestamps{
3062 .remote_time = e + chrono::microseconds(69999840),
3063 .remote_transmit_time = e + chrono::microseconds(69999890),
3064 .event_time = e + chrono::microseconds(9999890) + network_delay,
3065 .value = 6,
3066 },
3067 ExpectedTimestamps{
3068 .remote_time = e + chrono::microseconds(69999840),
3069 .remote_transmit_time = e + chrono::microseconds(70000000),
3070 .event_time = e + network_delay,
3071 .value = 6,
3072 },
3073 };
3074
3075 ASSERT_EQ(msgs.size(), expected_values.size());
3076
3077 for (size_t i = 0; i < msgs.size(); ++i) {
3078 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
3079 expected_values[i].remote_time);
3080 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
3081 expected_values[i].remote_transmit_time);
3082 EXPECT_EQ(msgs[i].first.monotonic_event_time,
3083 expected_values[i].event_time);
3084 EXPECT_EQ(msgs[i].second, expected_values[i].value);
3085 }
3086
3087 // TODO(austin): Verify that the dropped packet count increases.
3088}
3089
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003090} // namespace aos::testing