Add should_fetch to lockless_queue so we can conditionally fetch
For logging, we want to be able to fetch the next message only if it is
time to do so. This adds a callback which is given all the context
information at the right point to decide whether we actually fetch.
The next step is to expose it through EventLoop.
Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 34e2762..93ae2a3 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -234,9 +234,14 @@
LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
LocklessQueueReader reader(queue());
- time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
+ time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
+ monotonic_clock::now());
+ std::function<bool(const Context &)> should_read = [](const Context &) {
+ return true;
+ };
+
// Send enough messages to wrap.
- for (int i = 0; i < 20000; ++i) {
+ for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
// Confirm that the queue index makes sense given the number of sends.
EXPECT_EQ(reader.LatestIndex().index(),
i == 0 ? QueueIndex::Invalid().index() : i - 1);
@@ -244,7 +249,7 @@
// Send a trivial piece of data.
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i);
- EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+ ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
nullptr, nullptr, nullptr),
LocklessQueueSender::Result::GOOD);
@@ -272,12 +277,12 @@
LocklessQueueReader::Result read_result = reader.Read(
index.index(), &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &source_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
if (read_result != LocklessQueueReader::Result::GOOD) {
- EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
loop.SleepUntilNext();
@@ -291,6 +296,8 @@
::std::mt19937 generator(0);
::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
::std::bernoulli_distribution race_reads_distribution;
+ ::std::bernoulli_distribution set_should_read_distribution;
+ ::std::bernoulli_distribution should_read_result_distribution;
::std::bernoulli_distribution wrap_writes_distribution;
const chrono::seconds print_frequency(FLAGS_print_rate);
@@ -304,12 +311,15 @@
monotonic_clock::time_point next_print_time = start_time + print_frequency;
uint64_t messages = 0;
for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
- bool race_reads = race_reads_distribution(generator);
+ const bool race_reads = race_reads_distribution(generator);
+ const bool set_should_read = set_should_read_distribution(generator);
+ const bool should_read_result = should_read_result_distribution(generator);
int write_wrap_count = write_wrap_count_distribution(generator);
if (!wrap_writes_distribution(generator)) {
write_wrap_count = 0;
}
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
+ race_reads, write_wrap_count, set_should_read, should_read_result))
<< ": Running with race_reads: " << race_reads
<< ", and write_wrap_count " << write_wrap_count << " and on iteration "
<< i;
@@ -391,7 +401,7 @@
std::chrono::milliseconds(500),
false});
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
}
// // Send enough messages to wrap the 32 bit send counter.
@@ -401,7 +411,7 @@
QueueRacer racer(queue(), 1, kNumMessages);
const monotonic_clock::time_point start_time = monotonic_clock::now();
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
monotonic_now - start_time)