Add LocklessQueue::Pinner class
This will allow reading messages from queues without copying them, which is
helpful for speeding up the processing of images.
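A rough usage sketch of the new API (not part of this change; `memory`,
`config`, and `queue_index` are hypothetical stand-ins for the caller's
existing setup, while MakePinner(), PinIndex(), size(), and Data() are the
methods added here):

  aos::ipc_lib::LocklessQueue queue(memory, config);

  // MakePinner() returns nullopt if all of this queue's pinner slots are
  // already taken.
  std::optional<aos::ipc_lib::LocklessQueue::Pinner> pinner =
      queue.MakePinner();
  CHECK(pinner) << ": no pinner slots available";

  // Pin the message at queue_index so no sender can reuse its buffer while
  // we read it in place.
  if (pinner->PinIndex(queue_index)) {
    // Valid until the next PinIndex() call or until the Pinner is
    // destroyed; read at most pinner->size() bytes, with no copy.
    const void *data = pinner->Data();
    (void)data;
  }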
Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index d2c9112..0aa82a1 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -60,7 +60,7 @@
}
std::string ShmPath(const Channel *channel) {
CHECK(channel->has_type());
- return ShmFolder(channel) + channel->type()->str() + ".v2";
+ return ShmFolder(channel) + channel->type()->str() + ".v3";
}
void PageFaultData(char *data, size_t size) {
@@ -96,6 +96,7 @@
config_.num_watchers = channel->num_watchers();
config_.num_senders = channel->num_senders();
+ config_.num_pinners = 0;
config_.queue_size =
channel_storage_duration.count() * channel->frequency();
config_.message_data_size = channel->max_size();
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d25e2f8..f0680a9 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -25,12 +25,12 @@
}
// Clean up anything left there before.
- unlink((FLAGS_shm_base + "/test/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v2").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v2").c_str());
+ unlink((FLAGS_shm_base + "/test/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v3").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v3").c_str());
}
~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 5e14abc..6c14200 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -134,6 +134,9 @@
srcs = ["index.cc"],
hdrs = ["index.h"],
visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
)
cc_test(
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 7d979ea..890e85f 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -5,6 +5,8 @@
#include <atomic>
#include <string>
+#include "glog/logging.h"
+
namespace aos {
namespace ipc_lib {
@@ -52,9 +54,7 @@
}
// Gets the next index.
- QueueIndex Increment() const {
- return IncrementBy(1u);
- }
+ QueueIndex Increment() const { return IncrementBy(1u); }
// Gets the nth next element.
QueueIndex IncrementBy(uint32_t amount) const {
@@ -148,6 +148,10 @@
// Invalidates the element unconditionally.
inline void Invalidate() { Store(QueueIndex::Invalid()); }
+ inline void RelaxedInvalidate() {
+ index_.store(QueueIndex::Invalid().index_, ::std::memory_order_relaxed);
+ }
+
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
@@ -168,7 +172,9 @@
: Index(queue_index.index_, message_index) {}
Index(uint32_t queue_index, uint16_t message_index)
: index_((queue_index & 0xffff) |
- (static_cast<uint32_t>(message_index) << 16)) {}
+ (static_cast<uint32_t>(message_index) << 16)) {
+ CHECK_LE(message_index, MaxMessages());
+ }
// Index of this message in the message array.
uint16_t message_index() const { return (index_ >> 16) & 0xffff; }
@@ -193,13 +199,13 @@
static constexpr uint16_t MaxMessages() { return 0xfffe; }
bool operator==(const Index other) const { return other.index_ == index_; }
+ bool operator!=(const Index other) const { return other.index_ != index_; }
// Returns a string representing the index.
::std::string DebugString() const;
private:
- Index(uint32_t index)
- : index_(index) {}
+ Index(uint32_t index) : index_(index) {}
friend class AtomicIndex;
@@ -235,11 +241,16 @@
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
- inline bool CompareAndExchangeStrong(Index expected, Index index) {
+ bool CompareAndExchangeStrong(Index expected, Index index) {
return index_.compare_exchange_strong(expected.index_, index.index_,
::std::memory_order_acq_rel);
}
+ bool CompareAndExchangeWeak(Index *expected, Index index) {
+ return index_.compare_exchange_weak(expected->index_, index.index_,
+ ::std::memory_order_acq_rel);
+ }
+
private:
::std::atomic<uint32_t> index_;
};
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index d9a1a71..01e5f24 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,6 +38,97 @@
LocklessQueueMemory *const memory_;
};
+bool IsPinned(LocklessQueueMemory *memory, Index index) {
+ DCHECK(index.valid());
+ const size_t queue_size = memory->queue_size();
+ const QueueIndex message_index =
+ memory->GetMessage(index)->header.queue_index.Load(queue_size);
+ if (!message_index.valid()) {
+ return false;
+ }
+ DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
+ << ": Message is in the queue";
+ for (int pinner_index = 0;
+ pinner_index < static_cast<int>(memory->config.num_pinners);
+ ++pinner_index) {
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
+//
+// Returns the new scratch_index value.
+Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
+ ipc_lib::Sender *const sender,
+ const Index to_replace) {
+ // If anybody's trying to pin this message, then grab a message from a pinner
+ // to write into instead, and leave the message we pulled out of the queue
+ // (currently in our scratch_index) with a pinner.
+ //
+ // This loop will terminate in at most one iteration through the pinners in
+ // any steady-state configuration of the memory. There are only as many
+ // Pinner::pinned values to worry about as there are Pinner::scratch_index
+ // values to check against, plus to_replace, which means there will always be
+ // a free one. We might have to make multiple passes if things are being
+ // changed concurrently, but nobody dying can make this loop fail to
+ // terminate (the number of processes that can die is bounded, because no
+ // new ones can start while we've got the lock).
+ for (int pinner_index = 0; true;
+ pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
+ if (!IsPinned(memory, to_replace)) {
+ // No pinners on our current scratch_index, so we're fine now.
+ VLOG(3) << "No pinners: " << to_replace.DebugString();
+ return to_replace;
+ }
+
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
+ CHECK(pinner_scratch.valid())
+ << ": Pinner scratch_index should always be valid";
+ if (IsPinned(memory, pinner_scratch)) {
+ // Wouldn't do us any good to swap with this one, so don't bother, and
+ // move onto the next one.
+ VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
+ continue;
+ }
+
+ sender->to_replace.RelaxedStore(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // Give the pinner the message (which is currently in
+ // sender->scratch_index).
+ if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
+ to_replace)) {
+ // Somebody swapped into this pinner before us. The new value is probably
+ // pinned, so we don't want to look at it again immediately.
+ VLOG(3) << "Pinner " << pinner_index
+ << " scratch_index changed: " << pinner_scratch.DebugString()
+ << ", " << to_replace.DebugString();
+ sender->to_replace.RelaxedInvalidate();
+ continue;
+ }
+ aos_compiler_memory_barrier();
+ // Now update the sender's scratch space and record that we succeeded.
+ sender->scratch_index.Store(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // And then record that we succeeded, but definitely after the above
+ // store.
+ sender->to_replace.RelaxedInvalidate();
+ VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();
+
+ // If it's in a pinner's scratch_index, it should not be in the queue, which
+ // means nobody new can pin it for real. However, they can still attempt to
+ // pin it, which means we can't verify !IsPinned down here.
+
+ return pinner_scratch;
+ }
+}
+
// Returns true if it succeeded. Returns false if another sender died in the
// middle.
bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
@@ -48,6 +139,7 @@
aos_compiler_memory_barrier();
const size_t num_senders = memory->num_senders();
+ const size_t num_pinners = memory->num_pinners();
const size_t queue_size = memory->queue_size();
const size_t num_messages = memory->num_messages();
@@ -105,11 +197,17 @@
// to_replace = yyy
// We are in the act of moving to_replace to scratch_index, but didn't
// finish. Easy.
+ //
+ // If doing a pinner swap, we've definitely done it.
// 4) scratch_index = yyy
// to_replace = invalid
// Finished, but died. Looks like 1)
+ // Swapping with a pinner's scratch_index passes through the same states.
+ // We just need to ensure the message that ends up in the sender's
+ // scratch_index isn't pinned, using the same code as sending does.
+
// Any cleanup code needs to follow the same set of states to be robust to
// death, so death can be restarted.
@@ -117,6 +215,14 @@
// 1) or 4). Make sure we aren't corrupted and declare victory.
CHECK(scratch_index.valid());
+ // If it's in 1) with a pinner, the sender might have a pinned message,
+ // so fix that.
+ SwapPinnedSenderScratch(memory, sender, scratch_index);
+
+ // If it's in 4), it may not have completed this step yet. This will
+ // always be a NOP if it's in 1), verified by a DCHECK.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
continue;
@@ -129,6 +235,11 @@
// Just need to invalidate to_replace to finish.
sender->to_replace.Invalidate();
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// And mark that we succeeded.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
@@ -139,6 +250,20 @@
need_recovery[i] = true;
}
+ // Cleaning up pinners is easy. We don't actually have to do anything, but
+ // invalidating a dead pinner's pinned field might help catch bugs elsewhere
+ // trying to read it before it's set.
+ for (size_t i = 0; i < num_pinners; ++i) {
+ Pinner *const pinner = memory->GetPinner(i);
+ const uint32_t tid =
+ __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
+ if (!(tid & FUTEX_OWNER_DIED)) {
+ continue;
+ }
+ pinner->pinned.Invalidate();
+ __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+ }
+
// If all the senders are (or were made) good, there is no need to do the hard
// case.
if (valid_senders == num_senders) {
@@ -162,18 +287,18 @@
return false;
}
++num_missing;
- } else {
- CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
- // We can do a relaxed load here because we're the only person touching
- // this sender at this point, if it matters. If it's not a dead sender,
- // then any message it every has will already be accounted for, so this
- // will always be a NOP.
- const Index scratch_index = sender->scratch_index.RelaxedLoad();
- if (!accounted_for[scratch_index.message_index()]) {
- ++num_accounted_for;
- }
- accounted_for[scratch_index.message_index()] = true;
+ continue;
}
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point, if it matters. If it's not a dead sender,
+ // then any message it ever has will eventually be accounted for if we
+ // make enough tries through the outer loop.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ if (!accounted_for[scratch_index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[scratch_index.message_index()] = true;
}
for (size_t i = 0; i < queue_size; ++i) {
@@ -185,6 +310,16 @@
accounted_for[index.message_index()] = true;
}
+ for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
+ // Same logic as above for scratch_index applies here too.
+ const Index index =
+ memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
+ if (!accounted_for[index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[index.message_index()] = true;
+ }
+
CHECK_LE(num_accounted_for + num_missing, num_messages);
}
@@ -224,6 +359,9 @@
// atomically insert scratch_index into the queue yet. So
// invalidate to_replace.
sender->to_replace.Invalidate();
+ // Sender definitely will not have gotten here, so finish for it.
+ memory->GetMessage(scratch_index)
+ ->header.queue_index.RelaxedInvalidate();
// And then mark this sender clean.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
@@ -240,6 +378,12 @@
// scratch_index is accounted for. That means we did the insert,
// but didn't record it.
CHECK(to_replace.valid());
+
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// Finish the transaction. Copy to_replace, then clear it.
sender->scratch_index.Store(to_replace);
@@ -311,6 +455,9 @@
CHECK_EQ(size % alignof(Sender), 0u);
size += LocklessQueueMemory::SizeOfSenders(config);
+ CHECK_EQ(size % alignof(Pinner), 0u);
+ size += LocklessQueueMemory::SizeOfPinners(config);
+
return size;
}
@@ -371,6 +518,7 @@
// TODO(austin): Check these for out of bounds.
memory->config.num_watchers = config.num_watchers;
memory->config.num_senders = config.num_senders;
+ memory->config.num_pinners = config.num_pinners;
memory->config.queue_size = config.queue_size;
memory->config.message_data_size = config.message_data_size;
@@ -403,6 +551,15 @@
s->to_replace.RelaxedInvalidate();
}
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
+ // Nobody else can possibly be touching these because we haven't set
+ // initialized to true yet.
+ pinner->scratch_index.RelaxedStore(
+ Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+ pinner->pinned.Invalidate();
+ }
+
aos_compiler_memory_barrier();
// Signal everything is done. This needs to be done last, so if we die, we
// redo initialization.
@@ -609,11 +766,16 @@
return;
}
- ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
// Indicate that we are now alive by taking over the slot. If the previous
// owner died, we still want to do this.
- death_notification_init(&(s->tid));
+ death_notification_init(&(sender->tid));
+
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
}
LocklessQueue::Sender::~Sender() {
@@ -622,29 +784,17 @@
}
}
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
- LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-QueueIndex ZeroOrValid(QueueIndex index) {
- if (!index.valid()) {
- return index.Clear();
- }
- return index;
-}
-
size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
void *LocklessQueue::Sender::Data() {
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
- Index scratch_index = sender->scratch_index.RelaxedLoad();
- Message *message = memory_->GetMessage(scratch_index);
- message->header.queue_index.Invalidate();
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
return message->data(memory_->message_data_size());
}
@@ -666,6 +816,126 @@
monotonic_sent_time, realtime_sent_time, queue_index);
}
+LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+ // Since we already have the lock, go ahead and try cleaning up.
+ Cleanup(memory_, grab_queue_setup_lock);
+
+ const int num_pinners = memory_->num_pinners();
+
+ for (int i = 0; i < num_pinners; ++i) {
+ ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching the pinners
+ // we're interested in.
+ const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+ if (tid == 0) {
+ pinner_index_ = i;
+ break;
+ }
+ }
+
+ if (pinner_index_ == -1) {
+ VLOG(1) << "Too many pinners, starting to bail.";
+ return;
+ }
+
+ ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+ p->pinned.Invalidate();
+
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(p->tid));
+}
+
+LocklessQueue::Pinner::~Pinner() {
+ if (valid()) {
+ memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+ aos_compiler_memory_barrier();
+ death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+bool LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
+ const size_t queue_size = memory_->queue_size();
+ const QueueIndex queue_index =
+ QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+ ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+ AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+ // Indicate that we want to pin this message.
+ pinner->pinned.Store(queue_index);
+ aos_compiler_memory_barrier();
+
+ {
+ const Index message_index = queue_slot->Load();
+ Message *const message = memory_->GetMessage(message_index);
+
+ const QueueIndex message_queue_index =
+ message->header.queue_index.Load(queue_size);
+ if (message_queue_index == queue_index) {
+ VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+ aos_compiler_memory_barrier();
+ return true;
+ }
+ VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+ << ", " << queue_index.index();
+ }
+
+ // Being down here means we asked to pin a message before realizing it's no
+ // longer in the queue, so back that out now.
+ pinner->pinned.Invalidate();
+ VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+ return false;
+}
+
+size_t LocklessQueue::Pinner::size() const {
+ return memory_->message_data_size();
+}
+
+const void *LocklessQueue::Pinner::Data() const {
+ const size_t queue_size = memory_->queue_size();
+ ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
+ QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+ CHECK(pinned.valid());
+ const Message *message = memory_->GetMessage(pinned);
+
+ return message->data(memory_->message_data_size());
+}
+
+std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
+ LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
+ if (result.valid()) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
+ LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
+ if (result.valid()) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+namespace {
+
+QueueIndex ZeroOrValid(QueueIndex index) {
+ if (!index.valid()) {
+ return index.Clear();
+ }
+ return index;
+}
+
+} // namespace
+
void LocklessQueue::Sender::Send(
size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
@@ -682,6 +952,12 @@
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
+
message->header.length = length;
// Pass these through. Any alternative behavior can be implemented out a
// layer.
@@ -689,6 +965,7 @@
message->header.monotonic_remote_time = monotonic_remote_time;
message->header.realtime_remote_time = realtime_remote_time;
+ Index to_replace = Index::Invalid();
while (true) {
const QueueIndex actual_next_queue_index =
memory_->next_queue_index.Load(queue_size);
@@ -698,7 +975,7 @@
// This needs to synchronize with whoever the previous writer at this
// location was.
- const Index to_replace = memory_->LoadIndex(next_queue_index);
+ to_replace = memory_->LoadIndex(next_queue_index);
const QueueIndex decremented_queue_index =
next_queue_index.DecrementBy(queue_size);
@@ -726,9 +1003,14 @@
}
// Confirm that the message is what it should be.
+ //
+ // This is just a best-effort check to skip reading the clocks if possible.
+ // If this fails, then the compare-exchange below definitely would too, so
+ // we can bail out now.
{
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
+ memory_->GetMessage(to_replace)
+ ->header.queue_index.RelaxedLoad(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
@@ -794,8 +1076,22 @@
aos_compiler_memory_barrier();
// And then record that we succeeded, but definitely after the above store.
sender->to_replace.RelaxedInvalidate();
+
break;
}
+
+ // to_replace is our current scratch_index. It isn't in the queue, which means
+ // nobody new can pin it. They can set their `pinned` to it, but they will
+ // back it out, so they don't count. This means we just need to find a
+ // message which no pinner had in `pinned`, and then we know this message
+ // will never be pinned. We'll start with to_replace, and if that is
+ // pinned then we'll look for a new one to use instead.
+ const Index new_scratch =
+ SwapPinnedSenderScratch(memory_, sender, to_replace);
+
+ // If anybody is looking at this message (they shouldn't be), then try telling
+ // them about it (best-effort).
+ memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
}
LocklessQueue::ReadResult LocklessQueue::Read(
@@ -827,46 +1123,46 @@
VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
<< ", " << queue_index.DecrementBy(queue_size).index();
return ReadResult::NOTHING_NEW;
+ }
+
+ // Someone has re-used this message between when we pulled it out of the
+ // queue and when we grabbed its index. It is pretty hard to deduce
+ // what happened. Just try again.
+ Message *const new_m = memory_->GetMessage(queue_index);
+ if (m != new_m) {
+ m = new_m;
+ VLOG(3) << "Retrying, m doesn't match";
+ continue;
+ }
+
+ // We have confirmed that message still points to the same message. This
+ // means that the message didn't get swapped out from under us, so
+ // starting_queue_index is correct.
+ //
+ // Either we got too far behind (signaled by this being a valid
+ // message), or this is one of the initial messages which are invalid.
+ if (starting_queue_index.valid()) {
+ VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
+ << ", got " << starting_queue_index.index() << ", behind by "
+ << std::dec
+ << (starting_queue_index.index() - queue_index.index());
+ return ReadResult::TOO_OLD;
+ }
+
+ VLOG(3) << "Initial";
+
+ // There isn't a valid message at this location.
+ //
+ // If someone asks for one of the messages within the first go around,
+ // then they need to wait. They got ahead. Otherwise, they are
+ // asking for something crazy, like something before the beginning of
+ // the queue. Tell them that they are behind.
+ if (uint32_queue_index < memory_->queue_size()) {
+ VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
+ return ReadResult::NOTHING_NEW;
} else {
- // Someone has re-used this message between when we pulled it out of the
- // queue and when we grabbed its index. It is pretty hard to deduce
- // what happened. Just try again.
- Message *const new_m = memory_->GetMessage(queue_index);
- if (m != new_m) {
- m = new_m;
- VLOG(3) << "Retrying, m doesn't match";
- continue;
- }
-
- // We have confirmed that message still points to the same message. This
- // means that the message didn't get swapped out from under us, so
- // starting_queue_index is correct.
- //
- // Either we got too far behind (signaled by this being a valid
- // message), or this is one of the initial messages which are invalid.
- if (starting_queue_index.valid()) {
- VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
- << ", got " << starting_queue_index.index() << ", behind by "
- << std::dec
- << (starting_queue_index.index() - queue_index.index());
- return ReadResult::TOO_OLD;
- }
-
- VLOG(3) << "Initial";
-
- // There isn't a valid message at this location.
- //
- // If someone asks for one of the messages within the first go around,
- // then they need to wait. They got ahead. Otherwise, they are
- // asking for something crazy, like something before the beginning of
- // the queue. Tell them that they are behind.
- if (uint32_queue_index < memory_->queue_size()) {
- VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
- return ReadResult::NOTHING_NEW;
- } else {
- VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
- return ReadResult::TOO_OLD;
- }
+ VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
+ return ReadResult::TOO_OLD;
}
}
VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -960,6 +1256,8 @@
<< ::std::endl;
::std::cout << " size_t num_senders = " << memory->config.num_senders
<< ::std::endl;
+ ::std::cout << " size_t num_pinners = " << memory->config.num_pinners
+ << ::std::endl;
::std::cout << " size_t queue_size = " << memory->config.queue_size
<< ::std::endl;
::std::cout << " size_t message_data_size = "
@@ -1049,6 +1347,22 @@
}
::std::cout << " }" << ::std::endl;
+ ::std::cout << " Pinner pinners[" << memory->num_pinners() << "] {"
+ << ::std::endl;
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ Pinner *p = memory->GetPinner(i);
+ ::std::cout << " [" << i << "] -> Pinner {" << ::std::endl;
+ ::std::cout << " aos_mutex tid = " << PrintMutex(&p->tid)
+ << ::std::endl;
+ ::std::cout << " AtomicIndex scratch_index = "
+ << p->scratch_index.Load().DebugString() << ::std::endl;
+ ::std::cout << " AtomicIndex pinned = "
+ << p->pinned.Load(memory->queue_size()).DebugString()
+ << ::std::endl;
+ ::std::cout << " }" << ::std::endl;
+ }
+ ::std::cout << " }" << ::std::endl;
+
::std::cout << " Watcher watchers[" << memory->num_watchers() << "] {"
<< ::std::endl;
for (size_t i = 0; i < memory->num_watchers(); ++i) {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index de80f3d..5676c53 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,8 +4,8 @@
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>
-#include <vector>
#include <optional>
+#include <vector>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
@@ -51,6 +51,21 @@
AtomicIndex to_replace;
};
+// Structure to hold the state required to pin messages.
+struct Pinner {
+ // The same as Sender::tid. See there for docs.
+ aos_mutex tid;
+
+ // Queue index of the message we have pinned, or Invalid if there isn't one.
+ AtomicQueueIndex pinned;
+
+ // This should always be valid.
+ //
+ // Note that this is fully independent from pinned. It's just a place to stash
+ // a message, to ensure there's always an unpinned one for a writer to grab.
+ AtomicIndex scratch_index;
+};
+
// Structure representing a message.
struct Message {
struct Header {
@@ -98,6 +113,8 @@
size_t num_watchers;
// Size of the sender list.
size_t num_senders;
+ // Size of the pinner list.
+ size_t num_pinners;
// Size of the list of pointers into the messages list.
size_t queue_size;
@@ -106,7 +123,7 @@
size_t message_size() const;
- size_t num_messages() const { return num_senders + queue_size; }
+ size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};
// Structure to hold the state of the queue.
@@ -258,10 +275,64 @@
int sender_index_ = -1;
};
+ // Pinner for blocks of data. The resources associated with a pinner are
+ // scoped to this object's lifetime.
+ class Pinner {
+ public:
+ Pinner(const Pinner &) = delete;
+ Pinner &operator=(const Pinner &) = delete;
+ Pinner(Pinner &&other)
+ : memory_(other.memory_), pinner_index_(other.pinner_index_) {
+ other.memory_ = nullptr;
+ other.pinner_index_ = -1;
+ }
+ Pinner &operator=(Pinner &&other) {
+ memory_ = other.memory_;
+ pinner_index_ = other.pinner_index_;
+ other.memory_ = nullptr;
+ other.pinner_index_ = -1;
+ return *this;
+ }
+
+ ~Pinner();
+
+ // Attempts to pin the message at queue_index.
+ // Un-pins the previous message.
+ // Returns true if it succeeds.
+ // Returns false if that message is no longer in the queue.
+ bool PinIndex(uint32_t queue_index);
+
+ // Read at most size() bytes of data from the memory pointed to by Data().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ // Don't call Data() before a successful PinIndex call.
+ size_t size() const;
+ const void *Data() const;
+
+ private:
+ friend class LocklessQueue;
+
+ Pinner(LocklessQueueMemory *memory);
+
+ // Returns true if this pinner is valid. If it isn't valid, none of the
+ // other methods will work. This is here to allow the lockless queue to
+ // only build a pinner if there was one available.
+ bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
+
+ // Pointer to the backing memory.
+ LocklessQueueMemory *memory_ = nullptr;
+
+ // Index into the pinner list.
+ int pinner_index_ = -1;
+ };
+
// Creates a sender. If we couldn't allocate a sender, returns nullopt.
// TODO(austin): Change the API if we find ourselves with more errors.
std::optional<Sender> MakeSender();
+ // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ std::optional<Pinner> MakePinner();
+
private:
LocklessQueueMemory *memory_ = nullptr;
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index b7cdfee..a8a2429 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -503,6 +503,7 @@
LocklessQueueConfiguration config;
config.num_watchers = 2;
config.num_senders = 2;
+ config.num_pinners = 1;
config.queue_size = 4;
config.message_data_size = 32;
@@ -550,6 +551,7 @@
}
if (print) {
+ printf("Bad version:\n");
PrintLocklessQueueMemory(memory);
}
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index a10609c..aa9b2e1 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -31,6 +31,8 @@
size_t num_watchers() const { return config.num_watchers; }
// Size of the sender list.
size_t num_senders() const { return config.num_senders; }
+ // Size of the pinner list.
+ size_t num_pinners() const { return config.num_pinners; }
// Number of messages logically in the queue at a time.
// List of pointers into the messages list.
@@ -60,9 +62,10 @@
// writing:
//
// AtomicIndex queue[config.queue_size];
- // Message messages[config.queue_size + config.num_senders];
+ // Message messages[config.num_messages()];
// Watcher watchers[config.num_watchers];
// Sender senders[config.num_senders];
+ // Pinner pinners[config.num_pinners];
static constexpr size_t kDataAlignment = alignof(std::max_align_t);
@@ -92,7 +95,21 @@
return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
}
- // Getters for each of the 4 lists.
+ size_t SizeOfPinners() { return SizeOfPinners(config); }
+ static size_t SizeOfPinners(LocklessQueueConfiguration config) {
+ return AlignmentRoundUp(sizeof(Pinner) * config.num_pinners);
+ }
+
+ // Getters for each of the lists.
+
+ Pinner *GetPinner(size_t pinner_index) {
+ static_assert(alignof(Pinner) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<Pinner *>(
+ &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+ SizeOfSenders() + pinner_index * sizeof(Pinner));
+ }
+
Sender *GetSender(size_t sender_index) {
static_assert(alignof(Sender) <= kDataAlignment,
"kDataAlignment is too small");
@@ -116,8 +133,8 @@
sizeof(AtomicIndex) * index);
}
- // There are num_senders + queue_size messages. The free list is really the
- // sender list, since those are messages available to be filled in and sent.
+ // There are num_messages() messages. The free list is really the
+ // sender+pinner list, since those are messages available to be filled in.
// This removes the need to find lost messages when a sender dies.
Message *GetMessage(Index index) {
static_assert(alignof(Message) <= kDataAlignment,
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 65b2a15..4e84b9f 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -44,6 +44,7 @@
LocklessQueueTest() {
config_.num_watchers = 10;
config_.num_senders = 100;
+ config_.num_pinners = 5;
config_.queue_size = 10000;
// Exercise the alignment code. This would throw off alignment.
config_.message_data_size = 101;
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index f5b3d7e..69f5f21 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -139,42 +139,63 @@
} else {
t.event_count = 0;
}
- t.thread =
- ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
- // Build up a sender.
- LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ t.thread = ::std::thread([this, &t, thread_index, &run,
+ write_wrap_count]() {
+ // Build up a sender.
+ LocklessQueue queue(memory_, config_);
+ LocklessQueue::Sender sender = queue.MakeSender().value();
+ CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
- // Signal that we are ready to start sending.
- t.ready.Set();
+ // Signal that we are ready to start sending.
+ t.ready.Set();
- // Wait until signaled to start running.
- run.Wait();
+ // Wait until signaled to start running.
+ run.Wait();
- // Gogogo!
- for (uint64_t i = 0;
- i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
- ++i) {
- char data[sizeof(ThreadPlusCount)];
- ThreadPlusCount tpc;
- tpc.thread = thread_index;
- tpc.count = i;
-
- memcpy(data, &tpc, sizeof(ThreadPlusCount));
-
- if (i % 0x800000 == 0x100000) {
- fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
- static_cast<double>(i) /
- static_cast<double>(num_messages_ *
- (1 + write_wrap_count)) *
- 100.0);
+ // Gogogo!
+ for (uint64_t i = 0;
+ i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+ ++i) {
+ char *const data = static_cast<char *>(sender.Data()) + sender.size() -
+ sizeof(ThreadPlusCount);
+ const char fill = (i + 55) & 0xFF;
+ memset(data, fill, sizeof(ThreadPlusCount));
+ {
+ bool found_nonzero = false;
+ for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
+ if (data[i] != fill) {
+ found_nonzero = true;
}
-
- ++started_writes_;
- sender.Send(data, sizeof(ThreadPlusCount));
- ++finished_writes_;
}
- });
+ CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
+ }
+
+ ThreadPlusCount tpc;
+ tpc.thread = thread_index;
+ tpc.count = i;
+
+ memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+ if (i % 0x800000 == 0x100000) {
+ fprintf(
+ stderr, "Sent %" PRIu64 ", %f %%\n", i,
+ static_cast<double>(i) /
+ static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
+ 100.0);
+ }
+
+ ++started_writes_;
+ sender.Send(sizeof(ThreadPlusCount));
+ // Blank out the new scratch buffer, to catch other people using it.
+ {
+ char *const new_data = static_cast<char *>(sender.Data()) +
+ sender.size() - sizeof(ThreadPlusCount);
+ const char new_fill = ~fill;
+ memset(new_data, new_fill, sizeof(ThreadPlusCount));
+ }
+ ++finished_writes_;
+ }
+ });
++thread_index;
}
@@ -303,18 +324,18 @@
if (race_reads) {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
(*threads)[tpc.thread].event_count = tpc.count;
} else {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
} else {
// Confirm that we see every message counter from every thread.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
++(*threads)[tpc.thread].event_count;
}