Add LocklessQueue::Pinner class

This will allow reading messages from queues without copying, which is
helpful for speeding up the processing of images.
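
A rough usage sketch of the new API (the `queue` instance, `latest_index`
value, and `ProcessImage` consumer below are hypothetical placeholders):

  std::optional<LocklessQueue::Pinner> pinner = queue.MakePinner();
  CHECK(pinner) << ": All pinner slots are already in use";
  if (pinner->PinIndex(latest_index)) {
    // The message is now pinned: its buffer won't be reused while it stays
    // pinned, so it can be read in place without copying.
    ProcessImage(pinner->Data(), pinner->size());
  } else {
    // The message was already overwritten; fall back to a copying read.
  }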

Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index d9a1a71..01e5f24 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,6 +38,97 @@
   LocklessQueueMemory *const memory_;
 };
 
+bool IsPinned(LocklessQueueMemory *memory, Index index) {
+  DCHECK(index.valid());
+  const size_t queue_size = memory->queue_size();
+  const QueueIndex message_index =
+      memory->GetMessage(index)->header.queue_index.Load(queue_size);
+  if (!message_index.valid()) {
+    return false;
+  }
+  DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
+      << ": Message is in the queue";
+  for (int pinner_index = 0;
+       pinner_index < static_cast<int>(memory->config.num_pinners);
+       ++pinner_index) {
+    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+    if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
+//
+// Returns the new scratch_index value.
+Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
+                              ipc_lib::Sender *const sender,
+                              const Index to_replace) {
+  // If anybody's trying to pin this message, then grab a message from a pinner
+  // to write into instead, and leave the message we pulled out of the queue
+  // (currently in our scratch_index) with a pinner.
+  //
+  // This loop will terminate in at most one iteration through the pinners in
+  // any steady-state configuration of the memory. There are only as many
+  // Pinner::pinned values to worry about as there are Pinner::scratch_index
+  // values to check against, plus to_replace, which means there will always be
+  // a free one. We might have to make multiple passes if things are being
+  // changed concurrently, but nobody dying can make this loop fail to
+  // terminate: the number of processes that can die is bounded, since no new
+  // ones can start while we hold the lock.
+  for (int pinner_index = 0; true;
+       pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
+    if (!IsPinned(memory, to_replace)) {
+      // No pinners on our current scratch_index, so we're fine now.
+      VLOG(3) << "No pinners: " << to_replace.DebugString();
+      return to_replace;
+    }
+
+    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+    const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
+    CHECK(pinner_scratch.valid())
+        << ": Pinner scratch_index should always be valid";
+    if (IsPinned(memory, pinner_scratch)) {
+      // Wouldn't do us any good to swap with this one, so don't bother, and
+      // move on to the next one.
+      VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
+      continue;
+    }
+
+    sender->to_replace.RelaxedStore(pinner_scratch);
+    aos_compiler_memory_barrier();
+    // Give the pinner the message (which is currently in
+    // sender->scratch_index).
+    if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
+                                                        to_replace)) {
+      // Somebody swapped into this pinner before us. The new value is probably
+      // pinned, so we don't want to look at it again immediately.
+      VLOG(3) << "Pinner " << pinner_index
+              << " scratch_index changed: " << pinner_scratch.DebugString()
+              << ", " << to_replace.DebugString();
+      sender->to_replace.RelaxedInvalidate();
+      continue;
+    }
+    aos_compiler_memory_barrier();
+    // Now update the sender's scratch space and record that we succeeded.
+    sender->scratch_index.Store(pinner_scratch);
+    aos_compiler_memory_barrier();
+    // And then record that we succeeded, but definitely after the above
+    // store.
+    sender->to_replace.RelaxedInvalidate();
+    VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();
+
+    // If it's in a pinner's scratch_index, it should not be in the queue, which
+    // means nobody new can pin it for real. However, they can still attempt to
+    // pin it, which means we can't verify !IsPinned down here.
+
+    return pinner_scratch;
+  }
+}
+
 // Returns true if it succeeded. Returns false if another sender died in the
 // middle.
 bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
@@ -48,6 +139,7 @@
   aos_compiler_memory_barrier();
 
   const size_t num_senders = memory->num_senders();
+  const size_t num_pinners = memory->num_pinners();
   const size_t queue_size = memory->queue_size();
   const size_t num_messages = memory->num_messages();
 
@@ -105,11 +197,17 @@
     //    to_replace = yyy
     // We are in the act of moving to_replace to scratch_index, but didn't
     // finish.  Easy.
+    //
+    // If doing a pinner swap, we've definitely done it.
 
     // 4) scratch_index = yyy
     //    to_replace = invalid
     // Finished, but died.  Looks like 1)
 
+    // Swapping with a pinner's scratch_index passes through the same states.
+    // We just need to ensure the message that ends up in the sender's
+    // scratch_index isn't pinned, using the same code as sending does.
+
     // Any cleanup code needs to follow the same set of states to be robust to
     // death, so death can be restarted.
 
@@ -117,6 +215,14 @@
       // 1) or 4).  Make sure we aren't corrupted and declare victory.
       CHECK(scratch_index.valid());
 
+      // If it's in 1) with a pinner, the sender might have a pinned message,
+      // so fix that.
+      SwapPinnedSenderScratch(memory, sender, scratch_index);
+
+      // If it's in 4), it may not have completed this step yet. This will
+      // always be a NOP if it's in 1), verified by a DCHECK.
+      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+
       __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
       ++valid_senders;
       continue;
@@ -129,6 +235,11 @@
       // Just need to invalidate to_replace to finish.
       sender->to_replace.Invalidate();
 
+      // Make sure to indicate it's an unused message before a sender gets its
+      // hands on it.
+      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+      aos_compiler_memory_barrier();
+
       // And mark that we succeeded.
       __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
       ++valid_senders;
@@ -139,6 +250,20 @@
     need_recovery[i] = true;
   }
 
+  // Cleaning up pinners is easy. We don't actually have to do anything, but
+  // invalidating each pinner's pinned field might help catch bugs elsewhere
+  // that try to read it before it's set.
+  for (size_t i = 0; i < num_pinners; ++i) {
+    Pinner *const pinner = memory->GetPinner(i);
+    const uint32_t tid =
+        __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
+    if (!(tid & FUTEX_OWNER_DIED)) {
+      continue;
+    }
+    pinner->pinned.Invalidate();
+    __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+  }
+
   // If all the senders are (or were made) good, there is no need to do the hard
   // case.
   if (valid_senders == num_senders) {
@@ -162,18 +287,18 @@
           return false;
         }
         ++num_missing;
-      } else {
-        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
-        // We can do a relaxed load here because we're the only person touching
-        // this sender at this point, if it matters. If it's not a dead sender,
-        // then any message it every has will already be accounted for, so this
-        // will always be a NOP.
-        const Index scratch_index = sender->scratch_index.RelaxedLoad();
-        if (!accounted_for[scratch_index.message_index()]) {
-          ++num_accounted_for;
-        }
-        accounted_for[scratch_index.message_index()] = true;
+        continue;
       }
+      CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+      // We can do a relaxed load here because we're the only person touching
+      // this sender at this point, if it matters. If it's not a dead sender,
+      // then any message it ever has will eventually be accounted for if we
+      // make enough tries through the outer loop.
+      const Index scratch_index = sender->scratch_index.RelaxedLoad();
+      if (!accounted_for[scratch_index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[scratch_index.message_index()] = true;
     }
 
     for (size_t i = 0; i < queue_size; ++i) {
@@ -185,6 +310,16 @@
       accounted_for[index.message_index()] = true;
     }
 
+    for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
+      // Same logic as above for scratch_index applies here too.
+      const Index index =
+          memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
+      if (!accounted_for[index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[index.message_index()] = true;
+    }
+
     CHECK_LE(num_accounted_for + num_missing, num_messages);
   }
 
@@ -224,6 +359,9 @@
         // atomically insert scratch_index into the queue yet.  So
         // invalidate to_replace.
         sender->to_replace.Invalidate();
+        // Sender definitely will not have gotten here, so finish for it.
+        memory->GetMessage(scratch_index)
+            ->header.queue_index.RelaxedInvalidate();
 
         // And then mark this sender clean.
         __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
@@ -240,6 +378,12 @@
         // scratch_index is accounted for.  That means we did the insert,
         // but didn't record it.
         CHECK(to_replace.valid());
+
+        // Make sure to indicate it's an unused message before a sender gets its
+        // hands on it.
+        memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
+        aos_compiler_memory_barrier();
+
         // Finish the transaction.  Copy to_replace, then clear it.
 
         sender->scratch_index.Store(to_replace);
@@ -311,6 +455,9 @@
   CHECK_EQ(size % alignof(Sender), 0u);
   size += LocklessQueueMemory::SizeOfSenders(config);
 
+  CHECK_EQ(size % alignof(Pinner), 0u);
+  size += LocklessQueueMemory::SizeOfPinners(config);
+
   return size;
 }
 
@@ -371,6 +518,7 @@
     // TODO(austin): Check these for out of bounds.
     memory->config.num_watchers = config.num_watchers;
     memory->config.num_senders = config.num_senders;
+    memory->config.num_pinners = config.num_pinners;
     memory->config.queue_size = config.queue_size;
     memory->config.message_data_size = config.message_data_size;
 
@@ -403,6 +551,15 @@
       s->to_replace.RelaxedInvalidate();
     }
 
+    for (size_t i = 0; i < memory->num_pinners(); ++i) {
+      ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
+      // Nobody else can possibly be touching these because we haven't set
+      // initialized to true yet.
+      pinner->scratch_index.RelaxedStore(
+          Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+      pinner->pinned.Invalidate();
+    }
+
     aos_compiler_memory_barrier();
     // Signal everything is done.  This needs to be done last, so if we die, we
     // redo initialization.
@@ -609,11 +766,16 @@
     return;
   }
 
-  ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
 
   // Indicate that we are now alive by taking over the slot. If the previous
   // owner died, we still want to do this.
-  death_notification_init(&(s->tid));
+  death_notification_init(&(sender->tid));
+
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
+  CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
 }
 
 LocklessQueue::Sender::~Sender() {
@@ -622,29 +784,17 @@
   }
 }
 
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
-  LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
-  if (result.valid()) {
-    return std::move(result);
-  } else {
-    return std::nullopt;
-  }
-}
-
-QueueIndex ZeroOrValid(QueueIndex index) {
-  if (!index.valid()) {
-    return index.Clear();
-  }
-  return index;
-}
-
 size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
 
 void *LocklessQueue::Sender::Data() {
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
-  Index scratch_index = sender->scratch_index.RelaxedLoad();
-  Message *message = memory_->GetMessage(scratch_index);
-  message->header.queue_index.Invalidate();
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
+  // We should have invalidated this when we first got the buffer. Verify that
+  // in debug mode.
+  DCHECK(
+      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
 
   return message->data(memory_->message_data_size());
 }
@@ -666,6 +816,126 @@
        monotonic_sent_time, realtime_sent_time, queue_index);
 }
 
+LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_, grab_queue_setup_lock);
+
+  const int num_pinners = memory_->num_pinners();
+
+  for (int i = 0; i < num_pinners; ++i) {
+    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching the pinners
+    // we're interested in.
+    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      pinner_index_ = i;
+      break;
+    }
+  }
+
+  if (pinner_index_ == -1) {
+    VLOG(1) << "Too many pinners, starting to bail.";
+    return;
+  }
+
+  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+  p->pinned.Invalidate();
+
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(p->tid));
+}
+
+LocklessQueue::Pinner::~Pinner() {
+  if (valid()) {
+    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+    aos_compiler_memory_barrier();
+    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+  }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+bool LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
+  const size_t queue_size = memory_->queue_size();
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+  // Indicate that we want to pin this message.
+  pinner->pinned.Store(queue_index);
+  aos_compiler_memory_barrier();
+
+  {
+    const Index message_index = queue_slot->Load();
+    Message *const message = memory_->GetMessage(message_index);
+
+    const QueueIndex message_queue_index =
+        message->header.queue_index.Load(queue_size);
+    if (message_queue_index == queue_index) {
+      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+      aos_compiler_memory_barrier();
+      return true;
+    }
+    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+            << ", " << queue_index.index();
+  }
+
+  // Being down here means we asked to pin a message before realizing it's no
+  // longer in the queue, so back that out now.
+  pinner->pinned.Invalidate();
+  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+  return false;
+}
+
+size_t LocklessQueue::Pinner::size() const {
+  return memory_->message_data_size();
+}
+
+const void *LocklessQueue::Pinner::Data() const {
+  const size_t queue_size = memory_->queue_size();
+  ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
+  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+  CHECK(pinned.valid());
+  const Message *message = memory_->GetMessage(pinned);
+
+  return message->data(memory_->message_data_size());
+}
+
+std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
+  LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
+  if (result.valid()) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
+
+std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
+  LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
+  if (result.valid()) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
+
+namespace {
+
+QueueIndex ZeroOrValid(QueueIndex index) {
+  if (!index.valid()) {
+    return index.Clear();
+  }
+  return index;
+}
+
+}  // namespace
+
 void LocklessQueue::Sender::Send(
     size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
@@ -682,6 +952,12 @@
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
 
+  // We should have invalidated this when we first got the buffer. Verify that
+  // in debug mode.
+  DCHECK(
+      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
+
   message->header.length = length;
   // Pass these through.  Any alternative behavior can be implemented out a
   // layer.
@@ -689,6 +965,7 @@
   message->header.monotonic_remote_time = monotonic_remote_time;
   message->header.realtime_remote_time = realtime_remote_time;
 
+  Index to_replace = Index::Invalid();
   while (true) {
     const QueueIndex actual_next_queue_index =
         memory_->next_queue_index.Load(queue_size);
@@ -698,7 +975,7 @@
 
     // This needs to synchronize with whoever the previous writer at this
     // location was.
-    const Index to_replace = memory_->LoadIndex(next_queue_index);
+    to_replace = memory_->LoadIndex(next_queue_index);
 
     const QueueIndex decremented_queue_index =
         next_queue_index.DecrementBy(queue_size);
@@ -726,9 +1003,14 @@
     }
 
     // Confirm that the message is what it should be.
+    //
+    // This is just a best-effort check to skip reading the clocks if possible.
+    // If this fails, then the compare-exchange below definitely would, so we
+    // can bail out now.
     {
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
+          memory_->GetMessage(to_replace)
+              ->header.queue_index.RelaxedLoad(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
@@ -794,8 +1076,22 @@
     aos_compiler_memory_barrier();
     // And then record that we succeeded, but definitely after the above store.
     sender->to_replace.RelaxedInvalidate();
+
     break;
   }
+
+  // to_replace is our current scratch_index. It isn't in the queue, which means
+  // nobody new can pin it. They can set their `pinned` to it, but they will
+  // back it out, so they don't count. This means we just need to find a
+  // message that no pinner has in `pinned`; once we find one, we know that
+  // message will never be pinned. We'll start with to_replace, and if that is
+  // pinned then we'll look for a new one to use instead.
+  const Index new_scratch =
+      SwapPinnedSenderScratch(memory_, sender, to_replace);
+
+  // If anybody is looking at this message (they shouldn't be), then try telling
+  // them about it (best-effort).
+  memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
 }
 
 LocklessQueue::ReadResult LocklessQueue::Read(
@@ -827,46 +1123,46 @@
         VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                 << ", " << queue_index.DecrementBy(queue_size).index();
         return ReadResult::NOTHING_NEW;
+      }
+
+      // Someone has re-used this message between when we pulled it out of the
+      // queue and when we grabbed its index.  It is pretty hard to deduce
+      // what happened. Just try again.
+      Message *const new_m = memory_->GetMessage(queue_index);
+      if (m != new_m) {
+        m = new_m;
+        VLOG(3) << "Retrying, m doesn't match";
+        continue;
+      }
+
+      // We have confirmed that message still points to the same message. This
+      // means that the message didn't get swapped out from under us, so
+      // starting_queue_index is correct.
+      //
+      // Either we got too far behind (signaled by this being a valid
+      // message), or this is one of the initial messages which are invalid.
+      if (starting_queue_index.valid()) {
+        VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
+                << ", got " << starting_queue_index.index() << ", behind by "
+                << std::dec
+                << (starting_queue_index.index() - queue_index.index());
+        return ReadResult::TOO_OLD;
+      }
+
+      VLOG(3) << "Initial";
+
+      // There isn't a valid message at this location.
+      //
+      // If someone asks for one of the messages within the first go around,
+      // then they need to wait.  They got ahead.  Otherwise, they are
+      // asking for something crazy, like something before the beginning of
+      // the queue.  Tell them that they are behind.
+      if (uint32_queue_index < memory_->queue_size()) {
+        VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
+        return ReadResult::NOTHING_NEW;
       } else {
-        // Someone has re-used this message between when we pulled it out of the
-        // queue and when we grabbed its index.  It is pretty hard to deduce
-        // what happened. Just try again.
-        Message *const new_m = memory_->GetMessage(queue_index);
-        if (m != new_m) {
-          m = new_m;
-          VLOG(3) << "Retrying, m doesn't match";
-          continue;
-        }
-
-        // We have confirmed that message still points to the same message. This
-        // means that the message didn't get swapped out from under us, so
-        // starting_queue_index is correct.
-        //
-        // Either we got too far behind (signaled by this being a valid
-        // message), or this is one of the initial messages which are invalid.
-        if (starting_queue_index.valid()) {
-          VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
-                  << ", got " << starting_queue_index.index() << ", behind by "
-                  << std::dec
-                  << (starting_queue_index.index() - queue_index.index());
-          return ReadResult::TOO_OLD;
-        }
-
-        VLOG(3) << "Initial";
-
-        // There isn't a valid message at this location.
-        //
-        // If someone asks for one of the messages within the first go around,
-        // then they need to wait.  They got ahead.  Otherwise, they are
-        // asking for something crazy, like something before the beginning of
-        // the queue.  Tell them that they are behind.
-        if (uint32_queue_index < memory_->queue_size()) {
-          VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
-          return ReadResult::NOTHING_NEW;
-        } else {
-          VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
-          return ReadResult::TOO_OLD;
-        }
+        VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
+        return ReadResult::TOO_OLD;
       }
     }
     VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -960,6 +1256,8 @@
               << ::std::endl;
   ::std::cout << "    size_t num_senders = " << memory->config.num_senders
               << ::std::endl;
+  ::std::cout << "    size_t num_pinners = " << memory->config.num_pinners
+              << ::std::endl;
   ::std::cout << "    size_t queue_size = " << memory->config.queue_size
               << ::std::endl;
   ::std::cout << "    size_t message_data_size = "
@@ -1049,6 +1347,22 @@
   }
   ::std::cout << "  }" << ::std::endl;
 
+  ::std::cout << "  Pinner pinners[" << memory->num_pinners() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_pinners(); ++i) {
+    Pinner *p = memory->GetPinner(i);
+    ::std::cout << "    [" << i << "] -> Pinner {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&p->tid)
+                << ::std::endl;
+    ::std::cout << "      AtomicIndex scratch_index = "
+                << p->scratch_index.Load().DebugString() << ::std::endl;
+    ::std::cout << "      AtomicIndex pinned = "
+                << p->pinned.Load(memory->queue_size()).DebugString()
+                << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
   ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_watchers(); ++i) {