Add a sent-too-fast check for simulation and shm
Sending now returns RawSender::Error::kMessagesSentTooFast if more than
queue_size (frequency * channel_storage_duration) messages have been sent
within one channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
#include <unordered_map>
#include <unordered_set>
+#include "aos/events/test_message_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/logging/log_message_generated.h"
#include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
"May only send the buffer detached from this Sender");
}
+int TestChannelFrequency(EventLoop *event_loop) {  // Frequency of /test.
+ return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+int TestChannelQueueSize(EventLoop *event_loop) {  // Queue size of /test.
+ const int frequency = TestChannelFrequency(event_loop);
+ const auto channel_storage_duration = std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration());
+ const int queue_size =  // frequency * duration in seconds, truncated to int.
+ frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
+ channel_storage_duration)
+ .count();
+
+ return queue_size;
+}
+
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+ aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder();
+ TestMessage::Builder test_message_builder =
+ builder.MakeBuilder<TestMessage>();
+ test_message_builder.add_value(0);  // The payload is irrelevant to the tests.
+ return builder.Send(test_message_builder.Finish());  // Result of the send.
+}
+
+// Tests that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+ auto event_loop = MakePrimary();
+
+ auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+ // Send one message in the beginning, then wait until the
+ // channel_storage_duration is almost done and start sending messages rapidly,
+ // having some come in the next channel_storage_duration. The queue_size is
+ // 1600, so the 1601st message will be the last valid one (the initial message
+ // having been sent more than a channel_storage_duration ago), and trying to
+ // send the 1602nd message should return
+ // RawSender::Error::kMessagesSentTooFast.
+ EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+ int msgs_sent = 1;  // Already counts the initial message sent above.
+ const int queue_size = TestChannelQueueSize(event_loop.get());
+
+ const auto timer = event_loop->AddTimer([&]() {
+ const bool done = (msgs_sent == queue_size + 1);  // Send expected to fail.
+ ASSERT_EQ(
+ SendTestMessage(sender),
+ done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+ msgs_sent++;
+ if (done) {
+ Exit();
+ }
+ });
+
+ const auto kRepeatOffset = std::chrono::milliseconds(1);  // Timer period.
+ const auto base_offset =  // First firing: queue_size / 2 periods early.
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration()) -
+ (kRepeatOffset * (queue_size / 2));
+ event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+ timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+ });
+
+ Run();
+}
+
+// Tests that we are able to send messages successfully after sending messages
+// too fast and then waiting while continuously attempting to send messages.
+// Also tests that SendFailureCounter is working correctly in this
+// situation.
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+ auto event_loop = MakePrimary();
+
+ auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+ // We are sending messages at 1 kHz, so we will be sending too fast after
+ // queue_size (1600) ms. After this, keep sending messages, and exactly a
+ // channel storage duration (2s) after we send the first message we should
+ // be able to successfully send a message.
+
+ const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+ const monotonic_clock::duration channel_storage_duration =
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration());
+ const int queue_size = TestChannelQueueSize(event_loop.get());
+
+ int msgs_sent = 0;  // Number of send attempts so far, failed ones included.
+ SendFailureCounter counter;
+ auto start = monotonic_clock::min_time;  // min_time until the first send.
+
+ event_loop->AddPhasedLoop(
+ [&](int) {
+ const auto actual_err = SendTestMessage(sender);
+ const bool done_waiting = (start != monotonic_clock::min_time &&
+ sender.monotonic_sent_time() >=
+ (start + channel_storage_duration));  // A full duration past start.
+ const auto expected_err =
+ (msgs_sent < queue_size || done_waiting
+ ? RawSender::Error::kOk
+ : RawSender::Error::kMessagesSentTooFast);
+
+ if (start == monotonic_clock::min_time) {  // Record the first sent time.
+ start = sender.monotonic_sent_time();
+ }
+
+ ASSERT_EQ(actual_err, expected_err);
+ counter.Count(actual_err);
+ msgs_sent++;
+
+ EXPECT_EQ(counter.failures(),  // Attempts past queue_size that failed.
+ msgs_sent <= queue_size
+ ? 0
+ : (msgs_sent - queue_size) -
+ (actual_err == RawSender::Error::kOk ? 1 : 0));
+ EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+ if (done_waiting) {
+ Exit();
+ }
+ },
+ kInterval);
+ Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops.
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+ auto loop1 = MakePrimary();
+ auto loop2 = Make();
+
+ auto sender1 = loop1->MakeSender<TestMessage>("/test");
+ auto sender2 = loop2->MakeSender<TestMessage>("/test");
+
+ // Send queue_size messages, alternating senders (assumes even queue_size).
+ const int queue_size = TestChannelQueueSize(loop1.get());
+ for (int i = 0; i < queue_size / 2; i++) {  // NOTE(review): odd size sends 1 fewer.
+ ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+ ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+ }
+
+ // Since queue_size messages have been sent, this should return an error.
+ EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
} // namespace testing
} // namespace aos