blob: 14e0a0831d313be11145278817abddf5386baa88 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070010#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070011#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/ping_lib.h"
13#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080014#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/message_bridge_client_generated.h"
16#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080017#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080018#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070019#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070020#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080021
22namespace aos {
23namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
Austin Schuh217a9782019-12-21 23:02:50 -080045 void Run() override { event_loop_factory_->Run(); }
46 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070047
Austin Schuh52d325c2019-06-23 18:59:06 -070048 // TODO(austin): Implement this. It's used currently for a phased loop test.
49 // I'm not sure how much that matters.
50 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
51
Austin Schuh7d87b672019-12-01 20:23:49 -080052 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080053 MaybeMake();
54 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080055 }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
Austin Schuh217a9782019-12-21 23:02:50 -080058 void MaybeMake() {
59 if (!event_loop_factory_) {
60 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080061 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080063 } else {
64 event_loop_factory_ =
65 std::make_unique<SimulatedEventLoopFactory>(configuration());
66 }
67 }
68 }
69 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080070};
71
Austin Schuh6bae8252021-02-07 22:01:49 -080072auto CommonParameters() {
73 return ::testing::Combine(
74 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
75 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
76 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
77}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070081
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070082INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070083 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080084
Austin Schuh89c9b812021-02-20 14:42:10 -080085// Parameters to run all the tests with.
86struct Param {
87 // The config file to use.
88 std::string config;
89 // If true, the RemoteMessage channel should be shared between all the remote
90 // channels. If false, there will be 1 RemoteMessage channel per remote
91 // channel.
92 bool shared;
93};
94
95class RemoteMessageSimulatedEventLoopTest
96 : public ::testing::TestWithParam<struct Param> {
97 public:
98 RemoteMessageSimulatedEventLoopTest()
99 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700100 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800101 LOG(INFO) << "Config " << GetParam().config;
102 }
103
104 bool shared() const { return GetParam().shared; }
105
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
107 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
108 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
109 if (shared()) {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop, "/aos/remote_timestamps/pi2"));
112 } else {
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop,
115 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
120 }
121 return counters;
122 }
123
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
125 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
126 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
127 if (shared()) {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1"));
130 } else {
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
133 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
134 event_loop,
135 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
136 }
137 return counters;
138 }
139
140 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
141};
142
Austin Schuh8fb315a2020-11-19 22:33:58 -0800143// Test that sending a message after running gets properly notified.
144TEST(SimulatedEventLoopTest, SendAfterRunFor) {
145 SimulatedEventLoopTestFactory factory;
146
147 SimulatedEventLoopFactory simulated_event_loop_factory(
148 factory.configuration());
149
150 ::std::unique_ptr<EventLoop> ping_event_loop =
151 simulated_event_loop_factory.MakeEventLoop("ping");
152 aos::Sender<TestMessage> test_message_sender =
153 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800155
156 std::unique_ptr<EventLoop> pong1_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("pong");
158 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
159 "/test");
160
161 EXPECT_FALSE(ping_event_loop->is_running());
162
163 // Watchers start when you start running, so there should be nothing counted.
164 simulated_event_loop_factory.RunFor(chrono::seconds(1));
165 EXPECT_EQ(test_message_counter1.count(), 0u);
166
167 std::unique_ptr<EventLoop> pong2_event_loop =
168 simulated_event_loop_factory.MakeEventLoop("pong");
169 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
170 "/test");
171
172 // Pauses in the middle don't count though, so this should be counted.
173 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700174 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800175
176 EXPECT_EQ(test_message_counter1.count(), 0u);
177 EXPECT_EQ(test_message_counter2.count(), 0u);
178 simulated_event_loop_factory.RunFor(chrono::seconds(1));
179
180 EXPECT_EQ(test_message_counter1.count(), 1u);
181 EXPECT_EQ(test_message_counter2.count(), 0u);
182}
183
Austin Schuh60e77942022-05-16 17:48:24 -0700184// Test that if we configure an event loop to be able to send too fast that we
185// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700186TEST(SimulatedEventLoopTest, AllowSendTooFast) {
187 SimulatedEventLoopTestFactory factory;
188
189 SimulatedEventLoopFactory simulated_event_loop_factory(
190 factory.configuration());
191
192 // Create two event loops: One will be allowed to send too fast, one won't. We
193 // will then test to ensure that the one that is allowed to send too fast can
194 // indeed send too fast, but that it then makes it so that the second event
195 // loop can no longer send anything because *it* is still limited.
196 ::std::unique_ptr<EventLoop> too_fast_event_loop =
197 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
198 ->MakeEventLoop("too_fast_sender",
199 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700200 NodeEventLoopFactory::ExclusiveSenders::kNo,
201 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700202 aos::Sender<TestMessage> too_fast_message_sender =
203 too_fast_event_loop->MakeSender<TestMessage>("/test");
204
205 ::std::unique_ptr<EventLoop> limited_event_loop =
206 simulated_event_loop_factory.MakeEventLoop("limited_sender");
207 aos::Sender<TestMessage> limited_message_sender =
208 limited_event_loop->MakeSender<TestMessage>("/test");
209
210 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
211 for (int ii = 0; ii < queue_size; ++ii) {
212 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
213 }
214 // And now we should start being in the sending-too-fast phase.
215 for (int ii = 0; ii < queue_size; ++ii) {
216 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700217 ASSERT_EQ(SendTestMessage(limited_message_sender),
218 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700219 }
220}
221
222// Test that if we setup an exclusive sender that it is indeed exclusive.
223TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
224 SimulatedEventLoopTestFactory factory;
225
226 SimulatedEventLoopFactory simulated_event_loop_factory(
227 factory.configuration());
228
229 ::std::unique_ptr<EventLoop> exclusive_event_loop =
230 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700231 ->MakeEventLoop(
232 "too_fast_sender",
233 {NodeEventLoopFactory::CheckSentTooFast::kYes,
234 NodeEventLoopFactory::ExclusiveSenders::kYes,
235 {{configuration::GetChannel(factory.configuration(), "/test1",
236 "aos.TestMessage", "", nullptr),
237 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700238 exclusive_event_loop->SkipAosLog();
239 exclusive_event_loop->SkipTimingReport();
240 ::std::unique_ptr<EventLoop> normal_event_loop =
241 simulated_event_loop_factory.MakeEventLoop("limited_sender");
242 // Set things up to have the exclusive sender be destroyed so we can test
243 // recovery.
244 {
245 aos::Sender<TestMessage> exclusive_sender =
246 exclusive_event_loop->MakeSender<TestMessage>("/test");
247
248 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
249 "TestMessage");
250 }
251 // This one should succeed now that the exclusive channel is removed.
252 aos::Sender<TestMessage> normal_sender =
253 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700254 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
255 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700256
257 // And check an explicitly exempted channel:
258 aos::Sender<TestMessage> non_exclusive_sender =
259 exclusive_event_loop->MakeSender<TestMessage>("/test1");
260 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
261 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700262}
263
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700264void TestSentTooFastCheckEdgeCase(
265 const std::function<RawSender::Error(int, int)> expected_err,
266 const bool send_twice_at_end) {
267 SimulatedEventLoopTestFactory factory;
268
269 auto event_loop = factory.MakePrimary("primary");
270
271 auto sender = event_loop->MakeSender<TestMessage>("/test");
272
273 const int queue_size = TestChannelQueueSize(event_loop.get());
274 int msgs_sent = 0;
275 event_loop->AddPhasedLoop(
276 [&](int) {
277 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
278 msgs_sent++;
279
280 // If send_twice_at_end, send the last two messages (message
281 // queue_size and queue_size + 1) in the same iteration, meaning that
282 // we would be sending very slightly too fast. Otherwise, we will send
283 // message queue_size + 1 in the next iteration and we will continue
284 // to be sending exactly at the channel frequency.
285 if (send_twice_at_end && (msgs_sent == queue_size)) {
286 EXPECT_EQ(SendTestMessage(sender),
287 expected_err(msgs_sent, queue_size));
288 msgs_sent++;
289 }
290
291 if (msgs_sent > queue_size) {
292 factory.Exit();
293 }
294 },
295 std::chrono::duration_cast<std::chrono::nanoseconds>(
296 std::chrono::duration<double>(
297 1.0 / TestChannelFrequency(event_loop.get()))));
298
299 factory.Run();
300}
301
302// Tests that RawSender::Error::kMessagesSentTooFast is not returned
303// when messages are sent at the exact frequency of the channel.
304TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
305 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
306 false);
307}
308
309// Tests that RawSender::Error::kMessagesSentTooFast is returned
310// when sending exactly one more message than allowed in a channel storage
311// duration.
312TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
313 TestSentTooFastCheckEdgeCase(
314 [](const int msgs_sent, const int queue_size) {
315 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
316 : RawSender::Error::kOk);
317 },
318 true);
319}
320
Austin Schuh8fb315a2020-11-19 22:33:58 -0800321// Test that creating an event loop while running dies.
322TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
323 SimulatedEventLoopTestFactory factory;
324
325 SimulatedEventLoopFactory simulated_event_loop_factory(
326 factory.configuration());
327
328 ::std::unique_ptr<EventLoop> event_loop =
329 simulated_event_loop_factory.MakeEventLoop("ping");
330
331 auto timer = event_loop->AddTimer([&]() {
332 EXPECT_DEATH(
333 {
334 ::std::unique_ptr<EventLoop> event_loop2 =
335 simulated_event_loop_factory.MakeEventLoop("ping");
336 },
337 "event loop while running");
338 simulated_event_loop_factory.Exit();
339 });
340
341 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700342 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800343 });
344
345 simulated_event_loop_factory.Run();
346}
347
348// Test that creating a watcher after running dies.
349TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
350 SimulatedEventLoopTestFactory factory;
351
352 SimulatedEventLoopFactory simulated_event_loop_factory(
353 factory.configuration());
354
355 ::std::unique_ptr<EventLoop> event_loop =
356 simulated_event_loop_factory.MakeEventLoop("ping");
357
358 simulated_event_loop_factory.RunFor(chrono::seconds(1));
359
360 EXPECT_DEATH(
361 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
362 "Can't add a watcher after running");
363
364 ::std::unique_ptr<EventLoop> event_loop2 =
365 simulated_event_loop_factory.MakeEventLoop("ping");
366
367 simulated_event_loop_factory.RunFor(chrono::seconds(1));
368
369 EXPECT_DEATH(
370 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
371 "Can't add a watcher after running");
372}
373
Austin Schuh44019f92019-05-19 19:58:27 -0700374// Test that running for a time period with no handlers causes time to progress
375// correctly.
376TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800377 SimulatedEventLoopTestFactory factory;
378
379 SimulatedEventLoopFactory simulated_event_loop_factory(
380 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700381 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800382 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700383
384 simulated_event_loop_factory.RunFor(chrono::seconds(1));
385
386 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700387 event_loop->monotonic_now());
388}
389
390// Test that running for a time with a periodic handler causes time to end
391// correctly.
392TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800393 SimulatedEventLoopTestFactory factory;
394
395 SimulatedEventLoopFactory simulated_event_loop_factory(
396 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700397 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800398 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700399
400 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700402 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700403 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
404 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700405 });
406
407 simulated_event_loop_factory.RunFor(chrono::seconds(1));
408
409 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700410 event_loop->monotonic_now());
411 EXPECT_EQ(counter, 10);
412}
413
Austin Schuh7d87b672019-12-01 20:23:49 -0800414// Tests that watchers have latency in simulation.
415TEST(SimulatedEventLoopTest, WatcherTimingReport) {
416 SimulatedEventLoopTestFactory factory;
417 factory.set_send_delay(std::chrono::microseconds(50));
418
419 FLAGS_timing_report_ms = 1000;
420 auto loop1 = factory.MakePrimary("primary");
421 loop1->MakeWatcher("/test", [](const TestMessage &) {});
422
423 auto loop2 = factory.Make("sender_loop");
424
425 auto loop3 = factory.Make("report_fetcher");
426
427 Fetcher<timing::Report> report_fetcher =
428 loop3->MakeFetcher<timing::Report>("/aos");
429 EXPECT_FALSE(report_fetcher.Fetch());
430
431 auto sender = loop2->MakeSender<TestMessage>("/test");
432
433 // Send 10 messages in the middle of a timing report period so we get
434 // something interesting back.
435 auto test_timer = loop2->AddTimer([&sender]() {
436 for (int i = 0; i < 10; ++i) {
437 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
438 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
439 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700440 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800441 }
442 });
443
444 // Quit after 1 timing report, mid way through the next cycle.
445 {
446 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700447 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800448 end_timer->set_name("end");
449 }
450
451 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700452 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800453 });
454
455 factory.Run();
456
457 // And, since we are here, check that the timing report makes sense.
458 // Start by looking for our event loop's timing.
459 FlatbufferDetachedBuffer<timing::Report> primary_report =
460 FlatbufferDetachedBuffer<timing::Report>::Empty();
461 while (report_fetcher.FetchNext()) {
462 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
463 if (report_fetcher->name()->string_view() == "primary") {
464 primary_report = CopyFlatBuffer(report_fetcher.get());
465 }
466 }
467
468 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700469 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800470
471 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
472
473 // Just the timing report timer.
474 ASSERT_NE(primary_report.message().timers(), nullptr);
475 EXPECT_EQ(primary_report.message().timers()->size(), 2);
476
477 // No phased loops
478 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
479
480 // And now confirm that the watcher received all 10 messages, and has latency.
481 ASSERT_NE(primary_report.message().watchers(), nullptr);
482 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
483 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
484 EXPECT_NEAR(
485 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
486 0.00005, 1e-9);
487 EXPECT_NEAR(
488 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
489 0.00005, 1e-9);
490 EXPECT_NEAR(
491 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
492 0.00005, 1e-9);
493 EXPECT_EQ(primary_report.message()
494 .watchers()
495 ->Get(0)
496 ->wakeup_latency()
497 ->standard_deviation(),
498 0.0);
499
500 EXPECT_EQ(
501 primary_report.message().watchers()->Get(0)->handler_time()->average(),
502 0.0);
503 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
504 0.0);
505 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
506 0.0);
507 EXPECT_EQ(primary_report.message()
508 .watchers()
509 ->Get(0)
510 ->handler_time()
511 ->standard_deviation(),
512 0.0);
513}
514
Austin Schuh89c9b812021-02-20 14:42:10 -0800515size_t CountAll(
516 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
517 &counters) {
518 size_t count = 0u;
519 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
520 counters) {
521 count += counter->count();
522 }
523 return count;
524}
525
Austin Schuh4c3b9702020-08-30 11:34:55 -0700526// Tests that ping and pong work when on 2 different nodes, and the message
527// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800528TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800529 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
530 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700531 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800532
533 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
534
535 std::unique_ptr<EventLoop> ping_event_loop =
536 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
537 Ping ping(ping_event_loop.get());
538
539 std::unique_ptr<EventLoop> pong_event_loop =
540 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
541 Pong pong(pong_event_loop.get());
542
543 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
544 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700545 MessageCounter<examples::Pong> pi2_pong_counter(
546 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700547 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
548 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
549 "/pi1/aos");
550 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
551 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800552
Austin Schuh4c3b9702020-08-30 11:34:55 -0700553 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
554 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800555
556 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
557 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700558 MessageCounter<examples::Pong> pi1_pong_counter(
559 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700560 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
561 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
562 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
563 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
564 "/aos");
565
Austin Schuh4c3b9702020-08-30 11:34:55 -0700566 // Count timestamps.
567 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
568 pi1_pong_counter_event_loop.get(), "/pi1/aos");
569 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
570 pi2_pong_counter_event_loop.get(), "/pi1/aos");
571 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
572 pi3_pong_counter_event_loop.get(), "/pi1/aos");
573 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
574 pi1_pong_counter_event_loop.get(), "/pi2/aos");
575 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
576 pi2_pong_counter_event_loop.get(), "/pi2/aos");
577 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
578 pi1_pong_counter_event_loop.get(), "/pi3/aos");
579 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
580 pi3_pong_counter_event_loop.get(), "/pi3/aos");
581
Austin Schuh2f8fd752020-09-01 22:38:28 -0700582 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800583 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
584 remote_timestamps_pi2_on_pi1 =
585 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
586 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
587 remote_timestamps_pi1_on_pi2 =
588 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700589
Austin Schuh4c3b9702020-08-30 11:34:55 -0700590 // Wait to let timestamp estimation start up before looking for the results.
591 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
592
Austin Schuh8fb315a2020-11-19 22:33:58 -0800593 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
594 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
595 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
596 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
597 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
598 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
599
Austin Schuh4c3b9702020-08-30 11:34:55 -0700600 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800601 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700602 "/pi1/aos", [&pi1_server_statistics_count](
603 const message_bridge::ServerStatistics &stats) {
604 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
605 EXPECT_EQ(stats.connections()->size(), 2u);
606 for (const message_bridge::ServerConnection *connection :
607 *stats.connections()) {
608 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800609 EXPECT_EQ(connection->connection_count(), 1u);
610 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800611 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700612 if (connection->node()->name()->string_view() == "pi2") {
613 EXPECT_GT(connection->sent_packets(), 50);
614 } else if (connection->node()->name()->string_view() == "pi3") {
615 EXPECT_GE(connection->sent_packets(), 5);
616 } else {
617 LOG(FATAL) << "Unknown connection";
618 }
619
620 EXPECT_TRUE(connection->has_monotonic_offset());
621 EXPECT_EQ(connection->monotonic_offset(), 0);
622 }
623 ++pi1_server_statistics_count;
624 });
625
626 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800627 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700628 "/pi2/aos", [&pi2_server_statistics_count](
629 const message_bridge::ServerStatistics &stats) {
630 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
631 EXPECT_EQ(stats.connections()->size(), 1u);
632
633 const message_bridge::ServerConnection *connection =
634 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800635 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700636 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
637 EXPECT_GT(connection->sent_packets(), 50);
638 EXPECT_TRUE(connection->has_monotonic_offset());
639 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800640 EXPECT_EQ(connection->connection_count(), 1u);
641 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700642 ++pi2_server_statistics_count;
643 });
644
645 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800646 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700647 "/pi3/aos", [&pi3_server_statistics_count](
648 const message_bridge::ServerStatistics &stats) {
649 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
650 EXPECT_EQ(stats.connections()->size(), 1u);
651
652 const message_bridge::ServerConnection *connection =
653 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800654 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700655 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
656 EXPECT_GE(connection->sent_packets(), 5);
657 EXPECT_TRUE(connection->has_monotonic_offset());
658 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800659 EXPECT_EQ(connection->connection_count(), 1u);
660 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700661 ++pi3_server_statistics_count;
662 });
663
664 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800665 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700666 "/pi1/aos", [&pi1_client_statistics_count](
667 const message_bridge::ClientStatistics &stats) {
668 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
669 EXPECT_EQ(stats.connections()->size(), 2u);
670
671 for (const message_bridge::ClientConnection *connection :
672 *stats.connections()) {
673 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
674 if (connection->node()->name()->string_view() == "pi2") {
675 EXPECT_GT(connection->received_packets(), 50);
676 } else if (connection->node()->name()->string_view() == "pi3") {
677 EXPECT_GE(connection->received_packets(), 5);
678 } else {
679 LOG(FATAL) << "Unknown connection";
680 }
681
Austin Schuhe61d4382021-03-31 21:33:02 -0700682 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700683 EXPECT_TRUE(connection->has_monotonic_offset());
684 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800685 EXPECT_EQ(connection->connection_count(), 1u);
686 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700687 }
688 ++pi1_client_statistics_count;
689 });
690
691 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800692 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700693 "/pi2/aos", [&pi2_client_statistics_count](
694 const message_bridge::ClientStatistics &stats) {
695 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
696 EXPECT_EQ(stats.connections()->size(), 1u);
697
698 const message_bridge::ClientConnection *connection =
699 stats.connections()->Get(0);
700 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
701 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700702 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700703 EXPECT_TRUE(connection->has_monotonic_offset());
704 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800705 EXPECT_EQ(connection->connection_count(), 1u);
706 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700707 ++pi2_client_statistics_count;
708 });
709
710 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800711 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700712 "/pi3/aos", [&pi3_client_statistics_count](
713 const message_bridge::ClientStatistics &stats) {
714 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
715 EXPECT_EQ(stats.connections()->size(), 1u);
716
717 const message_bridge::ClientConnection *connection =
718 stats.connections()->Get(0);
719 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
720 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700721 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700722 EXPECT_TRUE(connection->has_monotonic_offset());
723 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800724 EXPECT_EQ(connection->connection_count(), 1u);
725 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700726 ++pi3_client_statistics_count;
727 });
728
Austin Schuh2f8fd752020-09-01 22:38:28 -0700729 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
730 // channel.
731 const size_t pi1_timestamp_channel =
732 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
733 pi1_on_pi2_timestamp_fetcher.channel());
734 const size_t ping_timestamp_channel =
735 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
736 ping_on_pi2_fetcher.channel());
737
738 for (const Channel *channel :
739 *pi1_pong_counter_event_loop->configuration()->channels()) {
740 VLOG(1) << "Channel "
741 << configuration::ChannelIndex(
742 pi1_pong_counter_event_loop->configuration(), channel)
743 << " " << configuration::CleanedChannelToString(channel);
744 }
745
Austin Schuh8fb315a2020-11-19 22:33:58 -0800746 std::unique_ptr<EventLoop> pi1_remote_timestamp =
747 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
748
Austin Schuh89c9b812021-02-20 14:42:10 -0800749 for (std::pair<int, std::string> channel :
750 shared()
751 ? std::vector<std::pair<
752 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
753 : std::vector<std::pair<int, std::string>>{
754 {pi1_timestamp_channel,
755 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
756 "aos-message_bridge-Timestamp"},
757 {ping_timestamp_channel,
758 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
759 // For each remote timestamp we get back, confirm that it is either a ping
760 // message, or a timestamp we sent out. Also confirm that the timestamps
761 // are correct.
762 pi1_remote_timestamp->MakeWatcher(
763 channel.second,
764 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
765 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
766 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
767 channel_index = channel.first](const RemoteMessage &header) {
768 VLOG(1) << aos::FlatbufferToJson(&header);
769 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700770 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800771 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700772 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700773
Austin Schuh89c9b812021-02-20 14:42:10 -0800774 const aos::monotonic_clock::time_point header_monotonic_sent_time(
775 chrono::nanoseconds(header.monotonic_sent_time()));
776 const aos::realtime_clock::time_point header_realtime_sent_time(
777 chrono::nanoseconds(header.realtime_sent_time()));
778 const aos::monotonic_clock::time_point header_monotonic_remote_time(
779 chrono::nanoseconds(header.monotonic_remote_time()));
780 const aos::realtime_clock::time_point header_realtime_remote_time(
781 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700782
Austin Schuh89c9b812021-02-20 14:42:10 -0800783 if (channel_index != -1) {
784 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700785 }
786
Austin Schuh89c9b812021-02-20 14:42:10 -0800787 const Context *pi1_context = nullptr;
788 const Context *pi2_context = nullptr;
789
790 if (header.channel_index() == pi1_timestamp_channel) {
791 // Find the forwarded message.
792 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
793 header_monotonic_sent_time) {
794 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
795 }
796
797 // And the source message.
798 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
799 header_monotonic_remote_time) {
800 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
801 }
802
803 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
804 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
805 } else if (header.channel_index() == ping_timestamp_channel) {
806 // Find the forwarded message.
807 while (ping_on_pi2_fetcher.context().monotonic_event_time <
808 header_monotonic_sent_time) {
809 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
810 }
811
812 // And the source message.
813 while (ping_on_pi1_fetcher.context().monotonic_event_time <
814 header_monotonic_remote_time) {
815 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
816 }
817
818 pi1_context = &ping_on_pi1_fetcher.context();
819 pi2_context = &ping_on_pi2_fetcher.context();
820 } else {
821 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700822 }
823
Austin Schuh89c9b812021-02-20 14:42:10 -0800824 // Confirm the forwarded message has matching timestamps to the
825 // timestamps we got back.
826 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
827 EXPECT_EQ(pi2_context->remote_queue_index,
828 header.remote_queue_index());
829 EXPECT_EQ(pi2_context->monotonic_event_time,
830 header_monotonic_sent_time);
831 EXPECT_EQ(pi2_context->realtime_event_time,
832 header_realtime_sent_time);
833 EXPECT_EQ(pi2_context->realtime_remote_time,
834 header_realtime_remote_time);
835 EXPECT_EQ(pi2_context->monotonic_remote_time,
836 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700837
Austin Schuh89c9b812021-02-20 14:42:10 -0800838 // Confirm the forwarded message also matches the source message.
839 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
840 EXPECT_EQ(pi1_context->monotonic_event_time,
841 header_monotonic_remote_time);
842 EXPECT_EQ(pi1_context->realtime_event_time,
843 header_realtime_remote_time);
844 });
845 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700846
Austin Schuh4c3b9702020-08-30 11:34:55 -0700847 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
848 chrono::milliseconds(500) +
849 chrono::milliseconds(5));
850
851 EXPECT_EQ(pi1_pong_counter.count(), 1001);
852 EXPECT_EQ(pi2_pong_counter.count(), 1001);
853
854 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
855 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
856 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
857 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
858 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
859 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
860 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
861
Austin Schuh20ac95d2020-12-05 17:24:19 -0800862 EXPECT_EQ(pi1_server_statistics_count, 10);
863 EXPECT_EQ(pi2_server_statistics_count, 10);
864 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700865
866 EXPECT_EQ(pi1_client_statistics_count, 95);
867 EXPECT_EQ(pi2_client_statistics_count, 95);
868 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700869
870 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800871 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
872 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700873}
874
875// Tests that an offset between nodes can be recovered and shows up in
876// ServerStatistics correctly.
877TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
878 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700879 aos::configuration::ReadConfig(ArtifactPath(
880 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700881 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800882 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
883 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700884 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800885 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
886 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700887 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800888 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
889 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700890
Austin Schuh87dd3832021-01-01 23:07:31 -0800891 message_bridge::TestingTimeConverter time(
892 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700893 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700894 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700895
896 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800897 time.AddNextTimestamp(
898 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700899 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
900 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700901
902 std::unique_ptr<EventLoop> ping_event_loop =
903 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
904 Ping ping(ping_event_loop.get());
905
906 std::unique_ptr<EventLoop> pong_event_loop =
907 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
908 Pong pong(pong_event_loop.get());
909
Austin Schuh8fb315a2020-11-19 22:33:58 -0800910 // Wait to let timestamp estimation start up before looking for the results.
911 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
912
Austin Schuh87dd3832021-01-01 23:07:31 -0800913 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
914 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
915
Austin Schuh4c3b9702020-08-30 11:34:55 -0700916 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
917 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
918
919 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
920 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
921
Austin Schuh4c3b9702020-08-30 11:34:55 -0700922 // Confirm the offsets are being recovered correctly.
923 int pi1_server_statistics_count = 0;
924 pi1_pong_counter_event_loop->MakeWatcher(
925 "/pi1/aos", [&pi1_server_statistics_count,
926 kOffset](const message_bridge::ServerStatistics &stats) {
927 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
928 EXPECT_EQ(stats.connections()->size(), 2u);
929 for (const message_bridge::ServerConnection *connection :
930 *stats.connections()) {
931 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800932 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700933 if (connection->node()->name()->string_view() == "pi2") {
934 EXPECT_EQ(connection->monotonic_offset(),
935 chrono::nanoseconds(kOffset).count());
936 } else if (connection->node()->name()->string_view() == "pi3") {
937 EXPECT_EQ(connection->monotonic_offset(), 0);
938 } else {
939 LOG(FATAL) << "Unknown connection";
940 }
941
942 EXPECT_TRUE(connection->has_monotonic_offset());
943 }
944 ++pi1_server_statistics_count;
945 });
946
947 int pi2_server_statistics_count = 0;
948 pi2_pong_counter_event_loop->MakeWatcher(
949 "/pi2/aos", [&pi2_server_statistics_count,
950 kOffset](const message_bridge::ServerStatistics &stats) {
951 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
952 EXPECT_EQ(stats.connections()->size(), 1u);
953
954 const message_bridge::ServerConnection *connection =
955 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800956 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700957 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
958 EXPECT_TRUE(connection->has_monotonic_offset());
959 EXPECT_EQ(connection->monotonic_offset(),
960 -chrono::nanoseconds(kOffset).count());
961 ++pi2_server_statistics_count;
962 });
963
964 int pi3_server_statistics_count = 0;
965 pi3_pong_counter_event_loop->MakeWatcher(
966 "/pi3/aos", [&pi3_server_statistics_count](
967 const message_bridge::ServerStatistics &stats) {
968 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
969 EXPECT_EQ(stats.connections()->size(), 1u);
970
971 const message_bridge::ServerConnection *connection =
972 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800973 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700974 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
975 EXPECT_TRUE(connection->has_monotonic_offset());
976 EXPECT_EQ(connection->monotonic_offset(), 0);
977 ++pi3_server_statistics_count;
978 });
979
980 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
981 chrono::milliseconds(500) +
982 chrono::milliseconds(5));
983
Austin Schuh20ac95d2020-12-05 17:24:19 -0800984 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -0700985 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800986 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700987}
988
989// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -0800990TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700991 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
992 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
993 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
994
995 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
996 simulated_event_loop_factory.DisableStatistics();
997
998 std::unique_ptr<EventLoop> ping_event_loop =
999 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1000 Ping ping(ping_event_loop.get());
1001
1002 std::unique_ptr<EventLoop> pong_event_loop =
1003 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1004 Pong pong(pong_event_loop.get());
1005
1006 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1007 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1008
1009 MessageCounter<examples::Pong> pi2_pong_counter(
1010 pi2_pong_counter_event_loop.get(), "/test");
1011
1012 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1013 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1014
1015 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1016 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1017
1018 MessageCounter<examples::Pong> pi1_pong_counter(
1019 pi1_pong_counter_event_loop.get(), "/test");
1020
1021 // Count timestamps.
1022 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1023 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1024 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1025 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1026 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1027 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1028 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1029 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1030 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1031 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1032 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1033 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1034 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1035 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1036
Austin Schuh2f8fd752020-09-01 22:38:28 -07001037 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001038 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1039 remote_timestamps_pi2_on_pi1 =
1040 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1041 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1042 remote_timestamps_pi1_on_pi2 =
1043 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001044
Austin Schuh4c3b9702020-08-30 11:34:55 -07001045 MessageCounter<message_bridge::ServerStatistics>
1046 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1047 "/pi1/aos");
1048 MessageCounter<message_bridge::ServerStatistics>
1049 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1050 "/pi2/aos");
1051 MessageCounter<message_bridge::ServerStatistics>
1052 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1053 "/pi3/aos");
1054
1055 MessageCounter<message_bridge::ClientStatistics>
1056 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1057 "/pi1/aos");
1058 MessageCounter<message_bridge::ClientStatistics>
1059 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1060 "/pi2/aos");
1061 MessageCounter<message_bridge::ClientStatistics>
1062 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1063 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001064
1065 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1066 chrono::milliseconds(5));
1067
Austin Schuh4c3b9702020-08-30 11:34:55 -07001068 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1069 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1070
1071 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1072 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1073 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1074 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1075 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1076 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1077 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1078
1079 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1080 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1081 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1082
1083 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1084 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1085 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001086
1087 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001088 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1089 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001090}
1091
Austin Schuhc0b0f722020-12-12 18:36:06 -08001092bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1093 for (const message_bridge::ServerConnection *connection :
1094 *server_statistics->connections()) {
1095 if (connection->state() != message_bridge::State::CONNECTED) {
1096 return false;
1097 }
1098 }
1099 return true;
1100}
1101
1102bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1103 std::string_view target) {
1104 for (const message_bridge::ServerConnection *connection :
1105 *server_statistics->connections()) {
1106 if (connection->node()->name()->string_view() == target) {
1107 if (connection->state() == message_bridge::State::CONNECTED) {
1108 return false;
1109 }
1110 } else {
1111 if (connection->state() != message_bridge::State::CONNECTED) {
1112 return false;
1113 }
1114 }
1115 }
1116 return true;
1117}
1118
1119bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1120 for (const message_bridge::ClientConnection *connection :
1121 *client_statistics->connections()) {
1122 if (connection->state() != message_bridge::State::CONNECTED) {
1123 return false;
1124 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001125 EXPECT_TRUE(connection->has_boot_uuid());
1126 EXPECT_TRUE(connection->has_connected_since_time());
1127 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001128 }
1129 return true;
1130}
1131
1132bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1133 std::string_view target) {
1134 for (const message_bridge::ClientConnection *connection :
1135 *client_statistics->connections()) {
1136 if (connection->node()->name()->string_view() == target) {
1137 if (connection->state() == message_bridge::State::CONNECTED) {
1138 return false;
1139 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001140 EXPECT_FALSE(connection->has_boot_uuid());
1141 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001142 } else {
1143 if (connection->state() != message_bridge::State::CONNECTED) {
1144 return false;
1145 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001146 EXPECT_TRUE(connection->has_boot_uuid());
1147 EXPECT_TRUE(connection->has_connected_since_time());
1148 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001149 }
1150 }
1151 return true;
1152}
1153
Austin Schuh367a7f42021-11-23 23:04:36 -08001154int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1155 std::string_view target) {
1156 for (const message_bridge::ClientConnection *connection :
1157 *client_statistics->connections()) {
1158 if (connection->node()->name()->string_view() == target) {
1159 return connection->connection_count();
1160 }
1161 }
1162 return 0;
1163}
1164
1165int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1166 std::string_view target) {
1167 for (const message_bridge::ServerConnection *connection :
1168 *server_statistics->connections()) {
1169 if (connection->node()->name()->string_view() == target) {
1170 return connection->connection_count();
1171 }
1172 }
1173 return 0;
1174}
1175
Austin Schuhc0b0f722020-12-12 18:36:06 -08001176// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001177TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001178 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1179
Austin Schuh58646e22021-08-23 23:51:46 -07001180 NodeEventLoopFactory *pi1 =
1181 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1182 NodeEventLoopFactory *pi2 =
1183 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1184 NodeEventLoopFactory *pi3 =
1185 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1186
1187 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001188 Ping ping(ping_event_loop.get());
1189
Austin Schuh58646e22021-08-23 23:51:46 -07001190 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001191 Pong pong(pong_event_loop.get());
1192
1193 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001194 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001195
1196 MessageCounter<examples::Pong> pi2_pong_counter(
1197 pi2_pong_counter_event_loop.get(), "/test");
1198
1199 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001200 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001201
1202 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001203 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001204
1205 MessageCounter<examples::Pong> pi1_pong_counter(
1206 pi1_pong_counter_event_loop.get(), "/test");
1207
1208 // Count timestamps.
1209 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1210 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1211 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1212 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1213 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1214 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1215 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1216 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1217 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1218 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1219 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1220 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1221 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1222 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1223
1224 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001225 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1226 remote_timestamps_pi2_on_pi1 =
1227 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1228 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1229 remote_timestamps_pi1_on_pi2 =
1230 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001231
1232 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001233 *pi1_server_statistics_counter;
1234 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1235 pi1_server_statistics_counter =
1236 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1237 "pi1_server_statistics_counter", "/pi1/aos");
1238 });
1239
Austin Schuhc0b0f722020-12-12 18:36:06 -08001240 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1241 pi1_pong_counter_event_loop
1242 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1243 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1244 pi1_pong_counter_event_loop
1245 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1246
1247 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001248 *pi2_server_statistics_counter;
1249 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1250 pi2_server_statistics_counter =
1251 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1252 "pi2_server_statistics_counter", "/pi2/aos");
1253 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001254 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1255 pi2_pong_counter_event_loop
1256 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1257 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1258 pi2_pong_counter_event_loop
1259 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1260
1261 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001262 *pi3_server_statistics_counter;
1263 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1264 pi3_server_statistics_counter =
1265 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1266 "pi3_server_statistics_counter", "/pi3/aos");
1267 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001268 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1269 pi3_pong_counter_event_loop
1270 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1271 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1272 pi3_pong_counter_event_loop
1273 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1274
1275 MessageCounter<message_bridge::ClientStatistics>
1276 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1277 "/pi1/aos");
1278 MessageCounter<message_bridge::ClientStatistics>
1279 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1280 "/pi2/aos");
1281 MessageCounter<message_bridge::ClientStatistics>
1282 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1283 "/pi3/aos");
1284
James Kuszmaul86e86c32022-07-21 17:39:47 -07001285 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1286 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1287 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1288 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1289 // The currenct contract is that, if all nodes boot simultaneously in
1290 // simulation, that they should all act as if they area already connected,
1291 // without ever observing the transition from disconnected to connected (note
1292 // that on a real system the ServerStatistics message will get resent for each
1293 // and every new connection, even if the new connections happen
1294 // "simultaneously"--in simulation, we are essentially acting as if we are
1295 // starting execution in an already running system, rather than observing the
1296 // boot process).
1297 for (auto &event_loop : statistics_watcher_loops) {
1298 event_loop->MakeWatcher(
1299 "/aos", [](const message_bridge::ServerStatistics &msg) {
1300 for (const message_bridge::ServerConnection *connection :
1301 *msg.connections()) {
1302 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1303 << connection->node()->name()->string_view();
1304 }
1305 });
1306 }
1307
Austin Schuhc0b0f722020-12-12 18:36:06 -08001308 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1309 chrono::milliseconds(5));
1310
James Kuszmaul86e86c32022-07-21 17:39:47 -07001311 statistics_watcher_loops.clear();
1312
Austin Schuhc0b0f722020-12-12 18:36:06 -08001313 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1314 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1315
1316 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1317 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1318 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1319 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1320 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1321 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1322 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1323
Austin Schuh58646e22021-08-23 23:51:46 -07001324 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1325 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1326 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001327
1328 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1329 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1330 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1331
1332 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001333 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1334 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001335
1336 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1337 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1338 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1339 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1340 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1341 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1342 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1343 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1344 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1345 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1346 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1347 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1348 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1349 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1350 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1351 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1352 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1353 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1354
Austin Schuh58646e22021-08-23 23:51:46 -07001355 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001356
1357 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1358
1359 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1360 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1361
1362 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1363 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1364 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1365 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1366 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1367 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1368 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1369
Austin Schuh58646e22021-08-23 23:51:46 -07001370 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1371 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1372 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001373
1374 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1375 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1376 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1377
1378 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001379 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1380 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001381
1382 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1383 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1384 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1385 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1386 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1387 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1388 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1389 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1390 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1391 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1392 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1393 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1394 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1395 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1396 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1397 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1398 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1399 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1400
Austin Schuh58646e22021-08-23 23:51:46 -07001401 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001402
1403 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1404
Austin Schuh367a7f42021-11-23 23:04:36 -08001405 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1406 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1407 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1408 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1409 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1410 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1411
1412 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1413 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1414 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1415 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1416 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1417 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1418 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1419 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1420
1421 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1422 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1423 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1424 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1425
1426 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1427 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1428 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1429 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1430
Austin Schuhc0b0f722020-12-12 18:36:06 -08001431 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1432 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1433
1434 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1435 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1436 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1437 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1438 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1439 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1440 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1441
Austin Schuh58646e22021-08-23 23:51:46 -07001442 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1443 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1444 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001445
1446 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1447 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1448 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1449
1450 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001451 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1452 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001453
Austin Schuhc0b0f722020-12-12 18:36:06 -08001454 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1455 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001456 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1457 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001458 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1459 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001460 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1461 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001462 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1463 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001464 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1465 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1466}
1467
Austin Schuh2febf0d2020-09-21 22:24:30 -07001468// Tests that the time offset having a slope doesn't break the world.
1469// SimulatedMessageBridge has enough self consistency CHECK statements to
1470// confirm, and we can can also check a message in each direction to make sure
1471// it gets delivered as expected.
1472TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1473 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001474 aos::configuration::ReadConfig(ArtifactPath(
1475 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001476 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001477 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1478 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001479 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001480 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1481 ASSERT_EQ(pi2_index, 1u);
1482 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1483 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1484 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001485
Austin Schuh87dd3832021-01-01 23:07:31 -08001486 message_bridge::TestingTimeConverter time(
1487 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001488 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001489 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001490
Austin Schuh2febf0d2020-09-21 22:24:30 -07001491 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001492 time.AddNextTimestamp(
1493 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001494 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1495 BootTimestamp::epoch()});
1496 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1497 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1498 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1499 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001500
1501 std::unique_ptr<EventLoop> ping_event_loop =
1502 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1503 Ping ping(ping_event_loop.get());
1504
1505 std::unique_ptr<EventLoop> pong_event_loop =
1506 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1507 Pong pong(pong_event_loop.get());
1508
1509 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1510 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1511 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1512 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1513
1514 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1515 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1516 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1517 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1518
1519 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1520 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1521 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1522 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1523
1524 // End after a pong message comes back. This will leave the latest messages
1525 // on all channels so we can look at timestamps easily and check they make
1526 // sense.
1527 std::unique_ptr<EventLoop> pi1_pong_ender =
1528 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1529 int count = 0;
1530 pi1_pong_ender->MakeWatcher(
1531 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1532 if (++count == 100) {
1533 simulated_event_loop_factory.Exit();
1534 }
1535 });
1536
1537 // Run enough that messages should be delivered.
1538 simulated_event_loop_factory.Run();
1539
1540 // Grab the latest messages.
1541 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1542 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1543 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1544 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1545
1546 // Compute their time on the global distributed clock so we can compute
1547 // distance betwen them.
1548 const distributed_clock::time_point pi1_ping_time =
1549 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1550 ->ToDistributedClock(
1551 ping_on_pi1_fetcher.context().monotonic_event_time);
1552 const distributed_clock::time_point pi2_ping_time =
1553 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1554 ->ToDistributedClock(
1555 ping_on_pi2_fetcher.context().monotonic_event_time);
1556 const distributed_clock::time_point pi1_pong_time =
1557 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1558 ->ToDistributedClock(
1559 pong_on_pi1_fetcher.context().monotonic_event_time);
1560 const distributed_clock::time_point pi2_pong_time =
1561 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1562 ->ToDistributedClock(
1563 pong_on_pi2_fetcher.context().monotonic_event_time);
1564
1565 // And confirm the delivery delay is just about exactly 150 uS for both
1566 // directions like expected. There will be a couple ns of rounding errors in
1567 // the conversion functions that aren't worth accounting for right now. This
1568 // will either be really close, or really far.
1569 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1570 pi1_ping_time);
1571 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1572 pi1_ping_time);
1573
1574 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1575 pi2_pong_time);
1576 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1577 pi2_pong_time);
1578}
1579
Austin Schuh4c570ea2020-11-19 23:13:24 -08001580void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1581 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1582 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1583 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001584 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001585}
1586
1587// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001588TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001589 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1590 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1591
1592 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1593
1594 std::unique_ptr<EventLoop> ping_event_loop =
1595 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1596 aos::Sender<examples::Ping> pi1_reliable_sender =
1597 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1598 aos::Sender<examples::Ping> pi1_unreliable_sender =
1599 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1600 SendPing(&pi1_reliable_sender, 1);
1601 SendPing(&pi1_unreliable_sender, 1);
1602
1603 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1604 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001605 aos::Sender<examples::Ping> pi2_reliable_sender =
1606 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1607 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001608 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1609 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001610 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1611 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001612 MessageCounter<examples::Ping> pi2_unreliable_counter(
1613 pi2_pong_event_loop.get(), "/unreliable");
1614 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1615 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1616 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1617 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1618
1619 const size_t reliable_channel_index = configuration::ChannelIndex(
1620 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1621
1622 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1623 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1624
Austin Schuheeaa2022021-01-02 21:52:03 -08001625 const chrono::nanoseconds network_delay =
1626 simulated_event_loop_factory.network_delay();
1627
Austin Schuh4c570ea2020-11-19 23:13:24 -08001628 int reliable_timestamp_count = 0;
1629 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001630 shared() ? "/pi1/aos/remote_timestamps/pi2"
1631 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001632 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001633 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1634 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001635 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001636 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001637 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001638 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001639 VLOG(1) << aos::FlatbufferToJson(&header);
1640 if (header.channel_index() == reliable_channel_index) {
1641 ++reliable_timestamp_count;
1642 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001643
1644 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1645 chrono::nanoseconds(header.monotonic_sent_time()));
1646
1647 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1648 header_monotonic_sent_time + network_delay +
1649 (pi1_remote_timestamp->monotonic_now() -
1650 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001651 });
1652
1653 // Wait to let timestamp estimation start up before looking for the results.
1654 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1655
1656 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1657 // This one isn't reliable, but was sent before the start. It should *not* be
1658 // delivered.
1659 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1660 // Confirm we got a timestamp logged for the message that was forwarded.
1661 EXPECT_EQ(reliable_timestamp_count, 1u);
1662
1663 SendPing(&pi1_reliable_sender, 2);
1664 SendPing(&pi1_unreliable_sender, 2);
1665 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1666 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001667 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001668 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1669
1670 EXPECT_EQ(reliable_timestamp_count, 2u);
1671}
1672
Austin Schuh20ac95d2020-12-05 17:24:19 -08001673// Tests that rebooting a node changes the ServerStatistics message and the
1674// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001675TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001676 const UUID pi1_boot0 = UUID::Random();
1677 const UUID pi2_boot0 = UUID::Random();
1678 const UUID pi2_boot1 = UUID::Random();
1679 const UUID pi3_boot0 = UUID::Random();
1680 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001681
Austin Schuh58646e22021-08-23 23:51:46 -07001682 message_bridge::TestingTimeConverter time(
1683 configuration::NodesCount(&config.message()));
1684 SimulatedEventLoopFactory factory(&config.message());
1685 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001686
Austin Schuh58646e22021-08-23 23:51:46 -07001687 const size_t pi1_index =
1688 configuration::GetNodeIndex(&config.message(), "pi1");
1689 const size_t pi2_index =
1690 configuration::GetNodeIndex(&config.message(), "pi2");
1691 const size_t pi3_index =
1692 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001693
Austin Schuh58646e22021-08-23 23:51:46 -07001694 {
1695 time.AddNextTimestamp(distributed_clock::epoch(),
1696 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1697 BootTimestamp::epoch()});
1698
1699 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1700
1701 time.AddNextTimestamp(
1702 distributed_clock::epoch() + dt,
1703 {BootTimestamp::epoch() + dt,
1704 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1705 BootTimestamp::epoch() + dt});
1706
1707 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1708 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1709 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1710 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1711 }
1712
1713 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1714 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1715
1716 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1717 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001718
1719 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001720 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001721
1722 int timestamp_count = 0;
1723 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001724 "/pi2/aos", [&expected_boot_uuid,
1725 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001726 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001727 expected_boot_uuid);
1728 });
1729 pi1_remote_timestamp->MakeWatcher(
1730 "/test",
1731 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001732 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001733 expected_boot_uuid);
1734 });
1735 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001736 shared() ? "/pi1/aos/remote_timestamps/pi2"
1737 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001738 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1739 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001740 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001741 VLOG(1) << aos::FlatbufferToJson(&header);
1742 ++timestamp_count;
1743 });
1744
1745 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001746 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001747 int boot_number = 0;
1748 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001749 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001750 "/pi1/aos",
1751 [&pi1_server_statistics_count, &expected_boot_uuid,
1752 &expected_connection_time, &first_pi1_server_statistics,
1753 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001754 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1755 for (const message_bridge::ServerConnection *connection :
1756 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001757 if (connection->state() == message_bridge::State::CONNECTED) {
1758 ASSERT_TRUE(connection->has_boot_uuid());
1759 }
1760 if (!first_pi1_server_statistics) {
1761 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1762 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001763 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001764 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1765 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001766 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001767 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001768 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001769 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1770 connection->connected_since_time())),
1771 expected_connection_time);
1772 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001773 ++pi1_server_statistics_count;
1774 }
1775 }
Austin Schuh58646e22021-08-23 23:51:46 -07001776 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001777 });
1778
Austin Schuh58646e22021-08-23 23:51:46 -07001779 int pi1_client_statistics_count = 0;
1780 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001781 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1782 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001783 const message_bridge::ClientStatistics &stats) {
1784 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1785 for (const message_bridge::ClientConnection *connection :
1786 *stats.connections()) {
1787 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1788 if (connection->node()->name()->string_view() == "pi2") {
1789 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001790 EXPECT_EQ(expected_boot_uuid,
1791 UUID::FromString(connection->boot_uuid()))
1792 << " : Got " << aos::FlatbufferToJson(&stats);
1793 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1794 connection->connected_since_time())),
1795 expected_connection_time);
1796 EXPECT_EQ(boot_number + 1, connection->connection_count());
1797 } else {
1798 EXPECT_EQ(connection->connected_since_time(), 0);
1799 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001800 }
1801 }
1802 });
1803
1804 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001805 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1806 pi1, pi2, pi2_boot1]() {
1807 expected_boot_uuid = pi2_boot1;
1808 ++boot_number;
1809 LOG(INFO) << "OnShutdown triggered for pi2";
1810 pi2->OnStartup(
1811 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1812 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1813 expected_connection_time = pi1->monotonic_now();
1814 });
1815 });
Austin Schuh58646e22021-08-23 23:51:46 -07001816
Austin Schuh20ac95d2020-12-05 17:24:19 -08001817 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001818 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001819
1820 EXPECT_GT(timestamp_count, 100);
1821 EXPECT_GE(pi1_server_statistics_count, 1u);
1822
Austin Schuh20ac95d2020-12-05 17:24:19 -08001823 timestamp_count = 0;
1824 pi1_server_statistics_count = 0;
1825
Austin Schuh58646e22021-08-23 23:51:46 -07001826 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001827 EXPECT_GT(timestamp_count, 100);
1828 EXPECT_GE(pi1_server_statistics_count, 1u);
1829}
1830
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001831INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001832 All, RemoteMessageSimulatedEventLoopTest,
1833 ::testing::Values(
1834 Param{"multinode_pingpong_test_combined_config.json", true},
1835 Param{"multinode_pingpong_test_split_config.json", false}));
1836
Austin Schuh58646e22021-08-23 23:51:46 -07001837// Tests that Startup and Shutdown do reasonable things.
1838TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1839 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1840 aos::configuration::ReadConfig(
1841 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1842
Austin Schuh72e65682021-09-02 11:37:05 -07001843 size_t pi1_shutdown_counter = 0;
1844 size_t pi2_shutdown_counter = 0;
1845 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1846 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1847
Austin Schuh58646e22021-08-23 23:51:46 -07001848 message_bridge::TestingTimeConverter time(
1849 configuration::NodesCount(&config.message()));
1850 SimulatedEventLoopFactory factory(&config.message());
1851 factory.SetTimeConverter(&time);
1852 time.AddNextTimestamp(
1853 distributed_clock::epoch(),
1854 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1855
1856 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1857
1858 time.AddNextTimestamp(
1859 distributed_clock::epoch() + dt,
1860 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1861 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1862 BootTimestamp::epoch() + dt});
1863
1864 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1865 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1866
1867 // Configure startup to start Ping and Pong, and count.
1868 size_t pi1_startup_counter = 0;
1869 size_t pi2_startup_counter = 0;
1870 pi1->OnStartup([pi1]() {
1871 LOG(INFO) << "Made ping";
1872 pi1->AlwaysStart<Ping>("ping");
1873 });
1874 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1875 pi2->OnStartup([pi2]() {
1876 LOG(INFO) << "Made pong";
1877 pi2->AlwaysStart<Pong>("pong");
1878 });
1879 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1880
1881 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001882 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1883 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1884
Austin Schuh58646e22021-08-23 23:51:46 -07001885 // Automatically make counters on startup.
1886 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1887 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1888 "pi1_pong_counter", "/test");
1889 });
1890 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1891 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1892 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1893 "pi2_ping_counter", "/test");
1894 });
1895 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1896
1897 EXPECT_EQ(pi2_ping_counter, nullptr);
1898 EXPECT_EQ(pi1_pong_counter, nullptr);
1899
1900 EXPECT_EQ(pi1_startup_counter, 0u);
1901 EXPECT_EQ(pi2_startup_counter, 0u);
1902 EXPECT_EQ(pi1_shutdown_counter, 0u);
1903 EXPECT_EQ(pi2_shutdown_counter, 0u);
1904
1905 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1906 EXPECT_EQ(pi1_startup_counter, 1u);
1907 EXPECT_EQ(pi2_startup_counter, 1u);
1908 EXPECT_EQ(pi1_shutdown_counter, 0u);
1909 EXPECT_EQ(pi2_shutdown_counter, 0u);
1910 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1911 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1912
1913 LOG(INFO) << pi1->monotonic_now();
1914 LOG(INFO) << pi2->monotonic_now();
1915
1916 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1917
1918 EXPECT_EQ(pi1_startup_counter, 2u);
1919 EXPECT_EQ(pi2_startup_counter, 2u);
1920 EXPECT_EQ(pi1_shutdown_counter, 1u);
1921 EXPECT_EQ(pi2_shutdown_counter, 1u);
1922 EXPECT_EQ(pi2_ping_counter->count(), 501);
1923 EXPECT_EQ(pi1_pong_counter->count(), 501);
1924}
1925
1926// Tests that OnStartup handlers can be added after running and get called, and
1927// can't be called when running.
1928TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1929 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1930 aos::configuration::ReadConfig(
1931 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1932
1933 // Test that we can add startup handlers as long as we aren't running, and
1934 // they get run when Run gets called again.
1935 // Test that adding a startup handler when running fails.
1936 //
1937 // Test shutdown handlers get called on destruction.
1938 SimulatedEventLoopFactory factory(&config.message());
1939
1940 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1941
1942 int startup_count0 = 0;
1943 int startup_count1 = 0;
1944
1945 pi1->OnStartup([&]() { ++startup_count0; });
1946 EXPECT_EQ(startup_count0, 0);
1947 EXPECT_EQ(startup_count1, 0);
1948
1949 factory.RunFor(chrono::nanoseconds(1));
1950 EXPECT_EQ(startup_count0, 1);
1951 EXPECT_EQ(startup_count1, 0);
1952
1953 pi1->OnStartup([&]() { ++startup_count1; });
1954 EXPECT_EQ(startup_count0, 1);
1955 EXPECT_EQ(startup_count1, 0);
1956
1957 factory.RunFor(chrono::nanoseconds(1));
1958 EXPECT_EQ(startup_count0, 1);
1959 EXPECT_EQ(startup_count1, 1);
1960
1961 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1962 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1963
1964 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
1965 "Can only register OnStartup handlers when not running.");
1966}
1967
1968// Tests that OnStartup handlers can be added after running and get called, and
1969// all the handlers get called on reboot. Shutdown handlers are tested the same
1970// way.
1971TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
1972 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1973 aos::configuration::ReadConfig(
1974 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1975
Austin Schuh72e65682021-09-02 11:37:05 -07001976 int startup_count0 = 0;
1977 int shutdown_count0 = 0;
1978 int startup_count1 = 0;
1979 int shutdown_count1 = 0;
1980
Austin Schuh58646e22021-08-23 23:51:46 -07001981 message_bridge::TestingTimeConverter time(
1982 configuration::NodesCount(&config.message()));
1983 SimulatedEventLoopFactory factory(&config.message());
1984 factory.SetTimeConverter(&time);
1985 time.StartEqual();
1986
1987 const chrono::nanoseconds dt = chrono::seconds(10);
1988 time.RebootAt(0, distributed_clock::epoch() + dt);
1989 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
1990
1991 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1992
Austin Schuh58646e22021-08-23 23:51:46 -07001993 pi1->OnStartup([&]() { ++startup_count0; });
1994 pi1->OnShutdown([&]() { ++shutdown_count0; });
1995 EXPECT_EQ(startup_count0, 0);
1996 EXPECT_EQ(startup_count1, 0);
1997 EXPECT_EQ(shutdown_count0, 0);
1998 EXPECT_EQ(shutdown_count1, 0);
1999
2000 factory.RunFor(chrono::nanoseconds(1));
2001 EXPECT_EQ(startup_count0, 1);
2002 EXPECT_EQ(startup_count1, 0);
2003 EXPECT_EQ(shutdown_count0, 0);
2004 EXPECT_EQ(shutdown_count1, 0);
2005
2006 pi1->OnStartup([&]() { ++startup_count1; });
2007 EXPECT_EQ(startup_count0, 1);
2008 EXPECT_EQ(startup_count1, 0);
2009 EXPECT_EQ(shutdown_count0, 0);
2010 EXPECT_EQ(shutdown_count1, 0);
2011
2012 factory.RunFor(chrono::nanoseconds(1));
2013 EXPECT_EQ(startup_count0, 1);
2014 EXPECT_EQ(startup_count1, 1);
2015 EXPECT_EQ(shutdown_count0, 0);
2016 EXPECT_EQ(shutdown_count1, 0);
2017
2018 factory.RunFor(chrono::seconds(15));
2019
2020 EXPECT_EQ(startup_count0, 2);
2021 EXPECT_EQ(startup_count1, 2);
2022 EXPECT_EQ(shutdown_count0, 1);
2023 EXPECT_EQ(shutdown_count1, 0);
2024
2025 pi1->OnShutdown([&]() { ++shutdown_count1; });
2026 factory.RunFor(chrono::seconds(10));
2027
2028 EXPECT_EQ(startup_count0, 3);
2029 EXPECT_EQ(startup_count1, 3);
2030 EXPECT_EQ(shutdown_count0, 2);
2031 EXPECT_EQ(shutdown_count1, 1);
2032}
2033
2034// Tests that event loops which outlive shutdown crash.
2035TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2036 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2037 aos::configuration::ReadConfig(
2038 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2039
2040 message_bridge::TestingTimeConverter time(
2041 configuration::NodesCount(&config.message()));
2042 SimulatedEventLoopFactory factory(&config.message());
2043 factory.SetTimeConverter(&time);
2044 time.StartEqual();
2045
2046 const chrono::nanoseconds dt = chrono::seconds(10);
2047 time.RebootAt(0, distributed_clock::epoch() + dt);
2048
2049 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2050
2051 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2052
2053 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2054}
2055
Brian Silvermane1fe2512022-08-14 23:18:50 -07002056// Test that an ExitHandle outliving its factory is caught.
2057TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2058 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2059 aos::configuration::ReadConfig(
2060 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2061 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2062 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2063 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2064 auto exit_handle = factory->MakeExitHandle();
2065 EXPECT_DEATH(factory.reset(),
2066 "All ExitHandles must be destroyed before the factory");
2067}
2068
Austin Schuh58646e22021-08-23 23:51:46 -07002069// Tests that messages don't survive a reboot of a node.
2070TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2071 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2072 aos::configuration::ReadConfig(
2073 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2074
2075 message_bridge::TestingTimeConverter time(
2076 configuration::NodesCount(&config.message()));
2077 SimulatedEventLoopFactory factory(&config.message());
2078 factory.SetTimeConverter(&time);
2079 time.StartEqual();
2080
2081 const chrono::nanoseconds dt = chrono::seconds(10);
2082 time.RebootAt(0, distributed_clock::epoch() + dt);
2083
2084 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2085
2086 const UUID boot_uuid = pi1->boot_uuid();
2087 EXPECT_NE(boot_uuid, UUID::Zero());
2088
2089 {
2090 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2091 aos::Sender<examples::Ping> test_message_sender =
2092 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2093 SendPing(&test_message_sender, 1);
2094 }
2095
2096 factory.RunFor(chrono::seconds(5));
2097
2098 {
2099 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2100 aos::Fetcher<examples::Ping> fetcher =
2101 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2102 EXPECT_TRUE(fetcher.Fetch());
2103 }
2104
2105 factory.RunFor(chrono::seconds(10));
2106
2107 {
2108 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2109 aos::Fetcher<examples::Ping> fetcher =
2110 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2111 EXPECT_FALSE(fetcher.Fetch());
2112 }
2113 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2114}
2115
2116// Tests that reliable messages get resent on reboot.
2117TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2118 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2119 aos::configuration::ReadConfig(
2120 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2121
2122 message_bridge::TestingTimeConverter time(
2123 configuration::NodesCount(&config.message()));
2124 SimulatedEventLoopFactory factory(&config.message());
2125 factory.SetTimeConverter(&time);
2126 time.StartEqual();
2127
2128 const chrono::nanoseconds dt = chrono::seconds(1);
2129 time.RebootAt(1, distributed_clock::epoch() + dt);
2130
2131 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2132 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2133
2134 const UUID pi1_boot_uuid = pi1->boot_uuid();
2135 const UUID pi2_boot_uuid = pi2->boot_uuid();
2136 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2137 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2138
2139 {
2140 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2141 aos::Sender<examples::Ping> test_message_sender =
2142 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2143 SendPing(&test_message_sender, 1);
2144 }
2145
2146 factory.RunFor(chrono::milliseconds(500));
2147
2148 {
2149 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2150 aos::Fetcher<examples::Ping> fetcher =
2151 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2152 EXPECT_TRUE(fetcher.Fetch());
2153 }
2154
2155 factory.RunFor(chrono::seconds(1));
2156
2157 {
2158 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2159 aos::Fetcher<examples::Ping> fetcher =
2160 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2161 EXPECT_TRUE(fetcher.Fetch());
2162 }
2163 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2164}
2165
James Kuszmaul86e86c32022-07-21 17:39:47 -07002166TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2167 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2168 aos::configuration::ReadConfig(
2169 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2170
2171 message_bridge::TestingTimeConverter time(
2172 configuration::NodesCount(&config.message()));
2173 time.AddNextTimestamp(
2174 distributed_clock::epoch(),
2175 {BootTimestamp{0, monotonic_clock::epoch()},
2176 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2177 BootTimestamp{0, monotonic_clock::epoch()}});
2178 SimulatedEventLoopFactory factory(&config.message());
2179 factory.SetTimeConverter(&time);
2180
2181 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2182 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2183
2184 const UUID pi1_boot_uuid = pi1->boot_uuid();
2185 const UUID pi2_boot_uuid = pi2->boot_uuid();
2186 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2187 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2188
2189 {
2190 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2191 aos::Sender<examples::Ping> pi1_sender =
2192 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2193 SendPing(&pi1_sender, 1);
2194 }
2195 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2196 aos::Sender<examples::Ping> pi2_sender =
2197 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2198 SendPing(&pi2_sender, 1);
2199 // Verify that we staggered the OnRun callback correctly.
2200 pi2_event_loop->OnRun([pi1, pi2]() {
2201 EXPECT_EQ(pi1->monotonic_now(),
2202 monotonic_clock::epoch() + std::chrono::seconds(1));
2203 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2204 });
2205
2206 factory.RunFor(chrono::seconds(2));
2207
2208 {
2209 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2210 aos::Fetcher<examples::Ping> fetcher =
2211 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2212 ASSERT_TRUE(fetcher.Fetch());
2213 EXPECT_EQ(fetcher.context().monotonic_event_time,
2214 monotonic_clock::epoch() + factory.network_delay());
2215 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2216 monotonic_clock::epoch());
2217 }
2218 {
2219 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2220 aos::Fetcher<examples::Ping> fetcher =
2221 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2222 ASSERT_TRUE(fetcher.Fetch());
2223 EXPECT_EQ(fetcher.context().monotonic_event_time,
2224 monotonic_clock::epoch() + std::chrono::seconds(1) +
2225 factory.network_delay());
2226 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2227 monotonic_clock::epoch() - std::chrono::seconds(1));
2228 }
2229}
2230
Austin Schuh48205e62021-11-12 14:13:18 -08002231class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2232 public:
2233 SimulatedEventLoopDisconnectTest()
2234 : config(aos::configuration::ReadConfig(ArtifactPath(
2235 "aos/events/multinode_pingpong_test_split_config.json"))),
2236 time(configuration::NodesCount(&config.message())),
2237 factory(&config.message()) {
2238 factory.SetTimeConverter(&time);
2239 }
2240
2241 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2242 const monotonic_clock::time_point allowable_message_time,
2243 std::set<const aos::Node *> empty_nodes) {
2244 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2245 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2246 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2247 pi1->MakeEventLoop("fetcher");
2248 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2249 pi2->MakeEventLoop("fetcher");
2250 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2251 if (configuration::ChannelIsReadableOnNode(channel,
2252 pi1_event_loop->node())) {
2253 std::unique_ptr<aos::RawFetcher> fetcher =
2254 pi1_event_loop->MakeRawFetcher(channel);
2255 if (statistics_channels.find(channel) == statistics_channels.end() ||
2256 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2257 EXPECT_FALSE(fetcher->Fetch() &&
2258 fetcher->context().monotonic_event_time >
2259 allowable_message_time)
2260 << ": Found recent message on channel "
2261 << configuration::CleanedChannelToString(channel) << " and time "
2262 << fetcher->context().monotonic_event_time << " > "
2263 << allowable_message_time << " on pi1";
2264 } else {
2265 EXPECT_TRUE(fetcher->Fetch() &&
2266 fetcher->context().monotonic_event_time >=
2267 allowable_message_time)
2268 << ": Didn't find recent message on channel "
2269 << configuration::CleanedChannelToString(channel) << " on pi1";
2270 }
2271 }
2272 if (configuration::ChannelIsReadableOnNode(channel,
2273 pi2_event_loop->node())) {
2274 std::unique_ptr<aos::RawFetcher> fetcher =
2275 pi2_event_loop->MakeRawFetcher(channel);
2276 if (statistics_channels.find(channel) == statistics_channels.end() ||
2277 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2278 EXPECT_FALSE(fetcher->Fetch() &&
2279 fetcher->context().monotonic_event_time >
2280 allowable_message_time)
2281 << ": Found message on channel "
2282 << configuration::CleanedChannelToString(channel) << " and time "
2283 << fetcher->context().monotonic_event_time << " > "
2284 << allowable_message_time << " on pi2";
2285 } else {
2286 EXPECT_TRUE(fetcher->Fetch() &&
2287 fetcher->context().monotonic_event_time >=
2288 allowable_message_time)
2289 << ": Didn't find message on channel "
2290 << configuration::CleanedChannelToString(channel) << " on pi2";
2291 }
2292 }
2293 }
2294 }
2295
2296 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2297
2298 message_bridge::TestingTimeConverter time;
2299 SimulatedEventLoopFactory factory;
2300};
2301
2302// Tests that if we have message bridge client/server disabled, and timing
2303// reports disabled, no messages are sent. Also tests that we can disconnect a
2304// node and disable statistics on it and it actually fully disconnects.
2305TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2306 time.StartEqual();
2307 factory.SkipTimingReport();
2308 factory.DisableStatistics();
2309
2310 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2311 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2312
2313 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2314 pi1->MakeEventLoop("fetcher");
2315 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2316 pi2->MakeEventLoop("fetcher");
2317
2318 factory.RunFor(chrono::milliseconds(100000));
2319
2320 // Confirm no messages are sent if we've configured them all off.
2321 VerifyChannels({}, monotonic_clock::min_time, {});
2322
2323 // Now, confirm that all the message_bridge channels come back when we
2324 // re-enable.
2325 factory.EnableStatistics();
2326
2327 factory.RunFor(chrono::milliseconds(10050));
2328
2329 // Build up the list of all the messages we expect when we come back.
2330 {
2331 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002332 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002333 std::vector<std::pair<std::string_view, const Node *>>{
2334 {"/pi1/aos", pi1->node()},
2335 {"/pi2/aos", pi1->node()},
2336 {"/pi3/aos", pi1->node()}}) {
2337 statistics_channels.insert(configuration::GetChannel(
2338 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2339 pi.second));
2340 statistics_channels.insert(configuration::GetChannel(
2341 factory.configuration(), pi.first,
2342 "aos.message_bridge.ServerStatistics", "", pi.second));
2343 statistics_channels.insert(configuration::GetChannel(
2344 factory.configuration(), pi.first,
2345 "aos.message_bridge.ClientStatistics", "", pi.second));
2346 }
2347
2348 statistics_channels.insert(configuration::GetChannel(
2349 factory.configuration(),
2350 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2351 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2352 statistics_channels.insert(configuration::GetChannel(
2353 factory.configuration(),
2354 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2355 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2356 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2357 }
2358
2359 // Now test that we can disable the messages for a single node
2360 pi2->DisableStatistics();
2361 const aos::monotonic_clock::time_point statistics_disable_time =
2362 pi2->monotonic_now();
2363 factory.RunFor(chrono::milliseconds(10000));
2364
2365 // We should see a much smaller set of messages, but should still see messages
2366 // forwarded, mainly the timestamp message.
2367 {
2368 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002369 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002370 std::vector<std::pair<std::string_view, const Node *>>{
2371 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2372 statistics_channels.insert(configuration::GetChannel(
2373 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2374 pi.second));
2375 statistics_channels.insert(configuration::GetChannel(
2376 factory.configuration(), pi.first,
2377 "aos.message_bridge.ServerStatistics", "", pi.second));
2378 statistics_channels.insert(configuration::GetChannel(
2379 factory.configuration(), pi.first,
2380 "aos.message_bridge.ClientStatistics", "", pi.second));
2381 }
2382
2383 statistics_channels.insert(configuration::GetChannel(
2384 factory.configuration(),
2385 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2386 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2387 VerifyChannels(statistics_channels, statistics_disable_time, {});
2388 }
2389
2390 // Now, fully disconnect the node. This will completely quiet down pi2.
2391 pi1->Disconnect(pi2->node());
2392 pi2->Disconnect(pi1->node());
2393
2394 const aos::monotonic_clock::time_point disconnect_disable_time =
2395 pi2->monotonic_now();
2396 factory.RunFor(chrono::milliseconds(10000));
2397
2398 {
2399 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002400 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002401 std::vector<std::pair<std::string_view, const Node *>>{
2402 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2403 statistics_channels.insert(configuration::GetChannel(
2404 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2405 pi.second));
2406 statistics_channels.insert(configuration::GetChannel(
2407 factory.configuration(), pi.first,
2408 "aos.message_bridge.ServerStatistics", "", pi.second));
2409 statistics_channels.insert(configuration::GetChannel(
2410 factory.configuration(), pi.first,
2411 "aos.message_bridge.ClientStatistics", "", pi.second));
2412 }
2413
2414 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2415 }
2416}
2417
Neil Balchc8f41ed2018-01-20 22:06:53 -08002418} // namespace testing
2419} // namespace aos