Add LocklessQueue::Pinner class

This will allow reading messages from queues without copying them, which
helps speed up image processing.
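
A rough sketch of the intended usage, for context. The write side mirrors
the queue_racer change below (write into Sender::Data(), then call
Send(length)); the read side is an assumption for illustration only, and
the Pinner names (MakePinner, PinIndex, Data, size) as well as the memory,
config, message, index, and ProcessInPlace placeholders may not match the
final API:

  // Zero-copy send path (exercised by queue_racer below): write the
  // payload into the sender's scratch buffer, then publish by length.
  LocklessQueue queue(memory, config);
  LocklessQueue::Sender sender = queue.MakeSender().value();
  memcpy(sender.Data(), &message, sizeof(message));
  sender.Send(sizeof(message));

  // Zero-copy read path (hypothetical Pinner API): pin the message so the
  // queue cannot reuse its buffer, then process it in place, no memcpy.
  LocklessQueue::Pinner pinner = queue.MakePinner().value();
  if (pinner.PinIndex(index)) {
    ProcessInPlace(pinner.Data(), pinner.size());
  }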

Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index f5b3d7e..69f5f21 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -139,42 +139,63 @@
     } else {
       t.event_count = 0;
     }
-    t.thread =
-        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
-          // Build up a sender.
-          LocklessQueue queue(memory_, config_);
-          LocklessQueue::Sender sender = queue.MakeSender().value();
+    t.thread = ::std::thread([this, &t, thread_index, &run,
+                              write_wrap_count]() {
+      // Build up a sender.
+      LocklessQueue queue(memory_, config_);
+      LocklessQueue::Sender sender = queue.MakeSender().value();
+      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
-          // Signal that we are ready to start sending.
-          t.ready.Set();
+      // Signal that we are ready to start sending.
+      t.ready.Set();
 
-          // Wait until signaled to start running.
-          run.Wait();
+      // Wait until signaled to start running.
+      run.Wait();
 
-          // Gogogo!
-          for (uint64_t i = 0;
-               i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
-               ++i) {
-            char data[sizeof(ThreadPlusCount)];
-            ThreadPlusCount tpc;
-            tpc.thread = thread_index;
-            tpc.count = i;
-
-            memcpy(data, &tpc, sizeof(ThreadPlusCount));
-
-            if (i % 0x800000 == 0x100000) {
-              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
-                      static_cast<double>(i) /
-                          static_cast<double>(num_messages_ *
-                                              (1 + write_wrap_count)) *
-                          100.0);
+      // Gogogo!
+      for (uint64_t i = 0;
+           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+           ++i) {
+        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
+                           sizeof(ThreadPlusCount);
+        const char fill = (i + 55) & 0xFF;
+        memset(data, fill, sizeof(ThreadPlusCount));
+        {
+          bool found_nonzero = false;
+          for (size_t j = 0; j < sizeof(ThreadPlusCount); ++j) {
+            if (data[j] != fill) {
+              found_nonzero = true;
             }
-
-            ++started_writes_;
-            sender.Send(data, sizeof(ThreadPlusCount));
-            ++finished_writes_;
           }
-        });
+          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
+        }
+
+        ThreadPlusCount tpc;
+        tpc.thread = thread_index;
+        tpc.count = i;
+
+        memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+        if (i % 0x800000 == 0x100000) {
+          fprintf(
+              stderr, "Sent %" PRIu64 ", %f %%\n", i,
+              static_cast<double>(i) /
+                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
+                  100.0);
+        }
+
+        ++started_writes_;
+        sender.Send(sizeof(ThreadPlusCount));
+        // Blank out the new scratch buffer, to catch other people using it.
+        {
+          char *const new_data = static_cast<char *>(sender.Data()) +
+                                 sender.size() - sizeof(ThreadPlusCount);
+          const char new_fill = ~fill;
+          memset(new_data, new_fill, sizeof(ThreadPlusCount));
+        }
+        ++finished_writes_;
+      }
+    });
     ++thread_index;
   }
 
@@ -303,18 +324,18 @@
 
       if (race_reads) {
         // Make sure nothing goes backwards.  Really not much we can do here.
-        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                                 << tpc.thread;
+        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
+            << ": Thread " << tpc.thread;
         (*threads)[tpc.thread].event_count = tpc.count;
       } else {
         // Make sure nothing goes backwards.  Really not much we can do here.
-        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                                 << tpc.thread;
+        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+            << ": Thread " << tpc.thread;
       }
     } else {
       // Confirm that we see every message counter from every thread.
-      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                               << tpc.thread;
+      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+          << ": Thread " << tpc.thread;
     }
     ++(*threads)[tpc.thread].event_count;
   }