Add should_fetch to lockless_queue so we can conditionally fetch

For logging, we want to be able to only fetch the next message if it is
time to do so.  This adds a callback which is provided all the context
information at the right point to decide if we actually fetch or not.
Next step is to actually expose it through EventLoop.

Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 34e2762..93ae2a3 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -234,9 +234,14 @@
       LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
   LocklessQueueReader reader(queue());
 
-  time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
+  time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
+                        monotonic_clock::now());
+  std::function<bool(const Context &)> should_read = [](const Context &) {
+    return true;
+  };
+
   // Send enough messages to wrap.
-  for (int i = 0; i < 20000; ++i) {
+  for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
     // Confirm that the queue index makes sense given the number of sends.
     EXPECT_EQ(reader.LatestIndex().index(),
               i == 0 ? QueueIndex::Invalid().index() : i - 1);
@@ -244,7 +249,7 @@
     // Send a trivial piece of data.
     char data[100];
     size_t s = snprintf(data, sizeof(data), "foobar%d", i);
-    EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+    ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
                           realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
                           nullptr, nullptr, nullptr),
               LocklessQueueSender::Result::GOOD);
@@ -272,12 +277,12 @@
     LocklessQueueReader::Result read_result = reader.Read(
         index.index(), &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &source_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
     if (read_result != LocklessQueueReader::Result::GOOD) {
-      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
+      ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
 
     loop.SleepUntilNext();
@@ -291,6 +296,8 @@
   ::std::mt19937 generator(0);
   ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
   ::std::bernoulli_distribution race_reads_distribution;
+  ::std::bernoulli_distribution set_should_read_distribution;
+  ::std::bernoulli_distribution should_read_result_distribution;
   ::std::bernoulli_distribution wrap_writes_distribution;
 
   const chrono::seconds print_frequency(FLAGS_print_rate);
@@ -304,12 +311,15 @@
   monotonic_clock::time_point next_print_time = start_time + print_frequency;
   uint64_t messages = 0;
   for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
-    bool race_reads = race_reads_distribution(generator);
+    const bool race_reads = race_reads_distribution(generator);
+    const bool set_should_read = set_should_read_distribution(generator);
+    const bool should_read_result = should_read_result_distribution(generator);
     int write_wrap_count = write_wrap_count_distribution(generator);
     if (!wrap_writes_distribution(generator)) {
       write_wrap_count = 0;
     }
-    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
+        race_reads, write_wrap_count, set_should_read, should_read_result))
         << ": Running with race_reads: " << race_reads
         << ", and write_wrap_count " << write_wrap_count << " and on iteration "
         << i;
@@ -391,7 +401,7 @@
                     std::chrono::milliseconds(500),
                     false});
 
-  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
 }
 
 // // Send enough messages to wrap the 32 bit send counter.
@@ -401,7 +411,7 @@
   QueueRacer racer(queue(), 1, kNumMessages);
 
   const monotonic_clock::time_point start_time = monotonic_clock::now();
-  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
   const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
   double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                monotonic_now - start_time)