Add a sent-too-fast check for simulation and shm

Sending now returns an error if more than queue_size (frequency *
channel_storage_duration) messages are sent within one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
 class SimulatedChannel {
  public:
   explicit SimulatedChannel(const Channel *channel,
-                            std::chrono::nanoseconds channel_storage_duration)
+                            std::chrono::nanoseconds channel_storage_duration,
+                            const EventScheduler *scheduler)
       : channel_(channel),
         channel_storage_duration_(channel_storage_duration),
-        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+        scheduler_(scheduler) {
     available_buffer_indices_.resize(number_buffers());
     for (int i = 0; i < number_buffers(); ++i) {
       available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
   int sender_count_ = 0;
 
   std::vector<uint16_t> available_buffer_indices_;
+
+  const EventScheduler *scheduler_;
+
+  // Queue of all the message send times in the last channel_storage_duration_
+  std::queue<monotonic_clock::time_point> last_times_;
 };
 
 namespace {
@@ -782,14 +789,14 @@
     const Channel *channel) {
   auto it = channels_->find(SimpleChannel(channel));
   if (it == channels_->end()) {
-    it =
-        channels_
-            ->emplace(
-                SimpleChannel(channel),
-                std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
-                    channel, std::chrono::nanoseconds(
-                                 configuration()->channel_storage_duration()))))
-            .first;
+    it = channels_
+             ->emplace(SimpleChannel(channel),
+                       std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+                           channel,
+                           std::chrono::nanoseconds(
+                               configuration()->channel_storage_duration()),
+                           scheduler_)))
+             .first;
   }
   return it->second.get();
 }
@@ -930,7 +937,21 @@
 
 std::optional<uint32_t> SimulatedChannel::Send(
     std::shared_ptr<SimulatedMessage> message) {
-  std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  const auto now = scheduler_->monotonic_now();
+  // Drop recorded send times that are at least channel_storage_duration_ old;
+  // they no longer count against the rate limit.
+  while (!last_times_.empty() &&
+         (now - last_times_.front() >= channel_storage_duration_)) {
+    last_times_.pop();
+  }
+
+  // Check that we are not sending messages too fast
+  if (static_cast<int>(last_times_.size()) >= queue_size()) {
+    return std::nullopt;
+  }
+
+  const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+  last_times_.push(now);
 
   message->context.queue_index = *queue_index;
   // Points to the actual data depending on the size set in context. Data may