Add a sent too fast check for simulation and shm

Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fe86f6c..5f12423 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -4,6 +4,7 @@
 #include <sys/types.h>
 #include <syscall.h>
 #include <unistd.h>
+
 #include <algorithm>
 #include <iomanip>
 #include <iostream>
@@ -503,7 +504,7 @@
 
   bool bad = false;
 
-  for (size_t i = 0; i < redzone.size(); ++i) {
+  for (size_t i = 0; i < redzone.size() && !bad; ++i) {
     if (memcmp(&redzone[i], &redzone_value, 1)) {
       bad = true;
     }
@@ -603,6 +604,7 @@
       Message *const message =
           memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i));
       message->header.queue_index.Invalidate();
+      message->header.monotonic_sent_time = monotonic_clock::min_time;
       FillRedzone(memory, message->PreRedzone(memory->message_data_size()));
       FillRedzone(memory, message->PostRedzone(memory->message_data_size(),
                                                memory->message_size()));
@@ -831,8 +833,16 @@
   return count;
 }
 
-LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
-    : memory_(memory) {
+std::ostream &operator<<(std::ostream &os,
+                         const LocklessQueueSender::Result r) {
+  os << static_cast<int>(r);
+  return os;
+}
+
+LocklessQueueSender::LocklessQueueSender(
+    LocklessQueueMemory *memory,
+    monotonic_clock::duration channel_storage_duration)
+    : memory_(memory), channel_storage_duration_(channel_storage_duration) {
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
@@ -877,9 +887,9 @@
 }
 
 std::optional<LocklessQueueSender> LocklessQueueSender::Make(
-    LocklessQueue queue) {
+    LocklessQueue queue, monotonic_clock::duration channel_storage_duration) {
   queue.Initialize();
-  LocklessQueueSender result(queue.memory());
+  LocklessQueueSender result(queue.memory(), channel_storage_duration);
   if (result.sender_index_ != -1) {
     return std::move(result);
   } else {
@@ -904,7 +914,7 @@
   return message->data(memory_->message_data_size());
 }
 
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
     const char *data, size_t length,
     monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
@@ -921,7 +931,7 @@
               realtime_sent_time, queue_index);
 }
 
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
     size_t length, monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index, const UUID &source_boot_uuid,
@@ -936,7 +946,7 @@
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
   if (CheckBothRedzones(memory_, message)) {
-    return false;
+    return Result::INVALID_REDZONE;
   }
 
   // We should have invalidated this when we first got the buffer. Verify that
@@ -995,11 +1005,14 @@
     // This is just a best-effort check to skip reading the clocks if possible.
     // If this fails, then the compare-exchange below definitely would, so we
     // can bail out now.
+    const Message *message_to_replace = memory_->GetMessage(to_replace);
+    bool is_previous_index_valid = false;
     {
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)
-              ->header.queue_index.RelaxedLoad(queue_size);
-      if (previous_index != decremented_queue_index && previous_index.valid()) {
+          message_to_replace->header.queue_index.RelaxedLoad(queue_size);
+      is_previous_index_valid = previous_index.valid();
+      if (previous_index != decremented_queue_index &&
+          is_previous_index_valid) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
                    "Retrying.  Previous index was "
@@ -1011,6 +1024,7 @@
 
     message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
     message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
     if (monotonic_sent_time != nullptr) {
       *monotonic_sent_time = message->header.monotonic_sent_time;
     }
@@ -1021,8 +1035,48 @@
       *queue_index = next_queue_index.index();
     }
 
+    const auto to_replace_monotonic_sent_time =
+        message_to_replace->header.monotonic_sent_time;
+
+    // If we are overwriting a message sent in the last
+    // channel_storage_duration_, that means that we would be sending more than
+    // queue_size messages and would therefore be sending too fast. If the
+    // previous index is not valid then the message hasn't been filled out yet
+    // so we aren't sending too fast. And, if it is not less than the sent time
+    // of the message that we are going to write, someone else beat us and the
+    // compare and exchange below will fail.
+    if (is_previous_index_valid &&
+        (to_replace_monotonic_sent_time <
+         message->header.monotonic_sent_time) &&
+        (message->header.monotonic_sent_time - to_replace_monotonic_sent_time <
+         channel_storage_duration_)) {
+      // There is a possibility that another context beat us to writing out the
+      // message in the queue, but we beat that context to acquiring the sent
+      // time. In this case our sent time is *greater than* the other context's
+      // sent time. Therefore, we can check if we got beat filling out this
+      // message *after* doing the above check to determine if we hit this edge
+      // case. Otherwise, messages are being sent too fast.
+      const QueueIndex previous_index =
+          message_to_replace->header.queue_index.Load(queue_size);
+      if (previous_index != decremented_queue_index && previous_index.valid()) {
+        VLOG(3) << "Got beat during check for messages being sent too fast"
+                   "Retrying.";
+        continue;
+      } else {
+        VLOG(3) << "Messages sent too fast. Returning. Attempted index: "
+                << decremented_queue_index.index()
+                << " message sent time: " << message->header.monotonic_sent_time
+                << "  message to replace sent time: "
+                << to_replace_monotonic_sent_time;
+        // Since we are not using the message obtained from scratch_index
+        // and we are not retrying, we need to invalidate its queue_index.
+        message->header.queue_index.Invalidate();
+        return Result::MESSAGES_SENT_TOO_FAST;
+      }
+    }
+
     // Before we are fully done filling out the message, update the Sender state
-    // with the new index to write.  This re-uses the barrier for the
+    // with the new index to write. This re-uses the barrier for the
     // queue_index store.
     const Index index_to_write(next_queue_index, scratch_index.message_index());
 
@@ -1085,7 +1139,7 @@
   // If anybody is looking at this message (they shouldn't be), then try telling
   // them about it (best-effort).
   memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
-  return true;
+  return Result::GOOD;
 }
 
 int LocklessQueueSender::buffer_index() const {