redo the aos_sync API and add PI support
Previously, it didn't have different types for mutexes vs futexes and
the one common type was poorly named. It's hard to split that change out
from adding PI support, so they're both together. Adding PI support is
important because we have many places (e.g. the logging queue) where high-priority
and low-priority code interact heavily.
This change adds some small parts of robustness support, but they all
result in CHECK/assert failures if triggered.
Change-Id: I841ccee52568c32d453ed14f930430debbd8d78e
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
deleted file mode 100644
index 569b73d..0000000
--- a/aos/linux_code/ipc_lib/aos_sync.c
+++ /dev/null
@@ -1,272 +0,0 @@
-#include "aos/linux_code/ipc_lib/aos_sync.h"
-
-#include <linux/futex.h>
-#include <unistd.h>
-#include <sys/syscall.h>
-#include <errno.h>
-#include <stdint.h>
-#include <limits.h>
-#include <string.h>
-#include <inttypes.h>
-
-#include "aos/common/logging/logging.h"
-
-// TODO(brians): Inline this in the new PI version.
-#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
-
-#define ARM_EABI_INLINE_SYSCALL defined(__ARM_EABI__)
-
-// this code is based on something that appears to be based on
-// <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful
-// information
-// should probably use
-// <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it
-// becomes available
-// (sys_set_robust_list appears to be the function name)
-// <http://locklessinc.com/articles/futex_cheat_sheet/> and
-// <http://locklessinc.com/articles/mutex_cv_futex/> are useful
-// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
-// (fairly recent compared to everything else...)
-// can't use PRIVATE futex operations because they use the pid (or something) as
-// part of the hash
-//
-// ThreadSanitizer understands how these mutexes etc work. It appears to be able
-// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
-// atomic primitives.
-//
-// Remember that EAGAIN and EWOUDBLOCK are the same! (ie if you get EAGAIN from
-// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
-//
-// Values for a mutex:
-// 0 = unlocked
-// 1 = locked, not contended
-// 2 = locked, probably contended
-// Values for a "futex":
-// 0 = unset
-// 1 = set
-
-// These sys_futex_* functions are wrappers around syscall(SYS_futex). They each
-// take a specific set of arguments for a given futex operation. They return the
-// result or a negated errno value. -1..-4095 mean errors and not successful
-// results, which is guaranteed by the kernel.
-// They each have optimized versions for ARM EABI (the syscall interface is
-// different for non-EABI ARM, so that is the right thing to test for) that
-// don't go through syscall(2) or errno.
-
-static inline int sys_futex_wait(mutex *addr1, int val1,
- const struct timespec *timeout) {
-#if ARM_EABI_INLINE_SYSCALL
- register mutex *addr1_reg __asm__("r0") = addr1;
- register int op_reg __asm__("r1") = FUTEX_WAIT;
- register int val1_reg __asm__("r2") = val1;
- register const struct timespec *timeout_reg __asm__("r3") = timeout;
- register int syscall_number __asm__("r7") = SYS_futex;
- register int result __asm__("r0");
- __asm__ volatile("swi #0"
- : "=r"(result)
- : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
- "r"(timeout_reg), "r"(syscall_number)
- : "memory");
- return result;
-#else
- const int r = syscall(SYS_futex, addr1, FUTEX_WAIT, val1, timeout);
- if (r == -1) return -errno;
- return r;
-#endif
-}
-
-static inline int sys_futex_wake(mutex *addr1, int val1) {
-#if ARM_EABI_INLINE_SYSCALL
- register mutex *addr1_reg __asm__("r0") = addr1;
- register int op_reg __asm__("r1") = FUTEX_WAKE;
- register int val1_reg __asm__("r2") = val1;
- register int syscall_number __asm__("r7") = SYS_futex;
- register int result __asm__("r0");
- __asm__ volatile("swi #0"
- : "=r"(result)
- : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
- "r"(syscall_number)
- : "memory");
- return result;
-#else
- const int r = syscall(SYS_futex, addr1, FUTEX_WAKE, val1);
- if (r == -1) return -errno;
- return r;
-#endif
-}
-
-static inline int sys_futex_requeue(mutex *addr1, int num_wake,
- int num_requeue, mutex *m) {
-#if ARM_EABI_INLINE_SYSCALL
- register mutex *addr1_reg __asm__("r0") = addr1;
- register int op_reg __asm__("r1") = FUTEX_REQUEUE;
- register int num_wake_reg __asm__("r2") = num_wake;
- register int num_requeue_reg __asm__("r3") = num_requeue;
- register mutex *m_reg __asm__("r4") = m;
- register int syscall_number __asm__("r7") = SYS_futex;
- register int result __asm__("r0");
- __asm__ volatile("swi #0"
- : "=r"(result)
- : "r"(addr1_reg), "r"(op_reg), "r"(num_wake_reg),
- "r"(num_requeue_reg), "r"(m_reg), "r"(syscall_number)
- : "memory");
- return result;
-#else
- const int r =
- syscall(SYS_futex, addr1, FUTEX_REQUEUE, num_wake, num_requeue, m);
- if (r == -1) return -errno;
- return r;
-#endif
-}
-
-static inline int mutex_get(mutex *m, uint8_t signals_fail, const
- struct timespec *timeout) {
- int c;
- c = cmpxchg(m, 0, 1);
- if (!c) return 0;
- /* The lock is now contended */
- if (c == 1) c = __atomic_exchange_n(m, 2, __ATOMIC_SEQ_CST);
- while (c) {
- /* Wait in the kernel */
- const int ret = sys_futex_wait(m, 2, timeout);
- if (ret != 0) {
- if (signals_fail && ret == -EINTR) {
- return 1;
- }
- if (timeout != NULL && ret == -ETIMEDOUT) {
- return 2;
- }
- }
- c = __atomic_exchange_n(m, 2, __ATOMIC_SEQ_CST);
- }
- return 0;
-}
-int mutex_lock(mutex *m) {
- return mutex_get(m, 1, NULL);
-}
-int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
- return mutex_get(m, 1, timeout);
-}
-int mutex_grab(mutex *m) {
- return mutex_get(m, 0, NULL);
-}
-
-void mutex_unlock(mutex *m) {
- /* Unlock, and if not contended then exit. */
- switch (__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST)) {
- case 0:
- LOG(FATAL, "multiple unlock of %p\n", m);
- case 1:
- break;
- case 2: {
- const int ret = sys_futex_wake(m, 1);
- if (ret < 0) {
- PELOG(FATAL, -ret, "waking 1 from %p failed", m);
- } else {
- break;
- }
- }
- default:
- LOG(FATAL, "got a garbage value from mutex %p\n", m);
- }
-}
-int mutex_trylock(mutex *m) {
- /* Try to take the lock, if is currently unlocked */
- unsigned c = cmpxchg(m, 0, 1);
- if (!c) return 0;
- return 1;
-}
-
-int futex_wait(mutex *m) {
- if (__atomic_load_n(m, __ATOMIC_SEQ_CST)) {
- return 0;
- }
- const int ret = sys_futex_wait(m, 0, NULL);
- if (ret != 0) {
- if (ret == -EINTR) {
- return 1;
- } else if (ret != -EWOULDBLOCK) {
- errno = -ret;
- return -1;
- }
- }
- return 0;
-}
-int futex_set_value(mutex *m, mutex value) {
- __atomic_store_n(m, value, __ATOMIC_SEQ_CST);
- const int r = sys_futex_wake(m, INT_MAX - 4096);
- if (__builtin_expect((unsigned int)r > (unsigned int)-4096, 0)) {
- errno = -r;
- return -1;
- } else {
- return r;
- }
-}
-int futex_set(mutex *m) {
- return futex_set_value(m, 1);
-}
-int futex_unset(mutex *m) {
- return !__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);
-}
-
-void condition_wait(mutex *c, mutex *m) {
- const mutex wait_start = __atomic_load_n(c, __ATOMIC_RELAXED);
-
- mutex_unlock(m);
-
- while (1) {
- // Wait in the kernel iff the value of it doesn't change (ie somebody else
- // does a wake) from before we unlocked the mutex.
- const int ret = sys_futex_wait(c, wait_start, NULL);
- if (ret != 0) {
- // If it failed for some reason other than somebody else doing a wake
- // before we actually made it to sleep.
- if (__builtin_expect(__atomic_load_n(c, __ATOMIC_RELAXED) == wait_start,
- 0)) {
- // Try again if it was because of a signal.
- if (ret == -EINTR) continue;
- PELOG(FATAL, -ret, "FUTEX_WAIT(%p, %" PRIu32 ", NULL, NULL, 0) failed",
- c, wait_start);
- }
- }
- // Relock the mutex now that we're done waiting.
- // Simplified mutex_lock that always leaves it
- // contended in case anybody else got requeued.
- // If we got requeued above, this will just succeed the first time because
- // the person waking us from the above wait (changed to be on the mutex
- // instead of the condition) will have just set it to 0.
- while (__atomic_exchange_n(m, 2, __ATOMIC_ACQ_REL) != 0) {
- const int ret = sys_futex_wait(m, 2, NULL);
- if (ret != 0) {
- // Try again if it was because of a signal or somebody else unlocked it
- // before we went to sleep.
- if (ret == -EINTR || ret == -EWOULDBLOCK) continue;
- PELOG(FATAL, -ret, "FUTEX_WAIT(%p, 2, NULL, NULL, 0) failed", m);
- }
- }
- return;
- }
-}
-
-void condition_signal(mutex *c) {
- // This will cause anybody else who is in between unlocking the mutex and
- // going to sleep in the kernel to not go to sleep and return immediately
- // instead.
- __sync_fetch_and_add(c, 1);
- // Wake at most 1 person who is waiting in the kernel.
- const int ret = sys_futex_wake(c, 1);
- if (ret < 0) {
- PELOG(FATAL, -ret, "FUTEX_WAKE(%p, 1, NULL, NULL, 0) failed", c);
- }
-}
-
-void condition_broadcast(mutex *c, mutex *m) {
- __sync_fetch_and_add(c, 1);
- // Wake at most 1 waiter and requeue the rest.
- // Everybody else is going to have to wait for the 1st person to take the
- // mutex anyways.
- const int ret = sys_futex_requeue(c, 1, INT_MAX, m);
- if (ret < 0) {
- PELOG(FATAL, -ret, "FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0) failed", c, m);
- }
-}
diff --git a/aos/linux_code/ipc_lib/aos_sync.cc b/aos/linux_code/ipc_lib/aos_sync.cc
new file mode 100644
index 0000000..efda532
--- /dev/null
+++ b/aos/linux_code/ipc_lib/aos_sync.cc
@@ -0,0 +1,443 @@
+#if !AOS_DEBUG
+#define NDEBUG
+#endif
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <stddef.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sched.h>
+
+#include <algorithm>
+
+#include "aos/common/logging/logging.h"
+#include "aos/linux_code/thread_local.h"
+#include "aos/common/once.h"
+#include "aos/common/macros.h"
+
+// This code was originally based on <http://www.akkadia.org/drepper/futex.pdf>,
+// but it has since evolved a lot. However, that still has useful information.
+//
+// Finding information about actually using futexes is really REALLY hard, so
+// here's a list of the stuff that I've used:
+// futex(7) has a really high-level overview.
+// <http://locklessinc.com/articles/futex_cheat_sheet/> describes some of the
+// operations in a bit more detail than most places.
+// <http://locklessinc.com/articles/mutex_cv_futex/> is the basis of our
+// implementations (before PI).
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
+// (fairly recent compared to everything else...).
+// <https://www.kernel.org/doc/Documentation/pi-futex.txt>,
+// <https://www.kernel.org/doc/Documentation/futex-requeue-pi.txt>,
+// <https://www.kernel.org/doc/Documentation/robust-futexes.txt>,
+// and <https://www.kernel.org/doc/Documentation/robust-futex-ABI.txt> are all
+// useful references.
+// The kernel source (kernel/futex.c) has some useful comments about what the
+// various operations do (except figuring out which argument goes where in the
+// syscall is still confusing).
+// futex(2) is basically useless except for describing the order of the
+// arguments (it only has high-level descriptions of what some of the
+// operations do, and some of them are wrong in Wheezy).
+// glibc's nptl pthreads implementation is the intended user of most of these
+// things, so it is also a good place to look for examples. However, it is all
+// very hard to read because it supports ~20 different kinds of mutexes and
+// several variations of condition variables, and some of the pieces of code
+// are only written in assembly.
+// set_robust_list(2) is wrong in Wheezy (it doesn't actually take a TID
+// argument).
+//
+// Can't use PRIVATE futex operations because they use the pid (or something) as
+// part of the hash.
+//
+// ThreadSanitizer understands how these mutexes etc work. It appears to be able
+// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
+// atomic primitives.
+//
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+
+// Values for an aos_mutex.futex (kernel-mandated):
+// 0 = unlocked
+// TID = locked, not contended
+// |FUTEX_WAITERS = there are waiters (aka contended)
+//
+// Values for an aos_futex being used directly:
+// 0 = unset
+// 1 = set
+//
+// The value of an aos_condition is just a generation counter.
+
+namespace {
+
+// These sys_futex_* functions are wrappers around syscall(SYS_futex). They each
+// take a specific set of arguments for a given futex operation. They return the
+// result or a negated errno value. -1..-4095 mean errors and not successful
+// results, which is guaranteed by the kernel.
+//
+// They each have optimized versions for ARM EABI (the syscall interface is
+// different for non-EABI ARM, so that is the right thing to test for) that
+// don't go through syscall(2) or errno.
+// These use register variables to get the values in the right registers to
+// actually make the syscall.
+
+// The actual macro that we key off of to use the inline versions or not.
+#define ARM_EABI_INLINE_SYSCALL defined(__ARM_EABI__)
+
+// Used for FUTEX_WAIT, FUTEX_LOCK_PI, and FUTEX_TRYLOCK_PI.
+inline int sys_futex_wait(int op, aos_futex *addr1, int val1,
+ const struct timespec *timeout) {
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = op;
+ register int val1_reg __asm__("r2") = val1;
+ register const struct timespec *timeout_reg __asm__("r3") = timeout;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+ "r"(timeout_reg), "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, op, val1, timeout);
+ if (r == -1) return -errno;
+ return r;
+#endif
+}
+
+inline int sys_futex_wake(aos_futex *addr1, int val1) {
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_WAKE;
+ register int val1_reg __asm__("r2") = val1;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+ "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, FUTEX_WAKE, val1);
+ if (r == -1) return -errno;
+ return r;
+#endif
+}
+
+inline int sys_futex_unlock_pi(aos_futex *addr1) {
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_UNLOCK_PI;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, FUTEX_UNLOCK_PI);
+ if (r == -1) return -errno;
+ return r;
+#endif
+}
+
+// Returns true if it succeeds and false if it fails.
+// This is the same as __sync_bool_compare_and_swap, except it fixes that being
+// broken under tsan.
+inline bool compare_and_swap(aos_futex *f, uint32_t before, uint32_t after) {
+#ifdef AOS_SANITIZER_thread
+ // TODO(brians): Figure out how exactly tsan breaks this and fix it + make
+ // sure our workaround actually works.
+ // This workaround is unsafe in the general case, but does not change the
+ // effect in our specific case if the primitive works correctly (and seems to
+ // still do the right thing even when tsan's version falsely reports failing).
+ if (__atomic_load_n(f, __ATOMIC_SEQ_CST) == after) return false;
+ if (__sync_bool_compare_and_swap(f, before, after)) return true;
+ return __atomic_load_n(f, __ATOMIC_SEQ_CST) == after;
+#else
+ return __sync_bool_compare_and_swap(f, before, after);
+#endif
+}
+
+pid_t do_get_tid() {
+ pid_t r = syscall(SYS_gettid);
+ assert(r > 0);
+ return r;
+}
+
+// This gets called by functions before LOG(FATAL)ing with error messages that
+// would be incorrect if the error was caused by a process forking without
+// initialize_in_new_thread getting called in the fork.
+void check_cached_tid(pid_t tid) {
+ pid_t actual = do_get_tid();
+ if (tid != actual) {
+ LOG(FATAL,
+ "task %jd forked into %jd without letting aos_sync know"
+ " so we're not really sure what's going on\n",
+ static_cast<intmax_t>(tid), static_cast<intmax_t>(actual));
+ }
+}
+
+// Starts off at 0 in each new thread (because that's what it gets initialized
+// to in most of them or it gets reset to 0 after a fork by atfork_child()).
+AOS_THREAD_LOCAL pid_t my_tid = 0;
+
+// Gets called before the fork(2) wrapper function returns in the child.
+void atfork_child() {
+ // The next time get_tid() is called, it will set everything up again.
+ my_tid = 0;
+}
+
+void *InstallAtforkHook() {
+ if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
+ PLOG(FATAL, "pthread_atfork(NULL, NULL, %p) failed", atfork_child);
+ }
+ return nullptr;
+}
+
+// This gets called to set everything up in a new thread by get_tid().
+void initialize_in_new_thread();
+
+// Gets the current thread's TID and does all of the 1-time initialization the
+// first time it's called in a given thread.
+inline uint32_t get_tid() {
+ if (__builtin_expect(my_tid == 0, 0)) {
+ initialize_in_new_thread();
+ }
+ static_assert(sizeof(my_tid) <= sizeof(uint32_t), "pid_t is too big");
+ return static_cast<uint32_t>(my_tid);
+}
+
+void initialize_in_new_thread() {
+ // No synchronization necessary in most of this because it's all thread-local!
+
+ my_tid = do_get_tid();
+
+ static ::aos::Once<void> atfork_hook_installed(InstallAtforkHook);
+ atfork_hook_installed.Get();
+}
+
+// Split out separately from mutex_get so condition_wait can call it too.
+inline int mutex_do_get(aos_mutex *m, bool signals_fail,
+ const struct timespec *timeout) {
+ const uint32_t tid = get_tid();
+
+ while (true) {
+ // If the atomic 0->TID transition fails.
+ if (!compare_and_swap(&m->futex, 0, tid)) {
+ // Wait in the kernel, which handles atomically ORing in FUTEX_WAITERS
+ // before actually sleeping.
+ const int ret = sys_futex_wait(FUTEX_LOCK_PI, &m->futex, 1, timeout);
+ if (ret != 0) {
+ if (timeout != NULL && ret == -ETIMEDOUT) {
+ return 3;
+ }
+ if (__builtin_expect(ret == -EINTR, 1)) {
+ if (signals_fail) {
+ return 2;
+ } else {
+ continue;
+ }
+ }
+ if (ret == -EDEADLK) {
+ LOG(FATAL, "multiple lock of %p by %" PRId32 "\n", m, tid);
+ }
+ PELOG(FATAL, -ret, "FUTEX_LOCK_PI(%p(=%" PRIu32 "), 1, %p) failed",
+ &m->futex, __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST), timeout);
+ } else {
+ // The kernel already handled setting the value to our TID (ish).
+ break;
+ }
+ } else {
+ // Fastpath succeeded, so no need to call into the kernel.
+ break;
+ }
+ }
+
+ return 0;
+}
+
+// The common implementation for everything that wants to lock a mutex.
+// If signals_fail is false, the function will try again if the wait syscall is
+// interrupted by a signal.
+// timeout can be NULL for no timeout.
+inline int mutex_get(aos_mutex *m, bool signals_fail,
+ const struct timespec *timeout) {
+ get_tid();
+ const int r = mutex_do_get(m, signals_fail, timeout);
+ return r;
+}
+
+// The common implementation for broadcast and signal.
+// number_requeue is the number of waiters to requeue (probably INT_MAX or 0). 1
+// will always be woken.
+void condition_wake(aos_condition *c, aos_mutex * /*m*/, int number_requeue) {
+ // Make it so that anybody just going to sleep won't.
+ // This is where we might accidentally wake more than just 1 waiter with 1
+ // signal():
+ // 1 already sleeping will be woken but n might never actually make it to
+ // sleep in the kernel because of this.
+ __atomic_add_fetch(c, 1, __ATOMIC_SEQ_CST);
+
+ const int ret = sys_futex_wake(
+ c, ::std::min(::std::max(number_requeue, 1), INT_MAX - 4096));
+ if (__builtin_expect(
+ static_cast<unsigned int>(ret) > static_cast<unsigned int>(-4096),
+ 0)) {
+ PELOG(FATAL, -ret, "FUTEX_WAKE(%p, %d) failed", c, INT_MAX - 4096);
+ }
+}
+
+} // namespace
+
+int mutex_lock(aos_mutex *m) {
+ return mutex_get(m, true, NULL);
+}
+int mutex_lock_timeout(aos_mutex *m, const struct timespec *timeout) {
+ return mutex_get(m, true, timeout);
+}
+int mutex_grab(aos_mutex *m) {
+ return mutex_get(m, false, NULL);
+}
+
+void mutex_unlock(aos_mutex *m) {
+ const uint32_t tid = get_tid();
+
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
+ if (__builtin_expect((value & FUTEX_TID_MASK) != tid, 0)) {
+ check_cached_tid(tid);
+ if ((value & FUTEX_TID_MASK) == 0) {
+ LOG(FATAL, "multiple unlock of aos_mutex %p by %" PRId32 "\n", m, tid);
+ } else {
+ LOG(FATAL, "aos_mutex %p is locked by %" PRId32 ", not %" PRId32 "\n",
+ m, value & FUTEX_TID_MASK, tid);
+ }
+ }
+
+ // If the atomic TID->0 transition fails (ie FUTEX_WAITERS is set),
+ if (!compare_and_swap(&m->futex, tid, 0)) {
+ // The kernel handles everything else.
+ const int ret = sys_futex_unlock_pi(&m->futex);
+ if (ret != 0) {
+ PELOG(FATAL, -ret, "FUTEX_UNLOCK_PI(%p) failed", &m->futex);
+ }
+ } else {
+ // There aren't any waiters, so no need to call into the kernel.
+ }
+}
+
+int mutex_trylock(aos_mutex *m) {
+ // Try an atomic 0->TID transition.
+ uint32_t c = __sync_val_compare_and_swap(&m->futex, 0, get_tid());
+
+ if (c != 0) {
+ // Somebody else had it locked; we failed.
+ return 4;
+ }
+ return 0;
+}
+
+bool mutex_islocked(const aos_mutex *m) {
+ const uint32_t tid = get_tid();
+
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
+ return (value & FUTEX_TID_MASK) == tid;
+}
+
+int condition_wait(aos_condition *c, aos_mutex *m) {
+ const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
+
+ mutex_unlock(m);
+
+ while (true) {
+ // Wait in the kernel iff the value of it doesn't change (ie somebody else
+ // does a wake) from before we unlocked the mutex.
+ int ret;
+ ret = sys_futex_wait(FUTEX_WAIT, c, wait_start, nullptr);
+ if (ret != 0) {
+ // If it failed because somebody else did a wake and changed the value
+ // before we actually made it to sleep.
+ if (__builtin_expect(ret == -EAGAIN, 1)) {
+ // Everything is just normal locks
+ // etc, so there's no need to do anything special here.
+
+ // We have to relock it ourself because the kernel didn't do it.
+ const int r = mutex_do_get(m, false, nullptr);
+ assert(__builtin_expect(r == 0 || r == 1, 1));
+ return r;
+ }
+ // Try again if it was because of a signal.
+ if (__builtin_expect(ret == -EINTR, 1)) continue;
+ PELOG(FATAL, -ret, "FUTEX_WAIT(%p, %" PRIu32 ", nullptr) failed",
+ c, wait_start);
+ } else {
+ // We have to take the lock ourself because the kernel won't, but
+ // there's no need for it to be anything special because all waiters
+ // just relock it like usual.
+ const int r = mutex_do_get(m, false, nullptr);
+ assert(__builtin_expect(r == 0 || r == 1, 1));
+ return r;
+ }
+ }
+}
+
+void condition_signal(aos_condition *c, aos_mutex *m) {
+ condition_wake(c, m, 0);
+}
+
+void condition_broadcast(aos_condition *c, aos_mutex *m) {
+ condition_wake(c, m, INT_MAX);
+}
+
+int futex_wait_timeout(aos_futex *m, const struct timespec *timeout) {
+ if (__atomic_load_n(m, __ATOMIC_SEQ_CST) != 0) {
+ return 0;
+ }
+ const int ret = sys_futex_wait(FUTEX_WAIT, m, 0, timeout);
+ if (ret != 0) {
+ if (ret == -EINTR) {
+ return 1;
+ } else if (ret == -ETIMEDOUT) {
+ return 2;
+ } else if (ret != -EWOULDBLOCK) {
+ errno = -ret;
+ return -1;
+ }
+ }
+ return 0;
+}
+
+int futex_wait(aos_futex *m) { return futex_wait_timeout(m, NULL); }
+
+int futex_set_value(aos_futex *m, uint32_t value) {
+ __atomic_store_n(m, value, __ATOMIC_SEQ_CST);
+ const int r = sys_futex_wake(m, INT_MAX - 4096);
+ if (__builtin_expect(
+ static_cast<unsigned int>(r) > static_cast<unsigned int>(-4096), 0)) {
+ errno = -r;
+ return -1;
+ } else {
+ return r;
+ }
+}
+
+int futex_set(aos_futex *m) {
+ return futex_set_value(m, 1);
+}
+
+int futex_unset(aos_futex *m) {
+ return !__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);
+}
diff --git a/aos/linux_code/ipc_lib/aos_sync.h b/aos/linux_code/ipc_lib/aos_sync.h
index 0d55246..2628558 100644
--- a/aos/linux_code/ipc_lib/aos_sync.h
+++ b/aos/linux_code/ipc_lib/aos_sync.h
@@ -14,41 +14,63 @@
// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
// list the interesting ones
-// Have to align structs containing it to sizeof(int).
-// Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
+// Have to remember to align structs containing it (recursively) to sizeof(int).
// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
-// No initialization is necessary for use as c with the condition_ functions.
// The value should not be changed after multiple processes have started
// accessing an instance except through the functions declared in this file.
-typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
+typedef uint32_t aos_futex __attribute__((aligned(sizeof(int))));
-// All return -1 for other error (which will be in errno from futex(2)).
-//
-// There is no priority inversion protection.
-// TODO(brians) look at using
-// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
+// For use with the condition_ functions.
+// No initialization is necessary.
+typedef aos_futex aos_condition;
-// Returns 1 if interrupted by a signal.
+// For use with the mutex_ functions.
+// futex must be initialized to 0.
+// The recommended way to initialize one of these is by memset(3)ing the whole
+// thing to 0 or using C++ () initialization to avoid depending on the
+// implementation.
+struct aos_mutex {
+ aos_futex futex;
+};
+
+// The mutex_ functions are designed to be used as mutexes. A mutex can only be
+// unlocked from the same task which originally locked it.
+
+// All of these return 2 if
+// interrupted by a signal, 3 if timed out, or 4 if an optional lock fails. Some
+// of them (obviously) can never return some of those values.
//
// One of the highest priority processes blocked on a given mutex will be the
// one to lock it when it is unlocked.
-int mutex_lock(mutex *m) __attribute__((warn_unused_result));
+int mutex_lock(struct aos_mutex *m) __attribute__((warn_unused_result));
// Returns 2 if it timed out or 1 if interrupted by a signal.
-int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
- __attribute__((warn_unused_result));
-// Ignores signals. Can not fail.
-int mutex_grab(mutex *m);
-// abort(2)s for multiple unlocking.
-void mutex_unlock(mutex *m);
-// Returns 0 when successful in locking the mutex and 1 if somebody else has it
-// locked.
-int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
+int mutex_lock_timeout(struct aos_mutex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+// Ignores signals (retries until something other than getting a signal
+// happens).
+int mutex_grab(struct aos_mutex *m) __attribute__((warn_unused_result));
+// LOG(FATAL)s for multiple unlocking.
+void mutex_unlock(struct aos_mutex *m);
+// Does not block waiting for the mutex.
+int mutex_trylock(struct aos_mutex *m) __attribute__((warn_unused_result));
+#ifdef __cplusplus
+// Returns whether or not the mutex is locked by this thread.
+// There aren't very many valid uses for this function; the main ones are
+// checking mutexes as they are destroyed to catch problems with that early and
+// stack-based recursive mutex locking.
+bool mutex_islocked(const aos_mutex *m);
+#endif
// The futex_ functions are similar to the mutex_ ones but different.
// They are designed for signalling when something happens (possibly to
-// multiple listeners). A mutex manipulated with them can only be set or unset.
+// multiple listeners). An aos_futex manipulated with them can only be set or
+// unset. Also, they can be set/unset/waited on from any task independently of
+// who did something first and have no priority inversion protection.
+// They return -1 for other error (which will be in errno from futex(2)).
+// They have no spurious wakeups (because everybody always gets woken up).
//
// Another name for this kind of synchronization mechanism is a "notification".
+// Python calls it an "event".
//
// They are different from the condition_ functions in that they do NOT work
// correctly as standard condition variables. While it is possible to keep
@@ -56,40 +78,51 @@
// obvious implementation has basically the same race condition that condition
// variables are designed to prevent between somebody else grabbing the mutex
// and changing whether it's set or not and the futex_ function changing the
-// futex's value.
+// futex's value. A futex is effectively a resettable condition variable with
+// the condition being "has it been set"; if you have some other condition (for
+// example messages are available to read on a queue), use the condition_
+// functions or there will be race conditions.
// Wait for the futex to be set. Will return immediately if it's already set.
// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
-// or -1.
-int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// or -1 with an error in errno.
+int futex_wait(aos_futex *m) __attribute__((warn_unused_result));
+// The same as futex_wait except returns 2 if it times out.
+int futex_wait_timeout(aos_futex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+
// Set the futex and wake up anybody waiting on it.
-// Returns the number that were woken or -1.
+// Returns the number that were woken or -1 with an error in errno.
//
// This will always wake up all waiters at the same time and set the value to 1.
-int futex_set(mutex *m);
+int futex_set(aos_futex *m);
// Same as above except lets something other than 1 be used as the final value.
-int futex_set_value(mutex *m, mutex value);
+int futex_set_value(aos_futex *m, aos_futex value);
// Unsets the futex (sets the value to 0).
// Returns 0 if it was set before and 1 if it wasn't.
// Can not fail.
-int futex_unset(mutex *m);
+int futex_unset(aos_futex *m);
// The condition_ functions implement condition variable support. The API is
// similar to the pthreads api and works the same way. The same m argument must
// be passed in for all calls to all of the condition_ functions with a given c.
+// They do have the potential for spurious wakeups.
// Wait for the condition variable to be signalled. m will be unlocked
// atomically with actually starting to wait. m is guaranteed to be locked when
// this function returns.
// NOTE: The relocking of m is not atomic with stopping the actual wait and
// other process(es) may lock (+unlock) the mutex first.
-void condition_wait(mutex *c, mutex *m);
+// Returns 0.
+int condition_wait(aos_condition *c, struct aos_mutex *m)
+ __attribute__((warn_unused_result));
// If any other processes are condition_waiting on c, wake 1 of them. Does not
// require m to be locked.
-void condition_signal(mutex *c);
+// NOTE: There is a small chance that this will wake more than just 1 waiter.
+void condition_signal(aos_condition *c, struct aos_mutex *m);
// Wakes all processes that are condition_waiting on c. Does not require m to be
// locked.
-void condition_broadcast(mutex *c, mutex *m);
+void condition_broadcast(aos_condition *c, struct aos_mutex *m);
#ifdef __cplusplus
}
diff --git a/aos/linux_code/ipc_lib/condition.cc b/aos/linux_code/ipc_lib/condition.cc
index ed488c5..62ede1c 100644
--- a/aos/linux_code/ipc_lib/condition.cc
+++ b/aos/linux_code/ipc_lib/condition.cc
@@ -1,8 +1,10 @@
#include "aos/common/condition.h"
#include <inttypes.h>
+#include <assert.h>
#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
namespace aos {
@@ -12,12 +14,13 @@
Condition::Condition(Mutex *m) : impl_(), m_(m) {}
bool Condition::Wait() {
- condition_wait(&impl_, &m_->impl_);
- return false;
+ const int ret = condition_wait(&impl_, &m_->impl_);
+ assert(__builtin_expect(ret == 0 || ret == 1, 1));
+ return ret == 1;
}
void Condition::Signal() {
- condition_signal(&impl_);
+ condition_signal(&impl_, &m_->impl_);
}
void Condition::Broadcast() {
diff --git a/aos/linux_code/ipc_lib/core_lib.c b/aos/linux_code/ipc_lib/core_lib.c
index 2bd6c25..de418a4 100644
--- a/aos/linux_code/ipc_lib/core_lib.c
+++ b/aos/linux_code/ipc_lib/core_lib.c
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <stdlib.h>
+#include <assert.h>
#include "aos/linux_code/ipc_lib/shared_mem.h"
@@ -27,7 +28,8 @@
void *msg = NULL;
aos_shm_core *shm_core = global_core->mem_struct;
- mutex_grab(&shm_core->msg_alloc_lock);
+ int result = mutex_grab(&shm_core->msg_alloc_lock);
+ assert(result == 0);
shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
index 9f0de33..974a9f6 100644
--- a/aos/linux_code/ipc_lib/ipc_lib.gyp
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -4,10 +4,11 @@
'target_name': 'aos_sync',
'type': 'static_library',
'sources': [
- 'aos_sync.c',
+ 'aos_sync.cc',
],
'dependencies': [
'<(AOS)/build/aos.gyp:logging_interface',
+ '<(AOS)/common/common.gyp:once',
],
},
{
diff --git a/aos/linux_code/ipc_lib/mutex.cc b/aos/linux_code/ipc_lib/mutex.cc
new file mode 100644
index 0000000..9e270c9
--- /dev/null
+++ b/aos/linux_code/ipc_lib/mutex.cc
@@ -0,0 +1,54 @@
+#include "aos/common/mutex.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "aos/common/type_traits.h"
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+
+Mutex::Mutex() : impl_() {
+ static_assert(shm_ok<Mutex>::value,
+ "Mutex is not safe for use in shared memory.");
+}
+
+Mutex::~Mutex() {
+ if (__builtin_expect(mutex_islocked(&impl_), 0)) {
+ LOG(FATAL, "destroying locked mutex %p (aka %p)\n",
+ this, &impl_);
+ }
+}
+
+// Lock and Unlock use the return values of mutex_lock/mutex_unlock
+// to determine whether the lock/unlock succeeded.
+
+bool Mutex::Lock() {
+ const int ret = mutex_grab(&impl_);
+ if (ret == 0) {
+ return false;
+ } else {
+ LOG(FATAL, "mutex_grab(%p(=%" PRIu32 ")) failed with %d\n",
+ &impl_, impl_.futex, ret);
+ }
+}
+
+void Mutex::Unlock() {
+ mutex_unlock(&impl_);
+}
+
+Mutex::State Mutex::TryLock() {
+ const int ret = mutex_trylock(&impl_);
+ switch (ret) {
+ case 0:
+ return State::kLocked;
+ case 4:
+ return State::kUnlocked;
+ default:
+ LOG(FATAL, "mutex_trylock(%p(=%" PRIu32 ")) failed with %d\n",
+ &impl_, impl_.futex, ret);
+ }
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/mutex.cpp b/aos/linux_code/ipc_lib/mutex.cpp
deleted file mode 100644
index 4bd0759..0000000
--- a/aos/linux_code/ipc_lib/mutex.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "aos/common/mutex.h"
-
-#include <inttypes.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "aos/common/type_traits.h"
-#include "aos/common/logging/logging.h"
-
-namespace aos {
-
-Mutex::Mutex() : impl_(0) {
- static_assert(shm_ok<Mutex>::value,
- "Mutex is not safe for use in shared memory.");
-}
-
-// Lock and Unlock use the return values of mutex_lock/mutex_unlock
-// to determine whether the lock/unlock succeeded.
-
-bool Mutex::Lock() {
- if (mutex_grab(&impl_) != 0) {
- PLOG(FATAL, "mutex_grab(%p(=%" PRIu32 ")) failed", &impl_, impl_);
- } else {
- return false;
- }
-}
-
-void Mutex::Unlock() {
- mutex_unlock(&impl_);
-}
-
-bool Mutex::TryLock() {
- return mutex_trylock(&impl_) == 0;
-}
-
-} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index e8e81dc..9219192 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -301,7 +301,8 @@
printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options.printable());
}
{
- MutexLocker locker(&data_lock_);
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
bool writable_waited = false;
int new_end;
@@ -327,7 +328,7 @@
if (kWriteDebug) {
printf("queue: going to wait for writable_ of %p\n", this);
}
- writable_.Wait();
+ CHECK(!writable_.Wait());
writable_waited = true;
}
}
@@ -384,7 +385,7 @@
}
readable_waiting_ = true;
// Wait for a message to become readable.
- readable_.Wait();
+ CHECK(!readable_.Wait());
if (kReadDebug) {
printf("queue: done waiting for readable_ of %p\n", this);
}
@@ -415,7 +416,8 @@
}
void *msg = NULL;
- MutexLocker locker(&data_lock_);
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
if (!ReadCommonStart(options, nullptr)) {
if (kReadDebug) {
@@ -477,7 +479,8 @@
}
void *msg = NULL;
- MutexLocker locker(&data_lock_);
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
if (!ReadCommonStart(options, index)) {
if (kReadDebug) {
diff --git a/aos/linux_code/ipc_lib/raw_queue_test.cc b/aos/linux_code/ipc_lib/raw_queue_test.cc
index 2267c92..adb84b9 100644
--- a/aos/linux_code/ipc_lib/raw_queue_test.cc
+++ b/aos/linux_code/ipc_lib/raw_queue_test.cc
@@ -58,8 +58,7 @@
"this will get put in shared memory");
template<typename T>
struct FunctionToCall {
- FunctionToCall() : result(ResultType::NotCalled) {
- started.Lock();
+ FunctionToCall() : result(ResultType::NotCalled), started() {
}
volatile ResultType result;
@@ -67,18 +66,19 @@
void (*function)(T*, char*);
T *arg;
volatile char failure[kFailureSize];
- Mutex started;
+ aos_futex started;
};
template<typename T>
static void Hangs_(FunctionToCall<T> *const to_call) {
- to_call->started.Unlock();
+ time::SleepFor(time::Time::InSeconds(0.01));
+ ASSERT_EQ(1, futex_set(&to_call->started));
to_call->result = ResultType::Called;
to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
to_call->result = ResultType::Returned;
}
// How long until a function is considered to have hung.
- static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+ static constexpr time::Time kHangTime = time::Time::InSeconds(0.09);
// How long to sleep after forking (for debugging).
static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
@@ -86,15 +86,19 @@
// process and wait(2)s for it.
class ForkedProcess {
public:
- ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
+ ForkedProcess(pid_t pid, aos_futex *done)
+ : pid_(pid), done_(done), exiting_(false) {};
~ForkedProcess() {
- if (kill(pid_, SIGINT) == -1) {
- if (errno == ESRCH) {
- printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
- } else {
- PLOG(FATAL, "kill(SIGKILL, %jd) failed", static_cast<intmax_t>(pid_));
+ if (!exiting_) {
+ if (kill(pid_, SIGTERM) == -1) {
+ if (errno == ESRCH) {
+ printf("process %jd was already dead\n",
+ static_cast<intmax_t>(pid_));
+ } else {
+ PLOG(FATAL, "kill(SIGKILL, %jd) failed",
+ static_cast<intmax_t>(pid_));
+ }
}
- return;
}
const pid_t ret = wait(NULL);
if (ret == -1) {
@@ -115,11 +119,12 @@
Finished, Hung, Error
};
JoinResult Join(time::Time timeout = kHangTime) {
- timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
- switch (mutex_lock_timeout(lock_, &lock_timeout)) {
+ timespec done_timeout = (kForkSleep + timeout).ToTimespec();
+ switch (futex_wait_timeout(done_, &done_timeout)) {
case 2:
return JoinResult::Hung;
case 0:
+ exiting_ = true;
return JoinResult::Finished;
default:
return JoinResult::Error;
@@ -128,7 +133,9 @@
private:
const pid_t pid_;
- mutex *const lock_;
+ aos_futex *const done_;
+ // True iff we know that the process is already exiting.
+ bool exiting_;
} __attribute__((unused));
// State for HangsFork and HangsCheck.
@@ -159,9 +166,9 @@
// Leaks shared memory.
template<typename T> __attribute__((warn_unused_result))
std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
- mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
- sizeof(*lock), sizeof(int)));
- CHECK_EQ(mutex_lock(lock), 0);
+ aos_futex *done = static_cast<aos_futex *>(shm_malloc_aligned(
+ sizeof(*done), alignof(aos_futex)));
+ *done = 0;
const pid_t pid = fork();
switch (pid) {
case 0: // child
@@ -173,13 +180,13 @@
}
::aos::common::testing::PreventExit();
function(arg);
- mutex_unlock(lock);
+ CHECK_NE(-1, futex_set(done));
exit(EXIT_SUCCESS);
case -1: // parent failure
PLOG(ERROR, "fork() failed");
return std::unique_ptr<ForkedProcess>();
default: // parent
- return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
+ return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, done));
}
}
@@ -216,8 +223,8 @@
static_cast<char *>(fatal_failure)[0] = '\0';
children_[id] = ForkExecute(Hangs_, to_call).release();
if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+ CHECK_EQ(0, futex_wait(&to_call->started));
to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
- to_call->started.Lock();
return AssertionSuccess();
}
// Checks whether or not a function hung like it was supposed to.
@@ -244,7 +251,9 @@
} else if (result == ForkedProcess::JoinResult::Error) {
return AssertionFailure() << "error joining child";
} else {
- abort();
+ if (to_calls_[id]->result == ResultType::NotCalled) {
+ return AssertionFailure() << "stuff took too long getting started";
+ }
return AssertionFailure() << "something weird happened";
}
}
@@ -393,7 +402,10 @@
args.flags = RawQueue::kBlock;
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
- EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
+ AssertionResult one = HangsCheck(1);
+ AssertionResult two = HangsCheck(2);
+ EXPECT_TRUE(one != two) << "'" <<
+ one.failure_message() << "' vs '" << two.failure_message() << "'";
// TODO(brians) finish this
}
diff --git a/aos/linux_code/ipc_lib/shared_mem.c b/aos/linux_code/ipc_lib/shared_mem.c
index a3b5ab0..bc2cc62 100644
--- a/aos/linux_code/ipc_lib/shared_mem.c
+++ b/aos/linux_code/ipc_lib/shared_mem.c
@@ -21,11 +21,11 @@
void init_shared_mem_core(aos_shm_core *shm_core) {
clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
- shm_core->msg_alloc_lock = 0;
+ memset(&shm_core->msg_alloc_lock, 0, sizeof(shm_core->msg_alloc_lock));
shm_core->queues.pointer = NULL;
- shm_core->queues.lock = 0;
+ memset(&shm_core->queues.lock, 0, sizeof(shm_core->queues.lock));
shm_core->queue_types.pointer = NULL;
- shm_core->queue_types.lock = 0;
+ memset(&shm_core->queue_types.lock, 0, sizeof(shm_core->queue_types.lock));
}
ptrdiff_t aos_core_get_mem_usage(void) {
diff --git a/aos/linux_code/ipc_lib/shared_mem.h b/aos/linux_code/ipc_lib/shared_mem.h
index 33cb607..8366703 100644
--- a/aos/linux_code/ipc_lib/shared_mem.h
+++ b/aos/linux_code/ipc_lib/shared_mem.h
@@ -21,7 +21,7 @@
// A structure that represents some kind of global pointer that everything
// shares.
typedef struct aos_global_pointer_t {
- mutex lock;
+ struct aos_mutex lock;
void *pointer;
} aos_global_pointer;
@@ -31,8 +31,8 @@
struct timespec identifier;
// gets 0-initialized at the start (as part of shared memory) and
// the owner sets as soon as it finishes setting stuff up
- mutex creation_condition;
- mutex msg_alloc_lock;
+ aos_condition creation_condition;
+ struct aos_mutex msg_alloc_lock;
void *msg_alloc;
// A pointer to the head of the linked list of queues.
// pointer points to a ::aos::Queue.
diff --git a/aos/linux_code/logging/binary_log_file.cc b/aos/linux_code/logging/binary_log_file.cc
index 210d443..1f9815e 100644
--- a/aos/linux_code/logging/binary_log_file.cc
+++ b/aos/linux_code/logging/binary_log_file.cc
@@ -195,10 +195,10 @@
LogFileMessageHeader *LogFileWriter::GetWritePosition(size_t message_size) {
if (position() + message_size + (kAlignment - (message_size % kAlignment)) +
- sizeof(mutex) > kPageSize) {
+ sizeof(aos_futex) > kPageSize) {
char *const temp = current();
MapNextPage();
- if (futex_set_value(static_cast<mutex *>(static_cast<void *>(
+ if (futex_set_value(static_cast<aos_futex *>(static_cast<void *>(
&temp[position()])), 2) == -1) {
PLOG(WARNING, "readers will hang because futex_set_value(%p, 2) failed",
&temp[position()]);
diff --git a/aos/linux_code/logging/binary_log_file.h b/aos/linux_code/logging/binary_log_file.h
index efbffcf..84f8398 100644
--- a/aos/linux_code/logging/binary_log_file.h
+++ b/aos/linux_code/logging/binary_log_file.h
@@ -49,9 +49,9 @@
//
// There will be something here after the last message on a "page" set to 2
// (by the futex_set) to indicate that the next message is on the next page.
- mutex marker;
+ aos_futex marker;
static_assert(sizeof(marker) == 4, "mutex changed size!");
- static_assert(MESSAGE_ALIGNMENT >= alignof(mutex),
+ static_assert(MESSAGE_ALIGNMENT >= alignof(aos_futex),
"MESSAGE_ALIGNMENT is too small");
uint32_t time_sec;