blob: b717ea370bd2401dc74169b5b51af0949cff8f7d [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070010#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070011#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/ping_lib.h"
13#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080014#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/message_bridge_client_generated.h"
16#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080017#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080018#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070019#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070020#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080021
Stephan Pleinesf63bde82024-01-13 15:59:33 -080022namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
// Test parameters for RemoteMessageSimulatedEventLoopTest.
struct Param {
  // Name of the config file to load.
  std::string config;
  // True when every remote channel shares a single RemoteMessage channel;
  // false when each remote channel has its own RemoteMessage channel.
  bool shared;
};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuh8fb315a2020-11-19 22:33:58 -0800142// Test that sending a message after running gets properly notified.
143TEST(SimulatedEventLoopTest, SendAfterRunFor) {
144 SimulatedEventLoopTestFactory factory;
145
146 SimulatedEventLoopFactory simulated_event_loop_factory(
147 factory.configuration());
148
149 ::std::unique_ptr<EventLoop> ping_event_loop =
150 simulated_event_loop_factory.MakeEventLoop("ping");
151 aos::Sender<TestMessage> test_message_sender =
152 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700153 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800154
155 std::unique_ptr<EventLoop> pong1_event_loop =
156 simulated_event_loop_factory.MakeEventLoop("pong");
157 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
158 "/test");
159
160 EXPECT_FALSE(ping_event_loop->is_running());
161
162 // Watchers start when you start running, so there should be nothing counted.
163 simulated_event_loop_factory.RunFor(chrono::seconds(1));
164 EXPECT_EQ(test_message_counter1.count(), 0u);
165
166 std::unique_ptr<EventLoop> pong2_event_loop =
167 simulated_event_loop_factory.MakeEventLoop("pong");
168 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
169 "/test");
170
171 // Pauses in the middle don't count though, so this should be counted.
172 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700173 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800174
175 EXPECT_EQ(test_message_counter1.count(), 0u);
176 EXPECT_EQ(test_message_counter2.count(), 0u);
177 simulated_event_loop_factory.RunFor(chrono::seconds(1));
178
179 EXPECT_EQ(test_message_counter1.count(), 1u);
180 EXPECT_EQ(test_message_counter2.count(), 0u);
181}
182
Austin Schuh60e77942022-05-16 17:48:24 -0700183// Test that if we configure an event loop to be able to send too fast that we
184// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700185TEST(SimulatedEventLoopTest, AllowSendTooFast) {
186 SimulatedEventLoopTestFactory factory;
187
188 SimulatedEventLoopFactory simulated_event_loop_factory(
189 factory.configuration());
190
191 // Create two event loops: One will be allowed to send too fast, one won't. We
192 // will then test to ensure that the one that is allowed to send too fast can
193 // indeed send too fast, but that it then makes it so that the second event
194 // loop can no longer send anything because *it* is still limited.
195 ::std::unique_ptr<EventLoop> too_fast_event_loop =
196 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
197 ->MakeEventLoop("too_fast_sender",
198 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700199 NodeEventLoopFactory::ExclusiveSenders::kNo,
200 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700201 aos::Sender<TestMessage> too_fast_message_sender =
202 too_fast_event_loop->MakeSender<TestMessage>("/test");
203
204 ::std::unique_ptr<EventLoop> limited_event_loop =
205 simulated_event_loop_factory.MakeEventLoop("limited_sender");
206 aos::Sender<TestMessage> limited_message_sender =
207 limited_event_loop->MakeSender<TestMessage>("/test");
208
209 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
210 for (int ii = 0; ii < queue_size; ++ii) {
211 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
212 }
213 // And now we should start being in the sending-too-fast phase.
214 for (int ii = 0; ii < queue_size; ++ii) {
215 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700216 ASSERT_EQ(SendTestMessage(limited_message_sender),
217 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700218 }
219}
220
221// Test that if we setup an exclusive sender that it is indeed exclusive.
222TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
223 SimulatedEventLoopTestFactory factory;
224
225 SimulatedEventLoopFactory simulated_event_loop_factory(
226 factory.configuration());
227
228 ::std::unique_ptr<EventLoop> exclusive_event_loop =
229 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700230 ->MakeEventLoop(
231 "too_fast_sender",
232 {NodeEventLoopFactory::CheckSentTooFast::kYes,
233 NodeEventLoopFactory::ExclusiveSenders::kYes,
234 {{configuration::GetChannel(factory.configuration(), "/test1",
235 "aos.TestMessage", "", nullptr),
236 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700237 exclusive_event_loop->SkipAosLog();
238 exclusive_event_loop->SkipTimingReport();
239 ::std::unique_ptr<EventLoop> normal_event_loop =
240 simulated_event_loop_factory.MakeEventLoop("limited_sender");
241 // Set things up to have the exclusive sender be destroyed so we can test
242 // recovery.
243 {
244 aos::Sender<TestMessage> exclusive_sender =
245 exclusive_event_loop->MakeSender<TestMessage>("/test");
246
247 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
248 "TestMessage");
249 }
250 // This one should succeed now that the exclusive channel is removed.
251 aos::Sender<TestMessage> normal_sender =
252 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700253 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
254 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700255
256 // And check an explicitly exempted channel:
257 aos::Sender<TestMessage> non_exclusive_sender =
258 exclusive_event_loop->MakeSender<TestMessage>("/test1");
259 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
260 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700261}
262
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700263void TestSentTooFastCheckEdgeCase(
264 const std::function<RawSender::Error(int, int)> expected_err,
265 const bool send_twice_at_end) {
266 SimulatedEventLoopTestFactory factory;
267
268 auto event_loop = factory.MakePrimary("primary");
269
270 auto sender = event_loop->MakeSender<TestMessage>("/test");
271
272 const int queue_size = TestChannelQueueSize(event_loop.get());
273 int msgs_sent = 0;
274 event_loop->AddPhasedLoop(
275 [&](int) {
276 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
277 msgs_sent++;
278
279 // If send_twice_at_end, send the last two messages (message
280 // queue_size and queue_size + 1) in the same iteration, meaning that
281 // we would be sending very slightly too fast. Otherwise, we will send
282 // message queue_size + 1 in the next iteration and we will continue
283 // to be sending exactly at the channel frequency.
284 if (send_twice_at_end && (msgs_sent == queue_size)) {
285 EXPECT_EQ(SendTestMessage(sender),
286 expected_err(msgs_sent, queue_size));
287 msgs_sent++;
288 }
289
290 if (msgs_sent > queue_size) {
291 factory.Exit();
292 }
293 },
294 std::chrono::duration_cast<std::chrono::nanoseconds>(
295 std::chrono::duration<double>(
296 1.0 / TestChannelFrequency(event_loop.get()))));
297
298 factory.Run();
299}
300
301// Tests that RawSender::Error::kMessagesSentTooFast is not returned
302// when messages are sent at the exact frequency of the channel.
303TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
304 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
305 false);
306}
307
308// Tests that RawSender::Error::kMessagesSentTooFast is returned
309// when sending exactly one more message than allowed in a channel storage
310// duration.
311TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
312 TestSentTooFastCheckEdgeCase(
313 [](const int msgs_sent, const int queue_size) {
314 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
315 : RawSender::Error::kOk);
316 },
317 true);
318}
319
Austin Schuh8fb315a2020-11-19 22:33:58 -0800320// Test that creating an event loop while running dies.
321TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
322 SimulatedEventLoopTestFactory factory;
323
324 SimulatedEventLoopFactory simulated_event_loop_factory(
325 factory.configuration());
326
327 ::std::unique_ptr<EventLoop> event_loop =
328 simulated_event_loop_factory.MakeEventLoop("ping");
329
330 auto timer = event_loop->AddTimer([&]() {
331 EXPECT_DEATH(
332 {
333 ::std::unique_ptr<EventLoop> event_loop2 =
334 simulated_event_loop_factory.MakeEventLoop("ping");
335 },
336 "event loop while running");
337 simulated_event_loop_factory.Exit();
338 });
339
340 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700341 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800342 });
343
344 simulated_event_loop_factory.Run();
345}
346
347// Test that creating a watcher after running dies.
348TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
349 SimulatedEventLoopTestFactory factory;
350
351 SimulatedEventLoopFactory simulated_event_loop_factory(
352 factory.configuration());
353
354 ::std::unique_ptr<EventLoop> event_loop =
355 simulated_event_loop_factory.MakeEventLoop("ping");
356
357 simulated_event_loop_factory.RunFor(chrono::seconds(1));
358
359 EXPECT_DEATH(
360 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
361 "Can't add a watcher after running");
362
363 ::std::unique_ptr<EventLoop> event_loop2 =
364 simulated_event_loop_factory.MakeEventLoop("ping");
365
366 simulated_event_loop_factory.RunFor(chrono::seconds(1));
367
368 EXPECT_DEATH(
369 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
370 "Can't add a watcher after running");
371}
372
Austin Schuh44019f92019-05-19 19:58:27 -0700373// Test that running for a time period with no handlers causes time to progress
374// correctly.
375TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800376 SimulatedEventLoopTestFactory factory;
377
378 SimulatedEventLoopFactory simulated_event_loop_factory(
379 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700380 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800381 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700382
383 simulated_event_loop_factory.RunFor(chrono::seconds(1));
384
385 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700386 event_loop->monotonic_now());
387}
388
389// Test that running for a time with a periodic handler causes time to end
390// correctly.
391TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800392 SimulatedEventLoopTestFactory factory;
393
394 SimulatedEventLoopFactory simulated_event_loop_factory(
395 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700396 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800397 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700398
399 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700400 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700401 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700402 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
403 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700404 });
405
406 simulated_event_loop_factory.RunFor(chrono::seconds(1));
407
408 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700409 event_loop->monotonic_now());
410 EXPECT_EQ(counter, 10);
411}
412
Austin Schuh7d87b672019-12-01 20:23:49 -0800413// Tests that watchers have latency in simulation.
414TEST(SimulatedEventLoopTest, WatcherTimingReport) {
415 SimulatedEventLoopTestFactory factory;
416 factory.set_send_delay(std::chrono::microseconds(50));
417
418 FLAGS_timing_report_ms = 1000;
419 auto loop1 = factory.MakePrimary("primary");
420 loop1->MakeWatcher("/test", [](const TestMessage &) {});
421
422 auto loop2 = factory.Make("sender_loop");
423
424 auto loop3 = factory.Make("report_fetcher");
425
426 Fetcher<timing::Report> report_fetcher =
427 loop3->MakeFetcher<timing::Report>("/aos");
428 EXPECT_FALSE(report_fetcher.Fetch());
429
430 auto sender = loop2->MakeSender<TestMessage>("/test");
431
432 // Send 10 messages in the middle of a timing report period so we get
433 // something interesting back.
434 auto test_timer = loop2->AddTimer([&sender]() {
435 for (int i = 0; i < 10; ++i) {
436 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
437 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
438 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700439 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800440 }
441 });
442
443 // Quit after 1 timing report, mid way through the next cycle.
444 {
445 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700446 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800447 end_timer->set_name("end");
448 }
449
450 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700451 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800452 });
453
454 factory.Run();
455
456 // And, since we are here, check that the timing report makes sense.
457 // Start by looking for our event loop's timing.
458 FlatbufferDetachedBuffer<timing::Report> primary_report =
459 FlatbufferDetachedBuffer<timing::Report>::Empty();
460 while (report_fetcher.FetchNext()) {
461 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
462 if (report_fetcher->name()->string_view() == "primary") {
463 primary_report = CopyFlatBuffer(report_fetcher.get());
464 }
465 }
466
467 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700468 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800469
470 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
471
472 // Just the timing report timer.
473 ASSERT_NE(primary_report.message().timers(), nullptr);
474 EXPECT_EQ(primary_report.message().timers()->size(), 2);
475
476 // No phased loops
477 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
478
479 // And now confirm that the watcher received all 10 messages, and has latency.
480 ASSERT_NE(primary_report.message().watchers(), nullptr);
481 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
482 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
483 EXPECT_NEAR(
484 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
485 0.00005, 1e-9);
486 EXPECT_NEAR(
487 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
488 0.00005, 1e-9);
489 EXPECT_NEAR(
490 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
491 0.00005, 1e-9);
492 EXPECT_EQ(primary_report.message()
493 .watchers()
494 ->Get(0)
495 ->wakeup_latency()
496 ->standard_deviation(),
497 0.0);
498
499 EXPECT_EQ(
500 primary_report.message().watchers()->Get(0)->handler_time()->average(),
501 0.0);
502 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
503 0.0);
504 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
505 0.0);
506 EXPECT_EQ(primary_report.message()
507 .watchers()
508 ->Get(0)
509 ->handler_time()
510 ->standard_deviation(),
511 0.0);
512}
513
Austin Schuh89c9b812021-02-20 14:42:10 -0800514size_t CountAll(
515 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
516 &counters) {
517 size_t count = 0u;
518 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
519 counters) {
520 count += counter->count();
521 }
522 return count;
523}
524
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525// Tests that ping and pong work when on 2 different nodes, and the message
526// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800527TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800528 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
529 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700530 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800531
532 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
533
534 std::unique_ptr<EventLoop> ping_event_loop =
535 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
536 Ping ping(ping_event_loop.get());
537
538 std::unique_ptr<EventLoop> pong_event_loop =
539 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
540 Pong pong(pong_event_loop.get());
541
542 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
543 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700544 MessageCounter<examples::Pong> pi2_pong_counter(
545 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700546 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
547 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
548 "/pi1/aos");
549 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
550 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800551
Austin Schuh4c3b9702020-08-30 11:34:55 -0700552 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
553 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800554
555 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
556 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700557 MessageCounter<examples::Pong> pi1_pong_counter(
558 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700559 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
560 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
561 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
562 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
563 "/aos");
564
Austin Schuh4c3b9702020-08-30 11:34:55 -0700565 // Count timestamps.
566 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
567 pi1_pong_counter_event_loop.get(), "/pi1/aos");
568 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
569 pi2_pong_counter_event_loop.get(), "/pi1/aos");
570 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
571 pi3_pong_counter_event_loop.get(), "/pi1/aos");
572 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
573 pi1_pong_counter_event_loop.get(), "/pi2/aos");
574 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
575 pi2_pong_counter_event_loop.get(), "/pi2/aos");
576 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
577 pi1_pong_counter_event_loop.get(), "/pi3/aos");
578 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
579 pi3_pong_counter_event_loop.get(), "/pi3/aos");
580
Austin Schuh2f8fd752020-09-01 22:38:28 -0700581 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800582 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
583 remote_timestamps_pi2_on_pi1 =
584 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
585 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
586 remote_timestamps_pi1_on_pi2 =
587 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700588
Austin Schuh4c3b9702020-08-30 11:34:55 -0700589 // Wait to let timestamp estimation start up before looking for the results.
590 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
591
Austin Schuh8fb315a2020-11-19 22:33:58 -0800592 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
593 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
594 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
595 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
596 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
597 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
598
Austin Schuh4c3b9702020-08-30 11:34:55 -0700599 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800600 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700601 "/pi1/aos", [&pi1_server_statistics_count](
602 const message_bridge::ServerStatistics &stats) {
603 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
604 EXPECT_EQ(stats.connections()->size(), 2u);
605 for (const message_bridge::ServerConnection *connection :
606 *stats.connections()) {
607 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800608 EXPECT_EQ(connection->connection_count(), 1u);
609 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800610 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700611 if (connection->node()->name()->string_view() == "pi2") {
612 EXPECT_GT(connection->sent_packets(), 50);
613 } else if (connection->node()->name()->string_view() == "pi3") {
614 EXPECT_GE(connection->sent_packets(), 5);
615 } else {
616 LOG(FATAL) << "Unknown connection";
617 }
618
619 EXPECT_TRUE(connection->has_monotonic_offset());
620 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700621
622 EXPECT_TRUE(connection->has_channels());
623 int accumulated_sent_count = 0;
624 int accumulated_dropped_count = 0;
625 for (const message_bridge::ServerChannelStatistics *channel :
626 *connection->channels()) {
627 accumulated_sent_count += channel->sent_packets();
628 accumulated_dropped_count += channel->dropped_packets();
629 }
630 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
631 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700632 }
633 ++pi1_server_statistics_count;
634 });
635
636 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800637 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700638 "/pi2/aos", [&pi2_server_statistics_count](
639 const message_bridge::ServerStatistics &stats) {
640 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
641 EXPECT_EQ(stats.connections()->size(), 1u);
642
643 const message_bridge::ServerConnection *connection =
644 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800645 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700646 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
647 EXPECT_GT(connection->sent_packets(), 50);
648 EXPECT_TRUE(connection->has_monotonic_offset());
649 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800650 EXPECT_EQ(connection->connection_count(), 1u);
651 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700652
653 EXPECT_TRUE(connection->has_channels());
654 int accumulated_sent_count = 0;
655 int accumulated_dropped_count = 0;
656 for (const message_bridge::ServerChannelStatistics *channel :
657 *connection->channels()) {
658 accumulated_sent_count += channel->sent_packets();
659 accumulated_dropped_count += channel->dropped_packets();
660 }
661 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
662 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
663
Austin Schuh4c3b9702020-08-30 11:34:55 -0700664 ++pi2_server_statistics_count;
665 });
666
667 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800668 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700669 "/pi3/aos", [&pi3_server_statistics_count](
670 const message_bridge::ServerStatistics &stats) {
671 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
672 EXPECT_EQ(stats.connections()->size(), 1u);
673
674 const message_bridge::ServerConnection *connection =
675 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800676 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700677 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
678 EXPECT_GE(connection->sent_packets(), 5);
679 EXPECT_TRUE(connection->has_monotonic_offset());
680 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800681 EXPECT_EQ(connection->connection_count(), 1u);
682 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700683
684 EXPECT_TRUE(connection->has_channels());
685 int accumulated_sent_count = 0;
686 int accumulated_dropped_count = 0;
687 for (const message_bridge::ServerChannelStatistics *channel :
688 *connection->channels()) {
689 accumulated_sent_count += channel->sent_packets();
690 accumulated_dropped_count += channel->dropped_packets();
691 }
692 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
693 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
694
Austin Schuh4c3b9702020-08-30 11:34:55 -0700695 ++pi3_server_statistics_count;
696 });
697
698 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800699 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700700 "/pi1/aos", [&pi1_client_statistics_count](
701 const message_bridge::ClientStatistics &stats) {
702 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
703 EXPECT_EQ(stats.connections()->size(), 2u);
704
705 for (const message_bridge::ClientConnection *connection :
706 *stats.connections()) {
707 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
708 if (connection->node()->name()->string_view() == "pi2") {
709 EXPECT_GT(connection->received_packets(), 50);
710 } else if (connection->node()->name()->string_view() == "pi3") {
711 EXPECT_GE(connection->received_packets(), 5);
712 } else {
713 LOG(FATAL) << "Unknown connection";
714 }
715
Austin Schuhe61d4382021-03-31 21:33:02 -0700716 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700717 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700718 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800719 EXPECT_EQ(connection->connection_count(), 1u);
720 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700721 }
722 ++pi1_client_statistics_count;
723 });
724
725 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800726 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700727 "/pi2/aos", [&pi2_client_statistics_count](
728 const message_bridge::ClientStatistics &stats) {
729 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
730 EXPECT_EQ(stats.connections()->size(), 1u);
731
732 const message_bridge::ClientConnection *connection =
733 stats.connections()->Get(0);
734 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
735 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700736 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700737 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700738 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800739 EXPECT_EQ(connection->connection_count(), 1u);
740 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700741 ++pi2_client_statistics_count;
742 });
743
744 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800745 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700746 "/pi3/aos", [&pi3_client_statistics_count](
747 const message_bridge::ClientStatistics &stats) {
748 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
749 EXPECT_EQ(stats.connections()->size(), 1u);
750
751 const message_bridge::ClientConnection *connection =
752 stats.connections()->Get(0);
753 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
754 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700755 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700756 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700757 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800758 EXPECT_EQ(connection->connection_count(), 1u);
759 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700760 ++pi3_client_statistics_count;
761 });
762
Austin Schuh2f8fd752020-09-01 22:38:28 -0700763 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
764 // channel.
765 const size_t pi1_timestamp_channel =
766 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
767 pi1_on_pi2_timestamp_fetcher.channel());
768 const size_t ping_timestamp_channel =
769 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
770 ping_on_pi2_fetcher.channel());
771
772 for (const Channel *channel :
773 *pi1_pong_counter_event_loop->configuration()->channels()) {
774 VLOG(1) << "Channel "
775 << configuration::ChannelIndex(
776 pi1_pong_counter_event_loop->configuration(), channel)
777 << " " << configuration::CleanedChannelToString(channel);
778 }
779
Austin Schuh8fb315a2020-11-19 22:33:58 -0800780 std::unique_ptr<EventLoop> pi1_remote_timestamp =
781 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
782
Austin Schuh89c9b812021-02-20 14:42:10 -0800783 for (std::pair<int, std::string> channel :
784 shared()
785 ? std::vector<std::pair<
786 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
787 : std::vector<std::pair<int, std::string>>{
788 {pi1_timestamp_channel,
789 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
790 "aos-message_bridge-Timestamp"},
791 {ping_timestamp_channel,
792 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
793 // For each remote timestamp we get back, confirm that it is either a ping
794 // message, or a timestamp we sent out. Also confirm that the timestamps
795 // are correct.
796 pi1_remote_timestamp->MakeWatcher(
797 channel.second,
798 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
799 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
800 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700801 channel_index = channel.first,
802 channel_name = channel.second](const RemoteMessage &header) {
803 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
804 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800805 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700806 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800807 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700808 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700809
Austin Schuh89c9b812021-02-20 14:42:10 -0800810 const aos::monotonic_clock::time_point header_monotonic_sent_time(
811 chrono::nanoseconds(header.monotonic_sent_time()));
812 const aos::realtime_clock::time_point header_realtime_sent_time(
813 chrono::nanoseconds(header.realtime_sent_time()));
814 const aos::monotonic_clock::time_point header_monotonic_remote_time(
815 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700816 const aos::monotonic_clock::time_point
817 header_monotonic_remote_transmit_time(
818 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800819 const aos::realtime_clock::time_point header_realtime_remote_time(
820 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700821
Austin Schuh89c9b812021-02-20 14:42:10 -0800822 if (channel_index != -1) {
823 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700824 }
825
Austin Schuh89c9b812021-02-20 14:42:10 -0800826 const Context *pi1_context = nullptr;
827 const Context *pi2_context = nullptr;
828
829 if (header.channel_index() == pi1_timestamp_channel) {
830 // Find the forwarded message.
831 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
832 header_monotonic_sent_time) {
833 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
834 }
835
836 // And the source message.
837 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
838 header_monotonic_remote_time) {
839 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
840 }
841
842 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
843 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700844
845 EXPECT_EQ(header_monotonic_remote_transmit_time,
846 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800847 } else if (header.channel_index() == ping_timestamp_channel) {
848 // Find the forwarded message.
849 while (ping_on_pi2_fetcher.context().monotonic_event_time <
850 header_monotonic_sent_time) {
851 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
852 }
853
854 // And the source message.
855 while (ping_on_pi1_fetcher.context().monotonic_event_time <
856 header_monotonic_remote_time) {
857 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
858 }
859
860 pi1_context = &ping_on_pi1_fetcher.context();
861 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700862
863 EXPECT_EQ(header_monotonic_remote_transmit_time,
864 pi2_context->monotonic_event_time -
865 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800866 } else {
867 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700868 }
869
Austin Schuh89c9b812021-02-20 14:42:10 -0800870 // Confirm the forwarded message has matching timestamps to the
871 // timestamps we got back.
872 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
873 EXPECT_EQ(pi2_context->remote_queue_index,
874 header.remote_queue_index());
875 EXPECT_EQ(pi2_context->monotonic_event_time,
876 header_monotonic_sent_time);
877 EXPECT_EQ(pi2_context->realtime_event_time,
878 header_realtime_sent_time);
879 EXPECT_EQ(pi2_context->realtime_remote_time,
880 header_realtime_remote_time);
881 EXPECT_EQ(pi2_context->monotonic_remote_time,
882 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700883 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
884 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700885
Austin Schuh89c9b812021-02-20 14:42:10 -0800886 // Confirm the forwarded message also matches the source message.
887 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
888 EXPECT_EQ(pi1_context->monotonic_event_time,
889 header_monotonic_remote_time);
890 EXPECT_EQ(pi1_context->realtime_event_time,
891 header_realtime_remote_time);
892 });
893 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700894
Austin Schuh4c3b9702020-08-30 11:34:55 -0700895 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
896 chrono::milliseconds(500) +
897 chrono::milliseconds(5));
898
899 EXPECT_EQ(pi1_pong_counter.count(), 1001);
900 EXPECT_EQ(pi2_pong_counter.count(), 1001);
901
902 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
903 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
904 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
905 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
906 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
907 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
908 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
909
Austin Schuh20ac95d2020-12-05 17:24:19 -0800910 EXPECT_EQ(pi1_server_statistics_count, 10);
911 EXPECT_EQ(pi2_server_statistics_count, 10);
912 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700913
914 EXPECT_EQ(pi1_client_statistics_count, 95);
915 EXPECT_EQ(pi2_client_statistics_count, 95);
916 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700917
918 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800919 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
920 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700921}
922
923// Tests that an offset between nodes can be recovered and shows up in
924// ServerStatistics correctly.
925TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
926 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700927 aos::configuration::ReadConfig(ArtifactPath(
928 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700929 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800930 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
931 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700932 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800933 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
934 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700935 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800936 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
937 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700938
Austin Schuh87dd3832021-01-01 23:07:31 -0800939 message_bridge::TestingTimeConverter time(
940 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700941 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700942 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700943
944 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800945 time.AddNextTimestamp(
946 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700947 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
948 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700949
950 std::unique_ptr<EventLoop> ping_event_loop =
951 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
952 Ping ping(ping_event_loop.get());
953
954 std::unique_ptr<EventLoop> pong_event_loop =
955 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
956 Pong pong(pong_event_loop.get());
957
Austin Schuh8fb315a2020-11-19 22:33:58 -0800958 // Wait to let timestamp estimation start up before looking for the results.
959 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
960
Austin Schuh87dd3832021-01-01 23:07:31 -0800961 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
962 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
963
Austin Schuh4c3b9702020-08-30 11:34:55 -0700964 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
965 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
966
967 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
968 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
969
Austin Schuh4c3b9702020-08-30 11:34:55 -0700970 // Confirm the offsets are being recovered correctly.
971 int pi1_server_statistics_count = 0;
972 pi1_pong_counter_event_loop->MakeWatcher(
973 "/pi1/aos", [&pi1_server_statistics_count,
974 kOffset](const message_bridge::ServerStatistics &stats) {
975 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
976 EXPECT_EQ(stats.connections()->size(), 2u);
977 for (const message_bridge::ServerConnection *connection :
978 *stats.connections()) {
979 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800980 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700981 if (connection->node()->name()->string_view() == "pi2") {
982 EXPECT_EQ(connection->monotonic_offset(),
983 chrono::nanoseconds(kOffset).count());
984 } else if (connection->node()->name()->string_view() == "pi3") {
985 EXPECT_EQ(connection->monotonic_offset(), 0);
986 } else {
987 LOG(FATAL) << "Unknown connection";
988 }
989
990 EXPECT_TRUE(connection->has_monotonic_offset());
991 }
992 ++pi1_server_statistics_count;
993 });
994
995 int pi2_server_statistics_count = 0;
996 pi2_pong_counter_event_loop->MakeWatcher(
997 "/pi2/aos", [&pi2_server_statistics_count,
998 kOffset](const message_bridge::ServerStatistics &stats) {
999 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1000 EXPECT_EQ(stats.connections()->size(), 1u);
1001
1002 const message_bridge::ServerConnection *connection =
1003 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001004 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001005 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1006 EXPECT_TRUE(connection->has_monotonic_offset());
1007 EXPECT_EQ(connection->monotonic_offset(),
1008 -chrono::nanoseconds(kOffset).count());
1009 ++pi2_server_statistics_count;
1010 });
1011
1012 int pi3_server_statistics_count = 0;
1013 pi3_pong_counter_event_loop->MakeWatcher(
1014 "/pi3/aos", [&pi3_server_statistics_count](
1015 const message_bridge::ServerStatistics &stats) {
1016 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1017 EXPECT_EQ(stats.connections()->size(), 1u);
1018
1019 const message_bridge::ServerConnection *connection =
1020 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001021 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001022 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1023 EXPECT_TRUE(connection->has_monotonic_offset());
1024 EXPECT_EQ(connection->monotonic_offset(), 0);
1025 ++pi3_server_statistics_count;
1026 });
1027
1028 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1029 chrono::milliseconds(500) +
1030 chrono::milliseconds(5));
1031
Austin Schuh20ac95d2020-12-05 17:24:19 -08001032 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001033 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001034 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001035}
1036
1037// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001038TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001039 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1040 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1041 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1042
1043 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1044 simulated_event_loop_factory.DisableStatistics();
1045
1046 std::unique_ptr<EventLoop> ping_event_loop =
1047 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1048 Ping ping(ping_event_loop.get());
1049
1050 std::unique_ptr<EventLoop> pong_event_loop =
1051 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1052 Pong pong(pong_event_loop.get());
1053
1054 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1055 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1056
1057 MessageCounter<examples::Pong> pi2_pong_counter(
1058 pi2_pong_counter_event_loop.get(), "/test");
1059
1060 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1061 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1062
1063 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1064 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1065
1066 MessageCounter<examples::Pong> pi1_pong_counter(
1067 pi1_pong_counter_event_loop.get(), "/test");
1068
1069 // Count timestamps.
1070 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1071 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1072 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1073 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1074 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1075 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1076 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1077 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1078 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1079 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1080 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1081 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1082 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1083 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1084
Austin Schuh2f8fd752020-09-01 22:38:28 -07001085 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001086 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1087 remote_timestamps_pi2_on_pi1 =
1088 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1089 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1090 remote_timestamps_pi1_on_pi2 =
1091 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001092
Austin Schuh4c3b9702020-08-30 11:34:55 -07001093 MessageCounter<message_bridge::ServerStatistics>
1094 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1095 "/pi1/aos");
1096 MessageCounter<message_bridge::ServerStatistics>
1097 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1098 "/pi2/aos");
1099 MessageCounter<message_bridge::ServerStatistics>
1100 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1101 "/pi3/aos");
1102
1103 MessageCounter<message_bridge::ClientStatistics>
1104 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1105 "/pi1/aos");
1106 MessageCounter<message_bridge::ClientStatistics>
1107 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1108 "/pi2/aos");
1109 MessageCounter<message_bridge::ClientStatistics>
1110 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1111 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001112
1113 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1114 chrono::milliseconds(5));
1115
Austin Schuh4c3b9702020-08-30 11:34:55 -07001116 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1117 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1118
1119 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1120 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1121 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1122 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1123 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1124 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1125 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1126
1127 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1128 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1129 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1130
1131 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1132 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1133 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001134
1135 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001136 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1137 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001138}
1139
Austin Schuhc0b0f722020-12-12 18:36:06 -08001140bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1141 for (const message_bridge::ServerConnection *connection :
1142 *server_statistics->connections()) {
1143 if (connection->state() != message_bridge::State::CONNECTED) {
1144 return false;
1145 }
1146 }
1147 return true;
1148}
1149
1150bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1151 std::string_view target) {
1152 for (const message_bridge::ServerConnection *connection :
1153 *server_statistics->connections()) {
1154 if (connection->node()->name()->string_view() == target) {
1155 if (connection->state() == message_bridge::State::CONNECTED) {
1156 return false;
1157 }
1158 } else {
1159 if (connection->state() != message_bridge::State::CONNECTED) {
1160 return false;
1161 }
1162 }
1163 }
1164 return true;
1165}
1166
1167bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1168 for (const message_bridge::ClientConnection *connection :
1169 *client_statistics->connections()) {
1170 if (connection->state() != message_bridge::State::CONNECTED) {
1171 return false;
1172 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001173 EXPECT_TRUE(connection->has_boot_uuid());
1174 EXPECT_TRUE(connection->has_connected_since_time());
1175 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001176 }
1177 return true;
1178}
1179
1180bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1181 std::string_view target) {
1182 for (const message_bridge::ClientConnection *connection :
1183 *client_statistics->connections()) {
1184 if (connection->node()->name()->string_view() == target) {
1185 if (connection->state() == message_bridge::State::CONNECTED) {
1186 return false;
1187 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001188 EXPECT_FALSE(connection->has_boot_uuid());
1189 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001190 } else {
1191 if (connection->state() != message_bridge::State::CONNECTED) {
1192 return false;
1193 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001194 EXPECT_TRUE(connection->has_boot_uuid());
1195 EXPECT_TRUE(connection->has_connected_since_time());
1196 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001197 }
1198 }
1199 return true;
1200}
1201
Austin Schuh367a7f42021-11-23 23:04:36 -08001202int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1203 std::string_view target) {
1204 for (const message_bridge::ClientConnection *connection :
1205 *client_statistics->connections()) {
1206 if (connection->node()->name()->string_view() == target) {
1207 return connection->connection_count();
1208 }
1209 }
1210 return 0;
1211}
1212
1213int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1214 std::string_view target) {
1215 for (const message_bridge::ServerConnection *connection :
1216 *server_statistics->connections()) {
1217 if (connection->node()->name()->string_view() == target) {
1218 return connection->connection_count();
1219 }
1220 }
1221 return 0;
1222}
1223
Austin Schuhc0b0f722020-12-12 18:36:06 -08001224// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001225TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001226 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1227
Austin Schuh58646e22021-08-23 23:51:46 -07001228 NodeEventLoopFactory *pi1 =
1229 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1230 NodeEventLoopFactory *pi2 =
1231 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1232 NodeEventLoopFactory *pi3 =
1233 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1234
1235 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001236 Ping ping(ping_event_loop.get());
1237
Austin Schuh58646e22021-08-23 23:51:46 -07001238 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001239 Pong pong(pong_event_loop.get());
1240
1241 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001242 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001243
1244 MessageCounter<examples::Pong> pi2_pong_counter(
1245 pi2_pong_counter_event_loop.get(), "/test");
1246
1247 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001248 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001249
1250 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001251 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001252
1253 MessageCounter<examples::Pong> pi1_pong_counter(
1254 pi1_pong_counter_event_loop.get(), "/test");
1255
1256 // Count timestamps.
1257 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1258 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1259 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1260 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1261 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1262 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1263 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1264 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1265 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1266 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1267 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1268 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1269 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1270 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1271
1272 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001273 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1274 remote_timestamps_pi2_on_pi1 =
1275 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1276 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1277 remote_timestamps_pi1_on_pi2 =
1278 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001279
1280 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001281 *pi1_server_statistics_counter;
1282 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1283 pi1_server_statistics_counter =
1284 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1285 "pi1_server_statistics_counter", "/pi1/aos");
1286 });
1287
Austin Schuhc0b0f722020-12-12 18:36:06 -08001288 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1289 pi1_pong_counter_event_loop
1290 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1291 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1292 pi1_pong_counter_event_loop
1293 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1294
1295 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001296 *pi2_server_statistics_counter;
1297 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1298 pi2_server_statistics_counter =
1299 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1300 "pi2_server_statistics_counter", "/pi2/aos");
1301 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001302 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1303 pi2_pong_counter_event_loop
1304 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1305 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1306 pi2_pong_counter_event_loop
1307 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1308
1309 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001310 *pi3_server_statistics_counter;
1311 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1312 pi3_server_statistics_counter =
1313 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1314 "pi3_server_statistics_counter", "/pi3/aos");
1315 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001316 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1317 pi3_pong_counter_event_loop
1318 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1319 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1320 pi3_pong_counter_event_loop
1321 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1322
1323 MessageCounter<message_bridge::ClientStatistics>
1324 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1325 "/pi1/aos");
1326 MessageCounter<message_bridge::ClientStatistics>
1327 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1328 "/pi2/aos");
1329 MessageCounter<message_bridge::ClientStatistics>
1330 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1331 "/pi3/aos");
1332
James Kuszmaul86e86c32022-07-21 17:39:47 -07001333 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1334 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1335 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1336 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
  // The current contract is that, if all nodes boot simultaneously in
  // simulation, they should all act as if they are already connected,
  // without ever observing the transition from disconnected to connected (note
  // that on a real system the ServerStatistics message will get resent for each
  // and every new connection, even if the new connections happen
  // "simultaneously"--in simulation, we are essentially acting as if we are
  // starting execution in an already running system, rather than observing the
  // boot process).
1345 for (auto &event_loop : statistics_watcher_loops) {
1346 event_loop->MakeWatcher(
1347 "/aos", [](const message_bridge::ServerStatistics &msg) {
1348 for (const message_bridge::ServerConnection *connection :
1349 *msg.connections()) {
1350 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1351 << connection->node()->name()->string_view();
1352 }
1353 });
1354 }
1355
Austin Schuhc0b0f722020-12-12 18:36:06 -08001356 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1357 chrono::milliseconds(5));
1358
James Kuszmaul86e86c32022-07-21 17:39:47 -07001359 statistics_watcher_loops.clear();
1360
Austin Schuhc0b0f722020-12-12 18:36:06 -08001361 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1362 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1363
1364 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1365 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1366 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1367 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1368 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1369 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1370 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1371
Austin Schuh58646e22021-08-23 23:51:46 -07001372 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1373 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1374 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001375
1376 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1377 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1378 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1379
1380 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001381 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1382 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001383
1384 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1385 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1386 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1387 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1388 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1389 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1390 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1391 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1392 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1393 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1394 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1395 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1396 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1397 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1398 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1399 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1400 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1401 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1402
Austin Schuh58646e22021-08-23 23:51:46 -07001403 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001404
1405 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1406
1407 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1408 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1409
1410 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1411 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1412 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1413 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1414 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1415 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1416 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1417
Austin Schuh58646e22021-08-23 23:51:46 -07001418 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1419 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1420 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001421
1422 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1423 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1424 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1425
1426 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001427 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1428 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001429
1430 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1431 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1432 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1433 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1434 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1435 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1436 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1437 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1438 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1439 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1440 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1441 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1442 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1443 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1444 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1445 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1446 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1447 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1448
Austin Schuh58646e22021-08-23 23:51:46 -07001449 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001450
1451 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1452
Austin Schuh367a7f42021-11-23 23:04:36 -08001453 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1454 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1456 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1457 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1459
1460 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1461 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1462 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1463 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1464 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1465 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1466 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1467 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1468
1469 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1470 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1471 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1472 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1473
1474 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1475 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1476 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1477 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1478
Austin Schuhc0b0f722020-12-12 18:36:06 -08001479 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1480 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1481
1482 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1483 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1484 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1485 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1486 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1487 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1488 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1489
Austin Schuh58646e22021-08-23 23:51:46 -07001490 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1491 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1492 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001493
1494 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1495 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1496 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1497
1498 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001499 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1500 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001501
Austin Schuhc0b0f722020-12-12 18:36:06 -08001502 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1503 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001504 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1505 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001506 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1507 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001508 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1509 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001510 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1511 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001512 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1513 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1514}
1515
Austin Schuh2febf0d2020-09-21 22:24:30 -07001516// Tests that the time offset having a slope doesn't break the world.
1517// SimulatedMessageBridge has enough self consistency CHECK statements to
1518// confirm, and we can can also check a message in each direction to make sure
1519// it gets delivered as expected.
1520TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1521 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001522 aos::configuration::ReadConfig(ArtifactPath(
1523 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001524 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001525 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1526 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001527 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001528 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1529 ASSERT_EQ(pi2_index, 1u);
1530 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1531 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1532 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001533
Austin Schuh87dd3832021-01-01 23:07:31 -08001534 message_bridge::TestingTimeConverter time(
1535 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001536 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001537 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001538
Austin Schuh2febf0d2020-09-21 22:24:30 -07001539 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001540 time.AddNextTimestamp(
1541 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001542 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1543 BootTimestamp::epoch()});
1544 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1545 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1546 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1547 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001548
1549 std::unique_ptr<EventLoop> ping_event_loop =
1550 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1551 Ping ping(ping_event_loop.get());
1552
1553 std::unique_ptr<EventLoop> pong_event_loop =
1554 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1555 Pong pong(pong_event_loop.get());
1556
1557 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1558 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1559 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1560 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1561
1562 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1563 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1564 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1565 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1566
1567 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1568 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1569 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1570 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1571
1572 // End after a pong message comes back. This will leave the latest messages
1573 // on all channels so we can look at timestamps easily and check they make
1574 // sense.
1575 std::unique_ptr<EventLoop> pi1_pong_ender =
1576 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1577 int count = 0;
1578 pi1_pong_ender->MakeWatcher(
1579 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1580 if (++count == 100) {
1581 simulated_event_loop_factory.Exit();
1582 }
1583 });
1584
1585 // Run enough that messages should be delivered.
1586 simulated_event_loop_factory.Run();
1587
1588 // Grab the latest messages.
1589 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1590 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1591 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1592 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1593
1594 // Compute their time on the global distributed clock so we can compute
1595 // distance betwen them.
1596 const distributed_clock::time_point pi1_ping_time =
1597 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1598 ->ToDistributedClock(
1599 ping_on_pi1_fetcher.context().monotonic_event_time);
1600 const distributed_clock::time_point pi2_ping_time =
1601 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1602 ->ToDistributedClock(
1603 ping_on_pi2_fetcher.context().monotonic_event_time);
1604 const distributed_clock::time_point pi1_pong_time =
1605 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1606 ->ToDistributedClock(
1607 pong_on_pi1_fetcher.context().monotonic_event_time);
1608 const distributed_clock::time_point pi2_pong_time =
1609 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1610 ->ToDistributedClock(
1611 pong_on_pi2_fetcher.context().monotonic_event_time);
1612
1613 // And confirm the delivery delay is just about exactly 150 uS for both
1614 // directions like expected. There will be a couple ns of rounding errors in
1615 // the conversion functions that aren't worth accounting for right now. This
1616 // will either be really close, or really far.
1617 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1618 pi1_ping_time);
1619 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1620 pi1_ping_time);
1621
1622 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1623 pi2_pong_time);
1624 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1625 pi2_pong_time);
1626}
1627
Austin Schuh4c570ea2020-11-19 23:13:24 -08001628void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1629 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1630 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1631 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001632 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001633}
1634
1635// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001636TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001637 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1638 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1639
1640 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1641
1642 std::unique_ptr<EventLoop> ping_event_loop =
1643 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1644 aos::Sender<examples::Ping> pi1_reliable_sender =
1645 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1646 aos::Sender<examples::Ping> pi1_unreliable_sender =
1647 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1648 SendPing(&pi1_reliable_sender, 1);
1649 SendPing(&pi1_unreliable_sender, 1);
1650
1651 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1652 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001653 aos::Sender<examples::Ping> pi2_reliable_sender =
1654 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1655 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001656 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1657 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001658 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1659 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001660 MessageCounter<examples::Ping> pi2_unreliable_counter(
1661 pi2_pong_event_loop.get(), "/unreliable");
1662 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1663 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1664 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1665 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1666
1667 const size_t reliable_channel_index = configuration::ChannelIndex(
1668 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1669
1670 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1671 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1672
Austin Schuheeaa2022021-01-02 21:52:03 -08001673 const chrono::nanoseconds network_delay =
1674 simulated_event_loop_factory.network_delay();
1675
Austin Schuh4c570ea2020-11-19 23:13:24 -08001676 int reliable_timestamp_count = 0;
1677 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001678 shared() ? "/pi1/aos/remote_timestamps/pi2"
1679 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001680 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001681 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1682 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001683 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001684 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001685 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001686 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001687 VLOG(1) << aos::FlatbufferToJson(&header);
1688 if (header.channel_index() == reliable_channel_index) {
1689 ++reliable_timestamp_count;
1690 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001691
1692 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1693 chrono::nanoseconds(header.monotonic_sent_time()));
1694
1695 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1696 header_monotonic_sent_time + network_delay +
1697 (pi1_remote_timestamp->monotonic_now() -
1698 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001699 });
1700
1701 // Wait to let timestamp estimation start up before looking for the results.
1702 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1703
1704 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1705 // This one isn't reliable, but was sent before the start. It should *not* be
1706 // delivered.
1707 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1708 // Confirm we got a timestamp logged for the message that was forwarded.
1709 EXPECT_EQ(reliable_timestamp_count, 1u);
1710
1711 SendPing(&pi1_reliable_sender, 2);
1712 SendPing(&pi1_unreliable_sender, 2);
1713 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1714 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001715 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001716 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1717
1718 EXPECT_EQ(reliable_timestamp_count, 2u);
1719}
1720
Austin Schuh20ac95d2020-12-05 17:24:19 -08001721// Tests that rebooting a node changes the ServerStatistics message and the
1722// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001723TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001724 const UUID pi1_boot0 = UUID::Random();
1725 const UUID pi2_boot0 = UUID::Random();
1726 const UUID pi2_boot1 = UUID::Random();
1727 const UUID pi3_boot0 = UUID::Random();
1728 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001729
Austin Schuh58646e22021-08-23 23:51:46 -07001730 message_bridge::TestingTimeConverter time(
1731 configuration::NodesCount(&config.message()));
1732 SimulatedEventLoopFactory factory(&config.message());
1733 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001734
Austin Schuh58646e22021-08-23 23:51:46 -07001735 const size_t pi1_index =
1736 configuration::GetNodeIndex(&config.message(), "pi1");
1737 const size_t pi2_index =
1738 configuration::GetNodeIndex(&config.message(), "pi2");
1739 const size_t pi3_index =
1740 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741
Austin Schuh58646e22021-08-23 23:51:46 -07001742 {
1743 time.AddNextTimestamp(distributed_clock::epoch(),
1744 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1745 BootTimestamp::epoch()});
1746
1747 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1748
1749 time.AddNextTimestamp(
1750 distributed_clock::epoch() + dt,
1751 {BootTimestamp::epoch() + dt,
1752 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1753 BootTimestamp::epoch() + dt});
1754
1755 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1756 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1757 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1758 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1759 }
1760
1761 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1762 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1763
1764 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1765 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001766
1767 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001768 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001769
1770 int timestamp_count = 0;
1771 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001772 "/pi2/aos", [&expected_boot_uuid,
1773 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001774 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001775 expected_boot_uuid);
1776 });
1777 pi1_remote_timestamp->MakeWatcher(
1778 "/test",
1779 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001780 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001781 expected_boot_uuid);
1782 });
1783 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001784 shared() ? "/pi1/aos/remote_timestamps/pi2"
1785 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001786 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1787 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001788 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001789 VLOG(1) << aos::FlatbufferToJson(&header);
1790 ++timestamp_count;
1791 });
1792
1793 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001794 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001795 int boot_number = 0;
1796 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001797 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001798 "/pi1/aos",
1799 [&pi1_server_statistics_count, &expected_boot_uuid,
1800 &expected_connection_time, &first_pi1_server_statistics,
1801 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001802 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1803 for (const message_bridge::ServerConnection *connection :
1804 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001805 if (connection->state() == message_bridge::State::CONNECTED) {
1806 ASSERT_TRUE(connection->has_boot_uuid());
1807 }
1808 if (!first_pi1_server_statistics) {
1809 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1810 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001811 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001812 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1813 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001814 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001815 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001816 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001817 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1818 connection->connected_since_time())),
1819 expected_connection_time);
1820 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001821 ++pi1_server_statistics_count;
1822 }
1823 }
Austin Schuh58646e22021-08-23 23:51:46 -07001824 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001825 });
1826
Austin Schuh58646e22021-08-23 23:51:46 -07001827 int pi1_client_statistics_count = 0;
1828 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001829 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1830 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001831 const message_bridge::ClientStatistics &stats) {
1832 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1833 for (const message_bridge::ClientConnection *connection :
1834 *stats.connections()) {
1835 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1836 if (connection->node()->name()->string_view() == "pi2") {
1837 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001838 EXPECT_EQ(expected_boot_uuid,
1839 UUID::FromString(connection->boot_uuid()))
1840 << " : Got " << aos::FlatbufferToJson(&stats);
1841 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1842 connection->connected_since_time())),
1843 expected_connection_time);
1844 EXPECT_EQ(boot_number + 1, connection->connection_count());
1845 } else {
1846 EXPECT_EQ(connection->connected_since_time(), 0);
1847 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001848 }
1849 }
1850 });
1851
1852 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001853 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1854 pi1, pi2, pi2_boot1]() {
1855 expected_boot_uuid = pi2_boot1;
1856 ++boot_number;
1857 LOG(INFO) << "OnShutdown triggered for pi2";
1858 pi2->OnStartup(
1859 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1860 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1861 expected_connection_time = pi1->monotonic_now();
1862 });
1863 });
Austin Schuh58646e22021-08-23 23:51:46 -07001864
Austin Schuh20ac95d2020-12-05 17:24:19 -08001865 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001866 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001867
1868 EXPECT_GT(timestamp_count, 100);
1869 EXPECT_GE(pi1_server_statistics_count, 1u);
1870
Austin Schuh20ac95d2020-12-05 17:24:19 -08001871 timestamp_count = 0;
1872 pi1_server_statistics_count = 0;
1873
Austin Schuh58646e22021-08-23 23:51:46 -07001874 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001875 EXPECT_GT(timestamp_count, 100);
1876 EXPECT_GE(pi1_server_statistics_count, 1u);
1877}
1878
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001879INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001880 All, RemoteMessageSimulatedEventLoopTest,
1881 ::testing::Values(
1882 Param{"multinode_pingpong_test_combined_config.json", true},
1883 Param{"multinode_pingpong_test_split_config.json", false}));
1884
Austin Schuh58646e22021-08-23 23:51:46 -07001885// Tests that Startup and Shutdown do reasonable things.
1886TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1887 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1888 aos::configuration::ReadConfig(
1889 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1890
Austin Schuh72e65682021-09-02 11:37:05 -07001891 size_t pi1_shutdown_counter = 0;
1892 size_t pi2_shutdown_counter = 0;
1893 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1894 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1895
Austin Schuh58646e22021-08-23 23:51:46 -07001896 message_bridge::TestingTimeConverter time(
1897 configuration::NodesCount(&config.message()));
1898 SimulatedEventLoopFactory factory(&config.message());
1899 factory.SetTimeConverter(&time);
1900 time.AddNextTimestamp(
1901 distributed_clock::epoch(),
1902 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1903
1904 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1905
1906 time.AddNextTimestamp(
1907 distributed_clock::epoch() + dt,
1908 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1909 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1910 BootTimestamp::epoch() + dt});
1911
1912 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1913 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1914
1915 // Configure startup to start Ping and Pong, and count.
1916 size_t pi1_startup_counter = 0;
1917 size_t pi2_startup_counter = 0;
1918 pi1->OnStartup([pi1]() {
1919 LOG(INFO) << "Made ping";
1920 pi1->AlwaysStart<Ping>("ping");
1921 });
1922 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1923 pi2->OnStartup([pi2]() {
1924 LOG(INFO) << "Made pong";
1925 pi2->AlwaysStart<Pong>("pong");
1926 });
1927 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1928
1929 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001930 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1931 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1932
Austin Schuh58646e22021-08-23 23:51:46 -07001933 // Automatically make counters on startup.
1934 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1935 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1936 "pi1_pong_counter", "/test");
1937 });
1938 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1939 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1940 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1941 "pi2_ping_counter", "/test");
1942 });
1943 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1944
1945 EXPECT_EQ(pi2_ping_counter, nullptr);
1946 EXPECT_EQ(pi1_pong_counter, nullptr);
1947
1948 EXPECT_EQ(pi1_startup_counter, 0u);
1949 EXPECT_EQ(pi2_startup_counter, 0u);
1950 EXPECT_EQ(pi1_shutdown_counter, 0u);
1951 EXPECT_EQ(pi2_shutdown_counter, 0u);
1952
1953 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1954 EXPECT_EQ(pi1_startup_counter, 1u);
1955 EXPECT_EQ(pi2_startup_counter, 1u);
1956 EXPECT_EQ(pi1_shutdown_counter, 0u);
1957 EXPECT_EQ(pi2_shutdown_counter, 0u);
1958 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1959 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1960
1961 LOG(INFO) << pi1->monotonic_now();
1962 LOG(INFO) << pi2->monotonic_now();
1963
1964 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1965
1966 EXPECT_EQ(pi1_startup_counter, 2u);
1967 EXPECT_EQ(pi2_startup_counter, 2u);
1968 EXPECT_EQ(pi1_shutdown_counter, 1u);
1969 EXPECT_EQ(pi2_shutdown_counter, 1u);
1970 EXPECT_EQ(pi2_ping_counter->count(), 501);
1971 EXPECT_EQ(pi1_pong_counter->count(), 501);
1972}
1973
1974// Tests that OnStartup handlers can be added after running and get called, and
1975// can't be called when running.
1976TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1977 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1978 aos::configuration::ReadConfig(
1979 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1980
1981 // Test that we can add startup handlers as long as we aren't running, and
1982 // they get run when Run gets called again.
1983 // Test that adding a startup handler when running fails.
1984 //
1985 // Test shutdown handlers get called on destruction.
1986 SimulatedEventLoopFactory factory(&config.message());
1987
1988 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1989
1990 int startup_count0 = 0;
1991 int startup_count1 = 0;
1992
1993 pi1->OnStartup([&]() { ++startup_count0; });
1994 EXPECT_EQ(startup_count0, 0);
1995 EXPECT_EQ(startup_count1, 0);
1996
1997 factory.RunFor(chrono::nanoseconds(1));
1998 EXPECT_EQ(startup_count0, 1);
1999 EXPECT_EQ(startup_count1, 0);
2000
2001 pi1->OnStartup([&]() { ++startup_count1; });
2002 EXPECT_EQ(startup_count0, 1);
2003 EXPECT_EQ(startup_count1, 0);
2004
2005 factory.RunFor(chrono::nanoseconds(1));
2006 EXPECT_EQ(startup_count0, 1);
2007 EXPECT_EQ(startup_count1, 1);
2008
2009 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2010 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2011
2012 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2013 "Can only register OnStartup handlers when not running.");
2014}
2015
2016// Tests that OnStartup handlers can be added after running and get called, and
2017// all the handlers get called on reboot. Shutdown handlers are tested the same
2018// way.
2019TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2020 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2021 aos::configuration::ReadConfig(
2022 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2023
Austin Schuh72e65682021-09-02 11:37:05 -07002024 int startup_count0 = 0;
2025 int shutdown_count0 = 0;
2026 int startup_count1 = 0;
2027 int shutdown_count1 = 0;
2028
Austin Schuh58646e22021-08-23 23:51:46 -07002029 message_bridge::TestingTimeConverter time(
2030 configuration::NodesCount(&config.message()));
2031 SimulatedEventLoopFactory factory(&config.message());
2032 factory.SetTimeConverter(&time);
2033 time.StartEqual();
2034
2035 const chrono::nanoseconds dt = chrono::seconds(10);
2036 time.RebootAt(0, distributed_clock::epoch() + dt);
2037 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2038
2039 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2040
Austin Schuh58646e22021-08-23 23:51:46 -07002041 pi1->OnStartup([&]() { ++startup_count0; });
2042 pi1->OnShutdown([&]() { ++shutdown_count0; });
2043 EXPECT_EQ(startup_count0, 0);
2044 EXPECT_EQ(startup_count1, 0);
2045 EXPECT_EQ(shutdown_count0, 0);
2046 EXPECT_EQ(shutdown_count1, 0);
2047
2048 factory.RunFor(chrono::nanoseconds(1));
2049 EXPECT_EQ(startup_count0, 1);
2050 EXPECT_EQ(startup_count1, 0);
2051 EXPECT_EQ(shutdown_count0, 0);
2052 EXPECT_EQ(shutdown_count1, 0);
2053
2054 pi1->OnStartup([&]() { ++startup_count1; });
2055 EXPECT_EQ(startup_count0, 1);
2056 EXPECT_EQ(startup_count1, 0);
2057 EXPECT_EQ(shutdown_count0, 0);
2058 EXPECT_EQ(shutdown_count1, 0);
2059
2060 factory.RunFor(chrono::nanoseconds(1));
2061 EXPECT_EQ(startup_count0, 1);
2062 EXPECT_EQ(startup_count1, 1);
2063 EXPECT_EQ(shutdown_count0, 0);
2064 EXPECT_EQ(shutdown_count1, 0);
2065
2066 factory.RunFor(chrono::seconds(15));
2067
2068 EXPECT_EQ(startup_count0, 2);
2069 EXPECT_EQ(startup_count1, 2);
2070 EXPECT_EQ(shutdown_count0, 1);
2071 EXPECT_EQ(shutdown_count1, 0);
2072
2073 pi1->OnShutdown([&]() { ++shutdown_count1; });
2074 factory.RunFor(chrono::seconds(10));
2075
2076 EXPECT_EQ(startup_count0, 3);
2077 EXPECT_EQ(startup_count1, 3);
2078 EXPECT_EQ(shutdown_count0, 2);
2079 EXPECT_EQ(shutdown_count1, 1);
2080}
2081
2082// Tests that event loops which outlive shutdown crash.
2083TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2084 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2085 aos::configuration::ReadConfig(
2086 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2087
2088 message_bridge::TestingTimeConverter time(
2089 configuration::NodesCount(&config.message()));
2090 SimulatedEventLoopFactory factory(&config.message());
2091 factory.SetTimeConverter(&time);
2092 time.StartEqual();
2093
2094 const chrono::nanoseconds dt = chrono::seconds(10);
2095 time.RebootAt(0, distributed_clock::epoch() + dt);
2096
2097 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2098
2099 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2100
2101 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2102}
2103
Brian Silvermane1fe2512022-08-14 23:18:50 -07002104// Test that an ExitHandle outliving its factory is caught.
2105TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2106 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2107 aos::configuration::ReadConfig(
2108 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2109 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2110 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2111 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2112 auto exit_handle = factory->MakeExitHandle();
2113 EXPECT_DEATH(factory.reset(),
2114 "All ExitHandles must be destroyed before the factory");
2115}
2116
Austin Schuh3e31f912023-08-21 21:29:10 -07002117// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2118TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2119 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2120 aos::configuration::ReadConfig(
2121 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2122 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2123 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2124 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2125 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2126 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2127}
2128
Austin Schuh58646e22021-08-23 23:51:46 -07002129// Tests that messages don't survive a reboot of a node.
2130TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2131 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2132 aos::configuration::ReadConfig(
2133 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2134
2135 message_bridge::TestingTimeConverter time(
2136 configuration::NodesCount(&config.message()));
2137 SimulatedEventLoopFactory factory(&config.message());
2138 factory.SetTimeConverter(&time);
2139 time.StartEqual();
2140
2141 const chrono::nanoseconds dt = chrono::seconds(10);
2142 time.RebootAt(0, distributed_clock::epoch() + dt);
2143
2144 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2145
2146 const UUID boot_uuid = pi1->boot_uuid();
2147 EXPECT_NE(boot_uuid, UUID::Zero());
2148
2149 {
2150 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2151 aos::Sender<examples::Ping> test_message_sender =
2152 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2153 SendPing(&test_message_sender, 1);
2154 }
2155
2156 factory.RunFor(chrono::seconds(5));
2157
2158 {
2159 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2160 aos::Fetcher<examples::Ping> fetcher =
2161 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2162 EXPECT_TRUE(fetcher.Fetch());
2163 }
2164
2165 factory.RunFor(chrono::seconds(10));
2166
2167 {
2168 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2169 aos::Fetcher<examples::Ping> fetcher =
2170 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2171 EXPECT_FALSE(fetcher.Fetch());
2172 }
2173 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2174}
2175
2176// Tests that reliable messages get resent on reboot.
2177TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2178 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2179 aos::configuration::ReadConfig(
2180 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2181
2182 message_bridge::TestingTimeConverter time(
2183 configuration::NodesCount(&config.message()));
2184 SimulatedEventLoopFactory factory(&config.message());
2185 factory.SetTimeConverter(&time);
2186 time.StartEqual();
2187
2188 const chrono::nanoseconds dt = chrono::seconds(1);
2189 time.RebootAt(1, distributed_clock::epoch() + dt);
2190
2191 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2192 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2193
2194 const UUID pi1_boot_uuid = pi1->boot_uuid();
2195 const UUID pi2_boot_uuid = pi2->boot_uuid();
2196 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2197 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2198
2199 {
2200 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2201 aos::Sender<examples::Ping> test_message_sender =
2202 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2203 SendPing(&test_message_sender, 1);
2204 }
2205
2206 factory.RunFor(chrono::milliseconds(500));
2207
2208 {
2209 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2210 aos::Fetcher<examples::Ping> fetcher =
2211 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002212 ASSERT_TRUE(fetcher.Fetch());
2213 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2214 monotonic_clock::epoch());
2215 // Message bridge picks up the Ping message immediately on reboot.
2216 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2217 monotonic_clock::epoch());
2218 EXPECT_EQ(fetcher.context().monotonic_event_time,
2219 monotonic_clock::epoch() + factory.network_delay());
2220 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002221 }
2222
2223 factory.RunFor(chrono::seconds(1));
2224
2225 {
2226 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2227 aos::Fetcher<examples::Ping> fetcher =
2228 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002229 ASSERT_TRUE(fetcher.Fetch());
2230 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2231 monotonic_clock::epoch());
2232 // Message bridge picks up the Ping message immediately on reboot.
2233 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2234 monotonic_clock::epoch() + chrono::seconds(1));
2235 EXPECT_EQ(fetcher.context().monotonic_event_time,
2236 monotonic_clock::epoch() + factory.network_delay());
2237 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002238 }
2239 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2240}
2241
James Kuszmaul86e86c32022-07-21 17:39:47 -07002242TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2243 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2244 aos::configuration::ReadConfig(
2245 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2246
2247 message_bridge::TestingTimeConverter time(
2248 configuration::NodesCount(&config.message()));
2249 time.AddNextTimestamp(
2250 distributed_clock::epoch(),
2251 {BootTimestamp{0, monotonic_clock::epoch()},
2252 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2253 BootTimestamp{0, monotonic_clock::epoch()}});
2254 SimulatedEventLoopFactory factory(&config.message());
2255 factory.SetTimeConverter(&time);
2256
2257 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2258 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2259
2260 const UUID pi1_boot_uuid = pi1->boot_uuid();
2261 const UUID pi2_boot_uuid = pi2->boot_uuid();
2262 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2263 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2264
2265 {
2266 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2267 aos::Sender<examples::Ping> pi1_sender =
2268 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2269 SendPing(&pi1_sender, 1);
2270 }
2271 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2272 aos::Sender<examples::Ping> pi2_sender =
2273 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2274 SendPing(&pi2_sender, 1);
2275 // Verify that we staggered the OnRun callback correctly.
2276 pi2_event_loop->OnRun([pi1, pi2]() {
2277 EXPECT_EQ(pi1->monotonic_now(),
2278 monotonic_clock::epoch() + std::chrono::seconds(1));
2279 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2280 });
2281
2282 factory.RunFor(chrono::seconds(2));
2283
2284 {
2285 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2286 aos::Fetcher<examples::Ping> fetcher =
2287 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2288 ASSERT_TRUE(fetcher.Fetch());
2289 EXPECT_EQ(fetcher.context().monotonic_event_time,
2290 monotonic_clock::epoch() + factory.network_delay());
2291 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2292 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002293 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2294 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002295 }
2296 {
2297 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2298 aos::Fetcher<examples::Ping> fetcher =
2299 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2300 ASSERT_TRUE(fetcher.Fetch());
2301 EXPECT_EQ(fetcher.context().monotonic_event_time,
2302 monotonic_clock::epoch() + std::chrono::seconds(1) +
2303 factory.network_delay());
2304 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2305 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002306 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2307 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002308 }
2309}
2310
Austin Schuh48205e62021-11-12 14:13:18 -08002311class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2312 public:
2313 SimulatedEventLoopDisconnectTest()
2314 : config(aos::configuration::ReadConfig(ArtifactPath(
2315 "aos/events/multinode_pingpong_test_split_config.json"))),
2316 time(configuration::NodesCount(&config.message())),
2317 factory(&config.message()) {
2318 factory.SetTimeConverter(&time);
2319 }
2320
2321 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2322 const monotonic_clock::time_point allowable_message_time,
2323 std::set<const aos::Node *> empty_nodes) {
2324 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2325 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2326 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2327 pi1->MakeEventLoop("fetcher");
2328 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2329 pi2->MakeEventLoop("fetcher");
2330 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2331 if (configuration::ChannelIsReadableOnNode(channel,
2332 pi1_event_loop->node())) {
2333 std::unique_ptr<aos::RawFetcher> fetcher =
2334 pi1_event_loop->MakeRawFetcher(channel);
2335 if (statistics_channels.find(channel) == statistics_channels.end() ||
2336 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2337 EXPECT_FALSE(fetcher->Fetch() &&
2338 fetcher->context().monotonic_event_time >
2339 allowable_message_time)
2340 << ": Found recent message on channel "
2341 << configuration::CleanedChannelToString(channel) << " and time "
2342 << fetcher->context().monotonic_event_time << " > "
2343 << allowable_message_time << " on pi1";
2344 } else {
2345 EXPECT_TRUE(fetcher->Fetch() &&
2346 fetcher->context().monotonic_event_time >=
2347 allowable_message_time)
2348 << ": Didn't find recent message on channel "
2349 << configuration::CleanedChannelToString(channel) << " on pi1";
2350 }
2351 }
2352 if (configuration::ChannelIsReadableOnNode(channel,
2353 pi2_event_loop->node())) {
2354 std::unique_ptr<aos::RawFetcher> fetcher =
2355 pi2_event_loop->MakeRawFetcher(channel);
2356 if (statistics_channels.find(channel) == statistics_channels.end() ||
2357 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2358 EXPECT_FALSE(fetcher->Fetch() &&
2359 fetcher->context().monotonic_event_time >
2360 allowable_message_time)
2361 << ": Found message on channel "
2362 << configuration::CleanedChannelToString(channel) << " and time "
2363 << fetcher->context().monotonic_event_time << " > "
2364 << allowable_message_time << " on pi2";
2365 } else {
2366 EXPECT_TRUE(fetcher->Fetch() &&
2367 fetcher->context().monotonic_event_time >=
2368 allowable_message_time)
2369 << ": Didn't find message on channel "
2370 << configuration::CleanedChannelToString(channel) << " on pi2";
2371 }
2372 }
2373 }
2374 }
2375
2376 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2377
2378 message_bridge::TestingTimeConverter time;
2379 SimulatedEventLoopFactory factory;
2380};
2381
2382// Tests that if we have message bridge client/server disabled, and timing
2383// reports disabled, no messages are sent. Also tests that we can disconnect a
2384// node and disable statistics on it and it actually fully disconnects.
2385TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2386 time.StartEqual();
2387 factory.SkipTimingReport();
2388 factory.DisableStatistics();
2389
2390 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2391 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2392
2393 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2394 pi1->MakeEventLoop("fetcher");
2395 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2396 pi2->MakeEventLoop("fetcher");
2397
2398 factory.RunFor(chrono::milliseconds(100000));
2399
2400 // Confirm no messages are sent if we've configured them all off.
2401 VerifyChannels({}, monotonic_clock::min_time, {});
2402
2403 // Now, confirm that all the message_bridge channels come back when we
2404 // re-enable.
2405 factory.EnableStatistics();
2406
2407 factory.RunFor(chrono::milliseconds(10050));
2408
2409 // Build up the list of all the messages we expect when we come back.
2410 {
2411 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002412 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002413 std::vector<std::pair<std::string_view, const Node *>>{
2414 {"/pi1/aos", pi1->node()},
2415 {"/pi2/aos", pi1->node()},
2416 {"/pi3/aos", pi1->node()}}) {
2417 statistics_channels.insert(configuration::GetChannel(
2418 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2419 pi.second));
2420 statistics_channels.insert(configuration::GetChannel(
2421 factory.configuration(), pi.first,
2422 "aos.message_bridge.ServerStatistics", "", pi.second));
2423 statistics_channels.insert(configuration::GetChannel(
2424 factory.configuration(), pi.first,
2425 "aos.message_bridge.ClientStatistics", "", pi.second));
2426 }
2427
2428 statistics_channels.insert(configuration::GetChannel(
2429 factory.configuration(),
2430 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2431 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2432 statistics_channels.insert(configuration::GetChannel(
2433 factory.configuration(),
2434 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2435 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2436 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2437 }
2438
2439 // Now test that we can disable the messages for a single node
2440 pi2->DisableStatistics();
2441 const aos::monotonic_clock::time_point statistics_disable_time =
2442 pi2->monotonic_now();
2443 factory.RunFor(chrono::milliseconds(10000));
2444
2445 // We should see a much smaller set of messages, but should still see messages
2446 // forwarded, mainly the timestamp message.
2447 {
2448 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002449 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002450 std::vector<std::pair<std::string_view, const Node *>>{
2451 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2452 statistics_channels.insert(configuration::GetChannel(
2453 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2454 pi.second));
2455 statistics_channels.insert(configuration::GetChannel(
2456 factory.configuration(), pi.first,
2457 "aos.message_bridge.ServerStatistics", "", pi.second));
2458 statistics_channels.insert(configuration::GetChannel(
2459 factory.configuration(), pi.first,
2460 "aos.message_bridge.ClientStatistics", "", pi.second));
2461 }
2462
2463 statistics_channels.insert(configuration::GetChannel(
2464 factory.configuration(),
2465 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2466 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2467 VerifyChannels(statistics_channels, statistics_disable_time, {});
2468 }
2469
2470 // Now, fully disconnect the node. This will completely quiet down pi2.
2471 pi1->Disconnect(pi2->node());
2472 pi2->Disconnect(pi1->node());
2473
2474 const aos::monotonic_clock::time_point disconnect_disable_time =
2475 pi2->monotonic_now();
2476 factory.RunFor(chrono::milliseconds(10000));
2477
2478 {
2479 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002480 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002481 std::vector<std::pair<std::string_view, const Node *>>{
2482 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2483 statistics_channels.insert(configuration::GetChannel(
2484 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2485 pi.second));
2486 statistics_channels.insert(configuration::GetChannel(
2487 factory.configuration(), pi.first,
2488 "aos.message_bridge.ServerStatistics", "", pi.second));
2489 statistics_channels.insert(configuration::GetChannel(
2490 factory.configuration(), pi.first,
2491 "aos.message_bridge.ClientStatistics", "", pi.second));
2492 }
2493
2494 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2495 }
2496}
2497
Austin Schuhac6d89e2024-03-27 14:56:09 -07002498// Tests that rapidly sent messages get timestamped correctly.
2499TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2500 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2501 aos::configuration::ReadConfig(
2502 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2503
2504 message_bridge::TestingTimeConverter time(
2505 configuration::NodesCount(&config.message()));
2506 SimulatedEventLoopFactory factory(&config.message());
2507 factory.SetTimeConverter(&time);
2508 time.StartEqual();
2509
2510 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2511 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2512
2513 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2514 aos::Fetcher<examples::Ping> fetcher =
2515 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2516 EXPECT_FALSE(fetcher.Fetch());
2517
2518 {
2519 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2520 aos::Sender<examples::Ping> test_message_sender =
2521 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2522 for (const std::chrono::nanoseconds dt :
2523 {chrono::microseconds(5000), chrono::microseconds(1),
2524 chrono::microseconds(2), chrono::microseconds(70),
2525 chrono::microseconds(63)}) {
2526 factory.RunFor(dt);
2527 SendPing(&test_message_sender, 1);
2528 }
2529
2530 factory.RunFor(chrono::milliseconds(10));
2531 }
2532
2533 ASSERT_TRUE(fetcher.FetchNext());
2534 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2535 monotonic_clock::epoch() + chrono::microseconds(5000));
2536 // First message shows up after wakeup + network delay as expected.
2537 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2538 monotonic_clock::epoch() + chrono::microseconds(5000) +
2539 factory.send_delay());
2540 EXPECT_EQ(fetcher.context().monotonic_event_time,
2541 monotonic_clock::epoch() + chrono::microseconds(5000) +
2542 factory.send_delay() + factory.network_delay());
2543
2544 ASSERT_TRUE(fetcher.FetchNext());
2545 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2546 monotonic_clock::epoch() + chrono::microseconds(5001));
2547 // Next message is close enough that it gets picked up at the same wakeup.
2548 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2549 monotonic_clock::epoch() + chrono::microseconds(5000) +
2550 factory.send_delay());
2551 EXPECT_EQ(fetcher.context().monotonic_event_time,
2552 monotonic_clock::epoch() + chrono::microseconds(5000) +
2553 factory.send_delay() + factory.network_delay());
2554
2555 ASSERT_TRUE(fetcher.FetchNext());
2556 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2557 monotonic_clock::epoch() + chrono::microseconds(5003));
2558 // Same for the third.
2559 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2560 monotonic_clock::epoch() + chrono::microseconds(5000) +
2561 factory.send_delay());
2562 EXPECT_EQ(fetcher.context().monotonic_event_time,
2563 monotonic_clock::epoch() + chrono::microseconds(5000) +
2564 factory.send_delay() + factory.network_delay());
2565
2566 ASSERT_TRUE(fetcher.FetchNext());
2567 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2568 monotonic_clock::epoch() + chrono::microseconds(5073));
2569 // Fourth waits long enough to do the right thing.
2570 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2571 monotonic_clock::epoch() + chrono::microseconds(5073) +
2572 factory.send_delay());
2573 EXPECT_EQ(fetcher.context().monotonic_event_time,
2574 monotonic_clock::epoch() + chrono::microseconds(5073) +
2575 factory.send_delay() + factory.network_delay());
2576
2577 ASSERT_TRUE(fetcher.FetchNext());
2578 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2579 monotonic_clock::epoch() + chrono::microseconds(5136));
2580 // Fifth waits long enough to do the right thing as well (but kicks off while
2581 // the fourth is in flight over the network).
2582 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2583 monotonic_clock::epoch() + chrono::microseconds(5136) +
2584 factory.send_delay());
2585 EXPECT_EQ(fetcher.context().monotonic_event_time,
2586 monotonic_clock::epoch() + chrono::microseconds(5136) +
2587 factory.send_delay() + factory.network_delay());
2588
2589 ASSERT_FALSE(fetcher.FetchNext());
2590}
2591
2592// Tests that a reliable message gets forwarded if it was sent originally when
2593// nodes were disconnected.
2594TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2595 time.StartEqual();
2596 factory.SkipTimingReport();
2597 factory.DisableStatistics();
2598
2599 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2600 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2601
2602 // Fully disconnect the nodes.
2603 pi1->Disconnect(pi2->node());
2604 pi2->Disconnect(pi1->node());
2605
2606 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2607
2608 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2609 pi2->MakeEventLoop("fetcher");
2610 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2611 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2612
2613 factory.RunFor(chrono::milliseconds(100));
2614
2615 {
2616 aos::Sender<examples::Ping> pi1_reliable_sender =
2617 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2618 for (int i = 0; i < 100; ++i) {
2619 SendPing(&pi1_reliable_sender, i);
2620 factory.RunFor(chrono::milliseconds(100));
2621 }
2622 }
2623
2624 factory.RunFor(chrono::milliseconds(50));
2625
2626 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2627
2628 pi1->Connect(pi2->node());
2629 pi2->Connect(pi1->node());
2630
2631 factory.RunFor(chrono::milliseconds(1));
2632
2633 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2634 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2635 monotonic_clock::epoch() + chrono::milliseconds(10000));
2636 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2637 monotonic_clock::epoch() + chrono::milliseconds(10150));
2638 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2639 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2640 factory.network_delay());
2641 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2642
2643 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2644}
2645
Stephan Pleinesf63bde82024-01-13 15:59:33 -08002646} // namespace aos::testing