blob: 23badae660700a8c2a34f14114343c16b441f5c7 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
milind1f1dca32021-07-03 13:50:07 -07003#include <chrono>
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07004#include <functional>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
6
Philipp Schrader790cb542023-07-05 21:06:52 -07007#include "gtest/gtest.h"
8
Alex Perrycb7da4b2019-08-28 19:35:56 -07009#include "aos/events/event_loop_param_test.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070010#include "aos/events/logging/logger_generated.h"
Austin Schuh01b4c352020-09-21 23:09:39 -070011#include "aos/events/message_counter.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/ping_lib.h"
13#include "aos/events/pong_lib.h"
Austin Schuh7d87b672019-12-01 20:23:49 -080014#include "aos/events/test_message_generated.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070015#include "aos/network/message_bridge_client_generated.h"
16#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080017#include "aos/network/remote_message_generated.h"
Austin Schuh87dd3832021-01-01 23:07:31 -080018#include "aos/network/testing_time_converter.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -070019#include "aos/network/timestamp_generated.h"
Austin Schuh373f1762021-06-02 21:07:09 -070020#include "aos/testing/path.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -080021
22namespace aos {
23namespace testing {
Brian Silverman28d14302020-09-18 15:26:17 -070024namespace {
25
Austin Schuh373f1762021-06-02 21:07:09 -070026using aos::testing::ArtifactPath;
Brian Silverman28d14302020-09-18 15:26:17 -070027
Austin Schuh58646e22021-08-23 23:51:46 -070028using logger::BootTimestamp;
Austin Schuh0de30f32020-12-06 12:44:28 -080029using message_bridge::RemoteMessage;
Austin Schuh7267c532019-05-19 19:55:53 -070030namespace chrono = ::std::chrono;
31
Austin Schuh0de30f32020-12-06 12:44:28 -080032} // namespace
33
Neil Balchc8f41ed2018-01-20 22:06:53 -080034class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
35 public:
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080036 ::std::unique_ptr<EventLoop> Make(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080037 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080038 return event_loop_factory_->MakeEventLoop(name, my_node());
Neil Balchc8f41ed2018-01-20 22:06:53 -080039 }
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080040 ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
Austin Schuh217a9782019-12-21 23:02:50 -080041 MaybeMake();
Austin Schuhac0771c2020-01-07 18:36:30 -080042 return event_loop_factory_->MakeEventLoop(name, my_node());
Austin Schuh44019f92019-05-19 19:58:27 -070043 }
44
Austin Schuh217a9782019-12-21 23:02:50 -080045 void Run() override { event_loop_factory_->Run(); }
46 void Exit() override { event_loop_factory_->Exit(); }
Austin Schuh44019f92019-05-19 19:58:27 -070047
Austin Schuh52d325c2019-06-23 18:59:06 -070048 // TODO(austin): Implement this. It's used currently for a phased loop test.
49 // I'm not sure how much that matters.
50 void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
51
Austin Schuh7d87b672019-12-01 20:23:49 -080052 void set_send_delay(std::chrono::nanoseconds send_delay) {
Austin Schuh217a9782019-12-21 23:02:50 -080053 MaybeMake();
54 event_loop_factory_->set_send_delay(send_delay);
Austin Schuh7d87b672019-12-01 20:23:49 -080055 }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
Austin Schuh217a9782019-12-21 23:02:50 -080058 void MaybeMake() {
59 if (!event_loop_factory_) {
60 if (configuration()->has_nodes()) {
Austin Schuhac0771c2020-01-07 18:36:30 -080061 event_loop_factory_ =
62 std::make_unique<SimulatedEventLoopFactory>(configuration());
Austin Schuh217a9782019-12-21 23:02:50 -080063 } else {
64 event_loop_factory_ =
65 std::make_unique<SimulatedEventLoopFactory>(configuration());
66 }
67 }
68 }
69 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080070};
71
Austin Schuh6bae8252021-02-07 22:01:49 -080072auto CommonParameters() {
73 return ::testing::Combine(
74 ::testing::Values([]() { return new SimulatedEventLoopTestFactory(); }),
75 ::testing::Values(ReadMethod::COPY, ReadMethod::PIN),
76 ::testing::Values(DoTimingReports::kYes, DoTimingReports::kNo));
77}
Austin Schuh6b6dfa52019-06-12 20:16:20 -070078
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070079INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonTest, AbstractEventLoopTest,
Austin Schuh66168842021-08-17 19:42:21 -070080 CommonParameters());
Brian Silverman77162972020-08-12 19:52:40 -070081
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -070082INSTANTIATE_TEST_SUITE_P(SimulatedEventLoopCommonDeathTest,
Austin Schuh66168842021-08-17 19:42:21 -070083 AbstractEventLoopDeathTest, CommonParameters());
Neil Balchc8f41ed2018-01-20 22:06:53 -080084
// Parameter set used to run each multi-node test.
struct Param {
  // Name of the config file to load.
  std::string config;
  // True when a single RemoteMessage channel is shared between all of the
  // remote channels; false when every remote channel has its own
  // RemoteMessage channel.
  bool shared;
};
94
95class RemoteMessageSimulatedEventLoopTest
96 : public ::testing::TestWithParam<struct Param> {
97 public:
98 RemoteMessageSimulatedEventLoopTest()
99 : config(aos::configuration::ReadConfig(
Austin Schuh373f1762021-06-02 21:07:09 -0700100 ArtifactPath(absl::StrCat("aos/events/", GetParam().config)))) {
Austin Schuh89c9b812021-02-20 14:42:10 -0800101 LOG(INFO) << "Config " << GetParam().config;
102 }
103
104 bool shared() const { return GetParam().shared; }
105
106 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
107 MakePi2OnPi1MessageCounters(aos::EventLoop *event_loop) {
108 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
109 if (shared()) {
110 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
111 event_loop, "/aos/remote_timestamps/pi2"));
112 } else {
113 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
114 event_loop,
115 "/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp"));
116 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
117 event_loop, "/aos/remote_timestamps/pi2/test/aos-examples-Ping"));
118 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
119 event_loop, "/aos/remote_timestamps/pi2/reliable/aos-examples-Ping"));
120 }
121 return counters;
122 }
123
124 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
125 MakePi1OnPi2MessageCounters(aos::EventLoop *event_loop) {
126 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>> counters;
127 if (shared()) {
128 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
129 event_loop, "/aos/remote_timestamps/pi1"));
130 } else {
131 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
132 event_loop, "/aos/remote_timestamps/pi1/test/aos-examples-Pong"));
133 counters.emplace_back(std::make_unique<MessageCounter<RemoteMessage>>(
134 event_loop,
135 "/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp"));
136 }
137 return counters;
138 }
139
140 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
141};
142
Austin Schuh8fb315a2020-11-19 22:33:58 -0800143// Test that sending a message after running gets properly notified.
144TEST(SimulatedEventLoopTest, SendAfterRunFor) {
145 SimulatedEventLoopTestFactory factory;
146
147 SimulatedEventLoopFactory simulated_event_loop_factory(
148 factory.configuration());
149
150 ::std::unique_ptr<EventLoop> ping_event_loop =
151 simulated_event_loop_factory.MakeEventLoop("ping");
152 aos::Sender<TestMessage> test_message_sender =
153 ping_event_loop->MakeSender<TestMessage>("/test");
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800155
156 std::unique_ptr<EventLoop> pong1_event_loop =
157 simulated_event_loop_factory.MakeEventLoop("pong");
158 MessageCounter<TestMessage> test_message_counter1(pong1_event_loop.get(),
159 "/test");
160
161 EXPECT_FALSE(ping_event_loop->is_running());
162
163 // Watchers start when you start running, so there should be nothing counted.
164 simulated_event_loop_factory.RunFor(chrono::seconds(1));
165 EXPECT_EQ(test_message_counter1.count(), 0u);
166
167 std::unique_ptr<EventLoop> pong2_event_loop =
168 simulated_event_loop_factory.MakeEventLoop("pong");
169 MessageCounter<TestMessage> test_message_counter2(pong2_event_loop.get(),
170 "/test");
171
172 // Pauses in the middle don't count though, so this should be counted.
173 // But, the fresh watcher shouldn't pick it up yet.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700174 ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800175
176 EXPECT_EQ(test_message_counter1.count(), 0u);
177 EXPECT_EQ(test_message_counter2.count(), 0u);
178 simulated_event_loop_factory.RunFor(chrono::seconds(1));
179
180 EXPECT_EQ(test_message_counter1.count(), 1u);
181 EXPECT_EQ(test_message_counter2.count(), 0u);
182}
183
Austin Schuh60e77942022-05-16 17:48:24 -0700184// Test that if we configure an event loop to be able to send too fast that we
185// do allow it to do so.
James Kuszmaul890c2492022-04-06 14:59:31 -0700186TEST(SimulatedEventLoopTest, AllowSendTooFast) {
187 SimulatedEventLoopTestFactory factory;
188
189 SimulatedEventLoopFactory simulated_event_loop_factory(
190 factory.configuration());
191
192 // Create two event loops: One will be allowed to send too fast, one won't. We
193 // will then test to ensure that the one that is allowed to send too fast can
194 // indeed send too fast, but that it then makes it so that the second event
195 // loop can no longer send anything because *it* is still limited.
196 ::std::unique_ptr<EventLoop> too_fast_event_loop =
197 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
198 ->MakeEventLoop("too_fast_sender",
199 {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700200 NodeEventLoopFactory::ExclusiveSenders::kNo,
201 {}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700202 aos::Sender<TestMessage> too_fast_message_sender =
203 too_fast_event_loop->MakeSender<TestMessage>("/test");
204
205 ::std::unique_ptr<EventLoop> limited_event_loop =
206 simulated_event_loop_factory.MakeEventLoop("limited_sender");
207 aos::Sender<TestMessage> limited_message_sender =
208 limited_event_loop->MakeSender<TestMessage>("/test");
209
210 const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
211 for (int ii = 0; ii < queue_size; ++ii) {
212 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
213 }
214 // And now we should start being in the sending-too-fast phase.
215 for (int ii = 0; ii < queue_size; ++ii) {
216 ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
Austin Schuh60e77942022-05-16 17:48:24 -0700217 ASSERT_EQ(SendTestMessage(limited_message_sender),
218 RawSender::Error::kMessagesSentTooFast);
James Kuszmaul890c2492022-04-06 14:59:31 -0700219 }
220}
221
222// Test that if we setup an exclusive sender that it is indeed exclusive.
223TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
224 SimulatedEventLoopTestFactory factory;
225
226 SimulatedEventLoopFactory simulated_event_loop_factory(
227 factory.configuration());
228
229 ::std::unique_ptr<EventLoop> exclusive_event_loop =
230 simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
James Kuszmaul94ca5132022-07-19 09:11:08 -0700231 ->MakeEventLoop(
232 "too_fast_sender",
233 {NodeEventLoopFactory::CheckSentTooFast::kYes,
234 NodeEventLoopFactory::ExclusiveSenders::kYes,
235 {{configuration::GetChannel(factory.configuration(), "/test1",
236 "aos.TestMessage", "", nullptr),
237 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
James Kuszmaul890c2492022-04-06 14:59:31 -0700238 exclusive_event_loop->SkipAosLog();
239 exclusive_event_loop->SkipTimingReport();
240 ::std::unique_ptr<EventLoop> normal_event_loop =
241 simulated_event_loop_factory.MakeEventLoop("limited_sender");
242 // Set things up to have the exclusive sender be destroyed so we can test
243 // recovery.
244 {
245 aos::Sender<TestMessage> exclusive_sender =
246 exclusive_event_loop->MakeSender<TestMessage>("/test");
247
248 EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
249 "TestMessage");
250 }
251 // This one should succeed now that the exclusive channel is removed.
252 aos::Sender<TestMessage> normal_sender =
253 normal_event_loop->MakeSender<TestMessage>("/test");
Austin Schuh60e77942022-05-16 17:48:24 -0700254 EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
255 "TestMessage");
James Kuszmaul94ca5132022-07-19 09:11:08 -0700256
257 // And check an explicitly exempted channel:
258 aos::Sender<TestMessage> non_exclusive_sender =
259 exclusive_event_loop->MakeSender<TestMessage>("/test1");
260 aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
261 normal_event_loop->MakeSender<TestMessage>("/test1");
James Kuszmaul890c2492022-04-06 14:59:31 -0700262}
263
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700264void TestSentTooFastCheckEdgeCase(
265 const std::function<RawSender::Error(int, int)> expected_err,
266 const bool send_twice_at_end) {
267 SimulatedEventLoopTestFactory factory;
268
269 auto event_loop = factory.MakePrimary("primary");
270
271 auto sender = event_loop->MakeSender<TestMessage>("/test");
272
273 const int queue_size = TestChannelQueueSize(event_loop.get());
274 int msgs_sent = 0;
275 event_loop->AddPhasedLoop(
276 [&](int) {
277 EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
278 msgs_sent++;
279
280 // If send_twice_at_end, send the last two messages (message
281 // queue_size and queue_size + 1) in the same iteration, meaning that
282 // we would be sending very slightly too fast. Otherwise, we will send
283 // message queue_size + 1 in the next iteration and we will continue
284 // to be sending exactly at the channel frequency.
285 if (send_twice_at_end && (msgs_sent == queue_size)) {
286 EXPECT_EQ(SendTestMessage(sender),
287 expected_err(msgs_sent, queue_size));
288 msgs_sent++;
289 }
290
291 if (msgs_sent > queue_size) {
292 factory.Exit();
293 }
294 },
295 std::chrono::duration_cast<std::chrono::nanoseconds>(
296 std::chrono::duration<double>(
297 1.0 / TestChannelFrequency(event_loop.get()))));
298
299 factory.Run();
300}
301
302// Tests that RawSender::Error::kMessagesSentTooFast is not returned
303// when messages are sent at the exact frequency of the channel.
304TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
305 TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
306 false);
307}
308
309// Tests that RawSender::Error::kMessagesSentTooFast is returned
310// when sending exactly one more message than allowed in a channel storage
311// duration.
312TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
313 TestSentTooFastCheckEdgeCase(
314 [](const int msgs_sent, const int queue_size) {
315 return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
316 : RawSender::Error::kOk);
317 },
318 true);
319}
320
Austin Schuh8fb315a2020-11-19 22:33:58 -0800321// Test that creating an event loop while running dies.
322TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
323 SimulatedEventLoopTestFactory factory;
324
325 SimulatedEventLoopFactory simulated_event_loop_factory(
326 factory.configuration());
327
328 ::std::unique_ptr<EventLoop> event_loop =
329 simulated_event_loop_factory.MakeEventLoop("ping");
330
331 auto timer = event_loop->AddTimer([&]() {
332 EXPECT_DEATH(
333 {
334 ::std::unique_ptr<EventLoop> event_loop2 =
335 simulated_event_loop_factory.MakeEventLoop("ping");
336 },
337 "event loop while running");
338 simulated_event_loop_factory.Exit();
339 });
340
341 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700342 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50));
Austin Schuh8fb315a2020-11-19 22:33:58 -0800343 });
344
345 simulated_event_loop_factory.Run();
346}
347
348// Test that creating a watcher after running dies.
349TEST(SimulatedEventLoopDeathTest, MakeWatcherAfterRunning) {
350 SimulatedEventLoopTestFactory factory;
351
352 SimulatedEventLoopFactory simulated_event_loop_factory(
353 factory.configuration());
354
355 ::std::unique_ptr<EventLoop> event_loop =
356 simulated_event_loop_factory.MakeEventLoop("ping");
357
358 simulated_event_loop_factory.RunFor(chrono::seconds(1));
359
360 EXPECT_DEATH(
361 { MessageCounter<TestMessage> counter(event_loop.get(), "/test"); },
362 "Can't add a watcher after running");
363
364 ::std::unique_ptr<EventLoop> event_loop2 =
365 simulated_event_loop_factory.MakeEventLoop("ping");
366
367 simulated_event_loop_factory.RunFor(chrono::seconds(1));
368
369 EXPECT_DEATH(
370 { MessageCounter<TestMessage> counter(event_loop2.get(), "/test"); },
371 "Can't add a watcher after running");
372}
373
Austin Schuh44019f92019-05-19 19:58:27 -0700374// Test that running for a time period with no handlers causes time to progress
375// correctly.
376TEST(SimulatedEventLoopTest, RunForNoHandlers) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800377 SimulatedEventLoopTestFactory factory;
378
379 SimulatedEventLoopFactory simulated_event_loop_factory(
380 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700381 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800382 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700383
384 simulated_event_loop_factory.RunFor(chrono::seconds(1));
385
386 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700387 event_loop->monotonic_now());
388}
389
390// Test that running for a time with a periodic handler causes time to end
391// correctly.
392TEST(SimulatedEventLoopTest, RunForTimerHandler) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800393 SimulatedEventLoopTestFactory factory;
394
395 SimulatedEventLoopFactory simulated_event_loop_factory(
396 factory.configuration());
Austin Schuh44019f92019-05-19 19:58:27 -0700397 ::std::unique_ptr<EventLoop> event_loop =
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800398 simulated_event_loop_factory.MakeEventLoop("loop");
Austin Schuh44019f92019-05-19 19:58:27 -0700399
400 int counter = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401 auto timer = event_loop->AddTimer([&counter]() { ++counter; });
Austin Schuh44019f92019-05-19 19:58:27 -0700402 event_loop->OnRun([&event_loop, &timer] {
Philipp Schradera6712522023-07-05 20:25:11 -0700403 timer->Schedule(event_loop->monotonic_now() + chrono::milliseconds(50),
404 chrono::milliseconds(100));
Austin Schuh44019f92019-05-19 19:58:27 -0700405 });
406
407 simulated_event_loop_factory.RunFor(chrono::seconds(1));
408
409 EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
Austin Schuh44019f92019-05-19 19:58:27 -0700410 event_loop->monotonic_now());
411 EXPECT_EQ(counter, 10);
412}
413
Austin Schuh7d87b672019-12-01 20:23:49 -0800414// Tests that watchers have latency in simulation.
415TEST(SimulatedEventLoopTest, WatcherTimingReport) {
416 SimulatedEventLoopTestFactory factory;
417 factory.set_send_delay(std::chrono::microseconds(50));
418
419 FLAGS_timing_report_ms = 1000;
420 auto loop1 = factory.MakePrimary("primary");
421 loop1->MakeWatcher("/test", [](const TestMessage &) {});
422
423 auto loop2 = factory.Make("sender_loop");
424
425 auto loop3 = factory.Make("report_fetcher");
426
427 Fetcher<timing::Report> report_fetcher =
428 loop3->MakeFetcher<timing::Report>("/aos");
429 EXPECT_FALSE(report_fetcher.Fetch());
430
431 auto sender = loop2->MakeSender<TestMessage>("/test");
432
433 // Send 10 messages in the middle of a timing report period so we get
434 // something interesting back.
435 auto test_timer = loop2->AddTimer([&sender]() {
436 for (int i = 0; i < 10; ++i) {
437 aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
438 TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
439 builder.add_value(200 + i);
milind1f1dca32021-07-03 13:50:07 -0700440 msg.CheckOk(msg.Send(builder.Finish()));
Austin Schuh7d87b672019-12-01 20:23:49 -0800441 }
442 });
443
444 // Quit after 1 timing report, mid way through the next cycle.
445 {
446 auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
Philipp Schradera6712522023-07-05 20:25:11 -0700447 end_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(2500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800448 end_timer->set_name("end");
449 }
450
451 loop1->OnRun([&test_timer, &loop1]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700452 test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500));
Austin Schuh7d87b672019-12-01 20:23:49 -0800453 });
454
455 factory.Run();
456
457 // And, since we are here, check that the timing report makes sense.
458 // Start by looking for our event loop's timing.
459 FlatbufferDetachedBuffer<timing::Report> primary_report =
460 FlatbufferDetachedBuffer<timing::Report>::Empty();
461 while (report_fetcher.FetchNext()) {
462 LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
463 if (report_fetcher->name()->string_view() == "primary") {
464 primary_report = CopyFlatBuffer(report_fetcher.get());
465 }
466 }
467
468 // Check the watcher report.
Ravago Jonescf453ab2020-05-06 21:14:53 -0700469 VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
Austin Schuh7d87b672019-12-01 20:23:49 -0800470
471 EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
472
473 // Just the timing report timer.
474 ASSERT_NE(primary_report.message().timers(), nullptr);
475 EXPECT_EQ(primary_report.message().timers()->size(), 2);
476
477 // No phased loops
478 ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
479
480 // And now confirm that the watcher received all 10 messages, and has latency.
481 ASSERT_NE(primary_report.message().watchers(), nullptr);
482 ASSERT_EQ(primary_report.message().watchers()->size(), 1);
483 EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
484 EXPECT_NEAR(
485 primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
486 0.00005, 1e-9);
487 EXPECT_NEAR(
488 primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
489 0.00005, 1e-9);
490 EXPECT_NEAR(
491 primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
492 0.00005, 1e-9);
493 EXPECT_EQ(primary_report.message()
494 .watchers()
495 ->Get(0)
496 ->wakeup_latency()
497 ->standard_deviation(),
498 0.0);
499
500 EXPECT_EQ(
501 primary_report.message().watchers()->Get(0)->handler_time()->average(),
502 0.0);
503 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
504 0.0);
505 EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
506 0.0);
507 EXPECT_EQ(primary_report.message()
508 .watchers()
509 ->Get(0)
510 ->handler_time()
511 ->standard_deviation(),
512 0.0);
513}
514
Austin Schuh89c9b812021-02-20 14:42:10 -0800515size_t CountAll(
516 const std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
517 &counters) {
518 size_t count = 0u;
519 for (const std::unique_ptr<MessageCounter<RemoteMessage>> &counter :
520 counters) {
521 count += counter->count();
522 }
523 return count;
524}
525
Austin Schuh4c3b9702020-08-30 11:34:55 -0700526// Tests that ping and pong work when on 2 different nodes, and the message
527// gateway messages are sent out as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -0800528TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodePingPong) {
Austin Schuh898f4972020-01-11 17:21:25 -0800529 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
530 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh4c3b9702020-08-30 11:34:55 -0700531 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh898f4972020-01-11 17:21:25 -0800532
533 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
534
535 std::unique_ptr<EventLoop> ping_event_loop =
536 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
537 Ping ping(ping_event_loop.get());
538
539 std::unique_ptr<EventLoop> pong_event_loop =
540 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
541 Pong pong(pong_event_loop.get());
542
543 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
544 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700545 MessageCounter<examples::Pong> pi2_pong_counter(
546 pi2_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700547 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
548 pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
549 "/pi1/aos");
550 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
551 pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
Austin Schuh898f4972020-01-11 17:21:25 -0800552
Austin Schuh4c3b9702020-08-30 11:34:55 -0700553 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
554 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
Austin Schuh898f4972020-01-11 17:21:25 -0800555
556 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
557 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700558 MessageCounter<examples::Pong> pi1_pong_counter(
559 pi1_pong_counter_event_loop.get(), "/test");
Austin Schuh2f8fd752020-09-01 22:38:28 -0700560 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
561 pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
562 aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
563 pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
564 "/aos");
565
Austin Schuh4c3b9702020-08-30 11:34:55 -0700566 // Count timestamps.
567 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
568 pi1_pong_counter_event_loop.get(), "/pi1/aos");
569 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
570 pi2_pong_counter_event_loop.get(), "/pi1/aos");
571 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
572 pi3_pong_counter_event_loop.get(), "/pi1/aos");
573 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
574 pi1_pong_counter_event_loop.get(), "/pi2/aos");
575 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
576 pi2_pong_counter_event_loop.get(), "/pi2/aos");
577 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
578 pi1_pong_counter_event_loop.get(), "/pi3/aos");
579 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
580 pi3_pong_counter_event_loop.get(), "/pi3/aos");
581
Austin Schuh2f8fd752020-09-01 22:38:28 -0700582 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -0800583 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
584 remote_timestamps_pi2_on_pi1 =
585 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
586 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
587 remote_timestamps_pi1_on_pi2 =
588 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700589
Austin Schuh4c3b9702020-08-30 11:34:55 -0700590 // Wait to let timestamp estimation start up before looking for the results.
591 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
592
Austin Schuh8fb315a2020-11-19 22:33:58 -0800593 std::unique_ptr<EventLoop> pi1_statistics_counter_event_loop =
594 simulated_event_loop_factory.MakeEventLoop("pi1_statistics_counter", pi1);
595 std::unique_ptr<EventLoop> pi2_statistics_counter_event_loop =
596 simulated_event_loop_factory.MakeEventLoop("pi2_statistics_counter", pi2);
597 std::unique_ptr<EventLoop> pi3_statistics_counter_event_loop =
598 simulated_event_loop_factory.MakeEventLoop("pi3_statistics_counter", pi3);
599
Austin Schuh4c3b9702020-08-30 11:34:55 -0700600 int pi1_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800601 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700602 "/pi1/aos", [&pi1_server_statistics_count](
603 const message_bridge::ServerStatistics &stats) {
604 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
605 EXPECT_EQ(stats.connections()->size(), 2u);
606 for (const message_bridge::ServerConnection *connection :
607 *stats.connections()) {
608 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh367a7f42021-11-23 23:04:36 -0800609 EXPECT_EQ(connection->connection_count(), 1u);
610 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800611 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700612 if (connection->node()->name()->string_view() == "pi2") {
613 EXPECT_GT(connection->sent_packets(), 50);
614 } else if (connection->node()->name()->string_view() == "pi3") {
615 EXPECT_GE(connection->sent_packets(), 5);
616 } else {
617 LOG(FATAL) << "Unknown connection";
618 }
619
620 EXPECT_TRUE(connection->has_monotonic_offset());
621 EXPECT_EQ(connection->monotonic_offset(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700622
623 EXPECT_TRUE(connection->has_channels());
624 int accumulated_sent_count = 0;
625 int accumulated_dropped_count = 0;
626 for (const message_bridge::ServerChannelStatistics *channel :
627 *connection->channels()) {
628 accumulated_sent_count += channel->sent_packets();
629 accumulated_dropped_count += channel->dropped_packets();
630 }
631 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
632 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700633 }
634 ++pi1_server_statistics_count;
635 });
636
637 int pi2_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800638 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700639 "/pi2/aos", [&pi2_server_statistics_count](
640 const message_bridge::ServerStatistics &stats) {
641 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
642 EXPECT_EQ(stats.connections()->size(), 1u);
643
644 const message_bridge::ServerConnection *connection =
645 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800646 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700647 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
648 EXPECT_GT(connection->sent_packets(), 50);
649 EXPECT_TRUE(connection->has_monotonic_offset());
650 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800651 EXPECT_EQ(connection->connection_count(), 1u);
652 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700653
654 EXPECT_TRUE(connection->has_channels());
655 int accumulated_sent_count = 0;
656 int accumulated_dropped_count = 0;
657 for (const message_bridge::ServerChannelStatistics *channel :
658 *connection->channels()) {
659 accumulated_sent_count += channel->sent_packets();
660 accumulated_dropped_count += channel->dropped_packets();
661 }
662 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
663 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
664
Austin Schuh4c3b9702020-08-30 11:34:55 -0700665 ++pi2_server_statistics_count;
666 });
667
668 int pi3_server_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800669 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700670 "/pi3/aos", [&pi3_server_statistics_count](
671 const message_bridge::ServerStatistics &stats) {
672 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
673 EXPECT_EQ(stats.connections()->size(), 1u);
674
675 const message_bridge::ServerConnection *connection =
676 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800677 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700678 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
679 EXPECT_GE(connection->sent_packets(), 5);
680 EXPECT_TRUE(connection->has_monotonic_offset());
681 EXPECT_EQ(connection->monotonic_offset(), 0);
Austin Schuh367a7f42021-11-23 23:04:36 -0800682 EXPECT_EQ(connection->connection_count(), 1u);
683 EXPECT_EQ(connection->connected_since_time(), 0);
James Kuszmaula6681e22023-05-26 11:20:40 -0700684
685 EXPECT_TRUE(connection->has_channels());
686 int accumulated_sent_count = 0;
687 int accumulated_dropped_count = 0;
688 for (const message_bridge::ServerChannelStatistics *channel :
689 *connection->channels()) {
690 accumulated_sent_count += channel->sent_packets();
691 accumulated_dropped_count += channel->dropped_packets();
692 }
693 EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
694 EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
695
Austin Schuh4c3b9702020-08-30 11:34:55 -0700696 ++pi3_server_statistics_count;
697 });
698
699 int pi1_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800700 pi1_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700701 "/pi1/aos", [&pi1_client_statistics_count](
702 const message_bridge::ClientStatistics &stats) {
703 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
704 EXPECT_EQ(stats.connections()->size(), 2u);
705
706 for (const message_bridge::ClientConnection *connection :
707 *stats.connections()) {
708 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
709 if (connection->node()->name()->string_view() == "pi2") {
710 EXPECT_GT(connection->received_packets(), 50);
711 } else if (connection->node()->name()->string_view() == "pi3") {
712 EXPECT_GE(connection->received_packets(), 5);
713 } else {
714 LOG(FATAL) << "Unknown connection";
715 }
716
Austin Schuhe61d4382021-03-31 21:33:02 -0700717 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700718 EXPECT_TRUE(connection->has_monotonic_offset());
719 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800720 EXPECT_EQ(connection->connection_count(), 1u);
721 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700722 }
723 ++pi1_client_statistics_count;
724 });
725
726 int pi2_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800727 pi2_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700728 "/pi2/aos", [&pi2_client_statistics_count](
729 const message_bridge::ClientStatistics &stats) {
730 VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
731 EXPECT_EQ(stats.connections()->size(), 1u);
732
733 const message_bridge::ClientConnection *connection =
734 stats.connections()->Get(0);
735 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
736 EXPECT_GT(connection->received_packets(), 50);
Austin Schuhe61d4382021-03-31 21:33:02 -0700737 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700738 EXPECT_TRUE(connection->has_monotonic_offset());
739 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800740 EXPECT_EQ(connection->connection_count(), 1u);
741 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700742 ++pi2_client_statistics_count;
743 });
744
745 int pi3_client_statistics_count = 0;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800746 pi3_statistics_counter_event_loop->MakeWatcher(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700747 "/pi3/aos", [&pi3_client_statistics_count](
748 const message_bridge::ClientStatistics &stats) {
749 VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
750 EXPECT_EQ(stats.connections()->size(), 1u);
751
752 const message_bridge::ClientConnection *connection =
753 stats.connections()->Get(0);
754 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
755 EXPECT_GE(connection->received_packets(), 5);
Austin Schuhe61d4382021-03-31 21:33:02 -0700756 EXPECT_EQ(connection->partial_deliveries(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700757 EXPECT_TRUE(connection->has_monotonic_offset());
758 EXPECT_EQ(connection->monotonic_offset(), 150000);
Austin Schuh367a7f42021-11-23 23:04:36 -0800759 EXPECT_EQ(connection->connection_count(), 1u);
760 EXPECT_EQ(connection->connected_since_time(), 0);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700761 ++pi3_client_statistics_count;
762 });
763
Austin Schuh2f8fd752020-09-01 22:38:28 -0700764 // Find the channel index for both the /pi1/aos Timestamp channel and Ping
765 // channel.
766 const size_t pi1_timestamp_channel =
767 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
768 pi1_on_pi2_timestamp_fetcher.channel());
769 const size_t ping_timestamp_channel =
770 configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
771 ping_on_pi2_fetcher.channel());
772
773 for (const Channel *channel :
774 *pi1_pong_counter_event_loop->configuration()->channels()) {
775 VLOG(1) << "Channel "
776 << configuration::ChannelIndex(
777 pi1_pong_counter_event_loop->configuration(), channel)
778 << " " << configuration::CleanedChannelToString(channel);
779 }
780
Austin Schuh8fb315a2020-11-19 22:33:58 -0800781 std::unique_ptr<EventLoop> pi1_remote_timestamp =
782 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
783
Austin Schuh89c9b812021-02-20 14:42:10 -0800784 for (std::pair<int, std::string> channel :
785 shared()
786 ? std::vector<std::pair<
787 int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
788 : std::vector<std::pair<int, std::string>>{
789 {pi1_timestamp_channel,
790 "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
791 "aos-message_bridge-Timestamp"},
792 {ping_timestamp_channel,
793 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
794 // For each remote timestamp we get back, confirm that it is either a ping
795 // message, or a timestamp we sent out. Also confirm that the timestamps
796 // are correct.
797 pi1_remote_timestamp->MakeWatcher(
798 channel.second,
799 [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
800 &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
801 &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
802 channel_index = channel.first](const RemoteMessage &header) {
803 VLOG(1) << aos::FlatbufferToJson(&header);
804 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -0700805 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh89c9b812021-02-20 14:42:10 -0800806 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -0700807 ->boot_uuid());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700808
Austin Schuh89c9b812021-02-20 14:42:10 -0800809 const aos::monotonic_clock::time_point header_monotonic_sent_time(
810 chrono::nanoseconds(header.monotonic_sent_time()));
811 const aos::realtime_clock::time_point header_realtime_sent_time(
812 chrono::nanoseconds(header.realtime_sent_time()));
813 const aos::monotonic_clock::time_point header_monotonic_remote_time(
814 chrono::nanoseconds(header.monotonic_remote_time()));
815 const aos::realtime_clock::time_point header_realtime_remote_time(
816 chrono::nanoseconds(header.realtime_remote_time()));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700817
Austin Schuh89c9b812021-02-20 14:42:10 -0800818 if (channel_index != -1) {
819 ASSERT_EQ(channel_index, header.channel_index());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700820 }
821
Austin Schuh89c9b812021-02-20 14:42:10 -0800822 const Context *pi1_context = nullptr;
823 const Context *pi2_context = nullptr;
824
825 if (header.channel_index() == pi1_timestamp_channel) {
826 // Find the forwarded message.
827 while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
828 header_monotonic_sent_time) {
829 ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
830 }
831
832 // And the source message.
833 while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
834 header_monotonic_remote_time) {
835 ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
836 }
837
838 pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
839 pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
840 } else if (header.channel_index() == ping_timestamp_channel) {
841 // Find the forwarded message.
842 while (ping_on_pi2_fetcher.context().monotonic_event_time <
843 header_monotonic_sent_time) {
844 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
845 }
846
847 // And the source message.
848 while (ping_on_pi1_fetcher.context().monotonic_event_time <
849 header_monotonic_remote_time) {
850 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
851 }
852
853 pi1_context = &ping_on_pi1_fetcher.context();
854 pi2_context = &ping_on_pi2_fetcher.context();
855 } else {
856 LOG(FATAL) << "Unknown channel";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700857 }
858
Austin Schuh89c9b812021-02-20 14:42:10 -0800859 // Confirm the forwarded message has matching timestamps to the
860 // timestamps we got back.
861 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
862 EXPECT_EQ(pi2_context->remote_queue_index,
863 header.remote_queue_index());
864 EXPECT_EQ(pi2_context->monotonic_event_time,
865 header_monotonic_sent_time);
866 EXPECT_EQ(pi2_context->realtime_event_time,
867 header_realtime_sent_time);
868 EXPECT_EQ(pi2_context->realtime_remote_time,
869 header_realtime_remote_time);
870 EXPECT_EQ(pi2_context->monotonic_remote_time,
871 header_monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700872
Austin Schuh89c9b812021-02-20 14:42:10 -0800873 // Confirm the forwarded message also matches the source message.
874 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
875 EXPECT_EQ(pi1_context->monotonic_event_time,
876 header_monotonic_remote_time);
877 EXPECT_EQ(pi1_context->realtime_event_time,
878 header_realtime_remote_time);
879 });
880 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700881
Austin Schuh4c3b9702020-08-30 11:34:55 -0700882 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
883 chrono::milliseconds(500) +
884 chrono::milliseconds(5));
885
886 EXPECT_EQ(pi1_pong_counter.count(), 1001);
887 EXPECT_EQ(pi2_pong_counter.count(), 1001);
888
889 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100);
890 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100);
891 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100);
892 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100);
893 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100);
894 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
895 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
896
Austin Schuh20ac95d2020-12-05 17:24:19 -0800897 EXPECT_EQ(pi1_server_statistics_count, 10);
898 EXPECT_EQ(pi2_server_statistics_count, 10);
899 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700900
901 EXPECT_EQ(pi1_client_statistics_count, 95);
902 EXPECT_EQ(pi2_client_statistics_count, 95);
903 EXPECT_EQ(pi3_client_statistics_count, 95);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700904
905 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -0800906 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1101);
907 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1101);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700908}
909
910// Tests that an offset between nodes can be recovered and shows up in
911// ServerStatistics correctly.
912TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
913 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -0700914 aos::configuration::ReadConfig(ArtifactPath(
915 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700916 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -0800917 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
918 ASSERT_EQ(pi1_index, 0u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700919 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -0800920 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
921 ASSERT_EQ(pi2_index, 1u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700922 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
Austin Schuh87dd3832021-01-01 23:07:31 -0800923 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
924 ASSERT_EQ(pi3_index, 2u);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700925
Austin Schuh87dd3832021-01-01 23:07:31 -0800926 message_bridge::TestingTimeConverter time(
927 configuration::NodesCount(&config.message()));
Austin Schuh4c3b9702020-08-30 11:34:55 -0700928 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -0700929 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700930
931 constexpr chrono::milliseconds kOffset{1501};
Austin Schuh87dd3832021-01-01 23:07:31 -0800932 time.AddNextTimestamp(
933 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -0700934 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
935 BootTimestamp::epoch()});
Austin Schuh4c3b9702020-08-30 11:34:55 -0700936
937 std::unique_ptr<EventLoop> ping_event_loop =
938 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
939 Ping ping(ping_event_loop.get());
940
941 std::unique_ptr<EventLoop> pong_event_loop =
942 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
943 Pong pong(pong_event_loop.get());
944
Austin Schuh8fb315a2020-11-19 22:33:58 -0800945 // Wait to let timestamp estimation start up before looking for the results.
946 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
947
Austin Schuh87dd3832021-01-01 23:07:31 -0800948 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
949 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
950
Austin Schuh4c3b9702020-08-30 11:34:55 -0700951 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
952 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
953
954 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
955 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
956
Austin Schuh4c3b9702020-08-30 11:34:55 -0700957 // Confirm the offsets are being recovered correctly.
958 int pi1_server_statistics_count = 0;
959 pi1_pong_counter_event_loop->MakeWatcher(
960 "/pi1/aos", [&pi1_server_statistics_count,
961 kOffset](const message_bridge::ServerStatistics &stats) {
962 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
963 EXPECT_EQ(stats.connections()->size(), 2u);
964 for (const message_bridge::ServerConnection *connection :
965 *stats.connections()) {
966 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800967 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700968 if (connection->node()->name()->string_view() == "pi2") {
969 EXPECT_EQ(connection->monotonic_offset(),
970 chrono::nanoseconds(kOffset).count());
971 } else if (connection->node()->name()->string_view() == "pi3") {
972 EXPECT_EQ(connection->monotonic_offset(), 0);
973 } else {
974 LOG(FATAL) << "Unknown connection";
975 }
976
977 EXPECT_TRUE(connection->has_monotonic_offset());
978 }
979 ++pi1_server_statistics_count;
980 });
981
982 int pi2_server_statistics_count = 0;
983 pi2_pong_counter_event_loop->MakeWatcher(
984 "/pi2/aos", [&pi2_server_statistics_count,
985 kOffset](const message_bridge::ServerStatistics &stats) {
986 VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
987 EXPECT_EQ(stats.connections()->size(), 1u);
988
989 const message_bridge::ServerConnection *connection =
990 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800991 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700992 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
993 EXPECT_TRUE(connection->has_monotonic_offset());
994 EXPECT_EQ(connection->monotonic_offset(),
995 -chrono::nanoseconds(kOffset).count());
996 ++pi2_server_statistics_count;
997 });
998
999 int pi3_server_statistics_count = 0;
1000 pi3_pong_counter_event_loop->MakeWatcher(
1001 "/pi3/aos", [&pi3_server_statistics_count](
1002 const message_bridge::ServerStatistics &stats) {
1003 VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
1004 EXPECT_EQ(stats.connections()->size(), 1u);
1005
1006 const message_bridge::ServerConnection *connection =
1007 stats.connections()->Get(0);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001008 EXPECT_TRUE(connection->has_boot_uuid());
Austin Schuh4c3b9702020-08-30 11:34:55 -07001009 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1010 EXPECT_TRUE(connection->has_monotonic_offset());
1011 EXPECT_EQ(connection->monotonic_offset(), 0);
1012 ++pi3_server_statistics_count;
1013 });
1014
1015 simulated_event_loop_factory.RunFor(chrono::seconds(10) -
1016 chrono::milliseconds(500) +
1017 chrono::milliseconds(5));
1018
Austin Schuh20ac95d2020-12-05 17:24:19 -08001019 EXPECT_EQ(pi1_server_statistics_count, 10);
Austin Schuh58646e22021-08-23 23:51:46 -07001020 EXPECT_EQ(pi2_server_statistics_count, 10);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001021 EXPECT_EQ(pi3_server_statistics_count, 10);
Austin Schuh4c3b9702020-08-30 11:34:55 -07001022}
1023
1024// Test that disabling statistics actually disables them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001025TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeWithoutStatistics) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001026 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1027 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1028 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1029
1030 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1031 simulated_event_loop_factory.DisableStatistics();
1032
1033 std::unique_ptr<EventLoop> ping_event_loop =
1034 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1035 Ping ping(ping_event_loop.get());
1036
1037 std::unique_ptr<EventLoop> pong_event_loop =
1038 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1039 Pong pong(pong_event_loop.get());
1040
1041 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
1042 simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
1043
1044 MessageCounter<examples::Pong> pi2_pong_counter(
1045 pi2_pong_counter_event_loop.get(), "/test");
1046
1047 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
1048 simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
1049
1050 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
1051 simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
1052
1053 MessageCounter<examples::Pong> pi1_pong_counter(
1054 pi1_pong_counter_event_loop.get(), "/test");
1055
1056 // Count timestamps.
1057 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1058 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1059 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1060 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1061 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1062 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1063 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1064 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1065 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1066 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1067 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1068 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1069 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1070 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1071
Austin Schuh2f8fd752020-09-01 22:38:28 -07001072 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001073 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1074 remote_timestamps_pi2_on_pi1 =
1075 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1076 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1077 remote_timestamps_pi1_on_pi2 =
1078 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001079
Austin Schuh4c3b9702020-08-30 11:34:55 -07001080 MessageCounter<message_bridge::ServerStatistics>
1081 pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
1082 "/pi1/aos");
1083 MessageCounter<message_bridge::ServerStatistics>
1084 pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
1085 "/pi2/aos");
1086 MessageCounter<message_bridge::ServerStatistics>
1087 pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
1088 "/pi3/aos");
1089
1090 MessageCounter<message_bridge::ClientStatistics>
1091 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1092 "/pi1/aos");
1093 MessageCounter<message_bridge::ClientStatistics>
1094 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1095 "/pi2/aos");
1096 MessageCounter<message_bridge::ClientStatistics>
1097 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1098 "/pi3/aos");
Austin Schuh898f4972020-01-11 17:21:25 -08001099
1100 simulated_event_loop_factory.RunFor(chrono::seconds(10) +
1101 chrono::milliseconds(5));
1102
Austin Schuh4c3b9702020-08-30 11:34:55 -07001103 EXPECT_EQ(pi1_pong_counter.count(), 1001u);
1104 EXPECT_EQ(pi2_pong_counter.count(), 1001u);
1105
1106 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
1107 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
1108 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
1109 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
1110 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
1111 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
1112 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
1113
1114 EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
1115 EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
1116 EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
1117
1118 EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
1119 EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
1120 EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001121
1122 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001123 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 1001);
1124 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 1001);
Austin Schuh898f4972020-01-11 17:21:25 -08001125}
1126
Austin Schuhc0b0f722020-12-12 18:36:06 -08001127bool AllConnected(const message_bridge::ServerStatistics *server_statistics) {
1128 for (const message_bridge::ServerConnection *connection :
1129 *server_statistics->connections()) {
1130 if (connection->state() != message_bridge::State::CONNECTED) {
1131 return false;
1132 }
1133 }
1134 return true;
1135}
1136
1137bool AllConnectedBut(const message_bridge::ServerStatistics *server_statistics,
1138 std::string_view target) {
1139 for (const message_bridge::ServerConnection *connection :
1140 *server_statistics->connections()) {
1141 if (connection->node()->name()->string_view() == target) {
1142 if (connection->state() == message_bridge::State::CONNECTED) {
1143 return false;
1144 }
1145 } else {
1146 if (connection->state() != message_bridge::State::CONNECTED) {
1147 return false;
1148 }
1149 }
1150 }
1151 return true;
1152}
1153
1154bool AllConnected(const message_bridge::ClientStatistics *client_statistics) {
1155 for (const message_bridge::ClientConnection *connection :
1156 *client_statistics->connections()) {
1157 if (connection->state() != message_bridge::State::CONNECTED) {
1158 return false;
1159 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001160 EXPECT_TRUE(connection->has_boot_uuid());
1161 EXPECT_TRUE(connection->has_connected_since_time());
1162 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001163 }
1164 return true;
1165}
1166
1167bool AllConnectedBut(const message_bridge::ClientStatistics *client_statistics,
1168 std::string_view target) {
1169 for (const message_bridge::ClientConnection *connection :
1170 *client_statistics->connections()) {
1171 if (connection->node()->name()->string_view() == target) {
1172 if (connection->state() == message_bridge::State::CONNECTED) {
1173 return false;
1174 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001175 EXPECT_FALSE(connection->has_boot_uuid());
1176 EXPECT_FALSE(connection->has_connected_since_time());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001177 } else {
1178 if (connection->state() != message_bridge::State::CONNECTED) {
1179 return false;
1180 }
Austin Schuh367a7f42021-11-23 23:04:36 -08001181 EXPECT_TRUE(connection->has_boot_uuid());
1182 EXPECT_TRUE(connection->has_connected_since_time());
1183 EXPECT_TRUE(connection->has_connection_count());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001184 }
1185 }
1186 return true;
1187}
1188
Austin Schuh367a7f42021-11-23 23:04:36 -08001189int ConnectedCount(const message_bridge::ClientStatistics *client_statistics,
1190 std::string_view target) {
1191 for (const message_bridge::ClientConnection *connection :
1192 *client_statistics->connections()) {
1193 if (connection->node()->name()->string_view() == target) {
1194 return connection->connection_count();
1195 }
1196 }
1197 return 0;
1198}
1199
1200int ConnectedCount(const message_bridge::ServerStatistics *server_statistics,
1201 std::string_view target) {
1202 for (const message_bridge::ServerConnection *connection :
1203 *server_statistics->connections()) {
1204 if (connection->node()->name()->string_view() == target) {
1205 return connection->connection_count();
1206 }
1207 }
1208 return 0;
1209}
1210
Austin Schuhc0b0f722020-12-12 18:36:06 -08001211// Test that disconnecting nodes actually disconnects them.
Austin Schuh89c9b812021-02-20 14:42:10 -08001212TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
Austin Schuhc0b0f722020-12-12 18:36:06 -08001213 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1214
Austin Schuh58646e22021-08-23 23:51:46 -07001215 NodeEventLoopFactory *pi1 =
1216 simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
1217 NodeEventLoopFactory *pi2 =
1218 simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
1219 NodeEventLoopFactory *pi3 =
1220 simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
1221
1222 std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001223 Ping ping(ping_event_loop.get());
1224
Austin Schuh58646e22021-08-23 23:51:46 -07001225 std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001226 Pong pong(pong_event_loop.get());
1227
1228 std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001229 pi2->MakeEventLoop("pi2_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001230
1231 MessageCounter<examples::Pong> pi2_pong_counter(
1232 pi2_pong_counter_event_loop.get(), "/test");
1233
1234 std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001235 pi3->MakeEventLoop("pi3_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001236
1237 std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
Austin Schuh58646e22021-08-23 23:51:46 -07001238 pi1->MakeEventLoop("pi1_pong_counter");
Austin Schuhc0b0f722020-12-12 18:36:06 -08001239
1240 MessageCounter<examples::Pong> pi1_pong_counter(
1241 pi1_pong_counter_event_loop.get(), "/test");
1242
1243 // Count timestamps.
1244 MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
1245 pi1_pong_counter_event_loop.get(), "/pi1/aos");
1246 MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
1247 pi2_pong_counter_event_loop.get(), "/pi1/aos");
1248 MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
1249 pi3_pong_counter_event_loop.get(), "/pi1/aos");
1250 MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
1251 pi1_pong_counter_event_loop.get(), "/pi2/aos");
1252 MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
1253 pi2_pong_counter_event_loop.get(), "/pi2/aos");
1254 MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
1255 pi1_pong_counter_event_loop.get(), "/pi3/aos");
1256 MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
1257 pi3_pong_counter_event_loop.get(), "/pi3/aos");
1258
1259 // Count remote timestamps
Austin Schuh89c9b812021-02-20 14:42:10 -08001260 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1261 remote_timestamps_pi2_on_pi1 =
1262 MakePi2OnPi1MessageCounters(pi1_pong_counter_event_loop.get());
1263 std::vector<std::unique_ptr<MessageCounter<RemoteMessage>>>
1264 remote_timestamps_pi1_on_pi2 =
1265 MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001266
1267 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001268 *pi1_server_statistics_counter;
1269 pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
1270 pi1_server_statistics_counter =
1271 pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1272 "pi1_server_statistics_counter", "/pi1/aos");
1273 });
1274
Austin Schuhc0b0f722020-12-12 18:36:06 -08001275 aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
1276 pi1_pong_counter_event_loop
1277 ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
1278 aos::Fetcher<message_bridge::ClientStatistics> pi1_client_statistics_fetcher =
1279 pi1_pong_counter_event_loop
1280 ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
1281
1282 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001283 *pi2_server_statistics_counter;
1284 pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
1285 pi2_server_statistics_counter =
1286 pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1287 "pi2_server_statistics_counter", "/pi2/aos");
1288 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001289 aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
1290 pi2_pong_counter_event_loop
1291 ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
1292 aos::Fetcher<message_bridge::ClientStatistics> pi2_client_statistics_fetcher =
1293 pi2_pong_counter_event_loop
1294 ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
1295
1296 MessageCounter<message_bridge::ServerStatistics>
Austin Schuh58646e22021-08-23 23:51:46 -07001297 *pi3_server_statistics_counter;
1298 pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
1299 pi3_server_statistics_counter =
1300 pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
1301 "pi3_server_statistics_counter", "/pi3/aos");
1302 });
Austin Schuhc0b0f722020-12-12 18:36:06 -08001303 aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
1304 pi3_pong_counter_event_loop
1305 ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
1306 aos::Fetcher<message_bridge::ClientStatistics> pi3_client_statistics_fetcher =
1307 pi3_pong_counter_event_loop
1308 ->MakeFetcher<message_bridge::ClientStatistics>("/pi3/aos");
1309
1310 MessageCounter<message_bridge::ClientStatistics>
1311 pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
1312 "/pi1/aos");
1313 MessageCounter<message_bridge::ClientStatistics>
1314 pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
1315 "/pi2/aos");
1316 MessageCounter<message_bridge::ClientStatistics>
1317 pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
1318 "/pi3/aos");
1319
James Kuszmaul86e86c32022-07-21 17:39:47 -07001320 std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
1321 statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
1322 statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
1323 statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
1324 // The current contract is that, if all nodes boot simultaneously in
1325 // simulation, they should all act as if they are already connected,
1326 // without ever observing the transition from disconnected to connected (note
1327 // that on a real system the ServerStatistics message will get resent for each
1328 // and every new connection, even if the new connections happen
1329 // "simultaneously"--in simulation, we are essentially acting as if we are
1330 // starting execution in an already running system, rather than observing the
1331 // boot process).
1332 for (auto &event_loop : statistics_watcher_loops) {
1333 event_loop->MakeWatcher(
1334 "/aos", [](const message_bridge::ServerStatistics &msg) {
1335 for (const message_bridge::ServerConnection *connection :
1336 *msg.connections()) {
1337 EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
1338 << connection->node()->name()->string_view();
1339 }
1340 });
1341 }
1342
Austin Schuhc0b0f722020-12-12 18:36:06 -08001343 simulated_event_loop_factory.RunFor(chrono::seconds(2) +
1344 chrono::milliseconds(5));
1345
James Kuszmaul86e86c32022-07-21 17:39:47 -07001346 statistics_watcher_loops.clear();
1347
Austin Schuhc0b0f722020-12-12 18:36:06 -08001348 EXPECT_EQ(pi1_pong_counter.count(), 201u);
1349 EXPECT_EQ(pi2_pong_counter.count(), 201u);
1350
1351 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 20u);
1352 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 20u);
1353 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1354 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 20u);
1355 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 20u);
1356 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
1357 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
1358
Austin Schuh58646e22021-08-23 23:51:46 -07001359 EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
1360 EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
1361 EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001362
1363 EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
1364 EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
1365 EXPECT_EQ(pi3_client_statistics_counter.count(), 20u);
1366
1367 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001368 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 221);
1369 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 221);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001370
1371 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1372 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1373 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1374 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1375 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1376 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1377 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1378 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1379 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1380 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1381 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1382 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1383 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1384 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1385 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1386 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1387 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1388 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1389
Austin Schuh58646e22021-08-23 23:51:46 -07001390 pi1->Disconnect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001391
1392 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1393
1394 EXPECT_EQ(pi1_pong_counter.count(), 401u);
1395 EXPECT_EQ(pi2_pong_counter.count(), 401u);
1396
1397 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 40u);
1398 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 40u);
1399 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 20u);
1400 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 40u);
1401 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 40u);
1402 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
1403 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
1404
Austin Schuh58646e22021-08-23 23:51:46 -07001405 EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
1406 EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
1407 EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001408
1409 EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
1410 EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
1411 EXPECT_EQ(pi3_client_statistics_counter.count(), 40u);
1412
1413 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001414 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 441);
1415 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 441);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001416
1417 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1418 EXPECT_TRUE(AllConnectedBut(pi1_server_statistics_fetcher.get(), "pi3"))
1419 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1420 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1421 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1422 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1423 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1424 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1425 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1426 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1427 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1428 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1429 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1430 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1431 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1432 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1433 EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
1434 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1435
Austin Schuh58646e22021-08-23 23:51:46 -07001436 pi1->Connect(pi3->node());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001437
1438 simulated_event_loop_factory.RunFor(chrono::seconds(2));
1439
Austin Schuh367a7f42021-11-23 23:04:36 -08001440 EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
1441 EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
1442 EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
1443 EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
1444 EXPECT_TRUE(pi3_server_statistics_fetcher.Fetch());
1445 EXPECT_TRUE(pi3_client_statistics_fetcher.Fetch());
1446
1447 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi3"), 2u)
1448 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1449 EXPECT_EQ(ConnectedCount(pi1_server_statistics_fetcher.get(), "pi2"), 1u)
1450 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
1451 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi3"), 1u)
1452 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1453 EXPECT_EQ(ConnectedCount(pi1_client_statistics_fetcher.get(), "pi2"), 1u)
1454 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
1455
1456 EXPECT_EQ(ConnectedCount(pi2_server_statistics_fetcher.get(), "pi1"), 1u)
1457 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
1458 EXPECT_EQ(ConnectedCount(pi2_client_statistics_fetcher.get(), "pi1"), 1u)
1459 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
1460
1461 EXPECT_EQ(ConnectedCount(pi3_server_statistics_fetcher.get(), "pi1"), 1u)
1462 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
1463 EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
1464 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1465
Austin Schuhc0b0f722020-12-12 18:36:06 -08001466 EXPECT_EQ(pi1_pong_counter.count(), 601u);
1467 EXPECT_EQ(pi2_pong_counter.count(), 601u);
1468
1469 EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 60u);
1470 EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 60u);
1471 EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 40u);
1472 EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 60u);
1473 EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 60u);
1474 EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
1475 EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
1476
Austin Schuh58646e22021-08-23 23:51:46 -07001477 EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
1478 EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
1479 EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001480
1481 EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
1482 EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
1483 EXPECT_EQ(pi3_client_statistics_counter.count(), 60u);
1484
1485 // Also confirm that remote timestamps are being forwarded correctly.
Austin Schuh89c9b812021-02-20 14:42:10 -08001486 EXPECT_EQ(CountAll(remote_timestamps_pi2_on_pi1), 661);
1487 EXPECT_EQ(CountAll(remote_timestamps_pi1_on_pi2), 661);
Austin Schuhc0b0f722020-12-12 18:36:06 -08001488
Austin Schuhc0b0f722020-12-12 18:36:06 -08001489 EXPECT_TRUE(AllConnected(pi1_server_statistics_fetcher.get()))
1490 << " : " << aos::FlatbufferToJson(pi1_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001491 EXPECT_TRUE(AllConnected(pi1_client_statistics_fetcher.get()))
1492 << " : " << aos::FlatbufferToJson(pi1_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001493 EXPECT_TRUE(AllConnected(pi2_server_statistics_fetcher.get()))
1494 << " : " << aos::FlatbufferToJson(pi2_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001495 EXPECT_TRUE(AllConnected(pi2_client_statistics_fetcher.get()))
1496 << " : " << aos::FlatbufferToJson(pi2_client_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001497 EXPECT_TRUE(AllConnected(pi3_server_statistics_fetcher.get()))
1498 << " : " << aos::FlatbufferToJson(pi3_server_statistics_fetcher.get());
Austin Schuhc0b0f722020-12-12 18:36:06 -08001499 EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
1500 << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
1501}
1502
Austin Schuh2febf0d2020-09-21 22:24:30 -07001503// Tests that the time offset having a slope doesn't break the world.
1504// SimulatedMessageBridge has enough self consistency CHECK statements to
1505// confirm, and we can can also check a message in each direction to make sure
1506// it gets delivered as expected.
1507TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
1508 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh373f1762021-06-02 21:07:09 -07001509 aos::configuration::ReadConfig(ArtifactPath(
1510 "aos/events/multinode_pingpong_test_combined_config.json"));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001511 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
Austin Schuh87dd3832021-01-01 23:07:31 -08001512 const size_t pi1_index = configuration::GetNodeIndex(&config.message(), pi1);
1513 ASSERT_EQ(pi1_index, 0u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001514 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
Austin Schuh87dd3832021-01-01 23:07:31 -08001515 const size_t pi2_index = configuration::GetNodeIndex(&config.message(), pi2);
1516 ASSERT_EQ(pi2_index, 1u);
1517 const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
1518 const size_t pi3_index = configuration::GetNodeIndex(&config.message(), pi3);
1519 ASSERT_EQ(pi3_index, 2u);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001520
Austin Schuh87dd3832021-01-01 23:07:31 -08001521 message_bridge::TestingTimeConverter time(
1522 configuration::NodesCount(&config.message()));
Austin Schuh2febf0d2020-09-21 22:24:30 -07001523 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
Austin Schuh58646e22021-08-23 23:51:46 -07001524 simulated_event_loop_factory.SetTimeConverter(&time);
Austin Schuh2febf0d2020-09-21 22:24:30 -07001525
Austin Schuh2febf0d2020-09-21 22:24:30 -07001526 constexpr chrono::milliseconds kOffset{150100};
Austin Schuh87dd3832021-01-01 23:07:31 -08001527 time.AddNextTimestamp(
1528 distributed_clock::epoch(),
Austin Schuh58646e22021-08-23 23:51:46 -07001529 {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
1530 BootTimestamp::epoch()});
1531 time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
1532 {BootTimestamp::epoch() + chrono::milliseconds(9999),
1533 BootTimestamp::epoch() + kOffset + chrono::seconds(10),
1534 BootTimestamp::epoch() + chrono::milliseconds(9999)});
Austin Schuh2febf0d2020-09-21 22:24:30 -07001535
1536 std::unique_ptr<EventLoop> ping_event_loop =
1537 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1538 Ping ping(ping_event_loop.get());
1539
1540 std::unique_ptr<EventLoop> pong_event_loop =
1541 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
1542 Pong pong(pong_event_loop.get());
1543
1544 std::unique_ptr<EventLoop> pi1_counter_event_loop =
1545 simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
1546 std::unique_ptr<EventLoop> pi2_counter_event_loop =
1547 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
1548
1549 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1550 pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1551 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1552 pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
1553
1554 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1555 pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1556 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1557 pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
1558
1559 // End after a pong message comes back. This will leave the latest messages
1560 // on all channels so we can look at timestamps easily and check they make
1561 // sense.
1562 std::unique_ptr<EventLoop> pi1_pong_ender =
1563 simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
1564 int count = 0;
1565 pi1_pong_ender->MakeWatcher(
1566 "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
1567 if (++count == 100) {
1568 simulated_event_loop_factory.Exit();
1569 }
1570 });
1571
1572 // Run enough that messages should be delivered.
1573 simulated_event_loop_factory.Run();
1574
1575 // Grab the latest messages.
1576 EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
1577 EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
1578 EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
1579 EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
1580
1581 // Compute their time on the global distributed clock so we can compute
1582 // distance betwen them.
1583 const distributed_clock::time_point pi1_ping_time =
1584 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1585 ->ToDistributedClock(
1586 ping_on_pi1_fetcher.context().monotonic_event_time);
1587 const distributed_clock::time_point pi2_ping_time =
1588 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1589 ->ToDistributedClock(
1590 ping_on_pi2_fetcher.context().monotonic_event_time);
1591 const distributed_clock::time_point pi1_pong_time =
1592 simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
1593 ->ToDistributedClock(
1594 pong_on_pi1_fetcher.context().monotonic_event_time);
1595 const distributed_clock::time_point pi2_pong_time =
1596 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
1597 ->ToDistributedClock(
1598 pong_on_pi2_fetcher.context().monotonic_event_time);
1599
1600 // And confirm the delivery delay is just about exactly 150 uS for both
1601 // directions like expected. There will be a couple ns of rounding errors in
1602 // the conversion functions that aren't worth accounting for right now. This
1603 // will either be really close, or really far.
1604 EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1605 pi1_ping_time);
1606 EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1607 pi1_ping_time);
1608
1609 EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
1610 pi2_pong_time);
1611 EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
1612 pi2_pong_time);
1613}
1614
Austin Schuh4c570ea2020-11-19 23:13:24 -08001615void SendPing(aos::Sender<examples::Ping> *sender, int value) {
1616 aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
1617 examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
1618 ping_builder.add_value(value);
milind1f1dca32021-07-03 13:50:07 -07001619 builder.CheckOk(builder.Send(ping_builder.Finish()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001620}
1621
1622// Tests that reliable (and unreliable) ping messages get forwarded as expected.
Austin Schuh89c9b812021-02-20 14:42:10 -08001623TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeStartupTesting) {
Austin Schuh4c570ea2020-11-19 23:13:24 -08001624 const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
1625 const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
1626
1627 SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
1628
1629 std::unique_ptr<EventLoop> ping_event_loop =
1630 simulated_event_loop_factory.MakeEventLoop("ping", pi1);
1631 aos::Sender<examples::Ping> pi1_reliable_sender =
1632 ping_event_loop->MakeSender<examples::Ping>("/reliable");
1633 aos::Sender<examples::Ping> pi1_unreliable_sender =
1634 ping_event_loop->MakeSender<examples::Ping>("/unreliable");
1635 SendPing(&pi1_reliable_sender, 1);
1636 SendPing(&pi1_unreliable_sender, 1);
1637
1638 std::unique_ptr<EventLoop> pi2_pong_event_loop =
1639 simulated_event_loop_factory.MakeEventLoop("pong", pi2);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001640 aos::Sender<examples::Ping> pi2_reliable_sender =
1641 pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
1642 SendPing(&pi2_reliable_sender, 1);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001643 MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
1644 "/reliable");
James Kuszmaul86e86c32022-07-21 17:39:47 -07001645 MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
1646 "/reliable2");
Austin Schuh4c570ea2020-11-19 23:13:24 -08001647 MessageCounter<examples::Ping> pi2_unreliable_counter(
1648 pi2_pong_event_loop.get(), "/unreliable");
1649 aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
1650 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
1651 aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
1652 pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
1653
1654 const size_t reliable_channel_index = configuration::ChannelIndex(
1655 pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
1656
1657 std::unique_ptr<EventLoop> pi1_remote_timestamp =
1658 simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
1659
Austin Schuheeaa2022021-01-02 21:52:03 -08001660 const chrono::nanoseconds network_delay =
1661 simulated_event_loop_factory.network_delay();
1662
Austin Schuh4c570ea2020-11-19 23:13:24 -08001663 int reliable_timestamp_count = 0;
1664 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001665 shared() ? "/pi1/aos/remote_timestamps/pi2"
1666 : "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001667 [reliable_channel_index, &reliable_timestamp_count,
Austin Schuheeaa2022021-01-02 21:52:03 -08001668 &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
1669 &pi1_remote_timestamp](const RemoteMessage &header) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001670 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001671 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
Austin Schuh20ac95d2020-12-05 17:24:19 -08001672 simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
Austin Schuhcdd90272021-03-15 12:46:16 -07001673 ->boot_uuid());
Austin Schuh4c570ea2020-11-19 23:13:24 -08001674 VLOG(1) << aos::FlatbufferToJson(&header);
1675 if (header.channel_index() == reliable_channel_index) {
1676 ++reliable_timestamp_count;
1677 }
Austin Schuheeaa2022021-01-02 21:52:03 -08001678
1679 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1680 chrono::nanoseconds(header.monotonic_sent_time()));
1681
1682 EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
1683 header_monotonic_sent_time + network_delay +
1684 (pi1_remote_timestamp->monotonic_now() -
1685 pi2_pong_event_loop->monotonic_now()));
Austin Schuh4c570ea2020-11-19 23:13:24 -08001686 });
1687
1688 // Wait to let timestamp estimation start up before looking for the results.
1689 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1690
1691 EXPECT_EQ(pi2_reliable_counter.count(), 1u);
1692 // This one isn't reliable, but was sent before the start. It should *not* be
1693 // delivered.
1694 EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
1695 // Confirm we got a timestamp logged for the message that was forwarded.
1696 EXPECT_EQ(reliable_timestamp_count, 1u);
1697
1698 SendPing(&pi1_reliable_sender, 2);
1699 SendPing(&pi1_unreliable_sender, 2);
1700 simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
1701 EXPECT_EQ(pi2_reliable_counter.count(), 2u);
James Kuszmaul86e86c32022-07-21 17:39:47 -07001702 EXPECT_EQ(pi1_reliable_counter.count(), 1u);
Austin Schuh4c570ea2020-11-19 23:13:24 -08001703 EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
1704
1705 EXPECT_EQ(reliable_timestamp_count, 2u);
1706}
1707
Austin Schuh20ac95d2020-12-05 17:24:19 -08001708// Tests that rebooting a node changes the ServerStatistics message and the
1709// RemoteTimestamp message.
Austin Schuh89c9b812021-02-20 14:42:10 -08001710TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
Austin Schuh72e65682021-09-02 11:37:05 -07001711 const UUID pi1_boot0 = UUID::Random();
1712 const UUID pi2_boot0 = UUID::Random();
1713 const UUID pi2_boot1 = UUID::Random();
1714 const UUID pi3_boot0 = UUID::Random();
1715 UUID expected_boot_uuid = pi2_boot0;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001716
Austin Schuh58646e22021-08-23 23:51:46 -07001717 message_bridge::TestingTimeConverter time(
1718 configuration::NodesCount(&config.message()));
1719 SimulatedEventLoopFactory factory(&config.message());
1720 factory.SetTimeConverter(&time);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001721
Austin Schuh58646e22021-08-23 23:51:46 -07001722 const size_t pi1_index =
1723 configuration::GetNodeIndex(&config.message(), "pi1");
1724 const size_t pi2_index =
1725 configuration::GetNodeIndex(&config.message(), "pi2");
1726 const size_t pi3_index =
1727 configuration::GetNodeIndex(&config.message(), "pi3");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001728
Austin Schuh58646e22021-08-23 23:51:46 -07001729 {
1730 time.AddNextTimestamp(distributed_clock::epoch(),
1731 {BootTimestamp::epoch(), BootTimestamp::epoch(),
1732 BootTimestamp::epoch()});
1733
1734 const chrono::nanoseconds dt = chrono::milliseconds(2001);
1735
1736 time.AddNextTimestamp(
1737 distributed_clock::epoch() + dt,
1738 {BootTimestamp::epoch() + dt,
1739 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1740 BootTimestamp::epoch() + dt});
1741
1742 time.set_boot_uuid(pi1_index, 0, pi1_boot0);
1743 time.set_boot_uuid(pi2_index, 0, pi2_boot0);
1744 time.set_boot_uuid(pi2_index, 1, pi2_boot1);
1745 time.set_boot_uuid(pi3_index, 0, pi3_boot0);
1746 }
1747
1748 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1749 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1750
1751 pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
1752 pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
Austin Schuh20ac95d2020-12-05 17:24:19 -08001753
1754 std::unique_ptr<EventLoop> pi1_remote_timestamp =
Austin Schuh58646e22021-08-23 23:51:46 -07001755 pi1->MakeEventLoop("pi1_remote_timestamp");
Austin Schuh20ac95d2020-12-05 17:24:19 -08001756
1757 int timestamp_count = 0;
1758 pi1_remote_timestamp->MakeWatcher(
Austin Schuh8902fa52021-03-14 22:39:24 -07001759 "/pi2/aos", [&expected_boot_uuid,
1760 &pi1_remote_timestamp](const message_bridge::Timestamp &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001761 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001762 expected_boot_uuid);
1763 });
1764 pi1_remote_timestamp->MakeWatcher(
1765 "/test",
1766 [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
Austin Schuha9012be2021-07-21 15:19:11 -07001767 EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001768 expected_boot_uuid);
1769 });
1770 pi1_remote_timestamp->MakeWatcher(
Austin Schuh89c9b812021-02-20 14:42:10 -08001771 shared() ? "/pi1/aos/remote_timestamps/pi2"
1772 : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
Austin Schuh20ac95d2020-12-05 17:24:19 -08001773 [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
1774 EXPECT_TRUE(header.has_boot_uuid());
Austin Schuhcdd90272021-03-15 12:46:16 -07001775 EXPECT_EQ(UUID::FromVector(header.boot_uuid()), expected_boot_uuid);
Austin Schuh20ac95d2020-12-05 17:24:19 -08001776 VLOG(1) << aos::FlatbufferToJson(&header);
1777 ++timestamp_count;
1778 });
1779
1780 int pi1_server_statistics_count = 0;
Austin Schuh58646e22021-08-23 23:51:46 -07001781 bool first_pi1_server_statistics = true;
Austin Schuh367a7f42021-11-23 23:04:36 -08001782 int boot_number = 0;
1783 monotonic_clock::time_point expected_connection_time = pi1->monotonic_now();
Austin Schuh20ac95d2020-12-05 17:24:19 -08001784 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001785 "/pi1/aos",
1786 [&pi1_server_statistics_count, &expected_boot_uuid,
1787 &expected_connection_time, &first_pi1_server_statistics,
1788 &boot_number](const message_bridge::ServerStatistics &stats) {
Austin Schuh20ac95d2020-12-05 17:24:19 -08001789 VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
1790 for (const message_bridge::ServerConnection *connection :
1791 *stats.connections()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001792 if (connection->state() == message_bridge::State::CONNECTED) {
1793 ASSERT_TRUE(connection->has_boot_uuid());
1794 }
1795 if (!first_pi1_server_statistics) {
1796 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1797 }
Austin Schuh20ac95d2020-12-05 17:24:19 -08001798 if (connection->node()->name()->string_view() == "pi2") {
Austin Schuh58646e22021-08-23 23:51:46 -07001799 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1800 ASSERT_TRUE(connection->has_boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001801 EXPECT_EQ(expected_boot_uuid,
Austin Schuh8902fa52021-03-14 22:39:24 -07001802 UUID::FromString(connection->boot_uuid()))
Austin Schuh20ac95d2020-12-05 17:24:19 -08001803 << " : Got " << aos::FlatbufferToJson(&stats);
Austin Schuh367a7f42021-11-23 23:04:36 -08001804 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1805 connection->connected_since_time())),
1806 expected_connection_time);
1807 EXPECT_EQ(boot_number + 1, connection->connection_count());
Austin Schuh20ac95d2020-12-05 17:24:19 -08001808 ++pi1_server_statistics_count;
1809 }
1810 }
Austin Schuh58646e22021-08-23 23:51:46 -07001811 first_pi1_server_statistics = false;
Austin Schuh20ac95d2020-12-05 17:24:19 -08001812 });
1813
Austin Schuh58646e22021-08-23 23:51:46 -07001814 int pi1_client_statistics_count = 0;
1815 pi1_remote_timestamp->MakeWatcher(
Austin Schuh367a7f42021-11-23 23:04:36 -08001816 "/pi1/aos", [&pi1_client_statistics_count, &expected_boot_uuid,
1817 &expected_connection_time, &boot_number](
Austin Schuh58646e22021-08-23 23:51:46 -07001818 const message_bridge::ClientStatistics &stats) {
1819 VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
1820 for (const message_bridge::ClientConnection *connection :
1821 *stats.connections()) {
1822 EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
1823 if (connection->node()->name()->string_view() == "pi2") {
1824 ++pi1_client_statistics_count;
Austin Schuh367a7f42021-11-23 23:04:36 -08001825 EXPECT_EQ(expected_boot_uuid,
1826 UUID::FromString(connection->boot_uuid()))
1827 << " : Got " << aos::FlatbufferToJson(&stats);
1828 EXPECT_EQ(monotonic_clock::time_point(chrono::nanoseconds(
1829 connection->connected_since_time())),
1830 expected_connection_time);
1831 EXPECT_EQ(boot_number + 1, connection->connection_count());
1832 } else {
1833 EXPECT_EQ(connection->connected_since_time(), 0);
1834 EXPECT_EQ(1, connection->connection_count());
Austin Schuh58646e22021-08-23 23:51:46 -07001835 }
1836 }
1837 });
1838
1839 // Confirm that reboot changes the UUID.
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -07001840 pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
1841 pi1, pi2, pi2_boot1]() {
1842 expected_boot_uuid = pi2_boot1;
1843 ++boot_number;
1844 LOG(INFO) << "OnShutdown triggered for pi2";
1845 pi2->OnStartup(
1846 [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
1847 EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
1848 expected_connection_time = pi1->monotonic_now();
1849 });
1850 });
Austin Schuh58646e22021-08-23 23:51:46 -07001851
Austin Schuh20ac95d2020-12-05 17:24:19 -08001852 // Let a couple of ServerStatistics messages show up before rebooting.
Austin Schuh58646e22021-08-23 23:51:46 -07001853 factory.RunFor(chrono::milliseconds(2002));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001854
1855 EXPECT_GT(timestamp_count, 100);
1856 EXPECT_GE(pi1_server_statistics_count, 1u);
1857
Austin Schuh20ac95d2020-12-05 17:24:19 -08001858 timestamp_count = 0;
1859 pi1_server_statistics_count = 0;
1860
Austin Schuh58646e22021-08-23 23:51:46 -07001861 factory.RunFor(chrono::milliseconds(2000));
Austin Schuh20ac95d2020-12-05 17:24:19 -08001862 EXPECT_GT(timestamp_count, 100);
1863 EXPECT_GE(pi1_server_statistics_count, 1u);
1864}
1865
James Kuszmaulf4bf9fe2021-05-10 22:58:24 -07001866INSTANTIATE_TEST_SUITE_P(
Austin Schuh89c9b812021-02-20 14:42:10 -08001867 All, RemoteMessageSimulatedEventLoopTest,
1868 ::testing::Values(
1869 Param{"multinode_pingpong_test_combined_config.json", true},
1870 Param{"multinode_pingpong_test_split_config.json", false}));
1871
Austin Schuh58646e22021-08-23 23:51:46 -07001872// Tests that Startup and Shutdown do reasonable things.
1873TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
1874 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1875 aos::configuration::ReadConfig(
1876 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1877
Austin Schuh72e65682021-09-02 11:37:05 -07001878 size_t pi1_shutdown_counter = 0;
1879 size_t pi2_shutdown_counter = 0;
1880 MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
1881 MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
1882
Austin Schuh58646e22021-08-23 23:51:46 -07001883 message_bridge::TestingTimeConverter time(
1884 configuration::NodesCount(&config.message()));
1885 SimulatedEventLoopFactory factory(&config.message());
1886 factory.SetTimeConverter(&time);
1887 time.AddNextTimestamp(
1888 distributed_clock::epoch(),
1889 {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
1890
1891 const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
1892
1893 time.AddNextTimestamp(
1894 distributed_clock::epoch() + dt,
1895 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1896 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
1897 BootTimestamp::epoch() + dt});
1898
1899 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1900 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
1901
1902 // Configure startup to start Ping and Pong, and count.
1903 size_t pi1_startup_counter = 0;
1904 size_t pi2_startup_counter = 0;
1905 pi1->OnStartup([pi1]() {
1906 LOG(INFO) << "Made ping";
1907 pi1->AlwaysStart<Ping>("ping");
1908 });
1909 pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
1910 pi2->OnStartup([pi2]() {
1911 LOG(INFO) << "Made pong";
1912 pi2->AlwaysStart<Pong>("pong");
1913 });
1914 pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
1915
1916 // Shutdown just counts.
Austin Schuh58646e22021-08-23 23:51:46 -07001917 pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
1918 pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
1919
Austin Schuh58646e22021-08-23 23:51:46 -07001920 // Automatically make counters on startup.
1921 pi1->OnStartup([&pi1_pong_counter, pi1]() {
1922 pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
1923 "pi1_pong_counter", "/test");
1924 });
1925 pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
1926 pi2->OnStartup([&pi2_ping_counter, pi2]() {
1927 pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
1928 "pi2_ping_counter", "/test");
1929 });
1930 pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
1931
1932 EXPECT_EQ(pi2_ping_counter, nullptr);
1933 EXPECT_EQ(pi1_pong_counter, nullptr);
1934
1935 EXPECT_EQ(pi1_startup_counter, 0u);
1936 EXPECT_EQ(pi2_startup_counter, 0u);
1937 EXPECT_EQ(pi1_shutdown_counter, 0u);
1938 EXPECT_EQ(pi2_shutdown_counter, 0u);
1939
1940 factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
1941 EXPECT_EQ(pi1_startup_counter, 1u);
1942 EXPECT_EQ(pi2_startup_counter, 1u);
1943 EXPECT_EQ(pi1_shutdown_counter, 0u);
1944 EXPECT_EQ(pi2_shutdown_counter, 0u);
1945 EXPECT_EQ(pi2_ping_counter->count(), 1001);
1946 EXPECT_EQ(pi1_pong_counter->count(), 1001);
1947
1948 LOG(INFO) << pi1->monotonic_now();
1949 LOG(INFO) << pi2->monotonic_now();
1950
1951 factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
1952
1953 EXPECT_EQ(pi1_startup_counter, 2u);
1954 EXPECT_EQ(pi2_startup_counter, 2u);
1955 EXPECT_EQ(pi1_shutdown_counter, 1u);
1956 EXPECT_EQ(pi2_shutdown_counter, 1u);
1957 EXPECT_EQ(pi2_ping_counter->count(), 501);
1958 EXPECT_EQ(pi1_pong_counter->count(), 501);
1959}
1960
1961// Tests that OnStartup handlers can be added after running and get called, and
1962// can't be called when running.
1963TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
1964 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
1965 aos::configuration::ReadConfig(
1966 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
1967
1968 // Test that we can add startup handlers as long as we aren't running, and
1969 // they get run when Run gets called again.
1970 // Test that adding a startup handler when running fails.
1971 //
1972 // Test shutdown handlers get called on destruction.
1973 SimulatedEventLoopFactory factory(&config.message());
1974
1975 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
1976
1977 int startup_count0 = 0;
1978 int startup_count1 = 0;
1979
1980 pi1->OnStartup([&]() { ++startup_count0; });
1981 EXPECT_EQ(startup_count0, 0);
1982 EXPECT_EQ(startup_count1, 0);
1983
1984 factory.RunFor(chrono::nanoseconds(1));
1985 EXPECT_EQ(startup_count0, 1);
1986 EXPECT_EQ(startup_count1, 0);
1987
1988 pi1->OnStartup([&]() { ++startup_count1; });
1989 EXPECT_EQ(startup_count0, 1);
1990 EXPECT_EQ(startup_count1, 0);
1991
1992 factory.RunFor(chrono::nanoseconds(1));
1993 EXPECT_EQ(startup_count0, 1);
1994 EXPECT_EQ(startup_count1, 1);
1995
1996 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
1997 loop->OnRun([&]() { pi1->OnStartup([]() {}); });
1998
1999 EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
2000 "Can only register OnStartup handlers when not running.");
2001}
2002
2003// Tests that OnStartup handlers can be added after running and get called, and
2004// all the handlers get called on reboot. Shutdown handlers are tested the same
2005// way.
2006TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
2007 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2008 aos::configuration::ReadConfig(
2009 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2010
Austin Schuh72e65682021-09-02 11:37:05 -07002011 int startup_count0 = 0;
2012 int shutdown_count0 = 0;
2013 int startup_count1 = 0;
2014 int shutdown_count1 = 0;
2015
Austin Schuh58646e22021-08-23 23:51:46 -07002016 message_bridge::TestingTimeConverter time(
2017 configuration::NodesCount(&config.message()));
2018 SimulatedEventLoopFactory factory(&config.message());
2019 factory.SetTimeConverter(&time);
2020 time.StartEqual();
2021
2022 const chrono::nanoseconds dt = chrono::seconds(10);
2023 time.RebootAt(0, distributed_clock::epoch() + dt);
2024 time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
2025
2026 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2027
Austin Schuh58646e22021-08-23 23:51:46 -07002028 pi1->OnStartup([&]() { ++startup_count0; });
2029 pi1->OnShutdown([&]() { ++shutdown_count0; });
2030 EXPECT_EQ(startup_count0, 0);
2031 EXPECT_EQ(startup_count1, 0);
2032 EXPECT_EQ(shutdown_count0, 0);
2033 EXPECT_EQ(shutdown_count1, 0);
2034
2035 factory.RunFor(chrono::nanoseconds(1));
2036 EXPECT_EQ(startup_count0, 1);
2037 EXPECT_EQ(startup_count1, 0);
2038 EXPECT_EQ(shutdown_count0, 0);
2039 EXPECT_EQ(shutdown_count1, 0);
2040
2041 pi1->OnStartup([&]() { ++startup_count1; });
2042 EXPECT_EQ(startup_count0, 1);
2043 EXPECT_EQ(startup_count1, 0);
2044 EXPECT_EQ(shutdown_count0, 0);
2045 EXPECT_EQ(shutdown_count1, 0);
2046
2047 factory.RunFor(chrono::nanoseconds(1));
2048 EXPECT_EQ(startup_count0, 1);
2049 EXPECT_EQ(startup_count1, 1);
2050 EXPECT_EQ(shutdown_count0, 0);
2051 EXPECT_EQ(shutdown_count1, 0);
2052
2053 factory.RunFor(chrono::seconds(15));
2054
2055 EXPECT_EQ(startup_count0, 2);
2056 EXPECT_EQ(startup_count1, 2);
2057 EXPECT_EQ(shutdown_count0, 1);
2058 EXPECT_EQ(shutdown_count1, 0);
2059
2060 pi1->OnShutdown([&]() { ++shutdown_count1; });
2061 factory.RunFor(chrono::seconds(10));
2062
2063 EXPECT_EQ(startup_count0, 3);
2064 EXPECT_EQ(startup_count1, 3);
2065 EXPECT_EQ(shutdown_count0, 2);
2066 EXPECT_EQ(shutdown_count1, 1);
2067}
2068
2069// Tests that event loops which outlive shutdown crash.
2070TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
2071 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2072 aos::configuration::ReadConfig(
2073 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2074
2075 message_bridge::TestingTimeConverter time(
2076 configuration::NodesCount(&config.message()));
2077 SimulatedEventLoopFactory factory(&config.message());
2078 factory.SetTimeConverter(&time);
2079 time.StartEqual();
2080
2081 const chrono::nanoseconds dt = chrono::seconds(10);
2082 time.RebootAt(0, distributed_clock::epoch() + dt);
2083
2084 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2085
2086 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2087
2088 EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
2089}
2090
Brian Silvermane1fe2512022-08-14 23:18:50 -07002091// Test that an ExitHandle outliving its factory is caught.
2092TEST(SimulatedEventLoopDeathTest, ExitHandleOutlivesFactory) {
2093 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2094 aos::configuration::ReadConfig(
2095 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2096 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2097 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2098 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2099 auto exit_handle = factory->MakeExitHandle();
2100 EXPECT_DEATH(factory.reset(),
2101 "All ExitHandles must be destroyed before the factory");
2102}
2103
Austin Schuh3e31f912023-08-21 21:29:10 -07002104// Test that AllowApplicationCreationDuring can't happen in OnRun callbacks.
2105TEST(SimulatedEventLoopDeathTest, AllowApplicationCreationDuringInOnRun) {
2106 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2107 aos::configuration::ReadConfig(
2108 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2109 auto factory = std::make_unique<SimulatedEventLoopFactory>(&config.message());
2110 NodeEventLoopFactory *pi1 = factory->GetNodeEventLoopFactory("pi1");
2111 std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
2112 loop->OnRun([&]() { factory->AllowApplicationCreationDuring([]() {}); });
2113 EXPECT_DEATH(factory->RunFor(chrono::seconds(1)), "OnRun");
2114}
2115
Austin Schuh58646e22021-08-23 23:51:46 -07002116// Tests that messages don't survive a reboot of a node.
2117TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
2118 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2119 aos::configuration::ReadConfig(
2120 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2121
2122 message_bridge::TestingTimeConverter time(
2123 configuration::NodesCount(&config.message()));
2124 SimulatedEventLoopFactory factory(&config.message());
2125 factory.SetTimeConverter(&time);
2126 time.StartEqual();
2127
2128 const chrono::nanoseconds dt = chrono::seconds(10);
2129 time.RebootAt(0, distributed_clock::epoch() + dt);
2130
2131 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2132
2133 const UUID boot_uuid = pi1->boot_uuid();
2134 EXPECT_NE(boot_uuid, UUID::Zero());
2135
2136 {
2137 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2138 aos::Sender<examples::Ping> test_message_sender =
2139 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2140 SendPing(&test_message_sender, 1);
2141 }
2142
2143 factory.RunFor(chrono::seconds(5));
2144
2145 {
2146 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2147 aos::Fetcher<examples::Ping> fetcher =
2148 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2149 EXPECT_TRUE(fetcher.Fetch());
2150 }
2151
2152 factory.RunFor(chrono::seconds(10));
2153
2154 {
2155 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2156 aos::Fetcher<examples::Ping> fetcher =
2157 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2158 EXPECT_FALSE(fetcher.Fetch());
2159 }
2160 EXPECT_NE(boot_uuid, pi1->boot_uuid());
2161}
2162
2163// Tests that reliable messages get resent on reboot.
2164TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
2165 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2166 aos::configuration::ReadConfig(
2167 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2168
2169 message_bridge::TestingTimeConverter time(
2170 configuration::NodesCount(&config.message()));
2171 SimulatedEventLoopFactory factory(&config.message());
2172 factory.SetTimeConverter(&time);
2173 time.StartEqual();
2174
2175 const chrono::nanoseconds dt = chrono::seconds(1);
2176 time.RebootAt(1, distributed_clock::epoch() + dt);
2177
2178 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2179 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2180
2181 const UUID pi1_boot_uuid = pi1->boot_uuid();
2182 const UUID pi2_boot_uuid = pi2->boot_uuid();
2183 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2184 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2185
2186 {
2187 ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
2188 aos::Sender<examples::Ping> test_message_sender =
2189 ping_event_loop->MakeSender<examples::Ping>("/reliable");
2190 SendPing(&test_message_sender, 1);
2191 }
2192
2193 factory.RunFor(chrono::milliseconds(500));
2194
2195 {
2196 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2197 aos::Fetcher<examples::Ping> fetcher =
2198 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2199 EXPECT_TRUE(fetcher.Fetch());
2200 }
2201
2202 factory.RunFor(chrono::seconds(1));
2203
2204 {
2205 ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
2206 aos::Fetcher<examples::Ping> fetcher =
2207 ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
2208 EXPECT_TRUE(fetcher.Fetch());
2209 }
2210 EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
2211}
2212
James Kuszmaul86e86c32022-07-21 17:39:47 -07002213TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
2214 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2215 aos::configuration::ReadConfig(
2216 ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
2217
2218 message_bridge::TestingTimeConverter time(
2219 configuration::NodesCount(&config.message()));
2220 time.AddNextTimestamp(
2221 distributed_clock::epoch(),
2222 {BootTimestamp{0, monotonic_clock::epoch()},
2223 BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
2224 BootTimestamp{0, monotonic_clock::epoch()}});
2225 SimulatedEventLoopFactory factory(&config.message());
2226 factory.SetTimeConverter(&time);
2227
2228 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2229 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2230
2231 const UUID pi1_boot_uuid = pi1->boot_uuid();
2232 const UUID pi2_boot_uuid = pi2->boot_uuid();
2233 EXPECT_NE(pi1_boot_uuid, UUID::Zero());
2234 EXPECT_NE(pi2_boot_uuid, UUID::Zero());
2235
2236 {
2237 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
2238 aos::Sender<examples::Ping> pi1_sender =
2239 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
2240 SendPing(&pi1_sender, 1);
2241 }
2242 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
2243 aos::Sender<examples::Ping> pi2_sender =
2244 pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
2245 SendPing(&pi2_sender, 1);
2246 // Verify that we staggered the OnRun callback correctly.
2247 pi2_event_loop->OnRun([pi1, pi2]() {
2248 EXPECT_EQ(pi1->monotonic_now(),
2249 monotonic_clock::epoch() + std::chrono::seconds(1));
2250 EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
2251 });
2252
2253 factory.RunFor(chrono::seconds(2));
2254
2255 {
2256 ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
2257 aos::Fetcher<examples::Ping> fetcher =
2258 pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
2259 ASSERT_TRUE(fetcher.Fetch());
2260 EXPECT_EQ(fetcher.context().monotonic_event_time,
2261 monotonic_clock::epoch() + factory.network_delay());
2262 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2263 monotonic_clock::epoch());
2264 }
2265 {
2266 ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
2267 aos::Fetcher<examples::Ping> fetcher =
2268 pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
2269 ASSERT_TRUE(fetcher.Fetch());
2270 EXPECT_EQ(fetcher.context().monotonic_event_time,
2271 monotonic_clock::epoch() + std::chrono::seconds(1) +
2272 factory.network_delay());
2273 EXPECT_EQ(fetcher.context().monotonic_remote_time,
2274 monotonic_clock::epoch() - std::chrono::seconds(1));
2275 }
2276}
2277
Austin Schuh48205e62021-11-12 14:13:18 -08002278class SimulatedEventLoopDisconnectTest : public ::testing::Test {
2279 public:
2280 SimulatedEventLoopDisconnectTest()
2281 : config(aos::configuration::ReadConfig(ArtifactPath(
2282 "aos/events/multinode_pingpong_test_split_config.json"))),
2283 time(configuration::NodesCount(&config.message())),
2284 factory(&config.message()) {
2285 factory.SetTimeConverter(&time);
2286 }
2287
2288 void VerifyChannels(std::set<const aos::Channel *> statistics_channels,
2289 const monotonic_clock::time_point allowable_message_time,
2290 std::set<const aos::Node *> empty_nodes) {
2291 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2292 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2293 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2294 pi1->MakeEventLoop("fetcher");
2295 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2296 pi2->MakeEventLoop("fetcher");
2297 for (const aos::Channel *channel : *factory.configuration()->channels()) {
2298 if (configuration::ChannelIsReadableOnNode(channel,
2299 pi1_event_loop->node())) {
2300 std::unique_ptr<aos::RawFetcher> fetcher =
2301 pi1_event_loop->MakeRawFetcher(channel);
2302 if (statistics_channels.find(channel) == statistics_channels.end() ||
2303 empty_nodes.find(pi1_event_loop->node()) != empty_nodes.end()) {
2304 EXPECT_FALSE(fetcher->Fetch() &&
2305 fetcher->context().monotonic_event_time >
2306 allowable_message_time)
2307 << ": Found recent message on channel "
2308 << configuration::CleanedChannelToString(channel) << " and time "
2309 << fetcher->context().monotonic_event_time << " > "
2310 << allowable_message_time << " on pi1";
2311 } else {
2312 EXPECT_TRUE(fetcher->Fetch() &&
2313 fetcher->context().monotonic_event_time >=
2314 allowable_message_time)
2315 << ": Didn't find recent message on channel "
2316 << configuration::CleanedChannelToString(channel) << " on pi1";
2317 }
2318 }
2319 if (configuration::ChannelIsReadableOnNode(channel,
2320 pi2_event_loop->node())) {
2321 std::unique_ptr<aos::RawFetcher> fetcher =
2322 pi2_event_loop->MakeRawFetcher(channel);
2323 if (statistics_channels.find(channel) == statistics_channels.end() ||
2324 empty_nodes.find(pi2_event_loop->node()) != empty_nodes.end()) {
2325 EXPECT_FALSE(fetcher->Fetch() &&
2326 fetcher->context().monotonic_event_time >
2327 allowable_message_time)
2328 << ": Found message on channel "
2329 << configuration::CleanedChannelToString(channel) << " and time "
2330 << fetcher->context().monotonic_event_time << " > "
2331 << allowable_message_time << " on pi2";
2332 } else {
2333 EXPECT_TRUE(fetcher->Fetch() &&
2334 fetcher->context().monotonic_event_time >=
2335 allowable_message_time)
2336 << ": Didn't find message on channel "
2337 << configuration::CleanedChannelToString(channel) << " on pi2";
2338 }
2339 }
2340 }
2341 }
2342
2343 aos::FlatbufferDetachedBuffer<aos::Configuration> config;
2344
2345 message_bridge::TestingTimeConverter time;
2346 SimulatedEventLoopFactory factory;
2347};
2348
2349// Tests that if we have message bridge client/server disabled, and timing
2350// reports disabled, no messages are sent. Also tests that we can disconnect a
2351// node and disable statistics on it and it actually fully disconnects.
2352TEST_F(SimulatedEventLoopDisconnectTest, NoMessagesWhenDisabled) {
2353 time.StartEqual();
2354 factory.SkipTimingReport();
2355 factory.DisableStatistics();
2356
2357 NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
2358 NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
2359
2360 std::unique_ptr<aos::EventLoop> pi1_event_loop =
2361 pi1->MakeEventLoop("fetcher");
2362 std::unique_ptr<aos::EventLoop> pi2_event_loop =
2363 pi2->MakeEventLoop("fetcher");
2364
2365 factory.RunFor(chrono::milliseconds(100000));
2366
2367 // Confirm no messages are sent if we've configured them all off.
2368 VerifyChannels({}, monotonic_clock::min_time, {});
2369
2370 // Now, confirm that all the message_bridge channels come back when we
2371 // re-enable.
2372 factory.EnableStatistics();
2373
2374 factory.RunFor(chrono::milliseconds(10050));
2375
2376 // Build up the list of all the messages we expect when we come back.
2377 {
2378 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002379 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002380 std::vector<std::pair<std::string_view, const Node *>>{
2381 {"/pi1/aos", pi1->node()},
2382 {"/pi2/aos", pi1->node()},
2383 {"/pi3/aos", pi1->node()}}) {
2384 statistics_channels.insert(configuration::GetChannel(
2385 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2386 pi.second));
2387 statistics_channels.insert(configuration::GetChannel(
2388 factory.configuration(), pi.first,
2389 "aos.message_bridge.ServerStatistics", "", pi.second));
2390 statistics_channels.insert(configuration::GetChannel(
2391 factory.configuration(), pi.first,
2392 "aos.message_bridge.ClientStatistics", "", pi.second));
2393 }
2394
2395 statistics_channels.insert(configuration::GetChannel(
2396 factory.configuration(),
2397 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2398 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2399 statistics_channels.insert(configuration::GetChannel(
2400 factory.configuration(),
2401 "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
2402 "aos.message_bridge.RemoteMessage", "", pi2->node()));
2403 VerifyChannels(statistics_channels, monotonic_clock::min_time, {});
2404 }
2405
2406 // Now test that we can disable the messages for a single node
2407 pi2->DisableStatistics();
2408 const aos::monotonic_clock::time_point statistics_disable_time =
2409 pi2->monotonic_now();
2410 factory.RunFor(chrono::milliseconds(10000));
2411
2412 // We should see a much smaller set of messages, but should still see messages
2413 // forwarded, mainly the timestamp message.
2414 {
2415 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002416 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002417 std::vector<std::pair<std::string_view, const Node *>>{
2418 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2419 statistics_channels.insert(configuration::GetChannel(
2420 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2421 pi.second));
2422 statistics_channels.insert(configuration::GetChannel(
2423 factory.configuration(), pi.first,
2424 "aos.message_bridge.ServerStatistics", "", pi.second));
2425 statistics_channels.insert(configuration::GetChannel(
2426 factory.configuration(), pi.first,
2427 "aos.message_bridge.ClientStatistics", "", pi.second));
2428 }
2429
2430 statistics_channels.insert(configuration::GetChannel(
2431 factory.configuration(),
2432 "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
2433 "aos.message_bridge.RemoteMessage", "", pi1->node()));
2434 VerifyChannels(statistics_channels, statistics_disable_time, {});
2435 }
2436
2437 // Now, fully disconnect the node. This will completely quiet down pi2.
2438 pi1->Disconnect(pi2->node());
2439 pi2->Disconnect(pi1->node());
2440
2441 const aos::monotonic_clock::time_point disconnect_disable_time =
2442 pi2->monotonic_now();
2443 factory.RunFor(chrono::milliseconds(10000));
2444
2445 {
2446 std::set<const aos::Channel *> statistics_channels;
Brian Silverman18c8aac2022-01-02 00:03:37 -08002447 for (const std::pair<std::string_view, const Node *> &pi :
Austin Schuh48205e62021-11-12 14:13:18 -08002448 std::vector<std::pair<std::string_view, const Node *>>{
2449 {"/pi1/aos", pi1->node()}, {"/pi3/aos", pi1->node()}}) {
2450 statistics_channels.insert(configuration::GetChannel(
2451 factory.configuration(), pi.first, "aos.message_bridge.Timestamp", "",
2452 pi.second));
2453 statistics_channels.insert(configuration::GetChannel(
2454 factory.configuration(), pi.first,
2455 "aos.message_bridge.ServerStatistics", "", pi.second));
2456 statistics_channels.insert(configuration::GetChannel(
2457 factory.configuration(), pi.first,
2458 "aos.message_bridge.ClientStatistics", "", pi.second));
2459 }
2460
2461 VerifyChannels(statistics_channels, disconnect_disable_time, {pi2->node()});
2462 }
2463}
2464
Neil Balchc8f41ed2018-01-20 22:06:53 -08002465} // namespace testing
2466} // namespace aos