blob: 2d59c5de0cefd8d450286d076e760f2952dedf01 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Alex Perrycb7da4b2019-08-28 19:35:56 -07007#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -07009#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010#include "aos/events/ping_lib.h"
11#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080012#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070013#include "aos/network/message_bridge_client_generated.h"
14#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080015#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080016#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070017#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070018#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080019#include "gtest/gtest.h"
20
21namespace aos {
22namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
// Test parameters consumed by RemoteMessageSimulatedEventLoopTest.
struct Param {
  // Config file to load; the fixture looks it up under "aos/events/".
  std::string config;
  // True when a single RemoteMessage channel is shared between all the remote
  // channels; false when every remote channel has its own RemoteMessage
  // channel.
  bool shared;
};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuh8fb315a2020-11-19 22:33:58 -0800142// Test that sending a message after running gets properly notified.
143TEST(SimulatedEventLoopTest, SendAfterRunFor) {
144 SimulatedEventLoopTestFactory factory;
145
146 SimulatedEventLoopFactory simulated_event_loop_factory(
147 factory.configuration());
148
149 ::std::unique_ptr<EventLoop> ping_event_loop =
150 simulated_event_loop_factory.MakeEventLoop("ping");
151 aos::Sender<TestMessage> test_message_sender =
152 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700153 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800154
155 std::unique_ptr<EventLoop> pong1_event_loop =
156 simulated_event_loop_factory.MakeEventLoop("pong");
157 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
158 "/test");
159
160 EXPECT_FALSE(ping_event_loop->is_running());
161
162 // Watchers start when you start running, so there should be nothing counted.
163 simulated_event_loop_factory.RunFor(chrono::seconds(1));
164 EXPECT_EQ(test_message_counter1.count(), 0u);
165
166 std::unique_ptr<EventLoop> pong2_event_loop =
167 simulated_event_loop_factory.MakeEventLoop("pong");
168 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
169 "/test");
170
171 // Pauses in the middle don't count though, so this should be counted.
172 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700173 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800174
175 EXPECT_EQ(test_message_counter1.count(), 0u);
176 EXPECT_EQ(test_message_counter2.count(), 0u);
177 simulated_event_loop_factory.RunFor(chrono::seconds(1));
178
179 EXPECT_EQ(test_message_counter1.count(), 1u);
180 EXPECT_EQ(test_message_counter2.count(), 0u);
181}
182
Austin Schuh60e77942022-05-16 17:48:24 -0700183// Test that if we configure an event loop to be able to send too fast that we
184// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700185TEST(SimulatedEventLoopTest, AllowSendTooFast) {
186 SimulatedEventLoopTestFactory factory;
187
188 SimulatedEventLoopFactory simulated_event_loop_factory(
189 factory.configuration());
190
191 // Create two event loops: One will be allowed to send too fast, one won't. We
192 // will then test to ensure that the one that is allowed to send too fast can
193 // indeed send too fast, but that it then makes it so that the second event
194 // loop can no longer send anything because *it* is still limited.
195 ::std::unique_ptr<EventLoop> too_fast_event_loop =
196 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
197 ->MakeEventLoop("too_fast_sender",
198 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700199 NodeEventLoopFactory::ExclusiveSenders::kNo,
200 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700201 aos::Sender<TestMessage> too_fast_message_sender =
202 too_fast_event_loop->MakeSender<TestMessage>("/test");
203
204 ::std::unique_ptr<EventLoop> limited_event_loop =
205 simulated_event_loop_factory.MakeEventLoop("limited_sender");
206 aos::Sender<TestMessage> limited_message_sender =
207 limited_event_loop->MakeSender<TestMessage>("/test");
208
209 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
210 for (int ii = 0; ii < queue_size; ++ii) {
211 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
212 }
213 // And now we should start being in the sending-too-fast phase.
214 for (int ii = 0; ii < queue_size; ++ii) {
215 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700216 ASSERT_EQ(SendTestMessage(limited_message_sender),
217 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700218 }
219}
220
221// Test that if we setup an exclusive sender that it is indeed exclusive.
222TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
223 SimulatedEventLoopTestFactory factory;
224
225 SimulatedEventLoopFactory simulated_event_loop_factory(
226 factory.configuration());
227
228 ::std::unique_ptr<EventLoop> exclusive_event_loop =
229 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700230 ->MakeEventLoop(
231 "too_fast_sender",
232 {NodeEventLoopFactory::CheckSentTooFast::kYes,
233 NodeEventLoopFactory::ExclusiveSenders::kYes,
234 {{configuration::GetChannel(factory.configuration(), "/test1",
235 "aos.TestMessage", "", nullptr),
236 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700237 exclusive_event_loop->SkipAosLog();
238 exclusive_event_loop->SkipTimingReport();
239 ::std::unique_ptr<EventLoop> normal_event_loop =
240 simulated_event_loop_factory.MakeEventLoop("limited_sender");
241 // Set things up to have the exclusive sender be destroyed so we can test
242 // recovery.
243 {
244 aos::Sender<TestMessage> exclusive_sender =
245 exclusive_event_loop->MakeSender<TestMessage>("/test");
246
247 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
248 "TestMessage");
249 }
250 // This one should succeed now that the exclusive channel is removed.
251 aos::Sender<TestMessage> normal_sender =
252 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700253 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
254 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700255
256 // And check an explicitly exempted channel:
257 aos::Sender<TestMessage> non_exclusive_sender =
258 exclusive_event_loop->MakeSender<TestMessage>("/test1");
259 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
260 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700261}
262
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700263void TestSentTooFastCheckEdgeCase(
264 const std::function<RawSender::Error(int, int)> expected_err,
265 const bool send_twice_at_end) {
266 SimulatedEventLoopTestFactory factory;
267
268 auto event_loop = factory.MakePrimary("primary");
269
270 auto sender = event_loop->MakeSender<TestMessage>("/test");
271
272 const int queue_size = TestChannelQueueSize(event_loop.get());
273 int msgs_sent = 0;
274 event_loop->AddPhasedLoop(
275 [&](int) {
276 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
277 msgs_sent++;
278
279 // If send_twice_at_end, send the last two messages (message
280 // queue_size and queue_size + 1) in the same iteration, meaning that
281 // we would be sending very slightly too fast. Otherwise, we will send
282 // message queue_size + 1 in the next iteration and we will continue
283 // to be sending exactly at the channel frequency.
284 if (send_twice_at_end && (msgs_sent == queue_size)) {
285 EXPECT_EQ(SendTestMessage(sender),
286 expected_err(msgs_sent, queue_size));
287 msgs_sent++;
288 }
289
290 if (msgs_sent > queue_size) {
291 factory.Exit();
292 }
293 },
294 std::chrono::duration_cast<std::chrono::nanoseconds>(
295 std::chrono::duration<double>(
296 1.0 / TestChannelFrequency(event_loop.get()))));
297
298 factory.Run();
299}
300
301// Tests that RawSender::Error::kMessagesSentTooFast is not returned
302// when messages are sent at the exact frequency of the channel.
303TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
304 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
305 false);
306}
307
308// Tests that RawSender::Error::kMessagesSentTooFast is returned
309// when sending exactly one more message than allowed in a channel storage
310// duration.
311TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
312 TestSentTooFastCheckEdgeCase(
313 [](const int msgs_sent, const int queue_size) {
314 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
315 : RawSender::Error::kOk);
316 },
317 true);
318}
319
Austin Schuh8fb315a2020-11-19 22:33:58 -0800320// Test that creating an event loop while running dies.
321TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
322 SimulatedEventLoopTestFactory factory;
323
324 SimulatedEventLoopFactory simulated_event_loop_factory(
325 factory.configuration());
326
327 ::std::unique_ptr<EventLoop> event_loop =
328 simulated_event_loop_factory.MakeEventLoop("ping");
329
330 auto timer = event_loop->AddTimer([&]() {
331 EXPECT_DEATH(
332 {
333 ::std::unique_ptr<EventLoop> event_loop2 =
334 simulated_event_loop_factory.MakeEventLoop("ping");
335 },
336 "event loop while running");
337 simulated_event_loop_factory.Exit();
338 });
339
340 event_loop->OnRun([&event_loop, &timer] {
341 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50));
342 });
343
344 simulated_event_loop_factory.Run();
345}
346
347// Test that creating a watcher after running dies.
348TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
349 SimulatedEventLoopTestFactory factory;
350
351 SimulatedEventLoopFactory simulated_event_loop_factory(
352 factory.configuration());
353
354 ::std::unique_ptr<EventLoop> event_loop =
355 simulated_event_loop_factory.MakeEventLoop("ping");
356
357 simulated_event_loop_factory.RunFor(chrono::seconds(1));
358
359 EXPECT_DEATH(
360 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
361 "Can't add a watcher after running");
362
363 ::std::unique_ptr<EventLoop> event_loop2 =
364 simulated_event_loop_factory.MakeEventLoop("ping");
365
366 simulated_event_loop_factory.RunFor(chrono::seconds(1));
367
368 EXPECT_DEATH(
369 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
370 "Can't add a watcher after running");
371}
372
Austin Schuh44019f92019-05-19 19:58:27 -0700373// Test that running for a time period with no handlers causes time to progress
374// correctly.
375TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800376 SimulatedEventLoopTestFactory factory;
377
378 SimulatedEventLoopFactory simulated_event_loop_factory(
379 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700380 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800381 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700382
383 simulated_event_loop_factory.RunFor(chrono::seconds(1));
384
385 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700386 event_loop->monotonic_now());
387}
388
389// Test that running for a time with a periodic handler causes time to end
390// correctly.
391TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800392 SimulatedEventLoopTestFactory factory;
393
394 SimulatedEventLoopFactory simulated_event_loop_factory(
395 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700396 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800397 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700398
399 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700400 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700401 event_loop->OnRun([&event_loop, &timer] {
402 timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
403 chrono::milliseconds(100));
404 });
405
406 simulated_event_loop_factory.RunFor(chrono::seconds(1));
407
408 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700409 event_loop->monotonic_now());
410 EXPECT_EQ(counter, 10);
411}
412
Austin Schuh7d87b672019-12-01 20:23:49 -0800413// Tests that watchers have latency in simulation.
414TEST(SimulatedEventLoopTest, WatcherTimingReport) {
415 SimulatedEventLoopTestFactory factory;
416 factory.set_send_delay(std::chrono::microseconds(50));
417
418 FLAGS_timing_report_ms = 1000;
419 auto loop1 = factory.MakePrimary("primary");
420 loop1->MakeWatcher("/test", [](const TestMessage &) {});
421
422 auto loop2 = factory.Make("sender_loop");
423
424 auto loop3 = factory.Make("report_fetcher");
425
426 Fetcher<timing::Report> report_fetcher =
427 loop3->MakeFetcher<timing::Report>("/aos");
428 EXPECT_FALSE(report_fetcher.Fetch());
429
430 auto sender = loop2->MakeSender<TestMessage>("/test");
431
432 // Send 10 messages in the middle of a timing report period so we get
433 // something interesting back.
434 auto test_timer = loop2->AddTimer([&sender]() {
435 for (int i = 0; i < 10; ++i) {
436 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
437 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
438 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700439 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800440 }
441 });
442
443 // Quit after 1 timing report, mid way through the next cycle.
444 {
445 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
446 end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
447 end_timer->set_name("end");
448 }
449
450 loop1->OnRun([&test_timer, &loop1]() {
451 test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
452 });
453
454 factory.Run();
455
456 // And, since we are here, check that the timing report makes sense.
457 // Start by looking for our event loop's timing.
458 FlatbufferDetachedBuffer<timing::Report> primary_report =
459 FlatbufferDetachedBuffer<timing::Report>::Empty();
460 while (report_fetcher.FetchNext()) {
461 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
462 if (report_fetcher->name()->string_view() == "primary") {
463 primary_report = CopyFlatBuffer(report_fetcher.get());
464 }
465 }
466
467 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700468 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800469
470 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
471
472 // Just the timing report timer.
473 ASSERT_NE(primary_report.message().timers(), nullptr);
474 EXPECT_EQ(primary_report.message().timers()->size(), 2);
475
476 // No phased loops
477 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
478
479 // And now confirm that the watcher received all 10 messages, and has latency.
480 ASSERT_NE(primary_report.message().watchers(), nullptr);
481 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
482 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
483 EXPECT_NEAR(
484 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
485 0.00005, 1e-9);
486 EXPECT_NEAR(
487 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
488 0.00005, 1e-9);
489 EXPECT_NEAR(
490 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
491 0.00005, 1e-9);
492 EXPECT_EQ(primary_report.message()
493 .watchers()
494 ->Get(0)
495 ->wakeup_latency()
496 ->standard_deviation(),
497 0.0);
498
499 EXPECT_EQ(
500 primary_report.message().watchers()->Get(0)->handler_time()->average(),
501 0.0);
502 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
503 0.0);
504 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
505 0.0);
506 EXPECT_EQ(primary_report.message()
507 .watchers()
508 ->Get(0)
509 ->handler_time()
510 ->standard_deviation(),
511 0.0);
512}
513
Austin Schuh89c9b812021-02-20 14:42:10 -0800514size_t CountAll(
515 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
516 &counters) {
517 size_t count = 0u;
518 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
519 counters) {
520 count += counter->count();
521 }
522 return count;
523}
524
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525// Tests that ping and pong work when on 2 different nodes, and the message
526// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800527TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800528 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
529 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700530 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800531
532 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
533
534 std::unique_ptr<EventLoop> ping_event_loop =
535 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
536 Ping ping(ping_event_loop.get());
537
538 std::unique_ptr<EventLoop> pong_event_loop =
539 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
540 Pong pong(pong_event_loop.get());
541
542 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
543 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700544 MessageCounter<examples::Pong> pi2_pong_counter(
545 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700546 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
547 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
548 "/pi1/aos");
549 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
550 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800551
Austin Schuh4c3b9702020-08-30 11:34:55 -0700552 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
553 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800554
555 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
556 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700557 MessageCounter<examples::Pong> pi1_pong_counter(
558 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700559 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
560 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
561 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
562 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
563 "/aos");
564
Austin Schuh4c3b9702020-08-30 11:34:55 -0700565 // Count timestamps.
566 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
567 pi1_pong_counter_event_loop.get(), "/pi1/aos");
568 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
569 pi2_pong_counter_event_loop.get(), "/pi1/aos");
570 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
571 pi3_pong_counter_event_loop.get(), "/pi1/aos");
572 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
573 pi1_pong_counter_event_loop.get(), "/pi2/aos");
574 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
575 pi2_pong_counter_event_loop.get(), "/pi2/aos");
576 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
577 pi1_pong_counter_event_loop.get(), "/pi3/aos");
578 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
579 pi3_pong_counter_event_loop.get(), "/pi3/aos");
580
Austin Schuh2f8fd752020-09-01 22:38:28 -0700581 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800582 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
583 remote_timestamps_pi2_on_pi1 =
584 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
585 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
586 remote_timestamps_pi1_on_pi2 =
587 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700588
Austin Schuh4c3b9702020-08-30 11:34:55 -0700589 // Wait to let timestamp estimation start up before looking for the results.
590 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
591
Austin Schuh8fb315a2020-11-19 22:33:58 -0800592 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
593 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
594 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
595 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
596 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
597 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
598
Austin Schuh4c3b9702020-08-30 11:34:55 -0700599 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800600 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700601 "/pi1/aos", [&pi1_server_statistics_count](
602 const message_bridge::ServerStatistics &stats) {
603 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
604 EXPECT_EQ(stats.connections()->size(), 2u);
605 for (const message_bridge::ServerConnection *connection :
606 *stats.connections()) {
607 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800608 EXPECT_EQ(connection->connection_count(), 1u);
609 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800610 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700611 if (connection->node()->name()->string_view() == "pi2") {
612 EXPECT_GT(connection->sent_packets(), 50);
613 } else if (connection->node()->name()->string_view() == "pi3") {
614 EXPECT_GE(connection->sent_packets(), 5);
615 } else {
616 LOG(FATAL) << "Unknown connection";
617 }
618
619 EXPECT_TRUE(connection->has_monotonic_offset());
620 EXPECT_EQ(connection->monotonic_offset(), 0);
621 }
622 ++pi1_server_statistics_count;
623 });
624
625 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800626 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700627 "/pi2/aos", [&pi2_server_statistics_count](
628 const message_bridge::ServerStatistics &stats) {
629 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
630 EXPECT_EQ(stats.connections()->size(), 1u);
631
632 const message_bridge::ServerConnection *connection =
633 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800634 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700635 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
636 EXPECT_GT(connection->sent_packets(), 50);
637 EXPECT_TRUE(connection->has_monotonic_offset());
638 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800639 EXPECT_EQ(connection->connection_count(), 1u);
640 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700641 ++pi2_server_statistics_count;
642 });
643
644 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800645 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700646 "/pi3/aos", [&pi3_server_statistics_count](
647 const message_bridge::ServerStatistics &stats) {
648 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
649 EXPECT_EQ(stats.connections()->size(), 1u);
650
651 const message_bridge::ServerConnection *connection =
652 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800653 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700654 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
655 EXPECT_GE(connection->sent_packets(), 5);
656 EXPECT_TRUE(connection->has_monotonic_offset());
657 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800658 EXPECT_EQ(connection->connection_count(), 1u);
659 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700660 ++pi3_server_statistics_count;
661 });
662
663 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800664 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700665 "/pi1/aos", [&pi1_client_statistics_count](
666 const message_bridge::ClientStatistics &stats) {
667 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
668 EXPECT_EQ(stats.connections()->size(), 2u);
669
670 for (const message_bridge::ClientConnection *connection :
671 *stats.connections()) {
672 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
673 if (connection->node()->name()->string_view() == "pi2") {
674 EXPECT_GT(connection->received_packets(), 50);
675 } else if (connection->node()->name()->string_view() == "pi3") {
676 EXPECT_GE(connection->received_packets(), 5);
677 } else {
678 LOG(FATAL) << "Unknown connection";
679 }
680
Austin Schuhe61d4382021-03-31 21:33:02 -0700681 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700682 EXPECT_TRUE(connection->has_monotonic_offset());
683 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800684 EXPECT_EQ(connection->connection_count(), 1u);
685 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700686 }
687 ++pi1_client_statistics_count;
688 });
689
690 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800691 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700692 "/pi2/aos", [&pi2_client_statistics_count](
693 const message_bridge::ClientStatistics &stats) {
694 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
695 EXPECT_EQ(stats.connections()->size(), 1u);
696
697 const message_bridge::ClientConnection *connection =
698 stats.connections()->Get(0);
699 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
700 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700701 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700702 EXPECT_TRUE(connection->has_monotonic_offset());
703 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800704 EXPECT_EQ(connection->connection_count(), 1u);
705 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700706 ++pi2_client_statistics_count;
707 });
708
709 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800710 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700711 "/pi3/aos", [&pi3_client_statistics_count](
712 const message_bridge::ClientStatistics &stats) {
713 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
714 EXPECT_EQ(stats.connections()->size(), 1u);
715
716 const message_bridge::ClientConnection *connection =
717 stats.connections()->Get(0);
718 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
719 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700720 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700721 EXPECT_TRUE(connection->has_monotonic_offset());
722 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800723 EXPECT_EQ(connection->connection_count(), 1u);
724 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700725 ++pi3_client_statistics_count;
726 });
727
Austin Schuh2f8fd752020-09-01 22:38:28 -0700728 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
729 // channel.
730 const size_t pi1_timestamp_channel =
731 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
732 pi1_on_pi2_timestamp_fetcher.channel());
733 const size_t ping_timestamp_channel =
734 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
735 ping_on_pi2_fetcher.channel());
736
737 for (const Channel *channel :
738 *pi1_pong_counter_event_loop->configuration()->channels()) {
739 VLOG(1) << "Channel "
740 << configuration::ChannelIndex(
741 pi1_pong_counter_event_loop->configuration(), channel)
742 << " " << configuration::CleanedChannelToString(channel);
743 }
744
Austin Schuh8fb315a2020-11-19 22:33:58 -0800745 std::unique_ptr<EventLoop> pi1_remote_timestamp =
746 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
747
Austin Schuh89c9b812021-02-20 14:42:10 -0800748 for (std::pair<int, std::string> channel :
749 shared()
750 ? std::vector<std::pair<
751 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
752 : std::vector<std::pair<int, std::string>>{
753 {pi1_timestamp_channel,
754 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
755 "aos-message_bridge-Timestamp"},
756 {ping_timestamp_channel,
757 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
758 // For each remote timestamp we get back, confirm that it is either a ping
759 // message, or a timestamp we sent out. Also confirm that the timestamps
760 // are correct.
761 pi1_remote_timestamp->MakeWatcher(
762 channel.second,
763 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
764 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
765 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
766 channel_index = channel.first](const RemoteMessage &header) {
767 VLOG(1) << aos::FlatbufferToJson(&header);
768 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700769 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800770 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700771 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700772
Austin Schuh89c9b812021-02-20 14:42:10 -0800773 const aos::monotonic_clock::time_point header_monotonic_sent_time(
774 chrono::nanoseconds(header.monotonic_sent_time()));
775 const aos::realtime_clock::time_point header_realtime_sent_time(
776 chrono::nanoseconds(header.realtime_sent_time()));
777 const aos::monotonic_clock::time_point header_monotonic_remote_time(
778 chrono::nanoseconds(header.monotonic_remote_time()));
779 const aos::realtime_clock::time_point header_realtime_remote_time(
780 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700781
Austin Schuh89c9b812021-02-20 14:42:10 -0800782 if (channel_index != -1) {
783 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700784 }
785
Austin Schuh89c9b812021-02-20 14:42:10 -0800786 const Context *pi1_context = nullptr;
787 const Context *pi2_context = nullptr;
788
789 if (header.channel_index() == pi1_timestamp_channel) {
790 // Find the forwarded message.
791 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
792 header_monotonic_sent_time) {
793 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
794 }
795
796 // And the source message.
797 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
798 header_monotonic_remote_time) {
799 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
800 }
801
802 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
803 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
804 } else if (header.channel_index() == ping_timestamp_channel) {
805 // Find the forwarded message.
806 while (ping_on_pi2_fetcher.context().monotonic_event_time <
807 header_monotonic_sent_time) {
808 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
809 }
810
811 // And the source message.
812 while (ping_on_pi1_fetcher.context().monotonic_event_time <
813 header_monotonic_remote_time) {
814 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
815 }
816
817 pi1_context = &ping_on_pi1_fetcher.context();
818 pi2_context = &ping_on_pi2_fetcher.context();
819 } else {
820 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700821 }
822
Austin Schuh89c9b812021-02-20 14:42:10 -0800823 // Confirm the forwarded message has matching timestamps to the
824 // timestamps we got back.
825 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
826 EXPECT_EQ(pi2_context->remote_queue_index,
827 header.remote_queue_index());
828 EXPECT_EQ(pi2_context->monotonic_event_time,
829 header_monotonic_sent_time);
830 EXPECT_EQ(pi2_context->realtime_event_time,
831 header_realtime_sent_time);
832 EXPECT_EQ(pi2_context->realtime_remote_time,
833 header_realtime_remote_time);
834 EXPECT_EQ(pi2_context->monotonic_remote_time,
835 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700836
Austin Schuh89c9b812021-02-20 14:42:10 -0800837 // Confirm the forwarded message also matches the source message.
838 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
839 EXPECT_EQ(pi1_context->monotonic_event_time,
840 header_monotonic_remote_time);
841 EXPECT_EQ(pi1_context->realtime_event_time,
842 header_realtime_remote_time);
843 });
844 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700845
Austin Schuh4c3b9702020-08-30 11:34:55 -0700846 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
847 chrono::milliseconds(500) +
848 chrono::milliseconds(5));
849
850 EXPECT_EQ(pi1_pong_counter.count(), 1001);
851 EXPECT_EQ(pi2_pong_counter.count(), 1001);
852
853 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
854 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
855 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
856 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
857 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
858 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
859 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
860
Austin Schuh20ac95d2020-12-05 17:24:19 -0800861 EXPECT_EQ(pi1_server_statistics_count, 10);
862 EXPECT_EQ(pi2_server_statistics_count, 10);
863 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700864
865 EXPECT_EQ(pi1_client_statistics_count, 95);
866 EXPECT_EQ(pi2_client_statistics_count, 95);
867 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700868
869 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800870 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
871 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700872}
873
874// Tests that an offset between nodes can be recovered and shows up in
875// ServerStatistics correctly.
876TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
877 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700878 aos::configuration::ReadConfig(ArtifactPath(
879 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700880 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800881 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
882 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700883 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800884 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
885 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700886 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800887 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
888 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700889
Austin Schuh87dd3832021-01-01 23:07:31 -0800890 message_bridge::TestingTimeConverter time(
891 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700892 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700893 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700894
895 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800896 time.AddNextTimestamp(
897 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700898 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
899 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700900
901 std::unique_ptr<EventLoop> ping_event_loop =
902 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
903 Ping ping(ping_event_loop.get());
904
905 std::unique_ptr<EventLoop> pong_event_loop =
906 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
907 Pong pong(pong_event_loop.get());
908
Austin Schuh8fb315a2020-11-19 22:33:58 -0800909 // Wait to let timestamp estimation start up before looking for the results.
910 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
911
Austin Schuh87dd3832021-01-01 23:07:31 -0800912 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
913 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
914
Austin Schuh4c3b9702020-08-30 11:34:55 -0700915 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
916 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
917
918 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
919 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
920
Austin Schuh4c3b9702020-08-30 11:34:55 -0700921 // Confirm the offsets are being recovered correctly.
922 int pi1_server_statistics_count = 0;
923 pi1_pong_counter_event_loop->MakeWatcher(
924 "/pi1/aos", [&pi1_server_statistics_count,
925 kOffset](const message_bridge::ServerStatistics &stats) {
926 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
927 EXPECT_EQ(stats.connections()->size(), 2u);
928 for (const message_bridge::ServerConnection *connection :
929 *stats.connections()) {
930 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800931 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700932 if (connection->node()->name()->string_view() == "pi2") {
933 EXPECT_EQ(connection->monotonic_offset(),
934 chrono::nanoseconds(kOffset).count());
935 } else if (connection->node()->name()->string_view() == "pi3") {
936 EXPECT_EQ(connection->monotonic_offset(), 0);
937 } else {
938 LOG(FATAL) << "Unknown connection";
939 }
940
941 EXPECT_TRUE(connection->has_monotonic_offset());
942 }
943 ++pi1_server_statistics_count;
944 });
945
946 int pi2_server_statistics_count = 0;
947 pi2_pong_counter_event_loop->MakeWatcher(
948 "/pi2/aos", [&pi2_server_statistics_count,
949 kOffset](const message_bridge::ServerStatistics &stats) {
950 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
951 EXPECT_EQ(stats.connections()->size(), 1u);
952
953 const message_bridge::ServerConnection *connection =
954 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800955 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700956 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
957 EXPECT_TRUE(connection->has_monotonic_offset());
958 EXPECT_EQ(connection->monotonic_offset(),
959 -chrono::nanoseconds(kOffset).count());
960 ++pi2_server_statistics_count;
961 });
962
963 int pi3_server_statistics_count = 0;
964 pi3_pong_counter_event_loop->MakeWatcher(
965 "/pi3/aos", [&pi3_server_statistics_count](
966 const message_bridge::ServerStatistics &stats) {
967 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
968 EXPECT_EQ(stats.connections()->size(), 1u);
969
970 const message_bridge::ServerConnection *connection =
971 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800972 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700973 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
974 EXPECT_TRUE(connection->has_monotonic_offset());
975 EXPECT_EQ(connection->monotonic_offset(), 0);
976 ++pi3_server_statistics_count;
977 });
978
979 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
980 chrono::milliseconds(500) +
981 chrono::milliseconds(5));
982
Austin Schuh20ac95d2020-12-05 17:24:19 -0800983 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700984 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800985 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700986}
987
988// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800989TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700990 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
991 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
992 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
993
994 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
995 simulated_event_loop_factory.DisableStatistics();
996
997 std::unique_ptr<EventLoop> ping_event_loop =
998 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
999 Ping ping(ping_event_loop.get());
1000
1001 std::unique_ptr<EventLoop> pong_event_loop =
1002 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1003 Pong pong(pong_event_loop.get());
1004
1005 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1006 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1007
1008 MessageCounter<examples::Pong> pi2_pong_counter(
1009 pi2_pong_counter_event_loop.get(), "/test");
1010
1011 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1012 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1013
1014 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1015 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1016
1017 MessageCounter<examples::Pong> pi1_pong_counter(
1018 pi1_pong_counter_event_loop.get(), "/test");
1019
1020 // Count timestamps.
1021 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1022 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1023 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1024 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1025 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1026 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1027 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1028 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1029 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1030 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1031 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1032 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1033 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1034 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1035
Austin Schuh2f8fd752020-09-01 22:38:28 -07001036 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001037 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1038 remote_timestamps_pi2_on_pi1 =
1039 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1040 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1041 remote_timestamps_pi1_on_pi2 =
1042 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001043
Austin Schuh4c3b9702020-08-30 11:34:55 -07001044 MessageCounter<message_bridge::ServerStatistics>
1045 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1046 "/pi1/aos");
1047 MessageCounter<message_bridge::ServerStatistics>
1048 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1049 "/pi2/aos");
1050 MessageCounter<message_bridge::ServerStatistics>
1051 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1052 "/pi3/aos");
1053
1054 MessageCounter<message_bridge::ClientStatistics>
1055 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1056 "/pi1/aos");
1057 MessageCounter<message_bridge::ClientStatistics>
1058 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1059 "/pi2/aos");
1060 MessageCounter<message_bridge::ClientStatistics>
1061 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1062 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001063
1064 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1065 chrono::milliseconds(5));
1066
Austin Schuh4c3b9702020-08-30 11:34:55 -07001067 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1068 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1069
1070 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1071 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1072 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1073 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1074 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1075 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1076 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1077
1078 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1079 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1080 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1081
1082 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1083 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1084 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001085
1086 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001087 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1088 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001089}
1090
Austin Schuhc0b0f722020-12-12 18:36:06 -08001091bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1092 for (const message_bridge::ServerConnection *connection :
1093 *server_statistics->connections()) {
1094 if (connection->state() != message_bridge::State::CONNECTED) {
1095 return false;
1096 }
1097 }
1098 return true;
1099}
1100
1101bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1102 std::string_view target) {
1103 for (const message_bridge::ServerConnection *connection :
1104 *server_statistics->connections()) {
1105 if (connection->node()->name()->string_view() == target) {
1106 if (connection->state() == message_bridge::State::CONNECTED) {
1107 return false;
1108 }
1109 } else {
1110 if (connection->state() != message_bridge::State::CONNECTED) {
1111 return false;
1112 }
1113 }
1114 }
1115 return true;
1116}
1117
1118bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1119 for (const message_bridge::ClientConnection *connection :
1120 *client_statistics->connections()) {
1121 if (connection->state() != message_bridge::State::CONNECTED) {
1122 return false;
1123 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001124 EXPECT_TRUE(connection->has_boot_uuid());
1125 EXPECT_TRUE(connection->has_connected_since_time());
1126 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001127 }
1128 return true;
1129}
1130
1131bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1132 std::string_view target) {
1133 for (const message_bridge::ClientConnection *connection :
1134 *client_statistics->connections()) {
1135 if (connection->node()->name()->string_view() == target) {
1136 if (connection->state() == message_bridge::State::CONNECTED) {
1137 return false;
1138 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001139 EXPECT_FALSE(connection->has_boot_uuid());
1140 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001141 } else {
1142 if (connection->state() != message_bridge::State::CONNECTED) {
1143 return false;
1144 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001145 EXPECT_TRUE(connection->has_boot_uuid());
1146 EXPECT_TRUE(connection->has_connected_since_time());
1147 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001148 }
1149 }
1150 return true;
1151}
1152
Austin Schuh367a7f42021-11-23 23:04:36 -08001153int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1154 std::string_view target) {
1155 for (const message_bridge::ClientConnection *connection :
1156 *client_statistics->connections()) {
1157 if (connection->node()->name()->string_view() == target) {
1158 return connection->connection_count();
1159 }
1160 }
1161 return 0;
1162}
1163
1164int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1165 std::string_view target) {
1166 for (const message_bridge::ServerConnection *connection :
1167 *server_statistics->connections()) {
1168 if (connection->node()->name()->string_view() == target) {
1169 return connection->connection_count();
1170 }
1171 }
1172 return 0;
1173}
1174
Austin Schuhc0b0f722020-12-12 18:36:06 -08001175// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001176TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001177 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1178
Austin Schuh58646e22021-08-23 23:51:46 -07001179 NodeEventLoopFactory *pi1 =
1180 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1181 NodeEventLoopFactory *pi2 =
1182 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1183 NodeEventLoopFactory *pi3 =
1184 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1185
1186 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001187 Ping ping(ping_event_loop.get());
1188
Austin Schuh58646e22021-08-23 23:51:46 -07001189 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001190 Pong pong(pong_event_loop.get());
1191
1192 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001193 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001194
1195 MessageCounter<examples::Pong> pi2_pong_counter(
1196 pi2_pong_counter_event_loop.get(), "/test");
1197
1198 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001199 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001200
1201 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001202 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001203
1204 MessageCounter<examples::Pong> pi1_pong_counter(
1205 pi1_pong_counter_event_loop.get(), "/test");
1206
1207 // Count timestamps.
1208 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1209 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1210 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1211 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1212 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1213 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1214 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1215 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1216 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1217 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1218 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1219 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1220 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1221 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1222
1223 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001224 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1225 remote_timestamps_pi2_on_pi1 =
1226 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1227 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1228 remote_timestamps_pi1_on_pi2 =
1229 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001230
1231 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001232 *pi1_server_statistics_counter;
1233 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1234 pi1_server_statistics_counter =
1235 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1236 "pi1_server_statistics_counter", "/pi1/aos");
1237 });
1238
Austin Schuhc0b0f722020-12-12 18:36:06 -08001239 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1240 pi1_pong_counter_event_loop
1241 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1242 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1243 pi1_pong_counter_event_loop
1244 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1245
1246 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001247 *pi2_server_statistics_counter;
1248 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1249 pi2_server_statistics_counter =
1250 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1251 "pi2_server_statistics_counter", "/pi2/aos");
1252 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001253 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1254 pi2_pong_counter_event_loop
1255 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1256 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1257 pi2_pong_counter_event_loop
1258 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1259
1260 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001261 *pi3_server_statistics_counter;
1262 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1263 pi3_server_statistics_counter =
1264 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1265 "pi3_server_statistics_counter", "/pi3/aos");
1266 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001267 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1268 pi3_pong_counter_event_loop
1269 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1270 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1271 pi3_pong_counter_event_loop
1272 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1273
1274 MessageCounter<message_bridge::ClientStatistics>
1275 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1276 "/pi1/aos");
1277 MessageCounter<message_bridge::ClientStatistics>
1278 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1279 "/pi2/aos");
1280 MessageCounter<message_bridge::ClientStatistics>
1281 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1282 "/pi3/aos");
1283
James Kuszmaul86e86c32022-07-21 17:39:47 -07001284 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1285 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1286 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1287 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
  // The current contract is that, if all nodes boot simultaneously in
  // simulation, they should all act as if they are already connected, without
  // ever observing the transition from disconnected to connected (note that on
  // a real system the ServerStatistics message will get resent for each and
  // every new connection, even if the new connections happen
  // "simultaneously"--in simulation, we are essentially acting as if we are
  // starting execution in an already running system, rather than observing the
  // boot process).
1296 for (auto &event_loop : statistics_watcher_loops) {
1297 event_loop->MakeWatcher(
1298 "/aos", [](const message_bridge::ServerStatistics &msg) {
1299 for (const message_bridge::ServerConnection *connection :
1300 *msg.connections()) {
1301 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1302 << connection->node()->name()->string_view();
1303 }
1304 });
1305 }
1306
Austin Schuhc0b0f722020-12-12 18:36:06 -08001307 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1308 chrono::milliseconds(5));
1309
James Kuszmaul86e86c32022-07-21 17:39:47 -07001310 statistics_watcher_loops.clear();
1311
Austin Schuhc0b0f722020-12-12 18:36:06 -08001312 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1313 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1314
1315 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1316 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1317 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1318 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1319 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1320 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1321 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1322
Austin Schuh58646e22021-08-23 23:51:46 -07001323 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1324 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1325 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001326
1327 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1328 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1329 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1330
1331 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001332 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1333 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001334
1335 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1336 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1337 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1338 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1339 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1340 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1341 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1342 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1343 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1344 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1345 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1346 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1347 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1348 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1349 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1350 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1351 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1352 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1353
Austin Schuh58646e22021-08-23 23:51:46 -07001354 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001355
1356 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1357
1358 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1359 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1360
1361 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1362 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1363 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1364 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1365 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1366 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1367 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1368
Austin Schuh58646e22021-08-23 23:51:46 -07001369 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1370 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1371 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001372
1373 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1374 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1375 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1376
1377 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001378 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1379 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001380
1381 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1382 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1383 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1384 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1385 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1386 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1387 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1388 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1389 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1390 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1391 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1392 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1393 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1394 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1395 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1396 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1397 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1398 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1399
Austin Schuh58646e22021-08-23 23:51:46 -07001400 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001401
1402 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1403
Austin Schuh367a7f42021-11-23 23:04:36 -08001404 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1405 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1406 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1407 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1408 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1409 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1410
1411 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1412 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1413 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1414 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1415 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1416 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1417 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1418 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1419
1420 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1421 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1422 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1423 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1424
1425 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1426 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1427 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1428 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1429
Austin Schuhc0b0f722020-12-12 18:36:06 -08001430 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1431 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1432
1433 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1434 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1435 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1436 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1437 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1438 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1439 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1440
Austin Schuh58646e22021-08-23 23:51:46 -07001441 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1442 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1443 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001444
1445 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1446 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1447 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1448
1449 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001450 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1451 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001452
Austin Schuhc0b0f722020-12-12 18:36:06 -08001453 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1454 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001455 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1456 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001457 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1458 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001459 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1460 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001461 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1462 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001463 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1464 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1465}
1466
Austin Schuh2febf0d2020-09-21 22:24:30 -07001467// Tests that the time offset having a slope doesn't break the world.
1468// SimulatedMessageBridge has enough self consistency CHECK statements to
1469// confirm, and we can can also check a message in each direction to make sure
1470// it gets delivered as expected.
1471TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1472 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001473 aos::configuration::ReadConfig(ArtifactPath(
1474 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001475 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001476 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1477 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001478 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001479 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1480 ASSERT_EQ(pi2_index, 1u);
1481 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1482 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1483 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001484
Austin Schuh87dd3832021-01-01 23:07:31 -08001485 message_bridge::TestingTimeConverter time(
1486 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001487 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001488 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001489
Austin Schuh2febf0d2020-09-21 22:24:30 -07001490 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001491 time.AddNextTimestamp(
1492 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001493 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1494 BootTimestamp::epoch()});
1495 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1496 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1497 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1498 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001499
1500 std::unique_ptr<EventLoop> ping_event_loop =
1501 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1502 Ping ping(ping_event_loop.get());
1503
1504 std::unique_ptr<EventLoop> pong_event_loop =
1505 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1506 Pong pong(pong_event_loop.get());
1507
1508 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1509 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1510 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1511 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1512
1513 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1514 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1515 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1516 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1517
1518 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1519 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1520 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1521 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1522
1523 // End after a pong message comes back. This will leave the latest messages
1524 // on all channels so we can look at timestamps easily and check they make
1525 // sense.
1526 std::unique_ptr<EventLoop> pi1_pong_ender =
1527 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1528 int count = 0;
1529 pi1_pong_ender->MakeWatcher(
1530 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1531 if (++count == 100) {
1532 simulated_event_loop_factory.Exit();
1533 }
1534 });
1535
1536 // Run enough that messages should be delivered.
1537 simulated_event_loop_factory.Run();
1538
1539 // Grab the latest messages.
1540 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1541 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1542 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1543 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1544
1545 // Compute their time on the global distributed clock so we can compute
1546 // distance betwen them.
1547 const distributed_clock::time_point pi1_ping_time =
1548 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1549 ->ToDistributedClock(
1550 ping_on_pi1_fetcher.context().monotonic_event_time);
1551 const distributed_clock::time_point pi2_ping_time =
1552 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1553 ->ToDistributedClock(
1554 ping_on_pi2_fetcher.context().monotonic_event_time);
1555 const distributed_clock::time_point pi1_pong_time =
1556 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1557 ->ToDistributedClock(
1558 pong_on_pi1_fetcher.context().monotonic_event_time);
1559 const distributed_clock::time_point pi2_pong_time =
1560 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1561 ->ToDistributedClock(
1562 pong_on_pi2_fetcher.context().monotonic_event_time);
1563
1564 // And confirm the delivery delay is just about exactly 150 uS for both
1565 // directions like expected. There will be a couple ns of rounding errors in
1566 // the conversion functions that aren't worth accounting for right now. This
1567 // will either be really close, or really far.
1568 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1569 pi1_ping_time);
1570 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1571 pi1_ping_time);
1572
1573 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1574 pi2_pong_time);
1575 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1576 pi2_pong_time);
1577}
1578
Austin Schuh4c570ea2020-11-19 23:13:24 -08001579void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1580 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1581 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1582 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001583 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001584}
1585
1586// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001587TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001588 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1589 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1590
1591 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1592
1593 std::unique_ptr<EventLoop> ping_event_loop =
1594 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1595 aos::Sender<examples::Ping> pi1_reliable_sender =
1596 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1597 aos::Sender<examples::Ping> pi1_unreliable_sender =
1598 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1599 SendPing(&pi1_reliable_sender, 1);
1600 SendPing(&pi1_unreliable_sender, 1);
1601
1602 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1603 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001604 aos::Sender<examples::Ping> pi2_reliable_sender =
1605 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1606 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001607 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1608 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001609 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1610 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001611 MessageCounter<examples::Ping> pi2_unreliable_counter(
1612 pi2_pong_event_loop.get(), "/unreliable");
1613 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1614 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1615 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1616 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1617
1618 const size_t reliable_channel_index = configuration::ChannelIndex(
1619 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1620
1621 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1622 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1623
Austin Schuheeaa2022021-01-02 21:52:03 -08001624 const chrono::nanoseconds network_delay =
1625 simulated_event_loop_factory.network_delay();
1626
Austin Schuh4c570ea2020-11-19 23:13:24 -08001627 int reliable_timestamp_count = 0;
1628 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001629 shared() ? "/pi1/aos/remote_timestamps/pi2"
1630 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001631 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001632 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1633 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001634 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001635 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001636 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001637 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001638 VLOG(1) << aos::FlatbufferToJson(&header);
1639 if (header.channel_index() == reliable_channel_index) {
1640 ++reliable_timestamp_count;
1641 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001642
1643 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1644 chrono::nanoseconds(header.monotonic_sent_time()));
1645
1646 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1647 header_monotonic_sent_time + network_delay +
1648 (pi1_remote_timestamp->monotonic_now() -
1649 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001650 });
1651
1652 // Wait to let timestamp estimation start up before looking for the results.
1653 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1654
1655 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1656 // This one isn't reliable, but was sent before the start. It should *not* be
1657 // delivered.
1658 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1659 // Confirm we got a timestamp logged for the message that was forwarded.
1660 EXPECT_EQ(reliable_timestamp_count, 1u);
1661
1662 SendPing(&pi1_reliable_sender, 2);
1663 SendPing(&pi1_unreliable_sender, 2);
1664 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1665 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001666 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001667 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1668
1669 EXPECT_EQ(reliable_timestamp_count, 2u);
1670}
1671
Austin Schuh20ac95d2020-12-05 17:24:19 -08001672// Tests that rebooting a node changes the ServerStatistics message and the
1673// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001674TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001675 const UUID pi1_boot0 = UUID::Random();
1676 const UUID pi2_boot0 = UUID::Random();
1677 const UUID pi2_boot1 = UUID::Random();
1678 const UUID pi3_boot0 = UUID::Random();
1679 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001680
Austin Schuh58646e22021-08-23 23:51:46 -07001681 message_bridge::TestingTimeConverter time(
1682 configuration::NodesCount(&config.message()));
1683 SimulatedEventLoopFactory factory(&config.message());
1684 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001685
Austin Schuh58646e22021-08-23 23:51:46 -07001686 const size_t pi1_index =
1687 configuration::GetNodeIndex(&config.message(), "pi1");
1688 const size_t pi2_index =
1689 configuration::GetNodeIndex(&config.message(), "pi2");
1690 const size_t pi3_index =
1691 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001692
Austin Schuh58646e22021-08-23 23:51:46 -07001693 {
1694 time.AddNextTimestamp(distributed_clock::epoch(),
1695 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1696 BootTimestamp::epoch()});
1697
1698 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1699
1700 time.AddNextTimestamp(
1701 distributed_clock::epoch() + dt,
1702 {BootTimestamp::epoch() + dt,
1703 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1704 BootTimestamp::epoch() + dt});
1705
1706 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1707 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1708 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1709 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1710 }
1711
1712 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1713 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1714
1715 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1716 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001717
1718 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001719 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001720
1721 int timestamp_count = 0;
1722 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001723 "/pi2/aos", [&expected_boot_uuid,
1724 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001725 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001726 expected_boot_uuid);
1727 });
1728 pi1_remote_timestamp->MakeWatcher(
1729 "/test",
1730 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001731 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001732 expected_boot_uuid);
1733 });
1734 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001735 shared() ? "/pi1/aos/remote_timestamps/pi2"
1736 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001737 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1738 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001739 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001740 VLOG(1) << aos::FlatbufferToJson(&header);
1741 ++timestamp_count;
1742 });
1743
1744 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001745 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001746 int boot_number = 0;
1747 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001748 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001749 "/pi1/aos",
1750 [&pi1_server_statistics_count, &expected_boot_uuid,
1751 &expected_connection_time, &first_pi1_server_statistics,
1752 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001753 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1754 for (const message_bridge::ServerConnection *connection :
1755 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001756 if (connection->state() == message_bridge::State::CONNECTED) {
1757 ASSERT_TRUE(connection->has_boot_uuid());
1758 }
1759 if (!first_pi1_server_statistics) {
1760 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1761 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001762 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001763 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1764 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001765 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001766 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001767 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001768 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1769 connection->connected_since_time())),
1770 expected_connection_time);
1771 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001772 ++pi1_server_statistics_count;
1773 }
1774 }
Austin Schuh58646e22021-08-23 23:51:46 -07001775 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001776 });
1777
Austin Schuh58646e22021-08-23 23:51:46 -07001778 int pi1_client_statistics_count = 0;
1779 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001780 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1781 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001782 const message_bridge::ClientStatistics &stats) {
1783 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1784 for (const message_bridge::ClientConnection *connection :
1785 *stats.connections()) {
1786 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1787 if (connection->node()->name()->string_view() == "pi2") {
1788 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001789 EXPECT_EQ(expected_boot_uuid,
1790 UUID::FromString(connection->boot_uuid()))
1791 << " : Got " << aos::FlatbufferToJson(&stats);
1792 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1793 connection->connected_since_time())),
1794 expected_connection_time);
1795 EXPECT_EQ(boot_number + 1, connection->connection_count());
1796 } else {
1797 EXPECT_EQ(connection->connected_since_time(), 0);
1798 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001799 }
1800 }
1801 });
1802
1803 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001804 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1805 pi1, pi2, pi2_boot1]() {
1806 expected_boot_uuid = pi2_boot1;
1807 ++boot_number;
1808 LOG(INFO) << "OnShutdown triggered for pi2";
1809 pi2->OnStartup(
1810 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1811 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1812 expected_connection_time = pi1->monotonic_now();
1813 });
1814 });
Austin Schuh58646e22021-08-23 23:51:46 -07001815
Austin Schuh20ac95d2020-12-05 17:24:19 -08001816 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001817 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001818
1819 EXPECT_GT(timestamp_count, 100);
1820 EXPECT_GE(pi1_server_statistics_count, 1u);
1821
Austin Schuh20ac95d2020-12-05 17:24:19 -08001822 timestamp_count = 0;
1823 pi1_server_statistics_count = 0;
1824
Austin Schuh58646e22021-08-23 23:51:46 -07001825 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001826 EXPECT_GT(timestamp_count, 100);
1827 EXPECT_GE(pi1_server_statistics_count, 1u);
1828}
1829
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001830INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001831 All, RemoteMessageSimulatedEventLoopTest,
1832 ::testing::Values(
1833 Param{"multinode_pingpong_test_combined_config.json", true},
1834 Param{"multinode_pingpong_test_split_config.json", false}));
1835
Austin Schuh58646e22021-08-23 23:51:46 -07001836// Tests that Startup and Shutdown do reasonable things.
1837TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1838 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1839 aos::configuration::ReadConfig(
1840 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1841
Austin Schuh72e65682021-09-02 11:37:05 -07001842 size_t pi1_shutdown_counter = 0;
1843 size_t pi2_shutdown_counter = 0;
1844 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1845 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1846
Austin Schuh58646e22021-08-23 23:51:46 -07001847 message_bridge::TestingTimeConverter time(
1848 configuration::NodesCount(&config.message()));
1849 SimulatedEventLoopFactory factory(&config.message());
1850 factory.SetTimeConverter(&time);
1851 time.AddNextTimestamp(
1852 distributed_clock::epoch(),
1853 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1854
1855 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1856
1857 time.AddNextTimestamp(
1858 distributed_clock::epoch() + dt,
1859 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1860 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1861 BootTimestamp::epoch() + dt});
1862
1863 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1864 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1865
1866 // Configure startup to start Ping and Pong, and count.
1867 size_t pi1_startup_counter = 0;
1868 size_t pi2_startup_counter = 0;
1869 pi1->OnStartup([pi1]() {
1870 LOG(INFO) << "Made ping";
1871 pi1->AlwaysStart<Ping>("ping");
1872 });
1873 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1874 pi2->OnStartup([pi2]() {
1875 LOG(INFO) << "Made pong";
1876 pi2->AlwaysStart<Pong>("pong");
1877 });
1878 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1879
1880 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001881 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1882 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1883
Austin Schuh58646e22021-08-23 23:51:46 -07001884 // Automatically make counters on startup.
1885 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1886 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1887 "pi1_pong_counter", "/test");
1888 });
1889 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1890 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1891 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1892 "pi2_ping_counter", "/test");
1893 });
1894 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1895
1896 EXPECT_EQ(pi2_ping_counter, nullptr);
1897 EXPECT_EQ(pi1_pong_counter, nullptr);
1898
1899 EXPECT_EQ(pi1_startup_counter, 0u);
1900 EXPECT_EQ(pi2_startup_counter, 0u);
1901 EXPECT_EQ(pi1_shutdown_counter, 0u);
1902 EXPECT_EQ(pi2_shutdown_counter, 0u);
1903
1904 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1905 EXPECT_EQ(pi1_startup_counter, 1u);
1906 EXPECT_EQ(pi2_startup_counter, 1u);
1907 EXPECT_EQ(pi1_shutdown_counter, 0u);
1908 EXPECT_EQ(pi2_shutdown_counter, 0u);
1909 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1910 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1911
1912 LOG(INFO) << pi1->monotonic_now();
1913 LOG(INFO) << pi2->monotonic_now();
1914
1915 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1916
1917 EXPECT_EQ(pi1_startup_counter, 2u);
1918 EXPECT_EQ(pi2_startup_counter, 2u);
1919 EXPECT_EQ(pi1_shutdown_counter, 1u);
1920 EXPECT_EQ(pi2_shutdown_counter, 1u);
1921 EXPECT_EQ(pi2_ping_counter->count(), 501);
1922 EXPECT_EQ(pi1_pong_counter->count(), 501);
1923}
1924
1925// Tests that OnStartup handlers can be added after running and get called, and
1926// can't be called when running.
1927TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1928 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1929 aos::configuration::ReadConfig(
1930 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1931
1932 // Test that we can add startup handlers as long as we aren't running, and
1933 // they get run when Run gets called again.
1934 // Test that adding a startup handler when running fails.
1935 //
1936 // Test shutdown handlers get called on destruction.
1937 SimulatedEventLoopFactory factory(&config.message());
1938
1939 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1940
1941 int startup_count0 = 0;
1942 int startup_count1 = 0;
1943
1944 pi1->OnStartup([&]() { ++startup_count0; });
1945 EXPECT_EQ(startup_count0, 0);
1946 EXPECT_EQ(startup_count1, 0);
1947
1948 factory.RunFor(chrono::nanoseconds(1));
1949 EXPECT_EQ(startup_count0, 1);
1950 EXPECT_EQ(startup_count1, 0);
1951
1952 pi1->OnStartup([&]() { ++startup_count1; });
1953 EXPECT_EQ(startup_count0, 1);
1954 EXPECT_EQ(startup_count1, 0);
1955
1956 factory.RunFor(chrono::nanoseconds(1));
1957 EXPECT_EQ(startup_count0, 1);
1958 EXPECT_EQ(startup_count1, 1);
1959
1960 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1961 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1962
1963 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1964 "Can only register OnStartup handlers when not running.");
1965}
1966
1967// Tests that OnStartup handlers can be added after running and get called, and
1968// all the handlers get called on reboot. Shutdown handlers are tested the same
1969// way.
1970TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1971 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1972 aos::configuration::ReadConfig(
1973 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1974
Austin Schuh72e65682021-09-02 11:37:05 -07001975 int startup_count0 = 0;
1976 int shutdown_count0 = 0;
1977 int startup_count1 = 0;
1978 int shutdown_count1 = 0;
1979
Austin Schuh58646e22021-08-23 23:51:46 -07001980 message_bridge::TestingTimeConverter time(
1981 configuration::NodesCount(&config.message()));
1982 SimulatedEventLoopFactory factory(&config.message());
1983 factory.SetTimeConverter(&time);
1984 time.StartEqual();
1985
1986 const chrono::nanoseconds dt = chrono::seconds(10);
1987 time.RebootAt(0, distributed_clock::epoch() + dt);
1988 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1989
1990 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1991
Austin Schuh58646e22021-08-23 23:51:46 -07001992 pi1->OnStartup([&]() { ++startup_count0; });
1993 pi1->OnShutdown([&]() { ++shutdown_count0; });
1994 EXPECT_EQ(startup_count0, 0);
1995 EXPECT_EQ(startup_count1, 0);
1996 EXPECT_EQ(shutdown_count0, 0);
1997 EXPECT_EQ(shutdown_count1, 0);
1998
1999 factory.RunFor(chrono::nanoseconds(1));
2000 EXPECT_EQ(startup_count0, 1);
2001 EXPECT_EQ(startup_count1, 0);
2002 EXPECT_EQ(shutdown_count0, 0);
2003 EXPECT_EQ(shutdown_count1, 0);
2004
2005 pi1->OnStartup([&]() { ++startup_count1; });
2006 EXPECT_EQ(startup_count0, 1);
2007 EXPECT_EQ(startup_count1, 0);
2008 EXPECT_EQ(shutdown_count0, 0);
2009 EXPECT_EQ(shutdown_count1, 0);
2010
2011 factory.RunFor(chrono::nanoseconds(1));
2012 EXPECT_EQ(startup_count0, 1);
2013 EXPECT_EQ(startup_count1, 1);
2014 EXPECT_EQ(shutdown_count0, 0);
2015 EXPECT_EQ(shutdown_count1, 0);
2016
2017 factory.RunFor(chrono::seconds(15));
2018
2019 EXPECT_EQ(startup_count0, 2);
2020 EXPECT_EQ(startup_count1, 2);
2021 EXPECT_EQ(shutdown_count0, 1);
2022 EXPECT_EQ(shutdown_count1, 0);
2023
2024 pi1->OnShutdown([&]() { ++shutdown_count1; });
2025 factory.RunFor(chrono::seconds(10));
2026
2027 EXPECT_EQ(startup_count0, 3);
2028 EXPECT_EQ(startup_count1, 3);
2029 EXPECT_EQ(shutdown_count0, 2);
2030 EXPECT_EQ(shutdown_count1, 1);
2031}
2032
2033// Tests that event loops which outlive shutdown crash.
2034TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2035 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2036 aos::configuration::ReadConfig(
2037 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2038
2039 message_bridge::TestingTimeConverter time(
2040 configuration::NodesCount(&config.message()));
2041 SimulatedEventLoopFactory factory(&config.message());
2042 factory.SetTimeConverter(&time);
2043 time.StartEqual();
2044
2045 const chrono::nanoseconds dt = chrono::seconds(10);
2046 time.RebootAt(0, distributed_clock::epoch() + dt);
2047
2048 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2049
2050 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2051
2052 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2053}
2054
2055// Tests that messages don't survive a reboot of a node.
2056TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2057 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2058 aos::configuration::ReadConfig(
2059 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2060
2061 message_bridge::TestingTimeConverter time(
2062 configuration::NodesCount(&config.message()));
2063 SimulatedEventLoopFactory factory(&config.message());
2064 factory.SetTimeConverter(&time);
2065 time.StartEqual();
2066
2067 const chrono::nanoseconds dt = chrono::seconds(10);
2068 time.RebootAt(0, distributed_clock::epoch() + dt);
2069
2070 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2071
2072 const UUID boot_uuid = pi1->boot_uuid();
2073 EXPECT_NE(boot_uuid, UUID::Zero());
2074
2075 {
2076 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2077 aos::Sender<examples::Ping> test_message_sender =
2078 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2079 SendPing(&test_message_sender, 1);
2080 }
2081
2082 factory.RunFor(chrono::seconds(5));
2083
2084 {
2085 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2086 aos::Fetcher<examples::Ping> fetcher =
2087 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2088 EXPECT_TRUE(fetcher.Fetch());
2089 }
2090
2091 factory.RunFor(chrono::seconds(10));
2092
2093 {
2094 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2095 aos::Fetcher<examples::Ping> fetcher =
2096 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2097 EXPECT_FALSE(fetcher.Fetch());
2098 }
2099 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2100}
2101
2102// Tests that reliable messages get resent on reboot.
2103TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2104 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2105 aos::configuration::ReadConfig(
2106 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2107
2108 message_bridge::TestingTimeConverter time(
2109 configuration::NodesCount(&config.message()));
2110 SimulatedEventLoopFactory factory(&config.message());
2111 factory.SetTimeConverter(&time);
2112 time.StartEqual();
2113
2114 const chrono::nanoseconds dt = chrono::seconds(1);
2115 time.RebootAt(1, distributed_clock::epoch() + dt);
2116
2117 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2118 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2119
2120 const UUID pi1_boot_uuid = pi1->boot_uuid();
2121 const UUID pi2_boot_uuid = pi2->boot_uuid();
2122 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2123 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2124
2125 {
2126 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2127 aos::Sender<examples::Ping> test_message_sender =
2128 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2129 SendPing(&test_message_sender, 1);
2130 }
2131
2132 factory.RunFor(chrono::milliseconds(500));
2133
2134 {
2135 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2136 aos::Fetcher<examples::Ping> fetcher =
2137 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2138 EXPECT_TRUE(fetcher.Fetch());
2139 }
2140
2141 factory.RunFor(chrono::seconds(1));
2142
2143 {
2144 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2145 aos::Fetcher<examples::Ping> fetcher =
2146 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2147 EXPECT_TRUE(fetcher.Fetch());
2148 }
2149 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2150}
2151
James Kuszmaul86e86c32022-07-21 17:39:47 -07002152TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2153 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2154 aos::configuration::ReadConfig(
2155 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2156
2157 message_bridge::TestingTimeConverter time(
2158 configuration::NodesCount(&config.message()));
2159 time.AddNextTimestamp(
2160 distributed_clock::epoch(),
2161 {BootTimestamp{0, monotonic_clock::epoch()},
2162 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2163 BootTimestamp{0, monotonic_clock::epoch()}});
2164 SimulatedEventLoopFactory factory(&config.message());
2165 factory.SetTimeConverter(&time);
2166
2167 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2168 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2169
2170 const UUID pi1_boot_uuid = pi1->boot_uuid();
2171 const UUID pi2_boot_uuid = pi2->boot_uuid();
2172 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2173 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2174
2175 {
2176 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2177 aos::Sender<examples::Ping> pi1_sender =
2178 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2179 SendPing(&pi1_sender, 1);
2180 }
2181 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2182 aos::Sender<examples::Ping> pi2_sender =
2183 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2184 SendPing(&pi2_sender, 1);
2185 // Verify that we staggered the OnRun callback correctly.
2186 pi2_event_loop->OnRun([pi1, pi2]() {
2187 EXPECT_EQ(pi1->monotonic_now(),
2188 monotonic_clock::epoch() + std::chrono::seconds(1));
2189 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2190 });
2191
2192 factory.RunFor(chrono::seconds(2));
2193
2194 {
2195 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2196 aos::Fetcher<examples::Ping> fetcher =
2197 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2198 ASSERT_TRUE(fetcher.Fetch());
2199 EXPECT_EQ(fetcher.context().monotonic_event_time,
2200 monotonic_clock::epoch() + factory.network_delay());
2201 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2202 monotonic_clock::epoch());
2203 }
2204 {
2205 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2206 aos::Fetcher<examples::Ping> fetcher =
2207 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2208 ASSERT_TRUE(fetcher.Fetch());
2209 EXPECT_EQ(fetcher.context().monotonic_event_time,
2210 monotonic_clock::epoch() + std::chrono::seconds(1) +
2211 factory.network_delay());
2212 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2213 monotonic_clock::epoch() - std::chrono::seconds(1));
2214 }
2215}
2216
Austin Schuh48205e62021-11-12 14:13:18 -08002217class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2218 public:
2219 SimulatedEventLoopDisconnectTest()
2220 : config(aos::configuration::ReadConfig(ArtifactPath(
2221 "aos/events/multinode_pingpong_test_split_config.json"))),
2222 time(configuration::NodesCount(&config.message())),
2223 factory(&config.message()) {
2224 factory.SetTimeConverter(&time);
2225 }
2226
2227 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2228 const monotonic_clock::time_point allowable_message_time,
2229 std::set<const aos::Node *> empty_nodes) {
2230 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2231 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2232 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2233 pi1->MakeEventLoop("fetcher");
2234 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2235 pi2->MakeEventLoop("fetcher");
2236 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2237 if (configuration::ChannelIsReadableOnNode(channel,
2238 pi1_event_loop->node())) {
2239 std::unique_ptr<aos::RawFetcher> fetcher =
2240 pi1_event_loop->MakeRawFetcher(channel);
2241 if (statistics_channels.find(channel) == statistics_channels.end() ||
2242 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2243 EXPECT_FALSE(fetcher->Fetch() &&
2244 fetcher->context().monotonic_event_time >
2245 allowable_message_time)
2246 << ": Found recent message on channel "
2247 << configuration::CleanedChannelToString(channel) << " and time "
2248 << fetcher->context().monotonic_event_time << " > "
2249 << allowable_message_time << " on pi1";
2250 } else {
2251 EXPECT_TRUE(fetcher->Fetch() &&
2252 fetcher->context().monotonic_event_time >=
2253 allowable_message_time)
2254 << ": Didn't find recent message on channel "
2255 << configuration::CleanedChannelToString(channel) << " on pi1";
2256 }
2257 }
2258 if (configuration::ChannelIsReadableOnNode(channel,
2259 pi2_event_loop->node())) {
2260 std::unique_ptr<aos::RawFetcher> fetcher =
2261 pi2_event_loop->MakeRawFetcher(channel);
2262 if (statistics_channels.find(channel) == statistics_channels.end() ||
2263 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2264 EXPECT_FALSE(fetcher->Fetch() &&
2265 fetcher->context().monotonic_event_time >
2266 allowable_message_time)
2267 << ": Found message on channel "
2268 << configuration::CleanedChannelToString(channel) << " and time "
2269 << fetcher->context().monotonic_event_time << " > "
2270 << allowable_message_time << " on pi2";
2271 } else {
2272 EXPECT_TRUE(fetcher->Fetch() &&
2273 fetcher->context().monotonic_event_time >=
2274 allowable_message_time)
2275 << ": Didn't find message on channel "
2276 << configuration::CleanedChannelToString(channel) << " on pi2";
2277 }
2278 }
2279 }
2280 }
2281
2282 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2283
2284 message_bridge::TestingTimeConverter time;
2285 SimulatedEventLoopFactory factory;
2286};
2287
2288// Tests that if we have message bridge client/server disabled, and timing
2289// reports disabled, no messages are sent. Also tests that we can disconnect a
2290// node and disable statistics on it and it actually fully disconnects.
2291TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2292 time.StartEqual();
2293 factory.SkipTimingReport();
2294 factory.DisableStatistics();
2295
2296 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2297 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2298
2299 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2300 pi1->MakeEventLoop("fetcher");
2301 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2302 pi2->MakeEventLoop("fetcher");
2303
2304 factory.RunFor(chrono::milliseconds(100000));
2305
2306 // Confirm no messages are sent if we've configured them all off.
2307 VerifyChannels({}, monotonic_clock::min_time, {});
2308
2309 // Now, confirm that all the message_bridge channels come back when we
2310 // re-enable.
2311 factory.EnableStatistics();
2312
2313 factory.RunFor(chrono::milliseconds(10050));
2314
2315 // Build up the list of all the messages we expect when we come back.
2316 {
2317 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002318 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002319 std::vector<std::pair<std::string_view, const Node *>>{
2320 {"/pi1/aos", pi1->node()},
2321 {"/pi2/aos", pi1->node()},
2322 {"/pi3/aos", pi1->node()}}) {
2323 statistics_channels.insert(configuration::GetChannel(
2324 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2325 pi.second));
2326 statistics_channels.insert(configuration::GetChannel(
2327 factory.configuration(), pi.first,
2328 "aos.message_bridge.ServerStatistics", "", pi.second));
2329 statistics_channels.insert(configuration::GetChannel(
2330 factory.configuration(), pi.first,
2331 "aos.message_bridge.ClientStatistics", "", pi.second));
2332 }
2333
2334 statistics_channels.insert(configuration::GetChannel(
2335 factory.configuration(),
2336 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2337 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2338 statistics_channels.insert(configuration::GetChannel(
2339 factory.configuration(),
2340 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2341 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2342 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2343 }
2344
2345 // Now test that we can disable the messages for a single node
2346 pi2->DisableStatistics();
2347 const aos::monotonic_clock::time_point statistics_disable_time =
2348 pi2->monotonic_now();
2349 factory.RunFor(chrono::milliseconds(10000));
2350
2351 // We should see a much smaller set of messages, but should still see messages
2352 // forwarded, mainly the timestamp message.
2353 {
2354 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002355 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002356 std::vector<std::pair<std::string_view, const Node *>>{
2357 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2358 statistics_channels.insert(configuration::GetChannel(
2359 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2360 pi.second));
2361 statistics_channels.insert(configuration::GetChannel(
2362 factory.configuration(), pi.first,
2363 "aos.message_bridge.ServerStatistics", "", pi.second));
2364 statistics_channels.insert(configuration::GetChannel(
2365 factory.configuration(), pi.first,
2366 "aos.message_bridge.ClientStatistics", "", pi.second));
2367 }
2368
2369 statistics_channels.insert(configuration::GetChannel(
2370 factory.configuration(),
2371 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2372 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2373 VerifyChannels(statistics_channels, statistics_disable_time, {});
2374 }
2375
2376 // Now, fully disconnect the node. This will completely quiet down pi2.
2377 pi1->Disconnect(pi2->node());
2378 pi2->Disconnect(pi1->node());
2379
2380 const aos::monotonic_clock::time_point disconnect_disable_time =
2381 pi2->monotonic_now();
2382 factory.RunFor(chrono::milliseconds(10000));
2383
2384 {
2385 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002386 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002387 std::vector<std::pair<std::string_view, const Node *>>{
2388 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2389 statistics_channels.insert(configuration::GetChannel(
2390 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2391 pi.second));
2392 statistics_channels.insert(configuration::GetChannel(
2393 factory.configuration(), pi.first,
2394 "aos.message_bridge.ServerStatistics", "", pi.second));
2395 statistics_channels.insert(configuration::GetChannel(
2396 factory.configuration(), pi.first,
2397 "aos.message_bridge.ClientStatistics", "", pi.second));
2398 }
2399
2400 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2401 }
2402}
2403
Neil Balchc8f41ed2018-01-20 22:06:53 -08002404} // namespace testing
2405} // namespace aos