blob: 050c9a3537206755e243aeb754fb37a120d64c0a [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070010#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070011#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/ping_lib.h"
13#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080014#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/message_bridge_client_generated.h"
16#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080017#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080018#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070019#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070020#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080021
Stephan Pleinesf63bde82024-01-13 15:59:33 -080022namespace aos::testing {
Brian Silverman28d14302020-09-18 15:26:17 -070023namespace {
24
Austin Schuh373f1762021-06-02 21:07:09 -070025using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070026
Austin Schuh58646e22021-08-23 23:51:46 -070027using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080028using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070029namespace chrono = ::std::chrono;
30
Austin Schuh0de30f32020-12-06 12:44:28 -080031} // namespace
32
Neil Balchc8f41ed2018-01-20 22:06:53 -080033class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
34 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080035 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080036 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080037 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080038 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080039 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080040 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080041 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070042 }
43
Austin Schuh217a9782019-12-21 23:02:50 -080044 void Run() override { event_loop_factory_->Run(); }
45 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070046
Austin Schuh52d325c2019-06-23 18:59:06 -070047 // TODO(austin): Implement this. It's used currently for a phased loop test.
48 // I'm not sure how much that matters.
49 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
50
Austin Schuh7d87b672019-12-01 20:23:49 -080051 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080052 MaybeMake();
53 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080054 }
55
Neil Balchc8f41ed2018-01-20 22:06:53 -080056 private:
Austin Schuh217a9782019-12-21 23:02:50 -080057 void MaybeMake() {
58 if (!event_loop_factory_) {
59 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080060 event_loop_factory_ =
61 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080062 } else {
63 event_loop_factory_ =
64 std::make_unique<SimulatedEventLoopFactory>(configuration());
65 }
66 }
67 }
68 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080069};
70
Austin Schuh6bae8252021-02-07 22:01:49 -080071auto CommonParameters() {
72 return ::testing::Combine(
73 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
74 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
75 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
76}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070077
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070078INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070079 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070080
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070081INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070082 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080083
// Parameters to run all the tests with.
struct Param {
  // The config file to use.
  std::string config;
  // If true, the RemoteMessage channel should be shared between all the remote
  // channels.  If false, there will be 1 RemoteMessage channel per remote
  // channel.
  //
  // Defaults to false so a default-constructed Param never holds an
  // indeterminate value.  Aggregate initialization ({config, shared}) still
  // works.
  bool shared = false;
};
93
94class RemoteMessageSimulatedEventLoopTest
95 : public ::testing::TestWithParam<struct Param> {
96 public:
97 RemoteMessageSimulatedEventLoopTest()
98 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -070099 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800100 LOG(INFO) << "Config " << GetParam().config;
101 }
102
103 bool shared() const { return GetParam().shared; }
104
105 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
106 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
107 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
108 if (shared()) {
109 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
110 event_loop, "/aos/remote_timestamps/pi2"));
111 } else {
112 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
113 event_loop,
114 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
115 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
116 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
117 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
118 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
119 }
120 return counters;
121 }
122
123 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
124 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
125 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
126 if (shared()) {
127 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
128 event_loop, "/aos/remote_timestamps/pi1"));
129 } else {
130 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
131 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
132 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
133 event_loop,
134 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
135 }
136 return counters;
137 }
138
139 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
140};
141
Austin Schuh8fb315a2020-11-19 22:33:58 -0800142// Test that sending a message after running gets properly notified.
143TEST(SimulatedEventLoopTest, SendAfterRunFor) {
144 SimulatedEventLoopTestFactory factory;
145
146 SimulatedEventLoopFactory simulated_event_loop_factory(
147 factory.configuration());
148
149 ::std::unique_ptr<EventLoop> ping_event_loop =
150 simulated_event_loop_factory.MakeEventLoop("ping");
151 aos::Sender<TestMessage> test_message_sender =
152 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700153 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800154
155 std::unique_ptr<EventLoop> pong1_event_loop =
156 simulated_event_loop_factory.MakeEventLoop("pong");
157 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
158 "/test");
159
160 EXPECT_FALSE(ping_event_loop->is_running());
161
162 // Watchers start when you start running, so there should be nothing counted.
163 simulated_event_loop_factory.RunFor(chrono::seconds(1));
164 EXPECT_EQ(test_message_counter1.count(), 0u);
165
166 std::unique_ptr<EventLoop> pong2_event_loop =
167 simulated_event_loop_factory.MakeEventLoop("pong");
168 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
169 "/test");
170
171 // Pauses in the middle don't count though, so this should be counted.
172 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700173 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800174
175 EXPECT_EQ(test_message_counter1.count(), 0u);
176 EXPECT_EQ(test_message_counter2.count(), 0u);
177 simulated_event_loop_factory.RunFor(chrono::seconds(1));
178
179 EXPECT_EQ(test_message_counter1.count(), 1u);
180 EXPECT_EQ(test_message_counter2.count(), 0u);
181}
182
Austin Schuh60e77942022-05-16 17:48:24 -0700183// Test that if we configure an event loop to be able to send too fast that we
184// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700185TEST(SimulatedEventLoopTest, AllowSendTooFast) {
186 SimulatedEventLoopTestFactory factory;
187
188 SimulatedEventLoopFactory simulated_event_loop_factory(
189 factory.configuration());
190
191 // Create two event loops: One will be allowed to send too fast, one won't. We
192 // will then test to ensure that the one that is allowed to send too fast can
193 // indeed send too fast, but that it then makes it so that the second event
194 // loop can no longer send anything because *it* is still limited.
195 ::std::unique_ptr<EventLoop> too_fast_event_loop =
196 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
197 ->MakeEventLoop("too_fast_sender",
198 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700199 NodeEventLoopFactory::ExclusiveSenders::kNo,
200 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700201 aos::Sender<TestMessage> too_fast_message_sender =
202 too_fast_event_loop->MakeSender<TestMessage>("/test");
203
204 ::std::unique_ptr<EventLoop> limited_event_loop =
205 simulated_event_loop_factory.MakeEventLoop("limited_sender");
206 aos::Sender<TestMessage> limited_message_sender =
207 limited_event_loop->MakeSender<TestMessage>("/test");
208
209 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
210 for (int ii = 0; ii < queue_size; ++ii) {
211 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
212 }
213 // And now we should start being in the sending-too-fast phase.
214 for (int ii = 0; ii < queue_size; ++ii) {
215 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700216 ASSERT_EQ(SendTestMessage(limited_message_sender),
217 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700218 }
219}
220
221// Test that if we setup an exclusive sender that it is indeed exclusive.
222TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
223 SimulatedEventLoopTestFactory factory;
224
225 SimulatedEventLoopFactory simulated_event_loop_factory(
226 factory.configuration());
227
228 ::std::unique_ptr<EventLoop> exclusive_event_loop =
229 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700230 ->MakeEventLoop(
231 "too_fast_sender",
232 {NodeEventLoopFactory::CheckSentTooFast::kYes,
233 NodeEventLoopFactory::ExclusiveSenders::kYes,
234 {{configuration::GetChannel(factory.configuration(), "/test1",
235 "aos.TestMessage", "", nullptr),
236 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700237 exclusive_event_loop->SkipAosLog();
238 exclusive_event_loop->SkipTimingReport();
239 ::std::unique_ptr<EventLoop> normal_event_loop =
240 simulated_event_loop_factory.MakeEventLoop("limited_sender");
241 // Set things up to have the exclusive sender be destroyed so we can test
242 // recovery.
243 {
244 aos::Sender<TestMessage> exclusive_sender =
245 exclusive_event_loop->MakeSender<TestMessage>("/test");
246
247 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
248 "TestMessage");
249 }
250 // This one should succeed now that the exclusive channel is removed.
251 aos::Sender<TestMessage> normal_sender =
252 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700253 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
254 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700255
256 // And check an explicitly exempted channel:
257 aos::Sender<TestMessage> non_exclusive_sender =
258 exclusive_event_loop->MakeSender<TestMessage>("/test1");
259 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
260 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700261}
262
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700263void TestSentTooFastCheckEdgeCase(
264 const std::function<RawSender::Error(int, int)> expected_err,
265 const bool send_twice_at_end) {
266 SimulatedEventLoopTestFactory factory;
267
268 auto event_loop = factory.MakePrimary("primary");
269
270 auto sender = event_loop->MakeSender<TestMessage>("/test");
271
272 const int queue_size = TestChannelQueueSize(event_loop.get());
273 int msgs_sent = 0;
274 event_loop->AddPhasedLoop(
275 [&](int) {
276 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
277 msgs_sent++;
278
279 // If send_twice_at_end, send the last two messages (message
280 // queue_size and queue_size + 1) in the same iteration, meaning that
281 // we would be sending very slightly too fast. Otherwise, we will send
282 // message queue_size + 1 in the next iteration and we will continue
283 // to be sending exactly at the channel frequency.
284 if (send_twice_at_end && (msgs_sent == queue_size)) {
285 EXPECT_EQ(SendTestMessage(sender),
286 expected_err(msgs_sent, queue_size));
287 msgs_sent++;
288 }
289
290 if (msgs_sent > queue_size) {
291 factory.Exit();
292 }
293 },
294 std::chrono::duration_cast<std::chrono::nanoseconds>(
295 std::chrono::duration<double>(
296 1.0 / TestChannelFrequency(event_loop.get()))));
297
298 factory.Run();
299}
300
301// Tests that RawSender::Error::kMessagesSentTooFast is not returned
302// when messages are sent at the exact frequency of the channel.
303TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
304 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
305 false);
306}
307
308// Tests that RawSender::Error::kMessagesSentTooFast is returned
309// when sending exactly one more message than allowed in a channel storage
310// duration.
311TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
312 TestSentTooFastCheckEdgeCase(
313 [](const int msgs_sent, const int queue_size) {
314 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
315 : RawSender::Error::kOk);
316 },
317 true);
318}
319
Austin Schuh8fb315a2020-11-19 22:33:58 -0800320// Test that creating an event loop while running dies.
321TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
322 SimulatedEventLoopTestFactory factory;
323
324 SimulatedEventLoopFactory simulated_event_loop_factory(
325 factory.configuration());
326
327 ::std::unique_ptr<EventLoop> event_loop =
328 simulated_event_loop_factory.MakeEventLoop("ping");
329
330 auto timer = event_loop->AddTimer([&]() {
331 EXPECT_DEATH(
332 {
333 ::std::unique_ptr<EventLoop> event_loop2 =
334 simulated_event_loop_factory.MakeEventLoop("ping");
335 },
336 "event loop while running");
337 simulated_event_loop_factory.Exit();
338 });
339
340 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700341 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800342 });
343
344 simulated_event_loop_factory.Run();
345}
346
347// Test that creating a watcher after running dies.
348TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
349 SimulatedEventLoopTestFactory factory;
350
351 SimulatedEventLoopFactory simulated_event_loop_factory(
352 factory.configuration());
353
354 ::std::unique_ptr<EventLoop> event_loop =
355 simulated_event_loop_factory.MakeEventLoop("ping");
356
357 simulated_event_loop_factory.RunFor(chrono::seconds(1));
358
359 EXPECT_DEATH(
360 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
361 "Can't add a watcher after running");
362
363 ::std::unique_ptr<EventLoop> event_loop2 =
364 simulated_event_loop_factory.MakeEventLoop("ping");
365
366 simulated_event_loop_factory.RunFor(chrono::seconds(1));
367
368 EXPECT_DEATH(
369 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
370 "Can't add a watcher after running");
371}
372
Austin Schuh44019f92019-05-19 19:58:27 -0700373// Test that running for a time period with no handlers causes time to progress
374// correctly.
375TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800376 SimulatedEventLoopTestFactory factory;
377
378 SimulatedEventLoopFactory simulated_event_loop_factory(
379 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700380 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800381 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700382
383 simulated_event_loop_factory.RunFor(chrono::seconds(1));
384
385 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700386 event_loop->monotonic_now());
387}
388
389// Test that running for a time with a periodic handler causes time to end
390// correctly.
391TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800392 SimulatedEventLoopTestFactory factory;
393
394 SimulatedEventLoopFactory simulated_event_loop_factory(
395 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700396 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800397 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700398
399 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700400 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700401 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700402 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
403 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700404 });
405
406 simulated_event_loop_factory.RunFor(chrono::seconds(1));
407
408 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700409 event_loop->monotonic_now());
410 EXPECT_EQ(counter, 10);
411}
412
Austin Schuh7d87b672019-12-01 20:23:49 -0800413// Tests that watchers have latency in simulation.
414TEST(SimulatedEventLoopTest, WatcherTimingReport) {
415 SimulatedEventLoopTestFactory factory;
416 factory.set_send_delay(std::chrono::microseconds(50));
417
418 FLAGS_timing_report_ms = 1000;
419 auto loop1 = factory.MakePrimary("primary");
420 loop1->MakeWatcher("/test", [](const TestMessage &) {});
421
422 auto loop2 = factory.Make("sender_loop");
423
424 auto loop3 = factory.Make("report_fetcher");
425
426 Fetcher<timing::Report> report_fetcher =
427 loop3->MakeFetcher<timing::Report>("/aos");
428 EXPECT_FALSE(report_fetcher.Fetch());
429
430 auto sender = loop2->MakeSender<TestMessage>("/test");
431
432 // Send 10 messages in the middle of a timing report period so we get
433 // something interesting back.
434 auto test_timer = loop2->AddTimer([&sender]() {
435 for (int i = 0; i < 10; ++i) {
436 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
437 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
438 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700439 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800440 }
441 });
442
443 // Quit after 1 timing report, mid way through the next cycle.
444 {
445 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700446 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800447 end_timer->set_name("end");
448 }
449
450 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700451 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800452 });
453
454 factory.Run();
455
456 // And, since we are here, check that the timing report makes sense.
457 // Start by looking for our event loop's timing.
458 FlatbufferDetachedBuffer<timing::Report> primary_report =
459 FlatbufferDetachedBuffer<timing::Report>::Empty();
460 while (report_fetcher.FetchNext()) {
461 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
462 if (report_fetcher->name()->string_view() == "primary") {
463 primary_report = CopyFlatBuffer(report_fetcher.get());
464 }
465 }
466
467 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700468 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800469
470 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
471
472 // Just the timing report timer.
473 ASSERT_NE(primary_report.message().timers(), nullptr);
474 EXPECT_EQ(primary_report.message().timers()->size(), 2);
475
476 // No phased loops
477 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
478
479 // And now confirm that the watcher received all 10 messages, and has latency.
480 ASSERT_NE(primary_report.message().watchers(), nullptr);
481 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
482 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
483 EXPECT_NEAR(
484 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
485 0.00005, 1e-9);
486 EXPECT_NEAR(
487 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
488 0.00005, 1e-9);
489 EXPECT_NEAR(
490 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
491 0.00005, 1e-9);
492 EXPECT_EQ(primary_report.message()
493 .watchers()
494 ->Get(0)
495 ->wakeup_latency()
496 ->standard_deviation(),
497 0.0);
498
499 EXPECT_EQ(
500 primary_report.message().watchers()->Get(0)->handler_time()->average(),
501 0.0);
502 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
503 0.0);
504 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
505 0.0);
506 EXPECT_EQ(primary_report.message()
507 .watchers()
508 ->Get(0)
509 ->handler_time()
510 ->standard_deviation(),
511 0.0);
512}
513
Austin Schuh89c9b812021-02-20 14:42:10 -0800514size_t CountAll(
515 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
516 &counters) {
517 size_t count = 0u;
518 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
519 counters) {
520 count += counter->count();
521 }
522 return count;
523}
524
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525// Tests that ping and pong work when on 2 different nodes, and the message
526// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800527TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800528 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
529 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700530 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800531
532 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
533
534 std::unique_ptr<EventLoop> ping_event_loop =
535 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
536 Ping ping(ping_event_loop.get());
537
538 std::unique_ptr<EventLoop> pong_event_loop =
539 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
540 Pong pong(pong_event_loop.get());
541
542 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
543 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700544 MessageCounter<examples::Pong> pi2_pong_counter(
545 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700546 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
547 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
548 "/pi1/aos");
549 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
550 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800551
Austin Schuh4c3b9702020-08-30 11:34:55 -0700552 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
553 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800554
555 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
556 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700557 MessageCounter<examples::Pong> pi1_pong_counter(
558 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700559 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
560 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
561 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
562 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
563 "/aos");
564
Austin Schuh4c3b9702020-08-30 11:34:55 -0700565 // Count timestamps.
566 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
567 pi1_pong_counter_event_loop.get(), "/pi1/aos");
568 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
569 pi2_pong_counter_event_loop.get(), "/pi1/aos");
570 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
571 pi3_pong_counter_event_loop.get(), "/pi1/aos");
572 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
573 pi1_pong_counter_event_loop.get(), "/pi2/aos");
574 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
575 pi2_pong_counter_event_loop.get(), "/pi2/aos");
576 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
577 pi1_pong_counter_event_loop.get(), "/pi3/aos");
578 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
579 pi3_pong_counter_event_loop.get(), "/pi3/aos");
580
Austin Schuh2f8fd752020-09-01 22:38:28 -0700581 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800582 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
583 remote_timestamps_pi2_on_pi1 =
584 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
585 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
586 remote_timestamps_pi1_on_pi2 =
587 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700588
Austin Schuh4c3b9702020-08-30 11:34:55 -0700589 // Wait to let timestamp estimation start up before looking for the results.
590 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
591
Austin Schuh8fb315a2020-11-19 22:33:58 -0800592 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
593 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
594 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
595 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
596 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
597 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
598
Austin Schuh4c3b9702020-08-30 11:34:55 -0700599 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800600 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700601 "/pi1/aos", [&pi1_server_statistics_count](
602 const message_bridge::ServerStatistics &stats) {
603 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
604 EXPECT_EQ(stats.connections()->size(), 2u);
605 for (const message_bridge::ServerConnection *connection :
606 *stats.connections()) {
607 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800608 EXPECT_EQ(connection->connection_count(), 1u);
609 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800610 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700611 if (connection->node()->name()->string_view() == "pi2") {
612 EXPECT_GT(connection->sent_packets(), 50);
613 } else if (connection->node()->name()->string_view() == "pi3") {
614 EXPECT_GE(connection->sent_packets(), 5);
615 } else {
616 LOG(FATAL) << "Unknown connection";
617 }
618
619 EXPECT_TRUE(connection->has_monotonic_offset());
620 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700621
622 EXPECT_TRUE(connection->has_channels());
623 int accumulated_sent_count = 0;
624 int accumulated_dropped_count = 0;
625 for (const message_bridge::ServerChannelStatistics *channel :
626 *connection->channels()) {
627 accumulated_sent_count += channel->sent_packets();
628 accumulated_dropped_count += channel->dropped_packets();
629 }
630 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
631 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700632 }
633 ++pi1_server_statistics_count;
634 });
635
636 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800637 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700638 "/pi2/aos", [&pi2_server_statistics_count](
639 const message_bridge::ServerStatistics &stats) {
640 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
641 EXPECT_EQ(stats.connections()->size(), 1u);
642
643 const message_bridge::ServerConnection *connection =
644 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800645 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700646 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
647 EXPECT_GT(connection->sent_packets(), 50);
648 EXPECT_TRUE(connection->has_monotonic_offset());
649 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800650 EXPECT_EQ(connection->connection_count(), 1u);
651 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700652
653 EXPECT_TRUE(connection->has_channels());
654 int accumulated_sent_count = 0;
655 int accumulated_dropped_count = 0;
656 for (const message_bridge::ServerChannelStatistics *channel :
657 *connection->channels()) {
658 accumulated_sent_count += channel->sent_packets();
659 accumulated_dropped_count += channel->dropped_packets();
660 }
661 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
662 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
663
Austin Schuh4c3b9702020-08-30 11:34:55 -0700664 ++pi2_server_statistics_count;
665 });
666
667 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800668 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700669 "/pi3/aos", [&pi3_server_statistics_count](
670 const message_bridge::ServerStatistics &stats) {
671 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
672 EXPECT_EQ(stats.connections()->size(), 1u);
673
674 const message_bridge::ServerConnection *connection =
675 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800676 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700677 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
678 EXPECT_GE(connection->sent_packets(), 5);
679 EXPECT_TRUE(connection->has_monotonic_offset());
680 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800681 EXPECT_EQ(connection->connection_count(), 1u);
682 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700683
684 EXPECT_TRUE(connection->has_channels());
685 int accumulated_sent_count = 0;
686 int accumulated_dropped_count = 0;
687 for (const message_bridge::ServerChannelStatistics *channel :
688 *connection->channels()) {
689 accumulated_sent_count += channel->sent_packets();
690 accumulated_dropped_count += channel->dropped_packets();
691 }
692 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
693 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
694
Austin Schuh4c3b9702020-08-30 11:34:55 -0700695 ++pi3_server_statistics_count;
696 });
697
698 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800699 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700700 "/pi1/aos", [&pi1_client_statistics_count](
701 const message_bridge::ClientStatistics &stats) {
702 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
703 EXPECT_EQ(stats.connections()->size(), 2u);
704
705 for (const message_bridge::ClientConnection *connection :
706 *stats.connections()) {
707 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
708 if (connection->node()->name()->string_view() == "pi2") {
709 EXPECT_GT(connection->received_packets(), 50);
710 } else if (connection->node()->name()->string_view() == "pi3") {
711 EXPECT_GE(connection->received_packets(), 5);
712 } else {
713 LOG(FATAL) << "Unknown connection";
714 }
715
Austin Schuhe61d4382021-03-31 21:33:02 -0700716 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700717 EXPECT_TRUE(connection->has_monotonic_offset());
718 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800719 EXPECT_EQ(connection->connection_count(), 1u);
720 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700721 }
722 ++pi1_client_statistics_count;
723 });
724
725 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800726 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700727 "/pi2/aos", [&pi2_client_statistics_count](
728 const message_bridge::ClientStatistics &stats) {
729 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
730 EXPECT_EQ(stats.connections()->size(), 1u);
731
732 const message_bridge::ClientConnection *connection =
733 stats.connections()->Get(0);
734 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
735 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700736 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700737 EXPECT_TRUE(connection->has_monotonic_offset());
738 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800739 EXPECT_EQ(connection->connection_count(), 1u);
740 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700741 ++pi2_client_statistics_count;
742 });
743
744 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800745 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700746 "/pi3/aos", [&pi3_client_statistics_count](
747 const message_bridge::ClientStatistics &stats) {
748 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
749 EXPECT_EQ(stats.connections()->size(), 1u);
750
751 const message_bridge::ClientConnection *connection =
752 stats.connections()->Get(0);
753 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
754 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700755 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700756 EXPECT_TRUE(connection->has_monotonic_offset());
757 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800758 EXPECT_EQ(connection->connection_count(), 1u);
759 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700760 ++pi3_client_statistics_count;
761 });
762
Austin Schuh2f8fd752020-09-01 22:38:28 -0700763 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
764 // channel.
765 const size_t pi1_timestamp_channel =
766 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
767 pi1_on_pi2_timestamp_fetcher.channel());
768 const size_t ping_timestamp_channel =
769 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
770 ping_on_pi2_fetcher.channel());
771
772 for (const Channel *channel :
773 *pi1_pong_counter_event_loop->configuration()->channels()) {
774 VLOG(1) << "Channel "
775 << configuration::ChannelIndex(
776 pi1_pong_counter_event_loop->configuration(), channel)
777 << " " << configuration::CleanedChannelToString(channel);
778 }
779
Austin Schuh8fb315a2020-11-19 22:33:58 -0800780 std::unique_ptr<EventLoop> pi1_remote_timestamp =
781 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
782
Austin Schuh89c9b812021-02-20 14:42:10 -0800783 for (std::pair<int, std::string> channel :
784 shared()
785 ? std::vector<std::pair<
786 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
787 : std::vector<std::pair<int, std::string>>{
788 {pi1_timestamp_channel,
789 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
790 "aos-message_bridge-Timestamp"},
791 {ping_timestamp_channel,
792 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
793 // For each remote timestamp we get back, confirm that it is either a ping
794 // message, or a timestamp we sent out. Also confirm that the timestamps
795 // are correct.
796 pi1_remote_timestamp->MakeWatcher(
797 channel.second,
798 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
799 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
800 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
801 channel_index = channel.first](const RemoteMessage &header) {
802 VLOG(1) << aos::FlatbufferToJson(&header);
803 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700804 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800805 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700806 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700807
Austin Schuh89c9b812021-02-20 14:42:10 -0800808 const aos::monotonic_clock::time_point header_monotonic_sent_time(
809 chrono::nanoseconds(header.monotonic_sent_time()));
810 const aos::realtime_clock::time_point header_realtime_sent_time(
811 chrono::nanoseconds(header.realtime_sent_time()));
812 const aos::monotonic_clock::time_point header_monotonic_remote_time(
813 chrono::nanoseconds(header.monotonic_remote_time()));
814 const aos::realtime_clock::time_point header_realtime_remote_time(
815 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700816
Austin Schuh89c9b812021-02-20 14:42:10 -0800817 if (channel_index != -1) {
818 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700819 }
820
Austin Schuh89c9b812021-02-20 14:42:10 -0800821 const Context *pi1_context = nullptr;
822 const Context *pi2_context = nullptr;
823
824 if (header.channel_index() == pi1_timestamp_channel) {
825 // Find the forwarded message.
826 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
827 header_monotonic_sent_time) {
828 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
829 }
830
831 // And the source message.
832 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
833 header_monotonic_remote_time) {
834 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
835 }
836
837 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
838 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
839 } else if (header.channel_index() == ping_timestamp_channel) {
840 // Find the forwarded message.
841 while (ping_on_pi2_fetcher.context().monotonic_event_time <
842 header_monotonic_sent_time) {
843 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
844 }
845
846 // And the source message.
847 while (ping_on_pi1_fetcher.context().monotonic_event_time <
848 header_monotonic_remote_time) {
849 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
850 }
851
852 pi1_context = &ping_on_pi1_fetcher.context();
853 pi2_context = &ping_on_pi2_fetcher.context();
854 } else {
855 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700856 }
857
Austin Schuh89c9b812021-02-20 14:42:10 -0800858 // Confirm the forwarded message has matching timestamps to the
859 // timestamps we got back.
860 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
861 EXPECT_EQ(pi2_context->remote_queue_index,
862 header.remote_queue_index());
863 EXPECT_EQ(pi2_context->monotonic_event_time,
864 header_monotonic_sent_time);
865 EXPECT_EQ(pi2_context->realtime_event_time,
866 header_realtime_sent_time);
867 EXPECT_EQ(pi2_context->realtime_remote_time,
868 header_realtime_remote_time);
869 EXPECT_EQ(pi2_context->monotonic_remote_time,
870 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700871
Austin Schuh89c9b812021-02-20 14:42:10 -0800872 // Confirm the forwarded message also matches the source message.
873 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
874 EXPECT_EQ(pi1_context->monotonic_event_time,
875 header_monotonic_remote_time);
876 EXPECT_EQ(pi1_context->realtime_event_time,
877 header_realtime_remote_time);
878 });
879 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700880
Austin Schuh4c3b9702020-08-30 11:34:55 -0700881 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
882 chrono::milliseconds(500) +
883 chrono::milliseconds(5));
884
885 EXPECT_EQ(pi1_pong_counter.count(), 1001);
886 EXPECT_EQ(pi2_pong_counter.count(), 1001);
887
888 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
889 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
890 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
891 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
892 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
893 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
894 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
895
Austin Schuh20ac95d2020-12-05 17:24:19 -0800896 EXPECT_EQ(pi1_server_statistics_count, 10);
897 EXPECT_EQ(pi2_server_statistics_count, 10);
898 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700899
900 EXPECT_EQ(pi1_client_statistics_count, 95);
901 EXPECT_EQ(pi2_client_statistics_count, 95);
902 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700903
904 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800905 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
906 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700907}
908
909// Tests that an offset between nodes can be recovered and shows up in
910// ServerStatistics correctly.
911TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
912 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700913 aos::configuration::ReadConfig(ArtifactPath(
914 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700915 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800916 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
917 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700918 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800919 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
920 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700921 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800922 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
923 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700924
Austin Schuh87dd3832021-01-01 23:07:31 -0800925 message_bridge::TestingTimeConverter time(
926 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700927 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700928 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700929
930 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800931 time.AddNextTimestamp(
932 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700933 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
934 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700935
936 std::unique_ptr<EventLoop> ping_event_loop =
937 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
938 Ping ping(ping_event_loop.get());
939
940 std::unique_ptr<EventLoop> pong_event_loop =
941 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
942 Pong pong(pong_event_loop.get());
943
Austin Schuh8fb315a2020-11-19 22:33:58 -0800944 // Wait to let timestamp estimation start up before looking for the results.
945 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
946
Austin Schuh87dd3832021-01-01 23:07:31 -0800947 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
948 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
949
Austin Schuh4c3b9702020-08-30 11:34:55 -0700950 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
951 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
952
953 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
954 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
955
Austin Schuh4c3b9702020-08-30 11:34:55 -0700956 // Confirm the offsets are being recovered correctly.
957 int pi1_server_statistics_count = 0;
958 pi1_pong_counter_event_loop->MakeWatcher(
959 "/pi1/aos", [&pi1_server_statistics_count,
960 kOffset](const message_bridge::ServerStatistics &stats) {
961 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
962 EXPECT_EQ(stats.connections()->size(), 2u);
963 for (const message_bridge::ServerConnection *connection :
964 *stats.connections()) {
965 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800966 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700967 if (connection->node()->name()->string_view() == "pi2") {
968 EXPECT_EQ(connection->monotonic_offset(),
969 chrono::nanoseconds(kOffset).count());
970 } else if (connection->node()->name()->string_view() == "pi3") {
971 EXPECT_EQ(connection->monotonic_offset(), 0);
972 } else {
973 LOG(FATAL) << "Unknown connection";
974 }
975
976 EXPECT_TRUE(connection->has_monotonic_offset());
977 }
978 ++pi1_server_statistics_count;
979 });
980
981 int pi2_server_statistics_count = 0;
982 pi2_pong_counter_event_loop->MakeWatcher(
983 "/pi2/aos", [&pi2_server_statistics_count,
984 kOffset](const message_bridge::ServerStatistics &stats) {
985 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
986 EXPECT_EQ(stats.connections()->size(), 1u);
987
988 const message_bridge::ServerConnection *connection =
989 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800990 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700991 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
992 EXPECT_TRUE(connection->has_monotonic_offset());
993 EXPECT_EQ(connection->monotonic_offset(),
994 -chrono::nanoseconds(kOffset).count());
995 ++pi2_server_statistics_count;
996 });
997
998 int pi3_server_statistics_count = 0;
999 pi3_pong_counter_event_loop->MakeWatcher(
1000 "/pi3/aos", [&pi3_server_statistics_count](
1001 const message_bridge::ServerStatistics &stats) {
1002 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1003 EXPECT_EQ(stats.connections()->size(), 1u);
1004
1005 const message_bridge::ServerConnection *connection =
1006 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001007 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001008 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1009 EXPECT_TRUE(connection->has_monotonic_offset());
1010 EXPECT_EQ(connection->monotonic_offset(), 0);
1011 ++pi3_server_statistics_count;
1012 });
1013
1014 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1015 chrono::milliseconds(500) +
1016 chrono::milliseconds(5));
1017
Austin Schuh20ac95d2020-12-05 17:24:19 -08001018 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001019 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001020 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001021}
1022
1023// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001024TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001025 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1026 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1027 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1028
1029 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1030 simulated_event_loop_factory.DisableStatistics();
1031
1032 std::unique_ptr<EventLoop> ping_event_loop =
1033 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1034 Ping ping(ping_event_loop.get());
1035
1036 std::unique_ptr<EventLoop> pong_event_loop =
1037 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1038 Pong pong(pong_event_loop.get());
1039
1040 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1041 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1042
1043 MessageCounter<examples::Pong> pi2_pong_counter(
1044 pi2_pong_counter_event_loop.get(), "/test");
1045
1046 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1047 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1048
1049 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1050 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1051
1052 MessageCounter<examples::Pong> pi1_pong_counter(
1053 pi1_pong_counter_event_loop.get(), "/test");
1054
1055 // Count timestamps.
1056 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1057 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1058 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1059 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1060 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1061 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1062 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1063 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1064 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1065 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1066 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1067 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1068 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1069 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1070
Austin Schuh2f8fd752020-09-01 22:38:28 -07001071 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001072 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1073 remote_timestamps_pi2_on_pi1 =
1074 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1075 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1076 remote_timestamps_pi1_on_pi2 =
1077 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001078
Austin Schuh4c3b9702020-08-30 11:34:55 -07001079 MessageCounter<message_bridge::ServerStatistics>
1080 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1081 "/pi1/aos");
1082 MessageCounter<message_bridge::ServerStatistics>
1083 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1084 "/pi2/aos");
1085 MessageCounter<message_bridge::ServerStatistics>
1086 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1087 "/pi3/aos");
1088
1089 MessageCounter<message_bridge::ClientStatistics>
1090 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1091 "/pi1/aos");
1092 MessageCounter<message_bridge::ClientStatistics>
1093 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1094 "/pi2/aos");
1095 MessageCounter<message_bridge::ClientStatistics>
1096 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1097 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001098
1099 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1100 chrono::milliseconds(5));
1101
Austin Schuh4c3b9702020-08-30 11:34:55 -07001102 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1103 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1104
1105 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1106 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1107 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1108 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1109 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1110 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1111 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1112
1113 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1114 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1115 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1116
1117 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1118 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1119 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001120
1121 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001122 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1123 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001124}
1125
Austin Schuhc0b0f722020-12-12 18:36:06 -08001126bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1127 for (const message_bridge::ServerConnection *connection :
1128 *server_statistics->connections()) {
1129 if (connection->state() != message_bridge::State::CONNECTED) {
1130 return false;
1131 }
1132 }
1133 return true;
1134}
1135
1136bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1137 std::string_view target) {
1138 for (const message_bridge::ServerConnection *connection :
1139 *server_statistics->connections()) {
1140 if (connection->node()->name()->string_view() == target) {
1141 if (connection->state() == message_bridge::State::CONNECTED) {
1142 return false;
1143 }
1144 } else {
1145 if (connection->state() != message_bridge::State::CONNECTED) {
1146 return false;
1147 }
1148 }
1149 }
1150 return true;
1151}
1152
1153bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1154 for (const message_bridge::ClientConnection *connection :
1155 *client_statistics->connections()) {
1156 if (connection->state() != message_bridge::State::CONNECTED) {
1157 return false;
1158 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001159 EXPECT_TRUE(connection->has_boot_uuid());
1160 EXPECT_TRUE(connection->has_connected_since_time());
1161 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001162 }
1163 return true;
1164}
1165
1166bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1167 std::string_view target) {
1168 for (const message_bridge::ClientConnection *connection :
1169 *client_statistics->connections()) {
1170 if (connection->node()->name()->string_view() == target) {
1171 if (connection->state() == message_bridge::State::CONNECTED) {
1172 return false;
1173 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001174 EXPECT_FALSE(connection->has_boot_uuid());
1175 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001176 } else {
1177 if (connection->state() != message_bridge::State::CONNECTED) {
1178 return false;
1179 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001180 EXPECT_TRUE(connection->has_boot_uuid());
1181 EXPECT_TRUE(connection->has_connected_since_time());
1182 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001183 }
1184 }
1185 return true;
1186}
1187
Austin Schuh367a7f42021-11-23 23:04:36 -08001188int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1189 std::string_view target) {
1190 for (const message_bridge::ClientConnection *connection :
1191 *client_statistics->connections()) {
1192 if (connection->node()->name()->string_view() == target) {
1193 return connection->connection_count();
1194 }
1195 }
1196 return 0;
1197}
1198
1199int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1200 std::string_view target) {
1201 for (const message_bridge::ServerConnection *connection :
1202 *server_statistics->connections()) {
1203 if (connection->node()->name()->string_view() == target) {
1204 return connection->connection_count();
1205 }
1206 }
1207 return 0;
1208}
1209
Austin Schuhc0b0f722020-12-12 18:36:06 -08001210// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001211TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001212 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1213
Austin Schuh58646e22021-08-23 23:51:46 -07001214 NodeEventLoopFactory *pi1 =
1215 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1216 NodeEventLoopFactory *pi2 =
1217 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1218 NodeEventLoopFactory *pi3 =
1219 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1220
1221 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001222 Ping ping(ping_event_loop.get());
1223
Austin Schuh58646e22021-08-23 23:51:46 -07001224 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001225 Pong pong(pong_event_loop.get());
1226
1227 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001228 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001229
1230 MessageCounter<examples::Pong> pi2_pong_counter(
1231 pi2_pong_counter_event_loop.get(), "/test");
1232
1233 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001234 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001235
1236 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001237 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001238
1239 MessageCounter<examples::Pong> pi1_pong_counter(
1240 pi1_pong_counter_event_loop.get(), "/test");
1241
1242 // Count timestamps.
1243 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1244 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1245 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1246 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1247 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1248 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1249 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1250 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1251 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1252 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1253 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1254 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1255 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1256 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1257
1258 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001259 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1260 remote_timestamps_pi2_on_pi1 =
1261 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1262 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1263 remote_timestamps_pi1_on_pi2 =
1264 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001265
1266 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001267 *pi1_server_statistics_counter;
1268 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1269 pi1_server_statistics_counter =
1270 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1271 "pi1_server_statistics_counter", "/pi1/aos");
1272 });
1273
Austin Schuhc0b0f722020-12-12 18:36:06 -08001274 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1275 pi1_pong_counter_event_loop
1276 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1277 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1278 pi1_pong_counter_event_loop
1279 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1280
1281 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001282 *pi2_server_statistics_counter;
1283 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1284 pi2_server_statistics_counter =
1285 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1286 "pi2_server_statistics_counter", "/pi2/aos");
1287 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001288 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1289 pi2_pong_counter_event_loop
1290 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1291 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1292 pi2_pong_counter_event_loop
1293 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1294
1295 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001296 *pi3_server_statistics_counter;
1297 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1298 pi3_server_statistics_counter =
1299 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1300 "pi3_server_statistics_counter", "/pi3/aos");
1301 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001302 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1303 pi3_pong_counter_event_loop
1304 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1305 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1306 pi3_pong_counter_event_loop
1307 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1308
1309 MessageCounter<message_bridge::ClientStatistics>
1310 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1311 "/pi1/aos");
1312 MessageCounter<message_bridge::ClientStatistics>
1313 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1314 "/pi2/aos");
1315 MessageCounter<message_bridge::ClientStatistics>
1316 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1317 "/pi3/aos");
1318
James Kuszmaul86e86c32022-07-21 17:39:47 -07001319 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1320 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1321 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1322 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1323 // The currenct contract is that, if all nodes boot simultaneously in
1324 // simulation, that they should all act as if they area already connected,
1325 // without ever observing the transition from disconnected to connected (note
1326 // that on a real system the ServerStatistics message will get resent for each
1327 // and every new connection, even if the new connections happen
1328 // "simultaneously"--in simulation, we are essentially acting as if we are
1329 // starting execution in an already running system, rather than observing the
1330 // boot process).
1331 for (auto &event_loop : statistics_watcher_loops) {
1332 event_loop->MakeWatcher(
1333 "/aos", [](const message_bridge::ServerStatistics &msg) {
1334 for (const message_bridge::ServerConnection *connection :
1335 *msg.connections()) {
1336 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1337 << connection->node()->name()->string_view();
1338 }
1339 });
1340 }
1341
Austin Schuhc0b0f722020-12-12 18:36:06 -08001342 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1343 chrono::milliseconds(5));
1344
James Kuszmaul86e86c32022-07-21 17:39:47 -07001345 statistics_watcher_loops.clear();
1346
Austin Schuhc0b0f722020-12-12 18:36:06 -08001347 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1348 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1349
1350 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1351 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1352 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1353 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1354 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1355 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1356 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1357
Austin Schuh58646e22021-08-23 23:51:46 -07001358 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1359 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1360 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001361
1362 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1363 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1364 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1365
1366 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001367 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1368 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001369
1370 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1371 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1372 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1373 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1374 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1375 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1376 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1377 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1378 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1379 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1380 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1381 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1382 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1383 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1384 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1385 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1386 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1387 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1388
Austin Schuh58646e22021-08-23 23:51:46 -07001389 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001390
1391 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1392
1393 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1394 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1395
1396 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1397 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1398 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1399 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1400 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1401 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1402 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1403
Austin Schuh58646e22021-08-23 23:51:46 -07001404 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1405 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1406 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001407
1408 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1409 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1410 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1411
1412 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001413 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1414 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001415
1416 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1417 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1418 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1419 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1420 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1421 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1422 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1423 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1424 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1425 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1426 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1427 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1428 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1429 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1430 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1431 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1432 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1433 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1434
Austin Schuh58646e22021-08-23 23:51:46 -07001435 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001436
1437 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1438
Austin Schuh367a7f42021-11-23 23:04:36 -08001439 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1440 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1441 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1442 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1443 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1444 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1445
1446 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1447 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1448 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1449 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1450 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1451 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1452 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1453 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1454
1455 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1456 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1457 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1458 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1459
1460 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1461 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1462 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1463 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1464
Austin Schuhc0b0f722020-12-12 18:36:06 -08001465 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1466 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1467
1468 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1469 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1470 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1471 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1472 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1473 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1474 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1475
Austin Schuh58646e22021-08-23 23:51:46 -07001476 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1477 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1478 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001479
1480 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1481 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1482 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1483
1484 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001485 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1486 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001487
Austin Schuhc0b0f722020-12-12 18:36:06 -08001488 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1489 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001490 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1491 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001492 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1493 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001494 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1495 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001496 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1497 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001498 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1499 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1500}
1501
Austin Schuh2febf0d2020-09-21 22:24:30 -07001502// Tests that the time offset having a slope doesn't break the world.
1503// SimulatedMessageBridge has enough self consistency CHECK statements to
1504// confirm, and we can can also check a message in each direction to make sure
1505// it gets delivered as expected.
1506TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1507 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001508 aos::configuration::ReadConfig(ArtifactPath(
1509 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001510 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001511 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1512 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001513 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001514 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1515 ASSERT_EQ(pi2_index, 1u);
1516 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1517 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1518 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001519
Austin Schuh87dd3832021-01-01 23:07:31 -08001520 message_bridge::TestingTimeConverter time(
1521 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001522 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001523 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001524
Austin Schuh2febf0d2020-09-21 22:24:30 -07001525 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001526 time.AddNextTimestamp(
1527 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001528 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1529 BootTimestamp::epoch()});
1530 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1531 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1532 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1533 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001534
1535 std::unique_ptr<EventLoop> ping_event_loop =
1536 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1537 Ping ping(ping_event_loop.get());
1538
1539 std::unique_ptr<EventLoop> pong_event_loop =
1540 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1541 Pong pong(pong_event_loop.get());
1542
1543 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1544 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1545 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1546 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1547
1548 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1549 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1550 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1551 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1552
1553 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1554 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1555 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1556 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1557
1558 // End after a pong message comes back. This will leave the latest messages
1559 // on all channels so we can look at timestamps easily and check they make
1560 // sense.
1561 std::unique_ptr<EventLoop> pi1_pong_ender =
1562 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1563 int count = 0;
1564 pi1_pong_ender->MakeWatcher(
1565 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1566 if (++count == 100) {
1567 simulated_event_loop_factory.Exit();
1568 }
1569 });
1570
1571 // Run enough that messages should be delivered.
1572 simulated_event_loop_factory.Run();
1573
1574 // Grab the latest messages.
1575 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1576 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1577 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1578 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1579
1580 // Compute their time on the global distributed clock so we can compute
1581 // distance betwen them.
1582 const distributed_clock::time_point pi1_ping_time =
1583 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1584 ->ToDistributedClock(
1585 ping_on_pi1_fetcher.context().monotonic_event_time);
1586 const distributed_clock::time_point pi2_ping_time =
1587 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1588 ->ToDistributedClock(
1589 ping_on_pi2_fetcher.context().monotonic_event_time);
1590 const distributed_clock::time_point pi1_pong_time =
1591 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1592 ->ToDistributedClock(
1593 pong_on_pi1_fetcher.context().monotonic_event_time);
1594 const distributed_clock::time_point pi2_pong_time =
1595 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1596 ->ToDistributedClock(
1597 pong_on_pi2_fetcher.context().monotonic_event_time);
1598
1599 // And confirm the delivery delay is just about exactly 150 uS for both
1600 // directions like expected. There will be a couple ns of rounding errors in
1601 // the conversion functions that aren't worth accounting for right now. This
1602 // will either be really close, or really far.
1603 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1604 pi1_ping_time);
1605 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1606 pi1_ping_time);
1607
1608 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1609 pi2_pong_time);
1610 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1611 pi2_pong_time);
1612}
1613
Austin Schuh4c570ea2020-11-19 23:13:24 -08001614void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1615 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1616 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1617 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001618 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001619}
1620
1621// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001622TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001623 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1624 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1625
1626 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1627
1628 std::unique_ptr<EventLoop> ping_event_loop =
1629 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1630 aos::Sender<examples::Ping> pi1_reliable_sender =
1631 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1632 aos::Sender<examples::Ping> pi1_unreliable_sender =
1633 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1634 SendPing(&pi1_reliable_sender, 1);
1635 SendPing(&pi1_unreliable_sender, 1);
1636
1637 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1638 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001639 aos::Sender<examples::Ping> pi2_reliable_sender =
1640 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1641 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001642 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1643 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001644 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1645 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001646 MessageCounter<examples::Ping> pi2_unreliable_counter(
1647 pi2_pong_event_loop.get(), "/unreliable");
1648 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1649 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1650 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1651 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1652
1653 const size_t reliable_channel_index = configuration::ChannelIndex(
1654 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1655
1656 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1657 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1658
Austin Schuheeaa2022021-01-02 21:52:03 -08001659 const chrono::nanoseconds network_delay =
1660 simulated_event_loop_factory.network_delay();
1661
Austin Schuh4c570ea2020-11-19 23:13:24 -08001662 int reliable_timestamp_count = 0;
1663 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001664 shared() ? "/pi1/aos/remote_timestamps/pi2"
1665 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001666 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001667 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1668 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001669 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001670 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001671 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001672 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001673 VLOG(1) << aos::FlatbufferToJson(&header);
1674 if (header.channel_index() == reliable_channel_index) {
1675 ++reliable_timestamp_count;
1676 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001677
1678 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1679 chrono::nanoseconds(header.monotonic_sent_time()));
1680
1681 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1682 header_monotonic_sent_time + network_delay +
1683 (pi1_remote_timestamp->monotonic_now() -
1684 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001685 });
1686
1687 // Wait to let timestamp estimation start up before looking for the results.
1688 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1689
1690 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1691 // This one isn't reliable, but was sent before the start. It should *not* be
1692 // delivered.
1693 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1694 // Confirm we got a timestamp logged for the message that was forwarded.
1695 EXPECT_EQ(reliable_timestamp_count, 1u);
1696
1697 SendPing(&pi1_reliable_sender, 2);
1698 SendPing(&pi1_unreliable_sender, 2);
1699 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1700 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001701 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001702 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1703
1704 EXPECT_EQ(reliable_timestamp_count, 2u);
1705}
1706
Austin Schuh20ac95d2020-12-05 17:24:19 -08001707// Tests that rebooting a node changes the ServerStatistics message and the
1708// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001709TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001710 const UUID pi1_boot0 = UUID::Random();
1711 const UUID pi2_boot0 = UUID::Random();
1712 const UUID pi2_boot1 = UUID::Random();
1713 const UUID pi3_boot0 = UUID::Random();
1714 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001715
Austin Schuh58646e22021-08-23 23:51:46 -07001716 message_bridge::TestingTimeConverter time(
1717 configuration::NodesCount(&config.message()));
1718 SimulatedEventLoopFactory factory(&config.message());
1719 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001720
Austin Schuh58646e22021-08-23 23:51:46 -07001721 const size_t pi1_index =
1722 configuration::GetNodeIndex(&config.message(), "pi1");
1723 const size_t pi2_index =
1724 configuration::GetNodeIndex(&config.message(), "pi2");
1725 const size_t pi3_index =
1726 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001727
Austin Schuh58646e22021-08-23 23:51:46 -07001728 {
1729 time.AddNextTimestamp(distributed_clock::epoch(),
1730 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1731 BootTimestamp::epoch()});
1732
1733 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1734
1735 time.AddNextTimestamp(
1736 distributed_clock::epoch() + dt,
1737 {BootTimestamp::epoch() + dt,
1738 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1739 BootTimestamp::epoch() + dt});
1740
1741 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1742 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1743 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1744 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1745 }
1746
1747 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1748 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1749
1750 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1751 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001752
1753 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001754 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001755
1756 int timestamp_count = 0;
1757 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001758 "/pi2/aos", [&expected_boot_uuid,
1759 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001760 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001761 expected_boot_uuid);
1762 });
1763 pi1_remote_timestamp->MakeWatcher(
1764 "/test",
1765 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001766 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001767 expected_boot_uuid);
1768 });
1769 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001770 shared() ? "/pi1/aos/remote_timestamps/pi2"
1771 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001772 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1773 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001774 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001775 VLOG(1) << aos::FlatbufferToJson(&header);
1776 ++timestamp_count;
1777 });
1778
1779 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001780 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001781 int boot_number = 0;
1782 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001783 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001784 "/pi1/aos",
1785 [&pi1_server_statistics_count, &expected_boot_uuid,
1786 &expected_connection_time, &first_pi1_server_statistics,
1787 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001788 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1789 for (const message_bridge::ServerConnection *connection :
1790 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001791 if (connection->state() == message_bridge::State::CONNECTED) {
1792 ASSERT_TRUE(connection->has_boot_uuid());
1793 }
1794 if (!first_pi1_server_statistics) {
1795 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1796 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001797 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001798 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1799 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001800 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001801 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001802 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001803 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1804 connection->connected_since_time())),
1805 expected_connection_time);
1806 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001807 ++pi1_server_statistics_count;
1808 }
1809 }
Austin Schuh58646e22021-08-23 23:51:46 -07001810 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001811 });
1812
Austin Schuh58646e22021-08-23 23:51:46 -07001813 int pi1_client_statistics_count = 0;
1814 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001815 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1816 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001817 const message_bridge::ClientStatistics &stats) {
1818 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1819 for (const message_bridge::ClientConnection *connection :
1820 *stats.connections()) {
1821 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1822 if (connection->node()->name()->string_view() == "pi2") {
1823 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001824 EXPECT_EQ(expected_boot_uuid,
1825 UUID::FromString(connection->boot_uuid()))
1826 << " : Got " << aos::FlatbufferToJson(&stats);
1827 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1828 connection->connected_since_time())),
1829 expected_connection_time);
1830 EXPECT_EQ(boot_number + 1, connection->connection_count());
1831 } else {
1832 EXPECT_EQ(connection->connected_since_time(), 0);
1833 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001834 }
1835 }
1836 });
1837
1838 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001839 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1840 pi1, pi2, pi2_boot1]() {
1841 expected_boot_uuid = pi2_boot1;
1842 ++boot_number;
1843 LOG(INFO) << "OnShutdown triggered for pi2";
1844 pi2->OnStartup(
1845 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1846 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1847 expected_connection_time = pi1->monotonic_now();
1848 });
1849 });
Austin Schuh58646e22021-08-23 23:51:46 -07001850
Austin Schuh20ac95d2020-12-05 17:24:19 -08001851 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001852 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001853
1854 EXPECT_GT(timestamp_count, 100);
1855 EXPECT_GE(pi1_server_statistics_count, 1u);
1856
Austin Schuh20ac95d2020-12-05 17:24:19 -08001857 timestamp_count = 0;
1858 pi1_server_statistics_count = 0;
1859
Austin Schuh58646e22021-08-23 23:51:46 -07001860 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001861 EXPECT_GT(timestamp_count, 100);
1862 EXPECT_GE(pi1_server_statistics_count, 1u);
1863}
1864
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001865INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001866 All, RemoteMessageSimulatedEventLoopTest,
1867 ::testing::Values(
1868 Param{"multinode_pingpong_test_combined_config.json", true},
1869 Param{"multinode_pingpong_test_split_config.json", false}));
1870
Austin Schuh58646e22021-08-23 23:51:46 -07001871// Tests that Startup and Shutdown do reasonable things.
1872TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1873 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1874 aos::configuration::ReadConfig(
1875 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1876
Austin Schuh72e65682021-09-02 11:37:05 -07001877 size_t pi1_shutdown_counter = 0;
1878 size_t pi2_shutdown_counter = 0;
1879 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1880 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1881
Austin Schuh58646e22021-08-23 23:51:46 -07001882 message_bridge::TestingTimeConverter time(
1883 configuration::NodesCount(&config.message()));
1884 SimulatedEventLoopFactory factory(&config.message());
1885 factory.SetTimeConverter(&time);
1886 time.AddNextTimestamp(
1887 distributed_clock::epoch(),
1888 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1889
1890 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1891
1892 time.AddNextTimestamp(
1893 distributed_clock::epoch() + dt,
1894 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1895 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1896 BootTimestamp::epoch() + dt});
1897
1898 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1899 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1900
1901 // Configure startup to start Ping and Pong, and count.
1902 size_t pi1_startup_counter = 0;
1903 size_t pi2_startup_counter = 0;
1904 pi1->OnStartup([pi1]() {
1905 LOG(INFO) << "Made ping";
1906 pi1->AlwaysStart<Ping>("ping");
1907 });
1908 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1909 pi2->OnStartup([pi2]() {
1910 LOG(INFO) << "Made pong";
1911 pi2->AlwaysStart<Pong>("pong");
1912 });
1913 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1914
1915 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001916 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1917 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1918
Austin Schuh58646e22021-08-23 23:51:46 -07001919 // Automatically make counters on startup.
1920 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1921 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1922 "pi1_pong_counter", "/test");
1923 });
1924 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1925 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1926 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1927 "pi2_ping_counter", "/test");
1928 });
1929 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1930
1931 EXPECT_EQ(pi2_ping_counter, nullptr);
1932 EXPECT_EQ(pi1_pong_counter, nullptr);
1933
1934 EXPECT_EQ(pi1_startup_counter, 0u);
1935 EXPECT_EQ(pi2_startup_counter, 0u);
1936 EXPECT_EQ(pi1_shutdown_counter, 0u);
1937 EXPECT_EQ(pi2_shutdown_counter, 0u);
1938
1939 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1940 EXPECT_EQ(pi1_startup_counter, 1u);
1941 EXPECT_EQ(pi2_startup_counter, 1u);
1942 EXPECT_EQ(pi1_shutdown_counter, 0u);
1943 EXPECT_EQ(pi2_shutdown_counter, 0u);
1944 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1945 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1946
1947 LOG(INFO) << pi1->monotonic_now();
1948 LOG(INFO) << pi2->monotonic_now();
1949
1950 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1951
1952 EXPECT_EQ(pi1_startup_counter, 2u);
1953 EXPECT_EQ(pi2_startup_counter, 2u);
1954 EXPECT_EQ(pi1_shutdown_counter, 1u);
1955 EXPECT_EQ(pi2_shutdown_counter, 1u);
1956 EXPECT_EQ(pi2_ping_counter->count(), 501);
1957 EXPECT_EQ(pi1_pong_counter->count(), 501);
1958}
1959
1960// Tests that OnStartup handlers can be added after running and get called, and
1961// can't be called when running.
1962TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1963 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1964 aos::configuration::ReadConfig(
1965 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1966
1967 // Test that we can add startup handlers as long as we aren't running, and
1968 // they get run when Run gets called again.
1969 // Test that adding a startup handler when running fails.
1970 //
1971 // Test shutdown handlers get called on destruction.
1972 SimulatedEventLoopFactory factory(&config.message());
1973
1974 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1975
1976 int startup_count0 = 0;
1977 int startup_count1 = 0;
1978
1979 pi1->OnStartup([&]() { ++startup_count0; });
1980 EXPECT_EQ(startup_count0, 0);
1981 EXPECT_EQ(startup_count1, 0);
1982
1983 factory.RunFor(chrono::nanoseconds(1));
1984 EXPECT_EQ(startup_count0, 1);
1985 EXPECT_EQ(startup_count1, 0);
1986
1987 pi1->OnStartup([&]() { ++startup_count1; });
1988 EXPECT_EQ(startup_count0, 1);
1989 EXPECT_EQ(startup_count1, 0);
1990
1991 factory.RunFor(chrono::nanoseconds(1));
1992 EXPECT_EQ(startup_count0, 1);
1993 EXPECT_EQ(startup_count1, 1);
1994
1995 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1996 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1997
1998 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1999 "Can only register OnStartup handlers when not running.");
2000}
2001
2002// Tests that OnStartup handlers can be added after running and get called, and
2003// all the handlers get called on reboot. Shutdown handlers are tested the same
2004// way.
2005TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2006 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2007 aos::configuration::ReadConfig(
2008 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2009
Austin Schuh72e65682021-09-02 11:37:05 -07002010 int startup_count0 = 0;
2011 int shutdown_count0 = 0;
2012 int startup_count1 = 0;
2013 int shutdown_count1 = 0;
2014
Austin Schuh58646e22021-08-23 23:51:46 -07002015 message_bridge::TestingTimeConverter time(
2016 configuration::NodesCount(&config.message()));
2017 SimulatedEventLoopFactory factory(&config.message());
2018 factory.SetTimeConverter(&time);
2019 time.StartEqual();
2020
2021 const chrono::nanoseconds dt = chrono::seconds(10);
2022 time.RebootAt(0, distributed_clock::epoch() + dt);
2023 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2024
2025 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2026
Austin Schuh58646e22021-08-23 23:51:46 -07002027 pi1->OnStartup([&]() { ++startup_count0; });
2028 pi1->OnShutdown([&]() { ++shutdown_count0; });
2029 EXPECT_EQ(startup_count0, 0);
2030 EXPECT_EQ(startup_count1, 0);
2031 EXPECT_EQ(shutdown_count0, 0);
2032 EXPECT_EQ(shutdown_count1, 0);
2033
2034 factory.RunFor(chrono::nanoseconds(1));
2035 EXPECT_EQ(startup_count0, 1);
2036 EXPECT_EQ(startup_count1, 0);
2037 EXPECT_EQ(shutdown_count0, 0);
2038 EXPECT_EQ(shutdown_count1, 0);
2039
2040 pi1->OnStartup([&]() { ++startup_count1; });
2041 EXPECT_EQ(startup_count0, 1);
2042 EXPECT_EQ(startup_count1, 0);
2043 EXPECT_EQ(shutdown_count0, 0);
2044 EXPECT_EQ(shutdown_count1, 0);
2045
2046 factory.RunFor(chrono::nanoseconds(1));
2047 EXPECT_EQ(startup_count0, 1);
2048 EXPECT_EQ(startup_count1, 1);
2049 EXPECT_EQ(shutdown_count0, 0);
2050 EXPECT_EQ(shutdown_count1, 0);
2051
2052 factory.RunFor(chrono::seconds(15));
2053
2054 EXPECT_EQ(startup_count0, 2);
2055 EXPECT_EQ(startup_count1, 2);
2056 EXPECT_EQ(shutdown_count0, 1);
2057 EXPECT_EQ(shutdown_count1, 0);
2058
2059 pi1->OnShutdown([&]() { ++shutdown_count1; });
2060 factory.RunFor(chrono::seconds(10));
2061
2062 EXPECT_EQ(startup_count0, 3);
2063 EXPECT_EQ(startup_count1, 3);
2064 EXPECT_EQ(shutdown_count0, 2);
2065 EXPECT_EQ(shutdown_count1, 1);
2066}
2067
2068// Tests that event loops which outlive shutdown crash.
2069TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2070 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2071 aos::configuration::ReadConfig(
2072 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2073
2074 message_bridge::TestingTimeConverter time(
2075 configuration::NodesCount(&config.message()));
2076 SimulatedEventLoopFactory factory(&config.message());
2077 factory.SetTimeConverter(&time);
2078 time.StartEqual();
2079
2080 const chrono::nanoseconds dt = chrono::seconds(10);
2081 time.RebootAt(0, distributed_clock::epoch() + dt);
2082
2083 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2084
2085 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2086
2087 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2088}
2089
Brian Silvermane1fe2512022-08-14 23:18:50 -07002090// Test that an ExitHandle outliving its factory is caught.
2091TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2092 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2093 aos::configuration::ReadConfig(
2094 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2095 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2096 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2097 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2098 auto exit_handle = factory->MakeExitHandle();
2099 EXPECT_DEATH(factory.reset(),
2100 "All ExitHandles must be destroyed before the factory");
2101}
2102
Austin Schuh3e31f912023-08-21 21:29:10 -07002103// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2104TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2105 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2106 aos::configuration::ReadConfig(
2107 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2108 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2109 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2110 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2111 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2112 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2113}
2114
Austin Schuh58646e22021-08-23 23:51:46 -07002115// Tests that messages don't survive a reboot of a node.
2116TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2117 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2118 aos::configuration::ReadConfig(
2119 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2120
2121 message_bridge::TestingTimeConverter time(
2122 configuration::NodesCount(&config.message()));
2123 SimulatedEventLoopFactory factory(&config.message());
2124 factory.SetTimeConverter(&time);
2125 time.StartEqual();
2126
2127 const chrono::nanoseconds dt = chrono::seconds(10);
2128 time.RebootAt(0, distributed_clock::epoch() + dt);
2129
2130 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2131
2132 const UUID boot_uuid = pi1->boot_uuid();
2133 EXPECT_NE(boot_uuid, UUID::Zero());
2134
2135 {
2136 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2137 aos::Sender<examples::Ping> test_message_sender =
2138 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2139 SendPing(&test_message_sender, 1);
2140 }
2141
2142 factory.RunFor(chrono::seconds(5));
2143
2144 {
2145 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2146 aos::Fetcher<examples::Ping> fetcher =
2147 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2148 EXPECT_TRUE(fetcher.Fetch());
2149 }
2150
2151 factory.RunFor(chrono::seconds(10));
2152
2153 {
2154 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2155 aos::Fetcher<examples::Ping> fetcher =
2156 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2157 EXPECT_FALSE(fetcher.Fetch());
2158 }
2159 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2160}
2161
2162// Tests that reliable messages get resent on reboot.
2163TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2164 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2165 aos::configuration::ReadConfig(
2166 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2167
2168 message_bridge::TestingTimeConverter time(
2169 configuration::NodesCount(&config.message()));
2170 SimulatedEventLoopFactory factory(&config.message());
2171 factory.SetTimeConverter(&time);
2172 time.StartEqual();
2173
2174 const chrono::nanoseconds dt = chrono::seconds(1);
2175 time.RebootAt(1, distributed_clock::epoch() + dt);
2176
2177 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2178 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2179
2180 const UUID pi1_boot_uuid = pi1->boot_uuid();
2181 const UUID pi2_boot_uuid = pi2->boot_uuid();
2182 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2183 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2184
2185 {
2186 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2187 aos::Sender<examples::Ping> test_message_sender =
2188 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2189 SendPing(&test_message_sender, 1);
2190 }
2191
2192 factory.RunFor(chrono::milliseconds(500));
2193
2194 {
2195 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2196 aos::Fetcher<examples::Ping> fetcher =
2197 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2198 EXPECT_TRUE(fetcher.Fetch());
2199 }
2200
2201 factory.RunFor(chrono::seconds(1));
2202
2203 {
2204 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2205 aos::Fetcher<examples::Ping> fetcher =
2206 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2207 EXPECT_TRUE(fetcher.Fetch());
2208 }
2209 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2210}
2211
James Kuszmaul86e86c32022-07-21 17:39:47 -07002212TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2213 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2214 aos::configuration::ReadConfig(
2215 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2216
2217 message_bridge::TestingTimeConverter time(
2218 configuration::NodesCount(&config.message()));
2219 time.AddNextTimestamp(
2220 distributed_clock::epoch(),
2221 {BootTimestamp{0, monotonic_clock::epoch()},
2222 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2223 BootTimestamp{0, monotonic_clock::epoch()}});
2224 SimulatedEventLoopFactory factory(&config.message());
2225 factory.SetTimeConverter(&time);
2226
2227 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2228 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2229
2230 const UUID pi1_boot_uuid = pi1->boot_uuid();
2231 const UUID pi2_boot_uuid = pi2->boot_uuid();
2232 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2233 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2234
2235 {
2236 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2237 aos::Sender<examples::Ping> pi1_sender =
2238 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2239 SendPing(&pi1_sender, 1);
2240 }
2241 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2242 aos::Sender<examples::Ping> pi2_sender =
2243 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2244 SendPing(&pi2_sender, 1);
2245 // Verify that we staggered the OnRun callback correctly.
2246 pi2_event_loop->OnRun([pi1, pi2]() {
2247 EXPECT_EQ(pi1->monotonic_now(),
2248 monotonic_clock::epoch() + std::chrono::seconds(1));
2249 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2250 });
2251
2252 factory.RunFor(chrono::seconds(2));
2253
2254 {
2255 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2256 aos::Fetcher<examples::Ping> fetcher =
2257 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2258 ASSERT_TRUE(fetcher.Fetch());
2259 EXPECT_EQ(fetcher.context().monotonic_event_time,
2260 monotonic_clock::epoch() + factory.network_delay());
2261 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2262 monotonic_clock::epoch());
2263 }
2264 {
2265 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2266 aos::Fetcher<examples::Ping> fetcher =
2267 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2268 ASSERT_TRUE(fetcher.Fetch());
2269 EXPECT_EQ(fetcher.context().monotonic_event_time,
2270 monotonic_clock::epoch() + std::chrono::seconds(1) +
2271 factory.network_delay());
2272 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2273 monotonic_clock::epoch() - std::chrono::seconds(1));
2274 }
2275}
2276
Austin Schuh48205e62021-11-12 14:13:18 -08002277class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2278 public:
2279 SimulatedEventLoopDisconnectTest()
2280 : config(aos::configuration::ReadConfig(ArtifactPath(
2281 "aos/events/multinode_pingpong_test_split_config.json"))),
2282 time(configuration::NodesCount(&config.message())),
2283 factory(&config.message()) {
2284 factory.SetTimeConverter(&time);
2285 }
2286
2287 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2288 const monotonic_clock::time_point allowable_message_time,
2289 std::set<const aos::Node *> empty_nodes) {
2290 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2291 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2292 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2293 pi1->MakeEventLoop("fetcher");
2294 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2295 pi2->MakeEventLoop("fetcher");
2296 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2297 if (configuration::ChannelIsReadableOnNode(channel,
2298 pi1_event_loop->node())) {
2299 std::unique_ptr<aos::RawFetcher> fetcher =
2300 pi1_event_loop->MakeRawFetcher(channel);
2301 if (statistics_channels.find(channel) == statistics_channels.end() ||
2302 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2303 EXPECT_FALSE(fetcher->Fetch() &&
2304 fetcher->context().monotonic_event_time >
2305 allowable_message_time)
2306 << ": Found recent message on channel "
2307 << configuration::CleanedChannelToString(channel) << " and time "
2308 << fetcher->context().monotonic_event_time << " > "
2309 << allowable_message_time << " on pi1";
2310 } else {
2311 EXPECT_TRUE(fetcher->Fetch() &&
2312 fetcher->context().monotonic_event_time >=
2313 allowable_message_time)
2314 << ": Didn't find recent message on channel "
2315 << configuration::CleanedChannelToString(channel) << " on pi1";
2316 }
2317 }
2318 if (configuration::ChannelIsReadableOnNode(channel,
2319 pi2_event_loop->node())) {
2320 std::unique_ptr<aos::RawFetcher> fetcher =
2321 pi2_event_loop->MakeRawFetcher(channel);
2322 if (statistics_channels.find(channel) == statistics_channels.end() ||
2323 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2324 EXPECT_FALSE(fetcher->Fetch() &&
2325 fetcher->context().monotonic_event_time >
2326 allowable_message_time)
2327 << ": Found message on channel "
2328 << configuration::CleanedChannelToString(channel) << " and time "
2329 << fetcher->context().monotonic_event_time << " > "
2330 << allowable_message_time << " on pi2";
2331 } else {
2332 EXPECT_TRUE(fetcher->Fetch() &&
2333 fetcher->context().monotonic_event_time >=
2334 allowable_message_time)
2335 << ": Didn't find message on channel "
2336 << configuration::CleanedChannelToString(channel) << " on pi2";
2337 }
2338 }
2339 }
2340 }
2341
2342 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2343
2344 message_bridge::TestingTimeConverter time;
2345 SimulatedEventLoopFactory factory;
2346};
2347
2348// Tests that if we have message bridge client/server disabled, and timing
2349// reports disabled, no messages are sent. Also tests that we can disconnect a
2350// node and disable statistics on it and it actually fully disconnects.
2351TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2352 time.StartEqual();
2353 factory.SkipTimingReport();
2354 factory.DisableStatistics();
2355
2356 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2357 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2358
2359 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2360 pi1->MakeEventLoop("fetcher");
2361 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2362 pi2->MakeEventLoop("fetcher");
2363
2364 factory.RunFor(chrono::milliseconds(100000));
2365
2366 // Confirm no messages are sent if we've configured them all off.
2367 VerifyChannels({}, monotonic_clock::min_time, {});
2368
2369 // Now, confirm that all the message_bridge channels come back when we
2370 // re-enable.
2371 factory.EnableStatistics();
2372
2373 factory.RunFor(chrono::milliseconds(10050));
2374
2375 // Build up the list of all the messages we expect when we come back.
2376 {
2377 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002378 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002379 std::vector<std::pair<std::string_view, const Node *>>{
2380 {"/pi1/aos", pi1->node()},
2381 {"/pi2/aos", pi1->node()},
2382 {"/pi3/aos", pi1->node()}}) {
2383 statistics_channels.insert(configuration::GetChannel(
2384 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2385 pi.second));
2386 statistics_channels.insert(configuration::GetChannel(
2387 factory.configuration(), pi.first,
2388 "aos.message_bridge.ServerStatistics", "", pi.second));
2389 statistics_channels.insert(configuration::GetChannel(
2390 factory.configuration(), pi.first,
2391 "aos.message_bridge.ClientStatistics", "", pi.second));
2392 }
2393
2394 statistics_channels.insert(configuration::GetChannel(
2395 factory.configuration(),
2396 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2397 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2398 statistics_channels.insert(configuration::GetChannel(
2399 factory.configuration(),
2400 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2401 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2402 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2403 }
2404
2405 // Now test that we can disable the messages for a single node
2406 pi2->DisableStatistics();
2407 const aos::monotonic_clock::time_point statistics_disable_time =
2408 pi2->monotonic_now();
2409 factory.RunFor(chrono::milliseconds(10000));
2410
2411 // We should see a much smaller set of messages, but should still see messages
2412 // forwarded, mainly the timestamp message.
2413 {
2414 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002415 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002416 std::vector<std::pair<std::string_view, const Node *>>{
2417 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2418 statistics_channels.insert(configuration::GetChannel(
2419 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2420 pi.second));
2421 statistics_channels.insert(configuration::GetChannel(
2422 factory.configuration(), pi.first,
2423 "aos.message_bridge.ServerStatistics", "", pi.second));
2424 statistics_channels.insert(configuration::GetChannel(
2425 factory.configuration(), pi.first,
2426 "aos.message_bridge.ClientStatistics", "", pi.second));
2427 }
2428
2429 statistics_channels.insert(configuration::GetChannel(
2430 factory.configuration(),
2431 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2432 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2433 VerifyChannels(statistics_channels, statistics_disable_time, {});
2434 }
2435
2436 // Now, fully disconnect the node. This will completely quiet down pi2.
2437 pi1->Disconnect(pi2->node());
2438 pi2->Disconnect(pi1->node());
2439
2440 const aos::monotonic_clock::time_point disconnect_disable_time =
2441 pi2->monotonic_now();
2442 factory.RunFor(chrono::milliseconds(10000));
2443
2444 {
2445 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002446 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002447 std::vector<std::pair<std::string_view, const Node *>>{
2448 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2449 statistics_channels.insert(configuration::GetChannel(
2450 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2451 pi.second));
2452 statistics_channels.insert(configuration::GetChannel(
2453 factory.configuration(), pi.first,
2454 "aos.message_bridge.ServerStatistics", "", pi.second));
2455 statistics_channels.insert(configuration::GetChannel(
2456 factory.configuration(), pi.first,
2457 "aos.message_bridge.ClientStatistics", "", pi.second));
2458 }
2459
2460 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2461 }
2462}
2463
Stephan Pleinesf63bde82024-01-13 15:59:33 -08002464} // namespace aos::testing