Add a sent-too-fast check for simulation and shm
Sending now returns an error if more than queue_size (frequency *
channel_storage_duration) messages are sent within one
channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index b738805..414b7fb 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -26,7 +26,23 @@
QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
uint64_t num_messages)
- : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
+ : queue_(queue),
+ num_threads_(num_threads),
+ num_messages_(num_messages),
+ channel_storage_duration_(std::chrono::nanoseconds(1)),
+ expected_send_results_({LocklessQueueSender::Result::GOOD}),
+ check_writes_and_reads_(true) {
+ Reset();
+}
+
+QueueRacer::QueueRacer(LocklessQueue queue,
+ const QueueRacerConfiguration &config)
+ : queue_(queue),
+ num_threads_(config.num_threads),
+ num_messages_(config.num_messages),
+ channel_storage_duration_(config.channel_storage_duration),
+ expected_send_results_(config.expected_send_results),
+ check_writes_and_reads_(config.check_writes_and_reads) {
Reset();
}
@@ -117,7 +133,9 @@
EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
- EXPECT_GE(latest_queue_index + 1, finished_writes);
+ if (check_writes_and_reads_) {
+ EXPECT_GE(latest_queue_index + 1, finished_writes);
+ }
}
}
}
@@ -133,8 +151,8 @@
}
t.thread = ::std::thread([this, &t, thread_index, &run,
write_wrap_count]() {
- // Build up a sender.
- LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
// Signal that we are ready to start sending.
@@ -176,9 +194,16 @@
}
++started_writes_;
- sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
- aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
- nullptr, nullptr, nullptr);
+ auto result =
+ sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
+ aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+ nullptr, nullptr, nullptr);
+
+ CHECK(std::find(expected_send_results_.begin(),
+ expected_send_results_.end(),
+ result) != expected_send_results_.end())
+ << "Unexpected send result: " << result;
+
// Blank out the new scratch buffer, to catch other people using it.
{
char *const new_data = static_cast<char *>(sender.Data()) +
@@ -210,7 +235,9 @@
queue_index_racer.join();
}
- CheckReads(race_reads, write_wrap_count, &threads);
+ if (check_writes_and_reads_) {
+ CheckReads(race_reads, write_wrap_count, &threads);
+ }
// Reap all the threads.
if (race_reads) {
@@ -221,26 +248,28 @@
queue_index_racer.join();
}
- // Confirm that the number of writes matches the expected number of writes.
- ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
- started_writes_);
- ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
- finished_writes_);
+ if (check_writes_and_reads_) {
+ // Confirm that the number of writes matches the expected number of writes.
+ ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+ started_writes_);
+ ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+ finished_writes_);
- // And that every thread sent the right number of messages.
- for (ThreadState &t : threads) {
- if (will_wrap) {
- if (!race_reads) {
- // If we are wrapping, there is a possibility that a thread writes
- // everything *before* we can read any of it, and it all gets
- // overwritten.
- ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
- t.event_count == (1 + write_wrap_count) * num_messages_)
- << ": Got " << t.event_count << " events, expected "
- << (1 + write_wrap_count) * num_messages_;
+ // And that every thread sent the right number of messages.
+ for (ThreadState &t : threads) {
+ if (will_wrap) {
+ if (!race_reads) {
+ // If we are wrapping, there is a possibility that a thread writes
+ // everything *before* we can read any of it, and it all gets
+ // overwritten.
+ ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+ t.event_count == (1 + write_wrap_count) * num_messages_)
+ << ": Got " << t.event_count << " events, expected "
+ << (1 + write_wrap_count) * num_messages_;
+ }
+ } else {
+ ASSERT_EQ(t.event_count, num_messages_);
}
- } else {
- ASSERT_EQ(t.event_count, num_messages_);
}
}
}