Add a sent-too-fast check for simulation and shm

Send now returns an error if more than queue_size (frequency *
channel_storage_duration) messages are sent within one
channel_storage_duration.

Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index b738805..414b7fb 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -26,7 +26,23 @@
 
 QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                        uint64_t num_messages)
-    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
+    : queue_(queue),
+      num_threads_(num_threads),
+      num_messages_(num_messages),
+      channel_storage_duration_(std::chrono::nanoseconds(1)),
+      expected_send_results_({LocklessQueueSender::Result::GOOD}),
+      check_writes_and_reads_(true) {
+  Reset();
+}
+
+QueueRacer::QueueRacer(LocklessQueue queue,
+                       const QueueRacerConfiguration &config)
+    : queue_(queue),
+      num_threads_(config.num_threads),
+      num_messages_(config.num_messages),
+      channel_storage_duration_(config.channel_storage_duration),
+      expected_send_results_(config.expected_send_results),
+      check_writes_and_reads_(config.check_writes_and_reads) {
   Reset();
 }
 
@@ -117,7 +133,9 @@
           EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
-          EXPECT_GE(latest_queue_index + 1, finished_writes);
+          if (check_writes_and_reads_) {
+            EXPECT_GE(latest_queue_index + 1, finished_writes);
+          }
         }
       }
     }
@@ -133,8 +151,8 @@
     }
     t.thread = ::std::thread([this, &t, thread_index, &run,
                               write_wrap_count]() {
-      // Build up a sender.
-      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+      LocklessQueueSender sender =
+          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
       CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
       // Signal that we are ready to start sending.
@@ -176,9 +194,16 @@
         }
 
         ++started_writes_;
-        sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
-                    aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
-                    nullptr, nullptr, nullptr);
+        auto result =
+            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
+                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+                        nullptr, nullptr, nullptr);
+
+        CHECK(std::find(expected_send_results_.begin(),
+                        expected_send_results_.end(),
+                        result) != expected_send_results_.end())
+            << "Unexpected send result: " << result;
+
         // Blank out the new scratch buffer, to catch other people using it.
         {
           char *const new_data = static_cast<char *>(sender.Data()) +
@@ -210,7 +235,9 @@
     queue_index_racer.join();
   }
 
-  CheckReads(race_reads, write_wrap_count, &threads);
+  if (check_writes_and_reads_) {
+    CheckReads(race_reads, write_wrap_count, &threads);
+  }
 
   // Reap all the threads.
   if (race_reads) {
@@ -221,26 +248,28 @@
     queue_index_racer.join();
   }
 
-  // Confirm that the number of writes matches the expected number of writes.
-  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
-            started_writes_);
-  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
-            finished_writes_);
+  if (check_writes_and_reads_) {
+    // Confirm that the number of writes matches the expected number of writes.
+    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+              started_writes_);
+    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+              finished_writes_);
 
-  // And that every thread sent the right number of messages.
-  for (ThreadState &t : threads) {
-    if (will_wrap) {
-      if (!race_reads) {
-        // If we are wrapping, there is a possibility that a thread writes
-        // everything *before* we can read any of it, and it all gets
-        // overwritten.
-        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
-                    t.event_count == (1 + write_wrap_count) * num_messages_)
-            << ": Got " << t.event_count << " events, expected "
-            << (1 + write_wrap_count) * num_messages_;
+    // And that every thread sent the right number of messages.
+    for (ThreadState &t : threads) {
+      if (will_wrap) {
+        if (!race_reads) {
+          // If we are wrapping, there is a possibility that a thread writes
+          // everything *before* we can read any of it, and it all gets
+          // overwritten.
+          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+                      t.event_count == (1 + write_wrap_count) * num_messages_)
+              << ": Got " << t.event_count << " events, expected "
+              << (1 + write_wrap_count) * num_messages_;
+        }
+      } else {
+        ASSERT_EQ(t.event_count, num_messages_);
       }
-    } else {
-      ASSERT_EQ(t.event_count, num_messages_);
     }
   }
 }