Add a sent too fast check for simulation and shm

Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 2b9f49c..57dd94b 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -16,6 +16,7 @@
 #include "aos/ipc_lib/queue_racer.h"
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
 
@@ -42,6 +43,9 @@
 
 class LocklessQueueTest : public ::testing::Test {
  public:
+  static constexpr monotonic_clock::duration kChannelStorageDuration =
+      std::chrono::milliseconds(500);
+
   LocklessQueueTest() {
     config_.num_watchers = 10;
     config_.num_senders = 100;
@@ -99,8 +103,6 @@
   LocklessQueueConfiguration config_;
 };
 
-typedef LocklessQueueTest LocklessQueueDeathTest;
-
 // Tests that wakeup doesn't do anything if nothing was registered.
 TEST_F(LocklessQueueTest, NoWatcherWakeup) {
   LocklessQueueWakeUpper wake_upper(queue());
@@ -190,9 +192,10 @@
 TEST_F(LocklessQueueTest, TooManySenders) {
   ::std::vector<LocklessQueueSender> senders;
   for (size_t i = 0; i < config_.num_senders; ++i) {
-    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
+    senders.emplace_back(
+        LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
   }
-  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
+  EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
 }
 
 // Now, start 2 threads and have them receive the signals.
@@ -226,9 +229,11 @@
 
 // Do a simple send test.
 TEST_F(LocklessQueueTest, Send) {
-  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+  LocklessQueueSender sender =
+      LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
   LocklessQueueReader reader(queue());
 
+  time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
@@ -238,8 +243,10 @@
     // Send a trivial piece of data.
     char data[100];
     size_t s = snprintf(data, sizeof(data), "foobar%d", i);
-    sender.Send(data, s, monotonic_clock::min_time, realtime_clock::min_time,
-                0xffffffffu, UUID::Zero(), nullptr, nullptr, nullptr);
+    EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+                          realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
+                          nullptr, nullptr, nullptr),
+              LocklessQueueSender::Result::GOOD);
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
@@ -271,6 +278,8 @@
     if (read_result != LocklessQueueReader::Result::GOOD) {
       EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
+
+    loop.SleepUntilNext();
   }
 }
 
@@ -342,7 +351,42 @@
 
 }  // namespace
 
-// Send enough messages to wrap the 32 bit send counter.
+class LocklessQueueTestTooFast : public LocklessQueueTest {
+ public:
+  LocklessQueueTestTooFast() {
+    // Force a scenario where senders get rate limited
+    config_.num_watchers = 1000;
+    config_.num_senders = 100;
+    config_.num_pinners = 5;
+    config_.queue_size = 100;
+    // Exercise the alignment code.  This would throw off alignment.
+    config_.message_data_size = 101;
+
+    // Since our backing store is an array of uint64_t for alignment purposes,
+    // normalize by the size.
+    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
+
+    Reset();
+  }
+};
+
+// Ensure we always return OK or MESSAGES_SENT_TOO_FAST under an extreme load
+// on the Sender Queue.
+TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
+  PinForTest pin_cpu;
+  uint64_t kNumMessages = 1000000;
+  QueueRacer racer(queue(),
+                   {FLAGS_thread_count,
+                    kNumMessages,
+                    {LocklessQueueSender::Result::GOOD,
+                     LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
+                    std::chrono::milliseconds(500),
+                    false});
+
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+}
+
+// // Send enough messages to wrap the 32 bit send counter.
 TEST_F(LocklessQueueTest, WrappedSend) {
   PinForTest pin_cpu;
   uint64_t kNumMessages = 0x100010000ul;