#if !AOS_DEBUG
#undef NDEBUG
#define NDEBUG
#endif
#include "aos/ipc_lib/aos_sync.h"
#include <linux/futex.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cinttypes>
#include <climits>
#include <ostream>
#include "aos/ipc_lib/shm_observers.h"
#ifdef AOS_SANITIZER_thread
#include <sanitizer/tsan_interface_atomic.h>
#endif
#include "absl/base/call_once.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "aos/macros.h"
#include "aos/util/compiler_memory_barrier.h"
using ::aos::linux_code::ipc_lib::RunShmObservers;
// This code was originally based on
// <https://www.akkadia.org/drepper/futex.pdf>, but it has since evolved a lot.
// However, that still has useful information.
//
// Finding information about actually using futexes is really REALLY hard, so
// here's a list of the stuff that I've used:
// futex(7) has a really high-level overview.
// <http://locklessinc.com/articles/futex_cheat_sheet/> describes some of the
// operations in a bit more detail than most places.
// <http://locklessinc.com/articles/mutex_cv_futex/> is the basis of our
// implementations (before PI).
// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
// (fairly recent compared to everything else...).
// <https://www.kernel.org/doc/Documentation/pi-futex.txt>,
// <https://www.kernel.org/doc/Documentation/futex-requeue-pi.txt>,
// <https://www.kernel.org/doc/Documentation/robust-futexes.txt>,
// and <https://www.kernel.org/doc/Documentation/robust-futex-ABI.txt> are all
// useful references.
// The kernel source (kernel/futex.c) has some useful comments about what the
// various operations do (except figuring out which argument goes where in the
// syscall is still confusing).
// futex(2) is basically useless except for describing the order of the
// arguments (it only has high-level descriptions of what some of the
// operations do, and some of them are wrong in Wheezy).
// glibc's nptl pthreads implementation is the intended user of most of these
// things, so it is also a good place to look for examples. However, it is all
// very hard to read because it supports ~20 different kinds of mutexes and
// several variations of condition variables, and some of the pieces of code
// are only written in assembly.
// set_robust_list(2) is wrong in Wheezy (it doesn't actually take a TID
// argument).
//
// Can't use PRIVATE futex operations because they are hashed per-process (on
// the mm and virtual address rather than the underlying shared page), so they
// don't match up across processes sharing memory.
//
// ThreadSanitizer understands how these mutexes etc work. It appears to be able
// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
// atomic primitives.
//
// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
// Values for an aos_mutex.futex (kernel-mandated):
// 0 = unlocked
// TID = locked, not contended
// |FUTEX_WAITERS = there are waiters (aka contended)
// |FUTEX_OWNER_DIED = old owner died
//
// Values for an aos_futex being used directly:
// 0 = unset
// 1 = set
//
// The value of an aos_condition is just a generation counter.
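//
// A sketch of the lifetime of m->futex for one uncontended lock/unlock cycle
// (mirroring the fastpaths in mutex_do_get and mutex_unlock below; TID is the
// locking thread's tid):
//   lock:   compare_and_swap(&m->futex, 0, TID)   // userspace-only fastpath
//   unlock: compare_and_swap(&m->futex, TID, 0)   // userspace-only fastpath
// Under contention the kernel ORs FUTEX_WAITERS into the value, so the unlock
// CAS fails and FUTEX_UNLOCK_PI has to do the actual waking.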
#ifdef AOS_SANITIZER_thread
extern "C" void AnnotateHappensBefore(const char *file, int line,
uintptr_t addr);
extern "C" void AnnotateHappensAfter(const char *file, int line,
uintptr_t addr);
#define ANNOTATE_HAPPENS_BEFORE(address) \
AnnotateHappensBefore(__FILE__, __LINE__, \
reinterpret_cast<uintptr_t>(address))
#define ANNOTATE_HAPPENS_AFTER(address) \
AnnotateHappensAfter(__FILE__, __LINE__, reinterpret_cast<uintptr_t>(address))
#else
#define ANNOTATE_HAPPENS_BEFORE(address)
#define ANNOTATE_HAPPENS_AFTER(address)
#endif
namespace {
const bool kRobustListDebug = false;
const bool kLockDebug = false;
const bool kPrintOperations = false;
// These sys_futex_* functions are wrappers around syscall(SYS_futex). They each
// take a specific set of arguments for a given futex operation. They return the
// result or a negated errno value; the kernel guarantees that return values in
// -1..-4095 are always errors, never successful results.
//
// They each have optimized versions for some architectures which don't go
// through syscall(2) or errno. These use register variables to get the values
// in the right registers to actually make the syscall.
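//
// A sketch of the resulting calling convention (the callers below follow this
// pattern when they treat a failure as fatal; `f` here is a hypothetical
// aos_futex):
//   const int ret = sys_futex_wake(&f, 1);
//   if (ret < 0) {
//     // -1..-4095 is always an error, never a count of woken waiters.
//     errno = -ret;
//     PLOG(FATAL) << "FUTEX_WAKE(" << &f << ", 1) failed";
//   }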
// The actual macros that we key off of to decide whether to use the inline
// versions or not.
#if defined(__ARM_EABI__)
// The syscall interface is different for non-EABI ARM, so we test specifically
// for EABI.
#define ARM_EABI_INLINE_SYSCALL 1
#define AARCH64_INLINE_SYSCALL 0
#elif defined(__aarch64__)
// Linux only has one supported syscall ABI on aarch64, which is the one we
// support.
#define ARM_EABI_INLINE_SYSCALL 0
#define AARCH64_INLINE_SYSCALL 1
#else
#define ARM_EABI_INLINE_SYSCALL 0
#define AARCH64_INLINE_SYSCALL 0
#endif
// Used for FUTEX_WAIT, FUTEX_LOCK_PI, and FUTEX_TRYLOCK_PI.
inline int sys_futex_wait(int op, aos_futex *addr1, int val1,
const struct timespec *timeout) {
#if ARM_EABI_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("r0") = addr1;
register int op_reg __asm__("r1") = op;
register int val1_reg __asm__("r2") = val1;
register const struct timespec *timeout_reg __asm__("r3") = timeout;
register int syscall_number __asm__("r7") = SYS_futex;
register int result __asm__("r0");
__asm__ volatile("swi #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
"r"(timeout_reg), "r"(syscall_number)
: "memory");
return result;
#elif AARCH64_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("x0") = addr1;
register int op_reg __asm__("x1") = op;
register int val1_reg __asm__("x2") = val1;
register const struct timespec *timeout_reg __asm__("x3") = timeout;
register int syscall_number __asm__("x8") = SYS_futex;
register int result __asm__("x0");
__asm__ volatile("svc #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
"r"(timeout_reg), "r"(syscall_number)
: "memory");
return result;
#else
const int r = syscall(SYS_futex, addr1, op, val1, timeout);
if (r == -1) return -errno;
return r;
#endif
}
inline int sys_futex_wake(aos_futex *addr1, int val1) {
#if ARM_EABI_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("r0") = addr1;
register int op_reg __asm__("r1") = FUTEX_WAKE;
register int val1_reg __asm__("r2") = val1;
register int syscall_number __asm__("r7") = SYS_futex;
register int result __asm__("r0");
__asm__ volatile("swi #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
"r"(syscall_number)
: "memory");
return result;
#elif AARCH64_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("x0") = addr1;
register int op_reg __asm__("x1") = FUTEX_WAKE;
register int val1_reg __asm__("x2") = val1;
register int syscall_number __asm__("x8") = SYS_futex;
register int result __asm__("x0");
__asm__ volatile("svc #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
"r"(syscall_number)
: "memory");
return result;
#else
const int r = syscall(SYS_futex, addr1, FUTEX_WAKE, val1);
if (r == -1) return -errno;
return r;
#endif
}
inline int sys_futex_cmp_requeue_pi(aos_futex *addr1, int num_wake,
int num_requeue, aos_futex *m,
uint32_t val) {
#if ARM_EABI_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("r0") = addr1;
register int op_reg __asm__("r1") = FUTEX_CMP_REQUEUE_PI;
register int num_wake_reg __asm__("r2") = num_wake;
register int num_requeue_reg __asm__("r3") = num_requeue;
register aos_futex *m_reg __asm__("r4") = m;
register uint32_t val_reg __asm__("r5") = val;
register int syscall_number __asm__("r7") = SYS_futex;
register int result __asm__("r0");
__asm__ volatile("swi #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(num_wake_reg),
"r"(num_requeue_reg), "r"(m_reg), "r"(val_reg),
"r"(syscall_number)
: "memory");
return result;
#elif AARCH64_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("x0") = addr1;
register int op_reg __asm__("x1") = FUTEX_CMP_REQUEUE_PI;
register int num_wake_reg __asm__("x2") = num_wake;
register int num_requeue_reg __asm__("x3") = num_requeue;
register aos_futex *m_reg __asm__("x4") = m;
register uint32_t val_reg __asm__("x5") = val;
register int syscall_number __asm__("x8") = SYS_futex;
register int result __asm__("x0");
__asm__ volatile("svc #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(num_wake_reg),
"r"(num_requeue_reg), "r"(m_reg), "r"(val_reg),
"r"(syscall_number)
: "memory");
return result;
#else
const int r = syscall(SYS_futex, addr1, FUTEX_CMP_REQUEUE_PI, num_wake,
num_requeue, m, val);
if (r == -1) return -errno;
return r;
#endif
}
inline int sys_futex_wait_requeue_pi(aos_condition *addr1, uint32_t start_val,
const struct timespec *timeout,
aos_futex *m) {
#if ARM_EABI_INLINE_SYSCALL
register aos_condition *addr1_reg __asm__("r0") = addr1;
register int op_reg __asm__("r1") = FUTEX_WAIT_REQUEUE_PI;
register uint32_t start_val_reg __asm__("r2") = start_val;
register const struct timespec *timeout_reg __asm__("r3") = timeout;
register aos_futex *m_reg __asm__("r4") = m;
register int syscall_number __asm__("r7") = SYS_futex;
register int result __asm__("r0");
__asm__ volatile("swi #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(start_val_reg),
"r"(timeout_reg), "r"(m_reg), "r"(syscall_number)
: "memory");
return result;
#elif AARCH64_INLINE_SYSCALL
register aos_condition *addr1_reg __asm__("x0") = addr1;
register int op_reg __asm__("x1") = FUTEX_WAIT_REQUEUE_PI;
register uint32_t start_val_reg __asm__("x2") = start_val;
register const struct timespec *timeout_reg __asm__("x3") = timeout;
register aos_futex *m_reg __asm__("x4") = m;
register int syscall_number __asm__("x8") = SYS_futex;
register int result __asm__("x0");
__asm__ volatile("svc #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(start_val_reg),
"r"(timeout_reg), "r"(m_reg), "r"(syscall_number)
: "memory");
return result;
#else
const int r =
syscall(SYS_futex, addr1, FUTEX_WAIT_REQUEUE_PI, start_val, timeout, m);
if (r == -1) return -errno;
return r;
#endif
}
inline int sys_futex_unlock_pi(aos_futex *addr1) {
#if ARM_EABI_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("r0") = addr1;
register int op_reg __asm__("r1") = FUTEX_UNLOCK_PI;
register int syscall_number __asm__("r7") = SYS_futex;
register int result __asm__("r0");
__asm__ volatile("swi #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(syscall_number)
: "memory");
return result;
#elif AARCH64_INLINE_SYSCALL
register aos_futex *addr1_reg __asm__("x0") = addr1;
register int op_reg __asm__("x1") = FUTEX_UNLOCK_PI;
register int syscall_number __asm__("x8") = SYS_futex;
register int result __asm__("x0");
__asm__ volatile("svc #0"
: "=r"(result)
: "r"(addr1_reg), "r"(op_reg), "r"(syscall_number)
: "memory");
return result;
#else
const int r = syscall(SYS_futex, addr1, FUTEX_UNLOCK_PI);
if (r == -1) return -errno;
return r;
#endif
}
// Returns the previous value of f.
inline uint32_t compare_and_swap_val(aos_futex *f, uint32_t before,
uint32_t after) {
#ifdef AOS_SANITIZER_thread
// This is a workaround for <https://llvm.org/bugs/show_bug.cgi?id=23176>.
// Basically, most of the atomic operations are broken under tsan, but this
// particular one isn't.
// TODO(Brian): Remove this #ifdef (and the one in compare_and_swap) once we
// don't have to worry about tsan with this bug any more.
uint32_t before_value = before;
__tsan_atomic32_compare_exchange_strong(
reinterpret_cast<int32_t *>(f),
reinterpret_cast<int32_t *>(&before_value), after,
__tsan_memory_order_seq_cst, __tsan_memory_order_seq_cst);
return before_value;
#else
return __sync_val_compare_and_swap(f, before, after);
#endif
}
// Returns true if it succeeds and false if it fails.
inline bool compare_and_swap(aos_futex *f, uint32_t before, uint32_t after) {
#ifdef AOS_SANITIZER_thread
return compare_and_swap_val(f, before, after) == before;
#else
return __sync_bool_compare_and_swap(f, before, after);
#endif
}
#ifdef AOS_SANITIZER_thread
// Simple macro for checking something which should always be true.
// Using the standard CHECK macro isn't safe because failures often result in
// reentering the mutex locking code, which doesn't work.
#define SIMPLE_CHECK(expr) \
do { \
if (!(expr)) { \
fprintf(stderr, "%s: %d: SIMPLE_CHECK(" #expr ") failed!\n", __FILE__, \
__LINE__); \
abort(); \
} \
} while (false)
// Forcibly initializes the pthread mutex for *m.
// This sequence of operations is only safe for the simpler kinds of mutexes in
// glibc's pthreads implementation on Linux.
void init_pthread_mutex(aos_mutex *m) {
// Re-initialize the mutex so the destroy won't fail if it's locked.
// tsan ignores this.
SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, nullptr));
// Destroy the mutex so tsan will forget about it if some now-dead thread
// locked it.
SIMPLE_CHECK(0 == pthread_mutex_destroy(&m->pthread_mutex));
// Now actually initialize it, making sure it's process-shareable so it works
// correctly across shared memory.
pthread_mutexattr_t attr;
SIMPLE_CHECK(0 == pthread_mutexattr_init(&attr));
SIMPLE_CHECK(0 == pthread_mutexattr_setpshared(&attr, true));
SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, &attr));
SIMPLE_CHECK(0 == pthread_mutexattr_destroy(&attr));
}
// Locks the pthread mutex for *m.
// If a stack trace ever reveals the pthread_mutex_lock call in here blocking,
// there is a bug in our mutex code or the way somebody is calling it.
void lock_pthread_mutex(aos_mutex *m) {
if (!m->pthread_mutex_init) {
init_pthread_mutex(m);
m->pthread_mutex_init = true;
}
SIMPLE_CHECK(0 == pthread_mutex_lock(&m->pthread_mutex));
}
// Forcibly locks the pthread mutex for *m.
// This will (somewhat hackily) rip the lock out from underneath somebody else
// who is already holding it.
void force_lock_pthread_mutex(aos_mutex *m) {
if (!m->pthread_mutex_init) {
init_pthread_mutex(m);
m->pthread_mutex_init = true;
}
const int trylock_result = pthread_mutex_trylock(&m->pthread_mutex);
SIMPLE_CHECK(trylock_result == 0 || trylock_result == EBUSY);
if (trylock_result == 0) {
// We're good, so unlock it and then go for a real lock down below.
SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
} else {
// Somebody (should always be somebody else who died with it held) already
// has it, so make tsan forget about that.
init_pthread_mutex(m);
}
lock_pthread_mutex(m);
}
// Unlocks the pthread mutex for *m.
void unlock_pthread_mutex(aos_mutex *m) {
assert(m->pthread_mutex_init);
SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
}
#else
// Empty implementations of all these so the code below doesn't need #ifdefs.
static inline void lock_pthread_mutex(aos_mutex *) {}
static inline void force_lock_pthread_mutex(aos_mutex *) {}
static inline void unlock_pthread_mutex(aos_mutex *) {}
#endif
pid_t do_get_tid() {
pid_t r = syscall(SYS_gettid);
assert(r > 0);
return r;
}
// This gets called by functions before LOG(FATAL)ing with error messages
// that would be incorrect if the error was caused by a process forking without
// initialize_in_new_thread getting called in the fork.
void check_cached_tid(pid_t tid) {
pid_t actual = do_get_tid();
if (tid != actual) {
LOG(FATAL) << "task " << static_cast<intmax_t>(tid) << " forked into "
<< static_cast<intmax_t>(actual)
<< " without letting aos_sync know so we're not really sure "
"what's going on";
}
}
// Starts off at 0 in each new thread (either because that's what thread-local
// variables are initialized to or because atfork_child() resets it to 0 after a
// fork).
thread_local pid_t my_tid = 0;
// Gets called before the fork(2) wrapper function returns in the child.
void atfork_child() {
// The next time get_tid() is called, it will set everything up again.
my_tid = 0;
}
void InstallAtforkHook() {
PCHECK(pthread_atfork(NULL, NULL, &atfork_child) == 0)
<< ": pthread_atfork(NULL, NULL, "
<< reinterpret_cast<void *>(&atfork_child) << ") failed";
}
// This gets called to set everything up in a new thread by get_tid().
void initialize_in_new_thread();
// Gets the current thread's TID and does all of the 1-time initialization the
// first time it's called in a given thread.
inline uint32_t get_tid() {
if (__builtin_expect(my_tid == 0, false)) {
initialize_in_new_thread();
}
static_assert(sizeof(my_tid) <= sizeof(uint32_t), "pid_t is too big");
return static_cast<uint32_t>(my_tid);
}
// Contains all of the stuff for dealing with the robust list. Nothing outside
// this namespace should touch anything inside it except Init, Adder, and
// Remover.
namespace my_robust_list {
static_assert(offsetof(aos_mutex, next) == 0,
"Our math all assumes that the beginning of a mutex and its next "
"pointer are at the same place in memory.");
// Our version of robust_list_head.
// This is copied from the kernel header because that's a pretty stable ABI (and
// any changes will be backwards compatible anyways) and we want ours to have
// different types.
// The uintptr_ts are &next of the elements in the list (with stuff |ed in).
struct aos_robust_list_head {
uintptr_t next;
long futex_offset;
uintptr_t pending_next;
};
static_assert(offsetof(aos_robust_list_head, next) ==
offsetof(robust_list_head, list),
"Our aos_robust_list_head doesn't match the kernel's");
static_assert(offsetof(aos_robust_list_head, futex_offset) ==
offsetof(robust_list_head, futex_offset),
"Our aos_robust_list_head doesn't match the kernel's");
static_assert(offsetof(aos_robust_list_head, pending_next) ==
offsetof(robust_list_head, list_op_pending),
"Our aos_robust_list_head doesn't match the kernel's");
static_assert(sizeof(aos_robust_list_head) == sizeof(robust_list_head),
"Our aos_robust_list_head doesn't match the kernel's");
thread_local aos_robust_list_head robust_head;
// Extra offset between mutex values and where we point to for their robust list
// entries (from SetRobustListOffset).
uintptr_t robust_list_offset = 0;
// The value to OR each pointer's value with whenever putting it into the robust
// list (technically only if it's PI, but all of ours are, so...).
static const uintptr_t kRobustListOr = 1;
// Returns the value which goes into a next variable to represent the head.
inline uintptr_t robust_head_next_value() {
return reinterpret_cast<uintptr_t>(&robust_head.next);
}
// Returns true iff next represents the head.
inline bool next_is_head(uintptr_t next) {
return next == robust_head_next_value();
}
// Returns the (pseudo-)mutex corresponding to the head.
// This does NOT have a previous pointer, so be careful with the return value.
inline aos_mutex *robust_head_mutex() {
return reinterpret_cast<aos_mutex *>(robust_head_next_value());
}
inline uintptr_t mutex_to_next(aos_mutex *m) {
return (reinterpret_cast<uintptr_t>(&m->next) + robust_list_offset) |
kRobustListOr;
}
inline aos_mutex *next_to_mutex(uintptr_t next) {
if (__builtin_expect(robust_list_offset != 0, false) && next_is_head(next)) {
// We don't offset the head pointer, so be careful.
return reinterpret_cast<aos_mutex *>(next);
}
return reinterpret_cast<aos_mutex *>((next & ~kRobustListOr) -
robust_list_offset);
}
// Sets up the robust list for each thread.
void Init() {
// It starts out just pointing back to itself.
robust_head.next = robust_head_next_value();
robust_head.futex_offset = static_cast<ssize_t>(offsetof(aos_mutex, futex)) -
static_cast<ssize_t>(offsetof(aos_mutex, next));
robust_head.pending_next = 0;
PCHECK(syscall(SYS_set_robust_list, robust_head_next_value(),
sizeof(robust_head)) == 0)
<< ": set_robust_list(" << reinterpret_cast<void *>(robust_head.next)
<< ", " << sizeof(robust_head) << ") failed";
if (kRobustListDebug) {
printf("%" PRId32 ": init done\n", get_tid());
}
}
// Updating the offset with locked mutexes is important during robustness
// testing, because there are mutexes which are locked before this is set to a
// non-0 value and then unlocked after it is changed back. However, to make sure
// the code works correctly when manipulating the next pointer of the last of
// those mutexes, all of their next values have to be adjusted appropriately.
void SetRobustListOffset(uintptr_t offset) {
const uintptr_t offset_change = offset - robust_list_offset;
robust_list_offset = offset;
aos_mutex *m = robust_head_mutex();
// Update the offset contained in each of the mutexes which are already locked.
while (!next_is_head(m->next)) {
m->next += offset_change;
m = next_to_mutex(m->next);
}
}
bool HaveLockedMutexes() {
return robust_head.next != robust_head_next_value();
}
// Handles adding a mutex to the robust list.
// The idea is to create one of these at the beginning of a function that needs
// to do this and then call Add() iff it should actually be added.
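// A sketch of the intended pattern (mutex_get below does exactly this):
//   my_robust_list::Adder adder(m);
//   const int r = mutex_do_get(m, signals_fail, timeout, tid);
//   if (r == 0 || r == 1) adder.Add();  // only once we actually own the mutex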
class Adder {
public:
Adder(aos_mutex *m) : m_(m) {
assert(robust_head.pending_next == 0);
if (kRobustListDebug) {
printf("%" PRId32 ": maybe add %p\n", get_tid(), m_);
}
robust_head.pending_next = mutex_to_next(m);
aos_compiler_memory_barrier();
}
~Adder() {
assert(robust_head.pending_next == mutex_to_next(m_));
if (kRobustListDebug) {
printf("%" PRId32 ": done maybe add %p, n=%p p=%p\n", get_tid(), m_,
next_to_mutex(m_->next), m_->previous);
}
aos_compiler_memory_barrier();
robust_head.pending_next = 0;
}
void Add() {
assert(robust_head.pending_next == mutex_to_next(m_));
if (kRobustListDebug) {
printf("%" PRId32 ": adding %p\n", get_tid(), m_);
}
const uintptr_t old_head_next_value = robust_head.next;
m_->next = old_head_next_value;
aos_compiler_memory_barrier();
robust_head.next = mutex_to_next(m_);
m_->previous = robust_head_mutex();
if (!next_is_head(old_head_next_value)) {
// robust_head's pseudo-mutex doesn't have a previous pointer to update.
next_to_mutex(old_head_next_value)->previous = m_;
}
aos_compiler_memory_barrier();
if (kRobustListDebug) {
printf("%" PRId32 ": done adding %p\n", get_tid(), m_);
}
}
private:
aos_mutex *const m_;
DISALLOW_COPY_AND_ASSIGN(Adder);
};
// Handles removing a mutex from the robust list.
// The idea is to create one of these at the beginning of a function that needs
// to do this.
class Remover {
public:
Remover(aos_mutex *m) {
assert(robust_head.pending_next == 0);
if (kRobustListDebug) {
printf("%" PRId32 ": beginning to remove %p, n=%p p=%p\n", get_tid(), m,
next_to_mutex(m->next), m->previous);
}
robust_head.pending_next = mutex_to_next(m);
aos_compiler_memory_barrier();
aos_mutex *const previous = m->previous;
const uintptr_t next_value = m->next;
previous->next = m->next;
if (!next_is_head(next_value)) {
// robust_head's pseudo-mutex doesn't have a previous pointer to update.
next_to_mutex(next_value)->previous = previous;
}
if (kRobustListDebug) {
printf("%" PRId32 ": done removing %p\n", get_tid(), m);
}
}
~Remover() {
assert(robust_head.pending_next != 0);
aos_compiler_memory_barrier();
robust_head.pending_next = 0;
if (kRobustListDebug) {
printf("%" PRId32 ": done with removal\n", get_tid());
}
}
private:
DISALLOW_COPY_AND_ASSIGN(Remover);
};
} // namespace my_robust_list
void initialize_in_new_thread() {
// No synchronization necessary in most of this because it's all thread-local!
my_tid = do_get_tid();
static absl::once_flag once;
absl::call_once(once, InstallAtforkHook);
my_robust_list::Init();
}
// Finishes the locking of a mutex by potentially clearing FUTEX_OWNER_DIED in
// the futex and returning the correct value.
inline int mutex_finish_lock(aos_mutex *m) {
const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_ACQUIRE);
if (__builtin_expect((value & FUTEX_OWNER_DIED) != 0, false)) {
__atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);
force_lock_pthread_mutex(m);
return 1;
} else {
lock_pthread_mutex(m);
return 0;
}
}
// Split out separately from mutex_get so condition_wait can call it and use its
// own my_robust_list::Adder.
inline int mutex_do_get(aos_mutex *m, bool signals_fail,
const struct timespec *timeout, uint32_t tid) {
RunShmObservers run_observers(m, true);
if (kPrintOperations) {
printf("%" PRId32 ": %p do_get\n", tid, m);
}
while (true) {
// If the atomic 0->TID transition fails.
if (!compare_and_swap(&m->futex, 0, tid)) {
// Wait in the kernel, which handles atomically ORing in FUTEX_WAITERS
// before actually sleeping.
const int ret = sys_futex_wait(FUTEX_LOCK_PI, &m->futex, 1, timeout);
if (ret != 0) {
if (timeout != NULL && ret == -ETIMEDOUT) {
return 3;
}
if (__builtin_expect(ret == -EINTR, true)) {
if (signals_fail) {
return 2;
} else {
continue;
}
}
my_robust_list::robust_head.pending_next = 0;
CHECK_NE(ret, -EDEADLK) << ": multiple lock of " << m << " by " << tid;
errno = -ret;
PLOG(FATAL) << "FUTEX_LOCK_PI(" << &m->futex
<< "(=" << __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST)
<< "), 1, " << timeout << ") failed";
} else {
if (kLockDebug) {
printf("%" PRId32 ": %p kernel lock done\n", tid, m);
}
// The kernel already handled setting the value to our TID (ish).
break;
}
} else {
if (kLockDebug) {
printf("%" PRId32 ": %p fast lock done\n", tid, m);
}
lock_pthread_mutex(m);
// Fastpath succeeded, so no need to call into the kernel.
// Because this is the fastpath, it's a good idea to avoid even having to
// load the value again down below.
return 0;
}
}
return mutex_finish_lock(m);
}
// The common implementation for everything that wants to lock a mutex.
// If signals_fail is false, the function will try again if the wait syscall is
// interrupted by a signal.
// timeout can be NULL for no timeout.
inline int mutex_get(aos_mutex *m, bool signals_fail,
const struct timespec *timeout) {
const uint32_t tid = get_tid();
my_robust_list::Adder adder(m);
const int r = mutex_do_get(m, signals_fail, timeout, tid);
if (r == 0 || r == 1) adder.Add();
return r;
}
// The common implementation for broadcast and signal.
// number_requeue is the number of waiters to requeue (probably INT_MAX or 0).
// One waiter will always be woken.
void condition_wake(aos_condition *c, aos_mutex *m, int number_requeue) {
RunShmObservers run_observers(c, true);
// Make it so that anybody just going to sleep won't.
// This is where we might accidentally wake more than just 1 waiter with 1
// signal(): one waiter that is already asleep will be woken, but any number of
// others that haven't made it to sleep in the kernel yet will see the changed
// value and return without ever sleeping.
uint32_t new_value = __atomic_add_fetch(c, 1, __ATOMIC_SEQ_CST);
while (true) {
// This really wants to be FUTEX_REQUEUE_PI, but the kernel doesn't have
// that... However, the code to support that is in the kernel, so it might
// be a good idea to patch it to support that and use it iff it's there.
const int ret =
sys_futex_cmp_requeue_pi(c, 1, number_requeue, &m->futex, new_value);
if (ret < 0) {
// If the value got changed out from under us (aka somebody else did a
// condition_wake).
if (__builtin_expect(ret == -EAGAIN, true)) {
// If we're doing a broadcast, the other guy might have done a signal
// instead, so we have to try again.
// If we're doing a signal, we have to go again to make sure that 2
// signals wake 2 processes.
new_value = __atomic_load_n(c, __ATOMIC_RELAXED);
continue;
}
my_robust_list::robust_head.pending_next = 0;
errno = -ret;
PLOG(FATAL) << "FUTEX_CMP_REQUEUE_PI(" << c << ", 1, " << number_requeue
<< ", " << &m->futex << ", *" << c << ") failed";
} else {
return;
}
}
}
} // namespace
int mutex_lock(aos_mutex *m) { return mutex_get(m, true, NULL); }
int mutex_lock_timeout(aos_mutex *m, const struct timespec *timeout) {
return mutex_get(m, true, timeout);
}
int mutex_grab(aos_mutex *m) { return mutex_get(m, false, NULL); }
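// A sketch of using the return values (they come from mutex_do_get above;
// `m` and `timeout` are hypothetical):
//   switch (mutex_lock_timeout(&m, &timeout)) {
//     case 0: /* locked it */ break;
//     case 1: /* locked it, but the previous owner died holding it */ break;
//     case 2: /* a signal interrupted the wait */ break;
//     case 3: /* timed out */ break;
//   }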
void mutex_unlock(aos_mutex *m) {
RunShmObservers run_observers(m, true);
const uint32_t tid = get_tid();
if (kPrintOperations) {
printf("%" PRId32 ": %p unlock\n", tid, m);
}
const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
if (__builtin_expect((value & FUTEX_TID_MASK) != tid, false)) {
my_robust_list::robust_head.pending_next = 0;
check_cached_tid(tid);
if ((value & FUTEX_TID_MASK) == 0) {
LOG(FATAL) << "multiple unlock of aos_mutex " << m << " by " << tid;
} else {
LOG(FATAL) << "aos_mutex " << m << " is locked by "
<< (value & FUTEX_TID_MASK) << ", not " << tid;
}
}
my_robust_list::Remover remover(m);
unlock_pthread_mutex(m);
// If the atomic TID->0 transition fails (ie FUTEX_WAITERS is set),
if (!compare_and_swap(&m->futex, tid, 0)) {
// The kernel handles everything else.
const int ret = sys_futex_unlock_pi(&m->futex);
if (ret != 0) {
my_robust_list::robust_head.pending_next = 0;
errno = -ret;
PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << (&m->futex) << ") failed";
}
} else {
// There aren't any waiters, so no need to call into the kernel.
}
}
int mutex_trylock(aos_mutex *m) {
RunShmObservers run_observers(m, true);
const uint32_t tid = get_tid();
if (kPrintOperations) {
printf("%" PRId32 ": %p trylock\n", tid, m);
}
my_robust_list::Adder adder(m);
// Try an atomic 0->TID transition.
uint32_t c = compare_and_swap_val(&m->futex, 0, tid);
if (c != 0) {
if (__builtin_expect((c & FUTEX_OWNER_DIED) == 0, true)) {
// Somebody else had it locked; we failed.
return 4;
} else {
// FUTEX_OWNER_DIED was set, so we have to call into the kernel to deal
// with resetting it.
const int ret = sys_futex_wait(FUTEX_TRYLOCK_PI, &m->futex, 0, NULL);
if (ret == 0) {
adder.Add();
// Only clear FUTEX_OWNER_DIED if somebody else didn't do the recovery and
// then unlock before our TRYLOCK happened.
return mutex_finish_lock(m);
} else {
// EWOULDBLOCK means that somebody else beat us to it.
if (__builtin_expect(ret == -EWOULDBLOCK, true)) {
return 4;
}
my_robust_list::robust_head.pending_next = 0;
errno = -ret;
PLOG(FATAL) << "FUTEX_TRYLOCK_PI(" << (&m->futex)
<< ", 0, NULL) failed";
}
}
}
lock_pthread_mutex(m);
adder.Add();
return 0;
}
bool mutex_islocked(const aos_mutex *m) {
const uint32_t tid = get_tid();
const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
return (value & FUTEX_TID_MASK) == tid;
}
void death_notification_init(aos_mutex *m) {
const uint32_t tid = get_tid();
if (kPrintOperations) {
printf("%" PRId32 ": %p death_notification start\n", tid, m);
}
my_robust_list::Adder adder(m);
{
RunShmObservers run_observers(m, true);
CHECK(compare_and_swap(&m->futex, 0, tid));
}
adder.Add();
}
void death_notification_release(aos_mutex *m) {
RunShmObservers run_observers(m, true);
#ifndef NDEBUG
// Verify it's "locked", like it should be.
{
const uint32_t tid = get_tid();
if (kPrintOperations) {
printf("%" PRId32 ": %p death_notification release\n", tid, m);
}
const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
assert((value & ~FUTEX_WAITERS) == tid);
}
#endif
my_robust_list::Remover remover(m);
ANNOTATE_HAPPENS_BEFORE(m);
const int ret = sys_futex_unlock_pi(&m->futex);
if (ret != 0) {
my_robust_list::robust_head.pending_next = 0;
errno = -ret;
PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << &m->futex << ") failed";
}
}
int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
RunShmObservers run_observers(c, false);
const uint32_t tid = get_tid();
const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
mutex_unlock(m);
my_robust_list::Adder adder(m);
while (true) {
// Wait in the kernel iff the value of it hasn't changed from before we
// unlocked the mutex (a change means somebody else did a wake).
int ret = sys_futex_wait_requeue_pi(c, wait_start, end_time, &m->futex);
if (ret != 0) {
// Timed out waiting. Signal that back up to the user.
if (__builtin_expect(ret == -ETIMEDOUT, true)) {
// We have to relock it ourselves because the kernel didn't do it.
const int r = mutex_do_get(m, false, nullptr, tid);
assert(__builtin_expect(r == 0 || r == 1, true));
adder.Add();
// OWNER_DIED takes priority. Pass it on if we found it.
if (r == 1) return r;
// Otherwise communicate that we timed out.
return -1;
}
// If it failed because somebody else did a wake and changed the value
// before we actually made it to sleep.
if (__builtin_expect(ret == -EAGAIN, true)) {
// There's no need to unconditionally set FUTEX_WAITERS here if we're
// using REQUEUE_PI because the kernel automatically does that in the
// REQUEUE_PI iff it requeued anybody.
// If we're not using REQUEUE_PI, then everything is just normal locks
// etc, so there's no need to do anything special there either.
// We have to relock it ourselves because the kernel didn't do it.
const int r = mutex_do_get(m, false, nullptr, tid);
assert(__builtin_expect(r == 0 || r == 1, true));
adder.Add();
return r;
}
// Try again if it was because of a signal.
if (__builtin_expect((ret == -EINTR), true)) {
continue;
}
my_robust_list::robust_head.pending_next = 0;
errno = -ret;
PLOG(FATAL) << "FUTEX_WAIT_REQUEUE_PI(" << c << ", " << wait_start << ", "
<< (&m->futex) << ") failed";
} else {
// Record that the kernel relocked it for us.
lock_pthread_mutex(m);
// We succeeded in waiting, and the kernel took care of locking the mutex
// for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
adder.Add();
const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
if (__builtin_expect((value & FUTEX_OWNER_DIED) != 0, false)) {
__atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);
return 1;
} else {
return 0;
}
}
}
}
void condition_signal(aos_condition *c, aos_mutex *m) {
condition_wake(c, m, 0);
}
void condition_broadcast(aos_condition *c, aos_mutex *m) {
condition_wake(c, m, INT_MAX);
}
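// A sketch of the intended usage, analogous to pthread condition variables
// (`m`, `c`, and `predicate` are hypothetical):
//   mutex_grab(&m);
//   while (!predicate) {
//     // Returns with m locked again. 0 = woken, 1 = woken but a previous
//     // owner of m died, -1 = end_time passed (only possible when an
//     // end_time is passed in).
//     condition_wait(&c, &m, nullptr);
//   }
//   // ... look at the state protected by m that predicate depends on ...
//   mutex_unlock(&m);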
int futex_wait_timeout(aos_futex *m, const struct timespec *timeout) {
RunShmObservers run_observers(m, false);
const int ret = sys_futex_wait(FUTEX_WAIT, m, 0, timeout);
if (ret != 0) {
if (ret == -EINTR) {
return 1;
} else if (ret == -ETIMEDOUT) {
return 2;
} else if (ret != -EWOULDBLOCK) {
errno = -ret;
return -1;
}
}
ANNOTATE_HAPPENS_AFTER(m);
return 0;
}
int futex_wait(aos_futex *m) { return futex_wait_timeout(m, NULL); }
int futex_set_value(aos_futex *m, uint32_t value) {
RunShmObservers run_observers(m, false);
ANNOTATE_HAPPENS_BEFORE(m);
__atomic_store_n(m, value, __ATOMIC_SEQ_CST);
const int r = sys_futex_wake(m, INT_MAX - 4096);
if (__builtin_expect(
static_cast<unsigned int>(r) > static_cast<unsigned int>(-4096),
false)) {
errno = -r;
return -1;
} else {
return r;
}
}
int futex_set(aos_futex *m) { return futex_set_value(m, 1); }
int futex_unset(aos_futex *m) {
return !__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);
}
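// A sketch of using an aos_futex directly as a one-shot notification between
// processes sharing the memory containing it (`f` is hypothetical and starts
// out at 0, ie unset):
//   // Waiting process:
//   if (futex_wait(&f) == 0) { /* f is now set (or already was) */ }
//   // Notifying process:
//   futex_set(&f);    // stores 1 and wakes the waiters
//   // Later, to reuse it:
//   futex_unset(&f);  // swaps it back to 0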
namespace aos::linux_code::ipc_lib {
// Sets an extra offset between mutexes and the value we use for them in the
// robust list (only the forward pointers). This is used to work around a kernel
// bug by keeping a second set of mutexes which is always writable so the kernel
// won't go into an infinite loop when trying to unlock them.
void SetRobustListOffset(ptrdiff_t offset) {
my_robust_list::SetRobustListOffset(offset);
}
// Returns true iff there are any mutexes locked by the current thread.
// This is mainly useful for testing.
bool HaveLockedMutexes() { return my_robust_list::HaveLockedMutexes(); }
} // namespace aos::linux_code::ipc_lib