aos: Refactor lockless_queue ownership into a class
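This moves the raw futex manipulation used to track sender, pinner, and
watcher slot ownership behind a RobustOwnershipTracker class. That sets
us up to make ownership tracking more robust behind a safer API. Our end
goal is to detect thread deaths more reliably.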
Change-Id: I00fad59a4d77eec31f0f51c85834b0892d2d2462
Co-authored-By: Austin Schuh <austin.schuh@bluerivertech.com>
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fd3a305..0752e2f 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -172,9 +172,7 @@
size_t valid_senders = 0;
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
- const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (!(tid & FUTEX_OWNER_DIED)) {
+ if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
// Not dead.
++valid_senders;
continue;
@@ -227,7 +225,7 @@
// always be a NOP if it's in 1), verified by a DCHECK.
memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ sender->ownership_tracker.ForceClear();
++valid_senders;
continue;
}
@@ -245,7 +243,7 @@
aos_compiler_memory_barrier();
// And mark that we succeeded.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ sender->ownership_tracker.ForceClear();
++valid_senders;
continue;
}
@@ -259,13 +257,11 @@
// read it before it's set.
for (size_t i = 0; i < num_pinners; ++i) {
Pinner *const pinner = memory->GetPinner(i);
- const uint32_t tid =
- __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
- if (!(tid & FUTEX_OWNER_DIED)) {
+ if (!pinner->ownership_tracker.LoadAcquire().OwnerIsDead()) {
continue;
}
pinner->pinned.Invalidate();
- __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+ pinner->ownership_tracker.ForceClear();
}
// If all the senders are (or were made) good, there is no need to do the hard
@@ -284,9 +280,7 @@
num_missing = 0;
for (size_t i = 0; i < num_senders; ++i) {
Sender *const sender = memory->GetSender(i);
- const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (tid & FUTEX_OWNER_DIED) {
+ if (sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
if (!need_recovery[i]) {
return false;
}
@@ -331,9 +325,7 @@
const size_t starting_num_missing = num_missing;
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
- const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (!(tid & FUTEX_OWNER_DIED)) {
+ if (!sender->ownership_tracker.LoadAcquire().OwnerIsDead()) {
CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
continue;
}
@@ -368,7 +360,7 @@
->header.queue_index.RelaxedInvalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ sender->ownership_tracker.ForceClear();
need_recovery[i] = false;
// And account for scratch_index.
@@ -394,7 +386,7 @@
sender->to_replace.Invalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ sender->ownership_tracker.ForceClear();
need_recovery[i] = false;
// And account for to_replace.
@@ -437,6 +429,14 @@
} // namespace
+bool PretendThatOwnerIsDeadForTesting(aos_mutex *mutex, pid_t tid) {
+  // Marks `mutex` owner-died iff `tid` owns it; atomic like all other users.
+  const uint32_t futex = __atomic_load_n(&(mutex->futex), __ATOMIC_ACQUIRE);
+  if (static_cast<pid_t>(futex & FUTEX_TID_MASK) != tid) return false;
+  __atomic_store_n(&(mutex->futex), FUTEX_OWNER_DIED, __ATOMIC_RELEASE);
+  return true;
+}
+
size_t LocklessQueueConfiguration::message_size() const {
// Round up the message size so following data is aligned appropriately.
// Make sure to leave space to align the message data. It will be aligned
@@ -685,11 +685,10 @@
CHECK_NE(watcher_index_, -1);
// Make sure we still own the slot we are supposed to.
- CHECK(
- death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+ CHECK(memory_->GetWatcher(watcher_index_)->ownership_tracker.IsHeldBySelf());
// The act of unlocking invalidates the entry. Invalidate it.
- death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+ memory_->GetWatcher(watcher_index_)->ownership_tracker.Release();
// And internally forget the slot.
watcher_index_ = -1;
@@ -699,7 +698,7 @@
// And confirm that nothing is owned by us.
const int num_watchers = memory_->num_watchers();
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+ CHECK(!memory_->GetWatcher(i)->ownership_tracker.IsHeldBySelf())
<< ": " << i;
}
}
@@ -732,14 +731,14 @@
for (int i = 0; i < num_watchers; ++i) {
// If we see a slot the kernel has marked as dead, everything we do reusing
// it needs to happen-after whatever that process did before dying.
- auto *const futex = &(memory_->GetWatcher(i)->tid.futex);
- const uint32_t tid = __atomic_load_n(futex, __ATOMIC_ACQUIRE);
- if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
+ auto *const ownership_tracker =
+ &(memory_->GetWatcher(i)->ownership_tracker);
+ if (ownership_tracker->LoadAcquire().IsUnclaimedOrOwnerIsDead()) {
watcher_index_ = i;
// Relaxed is OK here because we're the only task going to touch it
// between here and the write in death_notification_init below (other
// recovery is blocked by us holding the setup lock).
- __atomic_store_n(futex, 0, __ATOMIC_RELAXED);
+ ownership_tracker->ForceClear();
break;
}
}
@@ -756,7 +755,7 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
- death_notification_init(&(w->tid));
+ w->ownership_tracker.Acquire();
}
LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
@@ -778,25 +777,25 @@
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
const Watcher *w = memory_->GetWatcher(i);
- watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+ watcher_copy_[i].ownership_snapshot = w->ownership_tracker.LoadRelaxed();
// Force the load of the TID to come first.
aos_compiler_memory_barrier();
watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
// Use a priority of -1 to mean an invalid entry to make sorting easier.
- if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
+ if (watcher_copy_[i].ownership_snapshot.OwnerIsDead() ||
+ watcher_copy_[i].ownership_snapshot.IsUnclaimed()) {
watcher_copy_[i].priority = -1;
} else {
// Ensure all of this happens after we're done looking at the pid+priority
// in shared memory.
aos_compiler_memory_barrier();
- if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
- &(w->tid.futex), __ATOMIC_RELAXED))) {
+ if (watcher_copy_[i].ownership_snapshot !=
+ w->ownership_tracker.LoadRelaxed()) {
// Confirm that the watcher hasn't been re-used and modified while we
// read it. If it has, mark it invalid again.
watcher_copy_[i].priority = -1;
- watcher_copy_[i].tid = 0;
}
}
}
@@ -835,8 +834,8 @@
// Send the signal. Target just the thread that sent it so that we can
// support multiple watchers in a process (when someone creates multiple
// event loops in different threads).
- rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.tid, kWakeupSignal,
- &uinfo);
+ rt_tgsigqueueinfo(watcher_copy.pid, watcher_copy.ownership_snapshot.tid(),
+ kWakeupSignal, &uinfo);
++count;
}
@@ -872,8 +871,7 @@
// This doesn't need synchronization because we're the only process doing
// initialization right now, and nobody else will be touching senders which
// we're interested in.
- const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0) {
+ if (s->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
sender_index_ = i;
break;
}
@@ -888,7 +886,7 @@
// Indicate that we are now alive by taking over the slot. If the previous
// owner died, we still want to do this.
- death_notification_init(&(sender->tid));
+ sender->ownership_tracker.Acquire();
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
@@ -899,7 +897,7 @@
LocklessQueueSender::~LocklessQueueSender() {
if (sender_index_ != -1) {
CHECK(memory_ != nullptr);
- death_notification_release(&(memory_->GetSender(sender_index_)->tid));
+ memory_->GetSender(sender_index_)->ownership_tracker.Release();
}
}
@@ -1183,8 +1181,7 @@
// This doesn't need synchronization because we're the only process doing
// initialization right now, and nobody else will be touching pinners which
// we're interested in.
- const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0) {
+ if (p->ownership_tracker.LoadRelaxed().IsUnclaimed()) {
pinner_index_ = i;
break;
}
@@ -1200,7 +1197,7 @@
// Indicate that we are now alive by taking over the slot. If the previous
// owner died, we still want to do this.
- death_notification_init(&(p->tid));
+ p->ownership_tracker.Acquire();
}
LocklessQueuePinner::~LocklessQueuePinner() {
@@ -1208,7 +1205,7 @@
CHECK(memory_ != nullptr);
memory_->GetPinner(pinner_index_)->pinned.Invalidate();
aos_compiler_memory_barrier();
- death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ memory_->GetPinner(pinner_index_)->ownership_tracker.Release();
}
}
@@ -1622,8 +1619,8 @@
for (size_t i = 0; i < memory->num_senders(); ++i) {
const Sender *s = memory->GetSender(i);
::std::cout << " [" << i << "] -> Sender {" << ::std::endl;
- ::std::cout << " aos_mutex tid = " << PrintMutex(&s->tid)
- << ::std::endl;
+ ::std::cout << " RobustOwnershipTracker ownership_tracker = "
+ << s->ownership_tracker.DebugString() << ::std::endl;
::std::cout << " AtomicIndex scratch_index = "
<< s->scratch_index.Load().DebugString() << ::std::endl;
::std::cout << " AtomicIndex to_replace = "
@@ -1637,8 +1634,8 @@
for (size_t i = 0; i < memory->num_pinners(); ++i) {
const Pinner *p = memory->GetPinner(i);
::std::cout << " [" << i << "] -> Pinner {" << ::std::endl;
- ::std::cout << " aos_mutex tid = " << PrintMutex(&p->tid)
- << ::std::endl;
+ ::std::cout << " RobustOwnershipTracker ownership_tracker = "
+ << p->ownership_tracker.DebugString() << ::std::endl;
::std::cout << " AtomicIndex scratch_index = "
<< p->scratch_index.Load().DebugString() << ::std::endl;
::std::cout << " AtomicIndex pinned = "
@@ -1653,8 +1650,8 @@
for (size_t i = 0; i < memory->num_watchers(); ++i) {
const Watcher *w = memory->GetWatcher(i);
::std::cout << " [" << i << "] -> Watcher {" << ::std::endl;
- ::std::cout << " aos_mutex tid = " << PrintMutex(&w->tid)
- << ::std::endl;
+ ::std::cout << " RobustOwnershipTracker ownership_tracker = "
+ << w->ownership_tracker.DebugString() << ::std::endl;
::std::cout << " pid_t pid = " << w->pid << ::std::endl;
::std::cout << " int priority = " << w->priority << ::std::endl;
::std::cout << " }" << ::std::endl;