Add should_fetch to lockless_queue so we can conditionally fetch

For logging, we want to be able to fetch the next message only when it is
time to do so.  This adds a callback which is handed the message's context
information partway through the read, so that it can decide whether we
actually fetch the message or not.  The next step is to expose it through
EventLoop.

Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7c0408d..0b8f1a6 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -13,7 +13,7 @@
 namespace {
 
 struct ThreadPlusCount {
-  int thread;
+  uint64_t thread;
   uint64_t count;
 };
 
@@ -47,7 +47,8 @@
   Reset();
 }
 
-void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
+                              bool set_should_read, bool should_read_result) {
   const bool will_wrap = num_messages_ * num_threads_ *
                              static_cast<uint64_t>(1 + write_wrap_count) >
                          queue_.config().queue_size;
@@ -197,7 +198,10 @@
         ++started_writes_;
         auto result =
             sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
-                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+                        aos::realtime_clock::min_time, 0xffffffff,
+                        UUID::FromSpan(absl::Span<const uint8_t>(
+                            reinterpret_cast<const uint8_t *>(&tpc),
+                            sizeof(ThreadPlusCount))),
                         nullptr, nullptr, nullptr);
 
         CHECK(std::find(expected_send_results_.begin(),
@@ -237,7 +241,8 @@
   }
 
   if (check_writes_and_reads_) {
-    CheckReads(race_reads, write_wrap_count, &threads);
+    CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
+               should_read_result);
   }
 
   // Reap all the threads.
@@ -276,7 +281,8 @@
 }
 
 void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
-                            ::std::vector<ThreadState> *threads) {
+                            ::std::vector<ThreadState> *threads,
+                            bool set_should_read, bool should_read_result) {
   // Now read back the results to double check.
   LocklessQueueReader reader(queue_);
   const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
@@ -290,6 +296,15 @@
                 LocklessQueueSize(queue_.memory());
   }
 
+  std::function<bool(const Context &)> nop;
+
+  Context fetched_context;
+  std::function<bool(const Context &)> should_read =
+      [&should_read_result, &fetched_context](const Context &context) {
+        fetched_context = context;
+        return should_read_result;
+      };
+
   for (uint64_t i = initial_i;
        i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
     monotonic_clock::time_point monotonic_sent_time;
@@ -308,8 +323,17 @@
     LocklessQueueReader::Result read_result = reader.Read(
         wrapped_i, &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &source_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]),
+        set_should_read ? std::ref(should_read) : std::ref(nop));
 
+    // The code in lockless_queue.cc reads everything but data, checks that the
+    // header hasn't changed, then reads the data.  So, if we succeed and both
+    // end up not being corrupted, then we've confirmed everything works.
+    //
+    // Run with should_read both set and unset, and with it returning both true
+    // and false.  By capturing the header values inside the callback, we can
+    // also verify the state in the middle of the process to make sure we have
+    // the right boundaries.
     if (race_reads) {
       if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
         --i;
@@ -322,22 +346,54 @@
         continue;
       }
     }
-    // Every message should be good.
-    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
+
+    if (!set_should_read) {
+      // Every message should be good.
+      ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+          << ": i is " << i;
+    } else {
+      if (should_read_result) {
+        ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+            << ": i is " << i;
+
+        ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
+        ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
+        ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
+        ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
+        ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
+        ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
+        ASSERT_EQ(length, fetched_context.size);
+
+        ASSERT_EQ(
+            absl::Span<const uint8_t>(
+                reinterpret_cast<const uint8_t *>(
+                    read_data + LocklessQueueMessageDataSize(queue_.memory()) -
+                    length),
+                length),
+            source_boot_uuid.span());
+      } else {
+        ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
+        monotonic_sent_time = fetched_context.monotonic_event_time;
+        realtime_sent_time = fetched_context.realtime_event_time;
+        monotonic_remote_time = fetched_context.monotonic_remote_time;
+        realtime_remote_time = fetched_context.realtime_remote_time;
+        source_boot_uuid = fetched_context.source_boot_uuid;
+        remote_queue_index = fetched_context.remote_queue_index;
+        length = fetched_context.size;
+      }
+    }
 
     // And, confirm that time never went backwards.
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
     last_monotonic_sent_time = monotonic_sent_time;
 
-    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
-    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
-    EXPECT_EQ(source_boot_uuid, UUID::Zero());
+    ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+    ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
 
     ThreadPlusCount tpc;
-    ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc,
-           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
-           sizeof(ThreadPlusCount));
+    ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
+    memcpy(&tpc, source_boot_uuid.span().data(),
+           source_boot_uuid.span().size());
 
     if (will_wrap) {
       // The queue won't chang out from under us, so we should get some amount