Add a sent too fast check for simulation and shm

Sending now returns RawSender::Error::kMessagesSentTooFast if more
than queue_size (frequency * channel_storage_duration) messages are
sent within one channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 67c4472..08b0064 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -26,6 +26,8 @@
       return "RawSender::Error::kOk";
     case RawSender::Error::kMessagesSentTooFast:
       return "RawSender::Error::kMessagesSentTooFast";
+    case RawSender::Error::kInvalidRedzone:
+      return "RawSender::Error::kInvalidRedzone";
   }
   LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
 }
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e810e3f..c2498b3 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -139,13 +139,16 @@
  public:
   using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
 
-  enum class [[nodiscard]] Error{
+  enum class [[nodiscard]] Error {
       // Represents success and no error
       kOk,
 
       // Error for messages on channels being sent faster than their
       // frequency and channel storage duration allow
-      kMessagesSentTooFast};
+      kMessagesSentTooFast,
+      // A write outside the message buffer (into its redzone) was detected
+      kInvalidRedzone
+  };
 
   RawSender(EventLoop *event_loop, const Channel *channel);
   RawSender(const RawSender &) = delete;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
 #include <unordered_map>
 #include <unordered_set>
 
+#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/logging/log_message_generated.h"
 #include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
                "May only send the buffer detached from this Sender");
 }
 
+int TestChannelFrequency(EventLoop *event_loop) {
+  return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+int TestChannelQueueSize(EventLoop *event_loop) {
+  const int frequency = TestChannelFrequency(event_loop);
+  const auto channel_storage_duration = std::chrono::nanoseconds(
+      event_loop->configuration()->channel_storage_duration());
+  const int queue_size =
+      frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
+                      channel_storage_duration)
+                      .count();
+
+  return queue_size;
+}
+
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+  aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder();
+  TestMessage::Builder test_message_builder =
+      builder.MakeBuilder<TestMessage>();
+  test_message_builder.add_value(0);
+  return builder.Send(test_message_builder.Finish());
+}
+
+// Test that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // Send one message in the beginning, then wait until the
+  // channel_storage_duration is almost done and start sending messages rapidly,
+  // having some come in the next channel_storage_duration. The queue_size is
+  // 1600, so the 1601st message will be the last valid one (the initial message
+  // having been sent more than a channel_storage_duration ago), and trying to
+  // send the 1602nd message should return
+  // RawSender::Error::kMessagesSentTooFast.
+  EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+  int msgs_sent = 1;
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  const auto timer = event_loop->AddTimer([&]() {
+    const bool done = (msgs_sent == queue_size + 1);
+    ASSERT_EQ(
+        SendTestMessage(sender),
+        done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+    msgs_sent++;
+    if (done) {
+      Exit();
+    }
+  });
+
+  const auto kRepeatOffset = std::chrono::milliseconds(1);
+  const auto base_offset =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration()) -
+      (kRepeatOffset * (queue_size / 2));
+  event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+    timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+  });
+
+  Run();
+}
+
+// Tests that we are able to send messages successfully after sending messages
+// too fast and waiting while continuously attempting to send messages.
+// Also tests that SendFailureCounter is working correctly in this
+// situation.
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // We are sending messages at 1 kHz, so we will be sending too fast after
+  // queue_size (1600) ms. After this, keep sending messages, and exactly a
+  // channel storage duration (2s) after we send the first message we should
+  // be able to successfully send a message.
+
+  const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+  const monotonic_clock::duration channel_storage_duration =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration());
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  int msgs_sent = 0;
+  SendFailureCounter counter;
+  auto start = monotonic_clock::min_time;
+
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        const auto actual_err = SendTestMessage(sender);
+        const bool done_waiting = (start != monotonic_clock::min_time &&
+                                   sender.monotonic_sent_time() >=
+                                       (start + channel_storage_duration));
+        const auto expected_err =
+            (msgs_sent < queue_size || done_waiting
+                 ? RawSender::Error::kOk
+                 : RawSender::Error::kMessagesSentTooFast);
+
+        if (start == monotonic_clock::min_time) {
+          start = sender.monotonic_sent_time();
+        }
+
+        ASSERT_EQ(actual_err, expected_err);
+        counter.Count(actual_err);
+        msgs_sent++;
+
+        EXPECT_EQ(counter.failures(),
+                  msgs_sent <= queue_size
+                      ? 0
+                      : (msgs_sent - queue_size) -
+                            (actual_err == RawSender::Error::kOk ? 1 : 0));
+        EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+        if (done_waiting) {
+          Exit();
+        }
+      },
+      kInterval);
+  Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  auto sender1 = loop1->MakeSender<TestMessage>("/test");
+  auto sender2 = loop2->MakeSender<TestMessage>("/test");
+
+  // Send queue_size messages split between the senders.
+  const int queue_size = TestChannelQueueSize(loop1.get());
+  for (int i = 0; i < queue_size / 2; i++) {
+    ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+    ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+  }
+
+  // Since queue_size messages have been sent, this should return an error
+  EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index a9d280e..fad8dea 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -359,6 +359,12 @@
       std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
       std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
 
+  // Helper function for testing the sent too fast check using a PhasedLoop with
+  // an interval that sends exactly at the frequency of the channel
+  void TestSentTooFastCheckEdgeCase(
+      const std::function<RawSender::Error(int, int)> expected_err,
+      const bool send_twice_at_end);
+
  private:
   const ::std::unique_ptr<EventLoopTestFactory> factory_;
 
@@ -367,6 +373,13 @@
 
 using AbstractEventLoopDeathTest = AbstractEventLoopTest;
 
+// Returns the frequency of the /test TestMessage channel
+int TestChannelFrequency(EventLoop *event_loop);
+// Returns the queue size of the /test TestMessage channel
+int TestChannelQueueSize(EventLoop *event_loop);
+// Sends a test message with value 0 with the given sender
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender);
+
 }  // namespace testing
 }  // namespace aos
 
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ed3d099..9159553 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -522,7 +522,10 @@
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_sender_(VerifySender(
-            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            ipc_lib::LocklessQueueSender::Make(
+                lockless_queue_memory_.queue(),
+                std::chrono::nanoseconds(
+                    event_loop->configuration()->channel_storage_duration())),
             channel)),
         wake_upper_(lockless_queue_memory_.queue()) {}
 
@@ -557,17 +560,17 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
-                                      realtime_remote_time, remote_queue_index,
-                                      source_boot_uuid, &monotonic_sent_time_,
-                                      &realtime_sent_time_, &sent_queue_index_))
+    const auto result = lockless_queue_sender_.Send(
+        length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+        source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
+        &sent_queue_index_);
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
 
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(Milind): check for messages sent too fast
-    return Error::kOk;
+    return CheckLocklessQueueResult(result);
   }
 
   Error DoSend(const void *msg, size_t length,
@@ -579,16 +582,19 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(
+    const auto result = lockless_queue_sender_.Send(
         reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
         realtime_remote_time, remote_queue_index, source_boot_uuid,
-        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
-        << ": Somebody wrote outside the buffer of their message on channel "
+        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
+        << ": Somebody wrote outside the buffer of their message on "
+           "channel "
         << configuration::CleanedChannelToString(channel());
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(austin): Return an error if we send too fast.
-    return RawSender::Error::kOk;
+
+    return CheckLocklessQueueResult(result);
   }
 
   absl::Span<char> GetSharedMemory() const {
@@ -605,6 +611,20 @@
     return static_cast<const ShmEventLoop *>(event_loop());
   }
 
+  RawSender::Error CheckLocklessQueueResult(
+      const ipc_lib::LocklessQueueSender::Result &result) {
+    switch (result) {
+      case ipc_lib::LocklessQueueSender::Result::GOOD:
+        return Error::kOk;
+      case ipc_lib::LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST:
+        return Error::kMessagesSentTooFast;
+      case ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE:
+        return Error::kInvalidRedzone;
+    }
+    LOG(FATAL) << "Unknown lockless queue sender result"
+               << static_cast<int>(result);
+  }
+
   MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 01ca92b..f4107b0 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -116,6 +116,59 @@
 
   ShmEventLoopTestFactory *factory() { return &factory_; }
 
+  // Helper functions for testing when a fetcher cannot fetch the next message
+  // because it was overwritten
+  void TestNextMessageNotAvailable(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    bool ran = false;
+    loop1->AddPhasedLoop(
+        [&sender](int) {
+          auto builder = sender.MakeBuilder();
+          TestMessage::Builder test_builder(*builder.fbb());
+          test_builder.add_value(0);
+          builder.CheckOk(builder.Send(test_builder.Finish()));
+        },
+        std::chrono::milliseconds(2));
+    loop1
+        ->AddTimer([this, &fetcher, &ran]() {
+          EXPECT_DEATH(fetcher.FetchNext(),
+                       "The next message is no longer "
+                       "available.*\"/test\".*\"aos\\.TestMessage\"");
+          factory()->Exit();
+          ran = true;
+        })
+        ->Setup(loop1->monotonic_now() + std::chrono::seconds(4));
+    factory()->Run();
+    EXPECT_TRUE(ran);
+  }
+  void TestNextMessageNotAvailableNoRun(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    time::PhasedLoop phased_loop(std::chrono::milliseconds(2),
+                                 loop2->monotonic_now());
+    for (int i = 0; i < 2000; ++i) {
+      auto builder = sender.MakeBuilder();
+      TestMessage::Builder test_builder(*builder.fbb());
+      test_builder.add_value(0);
+      builder.CheckOk(builder.Send(test_builder.Finish()));
+      phased_loop.SleepUntilNext();
+    }
+    EXPECT_DEATH(fetcher.FetchNext(),
+                 "The next message is no longer "
+                 "available.*\"/test\".*\"aos\\.TestMessage\"");
+  }
+
  private:
   ShmEventLoopTestFactory factory_;
 };
@@ -351,89 +404,25 @@
 // Tests that the next message not being available prints a helpful error in the
 // normal case.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailable) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  bool ran = false;
-  loop1->OnRun([this, &sender, &fetcher, &ran]() {
-    for (int i = 0; i < 2000; ++i) {
-      auto builder = sender.MakeBuilder();
-      TestMessage::Builder test_builder(*builder.fbb());
-      test_builder.add_value(0);
-      builder.CheckOk(builder.Send(test_builder.Finish()));
-    }
-    EXPECT_DEATH(fetcher.FetchNext(),
-                 "The next message is no longer "
-                 "available.*\"/test\".*\"aos\\.TestMessage\"");
-    factory()->Exit();
-    ran = true;
-  });
-  factory()->Run();
-  EXPECT_TRUE(ran);
+  TestNextMessageNotAvailable(false);
 }
 
 // Tests that the next message not being available prints a helpful error with
 // timing reports disabled.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoTimingReports) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  loop1->SkipTimingReport();
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  bool ran = false;
-  loop1->OnRun([this, &sender, &fetcher, &ran]() {
-    for (int i = 0; i < 2000; ++i) {
-      auto builder = sender.MakeBuilder();
-      TestMessage::Builder test_builder(*builder.fbb());
-      test_builder.add_value(0);
-      builder.CheckOk(builder.Send(test_builder.Finish()));
-    }
-    EXPECT_DEATH(fetcher.FetchNext(),
-                 "The next message is no longer "
-                 "available.*\"/test\".*\"aos\\.TestMessage\"");
-    factory()->Exit();
-    ran = true;
-  });
-  factory()->Run();
-  EXPECT_TRUE(ran);
+  TestNextMessageNotAvailable(true);
 }
 
 // Tests that the next message not being available prints a helpful error even
 // when Run is never called.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRun) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  for (int i = 0; i < 2000; ++i) {
-    auto builder = sender.MakeBuilder();
-    TestMessage::Builder test_builder(*builder.fbb());
-    test_builder.add_value(0);
-    builder.CheckOk(builder.Send(test_builder.Finish()));
-  }
-  EXPECT_DEATH(fetcher.FetchNext(),
-               "The next message is no longer "
-               "available.*\"/test\".*\"aos\\.TestMessage\"");
+  TestNextMessageNotAvailableNoRun(false);
 }
 
 // Tests that the next message not being available prints a helpful error even
 // when Run is never called without timing reports.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRunNoTimingReports) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  loop1->SkipTimingReport();
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  for (int i = 0; i < 2000; ++i) {
-    auto builder = sender.MakeBuilder();
-    TestMessage::Builder test_builder(*builder.fbb());
-    test_builder.add_value(0);
-    builder.CheckOk(builder.Send(test_builder.Finish()));
-  }
-  EXPECT_DEATH(fetcher.FetchNext(),
-               "The next message is no longer "
-               "available.*\"/test\".*\"aos\\.TestMessage\"");
+  TestNextMessageNotAvailableNoRun(true);
 }
 
 // TODO(austin): Test that missing a deadline with a timer recovers as expected.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
 class SimulatedChannel {
  public:
   explicit SimulatedChannel(const Channel *channel,
-                            std::chrono::nanoseconds channel_storage_duration)
+                            std::chrono::nanoseconds channel_storage_duration,
+                            const EventScheduler *scheduler)
       : channel_(channel),
         channel_storage_duration_(channel_storage_duration),
-        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+        scheduler_(scheduler) {
     available_buffer_indices_.resize(number_buffers());
     for (int i = 0; i < number_buffers(); ++i) {
       available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
   int sender_count_ = 0;
 
   std::vector<uint16_t> available_buffer_indices_;
+
+  const EventScheduler *scheduler_;
+
+  // Queue of all the message send times in the last channel_storage_duration_
+  std::queue<monotonic_clock::time_point> last_times_;
 };
 
 namespace {
@@ -782,14 +789,14 @@
     const Channel *channel) {
   auto it = channels_->find(SimpleChannel(channel));
   if (it == channels_->end()) {
-    it =
-        channels_
-            ->emplace(
-                SimpleChannel(channel),
-                std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
-                    channel, std::chrono::nanoseconds(
-                                 configuration()->channel_storage_duration()))))
-            .first;
+    it = channels_
+             ->emplace(SimpleChannel(channel),
+                       std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+                           channel,
+                           std::chrono::nanoseconds(
+                               configuration()->channel_storage_duration()),
+                           scheduler_)))
+             .first;
   }
   return it->second.get();
 }
@@ -930,7 +937,21 @@
 
 std::optional<uint32_t> SimulatedChannel::Send(
     std::shared_ptr<SimulatedMessage> message) {
-  std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  const auto now = scheduler_->monotonic_now();
+  // Drop recorded send times that are at least channel_storage_duration_
+  // old; they no longer count against the queue size.
+  while (!last_times_.empty() &&
+         (now - last_times_.front() >= channel_storage_duration_)) {
+    last_times_.pop();
+  }
+
+  // Check that we are not sending messages too fast
+  if (static_cast<int>(last_times_.size()) >= queue_size()) {
+    return std::nullopt;
+  }
+
+  const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  last_times_.push(now);
 
   message->context.queue_index = *queue_index;
   // Points to the actual data depending on the size set in context. Data may
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 09202ff..a83243d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,6 +1,7 @@
 #include "aos/events/simulated_event_loop.h"
 
 #include <chrono>
+#include <functional>
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
@@ -139,13 +140,13 @@
 };
 
 class FunctionEvent : public EventScheduler::Event {
-  public:
-   FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+ public:
+  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
 
-   void Handle() noexcept override { fn_(); }
+  void Handle() noexcept override { fn_(); }
 
-  private:
-   std::function<void()> fn_;
+ private:
+  std::function<void()> fn_;
 };
 
 // Test that creating an event and running the scheduler runs the event.
@@ -204,14 +205,6 @@
   EXPECT_EQ(counter, 1);
 }
 
-void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
-  aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
-  TestMessage::Builder test_message_builder =
-      builder.MakeBuilder<TestMessage>();
-  test_message_builder.add_value(value);
-  ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
-}
-
 // Test that sending a message after running gets properly notified.
 TEST(SimulatedEventLoopTest, SendAfterRunFor) {
   SimulatedEventLoopTestFactory factory;
@@ -223,7 +216,7 @@
       simulated_event_loop_factory.MakeEventLoop("ping");
   aos::Sender<TestMessage> test_message_sender =
       ping_event_loop->MakeSender<TestMessage>("/test");
-  SendTestMessage(&test_message_sender, 1);
+  ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
 
   std::unique_ptr<EventLoop> pong1_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pong");
@@ -243,7 +236,7 @@
 
   // Pauses in the middle don't count though, so this should be counted.
   // But, the fresh watcher shouldn't pick it up yet.
-  SendTestMessage(&test_message_sender, 2);
+  ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
 
   EXPECT_EQ(test_message_counter1.count(), 0u);
   EXPECT_EQ(test_message_counter2.count(), 0u);
@@ -253,6 +246,63 @@
   EXPECT_EQ(test_message_counter2.count(), 0u);
 }
 
+void TestSentTooFastCheckEdgeCase(
+    const std::function<RawSender::Error(int, int)> expected_err,
+    const bool send_twice_at_end) {
+  SimulatedEventLoopTestFactory factory;
+
+  auto event_loop = factory.MakePrimary("primary");
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+  int msgs_sent = 0;
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
+        msgs_sent++;
+
+        // If send_twice_at_end, send the last two messages (message
+        // queue_size and queue_size + 1) in the same iteration, meaning that
+        // we would be sending very slightly too fast. Otherwise, we will send
+        // message queue_size + 1 in the next iteration and we will continue
+        // to be sending exactly at the channel frequency.
+        if (send_twice_at_end && (msgs_sent == queue_size)) {
+          EXPECT_EQ(SendTestMessage(sender),
+                    expected_err(msgs_sent, queue_size));
+          msgs_sent++;
+        }
+
+        if (msgs_sent > queue_size) {
+          factory.Exit();
+        }
+      },
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(
+              1.0 / TestChannelFrequency(event_loop.get()))));
+
+  factory.Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is not returned
+// when messages are sent at the exact frequency of the channel.
+TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
+  TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
+                               false);
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when sending exactly one more message than allowed in a channel storage
+// duration.
+TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
+  TestSentTooFastCheckEdgeCase(
+      [](const int msgs_sent, const int queue_size) {
+        return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
+                                        : RawSender::Error::kOk);
+      },
+      true);
+}
+
 // Test that creating an event loop while running dies.
 TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
   SimulatedEventLoopTestFactory factory;
@@ -1338,7 +1388,6 @@
   EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-
   EXPECT_EQ(pi1_pong_counter.count(), 601u);
   EXPECT_EQ(pi2_pong_counter.count(), 601u);
 
@@ -1707,18 +1756,17 @@
       });
 
   // Confirm that reboot changes the UUID.
-  pi2->OnShutdown(
-      [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
-       pi2_boot1]() {
-        expected_boot_uuid = pi2_boot1;
-        ++boot_number;
-        LOG(INFO) << "OnShutdown triggered for pi2";
-        pi2->OnStartup(
-            [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
-              EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
-              expected_connection_time = pi1->monotonic_now();
-            });
-      });
+  pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
+                   pi1, pi2, pi2_boot1]() {
+    expected_boot_uuid = pi2_boot1;
+    ++boot_number;
+    LOG(INFO) << "OnShutdown triggered for pi2";
+    pi2->OnStartup(
+        [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
+          EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+          expected_connection_time = pi1->monotonic_now();
+        });
+  });
 
   // Let a couple of ServerStatistics messages show up before rebooting.
   factory.RunFor(chrono::milliseconds(2002));