Add a sent too fast check for simulation and shm

Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index c0d67b0..7595167 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -57,6 +57,9 @@
   // Type name of the flatbuffer.
   type:string (id: 1);
   // Max frequency in messages/sec of the data published on this channel.
+  // The maximum number of messages that can be sent
+  // in a channel_storage_duration is
+  // frequency * channel_storage_duration (in seconds).
   frequency:int = 100 (id: 2);
   // Max size of the data being published.  (This will hopefully be
   // automatically computed in the future.)
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 67c4472..08b0064 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -26,6 +26,8 @@
       return "RawSender::Error::kOk";
     case RawSender::Error::kMessagesSentTooFast:
       return "RawSender::Error::kMessagesSentTooFast";
+    case RawSender::Error::kInvalidRedzone:
+      return "RawSender::Error::kInvalidRedzone";
   }
   LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
 }
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e810e3f..c2498b3 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -139,13 +139,16 @@
  public:
   using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
 
-  enum class [[nodiscard]] Error{
+  enum class [[nodiscard]] Error {
       // Represents success and no error
       kOk,
 
       // Error for messages on channels being sent faster than their
       // frequency and channel storage duration allow
-      kMessagesSentTooFast};
+      kMessagesSentTooFast,
+      // The message's redzone was overwritten (a write outside its buffer)
+      kInvalidRedzone
+  };
 
   RawSender(EventLoop *event_loop, const Channel *channel);
   RawSender(const RawSender &) = delete;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
 #include <unordered_map>
 #include <unordered_set>
 
+#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/logging/log_message_generated.h"
 #include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
                "May only send the buffer detached from this Sender");
 }
 
+int TestChannelFrequency(EventLoop *event_loop) {
+  return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+int TestChannelQueueSize(EventLoop *event_loop) {
+  const int frequency = TestChannelFrequency(event_loop);
+  const auto channel_storage_duration = std::chrono::nanoseconds(
+      event_loop->configuration()->channel_storage_duration());
+  const int queue_size =
+      frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
+                      channel_storage_duration)
+                      .count();
+
+  return queue_size;
+}
+
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+  aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder();
+  TestMessage::Builder test_message_builder =
+      builder.MakeBuilder<TestMessage>();
+  test_message_builder.add_value(0);
+  return builder.Send(test_message_builder.Finish());
+}
+
+// Test that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // Send one message in the beginning, then wait until the
+  // channel_storage_duration is almost done and start sending messages rapidly,
+  // having some come in the next channel_storage_duration. The queue_size is
+  // 1600, so the 1601st message will be the last valid one (the initial message
+  // having been sent more than a channel_storage_duration ago), and trying to
+  // send the 1602nd message should return
+  // RawSender::Error::kMessagesSentTooFast.
+  EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+  int msgs_sent = 1;
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  const auto timer = event_loop->AddTimer([&]() {
+    const bool done = (msgs_sent == queue_size + 1);
+    ASSERT_EQ(
+        SendTestMessage(sender),
+        done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+    msgs_sent++;
+    if (done) {
+      Exit();
+    }
+  });
+
+  const auto kRepeatOffset = std::chrono::milliseconds(1);
+  const auto base_offset =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration()) -
+      (kRepeatOffset * (queue_size / 2));
+  event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+    timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+  });
+
+  Run();
+}
+
+// Tests that we are able to send messages successfully after sending messages
+// too fast and waiting while continuously attempting to send messages.
+// Also tests that SendFailureCounter is working correctly in this
+// situation
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // We are sending messages at 1 kHz, so we will be sending too fast after
+  // queue_size (1600) ms. After this, keep sending messages, and exactly a
+  // channel storage duration (2s) after we send the first message we should
+  // be able to successfully send a message.
+
+  const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+  const monotonic_clock::duration channel_storage_duration =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration());
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  int msgs_sent = 0;
+  SendFailureCounter counter;
+  auto start = monotonic_clock::min_time;
+
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        const auto actual_err = SendTestMessage(sender);
+        const bool done_waiting = (start != monotonic_clock::min_time &&
+                                   sender.monotonic_sent_time() >=
+                                       (start + channel_storage_duration));
+        const auto expected_err =
+            (msgs_sent < queue_size || done_waiting
+                 ? RawSender::Error::kOk
+                 : RawSender::Error::kMessagesSentTooFast);
+
+        if (start == monotonic_clock::min_time) {
+          start = sender.monotonic_sent_time();
+        }
+
+        ASSERT_EQ(actual_err, expected_err);
+        counter.Count(actual_err);
+        msgs_sent++;
+
+        EXPECT_EQ(counter.failures(),
+                  msgs_sent <= queue_size
+                      ? 0
+                      : (msgs_sent - queue_size) -
+                            (actual_err == RawSender::Error::kOk ? 1 : 0));
+        EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+        if (done_waiting) {
+          Exit();
+        }
+      },
+      kInterval);
+  Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  auto sender1 = loop1->MakeSender<TestMessage>("/test");
+  auto sender2 = loop2->MakeSender<TestMessage>("/test");
+
+  // Send queue_size messages split between the senders.
+  const int queue_size = TestChannelQueueSize(loop1.get());
+  for (int i = 0; i < queue_size / 2; i++) {
+    ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+    ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+  }
+
+  // Since queue_size messages have been sent, this should return an error
+  EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index a9d280e..fad8dea 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -359,6 +359,12 @@
       std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
       std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
 
+  // Helper function for testing the sent too fast check using a PhasedLoop with
+  // an interval that sends exactly at the frequency of the channel
+  void TestSentTooFastCheckEdgeCase(
+      const std::function<RawSender::Error(int, int)> expected_err,
+      const bool send_twice_at_end);
+
  private:
   const ::std::unique_ptr<EventLoopTestFactory> factory_;
 
@@ -367,6 +373,13 @@
 
 using AbstractEventLoopDeathTest = AbstractEventLoopTest;
 
+// Returns the frequency of the /test TestMessage channel
+int TestChannelFrequency(EventLoop *event_loop);
+// Returns the queue size of the /test TestMessage channel
+int TestChannelQueueSize(EventLoop *event_loop);
+// Sends a test message with value 0 with the given sender
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender);
+
 }  // namespace testing
 }  // namespace aos
 
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ed3d099..9159553 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -522,7 +522,10 @@
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_sender_(VerifySender(
-            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            ipc_lib::LocklessQueueSender::Make(
+                lockless_queue_memory_.queue(),
+                std::chrono::nanoseconds(
+                    event_loop->configuration()->channel_storage_duration())),
             channel)),
         wake_upper_(lockless_queue_memory_.queue()) {}
 
@@ -557,17 +560,17 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
-                                      realtime_remote_time, remote_queue_index,
-                                      source_boot_uuid, &monotonic_sent_time_,
-                                      &realtime_sent_time_, &sent_queue_index_))
+    const auto result = lockless_queue_sender_.Send(
+        length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+        source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
+        &sent_queue_index_);
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
 
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(Milind): check for messages sent too fast
-    return Error::kOk;
+    return CheckLocklessQueueResult(result);
   }
 
   Error DoSend(const void *msg, size_t length,
@@ -579,16 +582,19 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(
+    const auto result = lockless_queue_sender_.Send(
         reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
         realtime_remote_time, remote_queue_index, source_boot_uuid,
-        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
-        << ": Somebody wrote outside the buffer of their message on channel "
+        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
+        << ": Somebody wrote outside the buffer of their message on "
+           "channel "
         << configuration::CleanedChannelToString(channel());
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(austin): Return an error if we send too fast.
-    return RawSender::Error::kOk;
+
+    return CheckLocklessQueueResult(result);
   }
 
   absl::Span<char> GetSharedMemory() const {
@@ -605,6 +611,20 @@
     return static_cast<const ShmEventLoop *>(event_loop());
   }
 
+  RawSender::Error CheckLocklessQueueResult(
+      const ipc_lib::LocklessQueueSender::Result &result) {
+    switch (result) {  // Map the queue's send result onto RawSender::Error.
+      case ipc_lib::LocklessQueueSender::Result::GOOD:
+        return Error::kOk;
+      case ipc_lib::LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST:
+        return Error::kMessagesSentTooFast;
+      case ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE:
+        return Error::kInvalidRedzone;
+    }
+    LOG(FATAL) << "Unknown lockless queue sender result: "
+               << static_cast<int>(result);
+  }
+
   MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 01ca92b..f4107b0 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -116,6 +116,59 @@
 
   ShmEventLoopTestFactory *factory() { return &factory_; }
 
+  // Helper functions for testing when a fetcher cannot fetch the next message
+  // because it was overwritten
+  void TestNextMessageNotAvailable(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    bool ran = false;
+    loop1->AddPhasedLoop(
+        [&sender](int) {
+          auto builder = sender.MakeBuilder();
+          TestMessage::Builder test_builder(*builder.fbb());
+          test_builder.add_value(0);
+          builder.CheckOk(builder.Send(test_builder.Finish()));
+        },
+        std::chrono::milliseconds(2));
+    loop1
+        ->AddTimer([this, &fetcher, &ran]() {
+          EXPECT_DEATH(fetcher.FetchNext(),
+                       "The next message is no longer "
+                       "available.*\"/test\".*\"aos\\.TestMessage\"");
+          factory()->Exit();
+          ran = true;
+        })
+        ->Setup(loop1->monotonic_now() + std::chrono::seconds(4));
+    factory()->Run();
+    EXPECT_TRUE(ran);
+  }
+  void TestNextMessageNotAvailableNoRun(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    time::PhasedLoop phased_loop(std::chrono::milliseconds(2),
+                                 loop2->monotonic_now());
+    for (int i = 0; i < 2000; ++i) {
+      auto builder = sender.MakeBuilder();
+      TestMessage::Builder test_builder(*builder.fbb());
+      test_builder.add_value(0);
+      builder.CheckOk(builder.Send(test_builder.Finish()));
+      phased_loop.SleepUntilNext();
+    }
+    EXPECT_DEATH(fetcher.FetchNext(),
+                 "The next message is no longer "
+                 "available.*\"/test\".*\"aos\\.TestMessage\"");
+  }
+
  private:
   ShmEventLoopTestFactory factory_;
 };
@@ -351,89 +404,25 @@
 // Tests that the next message not being available prints a helpful error in the
 // normal case.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailable) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  bool ran = false;
-  loop1->OnRun([this, &sender, &fetcher, &ran]() {
-    for (int i = 0; i < 2000; ++i) {
-      auto builder = sender.MakeBuilder();
-      TestMessage::Builder test_builder(*builder.fbb());
-      test_builder.add_value(0);
-      builder.CheckOk(builder.Send(test_builder.Finish()));
-    }
-    EXPECT_DEATH(fetcher.FetchNext(),
-                 "The next message is no longer "
-                 "available.*\"/test\".*\"aos\\.TestMessage\"");
-    factory()->Exit();
-    ran = true;
-  });
-  factory()->Run();
-  EXPECT_TRUE(ran);
+  TestNextMessageNotAvailable(false);
 }
 
 // Tests that the next message not being available prints a helpful error with
 // timing reports disabled.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoTimingReports) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  loop1->SkipTimingReport();
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  bool ran = false;
-  loop1->OnRun([this, &sender, &fetcher, &ran]() {
-    for (int i = 0; i < 2000; ++i) {
-      auto builder = sender.MakeBuilder();
-      TestMessage::Builder test_builder(*builder.fbb());
-      test_builder.add_value(0);
-      builder.CheckOk(builder.Send(test_builder.Finish()));
-    }
-    EXPECT_DEATH(fetcher.FetchNext(),
-                 "The next message is no longer "
-                 "available.*\"/test\".*\"aos\\.TestMessage\"");
-    factory()->Exit();
-    ran = true;
-  });
-  factory()->Run();
-  EXPECT_TRUE(ran);
+  TestNextMessageNotAvailable(true);
 }
 
 // Tests that the next message not being available prints a helpful error even
 // when Run is never called.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRun) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  for (int i = 0; i < 2000; ++i) {
-    auto builder = sender.MakeBuilder();
-    TestMessage::Builder test_builder(*builder.fbb());
-    test_builder.add_value(0);
-    builder.CheckOk(builder.Send(test_builder.Finish()));
-  }
-  EXPECT_DEATH(fetcher.FetchNext(),
-               "The next message is no longer "
-               "available.*\"/test\".*\"aos\\.TestMessage\"");
+  TestNextMessageNotAvailableNoRun(false);
 }
 
 // Tests that the next message not being available prints a helpful error even
 // when Run is never called without timing reports.
 TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRunNoTimingReports) {
-  auto loop1 = factory()->MakePrimary("loop1");
-  loop1->SkipTimingReport();
-  auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  auto loop2 = factory()->Make("loop2");
-  auto sender = loop2->MakeSender<TestMessage>("/test");
-  for (int i = 0; i < 2000; ++i) {
-    auto builder = sender.MakeBuilder();
-    TestMessage::Builder test_builder(*builder.fbb());
-    test_builder.add_value(0);
-    builder.CheckOk(builder.Send(test_builder.Finish()));
-  }
-  EXPECT_DEATH(fetcher.FetchNext(),
-               "The next message is no longer "
-               "available.*\"/test\".*\"aos\\.TestMessage\"");
+  TestNextMessageNotAvailableNoRun(true);
 }
 
 // TODO(austin): Test that missing a deadline with a timer recovers as expected.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
 class SimulatedChannel {
  public:
   explicit SimulatedChannel(const Channel *channel,
-                            std::chrono::nanoseconds channel_storage_duration)
+                            std::chrono::nanoseconds channel_storage_duration,
+                            const EventScheduler *scheduler)
       : channel_(channel),
         channel_storage_duration_(channel_storage_duration),
-        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+        scheduler_(scheduler) {
     available_buffer_indices_.resize(number_buffers());
     for (int i = 0; i < number_buffers(); ++i) {
       available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
   int sender_count_ = 0;
 
   std::vector<uint16_t> available_buffer_indices_;
+
+  const EventScheduler *scheduler_;
+
+  // Queue of all the message send times in the last channel_storage_duration_
+  std::queue<monotonic_clock::time_point> last_times_;
 };
 
 namespace {
@@ -782,14 +789,14 @@
     const Channel *channel) {
   auto it = channels_->find(SimpleChannel(channel));
   if (it == channels_->end()) {
-    it =
-        channels_
-            ->emplace(
-                SimpleChannel(channel),
-                std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
-                    channel, std::chrono::nanoseconds(
-                                 configuration()->channel_storage_duration()))))
-            .first;
+    it = channels_
+             ->emplace(SimpleChannel(channel),
+                       std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+                           channel,
+                           std::chrono::nanoseconds(
+                               configuration()->channel_storage_duration()),
+                           scheduler_)))
+             .first;
   }
   return it->second.get();
 }
@@ -930,7 +937,21 @@
 
 std::optional<uint32_t> SimulatedChannel::Send(
     std::shared_ptr<SimulatedMessage> message) {
-  std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  const auto now = scheduler_->monotonic_now();
+  // Drop send times that are at least channel_storage_duration_ old; they no
+  // longer count against the queue_size() limit checked below.
+  while (!last_times_.empty() &&
+         (now - last_times_.front() >= channel_storage_duration_)) {
+    last_times_.pop();
+  }
+
+  // Check that we are not sending messages too fast
+  if (static_cast<int>(last_times_.size()) >= queue_size()) {
+    return std::nullopt;
+  }
+
+  const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  last_times_.push(now);
 
   message->context.queue_index = *queue_index;
   // Points to the actual data depending on the size set in context. Data may
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 09202ff..a83243d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,6 +1,7 @@
 #include "aos/events/simulated_event_loop.h"
 
 #include <chrono>
+#include <functional>
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
@@ -139,13 +140,13 @@
 };
 
 class FunctionEvent : public EventScheduler::Event {
-  public:
-   FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+ public:
+  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
 
-   void Handle() noexcept override { fn_(); }
+  void Handle() noexcept override { fn_(); }
 
-  private:
-   std::function<void()> fn_;
+ private:
+  std::function<void()> fn_;
 };
 
 // Test that creating an event and running the scheduler runs the event.
@@ -204,14 +205,6 @@
   EXPECT_EQ(counter, 1);
 }
 
-void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
-  aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
-  TestMessage::Builder test_message_builder =
-      builder.MakeBuilder<TestMessage>();
-  test_message_builder.add_value(value);
-  ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
-}
-
 // Test that sending a message after running gets properly notified.
 TEST(SimulatedEventLoopTest, SendAfterRunFor) {
   SimulatedEventLoopTestFactory factory;
@@ -223,7 +216,7 @@
       simulated_event_loop_factory.MakeEventLoop("ping");
   aos::Sender<TestMessage> test_message_sender =
       ping_event_loop->MakeSender<TestMessage>("/test");
-  SendTestMessage(&test_message_sender, 1);
+  ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
 
   std::unique_ptr<EventLoop> pong1_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pong");
@@ -243,7 +236,7 @@
 
   // Pauses in the middle don't count though, so this should be counted.
   // But, the fresh watcher shouldn't pick it up yet.
-  SendTestMessage(&test_message_sender, 2);
+  ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
 
   EXPECT_EQ(test_message_counter1.count(), 0u);
   EXPECT_EQ(test_message_counter2.count(), 0u);
@@ -253,6 +246,63 @@
   EXPECT_EQ(test_message_counter2.count(), 0u);
 }
 
+void TestSentTooFastCheckEdgeCase(
+    const std::function<RawSender::Error(int, int)> expected_err,
+    const bool send_twice_at_end) {
+  SimulatedEventLoopTestFactory factory;
+
+  auto event_loop = factory.MakePrimary("primary");
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+  int msgs_sent = 0;
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
+        msgs_sent++;
+
+        // If send_twice_at_end, send the last two messages (message
+        // queue_size and queue_size + 1) in the same iteration, meaning that
+        // we would be sending very slightly too fast. Otherwise, we will send
+        // message queue_size + 1 in the next iteration and we will continue
+        // to be sending exactly at the channel frequency.
+        if (send_twice_at_end && (msgs_sent == queue_size)) {
+          EXPECT_EQ(SendTestMessage(sender),
+                    expected_err(msgs_sent, queue_size));
+          msgs_sent++;
+        }
+
+        if (msgs_sent > queue_size) {
+          factory.Exit();
+        }
+      },
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(
+              1.0 / TestChannelFrequency(event_loop.get()))));
+
+  factory.Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is not returned
+// when messages are sent at the exact frequency of the channel.
+TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
+  TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
+                               false);
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when sending exactly one more message than allowed in a channel storage
+// duration.
+TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
+  TestSentTooFastCheckEdgeCase(
+      [](const int msgs_sent, const int queue_size) {
+        return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
+                                        : RawSender::Error::kOk);
+      },
+      true);
+}
+
 // Test that creating an event loop while running dies.
 TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
   SimulatedEventLoopTestFactory factory;
@@ -1338,7 +1388,6 @@
   EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-
   EXPECT_EQ(pi1_pong_counter.count(), 601u);
   EXPECT_EQ(pi2_pong_counter.count(), 601u);
 
@@ -1707,18 +1756,17 @@
       });
 
   // Confirm that reboot changes the UUID.
-  pi2->OnShutdown(
-      [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
-       pi2_boot1]() {
-        expected_boot_uuid = pi2_boot1;
-        ++boot_number;
-        LOG(INFO) << "OnShutdown triggered for pi2";
-        pi2->OnStartup(
-            [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
-              EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
-              expected_connection_time = pi1->monotonic_now();
-            });
-      });
+  pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
+                   pi1, pi2, pi2_boot1]() {
+    expected_boot_uuid = pi2_boot1;
+    ++boot_number;
+    LOG(INFO) << "OnShutdown triggered for pi2";
+    pi2->OnStartup(
+        [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
+          EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+          expected_connection_time = pi1->monotonic_now();
+        });
+  });
 
   // Let a couple of ServerStatistics messages show up before rebooting.
   factory.RunFor(chrono::milliseconds(2002));
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index c124fe0..ef6c4f3 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -216,6 +216,7 @@
         "//aos/events:epoll",
         "//aos/testing:googletest",
         "//aos/testing:prevent_exit",
+        "//aos/util:phased_loop",
     ],
 )
 
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fe86f6c..5f12423 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -4,6 +4,7 @@
 #include <sys/types.h>
 #include <syscall.h>
 #include <unistd.h>
+
 #include <algorithm>
 #include <iomanip>
 #include <iostream>
@@ -503,7 +504,7 @@
 
   bool bad = false;
 
-  for (size_t i = 0; i < redzone.size(); ++i) {
+  for (size_t i = 0; i < redzone.size() && !bad; ++i) {
     if (memcmp(&redzone[i], &redzone_value, 1)) {
       bad = true;
     }
@@ -603,6 +604,7 @@
       Message *const message =
           memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i));
       message->header.queue_index.Invalidate();
+      message->header.monotonic_sent_time = monotonic_clock::min_time;
       FillRedzone(memory, message->PreRedzone(memory->message_data_size()));
       FillRedzone(memory, message->PostRedzone(memory->message_data_size(),
                                                memory->message_size()));
@@ -831,8 +833,16 @@
   return count;
 }
 
-LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
-    : memory_(memory) {
+std::ostream &operator<<(std::ostream &os,
+                         const LocklessQueueSender::Result r) {
+  os << static_cast<int>(r);
+  return os;
+}
+
+LocklessQueueSender::LocklessQueueSender(
+    LocklessQueueMemory *memory,
+    monotonic_clock::duration channel_storage_duration)
+    : memory_(memory), channel_storage_duration_(channel_storage_duration) {
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
@@ -877,9 +887,9 @@
 }
 
 std::optional<LocklessQueueSender> LocklessQueueSender::Make(
-    LocklessQueue queue) {
+    LocklessQueue queue, monotonic_clock::duration channel_storage_duration) {
   queue.Initialize();
-  LocklessQueueSender result(queue.memory());
+  LocklessQueueSender result(queue.memory(), channel_storage_duration);
   if (result.sender_index_ != -1) {
     return std::move(result);
   } else {
@@ -904,7 +914,7 @@
   return message->data(memory_->message_data_size());
 }
 
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
     const char *data, size_t length,
     monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
@@ -921,7 +931,7 @@
               realtime_sent_time, queue_index);
 }
 
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
     size_t length, monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index, const UUID &source_boot_uuid,
@@ -936,7 +946,7 @@
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
   if (CheckBothRedzones(memory_, message)) {
-    return false;
+    return Result::INVALID_REDZONE;
   }
 
   // We should have invalidated this when we first got the buffer. Verify that
@@ -995,11 +1005,14 @@
     // This is just a best-effort check to skip reading the clocks if possible.
     // If this fails, then the compare-exchange below definitely would, so we
     // can bail out now.
+    const Message *message_to_replace = memory_->GetMessage(to_replace);
+    bool is_previous_index_valid = false;
     {
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)
-              ->header.queue_index.RelaxedLoad(queue_size);
-      if (previous_index != decremented_queue_index && previous_index.valid()) {
+          message_to_replace->header.queue_index.RelaxedLoad(queue_size);
+      is_previous_index_valid = previous_index.valid();
+      if (previous_index != decremented_queue_index &&
+          is_previous_index_valid) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
                    "Retrying.  Previous index was "
@@ -1011,6 +1024,7 @@
 
     message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
     message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
     if (monotonic_sent_time != nullptr) {
       *monotonic_sent_time = message->header.monotonic_sent_time;
     }
@@ -1021,8 +1035,48 @@
       *queue_index = next_queue_index.index();
     }
 
+    const auto to_replace_monotonic_sent_time =
+        message_to_replace->header.monotonic_sent_time;
+
+    // If we are overwriting a message sent in the last
+    // channel_storage_duration_, that means that we would be sending more than
+    // queue_size messages and would therefore be sending too fast. If the
+    // previous index is not valid then the message hasn't been filled out yet
+    // so we aren't sending too fast. And, if it is not less than the sent time
+    // of the message that we are going to write, someone else beat us and the
+    // compare and exchange below will fail.
+    if (is_previous_index_valid &&
+        (to_replace_monotonic_sent_time <
+         message->header.monotonic_sent_time) &&
+        (message->header.monotonic_sent_time - to_replace_monotonic_sent_time <
+         channel_storage_duration_)) {
+      // There is a possibility that another context beat us to writing out the
+      // message in the queue, but we beat that context to acquiring the sent
+      // time. In this case our sent time is *greater than* the other context's
+      // sent time. Therefore, we can check if we got beat filling out this
+      // message *after* doing the above check to determine if we hit this edge
+      // case. Otherwise, messages are being sent too fast.
+      const QueueIndex previous_index =
+          message_to_replace->header.queue_index.Load(queue_size);
+      if (previous_index != decremented_queue_index && previous_index.valid()) {
+        VLOG(3) << "Got beat during check for messages being sent too fast. "
+                   "Retrying.";
+        continue;
+      } else {
+        VLOG(3) << "Messages sent too fast. Returning. Attempted index: "
+                << decremented_queue_index.index()
+                << " message sent time: " << message->header.monotonic_sent_time
+                << "  message to replace sent time: "
+                << to_replace_monotonic_sent_time;
+        // Since we are not using the message obtained from scratch_index
+        // and we are not retrying, we need to invalidate its queue_index.
+        message->header.queue_index.Invalidate();
+        return Result::MESSAGES_SENT_TOO_FAST;
+      }
+    }
+
     // Before we are fully done filling out the message, update the Sender state
-    // with the new index to write.  This re-uses the barrier for the
+    // with the new index to write. This re-uses the barrier for the
     // queue_index store.
     const Index index_to_write(next_queue_index, scratch_index.message_index());
 
@@ -1085,7 +1139,7 @@
   // If anybody is looking at this message (they shouldn't be), then try telling
   // them about it (best-effort).
   memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
-  return true;
+  return Result::GOOD;
 }
 
 int LocklessQueueSender::buffer_index() const {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index d6fb72f..f7a85c3 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -282,10 +282,19 @@
 // scoped to this object's lifetime.
 class LocklessQueueSender {
  public:
+  // Enum of possible sending errors
+  // Send returns GOOD if the message was sent successfully, INVALID_REDZONE if
+  // one of a message's redzones has invalid data, or MESSAGES_SENT_TOO_FAST if
+  // more than queue_size messages were going to be sent in a
+  // channel_storage_duration_.
+  enum class Result { GOOD, INVALID_REDZONE, MESSAGES_SENT_TOO_FAST };
+
   LocklessQueueSender(const LocklessQueueSender &) = delete;
   LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
   LocklessQueueSender(LocklessQueueSender &&other)
-      : memory_(other.memory_), sender_index_(other.sender_index_) {
+      : memory_(other.memory_),
+        sender_index_(other.sender_index_),
+        channel_storage_duration_(other.channel_storage_duration_) {
     other.memory_ = nullptr;
     other.sender_index_ = -1;
   }
@@ -299,7 +308,8 @@
 
   // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
   // TODO(austin): Change the API if we find ourselves with more errors.
-  static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+  static std::optional<LocklessQueueSender> Make(
+      LocklessQueue queue, monotonic_clock::duration channel_storage_duration);
 
   // Sends a message without copying the data.
   // Copy at most size() bytes of data into the memory pointed to by Data(),
@@ -307,34 +317,43 @@
   // Note: calls to Data() are expensive enough that you should cache it.
   size_t size() const;
   void *Data();
-  bool Send(size_t length, monotonic_clock::time_point monotonic_remote_time,
-            realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &source_boot_uuid,
-            monotonic_clock::time_point *monotonic_sent_time = nullptr,
-            realtime_clock::time_point *realtime_sent_time = nullptr,
-            uint32_t *queue_index = nullptr);
+  LocklessQueueSender::Result Send(
+      size_t length, monotonic_clock::time_point monotonic_remote_time,
+      realtime_clock::time_point realtime_remote_time,
+      uint32_t remote_queue_index, const UUID &source_boot_uuid,
+      monotonic_clock::time_point *monotonic_sent_time = nullptr,
+      realtime_clock::time_point *realtime_sent_time = nullptr,
+      uint32_t *queue_index = nullptr);
 
   // Sends up to length data.  Does not wakeup the target.
-  bool Send(const char *data, size_t length,
-            monotonic_clock::time_point monotonic_remote_time,
-            realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &source_boot_uuid,
-            monotonic_clock::time_point *monotonic_sent_time = nullptr,
-            realtime_clock::time_point *realtime_sent_time = nullptr,
-            uint32_t *queue_index = nullptr);
+  LocklessQueueSender::Result Send(
+      const char *data, size_t length,
+      monotonic_clock::time_point monotonic_remote_time,
+      realtime_clock::time_point realtime_remote_time,
+      uint32_t remote_queue_index, const UUID &source_boot_uuid,
+      monotonic_clock::time_point *monotonic_sent_time = nullptr,
+      realtime_clock::time_point *realtime_sent_time = nullptr,
+      uint32_t *queue_index = nullptr);
 
   int buffer_index() const;
 
  private:
-  LocklessQueueSender(LocklessQueueMemory *memory);
+  LocklessQueueSender(LocklessQueueMemory *memory,
+                      monotonic_clock::duration channel_storage_duration);
 
   // Pointer to the backing memory.
   LocklessQueueMemory *memory_ = nullptr;
 
   // Index into the sender list.
   int sender_index_ = -1;
+
+  // Storage duration of the channel used to check if messages were sent too
+  // fast
+  const monotonic_clock::duration channel_storage_duration_;
 };
 
+std::ostream &operator<<(std::ostream &os, const LocklessQueueSender::Result r);
+
 // Pinner for blocks of data.  The resources associated with a pinner are
 // scoped to this object's lifetime.
 class LocklessQueuePinner {
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 2217811..c5edb9e 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -558,6 +558,9 @@
 
 static int kPinnedMessageIndex = 0;
 
+constexpr monotonic_clock::duration kChannelStorageDuration =
+    std::chrono::milliseconds(500);
+
 }  // namespace
 
 // Tests that death during sends is recovered from correctly.
@@ -575,7 +578,7 @@
   config.num_watchers = 2;
   config.num_senders = 2;
   config.num_pinners = 1;
-  config.queue_size = 2;
+  config.queue_size = 10;
   config.message_data_size = 32;
 
   TestShmRobustness(
@@ -596,14 +599,16 @@
             config);
         // Now try to write some messages.  We will get killed a bunch as this
         // tries to happen.
-        LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+        LocklessQueueSender sender =
+            LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
         LocklessQueuePinner pinner = LocklessQueuePinner::Make(queue).value();
         for (int i = 0; i < 5; ++i) {
           char data[100];
           size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
-          sender.Send(data, s + 1, monotonic_clock::min_time,
-                      realtime_clock::min_time, 0xffffffffl, UUID::Zero(),
-                      nullptr, nullptr, nullptr);
+          ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+                                realtime_clock::min_time, 0xffffffffl,
+                                UUID::Zero(), nullptr, nullptr, nullptr),
+                    LocklessQueueSender::Result::GOOD);
           // Pin a message, so when we keep writing we will exercise the pinning
           // logic.
           if (i == 1) {
@@ -613,7 +618,8 @@
       },
       [config, tid](void *raw_memory) {
         ::aos::ipc_lib::LocklessQueueMemory *const memory =
-            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+            reinterpret_cast< ::aos::ipc_lib::LocklessQueueMemory *>(
+                raw_memory);
         // Confirm that we can create 2 senders (the number in the queue), and
         // send a message.  And that all the messages in the queue are valid.
         LocklessQueue queue(memory, memory, config);
@@ -639,7 +645,7 @@
         }
 
         // Building and destroying a sender will clean up the queue.
-        LocklessQueueSender::Make(queue).value();
+        LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
 
         if (print) {
           LOG(INFO) << "Cleaned up version:";
@@ -665,19 +671,21 @@
         }
 
         {
-          LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+          LocklessQueueSender sender =
+              LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
           {
             // Make a second sender to confirm that the slot was freed.
             // If the sender doesn't get cleaned up, this will fail.
-            LocklessQueueSender::Make(queue).value();
+            LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
           }
 
           // Send a message to make sure that the queue still works.
           char data[100];
           size_t s = snprintf(data, sizeof(data), "foobar%d", 971);
-          sender.Send(data, s + 1, monotonic_clock::min_time,
-                      realtime_clock::min_time, 0xffffffffl, UUID::Zero(),
-                      nullptr, nullptr, nullptr);
+          ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+                                realtime_clock::min_time, 0xffffffffl,
+                                UUID::Zero(), nullptr, nullptr, nullptr),
+                    LocklessQueueSender::Result::GOOD);
         }
 
         // Now loop through the queue and make sure the number in the snprintf
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 2b9f49c..57dd94b 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -16,6 +16,7 @@
 #include "aos/ipc_lib/queue_racer.h"
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
 
@@ -42,6 +43,9 @@
 
 class LocklessQueueTest : public ::testing::Test {
  public:
+  static constexpr monotonic_clock::duration kChannelStorageDuration =
+      std::chrono::milliseconds(500);
+
   LocklessQueueTest() {
     config_.num_watchers = 10;
     config_.num_senders = 100;
@@ -99,8 +103,6 @@
   LocklessQueueConfiguration config_;
 };
 
-typedef LocklessQueueTest LocklessQueueDeathTest;
-
 // Tests that wakeup doesn't do anything if nothing was registered.
 TEST_F(LocklessQueueTest, NoWatcherWakeup) {
   LocklessQueueWakeUpper wake_upper(queue());
@@ -190,9 +192,10 @@
 TEST_F(LocklessQueueTest, TooManySenders) {
   ::std::vector<LocklessQueueSender> senders;
   for (size_t i = 0; i < config_.num_senders; ++i) {
-    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
+    senders.emplace_back(
+        LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
   }
-  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
+  EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
 }
 
 // Now, start 2 threads and have them receive the signals.
@@ -226,9 +229,11 @@
 
 // Do a simple send test.
 TEST_F(LocklessQueueTest, Send) {
-  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+  LocklessQueueSender sender =
+      LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
   LocklessQueueReader reader(queue());
 
+  time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
@@ -238,8 +243,10 @@
     // Send a trivial piece of data.
     char data[100];
     size_t s = snprintf(data, sizeof(data), "foobar%d", i);
-    sender.Send(data, s, monotonic_clock::min_time, realtime_clock::min_time,
-                0xffffffffu, UUID::Zero(), nullptr, nullptr, nullptr);
+    EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+                          realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
+                          nullptr, nullptr, nullptr),
+              LocklessQueueSender::Result::GOOD);
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
@@ -271,6 +278,8 @@
     if (read_result != LocklessQueueReader::Result::GOOD) {
       EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
+
+    loop.SleepUntilNext();
   }
 }
 
@@ -342,7 +351,42 @@
 
 }  // namespace
 
-// Send enough messages to wrap the 32 bit send counter.
+class LocklessQueueTestTooFast : public LocklessQueueTest {
+ public:
+  LocklessQueueTestTooFast() {
+    // Force a scenario where senders get rate limited
+    config_.num_watchers = 1000;
+    config_.num_senders = 100;
+    config_.num_pinners = 5;
+    config_.queue_size = 100;
+    // Exercise the alignment code.  This would throw off alignment.
+    config_.message_data_size = 101;
+
+    // Since our backing store is an array of uint64_t for alignment purposes,
+    // normalize by the size.
+    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
+
+    Reset();
+  }
+};
+
+// Ensure we always return GOOD or MESSAGES_SENT_TOO_FAST under an extreme load
+// on the Sender Queue.
+TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
+  PinForTest pin_cpu;
+  uint64_t kNumMessages = 1000000;
+  QueueRacer racer(queue(),
+                   {FLAGS_thread_count,
+                    kNumMessages,
+                    {LocklessQueueSender::Result::GOOD,
+                     LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
+                    std::chrono::milliseconds(500),
+                    false});
+
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+}
+
+// Send enough messages to wrap the 32 bit send counter.
 TEST_F(LocklessQueueTest, WrappedSend) {
   PinForTest pin_cpu;
   uint64_t kNumMessages = 0x100010000ul;
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index b738805..414b7fb 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -26,7 +26,23 @@
 
 QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                        uint64_t num_messages)
-    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
+    : queue_(queue),
+      num_threads_(num_threads),
+      num_messages_(num_messages),
+      channel_storage_duration_(std::chrono::nanoseconds(1)),
+      expected_send_results_({LocklessQueueSender::Result::GOOD}),
+      check_writes_and_reads_(true) {
+  Reset();
+}
+
+QueueRacer::QueueRacer(LocklessQueue queue,
+                       const QueueRacerConfiguration &config)
+    : queue_(queue),
+      num_threads_(config.num_threads),
+      num_messages_(config.num_messages),
+      channel_storage_duration_(config.channel_storage_duration),
+      expected_send_results_(config.expected_send_results),
+      check_writes_and_reads_(config.check_writes_and_reads) {
   Reset();
 }
 
@@ -117,7 +133,9 @@
           EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
-          EXPECT_GE(latest_queue_index + 1, finished_writes);
+          if (check_writes_and_reads_) {
+            EXPECT_GE(latest_queue_index + 1, finished_writes);
+          }
         }
       }
     }
@@ -133,8 +151,8 @@
     }
     t.thread = ::std::thread([this, &t, thread_index, &run,
                               write_wrap_count]() {
-      // Build up a sender.
-      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+      LocklessQueueSender sender =
+          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
       CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
       // Signal that we are ready to start sending.
@@ -176,9 +194,16 @@
         }
 
         ++started_writes_;
-        sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
-                    aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
-                    nullptr, nullptr, nullptr);
+        auto result =
+            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
+                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+                        nullptr, nullptr, nullptr);
+
+        CHECK(std::find(expected_send_results_.begin(),
+                        expected_send_results_.end(),
+                        result) != expected_send_results_.end())
+            << "Unexpected send result: " << result;
+
         // Blank out the new scratch buffer, to catch other people using it.
         {
           char *const new_data = static_cast<char *>(sender.Data()) +
@@ -210,7 +235,9 @@
     queue_index_racer.join();
   }
 
-  CheckReads(race_reads, write_wrap_count, &threads);
+  if (check_writes_and_reads_) {
+    CheckReads(race_reads, write_wrap_count, &threads);
+  }
 
   // Reap all the threads.
   if (race_reads) {
@@ -221,26 +248,28 @@
     queue_index_racer.join();
   }
 
-  // Confirm that the number of writes matches the expected number of writes.
-  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
-            started_writes_);
-  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
-            finished_writes_);
+  if (check_writes_and_reads_) {
+    // Confirm that the number of writes matches the expected number of writes.
+    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+              started_writes_);
+    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+              finished_writes_);
 
-  // And that every thread sent the right number of messages.
-  for (ThreadState &t : threads) {
-    if (will_wrap) {
-      if (!race_reads) {
-        // If we are wrapping, there is a possibility that a thread writes
-        // everything *before* we can read any of it, and it all gets
-        // overwritten.
-        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
-                    t.event_count == (1 + write_wrap_count) * num_messages_)
-            << ": Got " << t.event_count << " events, expected "
-            << (1 + write_wrap_count) * num_messages_;
+    // And that every thread sent the right number of messages.
+    for (ThreadState &t : threads) {
+      if (will_wrap) {
+        if (!race_reads) {
+          // If we are wrapping, there is a possibility that a thread writes
+          // everything *before* we can read any of it, and it all gets
+          // overwritten.
+          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+                      t.event_count == (1 + write_wrap_count) * num_messages_)
+              << ": Got " << t.event_count << " events, expected "
+              << (1 + write_wrap_count) * num_messages_;
+        }
+      } else {
+        ASSERT_EQ(t.event_count, num_messages_);
       }
-    } else {
-      ASSERT_EQ(t.event_count, num_messages_);
     }
   }
 }
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index ea0238e..3e5ca94 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -10,11 +10,28 @@
 
 struct ThreadState;
 
+struct QueueRacerConfiguration {
+  // Number of threads that send messages
+  const int num_threads;
+  // Number of messages sent by each thread
+  const uint64_t num_messages;
+  // Allows QueueRacer to check for multiple returns from calling Send()
+  const std::vector<LocklessQueueSender::Result> expected_send_results = {
+      LocklessQueueSender::Result::GOOD};
+  // Channel Storage Duration for queue used by QueueRacer
+  const monotonic_clock::duration channel_storage_duration =
+      std::chrono::nanoseconds(1);
+  // Set to true if all writes and reads are expected to be successful
+  // This allows QueueRacer to be used for checking failure scenarios
+  const bool check_writes_and_reads;
+};
+
 // Class to test the queue by spinning up a bunch of writing threads and racing
 // them together to all write at once.
 class QueueRacer {
  public:
   QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
+  QueueRacer(LocklessQueue queue, const QueueRacerConfiguration &config);
 
   // Runs an iteration of the race.
   //
@@ -52,7 +69,10 @@
   LocklessQueue queue_;
   const uint64_t num_threads_;
   const uint64_t num_messages_;
-
+  const monotonic_clock::duration channel_storage_duration_;
+  // Allows QueueRacer to check for multiple returns from calling Send()
+  const std::vector<LocklessQueueSender::Result> expected_send_results_;
+  const bool check_writes_and_reads_;
   // The overall number of writes executed will always be between the two of
   // these.  We can't atomically count writes, so we have to bound them.
   //
diff --git a/aos/network/message_bridge_test_combined_timestamps_common.json b/aos/network/message_bridge_test_combined_timestamps_common.json
index 74c932d..be79014 100644
--- a/aos/network/message_bridge_test_combined_timestamps_common.json
+++ b/aos/network/message_bridge_test_combined_timestamps_common.json
@@ -20,7 +20,7 @@
       "name": "/pi1/aos",
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi1",
-      "frequency": 10,
+      "frequency": 15,
       "max_size": 200,
       "destination_nodes": [
         {
@@ -36,7 +36,7 @@
       "name": "/pi2/aos",
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi2",
-      "frequency": 10,
+      "frequency": 15,
       "max_size": 200,
       "destination_nodes": [
         {
@@ -64,25 +64,25 @@
       "name": "/pi1/aos",
       "type": "aos.message_bridge.ClientStatistics",
       "source_node": "pi1",
-      "frequency": 2
+      "frequency": 15
     },
     {
       "name": "/pi2/aos",
       "type": "aos.message_bridge.ClientStatistics",
       "source_node": "pi2",
-      "frequency": 2
+      "frequency": 15
     },
     {
       "name": "/pi1/aos/remote_timestamps/pi2",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi1",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi2/aos/remote_timestamps/pi1",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi2",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi1/aos",
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 99c80a9..4623edb 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -20,7 +20,7 @@
       "name": "/pi1/aos",
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi1",
-      "frequency": 10,
+      "frequency": 15,
       "max_size": 200,
       "destination_nodes": [
         {
@@ -36,7 +36,7 @@
       "name": "/pi2/aos",
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi2",
-      "frequency": 10,
+      "frequency": 15,
       "max_size": 200,
       "destination_nodes": [
         {
@@ -64,43 +64,43 @@
       "name": "/pi1/aos",
       "type": "aos.message_bridge.ClientStatistics",
       "source_node": "pi1",
-      "frequency": 2
+      "frequency": 15
     },
     {
       "name": "/pi2/aos",
       "type": "aos.message_bridge.ClientStatistics",
       "source_node": "pi2",
-      "frequency": 2
+      "frequency": 15
     },
     {
       "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi1",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi2",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi1",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi2/aos/remote_timestamps/pi1/test/aos-examples-Pong",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi2",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi1/aos/remote_timestamps/pi2/unreliable/aos-examples-Ping",
       "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi1",
-      "frequency": 10
+      "frequency": 15
     },
     {
       "name": "/pi1/aos",