Add a sent-too-fast check for simulation and shm

Senders now return RawSender::Error::kMessagesSentTooFast if more than
queue_size (frequency * channel_storage_duration) messages were sent
within one channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
 #include <unordered_map>
 #include <unordered_set>
 
+#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/logging/log_message_generated.h"
 #include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
                "May only send the buffer detached from this Sender");
 }
 
+int TestChannelFrequency(EventLoop *event_loop) {  // frequency() of "/test".
+  return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+// Returns the number of messages the "/test" queue is sized for: the channel
+// frequency times the configured channel_storage_duration in seconds.
+int TestChannelQueueSize(EventLoop *event_loop) {
+  const std::chrono::nanoseconds storage_duration(
+      event_loop->configuration()->channel_storage_duration());
+  // Implicitly converts the integral nanoseconds to floating point seconds.
+  const std::chrono::duration<double> storage_seconds = storage_duration;
+
+  // The int * double product is truncated toward zero by the int return.
+  return TestChannelFrequency(event_loop) * storage_seconds.count();
+}
+
+// Sends one TestMessage with value 0 on `sender`; returns the Send() result.
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+  auto fbb = sender.MakeBuilder();
+  auto message = fbb.MakeBuilder<TestMessage>();
+  message.add_value(0);
+  return fbb.Send(message.Finish());
+}
+
+// Tests that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // Send one message at the start, then wait until the
+  // channel_storage_duration is almost over and start sending messages
+  // rapidly, so that some land in the next channel_storage_duration. The
+  // queue_size is 1600, so the 1601st message will be the last valid one (the
+  // initial message having been sent more than a channel_storage_duration
+  // earlier), and trying to send the 1602nd message should return
+  // RawSender::Error::kMessagesSentTooFast.
+  EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+  int msgs_sent = 1;  // Counts the initial message sent above.
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  const auto timer = event_loop->AddTimer([&]() {
+    const bool done = (msgs_sent == queue_size + 1);  // Next send must fail.
+    ASSERT_EQ(
+        SendTestMessage(sender),
+        done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+    msgs_sent++;
+    if (done) {
+      Exit();
+    }
+  });
+
+  const auto kRepeatOffset = std::chrono::milliseconds(1);
+  const auto base_offset =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration()) -
+      (kRepeatOffset * (queue_size / 2));  // queue_size / 2 periods early.
+  event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+    timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+  });
+
+  Run();
+}
+
+// Tests that we are able to send messages successfully again after sending
+// too fast, by continuing to attempt sends until enough time has passed.
+// Also tests that SendFailureCounter reports the right failure count and
+// just_failed() state throughout.
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // We are sending messages at 1 kHz, so we will start sending too fast after
+  // queue_size (1600) ms. After that, keep attempting to send; exactly one
+  // channel storage duration (2s) after the first message was sent, a send
+  // should succeed again.
+
+  const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+  const monotonic_clock::duration channel_storage_duration =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration());
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  int msgs_sent = 0;  // Send attempts so far, failed ones included.
+  SendFailureCounter counter;
+  auto start = monotonic_clock::min_time;  // min_time means "not yet set".
+
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        const auto actual_err = SendTestMessage(sender);
+        const bool done_waiting = (start != monotonic_clock::min_time &&
+                                   sender.monotonic_sent_time() >=
+                                       (start + channel_storage_duration));
+        const auto expected_err =
+            (msgs_sent < queue_size || done_waiting
+                 ? RawSender::Error::kOk
+                 : RawSender::Error::kMessagesSentTooFast);
+
+        if (start == monotonic_clock::min_time) {
+          start = sender.monotonic_sent_time();  // First message's sent time.
+        }
+
+        ASSERT_EQ(actual_err, expected_err);
+        counter.Count(actual_err);
+        msgs_sent++;
+
+        EXPECT_EQ(counter.failures(),
+                  msgs_sent <= queue_size
+                      ? 0
+                      : (msgs_sent - queue_size) -
+                            (actual_err == RawSender::Error::kOk ? 1 : 0));
+        EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+        if (done_waiting) {
+          Exit();
+        }
+      },
+      kInterval);
+  Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  auto sender1 = loop1->MakeSender<TestMessage>("/test");
+  auto sender2 = loop2->MakeSender<TestMessage>("/test");
+
+  // Send queue_size messages split between the senders.
+  const int queue_size = TestChannelQueueSize(loop1.get());
+  for (int i = 0; i < queue_size / 2; i++) {
+    ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+    ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+  }
+
+  // Since queue_size messages have been sent, this should return an error
+  EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
 }  // namespace testing
 }  // namespace aos