Merge changes I94afdc39,Id92f11a3,I403535de,I468c5a35,I1f1749a6, ...
* changes:
Clean up memory barriers and documentation in lockless_queue
Align the shared memory data better
Use acquire/release instead of seq_cst
Fix compiler_memory_barrier
Use dedicated functions for using futexes to wait for a task to die
Make signalfd wrapper more robust
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 908cb80..57adc6d 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -164,6 +164,19 @@
],
)
+cc_test(
+ name = "signalfd_test",
+ srcs = [
+ "signalfd_test.cc",
+ ],
+ deps = [
+ ":signalfd",
+ "//aos/testing:googletest",
+ "//aos/testing:test_logging",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
cc_library(
name = "index",
srcs = ["index.cc"],
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index efb1fb1..d5c3d4e 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -851,6 +851,44 @@
return (value & FUTEX_TID_MASK) == tid;
}
+void death_notification_init(aos_mutex *m) {
+ const uint32_t tid = get_tid();
+ if (kPrintOperations) {
+ printf("%" PRId32 ": %p death_notification start\n", tid, m);
+ }
+ my_robust_list::Adder adder(m);
+ {
+ RunObservers run_observers(m, true);
+ CHECK(compare_and_swap(&m->futex, 0, tid));
+ }
+ adder.Add();
+}
+
+void death_notification_release(aos_mutex *m) {
+ RunObservers run_observers(m, true);
+
+#ifndef NDEBUG
+ // Verify it's "locked", like it should be.
+ {
+ const uint32_t tid = get_tid();
+ if (kPrintOperations) {
+ printf("%" PRId32 ": %p death_notification release\n", tid, m);
+ }
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
+ assert((value & ~FUTEX_WAITERS) == tid);
+ }
+#endif
+
+ my_robust_list::Remover remover(m);
+ ANNOTATE_HAPPENS_BEFORE(m);
+ const int ret = sys_futex_unlock_pi(&m->futex);
+ if (ret != 0) {
+ my_robust_list::robust_head.pending_next = 0;
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << &m->futex << ") failed";
+ }
+}
+
int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
RunObservers run_observers(c, false);
const uint32_t tid = get_tid();
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
index 2824516..8290faa 100644
--- a/aos/ipc_lib/aos_sync.h
+++ b/aos/ipc_lib/aos_sync.h
@@ -26,7 +26,7 @@
// No initialization is necessary.
typedef aos_futex aos_condition;
-// For use with the mutex_ functions.
+// For use with the mutex_ or death_notification_ functions.
// futex must be initialized to 0.
// No initialization is necessary for next and previous.
// Under ThreadSanitizer, pthread_mutex_init must be initialized to false.
@@ -88,6 +88,25 @@
// checking mutexes as they are destroyed to catch problems with that early and
// stack-based recursive mutex locking.
bool mutex_islocked(const aos_mutex *m);
+
+// The death_notification_ functions are designed for one thread to wait for
+// another thread (possibly in a different process) to end. This can mean
+// exiting gracefully or dying at any point. They use a standard aos_mutex, but
+// this mutex may not be used with any of the mutex_ functions.
+
+// Initializes a variable which can be used to wait for this thread to die.
+// This can only be called once after initializing *m.
+void death_notification_init(aos_mutex *m);
+
+// Manually triggers a death notification for this thread.
+// This thread must have previously called death_notification_init(m).
+void death_notification_release(aos_mutex *m);
+
+// Returns whether or not m is held by the current thread.
+// This is mainly useful as a debug assertion.
+inline bool death_notification_is_held(aos_mutex *m) {
+ return mutex_islocked(m);
+}
#endif
// The futex_ functions are similar to the mutex_ ones but different.
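To make the intended usage of the new death_notification_ API concrete, here is a minimal sketch (not part of this change; the function names are purely illustrative). The watched task publishes the notification on an aos_mutex in shared memory; an observer detects an unclean death by seeing FUTEX_OWNER_DIED, which the kernel's robust-futex handling sets, the same way the lockless_queue cleanup below does.

#include <linux/futex.h>  // for FUTEX_OWNER_DIED
#include <stdint.h>

#include "aos/ipc_lib/aos_sync.h"

// Watched side: *m lives in shared memory and starts zero-initialized.
void WatchedTask(aos_mutex *m) {
  // Called exactly once. From here on, the kernel will set FUTEX_OWNER_DIED
  // in m->futex if this task dies without releasing.
  death_notification_init(m);
  // ... do work ...
  // Graceful shutdown: hand the slot back explicitly.
  death_notification_release(m);
}

// Observer side (illustrative): detect that the watched task died without
// calling death_notification_release.
bool WatchedTaskDiedUncleanly(aos_mutex *m) {
  const uint32_t tid = __atomic_load_n(&m->futex, __ATOMIC_ACQUIRE);
  return (tid & FUTEX_OWNER_DIED) != 0;
}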
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index b1afdc8..7d979ea 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -138,8 +138,12 @@
}
// Full bidirectional barriers here.
- QueueIndex Load(uint32_t count) { return QueueIndex(index_.load(), count); }
- inline void Store(QueueIndex value) { index_.store(value.index_); }
+ QueueIndex Load(uint32_t count) {
+ return QueueIndex(index_.load(::std::memory_order_acquire), count);
+ }
+ inline void Store(QueueIndex value) {
+ index_.store(value.index_, ::std::memory_order_release);
+ }
// Invalidates the element unconditionally.
inline void Invalidate() { Store(QueueIndex::Invalid()); }
@@ -147,7 +151,8 @@
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
- return index_.compare_exchange_strong(expected.index_, index.index_);
+ return index_.compare_exchange_strong(expected.index_, index.index_,
+ ::std::memory_order_acq_rel);
}
private:
@@ -221,16 +226,18 @@
// Invalidates the index atomically, but without any ordering constraints.
void RelaxedInvalidate() { RelaxedStore(Index::Invalid()); }
- // Full bidirectional barriers here.
+ // Full barriers here.
void Invalidate() { Store(Index::Invalid()); }
- void Store(Index index) { index_.store(index.index_); }
- Index Load() { return Index(index_.load()); }
-
+ void Store(Index index) {
+ index_.store(index.index_, ::std::memory_order_release);
+ }
+ Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
inline bool CompareAndExchangeStrong(Index expected, Index index) {
- return index_.compare_exchange_strong(expected.index_, index.index_);
+ return index_.compare_exchange_strong(expected.index_, index.index_,
+ ::std::memory_order_acq_rel);
}
private:
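The acquire/release orderings above rely on the standard release/acquire publication guarantee: everything the producer wrote before a store-release is visible to a consumer whose load-acquire observes the stored value. A self-contained sketch of that guarantee, independent of the queue's types:

#include <atomic>
#include <cassert>

struct Slot {
  int payload = 0;
};

Slot slot;
std::atomic<bool> published{false};

void Producer() {
  slot.payload = 42;                                 // plain write
  published.store(true, std::memory_order_release);  // publish it
}

void Consumer() {
  if (published.load(std::memory_order_acquire)) {
    // The acquire load synchronizes-with the release store, so the payload
    // written before the store is guaranteed to be visible here.
    assert(slot.payload == 42);
  }
}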
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index c2b0254..78f990d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -16,16 +16,31 @@
namespace aos {
namespace ipc_lib {
-
namespace {
-void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
- const int result = mutex_grab(&(memory->queue_setup_lock));
- CHECK(result == 0 || result == 1);
-}
+class GrabQueueSetupLockOrDie {
+ public:
+ GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) : memory_(memory) {
+ const int result = mutex_grab(&(memory->queue_setup_lock));
+ CHECK(result == 0 || result == 1) << ": " << result;
+ }
-// This must be called under the queue_setup_lock.
-void Cleanup(LocklessQueueMemory *memory) {
+ ~GrabQueueSetupLockOrDie() { mutex_unlock(&(memory_->queue_setup_lock)); }
+
+ GrabQueueSetupLockOrDie(const GrabQueueSetupLockOrDie &) = delete;
+ GrabQueueSetupLockOrDie &operator=(const GrabQueueSetupLockOrDie &) = delete;
+
+ private:
+ LocklessQueueMemory *const memory_;
+};
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+ // Make sure we start looking at shared memory fresh right now. We'll handle
+ // people dying partway through by either cleaning up after them or not, but
+ // we want to ensure we clean up after anybody who has already died when we
+ // start.
+ aos_compiler_memory_barrier();
+
const size_t num_senders = memory->num_senders();
const size_t queue_size = memory->queue_size();
const size_t num_messages = memory->num_messages();
@@ -54,14 +69,16 @@
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
VLOG(3) << "Found an easy death for sender " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point.
const Index to_replace = sender->to_replace.RelaxedLoad();
const Index scratch_index = sender->scratch_index.Load();
// I find it easiest to think about this in terms of the set of observable
- // states. The main code follows the following states:
+ // states. The main code progresses through the following states:
// 1) scratch_index = xxx
// to_replace = invalid
@@ -92,14 +109,14 @@
sender->to_replace.Invalidate();
// And mark that we succeeded.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
}
} else {
// 1) or 4). Make sure we aren't corrupted and declare victory.
CHECK(scratch_index.valid());
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
}
} else {
@@ -123,12 +140,16 @@
while ((num_accounted_for + num_missing) != num_messages) {
num_missing = 0;
for (size_t i = 0; i < num_senders; ++i) {
- Sender *sender = memory->GetSender(i);
+ Sender *const sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
++num_missing;
} else {
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point, if it matters. If it's not a dead sender,
+ // then any message it ever has will already be accounted for, so this
+ // will always be a NOP.
const Index scratch_index = sender->scratch_index.RelaxedLoad();
if (!accounted_for[scratch_index.message_index()]) {
++num_accounted_for;
@@ -138,6 +159,7 @@
}
for (size_t i = 0; i < queue_size; ++i) {
+ // Same logic as above for scratch_index applies here too.
const Index index = memory->GetQueue(i)->RelaxedLoad();
if (!accounted_for[index.message_index()]) {
++num_accounted_for;
@@ -151,8 +173,10 @@
for (size_t i = 0; i < num_senders; ++i) {
Sender *sender = memory->GetSender(i);
const uint32_t tid =
- __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
+ __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
+ // We can do relaxed loads here because we're the only person touching
+ // this sender at this point.
const Index scratch_index = sender->scratch_index.RelaxedLoad();
const Index to_replace = sender->to_replace.RelaxedLoad();
@@ -170,7 +194,7 @@
sender->to_replace.Invalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
// And account for scratch_index.
accounted_for[scratch_index.message_index()] = true;
@@ -188,7 +212,7 @@
sender->to_replace.Invalidate();
// And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_SEQ_CST);
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
// And account for to_replace.
accounted_for[to_replace.message_index()] = true;
@@ -208,6 +232,7 @@
// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target
// thread.
+// TODO(Brian): Do directly in assembly for armhf at least for efficiency.
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *si) {
return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
@@ -215,25 +240,25 @@
} // namespace
size_t LocklessQueueMemorySize(LocklessQueueConfiguration config) {
- // Round up the message size so following data is double aligned. That should
- // be overkill for most platforms. And the checks below confirms it.
- config.message_data_size = (config.message_data_size + 7) & ~0x7;
+ // Round up the message size so following data is aligned appropriately.
+ config.message_data_size =
+ LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
// As we build up the size, confirm that everything is aligned to the
// alignment requirements of the type.
size_t size = sizeof(LocklessQueueMemory);
- CHECK_EQ(size & (alignof(LocklessQueueMemory) - 1), 0u);
+ CHECK_EQ(size % alignof(LocklessQueueMemory), 0u);
- CHECK_EQ(size & (alignof(AtomicIndex) - 1), 0u);
+ CHECK_EQ(size % alignof(AtomicIndex), 0u);
size += LocklessQueueMemory::SizeOfQueue(config);
- CHECK_EQ(size & (alignof(Message) - 1), 0u);
+ CHECK_EQ(size % alignof(Message), 0u);
size += LocklessQueueMemory::SizeOfMessages(config);
- CHECK_EQ(size & (alignof(Watcher) - 1), 0u);
+ CHECK_EQ(size % alignof(Watcher), 0u);
size += LocklessQueueMemory::SizeOfWatchers(config);
- CHECK_EQ(size & (alignof(Sender) - 1), 0u);
+ CHECK_EQ(size % alignof(Sender), 0u);
size += LocklessQueueMemory::SizeOfSenders(config);
return size;
@@ -246,15 +271,15 @@
// Grab the mutex. We don't care if the previous reader died. We are going
// to check everything anyways.
- GrabQueueSetupLockOrDie(memory);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory);
if (!memory->initialized) {
// TODO(austin): Check these for out of bounds.
memory->config.num_watchers = config.num_watchers;
memory->config.num_senders = config.num_senders;
memory->config.queue_size = config.queue_size;
- // Round up to the nearest double word bytes.
- memory->config.message_data_size = (config.message_data_size + 7) & ~0x7;
+ memory->config.message_data_size =
+ LocklessQueueMemory::AlignmentRoundUp(config.message_data_size);
const size_t num_messages = memory->num_messages();
// There need to be at most MaxMessages() messages allocated.
@@ -278,18 +303,18 @@
for (size_t i = 0; i < memory->num_senders(); ++i) {
::aos::ipc_lib::Sender *s = memory->GetSender(i);
- s->scratch_index.Store(Index(0xffff, i + memory->queue_size()));
+ // Nobody else can possibly be touching these because we haven't set
+ // initialized to true yet.
+ s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
s->to_replace.RelaxedInvalidate();
}
+ aos_compiler_memory_barrier();
// Signal everything is done. This needs to be done last, so if we die, we
// redo initialization.
- // This is a full atomic (probably overkill), but this is at initialization
- // time, so it is cheap.
- memory->initialized.store(true);
+ memory->initialized = true;
}
- mutex_unlock(&(memory->queue_setup_lock));
return memory;
}
@@ -303,15 +328,15 @@
LocklessQueue::~LocklessQueue() {
CHECK_EQ(watcher_index_, -1);
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
const int num_watchers = memory_->num_watchers();
- // Cleanup is cheap. Go for it anyways.
+ // Cleanup is cheap. The next user will do it anyways, so no need for us to do
+ // anything right now.
// And confirm that nothing is owned by us.
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!mutex_islocked(&(memory_->GetWatcher(i)->tid)));
+ CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
}
- mutex_unlock(&(memory_->queue_setup_lock));
}
size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
@@ -323,15 +348,17 @@
// Since everything is self consistent, all we need to do is make sure nobody
// else is running. Someone dying will get caught in the generic consistency
// check.
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
const int num_watchers = memory_->num_watchers();
// Now, find the first empty watcher and grab it.
CHECK_EQ(watcher_index_, -1);
for (int i = 0; i < num_watchers; ++i) {
+ // If we see a slot the kernel has marked as dead, everything we do reusing
+ // it needs to happen-after whatever that process did before dying.
const uint32_t tid =
- __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0 || tid & FUTEX_OWNER_DIED) {
+ __atomic_load_n(&(memory_->GetWatcher(i)->tid.futex), __ATOMIC_ACQUIRE);
+ if (tid == 0 || (tid & FUTEX_OWNER_DIED)) {
watcher_index_ = i;
break;
}
@@ -339,7 +366,6 @@
// Bail if we failed to find an open slot.
if (watcher_index_ == -1) {
- mutex_unlock(&(memory_->queue_setup_lock));
return false;
}
@@ -350,36 +376,27 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
- //
- // Since everything is done under the queue_setup_lock, this should always
- // return immediately.
- const int result = mutex_grab(&(w->tid));
-
- mutex_unlock(&(memory_->queue_setup_lock));
-
- // We should either get the lock, or the previous owner should have died.
- // Anything else is a pretty serious error.
- return result == 0 || result == 1;
+ death_notification_init(&(w->tid));
+ return true;
}
void LocklessQueue::UnregisterWakeup() {
// Since everything is self consistent, all we need to do is make sure nobody
// else is running. Someone dying will get caught in the generic consistency
// check.
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Make sure we are registered.
CHECK_NE(watcher_index_, -1);
// Make sure we still own the slot we are supposed to.
- CHECK(mutex_islocked(&(memory_->GetWatcher(watcher_index_)->tid)));
+ CHECK(
+ death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
// The act of unlocking invalidates the entry. Invalidate it.
- mutex_unlock(&(memory_->GetWatcher(watcher_index_)->tid));
+ death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
// And internally forget the slot.
watcher_index_ = -1;
-
- mutex_unlock(&(memory_->queue_setup_lock));
}
int LocklessQueue::Wakeup(const int current_priority) {
@@ -395,22 +412,26 @@
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
Watcher *w = memory_->GetWatcher(i);
- // Start by reading the tid. This needs to be atomic to force it to come
- // first.
- watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
- watcher_copy_[i].pid = w->pid;
- watcher_copy_[i].priority = w->priority;
+ watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
+ // Force the load of the TID to come first.
+ aos_compiler_memory_barrier();
+ watcher_copy_[i].pid = w->pid.load(std::memory_order_relaxed);
+ watcher_copy_[i].priority = w->priority.load(std::memory_order_relaxed);
// Use a priority of -1 to mean an invalid entry to make sorting easier.
if (watcher_copy_[i].tid & FUTEX_OWNER_DIED || watcher_copy_[i].tid == 0) {
watcher_copy_[i].priority = -1;
- } else if (watcher_copy_[i].tid !=
- static_cast<pid_t>(
- __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST))) {
- // Confirm that the watcher hasn't been re-used and modified while we read
- // it. If it has, mark it invalid again.
- watcher_copy_[i].priority = -1;
- watcher_copy_[i].tid = 0;
+ } else {
+ // Ensure all of this happens after we're done looking at the pid+priority
+ // in shared memory.
+ aos_compiler_memory_barrier();
+ if (watcher_copy_[i].tid != static_cast<pid_t>(__atomic_load_n(
+ &(w->tid.futex), __ATOMIC_RELAXED))) {
+ // Confirm that the watcher hasn't been re-used and modified while we
+ // read it. If it has, mark it invalid again.
+ watcher_copy_[i].priority = -1;
+ watcher_copy_[i].tid = 0;
+ }
}
}
@@ -464,15 +485,18 @@
}
LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
- GrabQueueSetupLockOrDie(memory_);
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
- Cleanup(memory_);
+ Cleanup(memory_, grab_queue_setup_lock);
const int num_senders = memory_->num_senders();
for (int i = 0; i < num_senders; ++i) {
::aos::ipc_lib::Sender *s = memory->GetSender(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching senders which
+ // we're interested in.
const uint32_t tid = __atomic_load_n(&(s->tid.futex), __ATOMIC_RELAXED);
if (tid == 0) {
sender_index_ = i;
@@ -486,17 +510,14 @@
::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
- // Atomically grab the mutex. This signals that we are alive. If the
- // previous owner died, we don't care, and want to grab the mutex anyways.
- const int result = mutex_grab(&(s->tid));
- CHECK(result == 0 || result == 1);
-
- mutex_unlock(&(memory->queue_setup_lock));
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(s->tid));
}
LocklessQueue::Sender::~Sender() {
if (memory_ != nullptr) {
- mutex_unlock(&(memory_->GetSender(sender_index_)->tid));
+ death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
@@ -532,9 +553,11 @@
const size_t queue_size = memory_->queue_size();
CHECK_LE(length, size());
- ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
- Index scratch_index = sender->scratch_index.RelaxedLoad();
- Message *message = memory_->GetMessage(scratch_index);
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+ // We can do a relaxed load on our sender because we're the only person
+ // modifying it right now.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
message->header.length = length;
@@ -545,7 +568,8 @@
const QueueIndex incremented_queue_index = next_queue_index.Increment();
- // TODO(austin): I think we can drop the barrier off this.
+ // This needs to synchronize with whoever the previous writer at this
+ // location was.
const Index to_replace = memory_->LoadIndex(next_queue_index);
const QueueIndex decremented_queue_index =
@@ -575,12 +599,8 @@
// Confirm that the message is what it should be.
{
- // We just need this to be atomic and after the index has been calculated
- // and before we exchange the index back in. Both of those will be strong
- // barriers, so this is fine.
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)
- ->header.queue_index.RelaxedLoad(queue_size);
+ memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
@@ -599,22 +619,30 @@
// queue_index store.
const Index index_to_write(next_queue_index, scratch_index.message_index());
+ aos_compiler_memory_barrier();
+ // We're the only person who cares about our scratch index, besides somebody
+ // cleaning up after us.
sender->scratch_index.RelaxedStore(index_to_write);
+ aos_compiler_memory_barrier();
message->header.queue_index.Store(next_queue_index);
+ aos_compiler_memory_barrier();
// The message is now filled out, and we have a confirmed slot to store
// into.
//
// Start by writing down what we are going to pull out of the queue. This
- // was Invalid before now.
+ // was Invalid before now. The only person who will read this is whoever
+ // cleans up after us, so no synchronization necessary.
sender->to_replace.RelaxedStore(to_replace);
+ aos_compiler_memory_barrier();
// Then exchange the next index into the queue.
if (!memory_->GetQueue(next_queue_index.Wrapped())
->CompareAndExchangeStrong(to_replace, index_to_write)) {
// Aw, didn't succeed. Retry.
sender->to_replace.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
VLOG(3) << "Failed to wrap into queue";
continue;
}
@@ -623,12 +651,11 @@
memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
incremented_queue_index);
+ aos_compiler_memory_barrier();
// Now update the scratch space and record that we succeeded.
sender->scratch_index.Store(to_replace);
- // And then clear out the entry used to replace. This just needs to be
- // atomic. It can't be moved above the store because that is a full
- // barrier, but delaying it until later will only affect things if something
- // died.
+ aos_compiler_memory_barrier();
+ // And then record that we succeeded, but definitely after the above store.
sender->to_replace.RelaxedInvalidate();
break;
}
@@ -665,7 +692,7 @@
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- Message *new_m = memory_->GetMessage(queue_index);
+ Message *const new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -698,7 +725,7 @@
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return ReadResult::NOTHING_NEW;
} else {
- VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
+ VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
return ReadResult::TOO_OLD;
}
}
@@ -719,6 +746,7 @@
// want. This means it didn't change out from under us.
// If something changed out from under us, we were reading it much too late in
// it's lifetime.
+ aos_compiler_memory_barrier();
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
VLOG(3) << "Changed out from under us. Reading " << std::hex
@@ -778,8 +806,7 @@
::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
::std::cout << " aos_mutex queue_setup_lock = "
<< PrintMutex(&memory->queue_setup_lock) << ::std::endl;
- ::std::cout << " ::std::atomic<bool> initialized = " << memory->initialized
- << ::std::endl;
+ ::std::cout << " bool initialized = " << memory->initialized << ::std::endl;
::std::cout << " config {" << ::std::endl;
::std::cout << " size_t num_watchers = " << memory->config.num_watchers
<< ::std::endl;
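The Wakeup() change above replaces the seq_cst tid loads with a snapshot-and-validate pattern: read the tid, read the pid and priority, then re-read the tid and throw the copy away if it changed. A stripped-down sketch of that structure (illustrative names only; this is not the real Watcher type):

#include <stdint.h>

#include <atomic>

#include "aos/util/compiler_memory_barrier.h"

// Illustrative stand-in for a shared-memory slot which another process may
// re-use while we are reading it.
struct SharedSlot {
  std::atomic<uint32_t> tid{0};
  std::atomic<int> priority{0};
};

struct SlotCopy {
  uint32_t tid;
  int priority;
};

// Returns true if *out is a consistent copy of *slot. The compiler barriers
// only pin down the order the loads are emitted in; the real code combines
// this with the futex/robust-list protocol for the cross-process guarantees.
bool SnapshotSlot(SharedSlot *slot, SlotCopy *out) {
  out->tid = slot->tid.load(std::memory_order_relaxed);
  if (out->tid == 0) {
    return false;  // Unused slot.
  }
  aos_compiler_memory_barrier();
  out->priority = slot->priority.load(std::memory_order_relaxed);
  aos_compiler_memory_barrier();
  // If the slot was re-used while we were looking, discard the copy.
  return slot->tid.load(std::memory_order_relaxed) == out->tid;
}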
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index fcc5d79..4a8523b 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,7 +4,6 @@
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>
-#include <atomic>
#include <vector>
#include "aos/ipc_lib/aos_sync.h"
@@ -20,15 +19,16 @@
// then this watcher is invalid. The futex variable will then hold the tid of
// the watcher, or FUTEX_OWNER_DIED if the task died.
//
- // Note: this is modified with a lock held, but is always able to be read.
+ // Note: this is only modified with queue_setup_lock held, but may always
+ // be read.
// Any state modification should happen before the lock is acquired.
aos_mutex tid;
// PID of the watcher.
- pid_t pid;
+ std::atomic<pid_t> pid;
// RT priority of the watcher.
- int priority;
+ std::atomic<int> priority;
};
// Structure to hold the state required to send messages.
@@ -37,7 +37,8 @@
// this sender is invalid. The futex variable will then hold the tid of the
// sender, or FUTEX_OWNER_DIED if the task died.
//
- // Note: this is modified with a lock held, but is always able to be read.
+ // Note: this is only modified with queue_setup_lock held, but may always
+ // be read.
aos_mutex tid;
// Index of the message we will be filling out.
@@ -59,7 +60,7 @@
// Note: a value of 0xffffffff always means that the contents aren't valid.
AtomicQueueIndex queue_index;
- // Timestamp of the message. Needs to be atomically incrementing in the
+ // Timestamp of the message. Needs to be monotonically incrementing in the
// queue, which means that time needs to be re-sampled every time a write
// fails.
::aos::monotonic_clock::time_point monotonic_sent_time;
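The comments above pin down the protocol for these slots: fill in the plain fields (now relaxed atomics) first, then take ownership of the slot through the tid futex, so that whoever sees our tid also sees consistent values for the rest. A hypothetical helper mirroring what RegisterWakeup() does earlier in this change:

#include <sys/types.h>

#include <atomic>

#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/lockless_queue.h"

namespace aos {
namespace ipc_lib {

// Hypothetical, for illustration only: claim a Watcher slot the way
// RegisterWakeup() does.
void ClaimWatcherSlot(Watcher *w, pid_t pid, int priority) {
  // Publish the plain fields before taking over the slot.
  w->pid.store(pid, std::memory_order_relaxed);
  w->priority.store(priority, std::memory_order_relaxed);
  // Grabbing the futex is a compiler and memory barrier, so the stores above
  // cannot be reordered past it.
  death_notification_init(&w->tid);
}

}  // namespace ipc_lib
}  // namespace aos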
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 162403a..0c0973c 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -2,33 +2,37 @@
#define AOS_IPC_LIB_LOCKLESS_QUEUE_MEMORY_H_
#include <sys/types.h>
-#include <atomic>
+#include <cstddef>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/index.h"
+#include "aos/ipc_lib/lockless_queue.h"
#include "aos/time/time.h"
namespace aos {
namespace ipc_lib {
struct LocklessQueueMemory {
+ // This is held during initialization. Cleanup after dead processes happens
+ // during this initialization process.
+ //
// A lot of things get easier if the only lockless writes are when messages
// are published. Do note, the datastructures protected by this lock need to
// be consistent at all times because a reader (or writer) may read them
// regardless of if the lock is held or not.
- //
- // Any non-constant time operations need to be done at queue startup time,
- // including cleanup. Cleanup is done when the next queue is opened.
aos_mutex queue_setup_lock;
- ::std::atomic<bool> initialized;
+ // Tracks whether some process has already completed initialization.
+ // Only accessed under queue_setup_lock.
+ bool initialized;
LocklessQueueConfiguration config;
- // Size of the watchers list.
+ // Size of the watcher list.
size_t num_watchers() const { return config.num_watchers; }
// Size of the sender list.
size_t num_senders() const { return config.num_senders; }
+ // Number of messages logically in the queue at a time.
// List of pointers into the messages list.
size_t queue_size() const { return config.queue_size; }
@@ -55,59 +59,60 @@
// Watcher watchers[config.num_watchers];
// Sender senders[config.num_senders];
+ static constexpr size_t kDataAlignment = alignof(std::max_align_t);
+
// Aligned pointer to where the data starts.
- // Use a 64 bit type to require 64 bit alignment of the data inside.
- uint64_t data[];
+ // Make sure it's of a type which can safely alias with the actual datatype,
+ // and is aligned enough for any of the structs we will access this memory as.
+ alignas(kDataAlignment) char data[];
// Memory size functions for all 4 lists.
size_t SizeOfQueue() { return SizeOfQueue(config); }
static size_t SizeOfQueue(LocklessQueueConfiguration config) {
- return sizeof(AtomicIndex) * config.queue_size;
+ return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
}
size_t SizeOfMessages() { return SizeOfMessages(config); }
static size_t SizeOfMessages(LocklessQueueConfiguration config) {
- return config.message_size() * config.num_messages();
+ return AlignmentRoundUp(config.message_size() * config.num_messages());
}
size_t SizeOfWatchers() { return SizeOfWatchers(config); }
static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
- return sizeof(Watcher) * config.num_watchers;
+ return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
}
size_t SizeOfSenders() { return SizeOfSenders(config); }
static size_t SizeOfSenders(LocklessQueueConfiguration config) {
- return sizeof(Sender) * config.num_senders;
+ return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
}
// Getters for each of the 4 lists.
Sender *GetSender(size_t sender_index) {
- return reinterpret_cast<Sender *>(
- reinterpret_cast<uintptr_t>(&data[0]) + SizeOfQueue() +
- SizeOfMessages() + SizeOfWatchers() + sender_index * sizeof(Sender));
+ return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
+ SizeOfMessages() + SizeOfWatchers() +
+ sender_index * sizeof(Sender));
}
Watcher *GetWatcher(size_t watcher_index) {
- return reinterpret_cast<Watcher *>(reinterpret_cast<uintptr_t>(&data[0]) +
- SizeOfQueue() + SizeOfMessages() +
+ return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
+ SizeOfMessages() +
watcher_index * sizeof(Watcher));
}
AtomicIndex *GetQueue(uint32_t index) {
- return reinterpret_cast<AtomicIndex *>(
- reinterpret_cast<uintptr_t>(&data[0]) + sizeof(AtomicIndex) * index);
+ return reinterpret_cast<AtomicIndex *>(&data[0] +
+ sizeof(AtomicIndex) * index);
}
// There are num_senders + queue_size messages. The free list is really the
// sender list, since those are messages available to be filled in and sent.
// This removes the need to find lost messages when a sender dies.
Message *GetMessage(Index index) {
- return reinterpret_cast<Message *>(reinterpret_cast<uintptr_t>(&data[0]) +
- SizeOfQueue() +
+ return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
index.message_index() * message_size());
}
-
// Helpers to fetch messages from the queue.
Index LoadIndex(QueueIndex index) {
return GetQueue(index.Wrapped())->Load();
@@ -115,6 +120,10 @@
Message *GetMessage(QueueIndex index) {
return GetMessage(LoadIndex(index));
}
+
+ static constexpr size_t AlignmentRoundUp(size_t in) {
+ return (in + kDataAlignment - 1) / kDataAlignment * kDataAlignment;
+ }
};
} // namespace ipc_lib
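AlignmentRoundUp is ordinary round-up-to-a-multiple arithmetic. A self-contained mirror of it, with a hardcoded 16 standing in for alignof(std::max_align_t) (typical on 64-bit Linux, though the real code takes whatever the ABI provides):

#include <cstddef>

// Illustration only; the real constant is LocklessQueueMemory::kDataAlignment.
constexpr size_t kExampleAlignment = 16;

constexpr size_t ExampleRoundUp(size_t in) {
  return (in + kExampleAlignment - 1) / kExampleAlignment * kExampleAlignment;
}

static_assert(ExampleRoundUp(1) == 16, "partial blocks round up");
static_assert(ExampleRoundUp(16) == 16, "exact multiples are unchanged");
static_assert(ExampleRoundUp(17) == 32, "one past a multiple goes to the next");

// Because every SizeOf*() result is rounded this way, each region the Get*()
// helpers compute starts at a multiple of kDataAlignment within data[].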
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index af95598..363bf4a 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -12,21 +12,35 @@
SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
// Build up the mask with the provided signals.
- sigemptyset(&mask_);
+ CHECK_EQ(0, sigemptyset(&blocked_mask_));
for (int signal : signals) {
- sigaddset(&mask_, signal);
+ CHECK_EQ(0, sigaddset(&blocked_mask_, signal));
}
// Then build a signalfd. Make it nonblocking so it works well with an epoll
// loop, and have it close on exec.
- PCHECK((fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != 0);
+ PCHECK((fd_ = signalfd(-1, &blocked_mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != -1);
// Now that we have a consumer of the signal, block the signals so the
- // signalfd gets them.
- pthread_sigmask(SIG_BLOCK, &mask_, nullptr);
+ // signalfd gets them. Record which ones we actually blocked, so we can
+ // unblock just those later.
+ sigset_t old_mask;
+ CHECK_EQ(0, pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
+ for (int signal : signals) {
+ if (sigismember(&old_mask, signal)) {
+ CHECK_EQ(0, sigdelset(&blocked_mask_, signal));
+ }
+ }
}
SignalFd::~SignalFd() {
- // Unwind the constructor. Unblock the signals and close the fd.
- pthread_sigmask(SIG_UNBLOCK, &mask_, nullptr);
+ // Unwind the constructor. Unblock the signals and close the fd. Verify nobody
+ // else unblocked the signals we're supposed to unblock in the meantime.
+ sigset_t old_mask;
+ CHECK_EQ(0, pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
+ sigset_t unblocked_mask;
+ CHECK_EQ(0, sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
+ if (memcmp(&unblocked_mask, &blocked_mask_, sizeof(unblocked_mask)) != 0) {
+ LOG(FATAL) << "Some other code unblocked one or more of our signals";
+ }
PCHECK(close(fd_) == 0);
}
@@ -39,6 +53,8 @@
// by setting the signal number to 0.
if (ret != static_cast<int>(sizeof(signalfd_siginfo))) {
result.ssi_signo = 0;
+ } else {
+ CHECK_NE(0u, result.ssi_signo);
}
return result;
}
diff --git a/aos/ipc_lib/signalfd.h b/aos/ipc_lib/signalfd.h
index 7d2021a..b3a9063 100644
--- a/aos/ipc_lib/signalfd.h
+++ b/aos/ipc_lib/signalfd.h
@@ -29,7 +29,8 @@
private:
int fd_ = -1;
- sigset_t mask_;
+ // The signals we blocked in the constructor.
+ sigset_t blocked_mask_;
};
} // namespace ipc_lib
diff --git a/aos/ipc_lib/signalfd_test.cc b/aos/ipc_lib/signalfd_test.cc
new file mode 100644
index 0000000..b4fe74b
--- /dev/null
+++ b/aos/ipc_lib/signalfd_test.cc
@@ -0,0 +1,76 @@
+#include "aos/ipc_lib/signalfd.h"
+
+#include "gtest/gtest.h"
+#include "glog/logging.h"
+#include "aos/testing/test_logging.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+// Tests in this file use separate threads to isolate all manipulation of signal
+// masks between test cases.
+
+// Verify that SignalFd leaves a signal blocked if it was already blocked when
+// the SignalFd was created, instead of unblocking it in the destructor.
+TEST(SignalFdTest, LeaveSignalBlocked) {
+ ::aos::testing::EnableTestLogging();
+ std::thread thread([]() {
+ {
+ sigset_t test_mask;
+ CHECK_EQ(0, sigemptyset(&test_mask));
+ CHECK_EQ(0, sigaddset(&test_mask, SIGUSR1));
+ PCHECK(sigprocmask(SIG_BLOCK, &test_mask, nullptr) == 0);
+ }
+ SignalFd({SIGUSR1});
+ {
+ sigset_t blocked_now;
+ PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+ ASSERT_TRUE(sigismember(&blocked_now, SIGUSR1));
+ }
+ });
+ thread.join();
+}
+
+// Verify that SignalFd actually blocks the requested signals, and unblocks them
+// afterwards.
+TEST(SignalFdTest, BlockSignal) {
+ ::aos::testing::EnableTestLogging();
+ std::thread thread([]() {
+ {
+ sigset_t blocked_now;
+ PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+ ASSERT_FALSE(sigismember(&blocked_now, SIGUSR1));
+ }
+ {
+ SignalFd signalfd({SIGUSR1});
+ sigset_t blocked_now;
+ PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+ ASSERT_TRUE(sigismember(&blocked_now, SIGUSR1));
+ }
+ {
+ sigset_t blocked_now;
+ PCHECK(sigprocmask(SIG_BLOCK, nullptr, &blocked_now) == 0);
+ ASSERT_FALSE(sigismember(&blocked_now, SIGUSR1));
+ }
+ });
+ thread.join();
+}
+
+// Verify that SignalFd responds correctly when some other code unblocks one of
+// its signals.
+TEST(SignalFdDeathTest, ExternalUnblockSignal) {
+ ::aos::testing::EnableTestLogging();
+ EXPECT_DEATH(
+ {
+ SignalFd signalfd({SIGUSR1});
+ sigset_t test_mask;
+ CHECK_EQ(0, sigemptyset(&test_mask));
+ CHECK_EQ(0, sigaddset(&test_mask, SIGUSR1));
+ PCHECK(sigprocmask(SIG_UNBLOCK, &test_mask, nullptr) == 0);
+ },
+ "Some other code unblocked one or more of our signals");
+}
+
+} // namespace testing
+} // namespace ipc_lib
+} // namespace aos
diff --git a/aos/util/compiler_memory_barrier.h b/aos/util/compiler_memory_barrier.h
index 33da511..6be3c59 100644
--- a/aos/util/compiler_memory_barrier.h
+++ b/aos/util/compiler_memory_barrier.h
@@ -5,7 +5,7 @@
// Using this function makes it clearer what you're doing and easier to be
// portable.
static inline void aos_compiler_memory_barrier(void) {
- __asm__ __volatile__("" ::: "memory");
+ __asm__ __volatile__("" ::: "memory", "cc");
}
#endif // AOS_UTIL_COMPILER_MEMORY_BARRIER_H_