| #include "aos/events/event_loop_param_test.h" |
| |
| #include <chrono> |
| #include <filesystem> |
| #include <unordered_map> |
| #include <unordered_set> |
| |
| #include "absl/flags/flag.h" |
| #include "absl/flags/reflection.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "gmock/gmock.h" |
| #include "gtest/gtest.h" |
| |
| #include "aos/events/test_message_generated.h" |
| #include "aos/events/test_message_static.h" |
| #include "aos/flatbuffer_merge.h" |
| #include "aos/logging/log_message_generated.h" |
| #include "aos/logging/logging.h" |
| #include "aos/realtime.h" |
| |
| namespace aos::testing { |
| namespace { |
| namespace chrono = ::std::chrono; |
| } // namespace |
| |
| // Creates an event loop through the factory under test. |
| // |
| // An empty `name` is replaced by "loop" followed by the number of loops made |
| // so far; the counter advances on every call regardless of the name. Timing |
| // reports are skipped on the new loop when do_timing_reports() is |
| // DoTimingReports::kNo. |
| ::std::unique_ptr<EventLoop> AbstractEventLoopTest::Make( |
|     std::string_view name) { |
|   std::string loop_name = |
|       name.empty() ? "loop" + std::to_string(event_loop_count_) |
|                    : std::string(name); |
|   ++event_loop_count_; |
| |
|   auto loop = factory_->Make(loop_name); |
|   if (do_timing_reports() == DoTimingReports::kNo) { |
|     loop->SkipTimingReport(); |
|   } |
|   return loop; |
| } |
| |
| // Cross-checks the buffer indices held by `senders` and `fetchers`, |
| // CHECK-failing on any violation. |
| // |
| // Each sender's buffer index must be within [0, number_buffers) and distinct |
| // from every other sender's. When read_method() is ReadMethod::PIN, each |
| // fetcher holding a message must also have an in-range buffer index that no |
| // sender holds, and two fetchers holding the same TestMessage::value must |
| // report the same buffer index. |
| void AbstractEventLoopTest::VerifyBuffers( |
|     int number_buffers, |
|     std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers, |
|     std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders) { |
|   // Buffer indices claimed by the senders. |
|   std::unordered_set<int> in_sender; |
|   for (const Sender<TestMessage> &sender : senders) { |
|     const int this_buffer = sender.buffer_index(); |
|     CHECK_GE(this_buffer, 0); |
|     CHECK_LT(this_buffer, number_buffers); |
|     CHECK(in_sender.insert(this_buffer).second) << ": " << this_buffer; |
|   } |
| |
|   // Without PIN there is nothing we can verify about which buffers the |
|   // fetchers hold. |
|   if (read_method() != ReadMethod::PIN) { |
|     return; |
|   } |
| |
|   // TestMessage::value -> index of the buffer holding that message. |
|   std::unordered_map<int, int> fetcher_values; |
|   for (const Fetcher<TestMessage> &fetcher : fetchers) { |
|     const auto *message = fetcher.get(); |
|     if (message == nullptr) { |
|       // This fetcher has not fetched anything, so it pins no buffer. |
|       continue; |
|     } |
|     const int this_buffer = fetcher.context().buffer_index; |
|     CHECK_GE(this_buffer, 0); |
|     CHECK_LT(this_buffer, number_buffers); |
|     CHECK(in_sender.count(this_buffer) == 0) << ": " << this_buffer; |
| |
|     const auto insert_result = |
|         fetcher_values.emplace(message->value(), this_buffer); |
|     if (insert_result.second) { |
|       continue; |
|     } |
|     // Another fetcher already holds this value; both must share a buffer. |
|     CHECK_EQ(this_buffer, insert_result.first->second); |
|   } |
| } |
| |
| // Checks the basic watcher path: a message sent from another loop's sender |
| // inside OnRun() is delivered to a watcher on the primary loop. Also confirms |
| // that the OnRun() callback has not run before Run() and has run after it. |
| TEST_P(AbstractEventLoopTest, Basic) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &send_value]() { |
|     happened = true; |
|     send_value(200); |
|   }); |
| |
|   loop2->MakeWatcher("/test", [this](const TestMessage &message) { |
|     EXPECT_EQ(message.value(), 200); |
|     this->Exit(); |
|   }); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
| } |
| |
| // Same scenario as the basic watcher test, but the message is produced with |
| // the "static" flatbuffer API (Sender<TestMessageStatic>) to confirm that it |
| // works with EventLoop senders. |
| TEST_P(AbstractEventLoopTest, BasicStatic) { |
|   using StaticSender = aos::Sender<TestMessageStatic>; |
| |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   StaticSender sender = loop1->MakeSender<TestMessageStatic>("/test"); |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &sender]() { |
|     happened = true; |
| |
|     StaticSender::StaticBuilder msg = sender.MakeStaticBuilder(); |
|     msg->set_value(200); |
|     // The buffer must verify before it goes out. |
|     CHECK(msg.builder()->Verify()); |
|     msg.CheckOk(msg.Send()); |
|   }); |
| |
|   // The watcher sees the message as a plain TestMessage. |
|   loop2->MakeWatcher("/test", [this](const TestMessage &message) { |
|     EXPECT_EQ(message.value(), 200); |
|     this->Exit(); |
|   }); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
| } |
| |
| // Checks that a static sender's builder still works after being |
| // move-constructed into another object whose source has gone out of scope. |
| TEST_P(AbstractEventLoopTest, StaticBuilderMoveConstructor) { |
|   using StaticBuilder = aos::Sender<TestMessageStatic>::StaticBuilder; |
| |
|   auto loop1 = MakePrimary(); |
| |
|   aos::Sender<TestMessageStatic> sender = |
|       loop1->MakeSender<TestMessageStatic>("/test"); |
|   aos::Fetcher<TestMessage> fetcher = loop1->MakeFetcher<TestMessage>("/test"); |
| |
|   std::optional<StaticBuilder> moved_to_builder; |
|   { |
|     // The moved-from builder is destroyed at the end of this scope, before |
|     // the moved-to builder is used. |
|     StaticBuilder moved_from_builder = sender.MakeStaticBuilder(); |
|     moved_to_builder.emplace(std::move(moved_from_builder)); |
|   } |
| |
|   loop1->OnRun([this, &moved_to_builder]() { |
|     moved_to_builder.value()->set_value(200); |
|     CHECK(moved_to_builder.value().builder()->Verify()); |
|     moved_to_builder.value().CheckOk(moved_to_builder.value().Send()); |
|     this->Exit(); |
|   }); |
| |
|   // Nothing has been sent until the loop runs. |
|   ASSERT_FALSE(fetcher.Fetch()); |
|   Run(); |
|   ASSERT_TRUE(fetcher.Fetch()); |
|   EXPECT_EQ(200, fetcher->value()); |
| } |
| |
| // Checks that a message detached from a builder and sent with SendDetached() |
| // can be read back with a fetcher. A first detached message (value 100) is |
| // built and then discarded, so only the second one (value 200) is sent. |
| TEST_P(AbstractEventLoopTest, BasicSendDetached) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   FlatbufferDetachedBuffer<TestMessage> detached = |
|       flatbuffers::DetachedBuffer(); |
| |
|   // Builds a TestMessage carrying `value` and stores the detached buffer in |
|   // `detached` instead of sending it. |
|   const auto detach_value = [&sender, &detached](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     detached = msg.Detach(builder.Finish()); |
|   }; |
| |
|   detach_value(100); |
|   // Throw the first message away without ever sending it. |
|   detached = flatbuffers::DetachedBuffer(); |
|   detach_value(200); |
| |
|   sender.CheckOk(sender.SendDetached(std::move(detached))); |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
|   ASSERT_TRUE(fetcher.Fetch()); |
|   EXPECT_EQ(fetcher->value(), 200); |
| } |
| |
| // Checks that a message sent as JSON text via SendJson() reaches a fetcher on |
| // another loop with the expected field value. |
| TEST_P(AbstractEventLoopTest, BasicSendJson) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
|   sender.CheckOk(sender.SendJson(R"json({"value":201})json")); |
| |
|   aos::Fetcher<TestMessage> fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
|   ASSERT_TRUE(fetcher.Fetch()); |
|   EXPECT_EQ(fetcher->value(), 201); |
| } |
| |
| // Checks that handing malformed JSON to SendJson() and CheckOk()-ing the |
| // result dies with an "Invalid JSON" message. |
| TEST_P(AbstractEventLoopDeathTest, InvalidSendJson) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
|   // The JSON below is deliberately truncated in the middle of a key. |
|   EXPECT_DEATH({ sender.CheckOk(sender.SendJson(R"json({"val)json")); }, |
|                "Invalid JSON"); |
| } |
| |
| // Checks the context seen inside a no-arg watcher: the size is non-zero, but |
| // there is no data pointer and no buffer index. |
| TEST_P(AbstractEventLoopTest, NoArgNoData) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &sender]() { |
|     happened = true; |
| |
|     // Send a message without setting any fields. |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }); |
| |
|   loop2->MakeNoArgWatcher<TestMessage>("/test", [this, &loop2]() { |
|     EXPECT_GT(loop2->context().size, 0u); |
|     EXPECT_EQ(nullptr, loop2->context().data); |
|     EXPECT_EQ(-1, loop2->context().buffer_index); |
|     this->Exit(); |
|   }); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
| } |
| |
| // Checks that a no-arg watcher is triggered by a message from a sender, and |
| // that the message can then be read with a fetcher from inside the watcher. |
| // Also exercises OnRun(). |
| TEST_P(AbstractEventLoopTest, BasicNoArg) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &send_value]() { |
|     happened = true; |
|     send_value(200); |
|   }); |
| |
|   aos::Fetcher<TestMessage> fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
|   loop2->MakeNoArgWatcher<TestMessage>("/test", [this, &fetcher]() { |
|     // The watcher carries no payload, so read it through the fetcher. |
|     ASSERT_TRUE(fetcher.Fetch()); |
|     EXPECT_EQ(fetcher->value(), 200); |
|     this->Exit(); |
|   }); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
| } |
| |
| // Checks that MakeWatcher() accepts a std::function handler rather than a |
| // bare lambda. |
| TEST_P(AbstractEventLoopTest, BasicFunction) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &send_value]() { |
|     happened = true; |
|     send_value(200); |
|   }); |
| |
|   loop2->MakeWatcher("/test", std::function<void(const TestMessage &)>( |
|                                   [this](const TestMessage &message) { |
|                                     EXPECT_EQ(message.value(), 200); |
|                                     this->Exit(); |
|                                   })); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
| } |
| |
| // Checks that one watcher receives the messages of two different senders on |
| // the same channel. Also exercises OnRun(). |
| TEST_P(AbstractEventLoopTest, BasicTwoSenders) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   aos::Sender<TestMessage> sender1 = loop1->MakeSender<TestMessage>("/test"); |
|   aos::Sender<TestMessage> sender2 = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value` on `sender`. |
|   const auto send_value = [](aos::Sender<TestMessage> &sender, int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   bool happened = false; |
|   loop2->OnRun([&happened, &send_value, &sender1, &sender2]() { |
|     happened = true; |
|     send_value(sender1, 200); |
|     send_value(sender2, 200); |
|   }); |
| |
|   int messages_received = 0; |
|   loop2->MakeWatcher( |
|       "/test", [this, &messages_received](const TestMessage &message) { |
|         EXPECT_EQ(message.value(), 200); |
|         // NOTE(review): Exit() is requested on the first message, yet two |
|         // messages are expected below -- presumably the loop still delivers |
|         // what is already queued before it stops; confirm. |
|         this->Exit(); |
|         ++messages_received; |
|       }); |
| |
|   EXPECT_FALSE(happened); |
|   Run(); |
|   EXPECT_TRUE(happened); |
|   EXPECT_EQ(messages_received, 2); |
| } |
| |
| // Checks that a fetcher can fetch a sender's message without the event loop |
| // ever being run, and checks the fetcher's context both before any message |
| // exists and after the fetch. |
| TEST_P(AbstractEventLoopTest, FetchWithoutRun) { |
|   auto loop1 = Make(); |
|   auto loop2 = Make(); |
|   auto loop3 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // Nothing has been sent yet: the fetch fails and the context holds its |
|   // "no message" values. |
|   EXPECT_FALSE(fetcher.Fetch()); |
|   EXPECT_EQ(fetcher.get(), nullptr); |
| |
|   EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time); |
|   EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time); |
|   EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
|             monotonic_clock::min_time); |
|   EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time); |
|   EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time); |
|   EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero()); |
|   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu); |
|   EXPECT_EQ(fetcher.context().size, 0u); |
|   EXPECT_EQ(fetcher.context().data, nullptr); |
|   EXPECT_EQ(fetcher.context().buffer_index, -1); |
| |
|   { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(200); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   EXPECT_TRUE(fetcher.Fetch()); |
|   ASSERT_FALSE(fetcher.get() == nullptr); |
|   EXPECT_EQ(fetcher.get()->value(), 200); |
| |
|   // Allowed distance between the message's event times and "now". |
|   const chrono::milliseconds kEpsilon(100); |
|   const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now(); |
|   const aos::realtime_clock::time_point realtime_now = loop2->realtime_now(); |
| |
|   // Event and remote times are expected to match here, and no remote |
|   // transmit time is set. |
|   EXPECT_EQ(fetcher.context().monotonic_event_time, |
|             fetcher.context().monotonic_remote_time); |
|   EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
|             monotonic_clock::min_time); |
|   EXPECT_EQ(fetcher.context().realtime_event_time, |
|             fetcher.context().realtime_remote_time); |
| |
|   EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon); |
|   EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon); |
|   EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon); |
|   EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon); |
|   EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid()); |
|   EXPECT_EQ(fetcher.context().queue_index, 0x0u); |
|   EXPECT_EQ(fetcher.context().size, 20u); |
|   EXPECT_NE(fetcher.context().data, nullptr); |
| |
|   // Only the PIN read method exposes a real buffer index. |
|   if (read_method() != ReadMethod::PIN) { |
|     EXPECT_EQ(fetcher.context().buffer_index, -1); |
|   } else { |
|     EXPECT_GE(fetcher.context().buffer_index, 0); |
|     EXPECT_LT(fetcher.context().buffer_index, |
|               loop2->NumberBuffers(fetcher.channel())); |
|   } |
| } |
| |
| // Returns a predicate, for use with FetchIf()/FetchNextIf(), that ignores its |
| // Context argument and always answers `should_fetch`. When `called_count` is |
| // non-null, the pointed-to counter is incremented on every invocation; the |
| // pointer must stay valid for as long as the predicate may be called. |
| std::function<bool(const Context &)> MakeShouldFetch( |
|     bool should_fetch, size_t *called_count = nullptr) { |
|   if (called_count == nullptr) { |
|     // No counter requested: nothing to track. |
|     return [should_fetch](const Context &) { return should_fetch; }; |
|   } |
|   return [should_fetch, called_count](const Context &) { |
|     ++(*called_count); |
|     return should_fetch; |
|   }; |
| } |
| |
| // Checks FetchIf() without running the event loop: it fails while no message |
| // exists, a rejecting predicate leaves the fetcher untouched once a message |
| // does exist, and an accepting predicate fetches the message and fills in the |
| // context. |
| TEST_P(AbstractEventLoopTest, FetchIfWithoutRun) { |
|   auto loop1 = Make(); |
|   auto loop2 = Make(); |
|   auto loop3 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // Expects the fetcher to hold no message and its context to carry the |
|   // "no message" values. |
|   const auto expect_nothing_fetched = [&fetcher]() { |
|     EXPECT_EQ(fetcher.get(), nullptr); |
| |
|     EXPECT_EQ(fetcher.context().monotonic_event_time, |
|               monotonic_clock::min_time); |
|     EXPECT_EQ(fetcher.context().monotonic_remote_time, |
|               monotonic_clock::min_time); |
|     EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
|               monotonic_clock::min_time); |
|     EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time); |
|     EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time); |
|     EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero()); |
|     EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu); |
|     EXPECT_EQ(fetcher.context().size, 0u); |
|     EXPECT_EQ(fetcher.context().data, nullptr); |
|     EXPECT_EQ(fetcher.context().buffer_index, -1); |
|   }; |
| |
|   // With nothing sent, neither predicate answer can produce a message. |
|   for (const bool should_fetch : {true, false}) { |
|     EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(should_fetch))); |
|     expect_nothing_fetched(); |
|   } |
| |
|   { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(200); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   // Make sure failing to fetch won't affect anything. |
|   EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false))); |
|   expect_nothing_fetched(); |
| |
|   // And now confirm we succeed and everything gets set right. |
|   EXPECT_TRUE(fetcher.FetchIf(MakeShouldFetch(true))); |
|   ASSERT_FALSE(fetcher.get() == nullptr); |
|   EXPECT_EQ(fetcher.get()->value(), 200); |
| |
|   // Allowed distance between the message's event times and "now". |
|   const chrono::milliseconds kEpsilon(100); |
|   const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now(); |
|   const aos::realtime_clock::time_point realtime_now = loop2->realtime_now(); |
| |
|   EXPECT_EQ(fetcher.context().monotonic_event_time, |
|             fetcher.context().monotonic_remote_time); |
|   EXPECT_EQ(fetcher.context().realtime_event_time, |
|             fetcher.context().realtime_remote_time); |
|   EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
|             monotonic_clock::min_time); |
| |
|   EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon); |
|   EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon); |
|   EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon); |
|   EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon); |
|   EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid()); |
|   EXPECT_EQ(fetcher.context().queue_index, 0x0u); |
|   EXPECT_EQ(fetcher.context().size, 20u); |
|   EXPECT_NE(fetcher.context().data, nullptr); |
| |
|   // Only the PIN read method exposes a real buffer index. |
|   if (read_method() != ReadMethod::PIN) { |
|     EXPECT_EQ(fetcher.context().buffer_index, -1); |
|   } else { |
|     EXPECT_GE(fetcher.context().buffer_index, 0); |
|     EXPECT_LT(fetcher.context().buffer_index, |
|               loop2->NumberBuffers(fetcher.channel())); |
|   } |
| } |
| |
| // Checks that a watcher receives every message sent from OnRun(), in order, |
| // and does not receive a message that was sent before Run() was called. |
| TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   ::std::vector<int> values; |
|   loop2->MakeWatcher("/test", [this, &values](const TestMessage &message) { |
|     values.push_back(message.value()); |
|     // Stop once both expected messages have arrived. |
|     if (values.size() == 2) { |
|       this->Exit(); |
|     } |
|   }); |
| |
|   // Before Run, should be ignored. |
|   send_value(199); |
| |
|   loop2->OnRun([&send_value]() { |
|     for (const int value : {200, 201}) { |
|       send_value(value); |
|     } |
|   }); |
| |
|   Run(); |
| |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({200, 201})); |
| } |
| |
| // Checks that a watcher does not receive messages which were sent before the |
| // watcher was created. |
| TEST_P(AbstractEventLoopTest, DoubleSendAfterStartup) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   ::std::vector<int> values; |
| |
|   // Send both messages before the watcher exists. |
|   for (int i = 200; i < 202; ++i) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(i); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   loop2->MakeWatcher("/test", [&](const TestMessage &message) { |
|     values.push_back(message.value()); |
|   }); |
| |
|   // Add a timer to actually quit. |
|   auto test_timer = loop2->AddTimer([this]() { this->Exit(); }); |
|   loop2->OnRun([&test_timer, &loop2]() { |
|     test_timer->Schedule(loop2->monotonic_now(), |
|                          ::std::chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   // Use the container matcher rather than comparing the literal 0 against a |
|   // size_t; on failure it also prints what was unexpectedly received. |
|   EXPECT_THAT(values, ::testing::IsEmpty()); |
| } |
| |
| // Checks that FetchNext() walks through every message sent after the fetcher |
| // was constructed, in order. |
| TEST_P(AbstractEventLoopTest, FetchNext) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   for (const int value : {200, 201}) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   // The timer drains the fetcher and then stops the loop. |
|   ::std::vector<int> values; |
|   auto test_timer = loop2->AddTimer([this, &fetcher, &values]() { |
|     while (fetcher.FetchNext()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&loop2, &test_timer]() { |
|     const auto start_time = loop2->monotonic_now(); |
|     test_timer->Schedule(start_time, chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({200, 201})); |
| } |
| |
| // Checks that FetchNext() returns none of the messages that were sent before |
| // the fetcher was constructed. |
| TEST_P(AbstractEventLoopTest, FetchNextAfterSend) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   ::std::vector<int> values; |
| |
|   // Send both messages before the fetcher exists. |
|   for (int i = 200; i < 202; ++i) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(i); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // Add a timer to actually quit. |
|   auto test_timer = loop2->AddTimer([&fetcher, &values, this]() { |
|     while (fetcher.FetchNext()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&test_timer, &loop2]() { |
|     test_timer->Schedule(loop2->monotonic_now(), |
|                          ::std::chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   // EXPECT_THAT takes (value, matcher); match on the container itself so a |
|   // failure prints what was unexpectedly fetched. |
|   EXPECT_THAT(values, ::testing::IsEmpty()); |
| } |
| |
| // Checks that FetchNextIf() returns none of the messages that were sent |
| // before the fetcher was constructed, even with an always-accepting predicate. |
| TEST_P(AbstractEventLoopTest, FetchNextIfAfterSend) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   ::std::vector<int> values; |
| |
|   // Send both messages before the fetcher exists. |
|   for (int i = 200; i < 202; ++i) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(i); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // Add a timer to actually quit. |
|   auto test_timer = loop2->AddTimer([&fetcher, &values, this]() { |
|     while (fetcher.FetchNextIf(MakeShouldFetch(true))) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&test_timer, &loop2]() { |
|     test_timer->Schedule(loop2->monotonic_now(), |
|                          ::std::chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   // Use the container matcher rather than comparing the literal 0 against a |
|   // size_t; on failure it also prints what was unexpectedly fetched. |
|   EXPECT_THAT(values, ::testing::IsEmpty()); |
| } |
| |
| // Checks that Fetch() returns the most recent message even when it was sent |
| // before the fetcher was created, and that a second Fetch() does not return |
| // the same message again. |
| TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   for (const int value : {200, 201}) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   ::std::vector<int> values; |
|   // Records whatever a successful Fetch() yields. |
|   const auto record_fetch = [&fetcher, &values]() { |
|     if (fetcher.Fetch()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
|   }; |
| |
|   // The timer does the fetching and then stops the loop. |
|   auto test_timer = loop2->AddTimer([this, &record_fetch]() { |
|     record_fetch(); |
|     // Do it again to make sure we don't double fetch. |
|     record_fetch(); |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&loop2, &test_timer]() { |
|     const auto start_time = loop2->monotonic_now(); |
|     test_timer->Schedule(start_time, chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({201})); |
| } |
| |
| // Checks that FetchIf() returns the most recent message even when it was sent |
| // before the fetcher was created, and that later FetchIf() calls (rejecting |
| // or accepting) do not return it again. |
| TEST_P(AbstractEventLoopTest, FetchIfDataFromBeforeCreation) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   for (const int value : {200, 201}) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   ::std::vector<int> values; |
|   // Records whatever a successful FetchIf() yields for the given predicate |
|   // answer. |
|   const auto record_fetch_if = [&fetcher, &values](bool should_fetch) { |
|     if (fetcher.FetchIf(MakeShouldFetch(should_fetch))) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
|   }; |
| |
|   // The timer does the fetching and then stops the loop. |
|   auto test_timer = loop2->AddTimer([this, &record_fetch_if]() { |
|     record_fetch_if(true); |
|     record_fetch_if(false); |
|     // Do it again to make sure we don't double fetch. |
|     record_fetch_if(true); |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&loop2, &test_timer]() { |
|     const auto start_time = loop2->monotonic_now(); |
|     test_timer->Schedule(start_time, chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({201})); |
| } |
| |
| // Checks a one-shot timer's IsDisabled() state: disabled before it is |
| // scheduled, enabled once scheduled, and disabled again after it has fired and |
| // the loop has exited. |
| TEST_P(AbstractEventLoopTest, CheckTimerDisabled) { |
|   auto loop = MakePrimary("primary"); |
| |
|   auto timer = loop->AddTimer([this]() { |
|     LOG(INFO) << "timer called"; |
|     Exit(); |
|   }); |
| |
|   loop->OnRun([&loop, timer]() { |
|     EXPECT_TRUE(timer->IsDisabled()); |
|     const auto fire_time = loop->monotonic_now() + chrono::milliseconds(100); |
|     timer->Schedule(fire_time); |
|     EXPECT_FALSE(timer->IsDisabled()); |
|   }); |
| |
|   Run(); |
|   EXPECT_TRUE(timer->IsDisabled()); |
| } |
| |
| // Checks that a timer scheduled for a time already in the past still fires, |
| // and reports itself disabled after the loop has exited. |
| TEST_P(AbstractEventLoopTest, CheckTimerRunInPastDisabled) { |
|   auto loop = MakePrimary("primary"); |
| |
|   // Fires (and stops the loop) once `timer` below has scheduled it. |
|   auto timer2 = loop->AddTimer([this]() { |
|     LOG(INFO) << "timer called"; |
|     Exit(); |
|   }); |
| |
|   // Schedules timer2 one nanosecond in the past. |
|   auto timer = loop->AddTimer([&loop, timer2]() { |
|     const auto just_passed = loop->monotonic_now() - chrono::nanoseconds(1); |
|     timer2->Schedule(just_passed); |
|   }); |
| |
|   loop->OnRun([&loop, timer]() { |
|     const auto fire_time = loop->monotonic_now() + chrono::seconds(1); |
|     timer->Schedule(fire_time); |
|     EXPECT_FALSE(timer->IsDisabled()); |
|   }); |
| |
|   Run(); |
|   EXPECT_TRUE(timer2->IsDisabled()); |
| } |
| |
| // Checks that a repeating timer stays enabled after the loop is exited from |
| // within the timer's own handler, and only reports disabled once Disable() is |
| // called. |
| TEST_P(AbstractEventLoopTest, CheckTimerRepeatOnCountDisabled) { |
|   auto loop = MakePrimary("primary"); |
| |
|   int counter = 0; |
|   auto timer = loop->AddTimer([this, &counter]() { |
|     LOG(INFO) << "timer called"; |
|     ++counter; |
|     if (counter >= 5) { |
|       Exit(); |
|     } |
|   }); |
| |
|   loop->OnRun([&loop, timer]() { |
|     const auto first_fire = loop->monotonic_now() + chrono::seconds(1); |
|     timer->Schedule(first_fire, chrono::seconds(1)); |
|     EXPECT_FALSE(timer->IsDisabled()); |
|   }); |
|   Run(); |
| |
|   // Sanity check |
|   EXPECT_EQ(counter, 5); |
| |
|   // if you run the loop again, the timer will start running again |
|   EXPECT_FALSE(timer->IsDisabled()); |
| |
|   counter = 0; |
|   Run(); |
|   timer->Disable(); |
| |
|   EXPECT_TRUE(timer->IsDisabled()); |
| } |
| |
| // Checks that a repeating timer stays enabled when the loop is ended by |
| // EndEventLoop() rather than by the timer itself, and only reports disabled |
| // once Disable() is called. |
| TEST_P(AbstractEventLoopTest, CheckTimerRepeatTillEndTimerDisabled) { |
|   auto loop = MakePrimary("primary"); |
| |
|   auto timer = loop->AddTimer([]() { LOG(INFO) << "timer called"; }); |
| |
|   loop->OnRun([&loop, timer]() { |
|     const auto first_fire = loop->monotonic_now() + chrono::seconds(1); |
|     timer->Schedule(first_fire, chrono::seconds(1)); |
|     EXPECT_FALSE(timer->IsDisabled()); |
|   }); |
| |
|   EndEventLoop(loop.get(), chrono::seconds(5)); |
|   Run(); |
|   EXPECT_FALSE(timer->IsDisabled()); |
| |
|   timer->Disable(); |
|   EXPECT_TRUE(timer->IsDisabled()); |
| } |
| |
| // Checks how Fetch() and FetchNext() interact on one fetcher: Fetch() jumps |
| // to the newest message, FetchNext() then advances by exactly one, and a |
| // further Fetch() jumps to the newest again. |
| TEST_P(AbstractEventLoopTest, FetchAndFetchNextTogether) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   for (const int value : {200, 201}) { |
|     send_value(value); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // The timer runs the whole scenario and then stops the loop. |
|   ::std::vector<int> values; |
|   auto test_timer = loop2->AddTimer([this, &fetcher, &values, &send_value]() { |
|     // Newest of the messages sent so far. |
|     if (fetcher.Fetch()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     for (const int value : {202, 203, 204}) { |
|       send_value(value); |
|     } |
| |
|     // One step forward from the message fetched above. |
|     if (fetcher.FetchNext()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     // Straight to the newest message. |
|     if (fetcher.Fetch()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&loop2, &test_timer]() { |
|     const auto start_time = loop2->monotonic_now(); |
|     test_timer->Schedule(start_time, chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204})); |
| } |
| |
| // Checks how Fetch{If,}() and FetchNext{If,}() interact on one fetcher: |
| // rejecting predicates never fetch, and the accepting calls behave like their |
| // unconditional counterparts. |
| TEST_P(AbstractEventLoopTest, FetchAndFetchNextIfTogether) { |
|   auto loop1 = Make(); |
|   auto loop2 = MakePrimary(); |
| |
|   auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
|   // Builds and sends a single TestMessage carrying `value`. |
|   const auto send_value = [&sender](int value) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   }; |
| |
|   for (const int value : {200, 201}) { |
|     send_value(value); |
|   } |
| |
|   auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
|   // The timer runs the whole scenario and then stops the loop. |
|   ::std::vector<int> values; |
|   auto test_timer = loop2->AddTimer([this, &fetcher, &values, &send_value]() { |
|     // Newest of the messages sent so far. |
|     if (fetcher.Fetch()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     for (const int value : {202, 203, 204}) { |
|       send_value(value); |
|     } |
| |
|     EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false))); |
| |
|     // One step forward from the message fetched above. |
|     if (fetcher.FetchNext()) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false))); |
|     EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false))); |
| |
|     // Straight to the newest message. |
|     if (fetcher.FetchIf(MakeShouldFetch(true))) { |
|       values.push_back(fetcher.get()->value()); |
|     } |
| |
|     this->Exit(); |
|   }); |
| |
|   loop2->OnRun([&loop2, &test_timer]() { |
|     const auto start_time = loop2->monotonic_now(); |
|     test_timer->Schedule(start_time, chrono::milliseconds(100)); |
|   }); |
| |
|   Run(); |
|   EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204})); |
| } |
| |
| // Checks FetchNext() when two messages are already queued before the first |
| // one is consumed: they come out one at a time in send order, and running off |
| // the end of the queue leaves the last message in place. |
| TEST_P(AbstractEventLoopTest, FetchNextTest) { |
|   auto send_loop = Make(); |
|   auto fetch_loop = Make(); |
|   auto sender = send_loop->MakeSender<TestMessage>("/test"); |
|   Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test"); |
| |
|   // Queue both messages up front. |
|   for (const int value : {100, 200}) { |
|     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
|     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
|     builder.add_value(value); |
|     msg.CheckOk(msg.Send(builder.Finish())); |
|   } |
| |
|   ASSERT_TRUE(fetcher.FetchNext()); |
|   ASSERT_NE(nullptr, fetcher.get()); |
|   EXPECT_EQ(100, fetcher.get()->value()); |
| |
|   ASSERT_TRUE(fetcher.FetchNext()); |
|   ASSERT_NE(nullptr, fetcher.get()); |
|   EXPECT_EQ(200, fetcher.get()->value()); |
| |
|   // When we run off the end of the queue, expect to still have the old message: |
|   ASSERT_FALSE(fetcher.FetchNext()); |
|   ASSERT_NE(nullptr, fetcher.get()); |
|   EXPECT_EQ(200, fetcher.get()->value()); |
| } |
| |
| // Tests that FetchNext behaves correctly when we get two messages in the queue |
| // but don't consume the first until after the second has been sent. |
| TEST_P(AbstractEventLoopTest, FetchNextIfTest) { |
| auto send_loop = Make(); |
| auto fetch_loop = Make(); |
| auto sender = send_loop->MakeSender<TestMessage>("/test"); |
| Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test"); |
| |
| { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(100); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| |
| { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| |
| size_t called_count = 0; |
| ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count))); |
| ASSERT_NE(nullptr, fetcher.get()); |
| EXPECT_EQ(100, fetcher.get()->value()); |
| EXPECT_EQ(called_count, 1u); |
| |
| ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count))); |
| EXPECT_EQ(called_count, 2u); |
| |
| ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count))); |
| ASSERT_NE(nullptr, fetcher.get()); |
| EXPECT_EQ(200, fetcher.get()->value()); |
| EXPECT_EQ(called_count, 3u); |
| |
| // When we run off the end of the queue, expect to still have the old message: |
| ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count))); |
| EXPECT_EQ(called_count, 3u); |
| ASSERT_NE(nullptr, fetcher.get()); |
| EXPECT_EQ(200, fetcher.get()->value()); |
| } |
| |
| // Verify that a fetcher still holds its data, even after falling behind. |
| TEST_P(AbstractEventLoopTest, FetcherBehindData) { |
| auto send_loop = Make(); |
| auto fetch_loop = Make(); |
| auto sender = send_loop->MakeSender<TestMessage>("/test"); |
| Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test"); |
| { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(1); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| ASSERT_TRUE(fetcher.Fetch()); |
| EXPECT_EQ(1, fetcher.get()->value()); |
| for (int i = 0; i < 300; ++i) { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(i + 2); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| EXPECT_EQ(1, fetcher.get()->value()); |
| } |
| |
| // Try a bunch of orderings of operations with fetchers and senders. Verify that |
| // all the fetchers have the correct data at each step. |
| TEST_P(AbstractEventLoopTest, FetcherPermutations) { |
| for (int max_save = 0; max_save < 5; ++max_save) { |
| SCOPED_TRACE("max_save=" + std::to_string(max_save)); |
| |
| auto send_loop = Make(); |
| auto fetch_loop = Make(); |
| auto sender = send_loop->MakeSender<TestMessage>("/test"); |
| const auto send_message = [&sender](int i) { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(i); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| }; |
| std::vector<Fetcher<TestMessage>> fetchers; |
| for (int i = 0; i < 10; ++i) { |
| fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test")); |
| } |
| send_message(1); |
| const auto verify_buffers = [&]() { |
| std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> |
| fetchers_copy; |
| for (const auto &fetcher : fetchers) { |
| fetchers_copy.emplace_back(fetcher); |
| } |
| std::vector<std::reference_wrapper<const Sender<TestMessage>>> |
| senders_copy; |
| senders_copy.emplace_back(sender); |
| VerifyBuffers(send_loop->NumberBuffers(sender.channel()), fetchers_copy, |
| senders_copy); |
| }; |
| for (auto &fetcher : fetchers) { |
| ASSERT_TRUE(fetcher.Fetch()); |
| verify_buffers(); |
| EXPECT_EQ(1, fetcher.get()->value()); |
| } |
| |
| for (int save = 1; save <= max_save; ++save) { |
| SCOPED_TRACE("save=" + std::to_string(save)); |
| send_message(100 + save); |
| verify_buffers(); |
| for (size_t i = 0; i < fetchers.size() - save; ++i) { |
| SCOPED_TRACE("fetcher=" + std::to_string(i)); |
| ASSERT_TRUE(fetchers[i].Fetch()); |
| verify_buffers(); |
| EXPECT_EQ(100 + save, fetchers[i].get()->value()); |
| } |
| for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) { |
| SCOPED_TRACE("fetcher=" + std::to_string(i)); |
| EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value()); |
| } |
| EXPECT_EQ(1, fetchers.back().get()->value()); |
| } |
| |
| for (int i = 0; i < 300; ++i) { |
| send_message(200 + i); |
| verify_buffers(); |
| } |
| |
| for (size_t i = 0; i < fetchers.size() - max_save; ++i) { |
| SCOPED_TRACE("fetcher=" + std::to_string(i)); |
| if (max_save > 0) { |
| EXPECT_EQ(100 + max_save, fetchers[i].get()->value()); |
| } else { |
| EXPECT_EQ(1, fetchers[i].get()->value()); |
| } |
| } |
| for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) { |
| SCOPED_TRACE("fetcher=" + std::to_string(i)); |
| EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value()); |
| } |
| EXPECT_EQ(1, fetchers.back().get()->value()); |
| } |
| } |
| |
| // Verify that making a fetcher and watcher for "/test" succeeds. |
| TEST_P(AbstractEventLoopTest, FetcherAndWatcher) { |
| auto loop = Make(); |
| auto fetcher = loop->MakeFetcher<TestMessage>("/test"); |
| loop->MakeWatcher("/test", [&](const TestMessage &) {}); |
| } |
| |
| // Verify that making 2 fetchers for "/test" succeeds. |
| TEST_P(AbstractEventLoopTest, TwoFetcher) { |
| auto loop = Make(); |
| auto fetcher = loop->MakeFetcher<TestMessage>("/test"); |
| auto fetcher2 = loop->MakeFetcher<TestMessage>("/test"); |
| } |
| |
| // Verify that registering a watcher for an invalid channel name dies. |
| TEST_P(AbstractEventLoopDeathTest, InvalidChannelName) { |
| auto loop = Make(); |
| EXPECT_DEATH( |
| { loop->MakeWatcher("/test/invalid", [&](const TestMessage &) {}); }, |
| "/test/invalid"); |
| EXPECT_DEATH( |
| { loop->MakeNoArgWatcher<TestMessage>("/test/invalid", [&]() {}); }, |
| "/test/invalid"); |
| } |
| |
| // Verify that setting up a timer before monotonic_clock::epoch() fails. |
| TEST_P(AbstractEventLoopDeathTest, NegativeTimeTimer) { |
| auto loop = Make(); |
| TimerHandler *time = loop->AddTimer([]() {}); |
| EXPECT_DEATH( |
| time->Schedule(monotonic_clock::epoch() - std::chrono::seconds(1)), |
| "-1.000"); |
| } |
| |
| // Verify that registering a watcher twice for "/test" fails. |
| TEST_P(AbstractEventLoopDeathTest, TwoWatcher) { |
| auto loop = Make(); |
| loop->MakeWatcher("/test", [&](const TestMessage &) {}); |
| EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), |
| "/test"); |
| EXPECT_DEATH(loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {}), "/test"); |
| } |
| |
| // Verify that registering a no-arg watcher twice for "/test" fails. |
| TEST_P(AbstractEventLoopDeathTest, TwoNoArgWatcher) { |
| auto loop = Make(); |
| loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {}); |
| EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), |
| "/test"); |
| EXPECT_DEATH(loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {}), "/test"); |
| } |
| |
| // Verify that SetRuntimeRealtimePriority fails while running. |
| TEST_P(AbstractEventLoopDeathTest, SetRuntimeRealtimePriority) { |
| auto loop = MakePrimary(); |
| EXPECT_EQ(0, loop->runtime_realtime_priority()); |
| // Confirm that runtime priority calls work when not realtime. |
| loop->SetRuntimeRealtimePriority(5); |
| EXPECT_EQ(5, loop->runtime_realtime_priority()); |
| |
| loop->OnRun([&]() { loop->SetRuntimeRealtimePriority(5); }); |
| |
| EXPECT_DEATH(Run(), "realtime"); |
| } |
| |
namespace {

// Returns true when `a` and `b` compare equal under CPU_EQUAL.
bool CpuSetEqual(const cpu_set_t &a, const cpu_set_t &b) {
  const int same = CPU_EQUAL(&a, &b);
  return same != 0;
}

}  // namespace
| |
| // Verify that SetRuntimeAffinity fails while running. |
| TEST_P(AbstractEventLoopDeathTest, SetRuntimeAffinity) { |
| const cpu_set_t available = GetCurrentThreadAffinity(); |
| int first_cpu = -1; |
| for (int i = 0; i < CPU_SETSIZE; ++i) { |
| if (CPU_ISSET(i, &available)) { |
| first_cpu = i; |
| break; |
| continue; |
| } |
| } |
| CHECK_NE(first_cpu, -1) << ": Default affinity has no CPUs?"; |
| |
| auto loop = MakePrimary(); |
| EXPECT_TRUE( |
| CpuSetEqual(EventLoop::DefaultAffinity(), loop->runtime_affinity())); |
| const cpu_set_t new_affinity = MakeCpusetFromCpus({first_cpu}); |
| // Confirm that runtime priority calls work when not running. |
| loop->SetRuntimeAffinity(new_affinity); |
| EXPECT_TRUE(CpuSetEqual(new_affinity, loop->runtime_affinity())); |
| |
| loop->OnRun( |
| [&]() { loop->SetRuntimeAffinity(MakeCpusetFromCpus({first_cpu})); }); |
| |
| EXPECT_DEATH(Run(), "Cannot set affinity while running"); |
| } |
| |
| // Verify that registering a watcher and a sender for "/test" fails. |
| TEST_P(AbstractEventLoopDeathTest, WatcherAndSender) { |
| auto loop = Make(); |
| auto sender = loop->MakeSender<TestMessage>("/test"); |
| EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), |
| "/test"); |
| } |
| |
| // Verify that creating too many senders fails. |
| TEST_P(AbstractEventLoopDeathTest, TooManySenders) { |
| auto loop = Make(); |
| std::vector<aos::Sender<TestMessage>> senders; |
| for (int i = 0; i < 10; ++i) { |
| senders.emplace_back(loop->MakeSender<TestMessage>("/test")); |
| } |
| EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); }, |
| "Failed to create sender on \\{ \"name\": \"/test\", \"type\": " |
| "\"aos.TestMessage\"[^}]*\\ }, too many senders."); |
| } |
| |
| // Verify that creating too many fetchers fails. |
| TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) { |
| if (read_method() != ReadMethod::PIN) { |
| // Other read methods don't limit the number of readers, so just skip this. |
| return; |
| } |
| |
| auto loop = Make(); |
| std::vector<aos::Fetcher<TestMessage>> fetchers; |
| for (int i = 0; i < 10; ++i) { |
| fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test")); |
| } |
| EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); }, |
| "Failed to create reader on \\{ \"name\": \"/test\", \"type\": " |
| "\"aos.TestMessage\"[^}]*\\ }, too many readers."); |
| } |
| |
| // Verify that creating too many fetchers, split between two event loops, fails. |
| TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) { |
| if (read_method() != ReadMethod::PIN) { |
| // Other read methods don't limit the number of readers, so just skip this. |
| return; |
| } |
| |
| auto loop = Make(); |
| auto loop2 = Make(); |
| std::vector<aos::Fetcher<TestMessage>> fetchers; |
| for (int i = 0; i < 5; ++i) { |
| fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test")); |
| fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test")); |
| } |
| EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); }, |
| "Failed to create reader on \\{ \"name\": \"/test\", \"type\": " |
| "\"aos.TestMessage\"[^}]*\\ }, too many readers."); |
| } |
| |
| // Verify that creating too many watchers fails. |
| TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) { |
| if (read_method() != ReadMethod::PIN) { |
| // Other read methods don't limit the number of readers, so just skip this. |
| return; |
| } |
| |
| std::vector<std::unique_ptr<EventLoop>> loops; |
| for (int i = 0; i < 10; ++i) { |
| loops.emplace_back(Make()); |
| loops.back()->MakeWatcher("/test", [](const TestMessage &) {}); |
| } |
| EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); }, |
| "Failed to create reader on \\{ \"name\": \"/test\", \"type\": " |
| "\"aos.TestMessage\"[^}]*\\ }, too many readers."); |
| } |
| |
| // Verify that creating too many watchers and fetchers combined fails. |
| TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) { |
| if (read_method() != ReadMethod::PIN) { |
| // Other read methods don't limit the number of readers, so just skip this. |
| return; |
| } |
| |
| auto loop = Make(); |
| std::vector<aos::Fetcher<TestMessage>> fetchers; |
| std::vector<std::unique_ptr<EventLoop>> loops; |
| for (int i = 0; i < 5; ++i) { |
| fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test")); |
| loops.emplace_back(Make()); |
| loops.back()->MakeWatcher("/test", [](const TestMessage &) {}); |
| } |
| EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); }, |
| "Failed to create reader on \\{ \"name\": \"/test\", \"type\": " |
| "\"aos.TestMessage\"[^}]*\\ }, too many readers."); |
| } |
| |
| // Verify that we can't create a sender inside OnRun. |
| TEST_P(AbstractEventLoopDeathTest, SenderInOnRun) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->OnRun( |
| [&]() { auto sender = loop1->MakeSender<TestMessage>("/test2"); }); |
| |
| EXPECT_DEATH(Run(), "running"); |
| } |
| |
| // Verify that we can't create a watcher inside OnRun. |
| TEST_P(AbstractEventLoopDeathTest, WatcherInOnRun) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->OnRun( |
| [&]() { loop1->MakeWatcher("/test", [&](const TestMessage &) {}); }); |
| |
| EXPECT_DEATH(Run(), "running"); |
| } |
| |
| // Verify that we can't create a no-arg watcher inside OnRun. |
| TEST_P(AbstractEventLoopDeathTest, NoArgWatcherInOnRun) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->OnRun( |
| [&]() { loop1->MakeNoArgWatcher<TestMessage>("/test", [&]() {}); }); |
| |
| EXPECT_DEATH(Run(), "running"); |
| } |
| |
| // Verify that Quit() works when there are multiple watchers. |
| TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) { |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| |
| loop2->MakeWatcher("/test1", [&](const TestMessage &) {}); |
| loop2->MakeWatcher("/test2", [&](const TestMessage &message) { |
| EXPECT_EQ(message.value(), 200); |
| this->Exit(); |
| }); |
| |
| auto sender = loop1->MakeSender<TestMessage>("/test2"); |
| |
| loop2->OnRun([&]() { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| }); |
| |
| Run(); |
| } |
| |
| // Verify that AOS_LOG has the right name. |
| TEST_P(AbstractEventLoopTest, AOSLog) { |
| auto loop2 = MakePrimary("loop1"); |
| auto loop1 = Make("loop0"); |
| |
| auto fetcher = loop1->MakeFetcher<aos::logging::LogMessageFbs>("/aos"); |
| |
| EXPECT_FALSE(fetcher.Fetch()); |
| |
| loop2->OnRun([&]() { |
| AOS_LOG(INFO, "Testing123"); |
| this->Exit(); |
| }); |
| |
| Run(); |
| EXPECT_TRUE(fetcher.Fetch()); |
| EXPECT_EQ(fetcher->name()->string_view(), "loop1"); |
| } |
| |
| // Verify that AOS_LOG has the right name in a watcher. |
| TEST_P(AbstractEventLoopTest, AOSLogWatcher) { |
| auto loop2 = MakePrimary("loop1"); |
| auto loop1 = Make("loop0"); |
| |
| auto fetcher = loop1->MakeFetcher<aos::logging::LogMessageFbs>("/aos"); |
| |
| EXPECT_FALSE(fetcher.Fetch()); |
| |
| auto sender = loop1->MakeSender<TestMessage>("/test2"); |
| |
| loop2->MakeWatcher("/test2", [&](const TestMessage & /*message*/) { |
| AOS_LOG(INFO, "Testing123"); |
| this->Exit(); |
| }); |
| |
| loop2->OnRun([&]() { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| }); |
| |
| Run(); |
| EXPECT_TRUE(fetcher.Fetch()); |
| EXPECT_EQ(fetcher->name()->string_view(), "loop1"); |
| } |
| |
| // Verify that AOS_LOG has the right name in a timer. |
| TEST_P(AbstractEventLoopTest, AOSLogTimer) { |
| auto loop2 = MakePrimary("loop1"); |
| auto loop1 = Make("loop0"); |
| |
| auto fetcher = loop1->MakeFetcher<aos::logging::LogMessageFbs>("/aos"); |
| |
| EXPECT_FALSE(fetcher.Fetch()); |
| |
| auto test_timer = loop2->AddTimer([&]() { |
| AOS_LOG(INFO, "Testing123"); |
| this->Exit(); |
| }); |
| |
| loop2->OnRun([&]() { test_timer->Schedule(loop2->monotonic_now()); }); |
| |
| Run(); |
| EXPECT_TRUE(fetcher.Fetch()); |
| EXPECT_EQ(fetcher->name()->string_view(), "loop1"); |
| } |
| |
| // Verify that timer intervals and duration function properly. |
| TEST_P(AbstractEventLoopTest, TimerIntervalAndDuration) { |
| // Force a slower rate so we are guaranteed to have reports for our timer. |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 2000); |
| |
| const int kCount = 5; |
| |
| auto loop = MakePrimary(); |
| auto loop2 = Make(); |
| |
| ::std::vector<::aos::monotonic_clock::time_point> times; |
| ::std::vector<::aos::monotonic_clock::time_point> expected_times; |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop2->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| auto test_timer = loop->AddTimer([this, ×, &expected_times, &loop]() { |
| times.push_back(loop->monotonic_now()); |
| EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid()); |
| EXPECT_EQ(loop->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().size, 0u); |
| EXPECT_EQ(loop->context().data, nullptr); |
| EXPECT_EQ(loop->context().buffer_index, -1); |
| |
| expected_times.push_back(loop->context().monotonic_event_time); |
| if (times.size() == kCount) { |
| this->Exit(); |
| } |
| }); |
| test_timer->set_name("Test loop"); |
| |
| const monotonic_clock::time_point start_time = loop->monotonic_now(); |
| // TODO(austin): This should be an error... Should be done in OnRun only. |
| test_timer->Schedule(start_time + chrono::seconds(1), chrono::seconds(1)); |
| |
| Run(); |
| |
| // Confirm that we got both the right number of samples, and it's odd. |
| ASSERT_EQ(times.size(), static_cast<size_t>(kCount)); |
| ASSERT_EQ(times.size(), expected_times.size()); |
| ASSERT_EQ((times.size() % 2), 1); |
| |
| // Grab the middle sample. |
| ::aos::monotonic_clock::time_point average_time = times[times.size() / 2]; |
| |
| // Add up all the delays of all the times. |
| ::aos::monotonic_clock::duration sum = chrono::seconds(0); |
| for (const ::aos::monotonic_clock::time_point time : times) { |
| sum += time - average_time; |
| } |
| |
| // Average and add to the middle to find the average time. |
| sum /= times.size(); |
| average_time += sum; |
| |
| // Compute the offset from the average and the expected average. It |
| // should be pretty close to 0. |
| const ::aos::monotonic_clock::duration remainder = |
| average_time - start_time - chrono::seconds(times.size() / 2 + 1); |
| |
| const chrono::milliseconds kEpsilon(100); |
| EXPECT_LT(remainder, +kEpsilon); |
| EXPECT_GT(remainder, -kEpsilon); |
| |
| // Make sure that the average duration is close to 1 second. |
| EXPECT_NEAR(chrono::duration_cast<chrono::duration<double>>(times.back() - |
| times.front()) |
| .count() / |
| static_cast<double>(times.size() - 1), |
| 1.0, 0.1); |
| |
| // Confirm that the ideal wakeup times increment correctly. |
| for (size_t i = 1; i < expected_times.size(); ++i) { |
| EXPECT_EQ(expected_times[i], expected_times[i - 1] + chrono::seconds(1)); |
| } |
| |
| for (size_t i = 0; i < expected_times.size(); ++i) { |
| EXPECT_EQ((expected_times[i] - start_time) % chrono::seconds(1), |
| chrono::seconds(0)); |
| } |
| |
| EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon); |
| EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // And, since we are here, check that the timing report makes sense. |
| // Start by looking for our event loop's timing. |
| FlatbufferDetachedBuffer<timing::Report> report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| // Confirm that we have the right number of reports, and the contents are |
| // sane. |
| VLOG(1) << FlatbufferToJson(report, {.multi_line = true}); |
| |
| EXPECT_EQ(report.message().name()->string_view(), "primary"); |
| |
| ASSERT_NE(report.message().senders(), nullptr); |
| EXPECT_EQ(report.message().senders()->size(), 2); |
| |
| ASSERT_NE(report.message().timers(), nullptr); |
| EXPECT_EQ(report.message().timers()->size(), 2); |
| |
| EXPECT_EQ(report.message().timers()->Get(0)->name()->string_view(), |
| "Test loop"); |
| EXPECT_GE(report.message().timers()->Get(0)->count(), 1); |
| |
| EXPECT_EQ(report.message().timers()->Get(1)->name()->string_view(), |
| "timing_reports"); |
| EXPECT_EQ(report.message().timers()->Get(1)->count(), 1); |
| |
| // Make sure there is a single phased loop report with our report in it. |
| ASSERT_EQ(report.message().phased_loops(), nullptr); |
| } else { |
| ASSERT_FALSE(report_fetcher.Fetch()); |
| } |
| } |
| |
| // Test that setting a default version string results in it getting populated |
| // correctly. |
| TEST_P(AbstractEventLoopTest, DefaultVersionStringInTimingReport) { |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| |
| EventLoop::SetDefaultVersionString("default_version_string"); |
| |
| auto loop = MakePrimary(); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop->MakeFetcher<timing::Report>("/aos"); |
| |
| TimerHandler *exit_timer = loop->AddTimer([this]() { Exit(); }); |
| loop->OnRun([exit_timer, &loop, &report_fetcher]() { |
| report_fetcher.Fetch(); |
| exit_timer->Schedule(loop->monotonic_now() + std::chrono::seconds(2)); |
| }); |
| |
| Run(); |
| |
| bool found_primary_report = false; |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| found_primary_report = true; |
| EXPECT_EQ("default_version_string", |
| report_fetcher->version()->string_view()); |
| } else { |
| FAIL() << report_fetcher->name()->string_view(); |
| } |
| } |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| EXPECT_TRUE(found_primary_report); |
| } else { |
| EXPECT_FALSE(found_primary_report); |
| } |
| } |
| |
| // Test that overriding the default version string results in it getting |
| // populated correctly. |
| TEST_P(AbstractEventLoopTest, OverrideDersionStringInTimingReport) { |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| |
| EventLoop::SetDefaultVersionString("default_version_string"); |
| |
| auto loop = MakePrimary(); |
| loop->SetVersionString("override_version"); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop->MakeFetcher<timing::Report>("/aos"); |
| |
| TimerHandler *exit_timer = loop->AddTimer([this]() { Exit(); }); |
| loop->OnRun([exit_timer, &loop, &report_fetcher]() { |
| report_fetcher.Fetch(); |
| exit_timer->Schedule(loop->monotonic_now() + std::chrono::seconds(2)); |
| }); |
| |
| Run(); |
| |
| bool found_primary_report = false; |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| found_primary_report = true; |
| EXPECT_EQ("override_version", report_fetcher->version()->string_view()); |
| } else { |
| FAIL() << report_fetcher->name()->string_view(); |
| } |
| } |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| EXPECT_TRUE(found_primary_report); |
| } else { |
| EXPECT_FALSE(found_primary_report); |
| } |
| } |
| |
| // Verify that we can change a timer's parameters during execution. |
| TEST_P(AbstractEventLoopTest, TimerChangeParameters) { |
| auto loop = MakePrimary(); |
| loop->SetRuntimeRealtimePriority(1); |
| std::vector<monotonic_clock::time_point> iteration_list; |
| |
| auto test_timer = loop->AddTimer([&iteration_list, &loop]() { |
| ScopedNotRealtime nrt; |
| iteration_list.push_back(loop->context().monotonic_event_time); |
| }); |
| |
| monotonic_clock::time_point s; |
| auto modifier_timer = loop->AddTimer([&test_timer, &s]() { |
| test_timer->Schedule(s + chrono::milliseconds(1750), |
| chrono::milliseconds(600)); |
| }); |
| |
| s = loop->monotonic_now(); |
| test_timer->Schedule(s, chrono::milliseconds(500)); |
| modifier_timer->Schedule(s + chrono::milliseconds(1250)); |
| EndEventLoop(loop.get(), chrono::milliseconds(3950)); |
| Run(); |
| |
| EXPECT_THAT( |
| iteration_list, |
| ::testing::ElementsAre( |
| s, s + chrono::milliseconds(500), s + chrono::milliseconds(1000), |
| s + chrono::milliseconds(1750), s + chrono::milliseconds(2350), |
| s + chrono::milliseconds(2950), s + chrono::milliseconds(3550))); |
| } |
| |
| // Verify that we can disable a timer during execution. |
| TEST_P(AbstractEventLoopTest, TimerDisable) { |
| auto loop = MakePrimary(); |
| loop->SetRuntimeRealtimePriority(1); |
| ::std::vector<::aos::monotonic_clock::time_point> iteration_list; |
| |
| auto test_timer = loop->AddTimer([&iteration_list, &loop]() { |
| ScopedNotRealtime nrt; |
| iteration_list.push_back(loop->context().monotonic_event_time); |
| }); |
| |
| auto ender_timer = loop->AddTimer([&test_timer]() { test_timer->Disable(); }); |
| |
| monotonic_clock::time_point s = loop->monotonic_now(); |
| test_timer->Schedule(s, ::std::chrono::milliseconds(500)); |
| ender_timer->Schedule(s + ::std::chrono::milliseconds(1250)); |
| EndEventLoop(loop.get(), ::std::chrono::milliseconds(2000)); |
| Run(); |
| |
| EXPECT_THAT(iteration_list, |
| ::testing::ElementsAre(s, s + chrono::milliseconds(500), |
| s + chrono::milliseconds(1000))); |
| } |
| |
| // Verify that a timer can disable itself. |
| // |
| // TODO(Brian): Do something similar with phased loops, both with a quick |
| // handler and a handler that would miss a cycle except it got deferred. Current |
| // behavior doing that is a mess. |
| TEST_P(AbstractEventLoopTest, TimerDisableSelf) { |
| auto loop = MakePrimary(); |
| |
| int count = 0; |
| aos::TimerHandler *test_timer; |
| test_timer = loop->AddTimer([&count, &test_timer]() { |
| ++count; |
| test_timer->Disable(); |
| }); |
| |
| test_timer->Schedule(loop->monotonic_now(), ::std::chrono::milliseconds(20)); |
| EndEventLoop(loop.get(), ::std::chrono::milliseconds(80)); |
| Run(); |
| |
| EXPECT_EQ(count, 1); |
| } |
| |
| // Verify that we can disable a timer during execution of another timer |
| // scheduled for the same time, with one ordering of creation for the timers. |
| // |
| // Also schedule some more events to reshuffle the heap in EventLoop used for |
| // tracking events to change up the order. This used to segfault |
| // SimulatedEventLoop. |
| TEST_P(AbstractEventLoopTest, TimerDisableOther) { |
| for (bool creation_order : {true, false}) { |
| for (bool setup_order : {true, false}) { |
| for (int shuffle_events = 0; shuffle_events < 5; ++shuffle_events) { |
| auto loop = MakePrimary(); |
| aos::TimerHandler *test_timer, *ender_timer; |
| if (creation_order) { |
| test_timer = loop->AddTimer([]() {}); |
| ender_timer = |
| loop->AddTimer([&test_timer]() { test_timer->Disable(); }); |
| } else { |
| ender_timer = |
| loop->AddTimer([&test_timer]() { test_timer->Disable(); }); |
| test_timer = loop->AddTimer([]() {}); |
| } |
| |
| const auto start = loop->monotonic_now(); |
| |
| for (int i = 0; i < shuffle_events; ++i) { |
| loop->AddTimer([]() {})->Schedule(start + |
| std::chrono::milliseconds(10)); |
| } |
| |
| if (setup_order) { |
| test_timer->Schedule(start + ::std::chrono::milliseconds(20)); |
| ender_timer->Schedule(start + ::std::chrono::milliseconds(20)); |
| } else { |
| ender_timer->Schedule(start + ::std::chrono::milliseconds(20)); |
| test_timer->Schedule(start + ::std::chrono::milliseconds(20)); |
| } |
| EndEventLoop(loop.get(), ::std::chrono::milliseconds(40)); |
| Run(); |
| } |
| } |
| } |
| } |
| |
| // Verifies that the event loop implementations detect when Channel is not a |
| // pointer into configuration(), or a name doesn't map to a channel in |
| // configuration(). |
| TEST_P(AbstractEventLoopDeathTest, InvalidChannel) { |
| auto loop = MakePrimary(); |
| |
| const Channel *channel = configuration::GetChannel( |
| loop->configuration(), "/test", "aos.TestMessage", "", nullptr); |
| |
| FlatbufferDetachedBuffer<Channel> channel_copy = CopyFlatBuffer(channel); |
| |
| EXPECT_DEATH( |
| loop->MakeRawSender(&channel_copy.message()), |
| "Channel pointer not found in configuration\\(\\)->channels\\(\\)"); |
| |
| EXPECT_DEATH( |
| loop->MakeSender<TestMessage>("/testbad"), |
| "Channel \\{ \"name\": \"/testbad\", \"type\": \"aos.TestMessage\" \\}" |
| " not found in config"); |
| |
| EXPECT_FALSE(loop->TryMakeSender<TestMessage>("/testbad")); |
| |
| EXPECT_DEATH( |
| loop->MakeRawFetcher(&channel_copy.message()), |
| "Channel pointer not found in configuration\\(\\)->channels\\(\\)"); |
| |
| EXPECT_DEATH( |
| loop->MakeFetcher<TestMessage>("/testbad"), |
| "Channel \\{ \"name\": \"/testbad\", \"type\": \"aos.TestMessage\" \\}" |
| " not found in config"); |
| |
| EXPECT_FALSE(loop->TryMakeFetcher<TestMessage>("/testbad").valid()); |
| |
| EXPECT_DEATH( |
| { |
| loop->MakeRawWatcher(&channel_copy.message(), |
| [](const Context, const void *) {}); |
| }, |
| "Channel pointer not found in configuration\\(\\)->channels\\(\\)"); |
| |
| EXPECT_DEATH( |
| { loop->MakeWatcher("/testbad", [](const TestMessage &) {}); }, |
| "Channel \\{ \"name\": \"/testbad\", \"type\": \"aos.TestMessage\" \\}" |
| " not found in config"); |
| } |
| |
| // Verifies that the event loop handles a channel which is not readable or |
| // writable on the current node nicely. |
| TEST_P(AbstractEventLoopDeathTest, InaccessibleChannel) { |
| EnableNodes("me"); |
| auto loop = MakePrimary("me"); |
| auto loop2 = Make("them"); |
| |
| const Channel *channel = configuration::GetChannel( |
| loop->configuration(), "/test_noforward", "aos.TestMessage", "", nullptr); |
| |
| FlatbufferDetachedBuffer<Channel> channel_copy = CopyFlatBuffer(channel); |
| |
| EXPECT_DEATH( |
| loop2->MakeSender<TestMessage>("/test_forward"), |
| "Channel" |
| " \\{ \"name\": \"/test_forward\", \"type\": \"aos.TestMessage\" \\}" |
| " is not able to be sent on this node"); |
| |
| EXPECT_FALSE(loop2->TryMakeSender<TestMessage>("/test_forward")); |
| |
| EXPECT_DEATH( |
| loop2->MakeRawFetcher(channel), |
| "Channel" |
| " \\{ \"name\": \"/test_noforward\", \"type\": \"aos.TestMessage\" \\}" |
| " is not able to be fetched on this node"); |
| |
| EXPECT_DEATH( |
| loop2->MakeFetcher<TestMessage>("/test_noforward"), |
| "Channel" |
| " \\{ \"name\": \"/test_noforward\", \"type\": \"aos.TestMessage\" \\}" |
| " is not able to be fetched on this node"); |
| |
| EXPECT_FALSE(loop2->TryMakeFetcher<TestMessage>("/test_noforward").valid()); |
| |
| EXPECT_DEATH( |
| { loop2->MakeRawWatcher(channel, [](const Context, const void *) {}); }, |
| "\\{ \"name\": \"/test_noforward\", \"type\": \"aos.TestMessage\", " |
| "\"source_node\": \"them\" \\}" |
| " is not able to be watched on this node"); |
| |
| EXPECT_DEATH( |
| { loop2->MakeWatcher("/test_noforward", [](const TestMessage &) {}); }, |
| "\\{ \"name\": \"/test_noforward\", \"type\": \"aos.TestMessage\", " |
| "\"source_node\": \"them\" \\}" |
| " is not able to be watched on this node"); |
| } |
| |
| // Checks that every way of creating a sender, fetcher, or watcher dies with an |
| // alignment complaint after InvalidChannelAlignment() has been applied to the |
| // test fixture. |
| TEST_P(AbstractEventLoopDeathTest, InvalidChannelAlignment) { |
| InvalidChannelAlignment(); |
| |
| // Fragment of the fatal message expected from every factory call below. |
| const char *const kAlignmentError = "multiple of alignment"; |
| |
| auto loop = MakePrimary(); |
| const Channel *test_channel = configuration::GetChannel( |
| loop->configuration(), "/test", "aos.TestMessage", "", nullptr); |
| |
| // Senders, raw and typed. |
| EXPECT_DEATH(loop->MakeRawSender(test_channel), kAlignmentError); |
| EXPECT_DEATH(loop->MakeSender<TestMessage>("/test"), kAlignmentError); |
| |
| // Fetchers, raw and typed. |
| EXPECT_DEATH(loop->MakeRawFetcher(test_channel), kAlignmentError); |
| EXPECT_DEATH(loop->MakeFetcher<TestMessage>("/test"), kAlignmentError); |
| |
| // Raw watchers, with and without the message argument. |
| EXPECT_DEATH( |
| loop->MakeRawWatcher(test_channel, |
| [](const Context &, const void *) {}), |
| kAlignmentError); |
| EXPECT_DEATH(loop->MakeRawNoArgWatcher(test_channel, [](const Context &) {}), |
| kAlignmentError); |
| |
| // Typed watchers, without and with the message argument. |
| EXPECT_DEATH(loop->MakeNoArgWatcher<TestMessage>("/test", []() {}), |
| kAlignmentError); |
| EXPECT_DEATH(loop->MakeWatcher("/test", [](const TestMessage &) {}), |
| kAlignmentError); |
| } |
| |
| // Checks that the timestamps and data pointer reported for a message are sane, |
| // both inside a watcher callback and from a fetcher once the loop has stopped. |
| TEST_P(AbstractEventLoopTest, MessageSendTime) { |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| auto sender = loop2->MakeSender<TestMessage>("/test"); |
| auto fetcher = loop2->MakeFetcher<TestMessage>("/test"); |
| |
| // Publishes one message each time the timer fires. |
| auto send_timer = loop1->AddTimer([&sender]() { |
| aos::Sender<TestMessage>::Builder message = sender.MakeBuilder(); |
| TestMessage::Builder contents = message.MakeBuilder<TestMessage>(); |
| contents.add_value(200); |
| message.CheckOk(message.Send(contents.Finish())); |
| }); |
| |
| bool watcher_ran = false; |
| loop1->MakeWatcher("/test", [&](const TestMessage &received) { |
| const auto &context = loop1->context(); |
| |
| // The message has to start after the beginning of the context's buffer. |
| EXPECT_GT(&received, context.data); |
| |
| // For a locally sent message, the remote timestamps mirror the event |
| // timestamps and there is no remote transmit time. |
| EXPECT_EQ(context.monotonic_remote_time, context.monotonic_event_time); |
| EXPECT_EQ(context.realtime_remote_time, context.realtime_event_time); |
| EXPECT_EQ(context.source_boot_uuid, loop1->boot_uuid()); |
| EXPECT_EQ(context.monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| |
| // The event times must be in the past, but by no more than 500 ms. |
| const aos::monotonic_clock::time_point now_monotonic = |
| loop1->monotonic_now(); |
| const aos::realtime_clock::time_point now_realtime = loop1->realtime_now(); |
| |
| EXPECT_LE(context.monotonic_event_time, now_monotonic); |
| EXPECT_LE(context.realtime_event_time, now_realtime); |
| EXPECT_GE(context.monotonic_event_time + chrono::milliseconds(500), |
| now_monotonic); |
| EXPECT_GE(context.realtime_event_time + chrono::milliseconds(500), |
| now_realtime); |
| |
| // ... and the message has to start before the end of that buffer. |
| const void *const buffer_end = reinterpret_cast<const void *>( |
| reinterpret_cast<const char *>(context.data) + context.size); |
| EXPECT_LT(&received, buffer_end); |
| |
| // Only the PIN read method exposes a real buffer index. |
| if (read_method() != ReadMethod::PIN) { |
| EXPECT_EQ(-1, context.buffer_index); |
| } else { |
| EXPECT_GE(context.buffer_index, 0); |
| const Channel *const test_channel = configuration::GetChannel( |
| loop1->configuration(), "/test", "aos.TestMessage", "", nullptr); |
| EXPECT_LT(context.buffer_index, loop1->NumberBuffers(test_channel)); |
| } |
| watcher_ran = true; |
| }); |
| |
| // Send after one second, and stop the loop after two. |
| send_timer->Schedule(loop1->monotonic_now() + chrono::seconds(1)); |
| EndEventLoop(loop1.get(), chrono::seconds(2)); |
| Run(); |
| |
| EXPECT_TRUE(watcher_ran); |
| |
| ASSERT_TRUE(fetcher.Fetch()); |
| |
| // The message was sent roughly one second before the loop stopped, so compare |
| // its event times against "now minus one second". |
| const monotonic_clock::duration monotonic_time_offset = |
| fetcher.context().monotonic_event_time - |
| (loop1->monotonic_now() - chrono::seconds(1)); |
| const realtime_clock::duration realtime_time_offset = |
| fetcher.context().realtime_event_time - |
| (loop1->realtime_now() - chrono::seconds(1)); |
| |
| EXPECT_EQ(fetcher.context().realtime_event_time, |
| fetcher.context().realtime_remote_time); |
| EXPECT_EQ(fetcher.context().monotonic_event_time, |
| fetcher.context().monotonic_remote_time); |
| EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid()); |
| |
| // Confirm that the data pointer makes sense. |
| const void *const fetched_end = reinterpret_cast<const void *>( |
| reinterpret_cast<const char *>(fetcher.context().data) + |
| fetcher.context().size); |
| EXPECT_GT(fetcher.get(), fetcher.context().data); |
| EXPECT_LT(fetcher.get(), fetched_end); |
| |
| // Both clocks must agree with the expected send time to within 500 ms. |
| EXPECT_TRUE(monotonic_time_offset > chrono::milliseconds(-500)) |
| << ": Got " |
| << fetcher.context().monotonic_event_time.time_since_epoch().count() |
| << " expected " << loop1->monotonic_now().time_since_epoch().count(); |
| EXPECT_TRUE(monotonic_time_offset < chrono::milliseconds(500)) |
| << ": Got " |
| << fetcher.context().monotonic_event_time.time_since_epoch().count() |
| << " expected " << loop1->monotonic_now().time_since_epoch().count(); |
| |
| EXPECT_TRUE(realtime_time_offset > chrono::milliseconds(-500)) |
| << ": Got " |
| << fetcher.context().realtime_event_time.time_since_epoch().count() |
| << " expected " << loop1->realtime_now().time_since_epoch().count(); |
| EXPECT_TRUE(realtime_time_offset < chrono::milliseconds(500)) |
| << ": Got " |
| << fetcher.context().realtime_event_time.time_since_epoch().count() |
| << " expected " << loop1->realtime_now().time_since_epoch().count(); |
| } |
| |
| // Checks that message timestamps are roughly right when using a no-arg |
| // watcher. A no-arg watcher does not receive the message itself, so a fetcher |
| // on the same loop is used to access it from inside the callback. This is |
| // also the main use case for no-arg watchers. |
| TEST_P(AbstractEventLoopTest, MessageSendTimeNoArg) { |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| auto sender = loop2->MakeSender<TestMessage>("/test"); |
| auto fetcher = loop1->MakeFetcher<TestMessage>("/test"); |
| |
| // Publishes one message each time the timer fires. |
| auto send_timer = loop1->AddTimer([&sender]() { |
| aos::Sender<TestMessage>::Builder message = sender.MakeBuilder(); |
| TestMessage::Builder contents = message.MakeBuilder<TestMessage>(); |
| contents.add_value(200); |
| message.CheckOk(message.Send(contents.Finish())); |
| }); |
| |
| bool watcher_ran = false; |
| loop1->MakeNoArgWatcher<TestMessage>("/test", [&]() { |
| // A fetcher on this channel must be usable from this context. |
| ASSERT_TRUE(fetcher.Fetch()); |
| |
| const auto &context = loop1->context(); |
| |
| // For a locally sent message, the remote timestamps mirror the event |
| // timestamps and there is no remote transmit time. |
| EXPECT_EQ(context.monotonic_remote_time, context.monotonic_event_time); |
| EXPECT_EQ(context.monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(context.realtime_remote_time, context.realtime_event_time); |
| EXPECT_EQ(context.source_boot_uuid, loop1->boot_uuid()); |
| |
| // The event times must be in the past, but by no more than 500 ms. |
| const aos::monotonic_clock::time_point now_monotonic = |
| loop1->monotonic_now(); |
| const aos::realtime_clock::time_point now_realtime = loop1->realtime_now(); |
| |
| EXPECT_LE(context.monotonic_event_time, now_monotonic); |
| EXPECT_LE(context.realtime_event_time, now_realtime); |
| EXPECT_GE(context.monotonic_event_time + chrono::milliseconds(500), |
| now_monotonic); |
| EXPECT_GE(context.realtime_event_time + chrono::milliseconds(500), |
| now_realtime); |
| |
| watcher_ran = true; |
| }); |
| |
| // Send after one second, and stop the loop after two. |
| send_timer->Schedule(loop1->monotonic_now() + chrono::seconds(1)); |
| EndEventLoop(loop1.get(), chrono::seconds(2)); |
| Run(); |
| |
| ASSERT_TRUE(watcher_ran); |
| |
| // The message was sent roughly one second before the loop stopped, so compare |
| // its event times against "now minus one second". |
| const monotonic_clock::duration monotonic_time_offset = |
| fetcher.context().monotonic_event_time - |
| (loop1->monotonic_now() - chrono::seconds(1)); |
| const realtime_clock::duration realtime_time_offset = |
| fetcher.context().realtime_event_time - |
| (loop1->realtime_now() - chrono::seconds(1)); |
| |
| EXPECT_EQ(fetcher.context().realtime_event_time, |
| fetcher.context().realtime_remote_time); |
| EXPECT_EQ(fetcher.context().monotonic_event_time, |
| fetcher.context().monotonic_remote_time); |
| EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid()); |
| |
| // Confirm that the data pointer makes sense. |
| const void *const fetched_end = reinterpret_cast<const void *>( |
| reinterpret_cast<const char *>(fetcher.context().data) + |
| fetcher.context().size); |
| EXPECT_GT(fetcher.get(), fetcher.context().data); |
| EXPECT_LT(fetcher.get(), fetched_end); |
| |
| // Both clocks must agree with the expected send time to within 500 ms. |
| EXPECT_TRUE(monotonic_time_offset > chrono::milliseconds(-500)) |
| << ": Got " |
| << fetcher.context().monotonic_event_time.time_since_epoch().count() |
| << " expected " << loop1->monotonic_now().time_since_epoch().count(); |
| EXPECT_TRUE(monotonic_time_offset < chrono::milliseconds(500)) |
| << ": Got " |
| << fetcher.context().monotonic_event_time.time_since_epoch().count() |
| << " expected " << loop1->monotonic_now().time_since_epoch().count(); |
| |
| EXPECT_TRUE(realtime_time_offset > chrono::milliseconds(-500)) |
| << ": Got " |
| << fetcher.context().realtime_event_time.time_since_epoch().count() |
| << " expected " << loop1->realtime_now().time_since_epoch().count(); |
| EXPECT_TRUE(realtime_time_offset < chrono::milliseconds(500)) |
| << ": Got " |
| << fetcher.context().realtime_event_time.time_since_epoch().count() |
| << " expected " << loop1->realtime_now().time_since_epoch().count(); |
| } |
| |
| // Tests that a couple phased loops run in a row result in the correct offset |
| // and period. Also checks the context visible inside the phased loop callback |
| // and, when timing reports are enabled, that the primary loop's report lists |
| // the phased loop. |
| TEST_P(AbstractEventLoopTest, PhasedLoopTest) { |
| // Force a slower rate so we are guaranteed to have reports for our phased |
| // loop. |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 2000); |
| |
| const chrono::milliseconds kOffset = chrono::milliseconds(400); |
| const int kCount = 5; |
| |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop2->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| // Collect up a couple of samples: the actual wakeup times, and the ideal |
| // wakeup times reported through the context. |
| ::std::vector<::aos::monotonic_clock::time_point> times; |
| ::std::vector<::aos::monotonic_clock::time_point> expected_times; |
| |
| // Run kCount iterations. |
| loop1 |
| ->AddPhasedLoop( |
| [&times, &expected_times, &loop1, this](int count) { |
| // No cycles should have been missed. |
| EXPECT_EQ(count, 1); |
| times.push_back(loop1->monotonic_now()); |
| expected_times.push_back(loop1->context().monotonic_event_time); |
| |
| // A phased loop has no message attached, so all message related |
| // context fields hold their sentinel values. |
| EXPECT_EQ(loop1->context().monotonic_remote_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop1->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid()); |
| EXPECT_EQ(loop1->context().realtime_event_time, |
| realtime_clock::min_time); |
| EXPECT_EQ(loop1->context().realtime_remote_time, |
| realtime_clock::min_time); |
| EXPECT_EQ(loop1->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop1->context().size, 0u); |
| EXPECT_EQ(loop1->context().data, nullptr); |
| EXPECT_EQ(loop1->context().buffer_index, -1); |
| |
| if (times.size() == static_cast<size_t>(kCount)) { |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| } |
| }, |
| chrono::seconds(1), kOffset) |
| ->set_name("Test loop"); |
| |
| // Add a delay to make sure that delay during startup doesn't result in a |
| // "missed cycle". |
| SleepFor(chrono::seconds(2)); |
| |
| Run(); |
| |
| // Confirm that we got both the right number of samples, and it's odd. These |
| // are hard requirements because the samples are indexed below. |
| ASSERT_EQ(times.size(), static_cast<size_t>(kCount)); |
| ASSERT_EQ(times.size(), expected_times.size()); |
| ASSERT_EQ((times.size() % 2), 1u); |
| |
| // Grab the middle sample. |
| ::aos::monotonic_clock::time_point average_time = times[times.size() / 2]; |
| |
| // Add up all the delays of all the times. |
| ::aos::monotonic_clock::duration sum = chrono::seconds(0); |
| for (const ::aos::monotonic_clock::time_point time : times) { |
| sum += time - average_time; |
| } |
| |
| // Average and add to the middle to find the average time. |
| sum /= times.size(); |
| average_time += sum; |
| |
| // Compute the offset from the start of the second of the average time. This |
| // should be pretty close to the offset. |
| const ::aos::monotonic_clock::duration remainder = |
| average_time.time_since_epoch() - |
| chrono::duration_cast<chrono::seconds>(average_time.time_since_epoch()); |
| |
| const chrono::milliseconds kEpsilon(100); |
| EXPECT_LT(remainder, kOffset + kEpsilon); |
| EXPECT_GT(remainder, kOffset - kEpsilon); |
| |
| // Make sure that the average duration is close to 1 second. |
| EXPECT_NEAR(chrono::duration_cast<chrono::duration<double>>(times.back() - |
| times.front()) |
| .count() / |
| static_cast<double>(times.size() - 1), |
| 1.0, 0.1); |
| |
| // Confirm that the ideal wakeup times increment correctly. |
| for (size_t i = 1; i < expected_times.size(); ++i) { |
| EXPECT_EQ(expected_times[i], expected_times[i - 1] + chrono::seconds(1)); |
| } |
| |
| // Every ideal wakeup time must land exactly on the requested offset. |
| for (size_t i = 0; i < expected_times.size(); ++i) { |
| EXPECT_EQ(expected_times[i].time_since_epoch() % chrono::seconds(1), |
| kOffset); |
| } |
| |
| EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon); |
| EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // And, since we are here, check that the timing report makes sense. |
| // Start by looking for our event loop's timing; the last report named |
| // "primary" is the one which gets checked. |
| FlatbufferDetachedBuffer<timing::Report> report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| VLOG(1) << FlatbufferToJson(report, {.multi_line = true}); |
| |
| EXPECT_EQ(report.message().name()->string_view(), "primary"); |
| |
| ASSERT_NE(report.message().senders(), nullptr); |
| EXPECT_EQ(report.message().senders()->size(), 2); |
| |
| ASSERT_NE(report.message().timers(), nullptr); |
| EXPECT_EQ(report.message().timers()->size(), 1); |
| |
| // Make sure there is a single phased loop report with our report in it. |
| ASSERT_NE(report.message().phased_loops(), nullptr); |
| ASSERT_EQ(report.message().phased_loops()->size(), 1); |
| EXPECT_EQ(report.message().phased_loops()->Get(0)->name()->string_view(), |
| "Test loop"); |
| EXPECT_GE(report.message().phased_loops()->Get(0)->count(), 1); |
| } else { |
| // With timing reports disabled, nothing may have been published. |
| ASSERT_FALSE(report_fetcher.Fetch()); |
| } |
| } |
| |
| // Tests that a phased loop responds correctly to a changing offset. Each |
| // callback moves the offset one millisecond earlier, so consecutive ideal |
| // wakeups are expected to be 999 ms apart. |
| TEST_P(AbstractEventLoopTest, PhasedLoopChangingOffsetTest) { |
| // Force a slower rate so we are guaranteed to have reports for our phased |
| // loop. |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 2000); |
| |
| const chrono::milliseconds kOffset = chrono::milliseconds(400); |
| const chrono::milliseconds kInterval = chrono::milliseconds(1000); |
| const int kCount = 5; |
| |
| auto loop1 = MakePrimary(); |
| |
| // Collect up a couple of samples: the actual wakeup times, and the ideal |
| // wakeup times reported through the context. |
| ::std::vector<::aos::monotonic_clock::time_point> times; |
| ::std::vector<::aos::monotonic_clock::time_point> expected_times; |
| |
| // Captured by reference by the callback below, which uses it to retune the |
| // loop from inside its own handler. |
| PhasedLoopHandler *phased_loop = nullptr; |
| |
| // Run kCount iterations. |
| phased_loop = loop1->AddPhasedLoop( |
| [&phased_loop, &times, &expected_times, &loop1, this, kOffset, |
| kInterval](int count) { |
| // No cycles should have been missed. |
| EXPECT_EQ(count, 1); |
| times.push_back(loop1->monotonic_now()); |
| |
| expected_times.push_back(loop1->context().monotonic_event_time); |
| |
| // Shift the offset earlier by one more millisecond on every cycle. |
| phased_loop->set_interval_and_offset( |
| kInterval, kOffset - chrono::milliseconds(times.size())); |
| LOG(INFO) << "new offset: " |
| << (kOffset - chrono::milliseconds(times.size())).count(); |
| |
| if (times.size() == static_cast<size_t>(kCount)) { |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| } |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| |
| // Add a delay to make sure that delay during startup doesn't result in a |
| // "missed cycle". |
| SleepFor(chrono::seconds(2)); |
| |
| Run(); |
| // Confirm that we got both the right number of samples, and it's odd. These |
| // are hard requirements because the samples are indexed below. |
| ASSERT_EQ(times.size(), static_cast<size_t>(kCount)); |
| ASSERT_EQ(times.size(), expected_times.size()); |
| ASSERT_EQ((times.size() % 2), 1u); |
| |
| // Grab the middle sample. |
| ::aos::monotonic_clock::time_point average_time = times[times.size() / 2]; |
| |
| // Add up all the delays of all the times. |
| ::aos::monotonic_clock::duration sum = chrono::seconds(0); |
| for (const ::aos::monotonic_clock::time_point time : times) { |
| sum += time - average_time; |
| } |
| |
| // Average and add to the middle to find the average time. |
| sum /= times.size(); |
| average_time += sum; |
| |
| // Compute the offset from the start of the second of the average time. This |
| // should be pretty close to the offset. |
| const ::aos::monotonic_clock::duration remainder = |
| average_time.time_since_epoch() - |
| chrono::duration_cast<chrono::seconds>(average_time.time_since_epoch()); |
| |
| const chrono::milliseconds kEpsilon(100); |
| EXPECT_LT(remainder, kOffset + kEpsilon); |
| EXPECT_GT(remainder, kOffset - kEpsilon); |
| |
| // Make sure that the average duration is close to 1 second. |
| EXPECT_NEAR(chrono::duration_cast<chrono::duration<double>>(times.back() - |
| times.front()) |
| .count() / |
| static_cast<double>(times.size() - 1), |
| 1.0, 0.1); |
| |
| // Confirm that the ideal wakeup times increment correctly: one interval, |
| // minus the millisecond by which the offset was moved. |
| for (size_t i = 1; i < expected_times.size(); ++i) { |
| LOG(INFO) << i - 1 << ": " << expected_times[i - 1] << ", " << i << ": " |
| << expected_times[i]; |
| EXPECT_EQ(expected_times[i], expected_times[i - 1] + chrono::seconds(1) - |
| chrono::milliseconds(1)); |
| } |
| |
| // Sample i must land on the original offset moved earlier by i ms. |
| for (size_t i = 0; i < expected_times.size(); ++i) { |
| EXPECT_EQ(expected_times[i].time_since_epoch() % chrono::seconds(1), |
| kOffset - chrono::milliseconds(i)); |
| } |
| |
| EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon); |
| EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon); |
| } |
| |
| // Sweeps a phased loop through every ordered pair of a handful of offsets, |
| // changing the offset from inside the callback each cycle, and checks that |
| // consecutive triggers are never more than one interval apart. |
| TEST_P(AbstractEventLoopTest, PhasedLoopChangingOffsetSweep) { |
| const chrono::milliseconds kInterval = chrono::milliseconds(1000); |
| const int kCount = 5; |
| |
| auto loop1 = MakePrimary(); |
| |
| // kCount offsets spread evenly across one interval. |
| std::vector<aos::monotonic_clock::duration> offset_options; |
| for (int step = 0; step < kCount; ++step) { |
| offset_options.push_back(step * kInterval / kCount); |
| } |
| |
| // Every pair-wise combination of offsets, flattened into one sequence. |
| std::vector<aos::monotonic_clock::duration> offset_sweep; |
| for (const aos::monotonic_clock::duration first : offset_options) { |
| for (const aos::monotonic_clock::duration second : offset_options) { |
| offset_sweep.push_back(first); |
| offset_sweep.push_back(second); |
| } |
| } |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // Run one iteration per entry in the sweep, moving to the next offset after |
| // each trigger. |
| size_t triggers_seen = 0; |
| phased_loop = loop1->AddPhasedLoop( |
| [&phased_loop, &trigger_times, &loop1, this, kInterval, &triggers_seen, |
| offset_sweep](int count) { |
| EXPECT_EQ(count, 1); |
| trigger_times.push_back(loop1->context().monotonic_event_time); |
| |
| ++triggers_seen; |
| |
| if (triggers_seen != offset_sweep.size()) { |
| phased_loop->set_interval_and_offset(kInterval, |
| offset_sweep.at(triggers_seen)); |
| return; |
| } |
| |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }, |
| kInterval, offset_sweep.at(0)); |
| |
| Run(); |
| ASSERT_EQ(trigger_times.size(), offset_sweep.size()); |
| for (size_t index = 1; index < trigger_times.size(); ++index) { |
| EXPECT_LE(trigger_times.at(index) - trigger_times.at(index - 1), |
| kInterval); |
| } |
| } |
| |
| // Checks what happens when a phased loop is rescheduled using a "now" that is |
| // slightly in the past: it should trigger again for the same ideal time and |
| // report that zero cycles elapsed. |
| TEST_P(AbstractEventLoopTest, PhasedLoopRescheduleInPast) { |
| const chrono::milliseconds kOffset(400); |
| const chrono::milliseconds kInterval(1000); |
| |
| auto loop1 = MakePrimary(); |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // Cycle count the next phased loop callback should report. |
| int expected_count = 1; |
| |
| // This timer is scheduled by the phased loop callback for the same event |
| // time, so it runs right after it. The first time through it reschedules the |
| // phased loop to just before now, which should succeed but result in 0 cycles |
| // elapsing. The second time through it ends the test. |
| TimerHandler *reschedule_timer = |
| loop1->AddTimer([&phased_loop, &loop1, &expected_count, this]() { |
| if (expected_count != 0) { |
| phased_loop->Reschedule(loop1->context().monotonic_event_time - |
| chrono::nanoseconds(1)); |
| expected_count = 0; |
| return; |
| } |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }); |
| |
| phased_loop = loop1->AddPhasedLoop( |
| [&expected_count, &trigger_times, &loop1, reschedule_timer](int count) { |
| EXPECT_EQ(count, expected_count); |
| const auto event_time = loop1->context().monotonic_event_time; |
| trigger_times.push_back(event_time); |
| |
| reschedule_timer->Schedule(event_time); |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| reschedule_timer->set_name("Manager timer"); |
| |
| Run(); |
| |
| // Two triggers, both for the same ideal wakeup time. |
| ASSERT_EQ(2u, trigger_times.size()); |
| ASSERT_EQ(trigger_times[0], trigger_times[1]); |
| } |
| |
| // Checks what happens when a phased loop is rescheduled at exactly the time it |
| // just triggered for: the next trigger should move to the following cycle. |
| TEST_P(AbstractEventLoopTest, PhasedLoopRescheduleNow) { |
| const chrono::milliseconds kOffset(400); |
| const chrono::milliseconds kInterval(1000); |
| |
| auto loop1 = MakePrimary(); |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // This timer is scheduled by the phased loop callback for the same event |
| // time, so it runs right after it. The first time through it reschedules the |
| // phased loop to now; this should succeed without changing the expected |
| // behavior (since this is the same thing that is actually done internally). |
| // The second time through it ends the test. |
| bool rescheduled = false; |
| TimerHandler *reschedule_timer = |
| loop1->AddTimer([&phased_loop, &loop1, &rescheduled, this]() { |
| if (!rescheduled) { |
| phased_loop->Reschedule(loop1->context().monotonic_event_time); |
| rescheduled = true; |
| return; |
| } |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }); |
| |
| phased_loop = loop1->AddPhasedLoop( |
| [&trigger_times, &loop1, reschedule_timer](int count) { |
| EXPECT_EQ(count, 1); |
| const auto event_time = loop1->context().monotonic_event_time; |
| trigger_times.push_back(event_time); |
| |
| reschedule_timer->Schedule(event_time); |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| reschedule_timer->set_name("Manager timer"); |
| |
| Run(); |
| |
| // Two triggers, exactly one interval apart. |
| ASSERT_EQ(2u, trigger_times.size()); |
| ASSERT_EQ(trigger_times[0] + kInterval, trigger_times[1]); |
| } |
| |
| // Checks what happens when a phased loop is rescheduled to a time far in the |
| // future: the next trigger should land that many intervals later and report |
| // the matching number of elapsed cycles. |
| TEST_P(AbstractEventLoopTest, PhasedLoopRescheduleFuture) { |
| const chrono::milliseconds kOffset(400); |
| const chrono::milliseconds kInterval(1000); |
| |
| auto loop1 = MakePrimary(); |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // Cycle count the next phased loop callback should report. |
| int expected_count = 1; |
| // Set once the reschedule has been issued; the timer's next run then ends |
| // the test. |
| bool rescheduled = false; |
| TimerHandler *reschedule_timer = loop1->AddTimer( |
| [&expected_count, &phased_loop, &loop1, &rescheduled, this, kInterval]() { |
| if (!rescheduled) { |
| expected_count = 10; |
| // Knock off 1 ns, since the scheduler rounds up when it is |
| // scheduled to exactly a loop time. |
| phased_loop->Reschedule(loop1->context().monotonic_event_time + |
| kInterval * expected_count - |
| chrono::nanoseconds(1)); |
| rescheduled = true; |
| return; |
| } |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }); |
| |
| phased_loop = loop1->AddPhasedLoop( |
| [&trigger_times, &loop1, reschedule_timer, &expected_count](int count) { |
| EXPECT_EQ(count, expected_count); |
| const auto event_time = loop1->context().monotonic_event_time; |
| trigger_times.push_back(event_time); |
| |
| reschedule_timer->Schedule(event_time); |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| reschedule_timer->set_name("Manager timer"); |
| |
| Run(); |
| |
| // Two triggers, expected_count intervals apart. |
| ASSERT_EQ(2u, trigger_times.size()); |
| ASSERT_EQ(trigger_times[0] + expected_count * kInterval, trigger_times[1]); |
| } |
| |
| // Checks the pattern of nudging a phased loop's offset later while keeping the |
| // trigger period consistent: the offset is incremented by 1 ns and the loop is |
| // rescheduled relative to a point half an interval after the last trigger. |
| TEST_P(AbstractEventLoopTest, PhasedLoopRescheduleWithLaterOffset) { |
| const chrono::milliseconds kOffset(400); |
| const chrono::milliseconds kInterval(1000); |
| |
| auto loop1 = MakePrimary(); |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // Set once the offset change has been issued; the timer's next run then ends |
| // the test. |
| bool rescheduled = false; |
| TimerHandler *reschedule_timer = loop1->AddTimer( |
| [&phased_loop, &loop1, &rescheduled, this, kInterval, kOffset]() { |
| if (!rescheduled) { |
| // Schedule the next callback to be strictly later than the current |
| // time + interval / 2, to ensure a consistent frequency. |
| const monotonic_clock::time_point half_time = |
| loop1->context().monotonic_event_time + kInterval / 2; |
| phased_loop->set_interval_and_offset( |
| kInterval, kOffset + chrono::nanoseconds(1), half_time); |
| phased_loop->Reschedule(half_time); |
| rescheduled = true; |
| return; |
| } |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }); |
| |
| phased_loop = loop1->AddPhasedLoop( |
| [&trigger_times, &loop1, reschedule_timer](int count) { |
| EXPECT_EQ(1, count); |
| const auto event_time = loop1->context().monotonic_event_time; |
| trigger_times.push_back(event_time); |
| |
| reschedule_timer->Schedule(event_time); |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| reschedule_timer->set_name("Manager timer"); |
| |
| Run(); |
| |
| // Two triggers, one interval plus the 1 ns offset change apart. |
| ASSERT_EQ(2u, trigger_times.size()); |
| ASSERT_EQ(trigger_times[0] + kInterval + chrono::nanoseconds(1), |
| trigger_times[1]); |
| } |
| |
| // Checks the pattern of nudging a phased loop's offset earlier while keeping |
| // the trigger period consistent: the offset is decremented by 1 ns and the loop |
| // is rescheduled relative to a point half an interval after the last trigger. |
| TEST_P(AbstractEventLoopTest, PhasedLoopRescheduleWithEarlierOffset) { |
| const chrono::milliseconds kOffset(400); |
| const chrono::milliseconds kInterval(1000); |
| |
| auto loop1 = MakePrimary(); |
| |
| std::vector<::aos::monotonic_clock::time_point> trigger_times; |
| |
| PhasedLoopHandler *phased_loop; |
| |
| // Set once the offset change has been issued; the timer's next run then ends |
| // the test. |
| bool rescheduled = false; |
| TimerHandler *reschedule_timer = loop1->AddTimer( |
| [&phased_loop, &loop1, &rescheduled, this, kInterval, kOffset]() { |
| if (!rescheduled) { |
| // Schedule the next callback to be strictly later than the current |
| // time + interval / 2, to ensure a consistent frequency. |
| const aos::monotonic_clock::time_point half_time = |
| loop1->context().monotonic_event_time + kInterval / 2; |
| phased_loop->set_interval_and_offset( |
| kInterval, kOffset - chrono::nanoseconds(1), half_time); |
| phased_loop->Reschedule(half_time); |
| rescheduled = true; |
| return; |
| } |
| LOG(INFO) << "Exiting"; |
| this->Exit(); |
| }); |
| |
| phased_loop = loop1->AddPhasedLoop( |
| [&trigger_times, &loop1, reschedule_timer](int count) { |
| EXPECT_EQ(1, count); |
| const auto event_time = loop1->context().monotonic_event_time; |
| trigger_times.push_back(event_time); |
| |
| reschedule_timer->Schedule(event_time); |
| }, |
| kInterval, kOffset); |
| phased_loop->set_name("Test loop"); |
| reschedule_timer->set_name("Manager timer"); |
| |
| Run(); |
| |
| // Two triggers, one interval minus the 1 ns offset change apart. |
| ASSERT_EQ(2u, trigger_times.size()); |
| ASSERT_EQ(trigger_times[0] + kInterval - chrono::nanoseconds(1), |
| trigger_times[1]); |
| } |
| |
| // Tests that senders count correctly in the timing report: a burst of twice |
| // the allowed number of messages is sent, and the report must tally both the |
| // accepted sends and the ones rejected for being sent too fast. |
| TEST_P(AbstractEventLoopTest, SenderTimingReport) { |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| auto loop1 = MakePrimary(); |
| |
| auto loop2 = Make("watcher_loop"); |
| loop2->MakeWatcher("/test", [](const TestMessage &) {}); |
| |
| auto loop3 = Make(); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop3->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| auto sender = loop1->MakeSender<TestMessage>("/test"); |
| |
| // The message counts below are derived from the channel's frequency and |
| // storage duration, so make sure those are what this test was designed for. |
| ASSERT_EQ(800, sender.channel()->frequency()); |
| ASSERT_EQ(2000000000, configuration::ChannelStorageDuration( |
| loop1->configuration(), sender.channel()) |
| .count()); |
| constexpr int kMaxAllowedMessages = 800 * 2; |
| constexpr int kSendMessages = kMaxAllowedMessages * 2; |
| constexpr int kDroppedMessages = kSendMessages - kMaxAllowedMessages; |
| |
| // Timer which sends the whole burst: the first kMaxAllowedMessages sends must |
| // succeed, and every send after that must be rejected as too fast. |
| auto burst_timer = loop1->AddTimer([&sender]() { |
| for (int index = 0; index < kSendMessages; ++index) { |
| aos::Sender<TestMessage>::Builder message = sender.MakeBuilder(); |
| TestMessage::Builder contents = message.MakeBuilder<TestMessage>(); |
| contents.add_value(200 + index); |
| if (index >= kMaxAllowedMessages) { |
| EXPECT_EQ(RawSender::Error::kMessagesSentTooFast, |
| message.Send(contents.Finish())); |
| } else { |
| message.CheckOk(message.Send(contents.Finish())); |
| } |
| } |
| }); |
| |
| // Quit after 1 timing report, mid way through the next cycle. |
| EndEventLoop(loop1.get(), chrono::milliseconds(2500)); |
| |
| loop1->OnRun([&burst_timer, &loop1]() { |
| burst_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500)); |
| }); |
| |
| Run(); |
| |
| if (do_timing_reports() != DoTimingReports::kYes) { |
| // With timing reports disabled, nothing may have been published. |
| ASSERT_FALSE(report_fetcher.Fetch()); |
| return; |
| } |
| |
| // Find our event loop's timing report; the last report named "primary" is |
| // the one which gets checked. |
| FlatbufferDetachedBuffer<timing::Report> primary_report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| VLOG(1) << "Report " << FlatbufferToJson(report_fetcher.get()); |
| if (report_fetcher->name()->string_view() == "primary") { |
| primary_report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true}); |
| |
| EXPECT_EQ(primary_report.message().name()->string_view(), "primary"); |
| |
| const auto *sender_reports = primary_report.message().senders(); |
| ASSERT_NE(sender_reports, nullptr); |
| EXPECT_EQ(sender_reports->size(), 3); |
| |
| // Resolves the name of the channel a sender report refers to. |
| const auto channel_name = [&loop1](const auto *sender_report) { |
| return loop1->configuration() |
| ->channels() |
| ->Get(sender_report->channel_index()) |
| ->name() |
| ->string_view(); |
| }; |
| |
| // Confirm that the sender looks sane. |
| const auto *test_sender = sender_reports->Get(0); |
| EXPECT_EQ(channel_name(test_sender), "/test"); |
| EXPECT_EQ(test_sender->count(), kMaxAllowedMessages); |
| ASSERT_TRUE(test_sender->has_error_counts()); |
| ASSERT_EQ(test_sender->error_counts()->size(), 2u); |
| EXPECT_EQ( |
| test_sender->error_counts() |
| ->Get(static_cast<size_t>(timing::SendError::MESSAGE_SENT_TOO_FAST)) |
| ->count(), |
| kDroppedMessages) |
| << aos::FlatbufferToJson(primary_report); |
| EXPECT_EQ(test_sender->error_counts() |
| ->Get(static_cast<size_t>(timing::SendError::INVALID_REDZONE)) |
| ->count(), |
| 0); |
| |
| // Confirm that the timing report's own sender looks sane. |
| const auto *report_sender = sender_reports->Get(1); |
| EXPECT_EQ(channel_name(report_sender), "/aos"); |
| EXPECT_EQ(report_sender->count(), 1); |
| |
| ASSERT_NE(primary_report.message().timers(), nullptr); |
| EXPECT_EQ(primary_report.message().timers()->size(), 3); |
| |
| // Make sure there are no phased loops or watchers. |
| ASSERT_EQ(primary_report.message().phased_loops(), nullptr); |
| ASSERT_EQ(primary_report.message().watchers(), nullptr); |
| } |
| |
| // Tests that the RawSender::Send(void*, size_t) overload tracks things properly |
| // in its timing report. |
| TEST_P(AbstractEventLoopTest, CopySenderTimingReport) { |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop2->MakeRawSender(configuration::GetChannel( |
| loop2->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop1->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| loop2->OnRun([&]() { |
| for (int ii = 0; ii < TestChannelQueueSize(loop2.get()); ++ii) { |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size()), |
| RawSender::Error::kOk); |
| } |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size()), |
| RawSender::Error::kMessagesSentTooFast); |
| }); |
| // Quit after 1 timing report, mid way through the next cycle. |
| EndEventLoop(loop2.get(), chrono::milliseconds(1500)); |
| |
| Run(); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // Check that the sent too fast actually got recorded by the timing report. |
| FlatbufferDetachedBuffer<timing::Report> primary_report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| primary_report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| EXPECT_EQ(primary_report.message().name()->string_view(), "primary"); |
| |
| ASSERT_NE(primary_report.message().senders(), nullptr); |
| EXPECT_EQ(primary_report.message().senders()->size(), 3); |
| EXPECT_EQ( |
| primary_report.message() |
| .senders() |
| ->Get(0) |
| ->error_counts() |
| ->Get(static_cast<size_t>(timing::SendError::MESSAGE_SENT_TOO_FAST)) |
| ->count(), |
| 1); |
| } |
| } |
| |
| // Tests that the RawSender::Send(SharedSpan) overload works. |
| TEST_P(AbstractEventLoopTest, SharedSenderTimingReport) { |
| absl::FlagSaver flag_saver; |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop2->MakeRawSender(configuration::GetChannel( |
| loop2->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop1->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| loop2->OnRun([&]() { |
| for (int ii = 0; ii < TestChannelQueueSize(loop2.get()); ++ii) { |
| auto shared_span = MakeSharedSpan(kMessage.span().size()); |
| memcpy(shared_span.second.data(), kMessage.span().data(), |
| kMessage.span().size()); |
| EXPECT_EQ(sender->Send(std::move(shared_span.first)), |
| RawSender::Error::kOk); |
| } |
| auto shared_span = MakeSharedSpan(kMessage.span().size()); |
| memcpy(shared_span.second.data(), kMessage.span().data(), |
| kMessage.span().size()); |
| EXPECT_EQ(sender->Send(std::move(shared_span.first)), |
| RawSender::Error::kMessagesSentTooFast); |
| }); |
| // Quit after 1 timing report, mid way through the next cycle. |
| EndEventLoop(loop2.get(), chrono::milliseconds(1500)); |
| |
| Run(); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // Check that the sent too fast actually got recorded by the timing report. |
| FlatbufferDetachedBuffer<timing::Report> primary_report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| primary_report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| EXPECT_EQ(primary_report.message().name()->string_view(), "primary"); |
| |
| ASSERT_NE(primary_report.message().senders(), nullptr); |
| EXPECT_EQ(primary_report.message().senders()->size(), 3); |
| EXPECT_EQ( |
| primary_report.message() |
| .senders() |
| ->Get(0) |
| ->error_counts() |
| ->Get(static_cast<size_t>(timing::SendError::MESSAGE_SENT_TOO_FAST)) |
| ->count(), |
| 1); |
| } |
| } |
| |
| // Tests that senders count correctly in the timing report. |
| TEST_P(AbstractEventLoopTest, WatcherTimingReport) { |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| auto loop1 = MakePrimary(); |
| loop1->MakeWatcher("/test", [](const TestMessage &) {}); |
| |
| auto loop2 = Make("sender_loop"); |
| |
| auto loop3 = Make(); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop3->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| auto sender = loop2->MakeSender<TestMessage>("/test"); |
| |
| // Add a timer to actually quit. |
| auto test_timer = loop1->AddTimer([&sender]() { |
| for (int i = 0; i < 10; ++i) { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200 + i); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| }); |
| |
| // Quit after 1 timing report, mid way through the next cycle. |
| EndEventLoop(loop1.get(), chrono::milliseconds(2500)); |
| |
| loop1->OnRun([&test_timer, &loop1]() { |
| test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1500)); |
| }); |
| |
| Run(); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // And, since we are here, check that the timing report makes sense. |
| // Start by looking for our event loop's timing. |
| FlatbufferDetachedBuffer<timing::Report> primary_report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get()); |
| if (report_fetcher->name()->string_view() == "primary") { |
| primary_report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| // Check the watcher report. |
| VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true}); |
| |
| EXPECT_EQ(primary_report.message().name()->string_view(), "primary"); |
| |
| // Just the timing report timer. |
| ASSERT_NE(primary_report.message().timers(), nullptr); |
| EXPECT_EQ(primary_report.message().timers()->size(), 3); |
| |
| // No phased loops |
| ASSERT_EQ(primary_report.message().phased_loops(), nullptr); |
| |
| ASSERT_NE(primary_report.message().watchers(), nullptr); |
| ASSERT_EQ(primary_report.message().watchers()->size(), 1); |
| EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10); |
| } else { |
| ASSERT_FALSE(report_fetcher.Fetch()); |
| } |
| } |
| |
| // Tests that fetchers count correctly in the timing report. |
| TEST_P(AbstractEventLoopTest, FetcherTimingReport) { |
| absl::SetFlag(&FLAGS_timing_report_ms, 1000); |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make("sender_loop"); |
| |
| auto loop3 = Make(); |
| |
| Fetcher<timing::Report> report_fetcher = |
| loop3->MakeFetcher<timing::Report>("/aos"); |
| EXPECT_FALSE(report_fetcher.Fetch()); |
| |
| auto sender = loop2->MakeSender<TestMessage>("/test"); |
| auto fetcher1 = loop1->MakeFetcher<TestMessage>("/test"); |
| auto fetcher2 = loop1->MakeFetcher<TestMessage>("/test"); |
| fetcher1.Fetch(); |
| fetcher2.Fetch(); |
| |
| // Add a timer to actually quit. |
| auto test_timer = loop1->AddTimer([&sender]() { |
| for (int i = 0; i < 10; ++i) { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200 + i); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| } |
| }); |
| |
| auto test_timer2 = loop1->AddTimer([&fetcher1, &fetcher2]() { |
| fetcher1.Fetch(); |
| while (fetcher2.FetchNext()) { |
| } |
| }); |
| |
| // Quit after 1 timing report, mid way through the next cycle. |
| EndEventLoop(loop1.get(), chrono::milliseconds(2500)); |
| |
| loop1->OnRun([test_timer, test_timer2, &loop1]() { |
| test_timer->Schedule(loop1->monotonic_now() + chrono::milliseconds(1400)); |
| test_timer2->Schedule(loop1->monotonic_now() + chrono::milliseconds(1600)); |
| }); |
| |
| Run(); |
| |
| if (do_timing_reports() == DoTimingReports::kYes) { |
| // And, since we are here, check that the timing report makes sense. |
| // Start by looking for our event loop's timing. |
| FlatbufferDetachedBuffer<timing::Report> primary_report = |
| FlatbufferDetachedBuffer<timing::Report>::Empty(); |
| while (report_fetcher.FetchNext()) { |
| if (report_fetcher->name()->string_view() == "primary") { |
| primary_report = CopyFlatBuffer(report_fetcher.get()); |
| } |
| } |
| |
| VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true}); |
| |
| EXPECT_EQ(primary_report.message().name()->string_view(), "primary"); |
| |
| ASSERT_NE(primary_report.message().senders(), nullptr); |
| EXPECT_EQ(primary_report.message().senders()->size(), 2); |
| |
| ASSERT_NE(primary_report.message().timers(), nullptr); |
| EXPECT_EQ(primary_report.message().timers()->size(), 4); |
| |
| // Make sure there are no phased loops or watchers. |
| ASSERT_EQ(primary_report.message().phased_loops(), nullptr); |
| ASSERT_EQ(primary_report.message().watchers(), nullptr); |
| |
| // Now look at the fetchrs. |
| ASSERT_NE(primary_report.message().fetchers(), nullptr); |
| ASSERT_EQ(primary_report.message().fetchers()->size(), 2); |
| |
| EXPECT_EQ(primary_report.message().fetchers()->Get(0)->count(), 1); |
| EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->average(), |
| 0.1); |
| EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->min(), |
| 0.1); |
| EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->max(), |
| 0.1); |
| EXPECT_EQ(primary_report.message() |
| .fetchers() |
| ->Get(0) |
| ->latency() |
| ->standard_deviation(), |
| 0.0); |
| |
| EXPECT_EQ(primary_report.message().fetchers()->Get(1)->count(), 10); |
| } else { |
| ASSERT_FALSE(report_fetcher.Fetch()); |
| } |
| } |
| |
| // Tests that a raw watcher and raw fetcher can receive messages from a raw |
| // sender without messing up offsets. |
| TEST_P(AbstractEventLoopTest, RawBasic) { |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| auto loop3 = Make(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop1->MakeRawSender(configuration::GetChannel( |
| loop1->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| std::unique_ptr<aos::RawFetcher> fetcher = |
| loop3->MakeRawFetcher(configuration::GetChannel( |
| loop3->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| loop2->OnRun([&]() { |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size()), |
| RawSender::Error::kOk); |
| }); |
| |
| bool happened = false; |
| loop2->MakeRawWatcher( |
| configuration::GetChannel(loop2->configuration(), "/test", |
| "aos.TestMessage", "", nullptr), |
| [this, &kMessage, &fetcher, &happened](const Context &context, |
| const void *message) { |
| happened = true; |
| EXPECT_EQ( |
| kMessage.span(), |
| absl::Span<const uint8_t>( |
| reinterpret_cast<const uint8_t *>(message), context.size)); |
| EXPECT_EQ(message, context.data); |
| |
| ASSERT_TRUE(fetcher->Fetch()); |
| |
| EXPECT_EQ(kMessage.span(), |
| absl::Span<const uint8_t>(reinterpret_cast<const uint8_t *>( |
| fetcher->context().data), |
| fetcher->context().size)); |
| |
| this->Exit(); |
| }); |
| |
| EXPECT_FALSE(happened); |
| Run(); |
| EXPECT_TRUE(happened); |
| } |
| |
| // Tests that a raw watcher and raw fetcher can receive messages from a raw |
| // sender without messing up offsets, using the RawSpan overload. |
| TEST_P(AbstractEventLoopTest, RawBasicSharedSpan) { |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| auto loop3 = Make(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop1->MakeRawSender(configuration::GetChannel( |
| loop1->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| std::unique_ptr<aos::RawFetcher> fetcher = |
| loop3->MakeRawFetcher(configuration::GetChannel( |
| loop3->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| loop2->OnRun([&]() { |
| auto shared_span = MakeSharedSpan(kMessage.span().size()); |
| memcpy(shared_span.second.data(), kMessage.span().data(), |
| kMessage.span().size()); |
| sender->CheckOk(sender->Send(std::move(shared_span.first))); |
| }); |
| |
| bool happened = false; |
| loop2->MakeRawWatcher( |
| configuration::GetChannel(loop2->configuration(), "/test", |
| "aos.TestMessage", "", nullptr), |
| [this, &kMessage, &fetcher, &happened](const Context &context, |
| const void *message) { |
| happened = true; |
| EXPECT_EQ( |
| kMessage.span(), |
| absl::Span<const uint8_t>( |
| reinterpret_cast<const uint8_t *>(message), context.size)); |
| EXPECT_EQ(message, context.data); |
| |
| ASSERT_TRUE(fetcher->Fetch()); |
| |
| EXPECT_EQ(kMessage.span(), |
| absl::Span<const uint8_t>(reinterpret_cast<const uint8_t *>( |
| fetcher->context().data), |
| fetcher->context().size)); |
| |
| this->Exit(); |
| }); |
| |
| EXPECT_FALSE(happened); |
| Run(); |
| EXPECT_TRUE(happened); |
| } |
| |
| // Tests that a raw watcher and raw fetcher can receive messages from a raw |
| // sender with remote times filled out. |
| TEST_P(AbstractEventLoopTest, RawRemoteTimes) { |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| auto loop3 = Make(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| const aos::monotonic_clock::time_point monotonic_remote_time = |
| aos::monotonic_clock::time_point(chrono::seconds(1501)); |
| const aos::realtime_clock::time_point realtime_remote_time = |
| aos::realtime_clock::time_point(chrono::seconds(3132)); |
| const aos::monotonic_clock::time_point monotonic_remote_transmit_time = |
| aos::monotonic_clock::time_point(chrono::seconds(1601)); |
| const uint32_t remote_queue_index = 0x254971; |
| const UUID source_boot_uuid = UUID::Random(); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop1->MakeRawSender(configuration::GetChannel( |
| loop1->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| std::unique_ptr<aos::RawFetcher> fetcher = |
| loop3->MakeRawFetcher(configuration::GetChannel( |
| loop3->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| loop2->OnRun([&]() { |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size(), |
| monotonic_remote_time, realtime_remote_time, |
| monotonic_remote_transmit_time, remote_queue_index, |
| source_boot_uuid), |
| RawSender::Error::kOk); |
| }); |
| |
| bool happened = false; |
| loop2->MakeRawWatcher( |
| configuration::GetChannel(loop2->configuration(), "/test", |
| "aos.TestMessage", "", nullptr), |
| [this, monotonic_remote_time, realtime_remote_time, |
| monotonic_remote_transmit_time, source_boot_uuid, remote_queue_index, |
| &fetcher, &happened](const Context &context, const void * /*message*/) { |
| happened = true; |
| EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time); |
| EXPECT_EQ(realtime_remote_time, context.realtime_remote_time); |
| EXPECT_EQ(source_boot_uuid, context.source_boot_uuid); |
| EXPECT_EQ(remote_queue_index, context.remote_queue_index); |
| |
| ASSERT_TRUE(fetcher->Fetch()); |
| EXPECT_EQ(monotonic_remote_time, |
| fetcher->context().monotonic_remote_time); |
| EXPECT_EQ(realtime_remote_time, |
| fetcher->context().realtime_remote_time); |
| EXPECT_EQ(monotonic_remote_transmit_time, |
| fetcher->context().monotonic_remote_transmit_time); |
| |
| this->Exit(); |
| }); |
| |
| EXPECT_FALSE(happened); |
| Run(); |
| EXPECT_TRUE(happened); |
| |
| // Confirm everything goes back. |
| EXPECT_EQ(loop2->context().monotonic_event_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop2->context().monotonic_remote_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop2->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop2->context().realtime_event_time, realtime_clock::min_time); |
| EXPECT_EQ(loop2->context().realtime_remote_time, realtime_clock::min_time); |
| EXPECT_EQ(loop2->context().source_boot_uuid, loop2->boot_uuid()); |
| EXPECT_EQ(loop2->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop2->context().size, 0u); |
| EXPECT_EQ(loop2->context().data, nullptr); |
| EXPECT_EQ(loop2->context().buffer_index, -1); |
| } |
| |
| // Tests that a raw sender fills out sent data. |
| TEST_P(AbstractEventLoopTest, RawSenderSentData) { |
| auto loop1 = MakePrimary(); |
| |
| const FlatbufferDetachedBuffer<TestMessage> kMessage = |
| JsonToFlatbuffer<TestMessage>("{}"); |
| |
| std::unique_ptr<aos::RawSender> sender = |
| loop1->MakeRawSender(configuration::GetChannel( |
| loop1->configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| |
| const aos::monotonic_clock::time_point monotonic_now = loop1->monotonic_now(); |
| const aos::realtime_clock::time_point realtime_now = loop1->realtime_now(); |
| |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size()), |
| RawSender::Error::kOk); |
| |
| EXPECT_GE(sender->monotonic_sent_time(), monotonic_now); |
| EXPECT_LE(sender->monotonic_sent_time(), |
| monotonic_now + chrono::milliseconds(100)); |
| EXPECT_GE(sender->realtime_sent_time(), realtime_now); |
| EXPECT_LE(sender->realtime_sent_time(), |
| realtime_now + chrono::milliseconds(100)); |
| EXPECT_EQ(sender->sent_queue_index(), 0u); |
| |
| EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size()), |
| RawSender::Error::kOk); |
| |
| EXPECT_GE(sender->monotonic_sent_time(), monotonic_now); |
| EXPECT_LE(sender->monotonic_sent_time(), |
| monotonic_now + chrono::milliseconds(100)); |
| EXPECT_GE(sender->realtime_sent_time(), realtime_now); |
| EXPECT_LE(sender->realtime_sent_time(), |
| realtime_now + chrono::milliseconds(100)); |
| EXPECT_EQ(sender->sent_queue_index(), 1u); |
| } |
| |
| // Tests that not setting up nodes results in no node. |
| TEST_P(AbstractEventLoopTest, NoNode) { |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| |
| EXPECT_EQ(loop1->node(), nullptr); |
| EXPECT_EQ(loop2->node(), nullptr); |
| } |
| |
| // Tests that setting up nodes results in node being set. |
| TEST_P(AbstractEventLoopTest, Node) { |
| EnableNodes("me"); |
| |
| auto loop1 = Make(); |
| auto loop2 = MakePrimary(); |
| |
| EXPECT_NE(loop1->node(), nullptr); |
| EXPECT_NE(loop2->node(), nullptr); |
| } |
| |
| // Tests that watchers work with a node setup. |
| TEST_P(AbstractEventLoopTest, NodeWatcher) { |
| EnableNodes("me"); |
| |
| auto loop1 = Make(); |
| auto loop2 = Make(); |
| loop1->MakeWatcher("/test", [](const TestMessage &) {}); |
| loop2->MakeRawWatcher( |
| configuration::GetChannel(configuration(), "/test", "aos.TestMessage", "", |
| nullptr), |
| [](const Context &, const void *) {}); |
| } |
| |
| // Tests that no-arg watchers work with a node setup. |
| TEST_P(AbstractEventLoopTest, NodeNoArgWatcher) { |
| EnableNodes("me"); |
| |
| auto loop1 = Make(); |
| auto loop2 = Make(); |
| loop1->MakeWatcher("/test", [](const TestMessage &) {}); |
| loop2->MakeRawNoArgWatcher( |
| configuration::GetChannel(configuration(), "/test", "aos.TestMessage", "", |
| nullptr), |
| [](const Context &) {}); |
| } |
| |
| // Tests that fetcher work with a node setup. |
| TEST_P(AbstractEventLoopTest, NodeFetcher) { |
| EnableNodes("me"); |
| auto loop1 = Make(); |
| |
| auto fetcher = loop1->MakeFetcher<TestMessage>("/test"); |
| auto raw_fetcher = loop1->MakeRawFetcher(configuration::GetChannel( |
| configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| } |
| |
| // Tests that sender work with a node setup. |
| TEST_P(AbstractEventLoopTest, NodeSender) { |
| EnableNodes("me"); |
| auto loop1 = Make(); |
| |
| aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| } |
| |
| // Tests that a non-realtime event loop timer is marked non-realtime. |
| TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopTimer) { |
| auto loop1 = MakePrimary(); |
| |
| // Add a timer to actually quit. |
| auto test_timer = loop1->AddTimer([this]() { |
| CheckNotRealtime(); |
| this->Exit(); |
| }); |
| |
| loop1->OnRun([&test_timer, &loop1]() { |
| CheckNotRealtime(); |
| test_timer->Schedule(loop1->monotonic_now(), |
| ::std::chrono::milliseconds(100)); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that a realtime event loop timer is marked realtime. |
| TEST_P(AbstractEventLoopTest, RealtimeSend) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->SetRuntimeRealtimePriority(1); |
| |
| auto sender = loop1->MakeSender<TestMessage>("/test2"); |
| |
| loop1->OnRun([&]() { |
| CheckRealtime(); |
| |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| builder.add_value(200); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| |
| this->Exit(); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that a realtime event loop timer is marked realtime. |
| TEST_P(AbstractEventLoopTest, RealtimeEventLoopTimer) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->SetRuntimeRealtimePriority(1); |
| |
| // Add a timer to actually quit. |
| auto test_timer = loop1->AddTimer([this]() { |
| CheckRealtime(); |
| this->Exit(); |
| }); |
| |
| loop1->OnRun([&test_timer, &loop1]() { |
| CheckRealtime(); |
| test_timer->Schedule(loop1->monotonic_now(), |
| ::std::chrono::milliseconds(100)); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that a non-realtime event loop phased loop is marked non-realtime. |
| TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopPhasedLoop) { |
| auto loop1 = MakePrimary(); |
| |
| // Add a timer to actually quit. |
| loop1->AddPhasedLoop( |
| [this](int) { |
| CheckNotRealtime(); |
| this->Exit(); |
| }, |
| chrono::seconds(1), chrono::seconds(0)); |
| |
| Run(); |
| } |
| |
| // Tests that a realtime event loop phased loop is marked realtime. |
| TEST_P(AbstractEventLoopTest, RealtimeEventLoopPhasedLoop) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->SetRuntimeRealtimePriority(1); |
| |
| // Add a timer to actually quit. |
| loop1->AddPhasedLoop( |
| [this](int) { |
| CheckRealtime(); |
| this->Exit(); |
| }, |
| chrono::seconds(1), chrono::seconds(0)); |
| |
| Run(); |
| } |
| |
| // Tests that a non-realtime event loop watcher is marked non-realtime. |
| TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopWatcher) { |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| |
| aos::Sender<TestMessage> sender = loop2->MakeSender<TestMessage>("/test"); |
| |
| loop1->OnRun([&]() { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| }); |
| |
| loop1->MakeWatcher("/test", [&](const TestMessage &) { |
| CheckNotRealtime(); |
| this->Exit(); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that a realtime event loop watcher is marked realtime. |
| TEST_P(AbstractEventLoopTest, RealtimeEventLoopWatcher) { |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| |
| loop1->SetRuntimeRealtimePriority(1); |
| |
| aos::Sender<TestMessage> sender = loop2->MakeSender<TestMessage>("/test"); |
| |
| loop1->OnRun([&]() { |
| aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder(); |
| TestMessage::Builder builder = msg.MakeBuilder<TestMessage>(); |
| msg.CheckOk(msg.Send(builder.Finish())); |
| }); |
| |
| loop1->MakeWatcher("/test", [&](const TestMessage &) { |
| CheckRealtime(); |
| this->Exit(); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that event loop's context's monotonic time is set to a value on OnRun. |
| TEST_P(AbstractEventLoopTest, SetContextOnRun) { |
| auto loop = MakePrimary(); |
| |
| EXPECT_EQ(loop->context().monotonic_event_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid()); |
| EXPECT_EQ(loop->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().remote_queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().size, 0u); |
| EXPECT_EQ(loop->context().data, nullptr); |
| EXPECT_EQ(loop->context().buffer_index, -1); |
| |
| // We want to check that monotonic event time is before monotonic now |
| // called inside of callback, but after time point obtained callback. |
| aos::monotonic_clock::time_point monotonic_event_time_on_run; |
| |
| loop->OnRun([&]() { |
| monotonic_event_time_on_run = loop->context().monotonic_event_time; |
| EXPECT_LE(monotonic_event_time_on_run, loop->monotonic_now()); |
| EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid()); |
| EXPECT_EQ(loop->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().remote_queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().size, 0u); |
| EXPECT_EQ(loop->context().data, nullptr); |
| EXPECT_EQ(loop->context().buffer_index, -1); |
| }); |
| |
| EndEventLoop(loop.get(), ::std::chrono::milliseconds(200)); |
| |
| const aos::monotonic_clock::time_point before_run_time = |
| loop->monotonic_now(); |
| Run(); |
| EXPECT_GE(monotonic_event_time_on_run, before_run_time); |
| |
| EXPECT_EQ(loop->context().monotonic_event_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().monotonic_remote_transmit_time, |
| monotonic_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time); |
| EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid()); |
| EXPECT_EQ(loop->context().queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().remote_queue_index, 0xffffffffu); |
| EXPECT_EQ(loop->context().size, 0u); |
| EXPECT_EQ(loop->context().data, nullptr); |
| EXPECT_EQ(loop->context().buffer_index, -1); |
| } |
| |
| // Tests that watchers fail when created on the wrong node. |
| TEST_P(AbstractEventLoopDeathTest, NodeWatcher) { |
| EnableNodes("them"); |
| |
| auto loop1 = Make(); |
| auto loop2 = Make(); |
| EXPECT_DEATH({ loop1->MakeWatcher("/test", [](const TestMessage &) {}); }, |
| "node"); |
| EXPECT_DEATH( |
| { |
| loop2->MakeRawWatcher( |
| configuration::GetChannel(configuration(), "/test", |
| "aos.TestMessage", "", nullptr), |
| [](const Context &, const void *) {}); |
| }, |
| "node"); |
| EXPECT_DEATH({ loop1->MakeNoArgWatcher<TestMessage>("/test", []() {}); }, |
| "node"); |
| EXPECT_DEATH( |
| { |
| loop2->MakeRawNoArgWatcher( |
| configuration::GetChannel(configuration(), "/test", |
| "aos.TestMessage", "", nullptr), |
| [](const Context &) {}); |
| }, |
| "node"); |
| } |
| |
| // Tests that fetchers fail when created on the wrong node. |
| TEST_P(AbstractEventLoopDeathTest, NodeFetcher) { |
| EnableNodes("them"); |
| auto loop1 = Make(); |
| |
| EXPECT_DEATH({ auto fetcher = loop1->MakeFetcher<TestMessage>("/test"); }, |
| "node"); |
| EXPECT_DEATH( |
| { |
| auto raw_fetcher = loop1->MakeRawFetcher(configuration::GetChannel( |
| configuration(), "/test", "aos.TestMessage", "", nullptr)); |
| }, |
| "node"); |
| } |
| |
| // Tests that senders fail when created on the wrong node. |
| TEST_P(AbstractEventLoopDeathTest, NodeSender) { |
| EnableNodes("them"); |
| auto loop1 = Make(); |
| |
| EXPECT_DEATH( |
| { |
| aos::Sender<TestMessage> sender = |
| loop1->MakeSender<TestMessage>("/test"); |
| }, |
| "node"); |
| |
| // Note: Creating raw senders is always supported. Right now, this lets us |
| // use them to create message_gateway. |
| } |
| |
| // Tests creating multiple Builders from a single Sender at the same time. |
| TEST_P(AbstractEventLoopDeathTest, MultipleBuilders) { |
| auto loop1 = Make(); |
| aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test"); |
| |
| { auto builder = sender.MakeBuilder(); } |
| { |
| auto builder = sender.MakeBuilder(); |
| builder.MakeBuilder<TestMessage>().Finish(); |
| } |
| { |
| // Creating this after the first one was destroyed should be fine. |
| auto builder = sender.MakeBuilder(); |
| builder.MakeBuilder<TestMessage>().Finish(); |
| // But not a second one. |
| EXPECT_DEATH(sender.MakeBuilder().MakeBuilder<TestMessage>().Finish(), |
| "May not have multiple active allocators"); |
| } |
| |
| FlatbufferDetachedBuffer<TestMessage> detached = |
| flatbuffers::DetachedBuffer(); |
| { |
| auto builder = sender.MakeBuilder(); |
| detached = builder.Detach(builder.MakeBuilder<TestMessage>().Finish()); |
| } |
| { |
| // This is the second one, after the detached one, so it should fail. |
| EXPECT_DEATH(sender.MakeBuilder().MakeBuilder<TestMessage>().Finish(), |
| "May not have multiple active allocators"); |
| } |
| |
| // Clear the detached one, and then we should be able to create another. |
| detached = flatbuffers::DetachedBuffer(); |
| { |
| auto builder = sender.MakeBuilder(); |
| builder.MakeBuilder<TestMessage>().Finish(); |
| } |
| |
| // And then detach another one. |
| { |
| auto builder = sender.MakeBuilder(); |
| detached = builder.Detach(builder.MakeBuilder<TestMessage>().Finish()); |
| } |
| } |
| |
| // Tests sending a buffer detached from a different builder. |
| TEST_P(AbstractEventLoopDeathTest, WrongDetachedBuffer) { |
| auto loop1 = Make(); |
| aos::Sender<TestMessage> sender1 = loop1->MakeSender<TestMessage>("/test"); |
| aos::Sender<TestMessage> sender2 = loop1->MakeSender<TestMessage>("/test"); |
| |
| auto builder = sender1.MakeBuilder(); |
| FlatbufferDetachedBuffer<TestMessage> detached = |
| builder.Detach(builder.MakeBuilder<TestMessage>().Finish()); |
| EXPECT_DEATH(sender2.CheckOk(sender2.SendDetached(std::move(detached))), |
| "May only send the buffer detached from this Sender"); |
| } |
| |
| // Tests that senders fail when created on the wrong node. |
| TEST_P(AbstractEventLoopDeathTest, SetVersionWhileRunning) { |
| auto loop1 = MakePrimary(); |
| |
| loop1->OnRun([&loop1, this]() { |
| EXPECT_DEATH({ loop1->SetVersionString("abcdef"); }, |
| "timing report while running"); |
| Exit(); |
| }); |
| |
| Run(); |
| } |
| |
| int TestChannelFrequency(EventLoop *event_loop) { |
| return event_loop->GetChannel<TestMessage>("/test")->frequency(); |
| } |
| |
| int TestChannelQueueSize(EventLoop *event_loop) { |
| return configuration::QueueSize(event_loop->configuration(), |
| event_loop->GetChannel<TestMessage>("/test")); |
| } |
| |
| RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) { |
| aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder(); |
| TestMessage::Builder test_message_builder = |
| builder.MakeBuilder<TestMessage>(); |
| test_message_builder.add_value(0); |
| return builder.Send(test_message_builder.Finish()); |
| } |
| |
| // Test that sending messages too fast returns |
| // RawSender::Error::kMessagesSentTooFast. |
| TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) { |
| auto event_loop = MakePrimary(); |
| event_loop->SetRuntimeRealtimePriority(5); |
| |
| auto sender = event_loop->MakeSender<TestMessage>("/test"); |
| |
| // Send one message in the beginning, then wait until the |
| // channel_storage_duration is almost done and start sending messages rapidly, |
| // having some come in the next chanel_storage_duration. The queue_size is |
| // 1600, so the 1601st message will be the last valid one (the initial message |
| // having being sent more than a channel_storage_duration ago), and trying to |
| // send the 1602nd message should return |
| // RawSender::Error::kMessagesSentTooFast. |
| EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk); |
| int msgs_sent = 1; |
| const int queue_size = TestChannelQueueSize(event_loop.get()); |
| |
| const int messages_per_ms = 2; |
| const auto kRepeatOffset = std::chrono::milliseconds(10); |
| const auto base_offset = |
| configuration::ChannelStorageDuration(event_loop->configuration(), |
| sender.channel()) - |
| (std::chrono::milliseconds(1) * (queue_size / 2) / messages_per_ms); |
| |
| const auto timer = event_loop->AddTimer([&]() { |
| // Send in bursts to reduce scheduler load to make the test more |
| // reproducible. |
| for (int i = 0; i < messages_per_ms * kRepeatOffset.count(); ++i) { |
| const bool done = (msgs_sent == queue_size + 1); |
| ASSERT_EQ(SendTestMessage(sender), |
| done ? RawSender::Error::kMessagesSentTooFast |
| : RawSender::Error::kOk); |
| msgs_sent++; |
| if (done) { |
| Exit(); |
| return; |
| } |
| } |
| }); |
| |
| event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() { |
| timer->Schedule(event_loop->monotonic_now() + base_offset, kRepeatOffset); |
| }); |
| |
| Run(); |
| } |
| |
| // Tests that we are able to send messages successfully after sending messages |
| // too fast and waiting while continuously attempting to send messages. |
| // Also tests that SendFailureCounter is working correctly in this |
| // situation |
| TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) { |
| auto event_loop = MakePrimary(); |
| event_loop->SetRuntimeRealtimePriority(5); |
| |
| auto sender = event_loop->MakeSender<TestMessage>("/test"); |
| |
| // We are sending bunches of messages at 100 Hz, so we will be sending too |
| // fast after queue_size (800) ms. After this, keep sending messages, and |
| // exactly a channel storage duration (2s) after we send the first message we |
| // should be able to successfully send a message. |
| |
| const std::chrono::milliseconds kInterval = std::chrono::milliseconds(10); |
| const monotonic_clock::duration channel_storage_duration = |
| configuration::ChannelStorageDuration(event_loop->configuration(), |
| sender.channel()); |
| const int queue_size = TestChannelQueueSize(event_loop.get()); |
| |
| int msgs_sent = 0; |
| SendFailureCounter counter; |
| auto start = monotonic_clock::min_time; |
| |
| event_loop->AddPhasedLoop( |
| [&](int elapsed_cycles) { |
| // The queue is setup for 800 messages/sec. We want to fill that up at |
| // a rate of 2000 messages/sec so we make sure we fill it up. |
| for (int i = 0; i < 2 * kInterval.count() * elapsed_cycles; ++i) { |
| const auto actual_err = SendTestMessage(sender); |
| const bool done_waiting = (start != monotonic_clock::min_time && |
| sender.monotonic_sent_time() >= |
| (start + channel_storage_duration)); |
| const auto expected_err = |
| (msgs_sent < queue_size || done_waiting |
| ? RawSender::Error::kOk |
| : RawSender::Error::kMessagesSentTooFast); |
| |
| if (start == monotonic_clock::min_time) { |
| start = sender.monotonic_sent_time(); |
| } |
| |
| ASSERT_EQ(actual_err, expected_err); |
| counter.Count(actual_err); |
| msgs_sent++; |
| |
| EXPECT_EQ(counter.failures(), |
| msgs_sent <= queue_size |
| ? 0 |
| : (msgs_sent - queue_size) - |
| (actual_err == RawSender::Error::kOk ? 1 : 0)); |
| EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk); |
| |
| if (done_waiting) { |
| Exit(); |
| return; |
| } |
| } |
| }, |
| kInterval); |
| Run(); |
| } |
| |
| // Tests that RawSender::Error::kMessagesSentTooFast is returned |
| // when messages are sent too fast from senders in different loops |
| TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) { |
| auto loop1 = MakePrimary(); |
| auto loop2 = Make(); |
| |
| auto sender1 = loop1->MakeSender<TestMessage>("/test"); |
| auto sender2 = loop2->MakeSender<TestMessage>("/test"); |
| |
| // Send queue_size messages split between the senders. |
| const int queue_size = TestChannelQueueSize(loop1.get()); |
| for (int i = 0; i < queue_size / 2; i++) { |
| ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk); |
| ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk); |
| } |
| |
| // Since queue_size messages have been sent, this should return an error |
| EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast); |
| } |
| |
| // Tests that a longer storage durations store more messages. |
| TEST_P(AbstractEventLoopTest, SendingTooFastWithLongDuration) { |
| auto loop1 = MakePrimary(); |
| |
| auto sender1 = loop1->MakeSender<TestMessage>("/test3"); |
| |
| // Send queue_size messages split between the senders. |
| const int queue_size = |
| configuration::QueueSize(loop1->configuration(), sender1.channel()); |
| EXPECT_EQ(queue_size, 100 * 10); |
| for (int i = 0; i < queue_size; i++) { |
| ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk); |
| } |
| |
| // Since queue_size messages have been sent, and little time has elapsed, |
| // this should return an error. |
| EXPECT_EQ(SendTestMessage(sender1), RawSender::Error::kMessagesSentTooFast); |
| } |
| |
| // Tests that we can exit with a default constructor and that Run() will |
| // indicate a successful exit. |
| TEST_P(AbstractEventLoopTest, ExitHandleExitSuccessful) { |
| auto loop = MakePrimary(); |
| std::unique_ptr<ExitHandle> exit_handle = MakeExitHandle(); |
| bool happened = false; |
| |
| loop->OnRun([&exit_handle, &happened]() { |
| happened = true; |
| exit_handle->Exit(); |
| }); |
| |
| EXPECT_TRUE(Run().has_value()); |
| EXPECT_TRUE(happened); |
| } |
| |
| // Tests that we can exit with an error Status and have that returned via the |
| // Run() method. |
| TEST_P(AbstractEventLoopTest, ExitHandleExitFailure) { |
| auto loop = MakePrimary(); |
| std::unique_ptr<ExitHandle> exit_handle = MakeExitHandle(); |
| bool happened = false; |
| |
| loop->OnRun([&exit_handle, &happened]() { |
| happened = true; |
| exit_handle->Exit(Error::MakeUnexpectedError("Hello, World!")); |
| // The second Exit() should not affect the final return value. |
| exit_handle->Exit(Error::MakeUnexpectedError("Hello, World! 2")); |
| }); |
| const int line = __LINE__ - 4; |
| |
| Result<void> status = Run(); |
| |
| EXPECT_TRUE(happened); |
| EXPECT_FALSE(status.has_value()); |
| EXPECT_EQ(std::string("Hello, World!"), status.error().message()); |
| ASSERT_TRUE(status.error().source_location().has_value()); |
| EXPECT_EQ(std::string("event_loop_param_test.cc"), |
| std::filesystem::path(status.error().source_location()->file_name()) |
| .filename()); |
| EXPECT_EQ(line, status.error().source_location()->line()); |
| } |
| |
| } // namespace aos::testing |