blob: 745a273fa56ece20eb82c6752244872a1e796378 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuhbf610b72024-04-04 20:04:55 -070010#include "aos/events/function_scheduler.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070012#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080013#include "aos/events/ping_lib.h"
14#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080015#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070016#include "aos/network/message_bridge_client_generated.h"
17#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080018#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080019#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070020#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070021#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080022
Stephan Pleinesf63bde82024-01-13 15:59:33 -080023namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
Austin Schuh217a9782019-12-21 23:02:50 -080045 void Run() override { event_loop_factory_->Run(); }
46 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070047
Austin Schuh52d325c2019-06-23 18:59:06 -070048 // TODO(austin): Implement this. It's used currently for a phased loop test.
49 // I'm not sure how much that matters.
50 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
51
Austin Schuh7d87b672019-12-01 20:23:49 -080052 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080053 MaybeMake();
54 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080055 }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
Austin Schuh217a9782019-12-21 23:02:50 -080058 void MaybeMake() {
59 if (!event_loop_factory_) {
60 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080061 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080063 } else {
64 event_loop_factory_ =
65 std::make_unique<SimulatedEventLoopFactory>(configuration());
66 }
67 }
68 }
69 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080070};
71
Austin Schuh6bae8252021-02-07 22:01:49 -080072auto CommonParameters() {
73 return ::testing::Combine(
74 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
75 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
76 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
77}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070081
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070082INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070083 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080084
Austin Schuh89c9b812021-02-20 14:42:10 -080085// Parameters to run all the tests with.
86struct Param {
87 // The config file to use.
88 std::string config;
89 // If true, the RemoteMessage channel should be shared between all the remote
90 // channels. If false, there will be 1 RemoteMessage channel per remote
91 // channel.
92 bool shared;
93};
94
95class RemoteMessageSimulatedEventLoopTest
96 : public ::testing::TestWithParam<struct Param> {
97 public:
98 RemoteMessageSimulatedEventLoopTest()
99 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700100 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800101 LOG(INFO) << "Config " << GetParam().config;
102 }
103
104 bool shared() const { return GetParam().shared; }
105
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
107 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
108 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
109 if (shared()) {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop, "/aos/remote_timestamps/pi2"));
112 } else {
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop,
115 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
120 }
121 return counters;
122 }
123
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
125 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
126 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
127 if (shared()) {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1"));
130 } else {
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
133 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
134 event_loop,
135 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
136 }
137 return counters;
138 }
139
140 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
141};
142
Austin Schuh8fb315a2020-11-19 22:33:58 -0800143// Test that sending a message after running gets properly notified.
144TEST(SimulatedEventLoopTest, SendAfterRunFor) {
145 SimulatedEventLoopTestFactory factory;
146
147 SimulatedEventLoopFactory simulated_event_loop_factory(
148 factory.configuration());
149
150 ::std::unique_ptr<EventLoop> ping_event_loop =
151 simulated_event_loop_factory.MakeEventLoop("ping");
152 aos::Sender<TestMessage> test_message_sender =
153 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800155
156 std::unique_ptr<EventLoop> pong1_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("pong");
158 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
159 "/test");
160
161 EXPECT_FALSE(ping_event_loop->is_running());
162
163 // Watchers start when you start running, so there should be nothing counted.
164 simulated_event_loop_factory.RunFor(chrono::seconds(1));
165 EXPECT_EQ(test_message_counter1.count(), 0u);
166
167 std::unique_ptr<EventLoop> pong2_event_loop =
168 simulated_event_loop_factory.MakeEventLoop("pong");
169 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
170 "/test");
171
172 // Pauses in the middle don't count though, so this should be counted.
173 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700174 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800175
176 EXPECT_EQ(test_message_counter1.count(), 0u);
177 EXPECT_EQ(test_message_counter2.count(), 0u);
178 simulated_event_loop_factory.RunFor(chrono::seconds(1));
179
180 EXPECT_EQ(test_message_counter1.count(), 1u);
181 EXPECT_EQ(test_message_counter2.count(), 0u);
182}
183
Austin Schuh60e77942022-05-16 17:48:24 -0700184// Test that if we configure an event loop to be able to send too fast that we
185// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700186TEST(SimulatedEventLoopTest, AllowSendTooFast) {
187 SimulatedEventLoopTestFactory factory;
188
189 SimulatedEventLoopFactory simulated_event_loop_factory(
190 factory.configuration());
191
192 // Create two event loops: One will be allowed to send too fast, one won't. We
193 // will then test to ensure that the one that is allowed to send too fast can
194 // indeed send too fast, but that it then makes it so that the second event
195 // loop can no longer send anything because *it* is still limited.
196 ::std::unique_ptr<EventLoop> too_fast_event_loop =
197 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
198 ->MakeEventLoop("too_fast_sender",
199 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700200 NodeEventLoopFactory::ExclusiveSenders::kNo,
201 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700202 aos::Sender<TestMessage> too_fast_message_sender =
203 too_fast_event_loop->MakeSender<TestMessage>("/test");
204
205 ::std::unique_ptr<EventLoop> limited_event_loop =
206 simulated_event_loop_factory.MakeEventLoop("limited_sender");
207 aos::Sender<TestMessage> limited_message_sender =
208 limited_event_loop->MakeSender<TestMessage>("/test");
209
210 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
211 for (int ii = 0; ii < queue_size; ++ii) {
212 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
213 }
214 // And now we should start being in the sending-too-fast phase.
215 for (int ii = 0; ii < queue_size; ++ii) {
216 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700217 ASSERT_EQ(SendTestMessage(limited_message_sender),
218 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700219 }
220}
221
222// Test that if we setup an exclusive sender that it is indeed exclusive.
223TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
224 SimulatedEventLoopTestFactory factory;
225
226 SimulatedEventLoopFactory simulated_event_loop_factory(
227 factory.configuration());
228
229 ::std::unique_ptr<EventLoop> exclusive_event_loop =
230 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700231 ->MakeEventLoop(
232 "too_fast_sender",
233 {NodeEventLoopFactory::CheckSentTooFast::kYes,
234 NodeEventLoopFactory::ExclusiveSenders::kYes,
235 {{configuration::GetChannel(factory.configuration(), "/test1",
236 "aos.TestMessage", "", nullptr),
237 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700238 exclusive_event_loop->SkipAosLog();
239 exclusive_event_loop->SkipTimingReport();
240 ::std::unique_ptr<EventLoop> normal_event_loop =
241 simulated_event_loop_factory.MakeEventLoop("limited_sender");
242 // Set things up to have the exclusive sender be destroyed so we can test
243 // recovery.
244 {
245 aos::Sender<TestMessage> exclusive_sender =
246 exclusive_event_loop->MakeSender<TestMessage>("/test");
247
248 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
249 "TestMessage");
250 }
251 // This one should succeed now that the exclusive channel is removed.
252 aos::Sender<TestMessage> normal_sender =
253 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700254 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
255 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700256
257 // And check an explicitly exempted channel:
258 aos::Sender<TestMessage> non_exclusive_sender =
259 exclusive_event_loop->MakeSender<TestMessage>("/test1");
260 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
261 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700262}
263
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700264void TestSentTooFastCheckEdgeCase(
265 const std::function<RawSender::Error(int, int)> expected_err,
266 const bool send_twice_at_end) {
267 SimulatedEventLoopTestFactory factory;
268
269 auto event_loop = factory.MakePrimary("primary");
270
271 auto sender = event_loop->MakeSender<TestMessage>("/test");
272
273 const int queue_size = TestChannelQueueSize(event_loop.get());
274 int msgs_sent = 0;
275 event_loop->AddPhasedLoop(
276 [&](int) {
277 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
278 msgs_sent++;
279
280 // If send_twice_at_end, send the last two messages (message
281 // queue_size and queue_size + 1) in the same iteration, meaning that
282 // we would be sending very slightly too fast. Otherwise, we will send
283 // message queue_size + 1 in the next iteration and we will continue
284 // to be sending exactly at the channel frequency.
285 if (send_twice_at_end && (msgs_sent == queue_size)) {
286 EXPECT_EQ(SendTestMessage(sender),
287 expected_err(msgs_sent, queue_size));
288 msgs_sent++;
289 }
290
291 if (msgs_sent > queue_size) {
292 factory.Exit();
293 }
294 },
295 std::chrono::duration_cast<std::chrono::nanoseconds>(
296 std::chrono::duration<double>(
297 1.0 / TestChannelFrequency(event_loop.get()))));
298
299 factory.Run();
300}
301
302// Tests that RawSender::Error::kMessagesSentTooFast is not returned
303// when messages are sent at the exact frequency of the channel.
304TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
305 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
306 false);
307}
308
309// Tests that RawSender::Error::kMessagesSentTooFast is returned
310// when sending exactly one more message than allowed in a channel storage
311// duration.
312TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
313 TestSentTooFastCheckEdgeCase(
314 [](const int msgs_sent, const int queue_size) {
315 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
316 : RawSender::Error::kOk);
317 },
318 true);
319}
320
Austin Schuh8fb315a2020-11-19 22:33:58 -0800321// Test that creating an event loop while running dies.
322TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
323 SimulatedEventLoopTestFactory factory;
324
325 SimulatedEventLoopFactory simulated_event_loop_factory(
326 factory.configuration());
327
328 ::std::unique_ptr<EventLoop> event_loop =
329 simulated_event_loop_factory.MakeEventLoop("ping");
330
331 auto timer = event_loop->AddTimer([&]() {
332 EXPECT_DEATH(
333 {
334 ::std::unique_ptr<EventLoop> event_loop2 =
335 simulated_event_loop_factory.MakeEventLoop("ping");
336 },
337 "event loop while running");
338 simulated_event_loop_factory.Exit();
339 });
340
341 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700342 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800343 });
344
345 simulated_event_loop_factory.Run();
346}
347
348// Test that creating a watcher after running dies.
349TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
350 SimulatedEventLoopTestFactory factory;
351
352 SimulatedEventLoopFactory simulated_event_loop_factory(
353 factory.configuration());
354
355 ::std::unique_ptr<EventLoop> event_loop =
356 simulated_event_loop_factory.MakeEventLoop("ping");
357
358 simulated_event_loop_factory.RunFor(chrono::seconds(1));
359
360 EXPECT_DEATH(
361 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
362 "Can't add a watcher after running");
363
364 ::std::unique_ptr<EventLoop> event_loop2 =
365 simulated_event_loop_factory.MakeEventLoop("ping");
366
367 simulated_event_loop_factory.RunFor(chrono::seconds(1));
368
369 EXPECT_DEATH(
370 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
371 "Can't add a watcher after running");
372}
373
Austin Schuh44019f92019-05-19 19:58:27 -0700374// Test that running for a time period with no handlers causes time to progress
375// correctly.
376TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800377 SimulatedEventLoopTestFactory factory;
378
379 SimulatedEventLoopFactory simulated_event_loop_factory(
380 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700381 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800382 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700383
384 simulated_event_loop_factory.RunFor(chrono::seconds(1));
385
386 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700387 event_loop->monotonic_now());
388}
389
390// Test that running for a time with a periodic handler causes time to end
391// correctly.
392TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800393 SimulatedEventLoopTestFactory factory;
394
395 SimulatedEventLoopFactory simulated_event_loop_factory(
396 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700397 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800398 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700399
400 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700402 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700403 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
404 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700405 });
406
407 simulated_event_loop_factory.RunFor(chrono::seconds(1));
408
409 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700410 event_loop->monotonic_now());
411 EXPECT_EQ(counter, 10);
412}
413
Austin Schuh7d87b672019-12-01 20:23:49 -0800414// Tests that watchers have latency in simulation.
415TEST(SimulatedEventLoopTest, WatcherTimingReport) {
416 SimulatedEventLoopTestFactory factory;
417 factory.set_send_delay(std::chrono::microseconds(50));
418
419 FLAGS_timing_report_ms = 1000;
420 auto loop1 = factory.MakePrimary("primary");
421 loop1->MakeWatcher("/test", [](const TestMessage &) {});
422
423 auto loop2 = factory.Make("sender_loop");
424
425 auto loop3 = factory.Make("report_fetcher");
426
427 Fetcher<timing::Report> report_fetcher =
428 loop3->MakeFetcher<timing::Report>("/aos");
429 EXPECT_FALSE(report_fetcher.Fetch());
430
431 auto sender = loop2->MakeSender<TestMessage>("/test");
432
433 // Send 10 messages in the middle of a timing report period so we get
434 // something interesting back.
435 auto test_timer = loop2->AddTimer([&sender]() {
436 for (int i = 0; i < 10; ++i) {
437 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
438 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
439 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700440 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800441 }
442 });
443
444 // Quit after 1 timing report, mid way through the next cycle.
445 {
446 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700447 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800448 end_timer->set_name("end");
449 }
450
451 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700452 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800453 });
454
455 factory.Run();
456
457 // And, since we are here, check that the timing report makes sense.
458 // Start by looking for our event loop's timing.
459 FlatbufferDetachedBuffer<timing::Report> primary_report =
460 FlatbufferDetachedBuffer<timing::Report>::Empty();
461 while (report_fetcher.FetchNext()) {
462 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
463 if (report_fetcher->name()->string_view() == "primary") {
464 primary_report = CopyFlatBuffer(report_fetcher.get());
465 }
466 }
467
468 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700469 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800470
471 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
472
473 // Just the timing report timer.
474 ASSERT_NE(primary_report.message().timers(), nullptr);
475 EXPECT_EQ(primary_report.message().timers()->size(), 2);
476
477 // No phased loops
478 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
479
480 // And now confirm that the watcher received all 10 messages, and has latency.
481 ASSERT_NE(primary_report.message().watchers(), nullptr);
482 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
483 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
484 EXPECT_NEAR(
485 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
486 0.00005, 1e-9);
487 EXPECT_NEAR(
488 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
489 0.00005, 1e-9);
490 EXPECT_NEAR(
491 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
492 0.00005, 1e-9);
493 EXPECT_EQ(primary_report.message()
494 .watchers()
495 ->Get(0)
496 ->wakeup_latency()
497 ->standard_deviation(),
498 0.0);
499
500 EXPECT_EQ(
501 primary_report.message().watchers()->Get(0)->handler_time()->average(),
502 0.0);
503 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
504 0.0);
505 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
506 0.0);
507 EXPECT_EQ(primary_report.message()
508 .watchers()
509 ->Get(0)
510 ->handler_time()
511 ->standard_deviation(),
512 0.0);
513}
514
Austin Schuh89c9b812021-02-20 14:42:10 -0800515size_t CountAll(
516 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
517 &counters) {
518 size_t count = 0u;
519 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
520 counters) {
521 count += counter->count();
522 }
523 return count;
524}
525
Austin Schuh4c3b9702020-08-30 11:34:55 -0700526// Tests that ping and pong work when on 2 different nodes, and the message
527// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800528TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800529 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
530 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700531 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800532
533 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
534
535 std::unique_ptr<EventLoop> ping_event_loop =
536 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
537 Ping ping(ping_event_loop.get());
538
539 std::unique_ptr<EventLoop> pong_event_loop =
540 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
541 Pong pong(pong_event_loop.get());
542
543 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
544 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700545 MessageCounter<examples::Pong> pi2_pong_counter(
546 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700547 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
548 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
549 "/pi1/aos");
550 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
551 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800552
Austin Schuh4c3b9702020-08-30 11:34:55 -0700553 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
554 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800555
556 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
557 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700558 MessageCounter<examples::Pong> pi1_pong_counter(
559 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700560 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
561 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
562 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
563 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
564 "/aos");
565
Austin Schuh4c3b9702020-08-30 11:34:55 -0700566 // Count timestamps.
567 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
568 pi1_pong_counter_event_loop.get(), "/pi1/aos");
569 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
570 pi2_pong_counter_event_loop.get(), "/pi1/aos");
571 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
572 pi3_pong_counter_event_loop.get(), "/pi1/aos");
573 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
574 pi1_pong_counter_event_loop.get(), "/pi2/aos");
575 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
576 pi2_pong_counter_event_loop.get(), "/pi2/aos");
577 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
578 pi1_pong_counter_event_loop.get(), "/pi3/aos");
579 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
580 pi3_pong_counter_event_loop.get(), "/pi3/aos");
581
Austin Schuh2f8fd752020-09-01 22:38:28 -0700582 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800583 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
584 remote_timestamps_pi2_on_pi1 =
585 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
586 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
587 remote_timestamps_pi1_on_pi2 =
588 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700589
Austin Schuh4c3b9702020-08-30 11:34:55 -0700590 // Wait to let timestamp estimation start up before looking for the results.
591 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
592
Austin Schuh8fb315a2020-11-19 22:33:58 -0800593 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
594 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
595 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
596 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
597 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
598 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
599
Austin Schuh4c3b9702020-08-30 11:34:55 -0700600 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800601 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700602 "/pi1/aos", [&pi1_server_statistics_count](
603 const message_bridge::ServerStatistics &stats) {
604 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
605 EXPECT_EQ(stats.connections()->size(), 2u);
606 for (const message_bridge::ServerConnection *connection :
607 *stats.connections()) {
608 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800609 EXPECT_EQ(connection->connection_count(), 1u);
610 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800611 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700612 if (connection->node()->name()->string_view() == "pi2") {
613 EXPECT_GT(connection->sent_packets(), 50);
614 } else if (connection->node()->name()->string_view() == "pi3") {
615 EXPECT_GE(connection->sent_packets(), 5);
616 } else {
617 LOG(FATAL) << "Unknown connection";
618 }
619
620 EXPECT_TRUE(connection->has_monotonic_offset());
621 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700622
623 EXPECT_TRUE(connection->has_channels());
624 int accumulated_sent_count = 0;
625 int accumulated_dropped_count = 0;
626 for (const message_bridge::ServerChannelStatistics *channel :
627 *connection->channels()) {
628 accumulated_sent_count += channel->sent_packets();
629 accumulated_dropped_count += channel->dropped_packets();
630 }
631 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
632 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700633 }
634 ++pi1_server_statistics_count;
635 });
636
637 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800638 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700639 "/pi2/aos", [&pi2_server_statistics_count](
640 const message_bridge::ServerStatistics &stats) {
641 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
642 EXPECT_EQ(stats.connections()->size(), 1u);
643
644 const message_bridge::ServerConnection *connection =
645 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800646 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700647 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
648 EXPECT_GT(connection->sent_packets(), 50);
649 EXPECT_TRUE(connection->has_monotonic_offset());
650 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800651 EXPECT_EQ(connection->connection_count(), 1u);
652 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700653
654 EXPECT_TRUE(connection->has_channels());
655 int accumulated_sent_count = 0;
656 int accumulated_dropped_count = 0;
657 for (const message_bridge::ServerChannelStatistics *channel :
658 *connection->channels()) {
659 accumulated_sent_count += channel->sent_packets();
660 accumulated_dropped_count += channel->dropped_packets();
661 }
662 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
663 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
664
Austin Schuh4c3b9702020-08-30 11:34:55 -0700665 ++pi2_server_statistics_count;
666 });
667
668 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800669 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700670 "/pi3/aos", [&pi3_server_statistics_count](
671 const message_bridge::ServerStatistics &stats) {
672 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
673 EXPECT_EQ(stats.connections()->size(), 1u);
674
675 const message_bridge::ServerConnection *connection =
676 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800677 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700678 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
679 EXPECT_GE(connection->sent_packets(), 5);
680 EXPECT_TRUE(connection->has_monotonic_offset());
681 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800682 EXPECT_EQ(connection->connection_count(), 1u);
683 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700684
685 EXPECT_TRUE(connection->has_channels());
686 int accumulated_sent_count = 0;
687 int accumulated_dropped_count = 0;
688 for (const message_bridge::ServerChannelStatistics *channel :
689 *connection->channels()) {
690 accumulated_sent_count += channel->sent_packets();
691 accumulated_dropped_count += channel->dropped_packets();
692 }
693 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
694 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
695
Austin Schuh4c3b9702020-08-30 11:34:55 -0700696 ++pi3_server_statistics_count;
697 });
698
699 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800700 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700701 "/pi1/aos", [&pi1_client_statistics_count](
702 const message_bridge::ClientStatistics &stats) {
703 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
704 EXPECT_EQ(stats.connections()->size(), 2u);
705
706 for (const message_bridge::ClientConnection *connection :
707 *stats.connections()) {
708 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
709 if (connection->node()->name()->string_view() == "pi2") {
710 EXPECT_GT(connection->received_packets(), 50);
711 } else if (connection->node()->name()->string_view() == "pi3") {
712 EXPECT_GE(connection->received_packets(), 5);
713 } else {
714 LOG(FATAL) << "Unknown connection";
715 }
716
Austin Schuhe61d4382021-03-31 21:33:02 -0700717 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700718 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700719 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800720 EXPECT_EQ(connection->connection_count(), 1u);
721 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700722 }
723 ++pi1_client_statistics_count;
724 });
725
726 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800727 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700728 "/pi2/aos", [&pi2_client_statistics_count](
729 const message_bridge::ClientStatistics &stats) {
730 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
731 EXPECT_EQ(stats.connections()->size(), 1u);
732
733 const message_bridge::ClientConnection *connection =
734 stats.connections()->Get(0);
735 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
736 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700737 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700738 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700739 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800740 EXPECT_EQ(connection->connection_count(), 1u);
741 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700742 ++pi2_client_statistics_count;
743 });
744
745 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800746 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700747 "/pi3/aos", [&pi3_client_statistics_count](
748 const message_bridge::ClientStatistics &stats) {
749 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
750 EXPECT_EQ(stats.connections()->size(), 1u);
751
752 const message_bridge::ClientConnection *connection =
753 stats.connections()->Get(0);
754 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
755 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700756 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700757 EXPECT_TRUE(connection->has_monotonic_offset());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700758 EXPECT_EQ(connection->monotonic_offset(), 100000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800759 EXPECT_EQ(connection->connection_count(), 1u);
760 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761 ++pi3_client_statistics_count;
762 });
763
Austin Schuh2f8fd752020-09-01 22:38:28 -0700764 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
765 // channel.
766 const size_t pi1_timestamp_channel =
767 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
768 pi1_on_pi2_timestamp_fetcher.channel());
769 const size_t ping_timestamp_channel =
770 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
771 ping_on_pi2_fetcher.channel());
772
773 for (const Channel *channel :
774 *pi1_pong_counter_event_loop->configuration()->channels()) {
775 VLOG(1) << "Channel "
776 << configuration::ChannelIndex(
777 pi1_pong_counter_event_loop->configuration(), channel)
778 << " " << configuration::CleanedChannelToString(channel);
779 }
780
Austin Schuh8fb315a2020-11-19 22:33:58 -0800781 std::unique_ptr<EventLoop> pi1_remote_timestamp =
782 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
783
Austin Schuh89c9b812021-02-20 14:42:10 -0800784 for (std::pair<int, std::string> channel :
785 shared()
786 ? std::vector<std::pair<
787 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
788 : std::vector<std::pair<int, std::string>>{
789 {pi1_timestamp_channel,
790 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
791 "aos-message_bridge-Timestamp"},
792 {ping_timestamp_channel,
793 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
794 // For each remote timestamp we get back, confirm that it is either a ping
795 // message, or a timestamp we sent out. Also confirm that the timestamps
796 // are correct.
797 pi1_remote_timestamp->MakeWatcher(
798 channel.second,
799 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
800 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
801 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700802 channel_index = channel.first,
803 channel_name = channel.second](const RemoteMessage &header) {
804 VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
805 << aos::FlatbufferToJson(&header);
Austin Schuh89c9b812021-02-20 14:42:10 -0800806 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700807 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800808 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700809 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700810
Austin Schuh89c9b812021-02-20 14:42:10 -0800811 const aos::monotonic_clock::time_point header_monotonic_sent_time(
812 chrono::nanoseconds(header.monotonic_sent_time()));
813 const aos::realtime_clock::time_point header_realtime_sent_time(
814 chrono::nanoseconds(header.realtime_sent_time()));
815 const aos::monotonic_clock::time_point header_monotonic_remote_time(
816 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhac6d89e2024-03-27 14:56:09 -0700817 const aos::monotonic_clock::time_point
818 header_monotonic_remote_transmit_time(
819 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Austin Schuh89c9b812021-02-20 14:42:10 -0800820 const aos::realtime_clock::time_point header_realtime_remote_time(
821 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700822
Austin Schuh89c9b812021-02-20 14:42:10 -0800823 if (channel_index != -1) {
824 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700825 }
826
Austin Schuh89c9b812021-02-20 14:42:10 -0800827 const Context *pi1_context = nullptr;
828 const Context *pi2_context = nullptr;
829
830 if (header.channel_index() == pi1_timestamp_channel) {
831 // Find the forwarded message.
832 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
833 header_monotonic_sent_time) {
834 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
835 }
836
837 // And the source message.
838 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
839 header_monotonic_remote_time) {
840 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
841 }
842
843 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
844 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700845
846 EXPECT_EQ(header_monotonic_remote_transmit_time,
847 pi2_context->monotonic_remote_time);
Austin Schuh89c9b812021-02-20 14:42:10 -0800848 } else if (header.channel_index() == ping_timestamp_channel) {
849 // Find the forwarded message.
850 while (ping_on_pi2_fetcher.context().monotonic_event_time <
851 header_monotonic_sent_time) {
852 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
853 }
854
855 // And the source message.
856 while (ping_on_pi1_fetcher.context().monotonic_event_time <
857 header_monotonic_remote_time) {
858 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
859 }
860
861 pi1_context = &ping_on_pi1_fetcher.context();
862 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700863
864 EXPECT_EQ(header_monotonic_remote_transmit_time,
865 pi2_context->monotonic_event_time -
866 simulated_event_loop_factory.network_delay());
Austin Schuh89c9b812021-02-20 14:42:10 -0800867 } else {
868 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700869 }
870
Austin Schuh89c9b812021-02-20 14:42:10 -0800871 // Confirm the forwarded message has matching timestamps to the
872 // timestamps we got back.
873 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
874 EXPECT_EQ(pi2_context->remote_queue_index,
875 header.remote_queue_index());
876 EXPECT_EQ(pi2_context->monotonic_event_time,
877 header_monotonic_sent_time);
878 EXPECT_EQ(pi2_context->realtime_event_time,
879 header_realtime_sent_time);
880 EXPECT_EQ(pi2_context->realtime_remote_time,
881 header_realtime_remote_time);
882 EXPECT_EQ(pi2_context->monotonic_remote_time,
883 header_monotonic_remote_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700884 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
885 header_monotonic_remote_transmit_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700886
Austin Schuh89c9b812021-02-20 14:42:10 -0800887 // Confirm the forwarded message also matches the source message.
888 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
889 EXPECT_EQ(pi1_context->monotonic_event_time,
890 header_monotonic_remote_time);
891 EXPECT_EQ(pi1_context->realtime_event_time,
892 header_realtime_remote_time);
893 });
894 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700895
Austin Schuh4c3b9702020-08-30 11:34:55 -0700896 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
897 chrono::milliseconds(500) +
898 chrono::milliseconds(5));
899
900 EXPECT_EQ(pi1_pong_counter.count(), 1001);
901 EXPECT_EQ(pi2_pong_counter.count(), 1001);
902
903 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
904 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
905 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
906 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
907 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
908 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
909 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
910
Austin Schuh20ac95d2020-12-05 17:24:19 -0800911 EXPECT_EQ(pi1_server_statistics_count, 10);
912 EXPECT_EQ(pi2_server_statistics_count, 10);
913 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700914
915 EXPECT_EQ(pi1_client_statistics_count, 95);
916 EXPECT_EQ(pi2_client_statistics_count, 95);
917 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700918
919 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800920 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
921 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700922}
923
924// Tests that an offset between nodes can be recovered and shows up in
925// ServerStatistics correctly.
926TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
927 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700928 aos::configuration::ReadConfig(ArtifactPath(
929 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700930 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800931 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
932 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700933 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800934 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
935 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700936 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800937 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
938 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700939
Austin Schuh87dd3832021-01-01 23:07:31 -0800940 message_bridge::TestingTimeConverter time(
941 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700942 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700943 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700944
945 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800946 time.AddNextTimestamp(
947 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700948 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
949 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700950
951 std::unique_ptr<EventLoop> ping_event_loop =
952 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
953 Ping ping(ping_event_loop.get());
954
955 std::unique_ptr<EventLoop> pong_event_loop =
956 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
957 Pong pong(pong_event_loop.get());
958
Austin Schuh8fb315a2020-11-19 22:33:58 -0800959 // Wait to let timestamp estimation start up before looking for the results.
960 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
961
Austin Schuh87dd3832021-01-01 23:07:31 -0800962 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
963 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
964
Austin Schuh4c3b9702020-08-30 11:34:55 -0700965 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
966 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
967
968 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
969 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
970
Austin Schuh4c3b9702020-08-30 11:34:55 -0700971 // Confirm the offsets are being recovered correctly.
972 int pi1_server_statistics_count = 0;
973 pi1_pong_counter_event_loop->MakeWatcher(
974 "/pi1/aos", [&pi1_server_statistics_count,
975 kOffset](const message_bridge::ServerStatistics &stats) {
976 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
977 EXPECT_EQ(stats.connections()->size(), 2u);
978 for (const message_bridge::ServerConnection *connection :
979 *stats.connections()) {
980 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800981 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700982 if (connection->node()->name()->string_view() == "pi2") {
983 EXPECT_EQ(connection->monotonic_offset(),
984 chrono::nanoseconds(kOffset).count());
985 } else if (connection->node()->name()->string_view() == "pi3") {
986 EXPECT_EQ(connection->monotonic_offset(), 0);
987 } else {
988 LOG(FATAL) << "Unknown connection";
989 }
990
991 EXPECT_TRUE(connection->has_monotonic_offset());
992 }
993 ++pi1_server_statistics_count;
994 });
995
996 int pi2_server_statistics_count = 0;
997 pi2_pong_counter_event_loop->MakeWatcher(
998 "/pi2/aos", [&pi2_server_statistics_count,
999 kOffset](const message_bridge::ServerStatistics &stats) {
1000 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
1001 EXPECT_EQ(stats.connections()->size(), 1u);
1002
1003 const message_bridge::ServerConnection *connection =
1004 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001005 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001006 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1007 EXPECT_TRUE(connection->has_monotonic_offset());
1008 EXPECT_EQ(connection->monotonic_offset(),
1009 -chrono::nanoseconds(kOffset).count());
1010 ++pi2_server_statistics_count;
1011 });
1012
1013 int pi3_server_statistics_count = 0;
1014 pi3_pong_counter_event_loop->MakeWatcher(
1015 "/pi3/aos", [&pi3_server_statistics_count](
1016 const message_bridge::ServerStatistics &stats) {
1017 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1018 EXPECT_EQ(stats.connections()->size(), 1u);
1019
1020 const message_bridge::ServerConnection *connection =
1021 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001022 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001023 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1024 EXPECT_TRUE(connection->has_monotonic_offset());
1025 EXPECT_EQ(connection->monotonic_offset(), 0);
1026 ++pi3_server_statistics_count;
1027 });
1028
1029 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1030 chrono::milliseconds(500) +
1031 chrono::milliseconds(5));
1032
Austin Schuh20ac95d2020-12-05 17:24:19 -08001033 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001034 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001035 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001036}
1037
1038// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001039TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001040 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1041 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1042 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1043
1044 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1045 simulated_event_loop_factory.DisableStatistics();
1046
1047 std::unique_ptr<EventLoop> ping_event_loop =
1048 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1049 Ping ping(ping_event_loop.get());
1050
1051 std::unique_ptr<EventLoop> pong_event_loop =
1052 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1053 Pong pong(pong_event_loop.get());
1054
1055 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1056 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1057
1058 MessageCounter<examples::Pong> pi2_pong_counter(
1059 pi2_pong_counter_event_loop.get(), "/test");
1060
1061 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1062 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1063
1064 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1065 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1066
1067 MessageCounter<examples::Pong> pi1_pong_counter(
1068 pi1_pong_counter_event_loop.get(), "/test");
1069
1070 // Count timestamps.
1071 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1072 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1073 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1074 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1075 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1076 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1077 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1078 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1079 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1080 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1081 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1082 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1083 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1084 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1085
Austin Schuh2f8fd752020-09-01 22:38:28 -07001086 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001087 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1088 remote_timestamps_pi2_on_pi1 =
1089 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1090 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1091 remote_timestamps_pi1_on_pi2 =
1092 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001093
Austin Schuh4c3b9702020-08-30 11:34:55 -07001094 MessageCounter<message_bridge::ServerStatistics>
1095 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1096 "/pi1/aos");
1097 MessageCounter<message_bridge::ServerStatistics>
1098 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1099 "/pi2/aos");
1100 MessageCounter<message_bridge::ServerStatistics>
1101 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1102 "/pi3/aos");
1103
1104 MessageCounter<message_bridge::ClientStatistics>
1105 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1106 "/pi1/aos");
1107 MessageCounter<message_bridge::ClientStatistics>
1108 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1109 "/pi2/aos");
1110 MessageCounter<message_bridge::ClientStatistics>
1111 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1112 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001113
1114 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1115 chrono::milliseconds(5));
1116
Austin Schuh4c3b9702020-08-30 11:34:55 -07001117 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1118 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1119
1120 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1121 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1122 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1123 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1124 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1125 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1126 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1127
1128 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1129 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1130 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1131
1132 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1133 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1134 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001135
1136 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001137 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1138 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001139}
1140
Austin Schuhc0b0f722020-12-12 18:36:06 -08001141bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1142 for (const message_bridge::ServerConnection *connection :
1143 *server_statistics->connections()) {
1144 if (connection->state() != message_bridge::State::CONNECTED) {
1145 return false;
1146 }
1147 }
1148 return true;
1149}
1150
1151bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1152 std::string_view target) {
1153 for (const message_bridge::ServerConnection *connection :
1154 *server_statistics->connections()) {
1155 if (connection->node()->name()->string_view() == target) {
1156 if (connection->state() == message_bridge::State::CONNECTED) {
1157 return false;
1158 }
1159 } else {
1160 if (connection->state() != message_bridge::State::CONNECTED) {
1161 return false;
1162 }
1163 }
1164 }
1165 return true;
1166}
1167
1168bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1169 for (const message_bridge::ClientConnection *connection :
1170 *client_statistics->connections()) {
1171 if (connection->state() != message_bridge::State::CONNECTED) {
1172 return false;
1173 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001174 EXPECT_TRUE(connection->has_boot_uuid());
1175 EXPECT_TRUE(connection->has_connected_since_time());
1176 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001177 }
1178 return true;
1179}
1180
1181bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1182 std::string_view target) {
1183 for (const message_bridge::ClientConnection *connection :
1184 *client_statistics->connections()) {
1185 if (connection->node()->name()->string_view() == target) {
1186 if (connection->state() == message_bridge::State::CONNECTED) {
1187 return false;
1188 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001189 EXPECT_FALSE(connection->has_boot_uuid());
1190 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001191 } else {
1192 if (connection->state() != message_bridge::State::CONNECTED) {
1193 return false;
1194 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001195 EXPECT_TRUE(connection->has_boot_uuid());
1196 EXPECT_TRUE(connection->has_connected_since_time());
1197 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001198 }
1199 }
1200 return true;
1201}
1202
Austin Schuh367a7f42021-11-23 23:04:36 -08001203int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1204 std::string_view target) {
1205 for (const message_bridge::ClientConnection *connection :
1206 *client_statistics->connections()) {
1207 if (connection->node()->name()->string_view() == target) {
1208 return connection->connection_count();
1209 }
1210 }
1211 return 0;
1212}
1213
1214int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1215 std::string_view target) {
1216 for (const message_bridge::ServerConnection *connection :
1217 *server_statistics->connections()) {
1218 if (connection->node()->name()->string_view() == target) {
1219 return connection->connection_count();
1220 }
1221 }
1222 return 0;
1223}
1224
Austin Schuhc0b0f722020-12-12 18:36:06 -08001225// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001226TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001227 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1228
Austin Schuh58646e22021-08-23 23:51:46 -07001229 NodeEventLoopFactory *pi1 =
1230 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1231 NodeEventLoopFactory *pi2 =
1232 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1233 NodeEventLoopFactory *pi3 =
1234 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1235
1236 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001237 Ping ping(ping_event_loop.get());
1238
Austin Schuh58646e22021-08-23 23:51:46 -07001239 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001240 Pong pong(pong_event_loop.get());
1241
1242 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001243 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001244
1245 MessageCounter<examples::Pong> pi2_pong_counter(
1246 pi2_pong_counter_event_loop.get(), "/test");
1247
1248 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001249 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001250
1251 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001252 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001253
1254 MessageCounter<examples::Pong> pi1_pong_counter(
1255 pi1_pong_counter_event_loop.get(), "/test");
1256
1257 // Count timestamps.
1258 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1259 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1260 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1261 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1262 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1263 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1264 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1265 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1266 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1267 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1268 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1269 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1270 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1271 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1272
1273 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001274 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1275 remote_timestamps_pi2_on_pi1 =
1276 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1277 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1278 remote_timestamps_pi1_on_pi2 =
1279 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001280
1281 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001282 *pi1_server_statistics_counter;
1283 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1284 pi1_server_statistics_counter =
1285 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1286 "pi1_server_statistics_counter", "/pi1/aos");
1287 });
1288
Austin Schuhc0b0f722020-12-12 18:36:06 -08001289 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1290 pi1_pong_counter_event_loop
1291 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1292 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1293 pi1_pong_counter_event_loop
1294 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1295
1296 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001297 *pi2_server_statistics_counter;
1298 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1299 pi2_server_statistics_counter =
1300 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1301 "pi2_server_statistics_counter", "/pi2/aos");
1302 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001303 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1304 pi2_pong_counter_event_loop
1305 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1306 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1307 pi2_pong_counter_event_loop
1308 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1309
1310 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001311 *pi3_server_statistics_counter;
1312 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1313 pi3_server_statistics_counter =
1314 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1315 "pi3_server_statistics_counter", "/pi3/aos");
1316 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001317 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1318 pi3_pong_counter_event_loop
1319 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1320 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1321 pi3_pong_counter_event_loop
1322 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1323
1324 MessageCounter<message_bridge::ClientStatistics>
1325 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1326 "/pi1/aos");
1327 MessageCounter<message_bridge::ClientStatistics>
1328 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1329 "/pi2/aos");
1330 MessageCounter<message_bridge::ClientStatistics>
1331 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1332 "/pi3/aos");
1333
James Kuszmaul86e86c32022-07-21 17:39:47 -07001334 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1335 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1336 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1337 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1338 // The currenct contract is that, if all nodes boot simultaneously in
1339 // simulation, that they should all act as if they area already connected,
1340 // without ever observing the transition from disconnected to connected (note
1341 // that on a real system the ServerStatistics message will get resent for each
1342 // and every new connection, even if the new connections happen
1343 // "simultaneously"--in simulation, we are essentially acting as if we are
1344 // starting execution in an already running system, rather than observing the
1345 // boot process).
1346 for (auto &event_loop : statistics_watcher_loops) {
1347 event_loop->MakeWatcher(
1348 "/aos", [](const message_bridge::ServerStatistics &msg) {
1349 for (const message_bridge::ServerConnection *connection :
1350 *msg.connections()) {
1351 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1352 << connection->node()->name()->string_view();
1353 }
1354 });
1355 }
1356
Austin Schuhc0b0f722020-12-12 18:36:06 -08001357 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1358 chrono::milliseconds(5));
1359
James Kuszmaul86e86c32022-07-21 17:39:47 -07001360 statistics_watcher_loops.clear();
1361
Austin Schuhc0b0f722020-12-12 18:36:06 -08001362 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1363 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1364
1365 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1366 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1367 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1368 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1369 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1370 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1371 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1372
Austin Schuh58646e22021-08-23 23:51:46 -07001373 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1374 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1375 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001376
1377 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1378 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1379 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1380
1381 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001382 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1383 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001384
1385 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1386 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1387 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1388 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1389 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1390 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1391 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1392 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1393 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1394 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1395 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1396 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1397 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1398 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1399 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1400 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1401 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1402 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1403
Austin Schuh58646e22021-08-23 23:51:46 -07001404 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001405
1406 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1407
1408 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1409 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1410
1411 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1412 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1413 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1414 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1415 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1416 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1417 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1418
Austin Schuh58646e22021-08-23 23:51:46 -07001419 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1420 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1421 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001422
1423 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1424 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1425 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1426
1427 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001428 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1429 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001430
1431 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1432 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1433 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1434 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1435 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1436 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1437 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1438 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1439 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1440 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1441 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1442 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1443 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1444 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1445 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1446 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1447 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1448 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1449
Austin Schuh58646e22021-08-23 23:51:46 -07001450 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001451
1452 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1453
Austin Schuh367a7f42021-11-23 23:04:36 -08001454 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1455 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1456 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1457 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1458 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1459 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1460
1461 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1462 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1463 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1464 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1465 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1466 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1467 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1468 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1469
1470 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1471 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1472 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1473 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1474
1475 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1476 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1477 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1478 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1479
Austin Schuhc0b0f722020-12-12 18:36:06 -08001480 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1481 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1482
1483 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1484 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1485 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1486 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1487 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1488 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1489 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1490
Austin Schuh58646e22021-08-23 23:51:46 -07001491 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1492 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1493 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001494
1495 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1496 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1497 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1498
1499 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001500 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1501 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001502
Austin Schuhc0b0f722020-12-12 18:36:06 -08001503 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1504 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001505 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1506 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001507 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1508 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001509 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1510 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001511 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1512 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001513 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1514 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1515}
1516
Austin Schuh2febf0d2020-09-21 22:24:30 -07001517// Tests that the time offset having a slope doesn't break the world.
1518// SimulatedMessageBridge has enough self consistency CHECK statements to
1519// confirm, and we can can also check a message in each direction to make sure
1520// it gets delivered as expected.
1521TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1522 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001523 aos::configuration::ReadConfig(ArtifactPath(
1524 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001525 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001526 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1527 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001528 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001529 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1530 ASSERT_EQ(pi2_index, 1u);
1531 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1532 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1533 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001534
Austin Schuh87dd3832021-01-01 23:07:31 -08001535 message_bridge::TestingTimeConverter time(
1536 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001537 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001538 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001539
Austin Schuh2febf0d2020-09-21 22:24:30 -07001540 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001541 time.AddNextTimestamp(
1542 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001543 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1544 BootTimestamp::epoch()});
1545 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1546 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1547 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1548 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001549
1550 std::unique_ptr<EventLoop> ping_event_loop =
1551 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1552 Ping ping(ping_event_loop.get());
1553
1554 std::unique_ptr<EventLoop> pong_event_loop =
1555 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1556 Pong pong(pong_event_loop.get());
1557
1558 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1559 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1560 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1561 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1562
1563 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1564 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1565 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1566 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1567
1568 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1569 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1570 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1571 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1572
1573 // End after a pong message comes back. This will leave the latest messages
1574 // on all channels so we can look at timestamps easily and check they make
1575 // sense.
1576 std::unique_ptr<EventLoop> pi1_pong_ender =
1577 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1578 int count = 0;
1579 pi1_pong_ender->MakeWatcher(
1580 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1581 if (++count == 100) {
1582 simulated_event_loop_factory.Exit();
1583 }
1584 });
1585
1586 // Run enough that messages should be delivered.
1587 simulated_event_loop_factory.Run();
1588
1589 // Grab the latest messages.
1590 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1591 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1592 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1593 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1594
1595 // Compute their time on the global distributed clock so we can compute
1596 // distance betwen them.
1597 const distributed_clock::time_point pi1_ping_time =
1598 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1599 ->ToDistributedClock(
1600 ping_on_pi1_fetcher.context().monotonic_event_time);
1601 const distributed_clock::time_point pi2_ping_time =
1602 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1603 ->ToDistributedClock(
1604 ping_on_pi2_fetcher.context().monotonic_event_time);
1605 const distributed_clock::time_point pi1_pong_time =
1606 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1607 ->ToDistributedClock(
1608 pong_on_pi1_fetcher.context().monotonic_event_time);
1609 const distributed_clock::time_point pi2_pong_time =
1610 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1611 ->ToDistributedClock(
1612 pong_on_pi2_fetcher.context().monotonic_event_time);
1613
1614 // And confirm the delivery delay is just about exactly 150 uS for both
1615 // directions like expected. There will be a couple ns of rounding errors in
1616 // the conversion functions that aren't worth accounting for right now. This
1617 // will either be really close, or really far.
1618 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1619 pi1_ping_time);
1620 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1621 pi1_ping_time);
1622
1623 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1624 pi2_pong_time);
1625 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1626 pi2_pong_time);
1627}
1628
Austin Schuh4c570ea2020-11-19 23:13:24 -08001629void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1630 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1631 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1632 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001633 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001634}
1635
1636// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001637TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001638 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1639 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1640
1641 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1642
1643 std::unique_ptr<EventLoop> ping_event_loop =
1644 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1645 aos::Sender<examples::Ping> pi1_reliable_sender =
1646 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1647 aos::Sender<examples::Ping> pi1_unreliable_sender =
1648 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1649 SendPing(&pi1_reliable_sender, 1);
1650 SendPing(&pi1_unreliable_sender, 1);
1651
1652 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1653 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001654 aos::Sender<examples::Ping> pi2_reliable_sender =
1655 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1656 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001657 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1658 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001659 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1660 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001661 MessageCounter<examples::Ping> pi2_unreliable_counter(
1662 pi2_pong_event_loop.get(), "/unreliable");
1663 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1664 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1665 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1666 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1667
1668 const size_t reliable_channel_index = configuration::ChannelIndex(
1669 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1670
1671 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1672 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1673
Austin Schuheeaa2022021-01-02 21:52:03 -08001674 const chrono::nanoseconds network_delay =
1675 simulated_event_loop_factory.network_delay();
1676
Austin Schuh4c570ea2020-11-19 23:13:24 -08001677 int reliable_timestamp_count = 0;
1678 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001679 shared() ? "/pi1/aos/remote_timestamps/pi2"
1680 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001681 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001682 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1683 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001684 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001685 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001686 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001687 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001688 VLOG(1) << aos::FlatbufferToJson(&header);
1689 if (header.channel_index() == reliable_channel_index) {
1690 ++reliable_timestamp_count;
1691 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001692
1693 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1694 chrono::nanoseconds(header.monotonic_sent_time()));
1695
1696 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1697 header_monotonic_sent_time + network_delay +
1698 (pi1_remote_timestamp->monotonic_now() -
1699 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001700 });
1701
1702 // Wait to let timestamp estimation start up before looking for the results.
1703 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1704
1705 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1706 // This one isn't reliable, but was sent before the start. It should *not* be
1707 // delivered.
1708 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1709 // Confirm we got a timestamp logged for the message that was forwarded.
1710 EXPECT_EQ(reliable_timestamp_count, 1u);
1711
1712 SendPing(&pi1_reliable_sender, 2);
1713 SendPing(&pi1_unreliable_sender, 2);
1714 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1715 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001716 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001717 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1718
1719 EXPECT_EQ(reliable_timestamp_count, 2u);
1720}
1721
Austin Schuh20ac95d2020-12-05 17:24:19 -08001722// Tests that rebooting a node changes the ServerStatistics message and the
1723// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001724TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001725 const UUID pi1_boot0 = UUID::Random();
1726 const UUID pi2_boot0 = UUID::Random();
1727 const UUID pi2_boot1 = UUID::Random();
1728 const UUID pi3_boot0 = UUID::Random();
1729 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001730
Austin Schuh58646e22021-08-23 23:51:46 -07001731 message_bridge::TestingTimeConverter time(
1732 configuration::NodesCount(&config.message()));
1733 SimulatedEventLoopFactory factory(&config.message());
1734 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001735
Austin Schuh58646e22021-08-23 23:51:46 -07001736 const size_t pi1_index =
1737 configuration::GetNodeIndex(&config.message(), "pi1");
1738 const size_t pi2_index =
1739 configuration::GetNodeIndex(&config.message(), "pi2");
1740 const size_t pi3_index =
1741 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001742
Austin Schuh58646e22021-08-23 23:51:46 -07001743 {
1744 time.AddNextTimestamp(distributed_clock::epoch(),
1745 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1746 BootTimestamp::epoch()});
1747
1748 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1749
1750 time.AddNextTimestamp(
1751 distributed_clock::epoch() + dt,
1752 {BootTimestamp::epoch() + dt,
1753 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1754 BootTimestamp::epoch() + dt});
1755
1756 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1757 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1758 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1759 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1760 }
1761
1762 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1763 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1764
1765 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1766 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001767
1768 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001769 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001770
1771 int timestamp_count = 0;
1772 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001773 "/pi2/aos", [&expected_boot_uuid,
1774 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001775 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001776 expected_boot_uuid);
1777 });
1778 pi1_remote_timestamp->MakeWatcher(
1779 "/test",
1780 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001781 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001782 expected_boot_uuid);
1783 });
1784 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001785 shared() ? "/pi1/aos/remote_timestamps/pi2"
1786 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001787 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1788 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001789 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001790 VLOG(1) << aos::FlatbufferToJson(&header);
1791 ++timestamp_count;
1792 });
1793
1794 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001795 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001796 int boot_number = 0;
1797 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001798 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001799 "/pi1/aos",
1800 [&pi1_server_statistics_count, &expected_boot_uuid,
1801 &expected_connection_time, &first_pi1_server_statistics,
1802 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001803 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1804 for (const message_bridge::ServerConnection *connection :
1805 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001806 if (connection->state() == message_bridge::State::CONNECTED) {
1807 ASSERT_TRUE(connection->has_boot_uuid());
1808 }
1809 if (!first_pi1_server_statistics) {
1810 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1811 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001812 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001813 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1814 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001815 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001816 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001817 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001818 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1819 connection->connected_since_time())),
1820 expected_connection_time);
1821 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001822 ++pi1_server_statistics_count;
1823 }
1824 }
Austin Schuh58646e22021-08-23 23:51:46 -07001825 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001826 });
1827
Austin Schuh58646e22021-08-23 23:51:46 -07001828 int pi1_client_statistics_count = 0;
1829 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001830 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1831 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001832 const message_bridge::ClientStatistics &stats) {
1833 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1834 for (const message_bridge::ClientConnection *connection :
1835 *stats.connections()) {
1836 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1837 if (connection->node()->name()->string_view() == "pi2") {
1838 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001839 EXPECT_EQ(expected_boot_uuid,
1840 UUID::FromString(connection->boot_uuid()))
1841 << " : Got " << aos::FlatbufferToJson(&stats);
1842 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1843 connection->connected_since_time())),
1844 expected_connection_time);
1845 EXPECT_EQ(boot_number + 1, connection->connection_count());
1846 } else {
1847 EXPECT_EQ(connection->connected_since_time(), 0);
1848 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001849 }
1850 }
1851 });
1852
1853 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001854 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1855 pi1, pi2, pi2_boot1]() {
1856 expected_boot_uuid = pi2_boot1;
1857 ++boot_number;
1858 LOG(INFO) << "OnShutdown triggered for pi2";
1859 pi2->OnStartup(
1860 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1861 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1862 expected_connection_time = pi1->monotonic_now();
1863 });
1864 });
Austin Schuh58646e22021-08-23 23:51:46 -07001865
Austin Schuh20ac95d2020-12-05 17:24:19 -08001866 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001867 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001868
1869 EXPECT_GT(timestamp_count, 100);
1870 EXPECT_GE(pi1_server_statistics_count, 1u);
1871
Austin Schuh20ac95d2020-12-05 17:24:19 -08001872 timestamp_count = 0;
1873 pi1_server_statistics_count = 0;
1874
Austin Schuh58646e22021-08-23 23:51:46 -07001875 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001876 EXPECT_GT(timestamp_count, 100);
1877 EXPECT_GE(pi1_server_statistics_count, 1u);
1878}
1879
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001880INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001881 All, RemoteMessageSimulatedEventLoopTest,
1882 ::testing::Values(
1883 Param{"multinode_pingpong_test_combined_config.json", true},
1884 Param{"multinode_pingpong_test_split_config.json", false}));
1885
Austin Schuh58646e22021-08-23 23:51:46 -07001886// Tests that Startup and Shutdown do reasonable things.
1887TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1888 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1889 aos::configuration::ReadConfig(
1890 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1891
Austin Schuh72e65682021-09-02 11:37:05 -07001892 size_t pi1_shutdown_counter = 0;
1893 size_t pi2_shutdown_counter = 0;
1894 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1895 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1896
Austin Schuh58646e22021-08-23 23:51:46 -07001897 message_bridge::TestingTimeConverter time(
1898 configuration::NodesCount(&config.message()));
1899 SimulatedEventLoopFactory factory(&config.message());
1900 factory.SetTimeConverter(&time);
1901 time.AddNextTimestamp(
1902 distributed_clock::epoch(),
1903 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1904
1905 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1906
1907 time.AddNextTimestamp(
1908 distributed_clock::epoch() + dt,
1909 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1910 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1911 BootTimestamp::epoch() + dt});
1912
1913 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1914 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1915
1916 // Configure startup to start Ping and Pong, and count.
1917 size_t pi1_startup_counter = 0;
1918 size_t pi2_startup_counter = 0;
1919 pi1->OnStartup([pi1]() {
1920 LOG(INFO) << "Made ping";
1921 pi1->AlwaysStart<Ping>("ping");
1922 });
1923 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1924 pi2->OnStartup([pi2]() {
1925 LOG(INFO) << "Made pong";
1926 pi2->AlwaysStart<Pong>("pong");
1927 });
1928 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1929
1930 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001931 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1932 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1933
Austin Schuh58646e22021-08-23 23:51:46 -07001934 // Automatically make counters on startup.
1935 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1936 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1937 "pi1_pong_counter", "/test");
1938 });
1939 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1940 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1941 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1942 "pi2_ping_counter", "/test");
1943 });
1944 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1945
1946 EXPECT_EQ(pi2_ping_counter, nullptr);
1947 EXPECT_EQ(pi1_pong_counter, nullptr);
1948
1949 EXPECT_EQ(pi1_startup_counter, 0u);
1950 EXPECT_EQ(pi2_startup_counter, 0u);
1951 EXPECT_EQ(pi1_shutdown_counter, 0u);
1952 EXPECT_EQ(pi2_shutdown_counter, 0u);
1953
1954 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1955 EXPECT_EQ(pi1_startup_counter, 1u);
1956 EXPECT_EQ(pi2_startup_counter, 1u);
1957 EXPECT_EQ(pi1_shutdown_counter, 0u);
1958 EXPECT_EQ(pi2_shutdown_counter, 0u);
1959 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1960 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1961
1962 LOG(INFO) << pi1->monotonic_now();
1963 LOG(INFO) << pi2->monotonic_now();
1964
1965 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1966
1967 EXPECT_EQ(pi1_startup_counter, 2u);
1968 EXPECT_EQ(pi2_startup_counter, 2u);
1969 EXPECT_EQ(pi1_shutdown_counter, 1u);
1970 EXPECT_EQ(pi2_shutdown_counter, 1u);
1971 EXPECT_EQ(pi2_ping_counter->count(), 501);
1972 EXPECT_EQ(pi1_pong_counter->count(), 501);
1973}
1974
1975// Tests that OnStartup handlers can be added after running and get called, and
1976// can't be called when running.
1977TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1978 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1979 aos::configuration::ReadConfig(
1980 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1981
1982 // Test that we can add startup handlers as long as we aren't running, and
1983 // they get run when Run gets called again.
1984 // Test that adding a startup handler when running fails.
1985 //
1986 // Test shutdown handlers get called on destruction.
1987 SimulatedEventLoopFactory factory(&config.message());
1988
1989 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1990
1991 int startup_count0 = 0;
1992 int startup_count1 = 0;
1993
1994 pi1->OnStartup([&]() { ++startup_count0; });
1995 EXPECT_EQ(startup_count0, 0);
1996 EXPECT_EQ(startup_count1, 0);
1997
1998 factory.RunFor(chrono::nanoseconds(1));
1999 EXPECT_EQ(startup_count0, 1);
2000 EXPECT_EQ(startup_count1, 0);
2001
2002 pi1->OnStartup([&]() { ++startup_count1; });
2003 EXPECT_EQ(startup_count0, 1);
2004 EXPECT_EQ(startup_count1, 0);
2005
2006 factory.RunFor(chrono::nanoseconds(1));
2007 EXPECT_EQ(startup_count0, 1);
2008 EXPECT_EQ(startup_count1, 1);
2009
2010 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2011 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
2012
2013 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2014 "Can only register OnStartup handlers when not running.");
2015}
2016
2017// Tests that OnStartup handlers can be added after running and get called, and
2018// all the handlers get called on reboot. Shutdown handlers are tested the same
2019// way.
2020TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2021 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2022 aos::configuration::ReadConfig(
2023 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2024
Austin Schuh72e65682021-09-02 11:37:05 -07002025 int startup_count0 = 0;
2026 int shutdown_count0 = 0;
2027 int startup_count1 = 0;
2028 int shutdown_count1 = 0;
2029
Austin Schuh58646e22021-08-23 23:51:46 -07002030 message_bridge::TestingTimeConverter time(
2031 configuration::NodesCount(&config.message()));
2032 SimulatedEventLoopFactory factory(&config.message());
2033 factory.SetTimeConverter(&time);
2034 time.StartEqual();
2035
2036 const chrono::nanoseconds dt = chrono::seconds(10);
2037 time.RebootAt(0, distributed_clock::epoch() + dt);
2038 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2039
2040 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2041
Austin Schuh58646e22021-08-23 23:51:46 -07002042 pi1->OnStartup([&]() { ++startup_count0; });
2043 pi1->OnShutdown([&]() { ++shutdown_count0; });
2044 EXPECT_EQ(startup_count0, 0);
2045 EXPECT_EQ(startup_count1, 0);
2046 EXPECT_EQ(shutdown_count0, 0);
2047 EXPECT_EQ(shutdown_count1, 0);
2048
2049 factory.RunFor(chrono::nanoseconds(1));
2050 EXPECT_EQ(startup_count0, 1);
2051 EXPECT_EQ(startup_count1, 0);
2052 EXPECT_EQ(shutdown_count0, 0);
2053 EXPECT_EQ(shutdown_count1, 0);
2054
2055 pi1->OnStartup([&]() { ++startup_count1; });
2056 EXPECT_EQ(startup_count0, 1);
2057 EXPECT_EQ(startup_count1, 0);
2058 EXPECT_EQ(shutdown_count0, 0);
2059 EXPECT_EQ(shutdown_count1, 0);
2060
2061 factory.RunFor(chrono::nanoseconds(1));
2062 EXPECT_EQ(startup_count0, 1);
2063 EXPECT_EQ(startup_count1, 1);
2064 EXPECT_EQ(shutdown_count0, 0);
2065 EXPECT_EQ(shutdown_count1, 0);
2066
2067 factory.RunFor(chrono::seconds(15));
2068
2069 EXPECT_EQ(startup_count0, 2);
2070 EXPECT_EQ(startup_count1, 2);
2071 EXPECT_EQ(shutdown_count0, 1);
2072 EXPECT_EQ(shutdown_count1, 0);
2073
2074 pi1->OnShutdown([&]() { ++shutdown_count1; });
2075 factory.RunFor(chrono::seconds(10));
2076
2077 EXPECT_EQ(startup_count0, 3);
2078 EXPECT_EQ(startup_count1, 3);
2079 EXPECT_EQ(shutdown_count0, 2);
2080 EXPECT_EQ(shutdown_count1, 1);
2081}
2082
2083// Tests that event loops which outlive shutdown crash.
2084TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2085 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2086 aos::configuration::ReadConfig(
2087 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2088
2089 message_bridge::TestingTimeConverter time(
2090 configuration::NodesCount(&config.message()));
2091 SimulatedEventLoopFactory factory(&config.message());
2092 factory.SetTimeConverter(&time);
2093 time.StartEqual();
2094
2095 const chrono::nanoseconds dt = chrono::seconds(10);
2096 time.RebootAt(0, distributed_clock::epoch() + dt);
2097
2098 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2099
2100 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2101
2102 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2103}
2104
Brian Silvermane1fe2512022-08-14 23:18:50 -07002105// Test that an ExitHandle outliving its factory is caught.
2106TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2107 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2108 aos::configuration::ReadConfig(
2109 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2110 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2111 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2112 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2113 auto exit_handle = factory->MakeExitHandle();
2114 EXPECT_DEATH(factory.reset(),
2115 "All ExitHandles must be destroyed before the factory");
2116}
2117
Austin Schuh3e31f912023-08-21 21:29:10 -07002118// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2119TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2120 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2121 aos::configuration::ReadConfig(
2122 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2123 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2124 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2125 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2126 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2127 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2128}
2129
Austin Schuh58646e22021-08-23 23:51:46 -07002130// Tests that messages don't survive a reboot of a node.
2131TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2132 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2133 aos::configuration::ReadConfig(
2134 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2135
2136 message_bridge::TestingTimeConverter time(
2137 configuration::NodesCount(&config.message()));
2138 SimulatedEventLoopFactory factory(&config.message());
2139 factory.SetTimeConverter(&time);
2140 time.StartEqual();
2141
2142 const chrono::nanoseconds dt = chrono::seconds(10);
2143 time.RebootAt(0, distributed_clock::epoch() + dt);
2144
2145 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2146
2147 const UUID boot_uuid = pi1->boot_uuid();
2148 EXPECT_NE(boot_uuid, UUID::Zero());
2149
2150 {
2151 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2152 aos::Sender<examples::Ping> test_message_sender =
2153 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2154 SendPing(&test_message_sender, 1);
2155 }
2156
2157 factory.RunFor(chrono::seconds(5));
2158
2159 {
2160 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2161 aos::Fetcher<examples::Ping> fetcher =
2162 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2163 EXPECT_TRUE(fetcher.Fetch());
2164 }
2165
2166 factory.RunFor(chrono::seconds(10));
2167
2168 {
2169 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2170 aos::Fetcher<examples::Ping> fetcher =
2171 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2172 EXPECT_FALSE(fetcher.Fetch());
2173 }
2174 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2175}
2176
2177// Tests that reliable messages get resent on reboot.
2178TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2179 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2180 aos::configuration::ReadConfig(
2181 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2182
2183 message_bridge::TestingTimeConverter time(
2184 configuration::NodesCount(&config.message()));
2185 SimulatedEventLoopFactory factory(&config.message());
2186 factory.SetTimeConverter(&time);
2187 time.StartEqual();
2188
2189 const chrono::nanoseconds dt = chrono::seconds(1);
2190 time.RebootAt(1, distributed_clock::epoch() + dt);
2191
2192 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2193 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2194
2195 const UUID pi1_boot_uuid = pi1->boot_uuid();
2196 const UUID pi2_boot_uuid = pi2->boot_uuid();
2197 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2198 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2199
2200 {
2201 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2202 aos::Sender<examples::Ping> test_message_sender =
2203 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2204 SendPing(&test_message_sender, 1);
2205 }
2206
2207 factory.RunFor(chrono::milliseconds(500));
2208
2209 {
2210 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2211 aos::Fetcher<examples::Ping> fetcher =
2212 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002213 ASSERT_TRUE(fetcher.Fetch());
2214 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2215 monotonic_clock::epoch());
2216 // Message bridge picks up the Ping message immediately on reboot.
2217 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2218 monotonic_clock::epoch());
2219 EXPECT_EQ(fetcher.context().monotonic_event_time,
2220 monotonic_clock::epoch() + factory.network_delay());
2221 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002222 }
2223
2224 factory.RunFor(chrono::seconds(1));
2225
2226 {
2227 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2228 aos::Fetcher<examples::Ping> fetcher =
2229 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002230 ASSERT_TRUE(fetcher.Fetch());
2231 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2232 monotonic_clock::epoch());
2233 // Message bridge picks up the Ping message immediately on reboot.
2234 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2235 monotonic_clock::epoch() + chrono::seconds(1));
2236 EXPECT_EQ(fetcher.context().monotonic_event_time,
2237 monotonic_clock::epoch() + factory.network_delay());
2238 ASSERT_FALSE(fetcher.Fetch());
Austin Schuh58646e22021-08-23 23:51:46 -07002239 }
2240 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2241}
2242
James Kuszmaul86e86c32022-07-21 17:39:47 -07002243TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2244 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2245 aos::configuration::ReadConfig(
2246 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2247
2248 message_bridge::TestingTimeConverter time(
2249 configuration::NodesCount(&config.message()));
2250 time.AddNextTimestamp(
2251 distributed_clock::epoch(),
2252 {BootTimestamp{0, monotonic_clock::epoch()},
2253 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2254 BootTimestamp{0, monotonic_clock::epoch()}});
2255 SimulatedEventLoopFactory factory(&config.message());
2256 factory.SetTimeConverter(&time);
2257
2258 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2259 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2260
2261 const UUID pi1_boot_uuid = pi1->boot_uuid();
2262 const UUID pi2_boot_uuid = pi2->boot_uuid();
2263 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2264 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2265
2266 {
2267 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2268 aos::Sender<examples::Ping> pi1_sender =
2269 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2270 SendPing(&pi1_sender, 1);
2271 }
2272 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2273 aos::Sender<examples::Ping> pi2_sender =
2274 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2275 SendPing(&pi2_sender, 1);
2276 // Verify that we staggered the OnRun callback correctly.
2277 pi2_event_loop->OnRun([pi1, pi2]() {
2278 EXPECT_EQ(pi1->monotonic_now(),
2279 monotonic_clock::epoch() + std::chrono::seconds(1));
2280 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2281 });
2282
2283 factory.RunFor(chrono::seconds(2));
2284
2285 {
2286 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2287 aos::Fetcher<examples::Ping> fetcher =
2288 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2289 ASSERT_TRUE(fetcher.Fetch());
2290 EXPECT_EQ(fetcher.context().monotonic_event_time,
2291 monotonic_clock::epoch() + factory.network_delay());
2292 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2293 monotonic_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002294 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2295 monotonic_clock::epoch() + chrono::seconds(1));
James Kuszmaul86e86c32022-07-21 17:39:47 -07002296 }
2297 {
2298 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2299 aos::Fetcher<examples::Ping> fetcher =
2300 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2301 ASSERT_TRUE(fetcher.Fetch());
2302 EXPECT_EQ(fetcher.context().monotonic_event_time,
2303 monotonic_clock::epoch() + std::chrono::seconds(1) +
2304 factory.network_delay());
2305 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2306 monotonic_clock::epoch() - std::chrono::seconds(1));
Austin Schuhac6d89e2024-03-27 14:56:09 -07002307 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2308 monotonic_clock::epoch());
James Kuszmaul86e86c32022-07-21 17:39:47 -07002309 }
2310}
2311
Austin Schuh48205e62021-11-12 14:13:18 -08002312class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2313 public:
2314 SimulatedEventLoopDisconnectTest()
2315 : config(aos::configuration::ReadConfig(ArtifactPath(
2316 "aos/events/multinode_pingpong_test_split_config.json"))),
2317 time(configuration::NodesCount(&config.message())),
2318 factory(&config.message()) {
2319 factory.SetTimeConverter(&time);
2320 }
2321
2322 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2323 const monotonic_clock::time_point allowable_message_time,
2324 std::set<const aos::Node *> empty_nodes) {
2325 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2326 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2327 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2328 pi1->MakeEventLoop("fetcher");
2329 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2330 pi2->MakeEventLoop("fetcher");
2331 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2332 if (configuration::ChannelIsReadableOnNode(channel,
2333 pi1_event_loop->node())) {
2334 std::unique_ptr<aos::RawFetcher> fetcher =
2335 pi1_event_loop->MakeRawFetcher(channel);
2336 if (statistics_channels.find(channel) == statistics_channels.end() ||
2337 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2338 EXPECT_FALSE(fetcher->Fetch() &&
2339 fetcher->context().monotonic_event_time >
2340 allowable_message_time)
2341 << ": Found recent message on channel "
2342 << configuration::CleanedChannelToString(channel) << " and time "
2343 << fetcher->context().monotonic_event_time << " > "
2344 << allowable_message_time << " on pi1";
2345 } else {
2346 EXPECT_TRUE(fetcher->Fetch() &&
2347 fetcher->context().monotonic_event_time >=
2348 allowable_message_time)
2349 << ": Didn't find recent message on channel "
2350 << configuration::CleanedChannelToString(channel) << " on pi1";
2351 }
2352 }
2353 if (configuration::ChannelIsReadableOnNode(channel,
2354 pi2_event_loop->node())) {
2355 std::unique_ptr<aos::RawFetcher> fetcher =
2356 pi2_event_loop->MakeRawFetcher(channel);
2357 if (statistics_channels.find(channel) == statistics_channels.end() ||
2358 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2359 EXPECT_FALSE(fetcher->Fetch() &&
2360 fetcher->context().monotonic_event_time >
2361 allowable_message_time)
2362 << ": Found message on channel "
2363 << configuration::CleanedChannelToString(channel) << " and time "
2364 << fetcher->context().monotonic_event_time << " > "
2365 << allowable_message_time << " on pi2";
2366 } else {
2367 EXPECT_TRUE(fetcher->Fetch() &&
2368 fetcher->context().monotonic_event_time >=
2369 allowable_message_time)
2370 << ": Didn't find message on channel "
2371 << configuration::CleanedChannelToString(channel) << " on pi2";
2372 }
2373 }
2374 }
2375 }
2376
2377 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2378
2379 message_bridge::TestingTimeConverter time;
2380 SimulatedEventLoopFactory factory;
2381};
2382
2383// Tests that if we have message bridge client/server disabled, and timing
2384// reports disabled, no messages are sent. Also tests that we can disconnect a
2385// node and disable statistics on it and it actually fully disconnects.
2386TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2387 time.StartEqual();
2388 factory.SkipTimingReport();
2389 factory.DisableStatistics();
2390
2391 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2392 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2393
2394 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2395 pi1->MakeEventLoop("fetcher");
2396 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2397 pi2->MakeEventLoop("fetcher");
2398
2399 factory.RunFor(chrono::milliseconds(100000));
2400
2401 // Confirm no messages are sent if we've configured them all off.
2402 VerifyChannels({}, monotonic_clock::min_time, {});
2403
2404 // Now, confirm that all the message_bridge channels come back when we
2405 // re-enable.
2406 factory.EnableStatistics();
2407
2408 factory.RunFor(chrono::milliseconds(10050));
2409
2410 // Build up the list of all the messages we expect when we come back.
2411 {
2412 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002413 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002414 std::vector<std::pair<std::string_view, const Node *>>{
2415 {"/pi1/aos", pi1->node()},
2416 {"/pi2/aos", pi1->node()},
2417 {"/pi3/aos", pi1->node()}}) {
2418 statistics_channels.insert(configuration::GetChannel(
2419 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2420 pi.second));
2421 statistics_channels.insert(configuration::GetChannel(
2422 factory.configuration(), pi.first,
2423 "aos.message_bridge.ServerStatistics", "", pi.second));
2424 statistics_channels.insert(configuration::GetChannel(
2425 factory.configuration(), pi.first,
2426 "aos.message_bridge.ClientStatistics", "", pi.second));
2427 }
2428
2429 statistics_channels.insert(configuration::GetChannel(
2430 factory.configuration(),
2431 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2432 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2433 statistics_channels.insert(configuration::GetChannel(
2434 factory.configuration(),
2435 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2436 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2437 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2438 }
2439
2440 // Now test that we can disable the messages for a single node
2441 pi2->DisableStatistics();
2442 const aos::monotonic_clock::time_point statistics_disable_time =
2443 pi2->monotonic_now();
2444 factory.RunFor(chrono::milliseconds(10000));
2445
2446 // We should see a much smaller set of messages, but should still see messages
2447 // forwarded, mainly the timestamp message.
2448 {
2449 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002450 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002451 std::vector<std::pair<std::string_view, const Node *>>{
2452 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2453 statistics_channels.insert(configuration::GetChannel(
2454 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2455 pi.second));
2456 statistics_channels.insert(configuration::GetChannel(
2457 factory.configuration(), pi.first,
2458 "aos.message_bridge.ServerStatistics", "", pi.second));
2459 statistics_channels.insert(configuration::GetChannel(
2460 factory.configuration(), pi.first,
2461 "aos.message_bridge.ClientStatistics", "", pi.second));
2462 }
2463
2464 statistics_channels.insert(configuration::GetChannel(
2465 factory.configuration(),
2466 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2467 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2468 VerifyChannels(statistics_channels, statistics_disable_time, {});
2469 }
2470
2471 // Now, fully disconnect the node. This will completely quiet down pi2.
2472 pi1->Disconnect(pi2->node());
2473 pi2->Disconnect(pi1->node());
2474
2475 const aos::monotonic_clock::time_point disconnect_disable_time =
2476 pi2->monotonic_now();
2477 factory.RunFor(chrono::milliseconds(10000));
2478
2479 {
2480 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002481 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002482 std::vector<std::pair<std::string_view, const Node *>>{
2483 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2484 statistics_channels.insert(configuration::GetChannel(
2485 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2486 pi.second));
2487 statistics_channels.insert(configuration::GetChannel(
2488 factory.configuration(), pi.first,
2489 "aos.message_bridge.ServerStatistics", "", pi.second));
2490 statistics_channels.insert(configuration::GetChannel(
2491 factory.configuration(), pi.first,
2492 "aos.message_bridge.ClientStatistics", "", pi.second));
2493 }
2494
2495 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2496 }
2497}
2498
Austin Schuh9cce6842024-04-02 18:55:44 -07002499// Struct to capture the expected time a message should be received (and it's
2500// value). This is from the perspective of the node receiving the message.
2501struct ExpectedTimestamps {
2502 // The time that the message was published on the sending node's monotonic
2503 // clock.
2504 monotonic_clock::time_point remote_time;
2505 // The time that the message was virtually transmitted over the virtual
2506 // network on the sending node's monotonic clock.
2507 monotonic_clock::time_point remote_transmit_time;
2508 // The time that the message was received on the receiving node's clock.
2509 monotonic_clock::time_point event_time;
2510 // The value inside the message.
2511 int value;
2512};
2513
Austin Schuhac6d89e2024-03-27 14:56:09 -07002514// Tests that rapidly sent messages get timestamped correctly.
2515TEST(SimulatedEventLoopTest, TransmitTimestamps) {
2516 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2517 aos::configuration::ReadConfig(
2518 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2519
2520 message_bridge::TestingTimeConverter time(
2521 configuration::NodesCount(&config.message()));
2522 SimulatedEventLoopFactory factory(&config.message());
2523 factory.SetTimeConverter(&time);
2524 time.StartEqual();
2525
2526 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2527 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2528
2529 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2530 aos::Fetcher<examples::Ping> fetcher =
2531 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2532 EXPECT_FALSE(fetcher.Fetch());
2533
2534 {
2535 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuh9cce6842024-04-02 18:55:44 -07002536 FunctionScheduler run_at(ping_event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002537 aos::Sender<examples::Ping> test_message_sender =
2538 ping_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002539 aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002540 for (const std::chrono::nanoseconds dt :
2541 {chrono::microseconds(5000), chrono::microseconds(1),
2542 chrono::microseconds(2), chrono::microseconds(70),
Austin Schuh9cce6842024-04-02 18:55:44 -07002543 chrono::microseconds(63), chrono::microseconds(140)}) {
2544 now += dt;
2545 run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002546 }
2547
Austin Schuh9cce6842024-04-02 18:55:44 -07002548 now += chrono::milliseconds(10);
2549
2550 factory.RunFor(now - ping_event_loop->monotonic_now());
Austin Schuhac6d89e2024-03-27 14:56:09 -07002551 }
2552
Austin Schuh9cce6842024-04-02 18:55:44 -07002553 const monotonic_clock::time_point e = monotonic_clock::epoch();
2554 const chrono::nanoseconds send_delay = factory.send_delay();
2555 const chrono::nanoseconds network_delay = factory.network_delay();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002556
Austin Schuh9cce6842024-04-02 18:55:44 -07002557 const std::vector<ExpectedTimestamps> expected_values = {
2558 // First message shows up after wakeup + network delay as expected.
2559 ExpectedTimestamps{
2560 .remote_time = e + chrono::microseconds(5000),
2561 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2562 .event_time =
2563 e + chrono::microseconds(5000) + send_delay + network_delay,
2564 .value = 1,
2565 },
2566 // Next message is close enough that it gets picked up at the same wakeup.
2567 ExpectedTimestamps{
2568 .remote_time = e + chrono::microseconds(5001),
2569 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2570 .event_time =
2571 e + chrono::microseconds(5000) + send_delay + network_delay,
2572 .value = 1,
2573 },
2574 // Same for the third.
2575 ExpectedTimestamps{
2576 .remote_time = e + chrono::microseconds(5003),
2577 .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
2578 .event_time =
2579 e + chrono::microseconds(5000) + send_delay + network_delay,
2580 .value = 1,
2581 },
2582 // Fourth waits long enough to do the right thing.
2583 ExpectedTimestamps{
2584 .remote_time = e + chrono::microseconds(5073),
2585 .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
2586 .event_time =
2587 e + chrono::microseconds(5073) + send_delay + network_delay,
2588 .value = 1,
2589 },
2590 // Fifth waits long enough to do the right thing as well (but kicks off
2591 // while the fourth is in flight over the network).
2592 ExpectedTimestamps{
2593 .remote_time = e + chrono::microseconds(5136),
2594 .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
2595 .event_time =
2596 e + chrono::microseconds(5136) + send_delay + network_delay,
2597 .value = 1,
2598 },
2599 // Sixth waits long enough to do the right thing as well (but kicks off
2600 // while the fifth is in flight over the network and has almost landed).
2601 // The timer wakeup for the Timestamp message coming back will find the
2602 // sixth message a little bit early.
2603 ExpectedTimestamps{
2604 .remote_time = e + chrono::microseconds(5276),
2605 .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
2606 .event_time =
2607 e + chrono::microseconds(5273) + send_delay + network_delay,
2608 .value = 1,
2609 },
2610 };
Austin Schuhac6d89e2024-03-27 14:56:09 -07002611
Austin Schuh9cce6842024-04-02 18:55:44 -07002612 for (const ExpectedTimestamps value : expected_values) {
2613 ASSERT_TRUE(fetcher.FetchNext());
2614 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2615 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2616 value.remote_transmit_time);
2617 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2618 EXPECT_EQ(fetcher->value(), value.value);
2619 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002620
2621 ASSERT_FALSE(fetcher.FetchNext());
2622}
2623
2624// Tests that a reliable message gets forwarded if it was sent originally when
2625// nodes were disconnected.
2626TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
2627 time.StartEqual();
2628 factory.SkipTimingReport();
2629 factory.DisableStatistics();
2630
2631 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2632 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2633
2634 // Fully disconnect the nodes.
2635 pi1->Disconnect(pi2->node());
2636 pi2->Disconnect(pi1->node());
2637
Austin Schuhac6d89e2024-03-27 14:56:09 -07002638 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2639 pi2->MakeEventLoop("fetcher");
2640 aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
2641 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2642
2643 factory.RunFor(chrono::milliseconds(100));
2644
2645 {
Austin Schuheeb86fc2024-04-04 20:12:39 -07002646 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2647 pi1->MakeEventLoop("sender");
Austin Schuhac6d89e2024-03-27 14:56:09 -07002648 aos::Sender<examples::Ping> pi1_reliable_sender =
2649 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
Austin Schuh9cce6842024-04-02 18:55:44 -07002650 FunctionScheduler run_at(pi1_event_loop.get());
2651 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002652 for (int i = 0; i < 100; ++i) {
Austin Schuh9cce6842024-04-02 18:55:44 -07002653 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
2654 now);
2655 now += chrono::milliseconds(100);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002656 }
Austin Schuh9cce6842024-04-02 18:55:44 -07002657 now += chrono::milliseconds(50);
Austin Schuhac6d89e2024-03-27 14:56:09 -07002658
Austin Schuh9cce6842024-04-02 18:55:44 -07002659 factory.RunFor(now - pi1_event_loop->monotonic_now());
2660 }
Austin Schuhac6d89e2024-03-27 14:56:09 -07002661
2662 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2663
2664 pi1->Connect(pi2->node());
2665 pi2->Connect(pi1->node());
2666
2667 factory.RunFor(chrono::milliseconds(1));
2668
2669 ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
2670 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
2671 monotonic_clock::epoch() + chrono::milliseconds(10000));
2672 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
2673 monotonic_clock::epoch() + chrono::milliseconds(10150));
2674 ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
2675 monotonic_clock::epoch() + chrono::milliseconds(10150) +
2676 factory.network_delay());
2677 ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
2678
Austin Schuh9cce6842024-04-02 18:55:44 -07002679 // TODO(austin): Verify that the dropped packet count increases.
2680
Austin Schuhac6d89e2024-03-27 14:56:09 -07002681 ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
2682}
2683
Austin Schuh9cce6842024-04-02 18:55:44 -07002684// Tests that if we disconnect while a message is in various states of being
2685// queued, it gets either dropped or sent as expected.
2686TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
2687 time.StartEqual();
2688 factory.SkipTimingReport();
2689 factory.DisableStatistics();
2690
2691 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2692 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2693
2694 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2695
2696 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2697 pi2->MakeEventLoop("fetcher");
2698 aos::Fetcher<examples::Ping> fetcher =
2699 pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
2700
2701 ASSERT_FALSE(fetcher.Fetch());
2702
2703 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2704 {
2705 FunctionScheduler run_at(pi1_event_loop.get());
2706 aos::Sender<examples::Ping> pi1_sender =
2707 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2708
2709 int i = 0;
2710 for (const std::chrono::nanoseconds dt :
2711 {chrono::microseconds(5000), chrono::microseconds(1),
2712 chrono::microseconds(2), chrono::microseconds(70),
2713 chrono::microseconds(63), chrono::microseconds(140),
2714 chrono::microseconds(160)}) {
2715 run_at.ScheduleAt(
2716 [&]() {
2717 pi1->Connect(pi2->node());
2718 pi2->Connect(pi1->node());
2719 },
2720 now);
2721
2722 now += chrono::milliseconds(100);
2723
2724 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
2725
2726 now += dt;
2727
2728 run_at.ScheduleAt(
2729 [&]() {
2730 // Fully disconnect the nodes.
2731 pi1->Disconnect(pi2->node());
2732 pi2->Disconnect(pi1->node());
2733 },
2734 now);
2735
2736 now += chrono::milliseconds(100) - dt;
2737 ++i;
2738 }
2739
2740 factory.RunFor(now - pi1_event_loop->monotonic_now());
2741 }
2742
2743 const monotonic_clock::time_point e = monotonic_clock::epoch();
2744 const chrono::nanoseconds send_delay = factory.send_delay();
2745 const chrono::nanoseconds network_delay = factory.network_delay();
2746
2747 const std::vector<ExpectedTimestamps> expected_values = {
2748 ExpectedTimestamps{
2749 .remote_time = e + chrono::milliseconds(100),
2750 .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
2751 .event_time =
2752 e + chrono::milliseconds(100) + send_delay + network_delay,
2753 .value = 0,
2754 },
2755 ExpectedTimestamps{
2756 .remote_time = e + chrono::milliseconds(1300),
2757 .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
2758 .event_time =
2759 e + chrono::milliseconds(1300) + send_delay + network_delay,
2760 .value = 6,
2761 },
2762 };
2763
2764 for (const ExpectedTimestamps value : expected_values) {
2765 ASSERT_TRUE(fetcher.FetchNext());
2766 EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
2767 EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
2768 value.remote_transmit_time);
2769 EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
2770 EXPECT_EQ(fetcher->value(), value.value);
2771 }
2772
2773 // TODO(austin): Verify that the dropped packet count increases.
2774
2775 ASSERT_FALSE(fetcher.Fetch());
2776}
2777
2778class PingLogger {
2779 public:
2780 PingLogger(aos::EventLoop *event_loop, std::string_view channel,
2781 std::vector<std::pair<aos::Context, int>> *msgs)
2782 : event_loop_(event_loop),
2783 fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
2784 msgs_(msgs) {
2785 event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
2786 }
2787
2788 ~PingLogger() {
2789 while (fetcher_.FetchNext()) {
2790 msgs_->emplace_back(fetcher_.context(), fetcher_->value());
2791 }
2792 }
2793
2794 private:
2795 aos::EventLoop *event_loop_;
2796 aos::Fetcher<examples::Ping> fetcher_;
2797 std::vector<std::pair<aos::Context, int>> *msgs_;
2798};
2799
2800// Tests that rebooting while a message is in flight works as expected.
2801TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
2802 time.StartEqual();
2803 for (int i = 0; i < 8; ++i) {
2804 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2805 }
2806
2807 factory.SkipTimingReport();
2808 factory.DisableStatistics();
2809
2810 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2811 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2812
2813 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2814
2815 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2816 FunctionScheduler run_at(pi1_event_loop.get());
2817 aos::Sender<examples::Ping> pi1_sender =
2818 pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
2819
2820 int i = 0;
2821 for (const std::chrono::nanoseconds dt :
2822 {chrono::microseconds(5000), chrono::microseconds(1),
2823 chrono::microseconds(2), chrono::microseconds(70),
2824 chrono::microseconds(63), chrono::microseconds(140),
2825 chrono::microseconds(160)}) {
2826 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2827 now + chrono::seconds(10) - dt);
2828
2829 now += chrono::seconds(10);
2830 ++i;
2831 }
2832
2833 std::vector<std::pair<aos::Context, int>> msgs;
2834
2835 pi2->OnStartup([pi2, &msgs]() {
2836 pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
2837 });
2838
2839 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2840
2841 const monotonic_clock::time_point e = monotonic_clock::epoch();
2842 const chrono::nanoseconds send_delay = factory.send_delay();
2843 const chrono::nanoseconds network_delay = factory.network_delay();
2844
2845 const std::vector<ExpectedTimestamps> expected_values = {
2846 ExpectedTimestamps{
2847 .remote_time = e + chrono::microseconds(9995000),
2848 .remote_transmit_time =
2849 e + chrono::microseconds(9995000) + send_delay,
2850 .event_time =
2851 e + chrono::microseconds(9995000) + send_delay + network_delay,
2852 .value = 0,
2853 },
2854 ExpectedTimestamps{
2855 .remote_time = e + chrono::microseconds(19999999),
2856 .remote_transmit_time =
2857 e + chrono::microseconds(19999999) + send_delay,
2858 .event_time =
2859 e + chrono::microseconds(-1) + send_delay + network_delay,
2860 .value = 1,
2861 },
2862 ExpectedTimestamps{
2863 .remote_time = e + chrono::microseconds(29999998),
2864 .remote_transmit_time =
2865 e + chrono::microseconds(29999998) + send_delay,
2866 .event_time =
2867 e + chrono::microseconds(-2) + send_delay + network_delay,
2868 .value = 2,
2869 },
2870 ExpectedTimestamps{
2871 .remote_time = e + chrono::microseconds(69999840),
2872 .remote_transmit_time =
2873 e + chrono::microseconds(69999840) + send_delay,
2874 .event_time =
2875 e + chrono::microseconds(9999840) + send_delay + network_delay,
2876 .value = 6,
2877 },
2878 };
2879
2880 ASSERT_EQ(msgs.size(), expected_values.size());
2881
2882 for (size_t i = 0; i < msgs.size(); ++i) {
2883 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
2884 expected_values[i].remote_time);
2885 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
2886 expected_values[i].remote_transmit_time);
2887 EXPECT_EQ(msgs[i].first.monotonic_event_time,
2888 expected_values[i].event_time);
2889 EXPECT_EQ(msgs[i].second, expected_values[i].value);
2890 }
2891
2892 // TODO(austin): Verify that the dropped packet count increases.
2893}
2894
2895// Tests that rebooting while a message is in flight works as expected.
2896TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
2897 time.StartEqual();
2898 for (int i = 0; i < 8; ++i) {
2899 time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
2900 }
2901
2902 factory.SkipTimingReport();
2903 factory.DisableStatistics();
2904
2905 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2906 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2907
2908 std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
2909
2910 aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
2911 FunctionScheduler run_at(pi1_event_loop.get());
2912 aos::Sender<examples::Ping> pi1_sender =
2913 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2914
2915 int i = 0;
2916 for (const std::chrono::nanoseconds dt :
2917 {chrono::microseconds(5000), chrono::microseconds(1),
2918 chrono::microseconds(2), chrono::microseconds(70),
2919 chrono::microseconds(63), chrono::microseconds(140),
2920 chrono::microseconds(160)}) {
2921 run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
2922 now + chrono::seconds(10) - dt);
2923
2924 now += chrono::seconds(10);
2925 ++i;
2926 }
2927
2928 std::vector<std::pair<aos::Context, int>> msgs;
2929
2930 PingLogger *logger;
2931 pi2->OnStartup([pi2, &msgs, &logger]() {
2932 logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
2933 });
2934
2935 factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
2936
2937 // Stop the logger to flush the last boot of data.
2938 pi2->Stop(logger);
2939
2940 const monotonic_clock::time_point e = monotonic_clock::epoch();
2941 const chrono::nanoseconds send_delay = factory.send_delay();
2942 const chrono::nanoseconds network_delay = factory.network_delay();
2943
2944 // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
2945 // event times to confirm what should have been forwarded when.
2946 const std::vector<ExpectedTimestamps> expected_values = {
2947 ExpectedTimestamps{
2948 .remote_time = e + chrono::microseconds(9995000),
2949 .remote_transmit_time =
2950 e + chrono::microseconds(9995000) + send_delay,
2951 .event_time =
2952 e + chrono::microseconds(9995000) + send_delay + network_delay,
2953 .value = 0,
2954 },
2955 ExpectedTimestamps{
2956 .remote_time = e + chrono::microseconds(9995000),
2957 .remote_transmit_time = e + chrono::microseconds(10000000),
2958 .event_time = e + network_delay,
2959 .value = 0,
2960 },
2961 ExpectedTimestamps{
2962 .remote_time = e + chrono::microseconds(19999999),
2963 .remote_transmit_time = e + chrono::microseconds(20000000),
2964 .event_time = e + network_delay,
2965 .value = 1,
2966 },
2967 ExpectedTimestamps{
2968 .remote_time = e + chrono::microseconds(29999998),
2969 .remote_transmit_time = e + chrono::microseconds(30000000),
2970 .event_time = e + network_delay,
2971 .value = 2,
2972 },
2973 ExpectedTimestamps{
2974 .remote_time = e + chrono::microseconds(39999930),
2975 .remote_transmit_time = e + chrono::microseconds(40000000),
2976 .event_time = e + network_delay,
2977 .value = 3,
2978 },
2979 ExpectedTimestamps{
2980 .remote_time = e + chrono::microseconds(49999937),
2981 .remote_transmit_time = e + chrono::microseconds(50000000),
2982 .event_time = e + network_delay,
2983 .value = 4,
2984 },
2985 ExpectedTimestamps{
2986 .remote_time = e + chrono::microseconds(59999860),
2987 .remote_transmit_time = e + chrono::microseconds(60000000),
2988 .event_time = e + network_delay,
2989 .value = 5,
2990 },
2991 ExpectedTimestamps{
2992 .remote_time = e + chrono::microseconds(69999840),
2993 .remote_transmit_time = e + chrono::microseconds(69999890),
2994 .event_time = e + chrono::microseconds(9999890) + network_delay,
2995 .value = 6,
2996 },
2997 ExpectedTimestamps{
2998 .remote_time = e + chrono::microseconds(69999840),
2999 .remote_transmit_time = e + chrono::microseconds(70000000),
3000 .event_time = e + network_delay,
3001 .value = 6,
3002 },
3003 };
3004
3005 ASSERT_EQ(msgs.size(), expected_values.size());
3006
3007 for (size_t i = 0; i < msgs.size(); ++i) {
3008 EXPECT_EQ(msgs[i].first.monotonic_remote_time,
3009 expected_values[i].remote_time);
3010 EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
3011 expected_values[i].remote_transmit_time);
3012 EXPECT_EQ(msgs[i].first.monotonic_event_time,
3013 expected_values[i].event_time);
3014 EXPECT_EQ(msgs[i].second, expected_values[i].value);
3015 }
3016
3017 // TODO(austin): Verify that the dropped packet count increases.
3018}
3019
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003020} // namespace aos::testing