Add should_fetch to lockless_queue so we can conditionally fetch

For logging, we want to be able to only fetch the next message if it is
time to do so.  This adds a callback which is provided all the context
information at the right point to decide if we actually fetch or not.
Next step is to actually expose it through EventLoop.

Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 08e3998..a65c71a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -96,6 +96,20 @@
 )
 
 cc_library(
+    name = "context",
+    hdrs = [
+        "context.h",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//aos:flatbuffers",
+        "//aos:uuid",
+        "//aos/time",
+    ],
+)
+
+cc_library(
     name = "event_loop",
     srcs = [
         "event_loop.cc",
@@ -109,6 +123,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
+        ":context",
         ":event_loop_fbs",
         ":timing_statistics",
         "//aos:configuration",
diff --git a/aos/events/context.h b/aos/events/context.h
new file mode 100644
index 0000000..6648949
--- /dev/null
+++ b/aos/events/context.h
@@ -0,0 +1,70 @@
+#ifndef AOS_EVENTS_CONTEXT_H_
+#define AOS_EVENTS_CONTEXT_H_
+
+#include "aos/flatbuffers.h"
+#include "aos/time/time.h"
+#include "aos/uuid.h"
+
+namespace aos {
+
+// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
+// about the current message.
+struct Context {
+  // Time that the message was sent on this node, or the timer was triggered.
+  monotonic_clock::time_point monotonic_event_time;
+  // Realtime the message was sent on this node.  This is set to min_time for
+  // Timers and PhasedLoops.
+  realtime_clock::time_point realtime_event_time;
+
+  // The rest are only valid for Watchers and Fetchers.
+
+  // For a single-node configuration, these two are identical to *_event_time.
+  // In a multinode configuration, these are the times that the message was
+  // sent on the original node.
+  monotonic_clock::time_point monotonic_remote_time;
+  realtime_clock::time_point realtime_remote_time;
+
+  // Index in the queue.
+  uint32_t queue_index;
+  // Index into the remote queue.  Useful to determine if data was lost.  In a
+  // single-node configuration, this will match queue_index.
+  uint32_t remote_queue_index;
+
+  // Size of the data sent.
+  size_t size;
+  // Pointer to the data.
+  const void *data;
+
+  // Index of the message buffer. This will be in [0, NumberBuffers) on
+  // read_method=PIN channels, and -1 for other channels.
+  //
+  // This only tells you about the underlying storage for this message, not
+  // anything about its position in the queue. This is only useful for advanced
+  // zero-copy use cases, on read_method=PIN channels.
+  //
+  // This will uniquely identify a message on this channel at a point in time.
+  // For senders, this point in time is while the sender has the message. With
+  // read_method==PIN, this point in time includes while the caller has access
+  // to this context. For other read_methods, this point in time may be before
+  // the caller has access to this context, which makes this pretty useless.
+  int buffer_index;
+
+  // UUID of the remote node which sent this message, or this node in the case
+  // of events which are local to this node.
+  UUID source_boot_uuid = UUID::Zero();
+
+  // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
+  // memory in the process.  It is vital that T matches the type of the
+  // underlying flatbuffer.
+  template <typename T>
+  FlatbufferVector<T> CopyFlatBuffer() const {
+    ResizeableBuffer buffer;
+    buffer.resize(size);
+    memcpy(buffer.data(), data, size);
+    return FlatbufferVector<T>(std::move(buffer));
+  }
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_CONTEXT_H_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e53bc21..17e3e00 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -15,6 +15,7 @@
 #include "aos/configuration.h"
 #include "aos/configuration_generated.h"
 #include "aos/events/channel_preallocated_allocator.h"
+#include "aos/events/context.h"
 #include "aos/events/event_loop_event.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
@@ -34,64 +35,6 @@
 class EventLoop;
 class WatcherState;
 
-// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
-// about the current message.
-struct Context {
-  // Time that the message was sent on this node, or the timer was triggered.
-  monotonic_clock::time_point monotonic_event_time;
-  // Realtime the message was sent on this node.  This is set to min_time for
-  // Timers and PhasedLoops.
-  realtime_clock::time_point realtime_event_time;
-
-  // The rest are only valid for Watchers and Fetchers.
-
-  // For a single-node configuration, these two are identical to *_event_time.
-  // In a multinode configuration, these are the times that the message was
-  // sent on the original node.
-  monotonic_clock::time_point monotonic_remote_time;
-  realtime_clock::time_point realtime_remote_time;
-
-  // Index in the queue.
-  uint32_t queue_index;
-  // Index into the remote queue.  Useful to determine if data was lost.  In a
-  // single-node configuration, this will match queue_index.
-  uint32_t remote_queue_index;
-
-  // Size of the data sent.
-  size_t size;
-  // Pointer to the data.
-  const void *data;
-
-  // Index of the message buffer. This will be in [0, NumberBuffers) on
-  // read_method=PIN channels, and -1 for other channels.
-  //
-  // This only tells you about the underlying storage for this message, not
-  // anything about its position in the queue. This is only useful for advanced
-  // zero-copy use cases, on read_method=PIN channels.
-  //
-  // This will uniquely identify a message on this channel at a point in time.
-  // For senders, this point in time is while the sender has the message. With
-  // read_method==PIN, this point in time includes while the caller has access
-  // to this context. For other read_methods, this point in time may be before
-  // the caller has access to this context, which makes this pretty useless.
-  int buffer_index;
-
-  // UUID of the remote node which sent this message, or this node in the case
-  // of events which are local to this node.
-  UUID source_boot_uuid = UUID::Zero();
-
-  // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
-  // memory in the process.  It is vital that T matches the type of the
-  // underlying flatbuffer.
-  template <typename T>
-  FlatbufferVector<T> CopyFlatBuffer() const {
-    ResizeableBuffer buffer;
-    buffer.resize(size);
-    memcpy(buffer.data(), data, size);
-    return FlatbufferVector<T>(std::move(buffer));
-  }
-};
-
 // Raw version of fetcher. Contains a local variable that the fetcher will
 // update.  This is used for reflection and as an interface to implement typed
 // fetchers.
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 314e366..8ed6e0e 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -226,7 +226,8 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.source_boot_uuid, &context_.size, copy_buffer);
+        &context_.source_boot_uuid, &context_.size, copy_buffer,
+        std::ref(should_fetch_));
 
     if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
@@ -320,6 +321,11 @@
   std::optional<ipc_lib::LocklessQueuePinner> pinner_;
 
   Context context_;
+
+  // Pre-allocated should_fetch function so we don't allocate.
+  std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
+    return true;
+  };
 };
 
 class ShmFetcher : public RawFetcher {
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index b6c2a89..147e0ba 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -209,6 +209,7 @@
         "//aos:configuration",
         "//aos:realtime",
         "//aos:uuid",
+        "//aos/events:context",
         "//aos/time",
         "//aos/util:compiler_memory_barrier",
         "@com_github_google_glog//:glog",
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 9a53eb0..01706ba 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -1269,7 +1269,7 @@
     monotonic_clock::time_point *monotonic_remote_time,
     realtime_clock::time_point *realtime_remote_time,
     uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
-    char *data) const {
+    char *data, std::function<bool(const Context &)> should_read) const {
   const size_t queue_size = memory_->queue_size();
 
   // Build up the QueueIndex.
@@ -1343,34 +1343,80 @@
 
   // Then read the data out.  Copy it all out to be deterministic and so we can
   // make length be from either end.
-  *monotonic_sent_time = m->header.monotonic_sent_time;
-  *realtime_sent_time = m->header.realtime_sent_time;
-  if (m->header.remote_queue_index == 0xffffffffu) {
-    *remote_queue_index = queue_index.index();
+  if (!should_read) {
+    *monotonic_sent_time = m->header.monotonic_sent_time;
+    *realtime_sent_time = m->header.realtime_sent_time;
+    if (m->header.remote_queue_index == 0xffffffffu) {
+      *remote_queue_index = queue_index.index();
+    } else {
+      *remote_queue_index = m->header.remote_queue_index;
+    }
+    *monotonic_remote_time = m->header.monotonic_remote_time;
+    *realtime_remote_time = m->header.realtime_remote_time;
+    *source_boot_uuid = m->header.source_boot_uuid;
+    *length = m->header.length;
   } else {
-    *remote_queue_index = m->header.remote_queue_index;
+    // Cache the header results so we don't modify the outputs unless the filter
+    // function says "go".
+    Context context;
+    context.monotonic_event_time = m->header.monotonic_sent_time;
+    context.realtime_event_time = m->header.realtime_sent_time;
+    context.monotonic_remote_time = m->header.monotonic_remote_time;
+    context.realtime_remote_time = m->header.realtime_remote_time;
+    context.queue_index = queue_index.index();
+    if (m->header.remote_queue_index == 0xffffffffu) {
+      context.remote_queue_index = context.queue_index;
+    } else {
+      context.remote_queue_index = m->header.remote_queue_index;
+    }
+    context.source_boot_uuid = m->header.source_boot_uuid;
+    context.size = m->header.length;
+    context.data = nullptr;
+    context.buffer_index = -1;
+
+    // And finally, confirm that the message *still* points to the queue index
+    // we want.  This means it didn't change out from under us. If something
+    // changed out from under us, we were reading it much too late in its
+    // lifetime.
+    aos_compiler_memory_barrier();
+    const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+    if (final_queue_index != queue_index) {
+      VLOG(3) << "Changed out from under us.  Reading " << std::hex
+              << queue_index.index() << ", finished with "
+              << final_queue_index.index() << ", delta: " << std::dec
+              << (final_queue_index.index() - queue_index.index());
+      return Result::OVERWROTE;
+    }
+
+    // We now know that the context is safe to use.  See if we are supposed to
+    // take the message or not.
+    if (!should_read(context)) {
+      return Result::FILTERED;
+    }
+
+    // And now take it.
+    *monotonic_sent_time = context.monotonic_event_time;
+    *realtime_sent_time = context.realtime_event_time;
+    *remote_queue_index = context.remote_queue_index;
+    *monotonic_remote_time = context.monotonic_remote_time;
+    *realtime_remote_time = context.realtime_remote_time;
+    *source_boot_uuid = context.source_boot_uuid;
+    *length = context.size;
   }
-  *monotonic_remote_time = m->header.monotonic_remote_time;
-  *realtime_remote_time = m->header.realtime_remote_time;
-  *source_boot_uuid = m->header.source_boot_uuid;
   if (data) {
     memcpy(data, m->data(memory_->message_data_size()),
            memory_->message_data_size());
-  }
-  *length = m->header.length;
 
-  // And finally, confirm that the message *still* points to the queue index we
-  // want.  This means it didn't change out from under us.
-  // If something changed out from under us, we were reading it much too late in
-  // it's lifetime.
-  aos_compiler_memory_barrier();
-  const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
-  if (final_queue_index != queue_index) {
-    VLOG(3) << "Changed out from under us.  Reading " << std::hex
-            << queue_index.index() << ", finished with "
-            << final_queue_index.index() << ", delta: " << std::dec
-            << (final_queue_index.index() - queue_index.index());
-    return Result::OVERWROTE;
+    // Check again since we touched the message again.
+    aos_compiler_memory_barrier();
+    const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+    if (final_queue_index != queue_index) {
+      VLOG(3) << "Changed out from under us.  Reading " << std::hex
+              << queue_index.index() << ", finished with "
+              << final_queue_index.index() << ", delta: " << std::dec
+              << (final_queue_index.index() - queue_index.index());
+      return Result::OVERWROTE;
+    }
   }
 
   return Result::GOOD;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index f83b558..2cafb48 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -10,6 +10,7 @@
 
 #include "absl/types/span.h"
 
+#include "aos/events/context.h"
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/data_alignment.h"
 #include "aos/ipc_lib/index.h"
@@ -406,7 +407,19 @@
 
 class LocklessQueueReader {
  public:
-  enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+  enum class Result {
+    // Message we read was too old and no longer is in the queue.
+    TOO_OLD,
+    // Success!
+    GOOD,
+    // The message is in the future and we haven't written it yet.
+    NOTHING_NEW,
+    // There is a message, but should_read() returned false so we didn't fetch
+    // it.
+    FILTERED,
+    // The message got overwritten while we were reading it.
+    OVERWROTE,
+  };
 
   LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
     queue.Initialize();
@@ -416,7 +429,8 @@
   // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
   // element newer than QueueSize() from the current message, we consider it
   // behind by a large amount and return TOO_OLD.  If the message is modified
-  // out from underneath us as we read it, return OVERWROTE.
+  // out from underneath us as we read it, return OVERWROTE.  If we found a new
+  // message, but the filter function returned false, return FILTERED.
   //
   // data may be nullptr to indicate the data should not be copied.
   Result Read(uint32_t queue_index,
@@ -425,7 +439,8 @@
               monotonic_clock::time_point *monotonic_remote_time,
               realtime_clock::time_point *realtime_remote_time,
               uint32_t *remote_queue_index, UUID *source_boot_uuid,
-              size_t *length, char *data) const;
+              size_t *length, char *data,
+              std::function<bool(const Context &context)> should_read) const;
 
   // Returns the index to the latest queue message.  Returns empty_queue_index()
   // if there are no messages in the queue.  Do note that this index wraps if
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 3ed3099..c37dc63 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -692,6 +692,11 @@
         // increments.
         char last_data = '0';
         int i = 0;
+
+        std::function<bool(const Context &)> should_read = [](const Context &) {
+          return true;
+        };
+
         while (true) {
           monotonic_clock::time_point monotonic_sent_time;
           realtime_clock::time_point realtime_sent_time;
@@ -702,10 +707,11 @@
           char read_data[1024];
           size_t length;
 
-          LocklessQueueReader::Result read_result = reader.Read(
-              i, &monotonic_sent_time, &realtime_sent_time,
-              &monotonic_remote_time, &realtime_remote_time,
-              &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]));
+          LocklessQueueReader::Result read_result =
+              reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+                          &monotonic_remote_time, &realtime_remote_time,
+                          &remote_queue_index, &source_boot_uuid, &length,
+                          &(read_data[0]), std::ref(should_read));
 
           if (read_result != LocklessQueueReader::Result::GOOD) {
             if (read_result == LocklessQueueReader::Result::TOO_OLD) {
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 34e2762..93ae2a3 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -234,9 +234,14 @@
       LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
   LocklessQueueReader reader(queue());
 
-  time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
+  time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
+                        monotonic_clock::now());
+  std::function<bool(const Context &)> should_read = [](const Context &) {
+    return true;
+  };
+
   // Send enough messages to wrap.
-  for (int i = 0; i < 20000; ++i) {
+  for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
     // Confirm that the queue index makes sense given the number of sends.
     EXPECT_EQ(reader.LatestIndex().index(),
               i == 0 ? QueueIndex::Invalid().index() : i - 1);
@@ -244,7 +249,7 @@
     // Send a trivial piece of data.
     char data[100];
     size_t s = snprintf(data, sizeof(data), "foobar%d", i);
-    EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+    ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
                           realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
                           nullptr, nullptr, nullptr),
               LocklessQueueSender::Result::GOOD);
@@ -272,12 +277,12 @@
     LocklessQueueReader::Result read_result = reader.Read(
         index.index(), &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &source_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
     if (read_result != LocklessQueueReader::Result::GOOD) {
-      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
+      ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
 
     loop.SleepUntilNext();
@@ -291,6 +296,8 @@
   ::std::mt19937 generator(0);
   ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
   ::std::bernoulli_distribution race_reads_distribution;
+  ::std::bernoulli_distribution set_should_read_distribution;
+  ::std::bernoulli_distribution should_read_result_distribution;
   ::std::bernoulli_distribution wrap_writes_distribution;
 
   const chrono::seconds print_frequency(FLAGS_print_rate);
@@ -304,12 +311,15 @@
   monotonic_clock::time_point next_print_time = start_time + print_frequency;
   uint64_t messages = 0;
   for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
-    bool race_reads = race_reads_distribution(generator);
+    const bool race_reads = race_reads_distribution(generator);
+    const bool set_should_read = set_should_read_distribution(generator);
+    const bool should_read_result = should_read_result_distribution(generator);
     int write_wrap_count = write_wrap_count_distribution(generator);
     if (!wrap_writes_distribution(generator)) {
       write_wrap_count = 0;
     }
-    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
+        race_reads, write_wrap_count, set_should_read, should_read_result))
         << ": Running with race_reads: " << race_reads
         << ", and write_wrap_count " << write_wrap_count << " and on iteration "
         << i;
@@ -391,7 +401,7 @@
                     std::chrono::milliseconds(500),
                     false});
 
-  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
 }
 
 // // Send enough messages to wrap the 32 bit send counter.
@@ -401,7 +411,7 @@
   QueueRacer racer(queue(), 1, kNumMessages);
 
   const monotonic_clock::time_point start_time = monotonic_clock::now();
-  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
   const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
   double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                monotonic_now - start_time)
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7c0408d..0b8f1a6 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -13,7 +13,7 @@
 namespace {
 
 struct ThreadPlusCount {
-  int thread;
+  uint64_t thread;
   uint64_t count;
 };
 
@@ -47,7 +47,8 @@
   Reset();
 }
 
-void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
+                              bool set_should_read, bool should_read_result) {
   const bool will_wrap = num_messages_ * num_threads_ *
                              static_cast<uint64_t>(1 + write_wrap_count) >
                          queue_.config().queue_size;
@@ -197,7 +198,10 @@
         ++started_writes_;
         auto result =
             sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
-                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+                        aos::realtime_clock::min_time, 0xffffffff,
+                        UUID::FromSpan(absl::Span<const uint8_t>(
+                            reinterpret_cast<const uint8_t *>(&tpc),
+                            sizeof(ThreadPlusCount))),
                         nullptr, nullptr, nullptr);
 
         CHECK(std::find(expected_send_results_.begin(),
@@ -237,7 +241,8 @@
   }
 
   if (check_writes_and_reads_) {
-    CheckReads(race_reads, write_wrap_count, &threads);
+    CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
+               should_read_result);
   }
 
   // Reap all the threads.
@@ -276,7 +281,8 @@
 }
 
 void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
-                            ::std::vector<ThreadState> *threads) {
+                            ::std::vector<ThreadState> *threads,
+                            bool set_should_read, bool should_read_result) {
   // Now read back the results to double check.
   LocklessQueueReader reader(queue_);
   const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
@@ -290,6 +296,15 @@
                 LocklessQueueSize(queue_.memory());
   }
 
+  std::function<bool(const Context &)> nop;
+
+  Context fetched_context;
+  std::function<bool(const Context &)> should_read =
+      [&should_read_result, &fetched_context](const Context &context) {
+        fetched_context = context;
+        return should_read_result;
+      };
+
   for (uint64_t i = initial_i;
        i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
     monotonic_clock::time_point monotonic_sent_time;
@@ -308,8 +323,17 @@
     LocklessQueueReader::Result read_result = reader.Read(
         wrapped_i, &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &source_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]),
+        set_should_read ? std::ref(should_read) : std::ref(nop));
 
+    // The code in lockless_queue.cc reads everything but data, checks that the
+    // header hasn't changed, then reads the data.  So, if we succeed and both
+    // end up not being corrupted, then we've confirmed everything works.
+    //
+    // Feed in both combos of should_read and whether or not to return true or
+    // false from should_read.  By capturing the header values inside the
+    // callback, we can also verify the state in the middle of the process to
+    // make sure we have the right boundaries.
     if (race_reads) {
       if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
         --i;
@@ -322,22 +346,54 @@
         continue;
       }
     }
-    // Every message should be good.
-    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
+
+    if (!set_should_read) {
+      // Every message should be good.
+      ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+          << ": i is " << i;
+    } else {
+      if (should_read_result) {
+        ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+            << ": i is " << i;
+
+        ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
+        ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
+        ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
+        ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
+        ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
+        ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
+        ASSERT_EQ(length, fetched_context.size);
+
+        ASSERT_EQ(
+            absl::Span<const uint8_t>(
+                reinterpret_cast<const uint8_t *>(
+                    read_data + LocklessQueueMessageDataSize(queue_.memory()) -
+                    length),
+                length),
+            source_boot_uuid.span());
+      } else {
+        ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
+        monotonic_sent_time = fetched_context.monotonic_event_time;
+        realtime_sent_time = fetched_context.realtime_event_time;
+        monotonic_remote_time = fetched_context.monotonic_remote_time;
+        realtime_remote_time = fetched_context.realtime_remote_time;
+        source_boot_uuid = fetched_context.source_boot_uuid;
+        remote_queue_index = fetched_context.remote_queue_index;
+        length = fetched_context.size;
+      }
+    }
 
     // And, confirm that time never went backwards.
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
     last_monotonic_sent_time = monotonic_sent_time;
 
-    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
-    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
-    EXPECT_EQ(source_boot_uuid, UUID::Zero());
+    ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+    ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
 
     ThreadPlusCount tpc;
-    ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc,
-           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
-           sizeof(ThreadPlusCount));
+    ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
+    memcpy(&tpc, source_boot_uuid.span().data(),
+           source_boot_uuid.span().size());
 
     if (will_wrap) {
       // The queue won't chang out from under us, so we should get some amount
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index 3e5ca94..87f2cce 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -48,7 +48,12 @@
   // necesitates a loser check at the end.
   //
   // If both are set, run an even looser test.
-  void RunIteration(bool race_reads, int write_wrap_count);
+  //
+  // set_should_read is used to determine if we should pass in a valid
+  // should_read function, and should_read_result is the return result of that
+  // function.
+  void RunIteration(bool race_reads, int write_wrap_count, bool set_should_read,
+                    bool should_read_result);
 
   size_t CurrentIndex() {
     return LocklessQueueReader(queue_).LatestIndex().index();
@@ -64,7 +69,8 @@
   // clean up all the threads.  Otherwise we get an assert on the way out of
   // RunIteration instead of getting all the way back to gtest.
   void CheckReads(bool race_reads, int write_wrap_count,
-                  ::std::vector<ThreadState> *threads);
+                  ::std::vector<ThreadState> *threads, bool set_should_read,
+                  bool should_read_result);
 
   LocklessQueue queue_;
   const uint64_t num_threads_;
@@ -80,6 +86,14 @@
   ::std::atomic<uint64_t> started_writes_;
   // Number of writes completed.
   ::std::atomic<uint64_t> finished_writes_;
+
+  std::function<bool(uint32_t, monotonic_clock::time_point,
+                     realtime_clock::time_point, monotonic_clock::time_point,
+                     realtime_clock::time_point, uint32_t, UUID, size_t)>
+      should_read_ = [](uint32_t, monotonic_clock::time_point,
+                        realtime_clock::time_point, monotonic_clock::time_point,
+                        realtime_clock::time_point, uint32_t, UUID,
+                        size_t) { return true; };
 };
 
 }  // namespace ipc_lib