Add LocklessQueue::Pinner class
This will allow reading messages from queues without copying, which is
helpful for speeding up the processing of images.
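
A rough usage sketch of the new API (the construction of queue and
ProcessImage are illustrative, not part of this change; queue_index would
come from a previous Read() or a watcher wakeup):

  std::optional<LocklessQueue::Pinner> pinner = queue.MakePinner();
  CHECK(pinner) << ": Too many pinners";

  if (pinner->PinIndex(queue_index)) {
    // Senders will not reuse the backing message while it stays pinned, so
    // the data can be processed in place, without a copy.
    ProcessImage(pinner->Data(), pinner->size());
  } else {
    // The message was already overwritten before we could pin it; fall back
    // to reading a copy of a newer message instead.
  }
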
Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index d9a1a71..01e5f24 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,6 +38,97 @@
LocklessQueueMemory *const memory_;
};
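+// Returns true if some pinner currently has the message at `index` pinned.
+// The message must not still be in the queue (this is DCHECKed); it is
+// expected to be sitting in some scratch_index.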
+bool IsPinned(LocklessQueueMemory *memory, Index index) {
+ DCHECK(index.valid());
+ const size_t queue_size = memory->queue_size();
+ const QueueIndex message_index =
+ memory->GetMessage(index)->header.queue_index.Load(queue_size);
+ if (!message_index.valid()) {
+ return false;
+ }
+ DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
+ << ": Message is in the queue";
+ for (int pinner_index = 0;
+ pinner_index < static_cast<int>(memory->config.num_pinners);
+ ++pinner_index) {
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
+//
+// Returns the new scratch_index value.
+Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
+ ipc_lib::Sender *const sender,
+ const Index to_replace) {
+ // If anybody's trying to pin this message, then grab a message from a pinner
+ // to write into instead, and leave the message we pulled out of the queue
+ // (currently in our scratch_index) with a pinner.
+ //
+ // This loop will terminate in at most one iteration through the pinners in
+ // any steady-state configuration of the memory. There are only as many
+ // Pinner::pinned values to worry about as there are Pinner::scratch_index
+ // values to check against, plus to_replace, which means there will always be
+ // a free one. We might have to make multiple passes if things are changing
+ // concurrently, but nobody dying can make this loop fail to terminate: the
+ // number of processes that can die is bounded, because no new ones can start
+ // while we hold the lock.
+ for (int pinner_index = 0; true;
+ pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
+ if (!IsPinned(memory, to_replace)) {
+ // No pinners on our current scratch_index, so we're fine now.
+ VLOG(3) << "No pinners: " << to_replace.DebugString();
+ return to_replace;
+ }
+
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
+ CHECK(pinner_scratch.valid())
+ << ": Pinner scratch_index should always be valid";
+ if (IsPinned(memory, pinner_scratch)) {
+ // Wouldn't do us any good to swap with this one, so don't bother, and
+ // move on to the next one.
+ VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
+ continue;
+ }
+
+ sender->to_replace.RelaxedStore(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // Give the pinner the message (which is currently in
+ // sender->scratch_index).
+ if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
+ to_replace)) {
+ // Somebody swapped into this pinner before us. The new value is probably
+ // pinned, so we don't want to look at it again immediately.
+ VLOG(3) << "Pinner " << pinner_index
+ << " scratch_index changed: " << pinner_scratch.DebugString()
+ << ", " << to_replace.DebugString();
+ sender->to_replace.RelaxedInvalidate();
+ continue;
+ }
+ aos_compiler_memory_barrier();
+ // Now update the sender's scratch space.
+ sender->scratch_index.Store(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // And then record that we succeeded, but definitely after the above
+ // store.
+ sender->to_replace.RelaxedInvalidate();
+ VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();
+
+ // If it's in a pinner's scratch_index, it should not be in the queue, which
+ // means nobody new can pin it for real. However, they can still attempt to
+ // pin it, which means we can't verify !IsPinned down here.
+
+ return pinner_scratch;
+ }
+}
+
// Returns true if it succeeded. Returns false if another sender died in the
// middle.
bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
@@ -48,6 +139,7 @@
aos_compiler_memory_barrier();
const size_t num_senders = memory->num_senders();
+ const size_t num_pinners = memory->num_pinners();
const size_t queue_size = memory->queue_size();
const size_t num_messages = memory->num_messages();
@@ -105,11 +197,17 @@
// to_replace = yyy
// We are in the act of moving to_replace to scratch_index, but didn't
// finish. Easy.
+ //
+ // If doing a pinner swap, we've definitely done it.
// 4) scratch_index = yyy
// to_replace = invalid
// Finished, but died. Looks like 1)
+ // Swapping with a pinner's scratch_index passes through the same states.
+ // We just need to ensure the message that ends up in the sender's
+ // scratch_index isn't pinned, using the same code as sending does.
+
// Any cleanup code needs to follow the same set of states to be robust to
// death, so death can be restarted.
@@ -117,6 +215,14 @@
// 1) or 4). Make sure we aren't corrupted and declare victory.
CHECK(scratch_index.valid());
+ // If it's in 1) with a pinner, the sender might have a pinned message,
+ // so fix that.
+ SwapPinnedSenderScratch(memory, sender, scratch_index);
+
+ // If it's in 4), it may not have completed this step yet. This will
+ // always be a NOP if it's in 1), verified by a DCHECK.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
continue;
@@ -129,6 +235,11 @@
// Just need to invalidate to_replace to finish.
sender->to_replace.Invalidate();
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// And mark that we succeeded.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
@@ -139,6 +250,20 @@
need_recovery[i] = true;
}
+ // Cleaning up pinners is easy. We don't actually have to do anything, but
+ // invalidating a dead pinner's pinned field might help catch bugs elsewhere
+ // trying to read it before it's set.
+ for (size_t i = 0; i < num_pinners; ++i) {
+ Pinner *const pinner = memory->GetPinner(i);
+ const uint32_t tid =
+ __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
+ if (!(tid & FUTEX_OWNER_DIED)) {
+ continue;
+ }
+ pinner->pinned.Invalidate();
+ __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+ }
+
// If all the senders are (or were made) good, there is no need to do the hard
// case.
if (valid_senders == num_senders) {
@@ -162,18 +287,18 @@
return false;
}
++num_missing;
- } else {
- CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
- // We can do a relaxed load here because we're the only person touching
- // this sender at this point, if it matters. If it's not a dead sender,
- // then any message it every has will already be accounted for, so this
- // will always be a NOP.
- const Index scratch_index = sender->scratch_index.RelaxedLoad();
- if (!accounted_for[scratch_index.message_index()]) {
- ++num_accounted_for;
- }
- accounted_for[scratch_index.message_index()] = true;
+ continue;
}
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point, if it matters. If it's not a dead sender,
+ // then any message it ever has will eventually be accounted for if we
+ // make enough tries through the outer loop.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ if (!accounted_for[scratch_index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[scratch_index.message_index()] = true;
}
for (size_t i = 0; i < queue_size; ++i) {
@@ -185,6 +310,16 @@
accounted_for[index.message_index()] = true;
}
+ for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
+ // Same logic as above for scratch_index applies here too.
+ const Index index =
+ memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
+ if (!accounted_for[index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[index.message_index()] = true;
+ }
+
CHECK_LE(num_accounted_for + num_missing, num_messages);
}
@@ -224,6 +359,9 @@
// atomically insert scratch_index into the queue yet. So
// invalidate to_replace.
sender->to_replace.Invalidate();
+ // The sender definitely will not have gotten here, so finish this step
+ // for it.
+ memory->GetMessage(scratch_index)
+ ->header.queue_index.RelaxedInvalidate();
// And then mark this sender clean.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
@@ -240,6 +378,12 @@
// scratch_index is accounted for. That means we did the insert,
// but didn't record it.
CHECK(to_replace.valid());
+
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// Finish the transaction. Copy to_replace, then clear it.
sender->scratch_index.Store(to_replace);
@@ -311,6 +455,9 @@
CHECK_EQ(size % alignof(Sender), 0u);
size += LocklessQueueMemory::SizeOfSenders(config);
+ CHECK_EQ(size % alignof(Pinner), 0u);
+ size += LocklessQueueMemory::SizeOfPinners(config);
+
return size;
}
@@ -371,6 +518,7 @@
// TODO(austin): Check these for out of bounds.
memory->config.num_watchers = config.num_watchers;
memory->config.num_senders = config.num_senders;
+ memory->config.num_pinners = config.num_pinners;
memory->config.queue_size = config.queue_size;
memory->config.message_data_size = config.message_data_size;
@@ -403,6 +551,15 @@
s->to_replace.RelaxedInvalidate();
}
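+ // Pinner scratch messages live at the end of the message pool, after the
+ // queue_size messages backing the queue itself and the num_senders sender
+ // scratch messages, which is where the index math below comes from.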
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
+ // Nobody else can possibly be touching these because we haven't set
+ // initialized to true yet.
+ pinner->scratch_index.RelaxedStore(
+ Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+ pinner->pinned.Invalidate();
+ }
+
aos_compiler_memory_barrier();
// Signal everything is done. This needs to be done last, so if we die, we
// redo initialization.
@@ -609,11 +766,16 @@
return;
}
- ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
// Indicate that we are now alive by taking over the slot. If the previous
// owner died, we still want to do this.
- death_notification_init(&(s->tid));
+ death_notification_init(&(sender->tid));
+
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
}
LocklessQueue::Sender::~Sender() {
@@ -622,29 +784,17 @@
}
}
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
- LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-QueueIndex ZeroOrValid(QueueIndex index) {
- if (!index.valid()) {
- return index.Clear();
- }
- return index;
-}
-
size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
void *LocklessQueue::Sender::Data() {
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
- Index scratch_index = sender->scratch_index.RelaxedLoad();
- Message *message = memory_->GetMessage(scratch_index);
- message->header.queue_index.Invalidate();
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
return message->data(memory_->message_data_size());
}
@@ -666,6 +816,126 @@
monotonic_sent_time, realtime_sent_time, queue_index);
}
+LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+ // Since we already have the lock, go ahead and try cleaning up.
+ Cleanup(memory_, grab_queue_setup_lock);
+
+ const int num_pinners = memory_->num_pinners();
+
+ for (int i = 0; i < num_pinners; ++i) {
+ ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching pinners which
+ // we're interested in.
+ const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+ if (tid == 0) {
+ pinner_index_ = i;
+ break;
+ }
+ }
+
+ if (pinner_index_ == -1) {
+ VLOG(1) << "Too many pinners, starting to bail.";
+ return;
+ }
+
+ ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+ p->pinned.Invalidate();
+
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(p->tid));
+}
+
+LocklessQueue::Pinner::~Pinner() {
+ if (valid()) {
+ memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+ aos_compiler_memory_barrier();
+ death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
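+//
+// Note the ordering: `pinned` is stored first, and the message is only
+// checked afterwards to still claim queue_index. On the sender side,
+// SwapPinnedSenderScratch reads each pinner's `pinned` (via IsPinned) before
+// the sender invalidates the queue_index of the message it ends up reusing.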
+bool LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
+ const size_t queue_size = memory_->queue_size();
+ const QueueIndex queue_index =
+ QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+ ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+ AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+ // Indicate that we want to pin this message.
+ pinner->pinned.Store(queue_index);
+ aos_compiler_memory_barrier();
+
+ {
+ const Index message_index = queue_slot->Load();
+ Message *const message = memory_->GetMessage(message_index);
+
+ const QueueIndex message_queue_index =
+ message->header.queue_index.Load(queue_size);
+ if (message_queue_index == queue_index) {
+ VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+ aos_compiler_memory_barrier();
+ return true;
+ }
+ VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+ << ", " << queue_index.index();
+ }
+
+ // Being down here means we asked to pin a message before realizing it's no
+ // longer in the queue, so back that out now.
+ pinner->pinned.Invalidate();
+ VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+ return false;
+}
+
+size_t LocklessQueue::Pinner::size() const {
+ return memory_->message_data_size();
+}
+
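+// Returns a pointer to the data of the currently pinned message. A message
+// must be pinned via PinIndex() when this is called (CHECKed below).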
+const void *LocklessQueue::Pinner::Data() const {
+ const size_t queue_size = memory_->queue_size();
+ ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
+ QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+ CHECK(pinned.valid());
+ const Message *message = memory_->GetMessage(pinned);
+
+ return message->data(memory_->message_data_size());
+}
+
+std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
+ LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
+ if (result.valid()) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
+ LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
+ if (result.valid()) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+namespace {
+
+QueueIndex ZeroOrValid(QueueIndex index) {
+ if (!index.valid()) {
+ return index.Clear();
+ }
+ return index;
+}
+
+} // namespace
+
void LocklessQueue::Sender::Send(
size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
@@ -682,6 +952,12 @@
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
+
message->header.length = length;
// Pass these through. Any alternative behavior can be implemented out a
// layer.
@@ -689,6 +965,7 @@
message->header.monotonic_remote_time = monotonic_remote_time;
message->header.realtime_remote_time = realtime_remote_time;
+ Index to_replace = Index::Invalid();
while (true) {
const QueueIndex actual_next_queue_index =
memory_->next_queue_index.Load(queue_size);
@@ -698,7 +975,7 @@
// This needs to synchronize with whoever the previous writer at this
// location was.
- const Index to_replace = memory_->LoadIndex(next_queue_index);
+ to_replace = memory_->LoadIndex(next_queue_index);
const QueueIndex decremented_queue_index =
next_queue_index.DecrementBy(queue_size);
@@ -726,9 +1003,14 @@
}
// Confirm that the message is what it should be.
+ //
+ // This is just a best-effort check to skip reading the clocks if possible.
+ // If this fails, then the compare-exchange below definitely would, so we
+ // can bail out now.
{
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
+ memory_->GetMessage(to_replace)
+ ->header.queue_index.RelaxedLoad(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
@@ -794,8 +1076,22 @@
aos_compiler_memory_barrier();
// And then record that we succeeded, but definitely after the above store.
sender->to_replace.RelaxedInvalidate();
+
break;
}
+
+ // to_replace is our current scratch_index. It isn't in the queue, which means
+ // nobody new can pin it. They can set their `pinned` to it, but they will
+ // back it out, so they don't count. This means we just need to find a
+ // message which no pinner has in `pinned`, and then we know that message
+ // will never be pinned. We'll start with to_replace, and if that is pinned
+ // then we'll look for a new one to use instead.
+ const Index new_scratch =
+ SwapPinnedSenderScratch(memory_, sender, to_replace);
+
+ // If anybody is looking at this message (they shouldn't be), then try telling
+ // them about it (best-effort).
+ memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
}
LocklessQueue::ReadResult LocklessQueue::Read(
@@ -827,46 +1123,46 @@
VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
<< ", " << queue_index.DecrementBy(queue_size).index();
return ReadResult::NOTHING_NEW;
+ }
+
+ // Someone has re-used this message between when we pulled it out of the
+ // queue and when we grabbed its index. It is pretty hard to deduce
+ // what happened. Just try again.
+ Message *const new_m = memory_->GetMessage(queue_index);
+ if (m != new_m) {
+ m = new_m;
+ VLOG(3) << "Retrying, m doesn't match";
+ continue;
+ }
+
+ // We have confirmed that message still points to the same message. This
+ // means that the message didn't get swapped out from under us, so
+ // starting_queue_index is correct.
+ //
+ // Either we got too far behind (signaled by this being a valid
+ // message), or this is one of the initial messages which are invalid.
+ if (starting_queue_index.valid()) {
+ VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
+ << ", got " << starting_queue_index.index() << ", behind by "
+ << std::dec
+ << (starting_queue_index.index() - queue_index.index());
+ return ReadResult::TOO_OLD;
+ }
+
+ VLOG(3) << "Initial";
+
+ // There isn't a valid message at this location.
+ //
+ // If someone asks for one of the messages within the first go around,
+ // then they need to wait. They got ahead. Otherwise, they are
+ // asking for something crazy, like something before the beginning of
+ // the queue. Tell them that they are behind.
+ if (uint32_queue_index < memory_->queue_size()) {
+ VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
+ return ReadResult::NOTHING_NEW;
} else {
- // Someone has re-used this message between when we pulled it out of the
- // queue and when we grabbed its index. It is pretty hard to deduce
- // what happened. Just try again.
- Message *const new_m = memory_->GetMessage(queue_index);
- if (m != new_m) {
- m = new_m;
- VLOG(3) << "Retrying, m doesn't match";
- continue;
- }
-
- // We have confirmed that message still points to the same message. This
- // means that the message didn't get swapped out from under us, so
- // starting_queue_index is correct.
- //
- // Either we got too far behind (signaled by this being a valid
- // message), or this is one of the initial messages which are invalid.
- if (starting_queue_index.valid()) {
- VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
- << ", got " << starting_queue_index.index() << ", behind by "
- << std::dec
- << (starting_queue_index.index() - queue_index.index());
- return ReadResult::TOO_OLD;
- }
-
- VLOG(3) << "Initial";
-
- // There isn't a valid message at this location.
- //
- // If someone asks for one of the messages within the first go around,
- // then they need to wait. They got ahead. Otherwise, they are
- // asking for something crazy, like something before the beginning of
- // the queue. Tell them that they are behind.
- if (uint32_queue_index < memory_->queue_size()) {
- VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
- return ReadResult::NOTHING_NEW;
- } else {
- VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
- return ReadResult::TOO_OLD;
- }
+ VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
+ return ReadResult::TOO_OLD;
}
}
VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -960,6 +1256,8 @@
<< ::std::endl;
::std::cout << " size_t num_senders = " << memory->config.num_senders
<< ::std::endl;
+ ::std::cout << " size_t num_pinners = " << memory->config.num_pinners
+ << ::std::endl;
::std::cout << " size_t queue_size = " << memory->config.queue_size
<< ::std::endl;
::std::cout << " size_t message_data_size = "
@@ -1049,6 +1347,22 @@
}
::std::cout << " }" << ::std::endl;
+ ::std::cout << " Pinner pinners[" << memory->num_pinners() << "] {"
+ << ::std::endl;
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ Pinner *p = memory->GetPinner(i);
+ ::std::cout << " [" << i << "] -> Pinner {" << ::std::endl;
+ ::std::cout << " aos_mutex tid = " << PrintMutex(&p->tid)
+ << ::std::endl;
+ ::std::cout << " AtomicIndex scratch_index = "
+ << p->scratch_index.Load().DebugString() << ::std::endl;
+ ::std::cout << " AtomicIndex pinned = "
+ << p->pinned.Load(memory->queue_size()).DebugString()
+ << ::std::endl;
+ ::std::cout << " }" << ::std::endl;
+ }
+ ::std::cout << " }" << ::std::endl;
+
::std::cout << " Watcher watchers[" << memory->num_watchers() << "] {"
<< ::std::endl;
for (size_t i = 0; i < memory->num_watchers(); ++i) {