Add a sent too fast check for simulation and shm

Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ed3d099..9159553 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -522,7 +522,10 @@
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_sender_(VerifySender(
-            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            ipc_lib::LocklessQueueSender::Make(
+                lockless_queue_memory_.queue(),
+                std::chrono::nanoseconds(
+                    event_loop->configuration()->channel_storage_duration())),
             channel)),
         wake_upper_(lockless_queue_memory_.queue()) {}
 
@@ -557,17 +560,17 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
-                                      realtime_remote_time, remote_queue_index,
-                                      source_boot_uuid, &monotonic_sent_time_,
-                                      &realtime_sent_time_, &sent_queue_index_))
+    const auto result = lockless_queue_sender_.Send(
+        length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+        source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
+        &sent_queue_index_);
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
 
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(Milind): check for messages sent too fast
-    return Error::kOk;
+    return CheckLocklessQueueResult(result);
   }
 
   Error DoSend(const void *msg, size_t length,
@@ -579,16 +582,19 @@
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(
+    const auto result = lockless_queue_sender_.Send(
         reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
         realtime_remote_time, remote_queue_index, source_boot_uuid,
-        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
-        << ": Somebody wrote outside the buffer of their message on channel "
+        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+
+    CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
+        << ": Somebody wrote outside the buffer of their message on "
+           "channel "
         << configuration::CleanedChannelToString(channel());
     wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
                                                   : 0);
-    // TODO(austin): Return an error if we send too fast.
-    return RawSender::Error::kOk;
+
+    return CheckLocklessQueueResult(result);
   }
 
   absl::Span<char> GetSharedMemory() const {
@@ -605,6 +611,20 @@
     return static_cast<const ShmEventLoop *>(event_loop());
   }
 
+  RawSender::Error CheckLocklessQueueResult(
+      const ipc_lib::LocklessQueueSender::Result &result) {
+    switch (result) {
+      case ipc_lib::LocklessQueueSender::Result::GOOD:
+        return Error::kOk;
+      case ipc_lib::LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST:
+        return Error::kMessagesSentTooFast;
+      case ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE:
+        return Error::kInvalidRedzone;
+    }
+    LOG(FATAL) << "Unknown lockless queue sender result"
+               << static_cast<int>(result);
+  }
+
   MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;