Removed linux_code
Change-Id: I7327828d2c9efdf03172d1b90f49d5c51fbba86e
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
new file mode 100644
index 0000000..20972e4
--- /dev/null
+++ b/aos/ipc_lib/BUILD
@@ -0,0 +1,168 @@
+package(default_visibility = ["//visibility:public"])
+
+cc_library(
+ name = "aos_sync",
+ srcs = [
+ "aos_sync.cc",
+ ],
+ hdrs = [
+ "aos_sync.h",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ linkopts = [
+ "-lpthread",
+ ],
+ deps = [
+ "//aos:once",
+ "//aos:macros",
+ "//aos/logging",
+ "//aos/util:compiler_memory_barrier",
+ ],
+)
+
+cc_library(
+ name = "core_lib",
+ srcs = [
+ "core_lib.c",
+ ],
+ hdrs = [
+ "core_lib.h",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ deps = [
+ ":aos_sync",
+ ":shared_mem_types",
+ ],
+)
+
+cc_library(
+ name = "shared_mem",
+ srcs = [
+ "shared_mem.c",
+ ],
+ hdrs = [
+ "shared_mem.h",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ linkopts = [
+ "-lrt",
+ ],
+ deps = [
+ ":aos_sync",
+ ":core_lib",
+ ":shared_mem_types",
+ "//aos/logging",
+ ],
+)
+
+cc_library(
+ # TODO(Brian): This should be shared_mem{,.h}, and the other one should be
+ # shared_mem_init{,.cc,.h}.
+ name = "shared_mem_types",
+ hdrs = [
+ "shared_mem_types.h",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ deps = [
+ ":aos_sync",
+ ],
+)
+
+cc_library(
+ name = "queue",
+ srcs = [
+ "queue.cc",
+ ],
+ hdrs = [
+ "queue.h",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ linkopts = [
+ "-lrt",
+ ],
+ deps = [
+ ":core_lib",
+ ":shared_mem",
+ "//aos:condition",
+ "//aos/mutex:mutex",
+ "//aos/logging",
+ "//aos/util:options",
+ ],
+)
+
+cc_test(
+ name = "raw_queue_test",
+ srcs = [
+ "raw_queue_test.cc",
+ ],
+ deps = [
+ ":core_lib",
+ ":queue",
+ "//aos:die",
+ "//aos/time:time",
+ "//aos/logging",
+ "//aos/util:death_test_log_implementation",
+ "//aos/util:thread",
+ "//aos/testing:googletest",
+ "//aos/testing:prevent_exit",
+ "//aos/testing:test_shm",
+ ],
+)
+
+cc_test(
+ name = "ipc_stress_test",
+ srcs = [
+ "ipc_stress_test.cc",
+ ],
+ compatible_with = [
+ "//tools:armhf-debian",
+ ],
+ tags = [
+ "manual",
+ ],
+ deps = [
+ ":core_lib",
+ "//aos:die",
+ "//aos/mutex:mutex",
+ "//aos/time:time",
+ "//aos/libc:aos_strsignal",
+ "//aos/libc:dirname",
+ "//aos/logging",
+ "//aos/testing:googletest",
+ "//aos/testing:test_shm",
+ ],
+)
+
+cc_library(
+ name = "scoped_message_ptr",
+ deps = [
+ ":queue",
+ ],
+)
+
+cc_binary(
+ name = "ipc_comparison",
+ srcs = [
+ "ipc_comparison.cc",
+ ],
+ deps = [
+ ":queue",
+ "//aos:condition",
+ "//aos:event",
+ "//aos/mutex:mutex",
+ "//aos/logging",
+ "//aos/logging:implementations",
+ "//aos:init",
+ "//third_party/gflags",
+ ],
+)
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
new file mode 100644
index 0000000..c1be351
--- /dev/null
+++ b/aos/ipc_lib/aos_sync.cc
@@ -0,0 +1,1030 @@
+#if !AOS_DEBUG
+#undef NDEBUG
+#define NDEBUG
+#endif
+
+#include "aos/ipc_lib/aos_sync.h"
+
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <stddef.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sched.h>
+
+#ifdef AOS_SANITIZER_thread
+#include <sanitizer/tsan_interface_atomic.h>
+#endif
+
+#include <algorithm>
+#include <type_traits>
+
+#include "aos/logging/logging.h"
+#include "aos/macros.h"
+#include "aos/util/compiler_memory_barrier.h"
+#include "aos/once.h"
+
+using ::aos::linux_code::ipc_lib::FutexAccessorObserver;
+
+// This code was originally based on <https://www.akkadia.org/drepper/futex.pdf>,
+// but it has since evolved a lot. However, that still has useful information.
+//
+// Finding information about actually using futexes is really REALLY hard, so
+// here's a list of the stuff that I've used:
+// futex(7) has a really high-level overview.
+// <http://locklessinc.com/articles/futex_cheat_sheet/> describes some of the
+// operations in a bit more detail than most places.
+// <http://locklessinc.com/articles/mutex_cv_futex/> is the basis of our
+// implementations (before PI).
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
+// (fairly recent compared to everything else...).
+// <https://www.kernel.org/doc/Documentation/pi-futex.txt>,
+// <https://www.kernel.org/doc/Documentation/futex-requeue-pi.txt>,
+// <https://www.kernel.org/doc/Documentation/robust-futexes.txt>,
+// and <https://www.kernel.org/doc/Documentation/robust-futex-ABI.txt> are all
+// useful references.
+// The kernel source (kernel/futex.c) has some useful comments about what the
+// various operations do (except figuring out which argument goes where in the
+// syscall is still confusing).
+// futex(2) is basically useless except for describing the order of the
+// arguments (it only has high-level descriptions of what some of the
+// operations do, and some of them are wrong in Wheezy).
+// glibc's nptl pthreads implementation is the intended user of most of these
+// things, so it is also a good place to look for examples. However, it is all
+// very hard to read because it supports ~20 different kinds of mutexes and
+// several variations of condition variables, and some of the pieces of code
+// are only written in assembly.
+// set_robust_list(2) is wrong in Wheezy (it doesn't actually take a TID
+// argument).
+//
+// Can't use PRIVATE futex operations because they use the pid (or something) as
+// part of the hash.
+//
+// ThreadSanitizer understands how these mutexes etc work. It appears to be able
+// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
+// atomic primitives.
+//
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+
+// Values for an aos_mutex.futex (kernel-mandated):
+// 0 = unlocked
+// TID = locked, not contended
+// |FUTEX_WAITERS = there are waiters (aka contended)
+// |FUTEX_OWNER_DIED = old owner died
+//
+// Values for an aos_futex being used directly:
+// 0 = unset
+// 1 = set
+//
+// The value of an aos_condition is just a generation counter.
+
+// Whether or not to use the REQUEUE_PI operation. Using it is better (fewer
+// syscalls and the highest priority waiter is always the one that gets woken),
+// but there's a kernel bug that results in random memory corruption while using
+// it.
+// The alternative is to just wake everybody and have them all race to relock
+// the mutex (classic thundering herd).
+// Currently just whether or not we're on ARM because we only run this on
+// ARM kernels with the patch to fix that issue applied. This will likely change
+// to something based on kernel version at some point.
+#ifdef __arm__
+#define USE_REQUEUE_PI 1
+#else
+#define USE_REQUEUE_PI 0
+#endif
+
+#ifdef AOS_SANITIZER_thread
+extern "C" void AnnotateHappensBefore(const char *file, int line,
+ uintptr_t addr);
+extern "C" void AnnotateHappensAfter(const char *file, int line,
+ uintptr_t addr);
+#define ANNOTATE_HAPPENS_BEFORE(address) \
+ AnnotateHappensBefore(__FILE__, __LINE__, \
+ reinterpret_cast<uintptr_t>(address))
+#define ANNOTATE_HAPPENS_AFTER(address) \
+ AnnotateHappensAfter(__FILE__, __LINE__, reinterpret_cast<uintptr_t>(address))
+#else
+#define ANNOTATE_HAPPENS_BEFORE(address)
+#define ANNOTATE_HAPPENS_AFTER(address)
+#endif
+
+namespace {
+
+const bool kRobustListDebug = false;
+const bool kLockDebug = false;
+const bool kPrintOperations = false;
+
+// These sys_futex_* functions are wrappers around syscall(SYS_futex). They each
+// take a specific set of arguments for a given futex operation. They return the
+// result or a negated errno value. -1..-4095 mean errors and not successful
+// results, which is guaranteed by the kernel.
+//
+// They each have optimized versions for ARM EABI (the syscall interface is
+// different for non-EABI ARM, so that is the right thing to test for) that
+// don't go through syscall(2) or errno.
+// These use register variables to get the values in the right registers to
+// actually make the syscall.
+
+// The actual macro that we key off of to use the inline versions or not.
+#if defined(__ARM_EABI__)
+#define ARM_EABI_INLINE_SYSCALL 1
+#else
+#define ARM_EABI_INLINE_SYSCALL 0
+#endif
+
+// Used for FUTEX_WAIT, FUTEX_LOCK_PI, and FUTEX_TRYLOCK_PI (selected by op).
+inline int sys_futex_wait(int op, aos_futex *addr1, int val1,
+ const struct timespec *timeout) {
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;  // arguments are pinned to r0-r3
+ register int op_reg __asm__("r1") = op;
+ register int val1_reg __asm__("r2") = val1;
+ register const struct timespec *timeout_reg __asm__("r3") = timeout;
+ register int syscall_number __asm__("r7") = SYS_futex;  // the syscall number is pinned to r7
+ register int result __asm__("r0");  // the result is read back out of r0
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+ "r"(timeout_reg), "r"(syscall_number)
+ : "memory");
+ return result;  // not translated: errno is never involved on this path
+#else
+ const int r = syscall(SYS_futex, addr1, op, val1, timeout);
+ if (r == -1) return -errno;  // turn syscall(2)'s -1 + errno into a negated errno
+ return r;
+#endif
+}
+
+inline int sys_futex_wake(aos_futex *addr1, int val1) {  // Issues FUTEX_WAKE on addr1 with val1.
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_WAKE;
+ register int val1_reg __asm__("r2") = val1;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");  // shares r0 with addr1_reg: argument in, result out
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+ "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, FUTEX_WAKE, val1);
+ if (r == -1) return -errno;  // turn syscall(2)'s -1 + errno into a negated errno
+ return r;
+#endif
+}
+
+inline int sys_futex_cmp_requeue_pi(aos_futex *addr1, int num_wake,  // Issues FUTEX_CMP_REQUEUE_PI.
+ int num_requeue, aos_futex *m, uint32_t val) {  // condition_wake passes the condition's current value as val
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_CMP_REQUEUE_PI;
+ register int num_wake_reg __asm__("r2") = num_wake;
+ register int num_requeue_reg __asm__("r3") = num_requeue;
+ register aos_futex *m_reg __asm__("r4") = m;  // 5th and 6th arguments are pinned to r4 and r5
+ register uint32_t val_reg __asm__("r5") = val;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");  // shares r0 with addr1_reg: argument in, result out
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(num_wake_reg),
+ "r"(num_requeue_reg), "r"(m_reg), "r"(val_reg),
+ "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, FUTEX_CMP_REQUEUE_PI, num_wake,
+ num_requeue, m, val);
+ if (r == -1) return -errno;  // turn syscall(2)'s -1 + errno into a negated errno
+ return r;
+#endif
+}
+
+inline int sys_futex_wait_requeue_pi(aos_condition *addr1,  // Issues FUTEX_WAIT_REQUEUE_PI on addr1.
+ uint32_t start_val,  // condition_wait passes the value it loaded before unlocking the mutex
+ const struct timespec *timeout,
+ aos_futex *m) {  // the futex of the mutex that goes with the condition
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_condition *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_WAIT_REQUEUE_PI;
+ register uint32_t start_val_reg __asm__("r2") = start_val;
+ register const struct timespec *timeout_reg __asm__("r3") = timeout;
+ register aos_futex *m_reg __asm__("r4") = m;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");  // shares r0 with addr1_reg: argument in, result out
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(start_val_reg),
+ "r"(timeout_reg), "r"(m_reg), "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r =
+ syscall(SYS_futex, addr1, FUTEX_WAIT_REQUEUE_PI, start_val, timeout, m);
+ if (r == -1) return -errno;  // turn syscall(2)'s -1 + errno into a negated errno
+ return r;
+#endif
+}
+
+inline int sys_futex_unlock_pi(aos_futex *addr1) {  // Issues FUTEX_UNLOCK_PI on addr1.
+#if ARM_EABI_INLINE_SYSCALL
+ register aos_futex *addr1_reg __asm__("r0") = addr1;
+ register int op_reg __asm__("r1") = FUTEX_UNLOCK_PI;
+ register int syscall_number __asm__("r7") = SYS_futex;
+ register int result __asm__("r0");  // shares r0 with addr1_reg: argument in, result out
+ __asm__ volatile("swi #0"
+ : "=r"(result)
+ : "r"(addr1_reg), "r"(op_reg), "r"(syscall_number)
+ : "memory");
+ return result;
+#else
+ const int r = syscall(SYS_futex, addr1, FUTEX_UNLOCK_PI);
+ if (r == -1) return -errno;  // turn syscall(2)'s -1 + errno into a negated errno
+ return r;
+#endif
+}
+
+// Atomically stores after into *f iff *f equals before. Returns the previous value of f.
+inline uint32_t compare_and_swap_val(aos_futex *f, uint32_t before,
+ uint32_t after) {
+#ifdef AOS_SANITIZER_thread
+ // This is a workaround for <https://llvm.org/bugs/show_bug.cgi?id=23176>.
+ // Basically, most of the atomic operations are broken under tsan, but this
+ // particular one isn't.
+ // TODO(Brian): Remove this #ifdef (and the one in compare_and_swap) once we
+ // don't have to worry about tsan with this bug any more.
+ uint32_t before_value = before;  // in/out: the expected value going in, the observed value on failure
+ __tsan_atomic32_compare_exchange_strong(
+ reinterpret_cast<int32_t *>(f),
+ reinterpret_cast<int32_t *>(&before_value), after,
+ __tsan_memory_order_seq_cst, __tsan_memory_order_seq_cst);
+ return before_value;  // still equal to before when the exchange succeeded
+#else
+ return __sync_val_compare_and_swap(f, before, after);
+#endif
+}
+
+// Atomic compare-and-swap on *f: true means before was replaced with after.
+inline bool compare_and_swap(aos_futex *f, uint32_t before, uint32_t after) {
+#ifndef AOS_SANITIZER_thread
+  return __sync_bool_compare_and_swap(f, before, after);
+#else
+  return before == compare_and_swap_val(f, before, after);
+#endif
+}
+
+#ifdef AOS_SANITIZER_thread
+
+// Simple macro for checking something which should always be true: on failure
+// it prints the file, line, and expression to stderr and then abort()s. Using the
+// standard CHECK macro isn't safe because failures often reenter the mutex locking code.
+#define SIMPLE_CHECK(expr) \
+ do { \
+ if (!(expr)) { \
+ fprintf(stderr, "%s: %d: SIMPLE_CHECK(" #expr ") failed!\n", __FILE__, \
+ __LINE__); \
+ abort(); \
+ } \
+ } while (false)
+
+// Forcibly (re)initializes the pthread mutex for *m as a process-shared mutex.
+// This sequence of operations is only safe for the simpler kinds of mutexes in
+// glibc's pthreads implementation on Linux.
+void init_pthread_mutex(aos_mutex *m) {
+  // Re-initialize (tsan ignores this) so the destroy won't fail if it's locked.
+  SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, nullptr));
+  // Destroy the mutex so tsan will forget about it if some now-dead thread
+  // locked it.
+  SIMPLE_CHECK(0 == pthread_mutex_destroy(&m->pthread_mutex));
+
+  // Now actually initialize it, making sure it's process-shareable so it works
+  // correctly across shared memory.
+  pthread_mutexattr_t attr;
+  SIMPLE_CHECK(0 == pthread_mutexattr_init(&attr));
+  // Use the named constant: the pshared argument is an int, not a bool.
+  SIMPLE_CHECK(
+      0 == pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED));
+  SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, &attr));
+  SIMPLE_CHECK(0 == pthread_mutexattr_destroy(&attr));
+}
+
+// Locks the pthread mutex for *m, initializing it first if that hasn't happened yet.
+// If a stack trace ever reveals the pthread_mutex_lock call in here blocking,
+// there is a bug in our mutex code or the way somebody is calling it.
+void lock_pthread_mutex(aos_mutex *m) {
+ if (!m->pthread_mutex_init) {  // lazy one-time setup, tracked by a flag in the mutex itself
+ init_pthread_mutex(m);
+ m->pthread_mutex_init = true;
+ }
+ SIMPLE_CHECK(0 == pthread_mutex_lock(&m->pthread_mutex));
+}
+
+// Forcibly locks the pthread mutex for *m.
+// This will (somewhat hackily) rip the lock out from underneath somebody else
+// who is already holding it.
+void force_lock_pthread_mutex(aos_mutex *m) {
+ if (!m->pthread_mutex_init) {  // same lazy setup as lock_pthread_mutex
+ init_pthread_mutex(m);
+ m->pthread_mutex_init = true;
+ }
+ const int trylock_result = pthread_mutex_trylock(&m->pthread_mutex);  // probe whether anybody holds it
+ SIMPLE_CHECK(trylock_result == 0 || trylock_result == EBUSY);  // any other result is unexpected
+ if (trylock_result == 0) {
+ // We're good, so unlock it and then go for a real lock down below.
+ SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
+ } else {
+ // Somebody (should always be somebody else who died with it held) already
+ // has it, so make tsan forget about that.
+ init_pthread_mutex(m);
+ }
+ lock_pthread_mutex(m);  // both branches leave it unlocked, so this takes it for real
+}
+
+// Unlocks the pthread mutex for *m, which must already have been initialized.
+void unlock_pthread_mutex(aos_mutex *m) {
+ assert(m->pthread_mutex_init);  // only checked when AOS_DEBUG is set (NDEBUG otherwise)
+ SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
+}
+
+#else
+
+// No-op versions for builds without AOS_SANITIZER_thread, so the code below doesn't need #ifdefs.
+static inline void lock_pthread_mutex(aos_mutex *) {}
+static inline void force_lock_pthread_mutex(aos_mutex *) {}
+static inline void unlock_pthread_mutex(aos_mutex *) {}
+
+#endif
+
+pid_t do_get_tid() {  // Asks the kernel for the calling thread's TID (no caching).
+  const pid_t r = static_cast<pid_t>(syscall(SYS_gettid));
+  assert(r > 0);  // Compiled out unless AOS_DEBUG is set.
+  return r;
+}
+
+// Call this right before LOG(FATAL)ing a message that would be misleading if
+// the real problem is a process that forked without initialize_in_new_thread
+// running in the child. In that case, this logs its own FATAL message instead.
+void check_cached_tid(pid_t tid) {
+  const pid_t actual = do_get_tid();
+  // A cached value which still matches the kernel's means nothing is stale.
+  if (tid == actual) return;
+  LOG(FATAL,
+      "task %jd forked into %jd without letting aos_sync know"
+      " so we're not really sure what's going on\n",
+      static_cast<intmax_t>(tid), static_cast<intmax_t>(actual));
+}
+
+// Starts off at 0 in each new thread (because that's what it gets initialized
+// to in most of them or it gets reset to 0 after a fork by atfork_child()).
+thread_local pid_t my_tid = 0;
+
+// Gets called before the fork(2) wrapper function returns in the child (registered by InstallAtforkHook).
+void atfork_child() {
+ // The next time get_tid() is called, it will see 0 and set everything up again.
+ my_tid = 0;
+}
+
+void *InstallAtforkHook() {  // Registers atfork_child() to run in forked children.
+  if (const int error = pthread_atfork(NULL, NULL, atfork_child)) {  // returns the error number; errno is not set
+    PELOG(FATAL, error, "pthread_atfork(NULL, NULL, %p) failed", atfork_child);
+  }
+  return nullptr;
+}
+
+// This gets called to set everything up in a new thread by get_tid().
+void initialize_in_new_thread();
+
+// Returns the calling thread's TID. The first call in a given thread also
+// performs all of that thread's 1-time initialization.
+inline uint32_t get_tid() {
+  static_assert(sizeof(my_tid) <= sizeof(uint32_t), "pid_t is too big");
+  // my_tid is 0 until initialize_in_new_thread() has run in this thread.
+  const bool first_call_in_thread = (my_tid == 0);
+  if (__builtin_expect(first_call_in_thread, false)) initialize_in_new_thread();
+  return static_cast<uint32_t>(my_tid);
+}
+
+// Contains all of the stuff for dealing with the robust list. Nothing outside
+// this namespace should touch anything inside it except Init, Adder, and
+// Remover.
+namespace my_robust_list {
+
+static_assert(offsetof(aos_mutex, next) == 0,
+ "Our math all assumes that the beginning of a mutex and its next "
+ "pointer are at the same place in memory.");
+
+// Our version of robust_list_head.
+// This is copied from the kernel header because that's a pretty stable ABI (and
+// any changes will be backwards compatible anyways) and we want ours to have
+// different types.
+// The uintptr_ts are &next of the elements in the list (with stuff |ed in).
+struct aos_robust_list_head {
+ uintptr_t next;  // must sit at the same offset as robust_list_head::list
+ long futex_offset;  // must sit at the same offset as robust_list_head::futex_offset
+ uintptr_t pending_next;  // must sit at the same offset as robust_list_head::list_op_pending
+};
+
+static_assert(offsetof(aos_robust_list_head, next) ==
+ offsetof(robust_list_head, list),
+ "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(offsetof(aos_robust_list_head, futex_offset) ==
+ offsetof(robust_list_head, futex_offset),
+ "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(offsetof(aos_robust_list_head, pending_next) ==
+ offsetof(robust_list_head, list_op_pending),
+ "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(sizeof(aos_robust_list_head) == sizeof(robust_list_head),
+ "Our aos_robust_list_head doesn't match the kernel's");
+
+thread_local aos_robust_list_head robust_head;
+
+// Extra offset between mutex values and where we point to for their robust list
+// entries (from SetRobustListOffset).
+uintptr_t robust_list_offset = 0;
+
+// The value to OR each pointer's value with whenever putting it into the robust
+// list (technically only if it's PI, but all of ours are, so...).
+static const uintptr_t kRobustListOr = 1;
+
+// Returns the value a next variable holds when it refers to the list head.
+inline uintptr_t robust_head_next_value() {
+  return reinterpret_cast<uintptr_t>(&robust_head.next);
+}
+// Returns true iff next refers to the list head.
+inline bool next_is_head(uintptr_t next) {
+  return robust_head_next_value() == next;
+}
+// Returns the head viewed as a (pseudo-)mutex.
+// It does NOT have a previous pointer, so be careful with the return value.
+inline aos_mutex *robust_head_mutex() {
+  return reinterpret_cast<aos_mutex *>(&robust_head.next);
+}
+
+inline uintptr_t mutex_to_next(aos_mutex *m) {  // Encodes m as a next value.
+  uintptr_t value = reinterpret_cast<uintptr_t>(&m->next);
+  return (value + robust_list_offset) | kRobustListOr;
+}
+inline aos_mutex *next_to_mutex(uintptr_t next) {  // Decodes a next value.
+  const bool is_raw_head =
+      __builtin_expect(robust_list_offset != 0, false) && next_is_head(next);
+  // We don't offset the head pointer, so don't un-offset it either.
+  if (is_raw_head) return reinterpret_cast<aos_mutex *>(next);
+  const uintptr_t address = (next & ~kRobustListOr) - robust_list_offset;
+  return reinterpret_cast<aos_mutex *>(address);
+}
+
+// Sets up the robust list for the calling thread (FATAL if the syscall fails).
+void Init() {
+  // It starts out just pointing back to itself.
+  robust_head.next = robust_head_next_value();
+  robust_head.futex_offset = static_cast<ssize_t>(offsetof(aos_mutex, futex)) -
+                             static_cast<ssize_t>(offsetof(aos_mutex, next));
+  robust_head.pending_next = 0;
+  if (syscall(SYS_set_robust_list, robust_head_next_value(),
+              sizeof(robust_head)) != 0) {
+    PLOG(FATAL, "set_robust_list(%p, %zu) failed",  // sizeof is a size_t: %zu
+         reinterpret_cast<void *>(robust_head.next), sizeof(robust_head));
+  }
+  if (kRobustListDebug) {
+    printf("%" PRId32 ": init done\n", get_tid());
+  }
+}
+
+// Updating the offset with locked mutexes is important during robustness
+// testing, because there are mutexes which are locked before this is set to a
+// non-0 value and then unlocked after it is changed back. However, to make sure
+// the code works correctly when manipulating the next pointer of the last of
+// those mutexes, all of their next values have to be adjusted appropriately.
+void SetRobustListOffset(uintptr_t offset) {
+ const uintptr_t offset_change = offset - robust_list_offset;  // unsigned, so a decrease wraps and still adds correctly
+ robust_list_offset = offset;  // set before the walk: next_to_mutex below decodes with the new offset
+ aos_mutex *m = robust_head_mutex();  // start at the head; only its next field is touched
+ // Update the offset contained in each of the mutexes which is already locked.
+ while (!next_is_head(m->next)) {  // the last entry's next is the (un-offset) head value and is left alone
+ m->next += offset_change;
+ m = next_to_mutex(m->next);
+ }
+}
+
+bool HaveLockedMutexes() {  // True iff this thread's robust list has any entries.
+  return !next_is_head(robust_head.next);
+}
+
+// Handles adding a mutex to the robust list.
+// The idea is to create one of these at the beginning of a function that needs
+// to do this and then call Add() iff it should actually be added.
+class Adder {
+ public:
+ Adder(aos_mutex *m) : m_(m) {
+ assert(robust_head.pending_next == 0);  // only one pending operation at a time per thread
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": maybe add %p\n", get_tid(), m_);
+ }
+ robust_head.pending_next = mutex_to_next(m);  // stays set until the destructor clears it
+ aos_compiler_memory_barrier();  // keep the compiler from sinking the store past later code
+ }
+ ~Adder() {
+ assert(robust_head.pending_next == mutex_to_next(m_));
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": done maybe add %p, n=%p p=%p\n", get_tid(), m_,
+ next_to_mutex(m_->next), m_->previous);
+ }
+ aos_compiler_memory_barrier();  // keep earlier list updates ahead of clearing pending_next
+ robust_head.pending_next = 0;
+ }
+
+ void Add() {  // Links m_ in at the front of this thread's robust list.
+ assert(robust_head.pending_next == mutex_to_next(m_));
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": adding %p\n", get_tid(), m_);
+ }
+ const uintptr_t old_head_next_value = robust_head.next;
+
+ m_->next = old_head_next_value;  // point m_ at the old first entry before publishing m_
+ aos_compiler_memory_barrier();
+ robust_head.next = mutex_to_next(m_);  // m_ is now the first entry
+
+ m_->previous = robust_head_mutex();
+ if (!next_is_head(old_head_next_value)) {
+ // robust_head's pseudo-mutex doesn't have a previous pointer to update.
+ next_to_mutex(old_head_next_value)->previous = m_;
+ }
+ aos_compiler_memory_barrier();
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": done adding %p\n", get_tid(), m_);
+ }
+ }
+
+ private:
+ aos_mutex *const m_;  // the mutex this object may add
+
+ DISALLOW_COPY_AND_ASSIGN(Adder);
+};
+
+// Handles removing a mutex from the robust list.
+// The idea is to create one of these at the beginning of a function that needs
+// to do this. The constructor unlinks m; the destructor clears pending_next.
+class Remover {
+ public:
+ Remover(aos_mutex *m) {
+ assert(robust_head.pending_next == 0);  // only one pending operation at a time per thread
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": beginning to remove %p, n=%p p=%p\n", get_tid(), m,
+ next_to_mutex(m->next), m->previous);
+ }
+ robust_head.pending_next = mutex_to_next(m);  // stays set until the destructor clears it
+ aos_compiler_memory_barrier();  // keep the compiler from moving the unlinking above that store
+
+ aos_mutex *const previous = m->previous;
+ const uintptr_t next_value = m->next;
+
+ previous->next = m->next;  // previous may be the head pseudo-mutex, whose next is real
+ if (!next_is_head(next_value)) {
+ // robust_head's pseudo-mutex doesn't have a previous pointer to update.
+ next_to_mutex(next_value)->previous = previous;
+ }
+
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": done removing %p\n", get_tid(), m);
+ }
+ }
+ ~Remover() {
+ assert(robust_head.pending_next != 0);
+ aos_compiler_memory_barrier();  // keep the caller's unlock work ahead of clearing pending_next
+ robust_head.pending_next = 0;
+ if (kRobustListDebug) {
+ printf("%" PRId32 ": done with removal\n", get_tid());
+ }
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Remover);
+};
+
+} // namespace my_robust_list
+
+void initialize_in_new_thread() {  // Per-thread setup; get_tid() calls this when my_tid is 0.
+ // No synchronization necessary in most of this because it's all thread-local!
+
+ my_tid = do_get_tid();  // set first: Init() below may call get_tid(), which would recurse on 0
+
+ static ::aos::Once<void> atfork_hook_installed(InstallAtforkHook);  // the exception: one per process, hence the Once
+ atfork_hook_installed.Get();  // the returned value is not used
+
+ my_robust_list::Init();
+}
+
+FutexAccessorObserver before_observer = nullptr, after_observer = nullptr;
+
+// RAII class which calls before_observer (when one is set) during
+// construction and after_observer (when one is set) during destruction.
+class RunObservers {
+ public:
+  // address and write are handed to both observers unchanged.
+  template <class T>
+  RunObservers(T *address, bool write)
+      : address_(const_cast<void *>(
+            static_cast<const volatile void *>(address))),
+        write_(write) {
+    if (__builtin_expect(before_observer == nullptr, true)) return;
+    before_observer(address_, write_);
+  }
+  ~RunObservers() {
+    // after_observer is a separate global, so it gets its own check.
+    if (__builtin_expect(after_observer == nullptr, true)) return;
+    after_observer(address_, write_);
+  }
+
+ private:
+  void *const address_;
+  const bool write_;
+
+  DISALLOW_COPY_AND_ASSIGN(RunObservers);
+};
+
+// Finishes the locking of a mutex by potentially clearing FUTEX_OWNER_DIED in
+// the futex and returning the correct value (1 if the bit was set, 0 otherwise).
+inline int mutex_finish_lock(aos_mutex *m) {
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_ACQUIRE);
+ if (__builtin_expect((value & FUTEX_OWNER_DIED) != 0, false)) {
+ __atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);  // clear only that bit
+ force_lock_pthread_mutex(m);  // the dead owner may still hold the tsan-only pthread mutex
+ return 1;
+ } else {
+ lock_pthread_mutex(m);
+ return 0;
+ }
+}
+
+// Split out separately from mutex_get so condition_wait can call it and use its
+// own my_robust_list::Adder. Returns 0 (locked), 1 (locked, owner died), 2 (signal), or 3 (timeout).
+inline int mutex_do_get(aos_mutex *m, bool signals_fail,
+ const struct timespec *timeout, uint32_t tid) {
+ RunObservers run_observers(m, true);
+ if (kPrintOperations) {
+ printf("%" PRId32 ": %p do_get\n", tid, m);
+ }
+
+ while (true) {  // only loops when a signal interrupts the wait and signals_fail is false
+ // If the atomic 0->TID transition fails.
+ if (!compare_and_swap(&m->futex, 0, tid)) {
+ // Wait in the kernel, which handles atomically ORing in FUTEX_WAITERS
+ // before actually sleeping.
+ const int ret = sys_futex_wait(FUTEX_LOCK_PI, &m->futex, 1, timeout);
+ if (ret != 0) {
+ if (timeout != NULL && ret == -ETIMEDOUT) {
+ return 3;  // timed out
+ }
+ if (__builtin_expect(ret == -EINTR, true)) {
+ if (signals_fail) {
+ return 2;  // interrupted by a signal and the caller wants to know
+ } else {
+ continue;  // interrupted by a signal: start over from the fast path
+ }
+ }
+ my_robust_list::robust_head.pending_next = 0;  // presumably so the FATAL path can lock mutexes itself (Adder asserts this is 0)
+ if (ret == -EDEADLK) {
+ LOG(FATAL, "multiple lock of %p by %" PRId32 "\n", m, tid);
+ }
+ PELOG(FATAL, -ret, "FUTEX_LOCK_PI(%p(=%" PRIu32 "), 1, %p) failed",
+ &m->futex, __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST), timeout);
+ } else {
+ if (kLockDebug) {
+ printf("%" PRId32 ": %p kernel lock done\n", tid, m);
+ }
+ // The kernel already handled setting the value to our TID (ish).
+ break;  // go check FUTEX_OWNER_DIED in mutex_finish_lock
+ }
+ } else {
+ if (kLockDebug) {
+ printf("%" PRId32 ": %p fast lock done\n", tid, m);
+ }
+ lock_pthread_mutex(m);
+ // Fastpath succeeded, so no need to call into the kernel.
+ // Because this is the fastpath, it's a good idea to avoid even having to
+ // load the value again down below.
+ return 0;
+ }
+ }
+
+ return mutex_finish_lock(m);  // 0, or 1 if the previous owner died
+}
+
+// The shared implementation behind the blocking mutex lock functions.
+// With signals_fail false, a wait syscall which gets interrupted by a signal
+// is retried instead of failing. timeout can be NULL for no timeout.
+inline int mutex_get(aos_mutex *m, bool signals_fail,
+                     const struct timespec *timeout) {
+  const uint32_t tid = get_tid();
+  my_robust_list::Adder adder(m);
+  const int result = mutex_do_get(m, signals_fail, timeout, tid);
+  const bool now_locked = (result == 0) || (result == 1);
+  if (now_locked) adder.Add();
+  return result;
+}
+
+// The common implementation for broadcast and signal: bumps *c and wakes 1
+// waiter. number_requeue is the number of waiters to requeue onto m (probably
+// INT_MAX or 0); without REQUEUE_PI, m is unused and they get woken instead.
+void condition_wake(aos_condition *c, aos_mutex *m, int number_requeue) {
+  RunObservers run_observers(c, true);
+  // Make it so that anybody just going to sleep won't.
+  // This is where we might accidentally wake more than just 1 waiter with 1
+  // signal(): 1 already sleeping will be woken but n might never actually make
+  // it to sleep in the kernel because of this.
+  uint32_t new_value = __atomic_add_fetch(c, 1, __ATOMIC_SEQ_CST);
+
+  if (USE_REQUEUE_PI) {
+    while (true) {
+      // This really wants to be FUTEX_REQUEUE_PI, but the kernel doesn't have
+      // that... However, the code to support that is in the kernel, so it might
+      // be a good idea to patch it to support that and use it iff it's there.
+      const int ret =
+          sys_futex_cmp_requeue_pi(c, 1, number_requeue, &m->futex, new_value);
+      if (ret < 0) {
+        // If the value got changed out from under us (aka somebody else did a
+        // condition_wake).
+        if (__builtin_expect(ret == -EAGAIN, true)) {
+          // If we're doing a broadcast, the other guy might have done a signal
+          // instead, so we have to try again.
+          // If we're doing a signal, we have to go again to make sure that 2
+          // signals wake 2 processes.
+          new_value = __atomic_load_n(c, __ATOMIC_RELAXED);
+          continue;
+        }
+        my_robust_list::robust_head.pending_next = 0;
+        PELOG(FATAL, -ret, "FUTEX_CMP_REQUEUE_PI(%p, 1, %d, %p, *%p) failed",
+              c, number_requeue, &m->futex, c);
+      } else {
+        return;
+      }
+    }
+  } else {
+    const int number_wake =  // named so a failure reports what was really passed
+        ::std::min(::std::max(number_requeue, 1), INT_MAX - 4096);
+    const int ret = sys_futex_wake(c, number_wake);
+    if (__builtin_expect(
+            static_cast<unsigned int>(ret) > static_cast<unsigned int>(-4096),
+            false)) {
+      my_robust_list::robust_head.pending_next = 0;
+      PELOG(FATAL, -ret, "FUTEX_WAKE(%p, %d) failed", c, number_wake);
+    }
+  }
+}
+
+} // namespace
+
+int mutex_lock(aos_mutex *m) {  // Blocking lock which gives up if a signal interrupts the wait.
+  return mutex_get(m, /*signals_fail=*/true, /*timeout=*/nullptr);
+}
+int mutex_lock_timeout(aos_mutex *m, const struct timespec *timeout) {  // Like mutex_lock, with a timeout.
+  return mutex_get(m, /*signals_fail=*/true, timeout);
+}
+int mutex_grab(aos_mutex *m) {  // Blocking lock which retries when a signal interrupts the wait.
+  return mutex_get(m, /*signals_fail=*/false, /*timeout=*/nullptr);
+}
+
+void mutex_unlock(aos_mutex *m) {  // Unlocks m; FATAL if the calling thread is not the owner.
+ RunObservers run_observers(m, true);
+ const uint32_t tid = get_tid();
+ if (kPrintOperations) {
+ printf("%" PRId32 ": %p unlock\n", tid, m);
+ }
+
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
+ if (__builtin_expect((value & FUTEX_TID_MASK) != tid, false)) {  // the TID bits name the owner
+ my_robust_list::robust_head.pending_next = 0;  // presumably so the FATAL path can lock mutexes itself (Adder asserts this is 0)
+ check_cached_tid(tid);  // a stale cached TID after a fork would make the messages below wrong
+ if ((value & FUTEX_TID_MASK) == 0) {
+ LOG(FATAL, "multiple unlock of aos_mutex %p by %" PRId32 "\n", m, tid);
+ } else {
+ LOG(FATAL, "aos_mutex %p is locked by %" PRId32 ", not %" PRId32 "\n",
+ m, value & FUTEX_TID_MASK, tid);
+ }
+ }
+
+ my_robust_list::Remover remover(m);  // unlinks m now; pending_next stays set until this function returns
+ unlock_pthread_mutex(m);
+
+ // If the atomic TID->0 transition fails (ie FUTEX_WAITERS is set),
+ if (!compare_and_swap(&m->futex, tid, 0)) {
+ // The kernel handles everything else.
+ const int ret = sys_futex_unlock_pi(&m->futex);
+ if (ret != 0) {
+ my_robust_list::robust_head.pending_next = 0;
+ PELOG(FATAL, -ret, "FUTEX_UNLOCK_PI(%p) failed", &m->futex);
+ }
+ } else {
+ // There aren't any waiters, so no need to call into the kernel.
+ }
+}
+
+int mutex_trylock(aos_mutex *m) {  // Returns 0 (locked), 1 (locked, owner died), or 4 (somebody else has it).
+ RunObservers run_observers(m, true);
+ const uint32_t tid = get_tid();
+ if (kPrintOperations) {
+ printf("%" PRId32 ": %p trylock\n", tid, m);
+ }
+ my_robust_list::Adder adder(m);  // marks m pending; its destructor clears that on every return path
+
+ // Try an atomic 0->TID transition.
+ uint32_t c = compare_and_swap_val(&m->futex, 0, tid);  // c is the value found in the futex
+
+ if (c != 0) {
+ if (__builtin_expect((c & FUTEX_OWNER_DIED) == 0, true)) {
+ // Somebody else had it locked; we failed.
+ return 4;
+ } else {
+ // FUTEX_OWNER_DIED was set, so we have to call into the kernel to deal
+ // with resetting it.
+ const int ret = sys_futex_wait(FUTEX_TRYLOCK_PI, &m->futex, 0, NULL);
+ if (ret == 0) {
+ adder.Add();
+ // Only clear the owner died if somebody else didn't do the recovery
+ // and then unlock before our TRYLOCK happened.
+ return mutex_finish_lock(m);  // 1 if FUTEX_OWNER_DIED is still set, otherwise 0
+ } else {
+ // EWOULDBLOCK means that somebody else beat us to it.
+ if (__builtin_expect(ret == -EWOULDBLOCK, true)) {
+ return 4;
+ }
+ my_robust_list::robust_head.pending_next = 0;  // presumably so the FATAL path can lock mutexes itself (Adder asserts this is 0)
+ PELOG(FATAL, -ret, "FUTEX_TRYLOCK_PI(%p, 0, NULL) failed", &m->futex);
+ }
+ }
+ }
+
+ lock_pthread_mutex(m);  // fast path: the 0->TID transition succeeded
+ adder.Add();
+ return 0;
+}
+
+ // Reports whether the TID bits of m's futex word match the calling thread's
+ // TID, ie whether this thread is the recorded owner of m.
+ bool mutex_islocked(const aos_mutex *m) {
+   const uint32_t my_tid = get_tid();
+   const uint32_t owner_tid =
+       __atomic_load_n(&m->futex, __ATOMIC_RELAXED) & FUTEX_TID_MASK;
+   return owner_tid == my_tid;
+ }
+
+ // Unlocks m, waits for the value of c to change from what it was before the
+ // unlock, and then returns with m locked again.
+ // Returns 0 normally or 1 if FUTEX_OWNER_DIED handling was needed while
+ // relocking m. Dies with PELOG(FATAL) on unexpected futex errors.
+ int condition_wait(aos_condition *c, aos_mutex *m) {
+ RunObservers run_observers(c, false);
+ const uint32_t tid = get_tid();
+ // Snapshot the condition's value while m is still held so a wake that happens
+ // after the unlock below is detected as a changed value.
+ const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
+
+ mutex_unlock(m);
+
+ my_robust_list::Adder adder(m);
+
+ while (true) {
+ // Wait in the kernel iff the value of it doesn't change (ie somebody else
+ // does a wake) from before we unlocked the mutex.
+ int ret;
+ if (USE_REQUEUE_PI) {
+ ret = sys_futex_wait_requeue_pi(c, wait_start, nullptr, &m->futex);
+ } else {
+ ret = sys_futex_wait(FUTEX_WAIT, c, wait_start, nullptr);
+ }
+ if (ret != 0) {
+ // If it failed because somebody else did a wake and changed the value
+ // before we actually made it to sleep.
+ if (__builtin_expect(ret == -EAGAIN, true)) {
+ // There's no need to unconditionally set FUTEX_WAITERS here if we're
+ // using REQUEUE_PI because the kernel automatically does that in the
+ // REQUEUE_PI iff it requeued anybody.
+ // If we're not using REQUEUE_PI, then everything is just normal locks
+ // etc, so there's no need to do anything special there either.
+
+ // We have to relock it ourself because the kernel didn't do it.
+ const int r = mutex_do_get(m, false, nullptr, tid);
+ assert(__builtin_expect(r == 0 || r == 1, true));
+ adder.Add();
+ return r;
+ }
+ // Try again if it was because of a signal.
+ if (__builtin_expect(ret == -EINTR, true)) continue;
+ // Anything else is unexpected and fatal.
+ my_robust_list::robust_head.pending_next = 0;
+ if (USE_REQUEUE_PI) {
+ PELOG(FATAL, -ret, "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed",
+ c, wait_start, &m->futex);
+ } else {
+ PELOG(FATAL, -ret, "FUTEX_WAIT(%p, %" PRIu32 ", nullptr) failed",
+ c, wait_start);
+ }
+ } else {
+ if (USE_REQUEUE_PI) {
+ // Record that the kernel relocked it for us.
+ lock_pthread_mutex(m);
+ } else {
+ // We have to take the lock ourself because the kernel won't, but
+ // there's no need for it to be anything special because all waiters
+ // just relock it like usual.
+ const int r = mutex_do_get(m, false, nullptr, tid);
+ assert(__builtin_expect(r == 0 || r == 1, true));
+ adder.Add();
+ return r;
+ }
+
+ // We succeeded in waiting, and the kernel took care of locking the mutex
+ // for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
+
+ adder.Add();
+
+ // Report (and clear) FUTEX_OWNER_DIED if it is set on the mutex we now hold.
+ const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
+ if (__builtin_expect((value & FUTEX_OWNER_DIED) != 0, false)) {
+ __atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
+ }
+
+ // Forwards to condition_wake with a count of 0.
+ // NOTE(review): condition_wake is defined elsewhere; the count is presumably
+ // how many waiters beyond the first get requeued -- confirm.
+ void condition_signal(aos_condition *c, aos_mutex *m) {
+   const int count = 0;
+   condition_wake(c, m, count);
+ }
+
+ // Forwards to condition_wake with a count of INT_MAX, ie "everybody".
+ void condition_broadcast(aos_condition *c, aos_mutex *m) {
+   const int count = INT_MAX;
+   condition_wake(c, m, count);
+ }
+
+ // Waits (with FUTEX_WAIT against an expected value of 0) for m to stop being 0.
+ // Returns 0 if the wait completed or the kernel reported -EWOULDBLOCK (m was
+ // not 0 to begin with), 1 if interrupted by a signal, 2 on timeout, or -1 with
+ // errno set for any other failure.
+ int futex_wait_timeout(aos_futex *m, const struct timespec *timeout) {
+   RunObservers run_observers(m, false);
+   const int status = sys_futex_wait(FUTEX_WAIT, m, 0, timeout);
+   if (status == -EINTR) {
+     return 1;
+   }
+   if (status == -ETIMEDOUT) {
+     return 2;
+   }
+   if (status != 0 && status != -EWOULDBLOCK) {
+     // Hand the kernel's error code back through errno.
+     errno = -status;
+     return -1;
+   }
+   ANNOTATE_HAPPENS_AFTER(m);
+   return 0;
+ }
+
+ // futex_wait_timeout without a timeout.
+ int futex_wait(aos_futex *m) {
+   const struct timespec *const no_timeout = nullptr;
+   return futex_wait_timeout(m, no_timeout);
+ }
+
+ // Stores value into m and then wakes waiters on it.
+ // Returns what sys_futex_wake returned on success, or -1 with errno set when
+ // that result is a negated errno.
+ int futex_set_value(aos_futex *m, uint32_t value) {
+   RunObservers run_observers(m, false);
+   ANNOTATE_HAPPENS_BEFORE(m);
+   __atomic_store_n(m, value, __ATOMIC_SEQ_CST);
+   const int woken = sys_futex_wake(m, INT_MAX - 4096);
+   // Results in (-4096, 0) viewed as unsigned are error codes, not counts.
+   const bool failed =
+       static_cast<unsigned int>(woken) > static_cast<unsigned int>(-4096);
+   if (__builtin_expect(failed, false)) {
+     errno = -woken;
+     return -1;
+   }
+   return woken;
+ }
+
+ // Sets m to 1 (the "set" state) and wakes its waiters via futex_set_value.
+ int futex_set(aos_futex *m) {
+   const uint32_t set_value = 1;
+   return futex_set_value(m, set_value);
+ }
+
+ // Atomically resets m to 0.
+ // Returns 1 if it was already 0 beforehand and 0 if it held a non-zero value.
+ int futex_unset(aos_futex *m) {
+   const uint32_t previous = __atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);
+   if (previous == 0) {
+     return 1;
+   }
+   return 0;
+ }
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+ // Sets functions to run before and after all futex operations.
+ // This matters for robustness testing: the memory has to be made writable for
+ // the whole futex operation, otherwise the operation never succeeds.
+ void SetFutexAccessorObservers(FutexAccessorObserver before,
+                                FutexAccessorObserver after) {
+   after_observer = after;
+   before_observer = before;
+ }
+
+ // Sets an extra offset between each mutex and the value stored for it in the
+ // robust list (forward pointers only). This works around a kernel bug by
+ // keeping a second, always-writable set of mutexes so the kernel won't loop
+ // forever when trying to unlock them.
+ void SetRobustListOffset(ptrdiff_t offset) {
+   const ptrdiff_t robust_list_offset = offset;
+   my_robust_list::SetRobustListOffset(robust_list_offset);
+ }
+
+ // Returns true iff the current thread has any mutexes locked, as reported by
+ // my_robust_list. Mainly useful for testing.
+ bool HaveLockedMutexes() {
+   const bool any_locked = my_robust_list::HaveLockedMutexes();
+   return any_locked;
+ }
+
+} // namespace ipc_lib
+} // namespace linux_code
+} // namespace aos
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
new file mode 100644
index 0000000..ae95c6d
--- /dev/null
+++ b/aos/ipc_lib/aos_sync.h
@@ -0,0 +1,186 @@
+#ifndef AOS_IPC_LIB_SYNC_H_
+#define AOS_IPC_LIB_SYNC_H_
+
+#include <stdlib.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+// TODO(brians) add client requests to make helgrind useful with this code
+// <http://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.client-requests>
+// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
+// list the interesting ones
+
+// Have to remember to align structs containing it (recursively) to sizeof(int).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// The value should not be changed after multiple processes have started
+// accessing an instance except through the functions declared in this file.
+typedef uint32_t aos_futex __attribute__((aligned(sizeof(int))));
+
+// For use with the condition_ functions.
+// No initialization is necessary.
+typedef aos_futex aos_condition;
+
+// For use with the mutex_ functions.
+// futex must be initialized to 0.
+// No initialization is necessary for next and previous.
+// Under ThreadSanitizer, pthread_mutex_init must be initialized to false.
+// The recommended way to initialize one of these is by memset(3)ing the whole
+// thing to 0 or using C++ () initialization to avoid depending on the
+// implementation.
+ struct aos_mutex {
+ // 2 links to get O(1) adds and removes.
+ // This is &next of another element.
+ // next (might) have stuff |ed into it to indicate PI futexes and might also
+ // have an offset (see SetRobustListOffset); previous is an actual pointer
+ // without any of that.
+ // next has to stay the first element of this structure.
+ uintptr_t next;
+ struct aos_mutex *previous;
+ // The futex word the mutex_ functions operate on. Must start out as 0.
+ // NOTE(review): the implementation appears to keep the owner's TID plus
+ // kernel flag bits in here -- confirm against aos_sync.cc.
+ aos_futex futex;
+ #ifdef AOS_SANITIZER_thread
+ // Internal pthread mutex which is kept in sync with the actual mutex so tsan
+ // can understand what's happening and help catch bugs.
+ pthread_mutex_t pthread_mutex;
+ #ifndef __cplusplus
+ // TODO(brian): Remove this once the stupid C code is gone...
+ // C has no bool here, so temporarily spell it as a 1-byte integer.
+ #define bool uint8_t
+ #endif
+ // Must start out false (ie zeroed) when building under ThreadSanitizer.
+ bool pthread_mutex_init;
+ #ifndef __cplusplus
+ #undef bool
+ #endif
+ #endif
+ };
+
+// The mutex_ functions are designed to be used as mutexes. A mutex can only be
+// unlocked from the same task which originally locked it. Also, if a task dies
+// while holding a mutex, the next person who locks it will be notified. After a
+// fork(2), any mutexes held will be held ONLY in the parent process. Attempting
+// to unlock them from the child will give errors.
+// Priority inheritance (aka priority inversion protection) is enabled.
+
+// All of these return 1 if the previous owner died with it held, 2 if
+// interrupted by a signal, 3 if timed out, or 4 if an optional lock fails. Some
+// of them (obviously) can never return some of those values.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
+int mutex_lock(struct aos_mutex *m) __attribute__((warn_unused_result));
+ // Returns 3 if it timed out or 2 if interrupted by a signal (the same values
+ // as in the list above).
+int mutex_lock_timeout(struct aos_mutex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+// Ignores signals (retries until something other than getting a signal
+// happens).
+int mutex_grab(struct aos_mutex *m) __attribute__((warn_unused_result));
+// LOG(FATAL)s for multiple unlocking.
+void mutex_unlock(struct aos_mutex *m);
+// Does not block waiting for the mutex.
+int mutex_trylock(struct aos_mutex *m) __attribute__((warn_unused_result));
+#ifdef __cplusplus
+// Returns whether or not the mutex is locked by this thread.
+// There aren't very many valid uses for this function; the main ones are
+// checking mutexes as they are destroyed to catch problems with that early and
+// stack-based recursive mutex locking.
+bool mutex_islocked(const aos_mutex *m);
+#endif
+
+// The futex_ functions are similar to the mutex_ ones but different.
+// They are designed for signalling when something happens (possibly to
+ // multiple listeners). An aos_futex manipulated with them can only be set or
+// unset. Also, they can be set/unset/waited on from any task independently of
+// who did something first and have no priority inversion protection.
+// They return -1 for other error (which will be in errno from futex(2)).
+// They have no spurious wakeups (because everybody always gets woken up).
+//
+// Another name for this kind of synchronization mechanism is a "notification".
+// Python calls it an "event".
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value part of the futex_* functions, the
+// obvious implementation has basically the same race condition that condition
+// variables are designed to prevent between somebody else grabbing the mutex
+// and changing whether it's set or not and the futex_ function changing the
+// futex's value. A futex is effectively a resettable condition variable with
+// the condition being "has it been set"; if you have some other condition (for
+// example messages are available to read on a queue), use the condition_
+// functions or there will be race conditions.
+
+// Wait for the futex to be set. Will return immediately if it's already set
+// (after a syscall).
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1 with an error in errno. Can return 0 spuriously.
+int futex_wait(aos_futex *m) __attribute__((warn_unused_result));
+// The same as futex_wait except returns 2 if it times out.
+int futex_wait_timeout(aos_futex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+
+// Set the futex and wake up anybody waiting on it.
+// Returns the number that were woken or -1 with an error in errno.
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(aos_futex *m);
+// Same as above except lets something other than 1 be used as the final value.
+int futex_set_value(aos_futex *m, aos_futex value);
+// Unsets the futex (sets the value to 0).
+// Returns 0 if it was set before and 1 if it wasn't.
+// Can not fail.
+int futex_unset(aos_futex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+// They do have the potential for spurious wakeups.
+
+// Wait for the condition variable to be signalled. m will be unlocked
+// atomically with actually starting to wait. m is guaranteed to be locked when
+// this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+// Returns 0 on success or 1 if the previous owner died.
+int condition_wait(aos_condition *c, struct aos_mutex *m)
+ __attribute__((warn_unused_result));
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+// NOTE: There is a small chance that this will wake more than just 1 waiter.
+void condition_signal(aos_condition *c, struct aos_mutex *m);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(aos_condition *c, struct aos_mutex *m);
+
+#ifdef __cplusplus
+}
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+typedef void (*FutexAccessorObserver)(void *address, bool write);
+
+// Set functions which get called before and after all futex operations.
+void SetFutexAccessorObservers(FutexAccessorObserver before,
+ FutexAccessorObserver after);
+
+// Set the offset to use for putting addresses into the robust list.
+// This is necessary to work around a kernel bug where it hangs when trying to
+// deal with a futex on the robust list when its memory has been changed to
+// read-only.
+void SetRobustListOffset(ptrdiff_t offset);
+
+// Returns true if there are any mutexes still locked by this task.
+// This is mainly useful for verifying tests don't mess up other ones by leaving
+// now-freed but still locked mutexes around.
+bool HaveLockedMutexes();
+
+} // namespace ipc_lib
+} // namespace linux_code
+} // namespace aos
+
+#endif // __cplusplus
+
+#endif // AOS_IPC_LIB_SYNC_H_
diff --git a/aos/ipc_lib/core_lib.c b/aos/ipc_lib/core_lib.c
new file mode 100644
index 0000000..a1e3315
--- /dev/null
+++ b/aos/ipc_lib/core_lib.c
@@ -0,0 +1,51 @@
+#include "aos/ipc_lib/core_lib.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+
+#include "aos/ipc_lib/shared_mem_types.h"
+
+ // Returns the larger of two 8-bit values.
+ static uint8_t aos_8max(uint8_t l, uint8_t r) {
+   if (r < l) {
+     return l;
+   }
+   return r;
+ }
+ // Carves length bytes off the shared memory region, moving
+ // shm_core->msg_alloc downwards, and returns the new (aligned) block.
+ // alignment is raised to a size-based minimum before it is applied.
+ // Aborts the process if the allocation reaches global_core->shared_mem.
+ void *shm_malloc_aligned(size_t length, uint8_t alignment) {
+   // minimum alignments from
+   // <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+   uint8_t minimum_alignment;
+   if (length <= 1) {
+     minimum_alignment = 1;
+   } else if (length <= 2) {
+     minimum_alignment = 2;
+   } else if (length <= 4) {
+     minimum_alignment = 4;
+   } else if (length <= 8) {
+     minimum_alignment = 8;
+   } else if (length <= 16) {
+     minimum_alignment = 16;
+   } else {
+     minimum_alignment = (length >= 64) ? 64 : 16;
+   }
+   alignment = aos_8max(alignment, minimum_alignment);
+
+   aos_shm_core *shm_core = global_core->mem_struct;
+   int result =
+       mutex_grab(&shm_core->msg_alloc_lock);
+ #ifdef NDEBUG
+   (void)result;
+ #else
+   assert(result == 0);
+ #endif
+
+   // Step down by the requested size, then round the address down to a
+   // multiple of alignment.
+   shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+   const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+   shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+
+   void *msg = shm_core->msg_alloc;
+   if (msg <= global_core->shared_mem) {
+     fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+     printf("if you didn't see the stderr output just then, you should have\n");
+     abort();
+   }
+   mutex_unlock(&shm_core->msg_alloc_lock);
+   return msg;
+ }
+
diff --git a/aos/ipc_lib/core_lib.h b/aos/ipc_lib/core_lib.h
new file mode 100644
index 0000000..d62f602
--- /dev/null
+++ b/aos/ipc_lib/core_lib.h
@@ -0,0 +1,33 @@
+#ifndef _AOS_CORE_LIB_H_
+#define _AOS_CORE_LIB_H_
+
+#include <stdint.h>
+
+#include "aos/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+// alloc_size was taken out of clang in r197866. It appears that it never
+// actually did anything.
+#if defined(__clang__)
+#define attribute_alloc_size(n)
+#else
+#define attribute_alloc_size(n) __attribute__((alloc_size(n)))
+#endif
+
+void *shm_malloc_aligned(size_t length, uint8_t alignment)
+ attribute_alloc_size(1);
+ // Allocates length bytes of shared memory without asking for any alignment
+ // beyond what shm_malloc_aligned applies on its own.
+ static void *shm_malloc(size_t length) attribute_alloc_size(1);
+ static inline void *shm_malloc(size_t length) {
+   const uint8_t no_extra_alignment = 0;
+   return shm_malloc_aligned(length, no_extra_alignment);
+ }
+
+#undef attribute_alloc_size
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif
diff --git a/aos/ipc_lib/ipc_comparison.cc b/aos/ipc_lib/ipc_comparison.cc
new file mode 100644
index 0000000..8774c73
--- /dev/null
+++ b/aos/ipc_lib/ipc_comparison.cc
@@ -0,0 +1,961 @@
+#include "third_party/gflags/include/gflags/gflags.h"
+
+#include <fcntl.h>
+#include <mqueue.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdint.h>
+#include <sys/eventfd.h>
+#include <sys/msg.h>
+#include <sys/sem.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/event.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/mutex/mutex.h"
+#include "aos/time/time.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/queue.h"
+
+DEFINE_string(method, "", "Which IPC method to use");
+DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
+DEFINE_int32(client_cpu, 0, "CPU to pin client to");
+DEFINE_int32(server_cpu, 0, "CPU to pin server to");
+DEFINE_int32(client_priority, 1,
+ "Realtime priority for client. Negative for don't change");
+DEFINE_int32(server_priority, 1,
+ "Realtime priority for server. Negative for don't change");
+
+namespace aos {
+
+namespace chrono = ::std::chrono;
+
+ // A generic interface for an object which can send some data to another thread
+ // and get a reply back.
+ //
+ // The "server" side loops: Wait, do something with the result, then Pong.
+ // The "client" side repeatedly calls Ping.
+ class PingPongerInterface {
+  public:
+   // A chunk of memory definitely on its own cache line anywhere sane.
+   typedef uint8_t Data[1024] __attribute__((aligned(128)));
+
+   virtual ~PingPongerInterface() = default;
+
+   // Client side: returns the buffer to fill in before the next Ping.
+   // The result is valid until the next Ping call.
+   virtual Data *PingData() = 0;
+
+   // Client side: sends the buffer from the most recent PingData call to the
+   // "server" and returns the server's response.
+   // PingData must be called exactly once before each call of this method.
+   // The result is valid until the next PingData call.
+   virtual const Data *Ping() = 0;
+
+   // Server side: blocks until a Ping arrives and returns its data.
+   // The result is valid until the beginning of the next Pong call.
+   virtual const Data *Wait() = 0;
+
+   // Server side: returns the buffer to fill in before the next Pong.
+   // The result is valid until the next Pong call.
+   virtual Data *PongData() = 0;
+
+   // Server side: sends the buffer from the most recent PongData call back to
+   // the in-progress Ping.
+   // PongData must be called exactly once before each call of this method.
+   virtual void Pong() = 0;
+ };
+
+ // Base class for implementations which simply use one fixed pair of Data
+ // objects for all Pings and Pongs.
+ class StaticPingPonger : public PingPongerInterface {
+  public:
+   Data *PingData() override {
+     return &ping_data_;
+   }
+   Data *PongData() override {
+     return &pong_data_;
+   }
+
+  private:
+   Data ping_data_;
+   Data pong_data_;
+ };
+
+ // Implements ping-pong by copying the data through file descriptors.
+ class FDPingPonger : public StaticPingPonger {
+  protected:
+   // Subclasses must call Init from their own constructor.
+   FDPingPonger() {}
+
+   // Subclasses must call this in their constructor.
+   // Does not take ownership of any of the file descriptors, any/all of which
+   // may be the same.
+   // {server,client}_read must be open for reading and {server,client}_write
+   // must be open for writing.
+   void Init(int server_read, int server_write, int client_read,
+             int client_write) {
+     server_read_ = server_read;
+     server_write_ = server_write;
+     client_read_ = client_read;
+     client_write_ = client_write;
+   }
+
+  private:
+   // Client: writes the ping buffer out and reads the response back.
+   const Data *Ping() override {
+     WriteFully(client_write_, *PingData());
+     ReadFully(client_read_, &read_by_client_);
+     return &read_by_client_;
+   }
+
+   // Server: reads one full Data from the client.
+   const Data *Wait() override {
+     ReadFully(server_read_, &read_by_server_);
+     return &read_by_server_;
+   }
+
+   // Server: writes the pong buffer back to the client.
+   void Pong() override { WriteFully(server_write_, *PongData()); }
+
+   // Reads exactly sizeof(Data) bytes from fd into *data, looping over short
+   // reads. Dies if the read fails or the other end is closed.
+   void ReadFully(int fd, Data *data) {
+     size_t remaining = sizeof(*data);
+     uint8_t *current = &(*data)[0];
+     while (remaining > 0) {
+       const ssize_t result = PCHECK(read(fd, current, remaining));
+       // read(2) returns 0 at end-of-file. Without this check the loop would
+       // spin forever because remaining would never shrink.
+       CHECK(result != 0);
+       CHECK_LE(static_cast<size_t>(result), remaining);
+       remaining -= result;
+       current += result;
+     }
+   }
+
+   // Writes all sizeof(Data) bytes of data to fd, looping over short writes.
+   void WriteFully(int fd, const Data &data) {
+     size_t remaining = sizeof(data);
+     const uint8_t *current = &data[0];
+     while (remaining > 0) {
+       const ssize_t result = PCHECK(write(fd, current, remaining));
+       CHECK_LE(static_cast<size_t>(result), remaining);
+       remaining -= result;
+       current += result;
+     }
+   }
+
+   Data read_by_client_, read_by_server_;
+   int server_read_ = -1, server_write_ = -1, client_read_ = -1,
+       client_write_ = -1;
+ };
+
+ // Ping-pong over a pair of anonymous pipes, one per direction.
+ class PipePingPonger : public FDPingPonger {
+  public:
+   PipePingPonger() {
+     PCHECK(pipe(to_server));
+     PCHECK(pipe(from_server));
+     // pipe(2) puts the read end at index 0 and the write end at index 1.
+     const int server_read = to_server[0];
+     const int server_write = from_server[1];
+     const int client_read = from_server[0];
+     const int client_write = to_server[1];
+     Init(server_read, server_write, client_read, client_write);
+   }
+
+   ~PipePingPonger() {
+     PCHECK(close(to_server[0]));
+     PCHECK(close(to_server[1]));
+     PCHECK(close(from_server[0]));
+     PCHECK(close(from_server[1]));
+   }
+
+  private:
+   int to_server[2];
+   int from_server[2];
+ };
+
+ // Ping-pong over two FIFOs created at fixed paths under /tmp.
+ class NamedPipePingPonger : public FDPingPonger {
+  public:
+   NamedPipePingPonger() {
+     OpenFifo("/tmp/to_server", &client_write_, &server_read_);
+     OpenFifo("/tmp/from_server", &server_write_, &client_read_);
+
+     Init(server_read_, server_write_, client_read_, client_write_);
+   }
+
+   ~NamedPipePingPonger() {
+     PCHECK(close(server_read_));
+     PCHECK(close(client_write_));
+     PCHECK(close(client_read_));
+     PCHECK(close(server_write_));
+   }
+
+  private:
+   // Replaces whatever is at name with a fresh FIFO and opens both of its ends,
+   // storing the descriptors in *write and *read.
+   void OpenFifo(const char *name, int *write, int *read) {
+     // A leftover FIFO from a previous run is fine; any other failure is not.
+     if (unlink(name) == -1 && errno != ENOENT) {
+       PLOG(FATAL, "unlink(%s)", name);
+     }
+     PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
+
+     // Have to open it nonblocking because the other end isn't open yet...
+     *read = PCHECK(open(name, O_RDONLY | O_NONBLOCK));
+     *write = PCHECK(open(name, O_WRONLY));
+
+     // ...and then switch the read end back to blocking now that both ends
+     // exist.
+     const int flags = PCHECK(fcntl(*read, F_GETFL));
+     PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
+   }
+
+   int server_read_;
+   int server_write_;
+   int client_read_;
+   int client_write_;
+ };
+
+ // Ping-pong over two AF_UNIX socket pairs of the given socket type, one pair
+ // per direction.
+ class UnixPingPonger : public FDPingPonger {
+  public:
+   UnixPingPonger(int type) {
+     PCHECK(socketpair(AF_UNIX, type, 0, to_server));
+     PCHECK(socketpair(AF_UNIX, type, 0, from_server));
+     // Same end assignment as the pipe-based version: index 0 is used for
+     // reading and index 1 for writing.
+     const int server_read = to_server[0];
+     const int server_write = from_server[1];
+     const int client_read = from_server[0];
+     const int client_write = to_server[1];
+     Init(server_read, server_write, client_read, client_write);
+   }
+
+   ~UnixPingPonger() {
+     PCHECK(close(to_server[0]));
+     PCHECK(close(to_server[1]));
+     PCHECK(close(from_server[0]));
+     PCHECK(close(from_server[1]));
+   }
+
+  private:
+   int to_server[2];
+   int from_server[2];
+ };
+
+ // Ping-pong over a single TCP connection on the loopback interface.
+ // nodelay selects whether TCP_NODELAY is set on the listening and client
+ // sockets.
+ class TCPPingPonger : public FDPingPonger {
+ public:
+ TCPPingPonger(bool nodelay) {
+ // Listening socket, bound to loopback with the port left as 0.
+ server_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
+ if (nodelay) {
+ const int yes = 1;
+ PCHECK(setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
+ }
+ {
+ sockaddr_in server_address;
+ memset(&server_address, 0, sizeof(server_address));
+ server_address.sin_family = AF_INET;
+ server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
+ sizeof(server_address)));
+ }
+ PCHECK(listen(server_, 1));
+
+ // Client socket, connected to whatever address the listening socket ended
+ // up bound to.
+ client_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
+ if (nodelay) {
+ const int yes = 1;
+ PCHECK(setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
+ }
+ {
+ sockaddr_in client_address;
+ unsigned int length = sizeof(client_address);
+ PCHECK(getsockname(server_, reinterpret_cast<sockaddr *>(&client_address),
+ &length));
+ PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
+ length));
+ }
+ // The server's end of the connection made just above.
+ server_connection_ = PCHECK(accept(server_, nullptr, 0));
+
+ // Both directions share the one connection.
+ Init(server_connection_, server_connection_, client_, client_);
+ }
+ ~TCPPingPonger() {
+ PCHECK(close(client_));
+ PCHECK(close(server_connection_));
+ PCHECK(close(server_));
+ }
+
+ private:
+ // server_ is the listening socket; server_connection_ is the accepted
+ // connection that actually carries the data.
+ int server_, client_, server_connection_;
+ };
+
+ // Ping-pong over two pairs of UDP sockets on the loopback interface, one pair
+ // per direction.
+ class UDPPingPonger : public FDPingPonger {
+ public:
+ UDPPingPonger() {
+ CreatePair(&server_read_, &client_write_);
+ CreatePair(&client_read_, &server_write_);
+
+ Init(server_read_, server_write_, client_read_, client_write_);
+ }
+ ~UDPPingPonger() {
+ PCHECK(close(server_read_));
+ PCHECK(close(client_write_));
+ PCHECK(close(client_read_));
+ PCHECK(close(server_write_));
+ }
+
+ private:
+ // Creates one direction of the link: *server is bound to loopback (port left
+ // as 0) and *client is connected to the address *server ended up with.
+ // Note that in the constructor's calls "server" is the receiving socket and
+ // "client" the sending one, whichever side owns them.
+ void CreatePair(int *server, int *client) {
+ *server = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
+ {
+ sockaddr_in server_address;
+ memset(&server_address, 0, sizeof(server_address));
+ server_address.sin_family = AF_INET;
+ server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ // server_address.sin_port = htons(server_ + 3000);
+ PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
+ sizeof(server_address)));
+ }
+ *client = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
+ {
+ sockaddr_in client_address;
+ unsigned int length = sizeof(client_address);
+ PCHECK(getsockname(*server, reinterpret_cast<sockaddr *>(&client_address),
+ &length));
+ PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
+ length));
+ }
+ }
+
+ int server_read_, server_write_, client_read_, client_write_;
+ };
+
+ // Implements ping-pong without copying the data, handing pointers across with
+ // a condition variable-like interface.
+ class ConditionVariablePingPonger : public StaticPingPonger {
+  protected:
+   // Represents a condition variable bundled with a mutex.
+   //
+   // Wait may return spuriously.
+   class ConditionVariableInterface {
+    public:
+     virtual ~ConditionVariableInterface() = default;
+
+     // Locks the mutex.
+     virtual void Lock() = 0;
+
+     // Unlocks the mutex.
+     virtual void Unlock() = 0;
+
+     // Waits on the condition variable.
+     //
+     // The mutex must be locked when this is called.
+     virtual void Wait() = 0;
+
+     // Signals the condition variable.
+     //
+     // The mutex does not have to be locked during this.
+     virtual void Signal() = 0;
+   };
+
+   // ping guards and signals to_server_; pong does the same for from_server_.
+   ConditionVariablePingPonger(
+       ::std::unique_ptr<ConditionVariableInterface> ping,
+       ::std::unique_ptr<ConditionVariableInterface> pong)
+       : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
+
+  private:
+   // Publishes the ping buffer, then blocks until the server publishes a
+   // response and consumes it.
+   const Data *Ping() override {
+     ping_->Lock();
+     to_server_ = PingData();
+     ping_->Unlock();
+     ping_->Signal();
+
+     pong_->Lock();
+     // Loop because Wait may return spuriously.
+     while (!from_server_) {
+       pong_->Wait();
+     }
+     const Data *const response = from_server_;
+     from_server_ = nullptr;
+     pong_->Unlock();
+     return response;
+   }
+
+   // Blocks until the client publishes a ping buffer and consumes it.
+   const Data *Wait() override {
+     ping_->Lock();
+     while (!to_server_) {
+       ping_->Wait();
+     }
+     const Data *const request = to_server_;
+     to_server_ = nullptr;
+     ping_->Unlock();
+     return request;
+   }
+
+   // Publishes the pong buffer for the waiting Ping.
+   void Pong() override {
+     pong_->Lock();
+     from_server_ = PongData();
+     pong_->Unlock();
+     pong_->Signal();
+   }
+
+   // nullptr means "nothing published yet" for each direction.
+   const Data *to_server_ = nullptr;
+   const Data *from_server_ = nullptr;
+   const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
+ };
+
+ // Implements ping-pong without copying the data, handing pointers across with
+ // a semaphore-like interface.
+ class SemaphorePingPonger : public StaticPingPonger {
+  protected:
+   // Represents a semaphore, which need only count to 1.
+   //
+   // The behavior when calling Get/Put in anything other than alternating order
+   // is undefined.
+   //
+   // Get may NOT return spuriously.
+   class SemaphoreInterface {
+    public:
+     virtual ~SemaphoreInterface() = default;
+
+     virtual void Get() = 0;
+     virtual void Put() = 0;
+   };
+
+   // ping announces a new to_server_; pong announces a new from_server_.
+   SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
+                       ::std::unique_ptr<SemaphoreInterface> pong)
+       : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
+
+  private:
+   // Publishes the ping buffer, releases the server, and waits for its reply.
+   const Data *Ping() override {
+     to_server_ = PingData();
+     ping_->Put();
+     pong_->Get();
+     return from_server_;
+   }
+
+   // Blocks until the client has published a ping buffer.
+   const Data *Wait() override {
+     ping_->Get();
+     return to_server_;
+   }
+
+   // Publishes the pong buffer and releases the client.
+   void Pong() override {
+     from_server_ = PongData();
+     pong_->Put();
+   }
+
+   const Data *to_server_ = nullptr;
+   const Data *from_server_ = nullptr;
+   const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
+ };
+
+
+ // Condition-variable ping-pong built on aos Mutex and Condition.
+ class AOSMutexPingPonger : public ConditionVariablePingPonger {
+  private:
+   // Adapts a Mutex + Condition pair to ConditionVariableInterface.
+   class AOSConditionVariable : public ConditionVariableInterface {
+    public:
+     AOSConditionVariable() : condition_(&mutex_) {}
+
+    private:
+     void Lock() override { CHECK(!mutex_.Lock()); }
+     void Unlock() override { mutex_.Unlock(); }
+     void Wait() override { CHECK(!condition_.Wait()); }
+     // Broadcast rather than a single wake.
+     void Signal() override { condition_.Broadcast(); }
+
+     // mutex_ has to be declared before condition_, which is constructed from
+     // its address.
+     Mutex mutex_;
+     Condition condition_;
+   };
+
+  public:
+   AOSMutexPingPonger()
+       : ConditionVariablePingPonger(
+             ::std::unique_ptr<ConditionVariableInterface>(
+                 new AOSConditionVariable),
+             ::std::unique_ptr<ConditionVariableInterface>(
+                 new AOSConditionVariable)) {}
+ };
+
+ // Semaphore ping-pong built on aos Event.
+ class AOSEventPingPonger : public SemaphorePingPonger {
+  private:
+   // Uses an Event as a semaphore that only counts to 1.
+   class AOSEventSemaphore : public SemaphoreInterface {
+     // Waits for the event and then resets it for the next round.
+     void Get() override {
+       event_.Wait();
+       event_.Clear();
+     }
+     void Put() override {
+       event_.Set();
+     }
+
+     Event event_;
+   };
+
+  public:
+   AOSEventPingPonger()
+       : SemaphorePingPonger(
+             ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore),
+             ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore)) {}
+ };
+
+ // Condition-variable ping-pong built on pthread mutexes and condition
+ // variables.
+ // pshared selects PTHREAD_PROCESS_SHARED for both the mutex and the condition
+ // variable; pi selects PTHREAD_PRIO_INHERIT for the mutex.
+ class PthreadMutexPingPonger : public ConditionVariablePingPonger {
+  public:
+   PthreadMutexPingPonger(int pshared, bool pi)
+       : ConditionVariablePingPonger(
+             ::std::unique_ptr<ConditionVariableInterface>(
+                 new PthreadConditionVariable(pshared, pi)),
+             ::std::unique_ptr<ConditionVariableInterface>(
+                 new PthreadConditionVariable(pshared, pi))) {}
+
+  private:
+   // A pthread_cond_t bundled with the pthread_mutex_t that guards it.
+   class PthreadConditionVariable : public ConditionVariableInterface {
+    public:
+     PthreadConditionVariable(bool pshared, bool pi) {
+       {
+         pthread_condattr_t cond_attr;
+         PRCHECK(pthread_condattr_init(&cond_attr));
+         if (pshared) {
+           PRCHECK(
+               pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
+         }
+         PRCHECK(pthread_cond_init(&condition_, &cond_attr));
+         PRCHECK(pthread_condattr_destroy(&cond_attr));
+       }
+
+       {
+         pthread_mutexattr_t mutex_attr;
+         PRCHECK(pthread_mutexattr_init(&mutex_attr));
+         if (pshared) {
+           PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
+                                                PTHREAD_PROCESS_SHARED));
+         }
+         if (pi) {
+           PRCHECK(
+               pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
+         }
+         // The attributes have to be passed in here; initializing with a null
+         // attribute pointer would silently ignore pshared and pi.
+         PRCHECK(pthread_mutex_init(&mutex_, &mutex_attr));
+         PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
+       }
+     }
+     ~PthreadConditionVariable() {
+       PRCHECK(pthread_mutex_destroy(&mutex_));
+       PRCHECK(pthread_cond_destroy(&condition_));
+     }
+
+    private:
+     void Lock() override { PRCHECK(pthread_mutex_lock(&mutex_)); }
+     void Unlock() override { PRCHECK(pthread_mutex_unlock(&mutex_)); }
+     void Wait() override { PRCHECK(pthread_cond_wait(&condition_, &mutex_)); }
+     // Broadcast rather than a single wake.
+     void Signal() override { PRCHECK(pthread_cond_broadcast(&condition_)); }
+
+     pthread_cond_t condition_;
+     pthread_mutex_t mutex_;
+   };
+ };
+// Ping-ponger whose two semaphores are each an eventfd.
+class EventFDPingPonger : public SemaphorePingPonger {
+ public:
+ EventFDPingPonger()
+ : SemaphorePingPonger(
+ ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
+ ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
+
+ private:
+ class EventFDSemaphore : public SemaphoreInterface {
+ public:
+ EventFDSemaphore() : fd_(PCHECK(eventfd(0, 0))) {}
+ ~EventFDSemaphore() { PCHECK(close(fd_)); }
+
+ private:
+ void Get() override {
+ uint64_t value;
+ if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
+ PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
+ }
+ CHECK_EQ(1u, value);  // The counter read back must be exactly 1.
+ }
+ void Put() override {
+ uint64_t value = 1;
+ if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
+ PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
+ }
+ }
+ // The eventfd, created in the constructor and closed in the destructor.
+ const int fd_;
+ };
+};
+
+// Ping-ponger whose semaphores are single-element System V semaphore sets.
+class SysvSemaphorePingPonger : public SemaphorePingPonger {
+ public:
+  SysvSemaphorePingPonger()
+      : SemaphorePingPonger(
+            ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
+            ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
+
+ private:
+  class SysvSemaphore : public SemaphoreInterface {
+   public:
+    SysvSemaphore() : sem_id_(PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
+    // System V semaphore sets outlive the process, so remove ours explicitly
+    // instead of leaking one per run until the system-wide limit is hit.
+    ~SysvSemaphore() { PCHECK(semctl(sem_id_, 0, IPC_RMID)); }
+
+   private:
+    void Get() override { Adjust(-1); }
+    void Put() override { Adjust(1); }
+
+    // Performs one blocking semop of delta on the set's only semaphore.
+    void Adjust(short delta) {
+      struct sembuf op;
+      op.sem_num = 0;
+      op.sem_op = delta;
+      op.sem_flg = 0;
+      PCHECK(semop(sem_id_, &op, 1));
+    }
+
+    const int sem_id_;
+  };
+};
+// Base for ping-pongers built on two POSIX semaphores supplied by a subclass.
+class PosixSemaphorePingPonger : public SemaphorePingPonger {
+ protected:
+ PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
+ : SemaphorePingPonger(
+ ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
+ ::std::unique_ptr<SemaphoreInterface>(
+ new PosixSemaphore(pong_sem))) {}
+
+ private:
+ class PosixSemaphore : public SemaphoreInterface {
+ public:
+ PosixSemaphore(sem_t *sem)
+ : sem_(sem) {}
+
+ private:
+ void Get() override { PCHECK(sem_wait(sem_)); }
+ void Put() override { PCHECK(sem_post(sem_)); }
+ // Only waited on and posted here; this class never destroys or closes it.
+ sem_t *const sem_;
+ };
+};
+
+// Ping-ponger which copies each message through a System V message queue.
+class SysvQueuePingPonger : public StaticPingPonger {
+ public:
+  SysvQueuePingPonger()
+      : ping_(PCHECK(msgget(IPC_PRIVATE, 0600))),
+        pong_(PCHECK(msgget(IPC_PRIVATE, 0600))) {}
+  // System V queues persist after exit, so remove them rather than leaking
+  // two kernel objects on every run.
+  ~SysvQueuePingPonger() {
+    PCHECK(msgctl(ping_, IPC_RMID, nullptr));
+    PCHECK(msgctl(pong_, IPC_RMID, nullptr));
+  }
+
+  const Data *Ping() override {
+    Message to_send;
+    memcpy(&to_send.data, PingData(), sizeof(Data));
+    PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
+    Message received;
+    PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
+    memcpy(&pong_received_, &received.data, sizeof(Data));
+    return &pong_received_;
+  }
+
+  const Data *Wait() override {
+    Message received;
+    PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
+    memcpy(&ping_received_, &received.data, sizeof(Data));
+    return &ping_received_;
+  }
+
+  virtual void Pong() override {
+    Message to_send;
+    memcpy(&to_send.data, PongData(), sizeof(Data));
+    PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
+  }
+
+ private:
+  struct Message {
+    long mtype = 1;  // Matches the msgtyp of 1 passed to msgrcv.
+    char data[sizeof(Data)];
+  };
+
+  Data ping_received_, pong_received_;
+  const int ping_, pong_;
+};
+// Ping-ponger which copies each message through a POSIX message queue.
+class PosixQueuePingPonger : public StaticPingPonger {
+ public:
+ PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
+ ~PosixQueuePingPonger() {
+ PCHECK(mq_close(ping_));
+ PCHECK(mq_close(pong_));
+ }
+
+ const Data *Ping() override {
+ PCHECK(mq_send(ping_, static_cast<char *>(static_cast<void *>(PingData())),
+ sizeof(Data), 1));
+ PCHECK(mq_receive(pong_,
+ static_cast<char *>(static_cast<void *>(&pong_received_)),
+ sizeof(Data), nullptr));
+ return &pong_received_;
+ }
+
+ const Data *Wait() override {
+ PCHECK(mq_receive(ping_,
+ static_cast<char *>(static_cast<void *>(&ping_received_)),
+ sizeof(Data), nullptr));
+ return &ping_received_;
+ }
+
+ virtual void Pong() override {
+ PCHECK(mq_send(pong_, static_cast<char *>(static_cast<void *>(PongData())),
+ sizeof(Data), 1));
+ }
+
+ private:
+ mqd_t Open(const char *name) {  // Removes any stale queue, then creates it.
+ if (mq_unlink(name) == -1 && errno != ENOENT) {
+ PLOG(FATAL, "mq_unlink(%s) failed", name);
+ }
+ struct mq_attr attr;
+ attr.mq_flags = 0;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = sizeof(Data);
+ attr.mq_curmsgs = 0;
+ const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
+ if (r == reinterpret_cast<mqd_t>(-1)) {
+ PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
+ }
+ return r;
+ }
+ // Descriptors returned by Open(); the destructor closes them.
+ const mqd_t ping_, pong_;
+ Data ping_received_, pong_received_;
+};
+// POSIX semaphore ping-ponger using unnamed semaphores stored in this object.
+class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
+ public:
+ PosixUnnamedSemaphorePingPonger(int pshared)
+ : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
+ PCHECK(sem_init(&ping_sem_, pshared, 0));
+ PCHECK(sem_init(&pong_sem_, pshared, 0));
+ }
+ ~PosixUnnamedSemaphorePingPonger() {
+ PCHECK(sem_destroy(&ping_sem_));
+ PCHECK(sem_destroy(&pong_sem_));
+ }
+
+ private:
+ sem_t ping_sem_, pong_sem_;  // Both are initialized with a count of 0.
+};
+// POSIX semaphore ping-ponger using the named semaphores /ping and /pong.
+class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
+ public:
+  PosixNamedSemaphorePingPonger()
+      : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
+                                 pong_sem_ = Open("/pong")) {}
+  ~PosixNamedSemaphorePingPonger() {
+    PCHECK(sem_close(ping_sem_));
+    PCHECK(sem_close(pong_sem_));
+  }
+
+ private:
+  sem_t *Open(const char *name) {  // Replaces any stale semaphore named name.
+    if (sem_unlink(name) == -1 && errno != ENOENT) {
+      PLOG(FATAL, "sem_unlink(%s) failed", name);
+    }
+    sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
+    if (r == SEM_FAILED) {
+      PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
+    }
+    return r;
+  }
+
+  sem_t *ping_sem_, *pong_sem_;
+};
+// Ping-ponger which passes messages through the "ping" and "pong" RawQueues.
+class AOSQueuePingPonger : public PingPongerInterface {
+ public:
+ AOSQueuePingPonger()
+ : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
+ pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
+
+ Data *PingData() override {
+ CHECK_EQ(nullptr, ping_to_send_);
+ ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
+ return ping_to_send_;
+ }
+
+ const Data *Ping() override {
+ CHECK_NE(nullptr, ping_to_send_);
+ CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
+ ping_to_send_ = nullptr;
+ pong_queue_->FreeMessage(pong_received_);
+ pong_received_ =
+ static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
+ return pong_received_;
+ }
+
+ const Data *Wait() override {
+ ping_queue_->FreeMessage(ping_received_);
+ ping_received_ =
+ static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
+ return ping_received_;
+ }
+
+ Data *PongData() override {
+ CHECK_EQ(nullptr, pong_to_send_);
+ pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
+ return pong_to_send_;
+ }
+
+ void Pong() override {
+ CHECK_NE(nullptr, pong_to_send_);
+ CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
+ pong_to_send_ = nullptr;
+ }
+
+ private:
+ RawQueue *const ping_queue_;
+ RawQueue *const pong_queue_;
+ // Messages fetched for sending but not yet written, and the last ones read.
+ Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;
+ const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
+};
+// Runs the benchmark: builds the PingPonger chosen by --method, then times it.
+int Main(int /*argc*/, char **argv) {
+ ::std::unique_ptr<PingPongerInterface> ping_ponger;
+ if (FLAGS_method == "pipe") {
+ ping_ponger.reset(new PipePingPonger());
+ } else if (FLAGS_method == "named_pipe") {
+ ping_ponger.reset(new NamedPipePingPonger());
+ } else if (FLAGS_method == "aos_mutex") {
+ ping_ponger.reset(new AOSMutexPingPonger());
+ } else if (FLAGS_method == "aos_event") {
+ ping_ponger.reset(new AOSEventPingPonger());
+ } else if (FLAGS_method == "pthread_mutex") {
+ ping_ponger.reset(new PthreadMutexPingPonger(false, false));
+ } else if (FLAGS_method == "pthread_mutex_pshared") {
+ ping_ponger.reset(new PthreadMutexPingPonger(true, false));
+ } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
+ ping_ponger.reset(new PthreadMutexPingPonger(true, true));
+ } else if (FLAGS_method == "pthread_mutex_pi") {
+ ping_ponger.reset(new PthreadMutexPingPonger(false, true));
+ } else if (FLAGS_method == "aos_queue") {
+ ping_ponger.reset(new AOSQueuePingPonger());
+ } else if (FLAGS_method == "eventfd") {
+ ping_ponger.reset(new EventFDPingPonger());
+ } else if (FLAGS_method == "sysv_semaphore") {
+ ping_ponger.reset(new SysvSemaphorePingPonger());
+ } else if (FLAGS_method == "sysv_queue") {
+ ping_ponger.reset(new SysvQueuePingPonger());
+ } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
+ ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
+ } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
+ ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
+ } else if (FLAGS_method == "posix_semaphore_named") {
+ ping_ponger.reset(new PosixNamedSemaphorePingPonger());
+ } else if (FLAGS_method == "posix_queue") {
+ ping_ponger.reset(new PosixQueuePingPonger());
+ } else if (FLAGS_method == "unix_stream") {
+ ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
+ } else if (FLAGS_method == "unix_datagram") {
+ ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
+ } else if (FLAGS_method == "unix_seqpacket") {
+ ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
+ } else if (FLAGS_method == "tcp") {
+ ping_ponger.reset(new TCPPingPonger(false));
+ } else if (FLAGS_method == "tcp_nodelay") {
+ ping_ponger.reset(new TCPPingPonger(true));
+ } else if (FLAGS_method == "udp") {
+ ping_ponger.reset(new UDPPingPonger());
+ } else {
+ fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
+ ::gflags::ShowUsageWithFlags(argv[0]);
+ return 1;
+ }
+ // Set once the timed run is over so the server thread leaves its loop.
+ ::std::atomic<bool> done{false};
+
+ ::std::thread server([&ping_ponger, &done]() {
+ if (FLAGS_server_priority > 0) {
+ SetCurrentThreadRealtimePriority(FLAGS_server_priority);
+ }
+ PinCurrentThreadToCPU(FLAGS_server_cpu);
+ // Answer every ping with a copy in which each byte is incremented by one.
+ while (!done) {
+ const PingPongerInterface::Data *data = ping_ponger->Wait();
+ PingPongerInterface::Data *response = ping_ponger->PongData();
+ for (size_t i = 0; i < sizeof(*data); ++i) {
+ (*response)[i] = (*data)[i] + 1;
+ }
+ ping_ponger->Pong();
+ }
+ });
+
+ if (FLAGS_client_priority > 0) {
+ SetCurrentThreadRealtimePriority(FLAGS_client_priority);
+ }
+ PinCurrentThreadToCPU(FLAGS_client_cpu);
+
+ // Warm everything up.
+ for (int i = 0; i < 1000; ++i) {
+ PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
+ memset(*warmup_data, i % 255, sizeof(*warmup_data));
+ ping_ponger->Ping();
+ }
+
+ const monotonic_clock::time_point start = monotonic_clock::now();
+ // Timed section: every reply is checked against the bytes that were sent.
+ for (int32_t i = 0; i < FLAGS_messages; ++i) {
+ PingPongerInterface::Data *to_send = ping_ponger->PingData();
+ memset(*to_send, i % 123, sizeof(*to_send));
+ const PingPongerInterface::Data *received = ping_ponger->Ping();
+ for (size_t ii = 0; ii < sizeof(*received); ++ii) {
+ CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
+ }
+ }
+
+ const monotonic_clock::time_point end = monotonic_clock::now();
+
+ // Try to make sure the server thread gets past its check of done so our
+ // Ping() down below doesn't hang. Kind of lame, but doing better would
+ // require complicating the interface to each implementation which isn't worth
+ // it here.
+ ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
+ done = true;
+ ping_ponger->PingData();
+ ping_ponger->Ping();
+ server.join();
+
+ LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
+ chrono::duration_cast<chrono::duration<double>>(end - start).count(),
+ FLAGS_messages);
+ const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
+ if (per_message >= chrono::seconds(1)) {
+ LOG(INFO, "More than 1 second per message ?!?\n");
+ } else {
+ LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
+ static_cast<int>(per_message.count()));
+ }
+
+ return 0;
+}
+
+} // namespace aos
+// Sets the usage text, parses the flags, logs to stdout, and runs ::aos::Main.
+int main(int argc, char **argv) {
+ ::gflags::SetUsageMessage(
+ ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
+ " --method=METHOD\n"
+ "METHOD can be one of the following:\n"
+ "\tpipe\n"
+ "\tnamed_pipe\n"
+ "\taos_mutex\n"
+ "\taos_event\n"
+ "\tpthread_mutex\n"
+ "\tpthread_mutex_pshared\n"
+ "\tpthread_mutex_pshared_pi\n"
+ "\tpthread_mutex_pi\n"
+ "\taos_queue\n"
+ "\teventfd\n"
+ "\tsysv_semaphore\n"
+ "\tsysv_queue\n"
+ "\tposix_semaphore_unnamed_shared\n"
+ "\tposix_semaphore_unnamed_unshared\n"
+ "\tposix_semaphore_named\n"
+ "\tposix_queue\n"
+ "\tunix_stream\n"
+ "\tunix_datagram\n"
+ "\tunix_seqpacket\n"
+ "\ttcp\n"
+ "\ttcp_nodelay\n"
+ "\tudp\n");
+ ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ ::aos::InitNRT();
+ ::aos::logging::AddImplementation(
+ new ::aos::logging::StreamLogImplementation(stdout));
+
+ return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/ipc_stress_test.cc b/aos/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..aa5d9c5
--- /dev/null
+++ b/aos/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,256 @@
+#include <errno.h>
+#include <libgen.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <string>
+
+#include "aos/die.h"
+#include "aos/libc/aos_strsignal.h"
+#include "aos/libc/dirname.h"
+#include "aos/logging/logging.h"
+#include "aos/mutex/mutex.h"
+#include "aos/time/time.h"
+#include "aos/type_traits/type_traits.h"
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/testing/test_shm.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// They have to be run in separate processes because (in addition to various
+// parts of our code not being thread-safe...) gtest does not like multiple
+// threads.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+namespace chrono = ::std::chrono;
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least for me).
+static const size_t kTestMaxArgs = 10;  // Row width of kTests below.
+static const char * kTests[][kTestMaxArgs] = {
+ {"queue_test"},
+ {"condition_test"},
+ {"mutex_test"},
+ {"raw_queue_test"},
+};
+static const size_t kTestsLength = sizeof(kTests) / sizeof(kTests[0]);
+// These arguments get inserted before any per-test arguments.
+static const char *kDefaultArgs[] = {
+ "--gtest_repeat=30",
+ "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 100;
+// How long to test for.
+static constexpr monotonic_clock::duration kTestTime = chrono::seconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+ Shared(const monotonic_clock::time_point stop_time)
+ : stop_time(stop_time), total_iterations(0) {}
+
+ // Synchronizes access to stdout/stderr to avoid interleaving failure
+ // messages.
+ Mutex output_mutex;
+
+ // When to stop.
+ monotonic_clock::time_point stop_time;
+
+ // The total number of iterations. Updated by each child as it finishes.
+ int total_iterations;
+ // Synchronizes writes to total_iterations.
+ Mutex total_iterations_mutex;
+ // Directory containing the test binaries; Main() sets it after construction.
+ const char *path;
+};
+static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+// Gets called after each child forks to run a test. Redirects stdout/stderr
+// into pipes[1] and then execs the test binary, so it never returns.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const char *(*test)[kTestMaxArgs], int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    PDie("close(%d) of read end of pipe failed", pipes[0]);
+  }
+  if (close(STDIN_FILENO) == -1) {
+    PDie("close(STDIN_FILENO(=%d)) failed", STDIN_FILENO);
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    PDie("dup2(%d, STDOUT_FILENO(=%d)) failed", pipes[1], STDOUT_FILENO);
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    PDie("dup2(%d, STDERR_FILENO(=%d)) failed", pipes[1], STDERR_FILENO);
+  }
+
+  const size_t default_size = sizeof(kDefaultArgs) / sizeof(kDefaultArgs[0]);
+  // There's no chance to free this because we either exec or Die.
+  const char **args = new const char *[kTestMaxArgs + default_size + 1];
+  // argv layout: the executable, kDefaultArgs, the per-test args, then NULL.
+  const ::std::string executable =
+      ::std::string(shared->path) + "/" + (*test)[0];
+  size_t i = 0;
+  args[i++] = executable.c_str();
+  // Store the literals themselves: wrapping them in temporary ::std::strings
+  // would leave argv pointing at storage that is gone before execv runs.
+  for (const char *default_arg : kDefaultArgs) {
+    args[i++] = default_arg;
+  }
+  // Unused slots of a kTests row are null, which marks the end of its args.
+  for (size_t test_i = 1; test_i < kTestMaxArgs; ++test_i) {
+    if ((*test)[test_i] == nullptr) break;
+    args[i++] = (*test)[test_i];
+  }
+  // Terminate right after the last real argument, wherever that landed.
+  args[i] = NULL;
+  execv(executable.c_str(), const_cast<char *const *>(args));
+  PDie("execv(%s, %p) failed", executable.c_str(), args);
+}
+
+// Body of one tester process: until shared->stop_time, forks and execs test
+// binaries one after another, capturing each run's output through a pipe and
+// printing it only if the run failed. Adds its run count to shared at the end.
+void DoRun(Shared *shared) {
+  int iterations = 0;
+  // An iterator pointing to a random one of the tests.
+  // We randomize based on PID because otherwise they all end up running the
+  // same test at the same time for the whole test.
+  const char *(*test)[kTestMaxArgs] = &kTests[getpid() % kTestsLength];
+  int pipes[2];
+  while (monotonic_clock::now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      PDie("pipe(%p) failed", &pipes);
+    }
+    switch (fork()) {
+      case 0:  // in runner
+        DoRunTest(shared, test, pipes);
+      case -1:
+        PDie("fork() failed");
+    }
+
+    if (close(pipes[1]) == -1) {
+      PDie("close(%d) of write end of pipe failed", pipes[1]);
+    }
+
+    ::std::string output;
+    char buffer[2048];
+    while (true) {
+      ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+      if (ret == 0) {  // EOF
+        if (close(pipes[0]) == -1) {
+          PDie("close(%d) of pipe at EOF failed", pipes[0]);
+        }
+        break;
+      } else if (ret == -1) {
+        PDie("read(%d, %p, %zd) failed", pipes[0], &buffer, sizeof(buffer));
+      }
+      output += ::std::string(buffer, ret);
+    }
+
+    int status;
+    while (wait(&status) == -1) {
+      if (errno != EINTR) PDie("wait(%p) in child failed", &status);
+    }
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                (*test)[0], WEXITSTATUS(status));
+        fputs(output.c_str(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n", (*test)[0],
+              WTERMSIG(status), aos_strsignal(WTERMSIG(status)));
+      fputs(output.c_str(), stderr);
+    } else {
+      CHECK(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", (*test)[0]);
+    }
+
+    // Advance to the next test, wrapping after the last entry (not after the
+    // first one, which would pin us to kTests[0] or run off the array).
+    ++test;
+    if (test == kTests + kTestsLength) test = kTests;
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+// Forks off one tester process; the child runs DoRun() and then _exits.
+void Run(Shared *shared) {
+  const pid_t child = fork();
+  if (child == -1) {
+    PDie("fork() of child failed");
+  } else if (child == 0) {  // in child
+    DoRun(shared);
+    _exit(EXIT_SUCCESS);
+  }
+}
+// Spawns kTesters tester processes, waits for them all, and reports the result.
+int Main(int argc, char **argv) {
+  if (argc < 1) {
+    fputs("need an argument\n", stderr);
+    return EXIT_FAILURE;
+  }
+
+  ::aos::testing::TestSharedMemory my_shm_;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(monotonic_clock::now() + kTestTime);
+
+  if (asprintf(const_cast<char **>(&shared->path),
+               "%s/../tests", ::aos::libc::Dirname(argv[0]).c_str()) == -1) {
+    PDie("asprintf failed");
+  }
+
+  for (int i = 0; i < kTesters; ++i) {
+    Run(shared);
+  }
+
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    if (wait(&status) == -1) {
+      if (errno != EINTR) PDie("wait(%p) failed", &status);
+      // Nothing was reaped, so status holds no result: retry this slot rather
+      // than falling through and judging an uninitialized value.
+      --i;
+      continue;
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+      error = true;
+    }
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+} // namespace aos
+// Program entry point; all of the work happens in ::aos::Main().
+int main(int argc, char **argv) {
+ return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
new file mode 100644
index 0000000..c751a7e
--- /dev/null
+++ b/aos/ipc_lib/queue.cc
@@ -0,0 +1,563 @@
+#if !AOS_DEBUG
+#undef NDEBUG
+#define NDEBUG
+#endif
+
+#include "aos/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+#include <algorithm>
+
+#include "aos/type_traits/type_traits.h"
+#include "aos/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
+// Compile-time switches for debug printf tracing.
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+const bool kReadIndexDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
+const int kExtraMessages = 20;
+
+} // namespace
+// Namespace-scope definitions of RawQueue's constexpr option members.
+constexpr Options<RawQueue>::Option RawQueue::kPeek;
+constexpr Options<RawQueue>::Option RawQueue::kFromEnd;
+constexpr Options<RawQueue>::Option RawQueue::kNonBlock;
+constexpr Options<RawQueue>::Option RawQueue::kBlock;
+constexpr Options<RawQueue>::Option RawQueue::kOverride;
+
+// This is what gets stuck in before each queue message in memory. It is always
+// allocated aligned to 8 bytes and its size has to maintain that alignment for
+// the message that follows immediately.
+struct RawQueue::MessageHeader {
+ MessageHeader *next;  // Links headers together on the free list.
+
+ // Gets the message header immediately preceding msg.
+ static MessageHeader *Get(const void *msg) {
+ return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
+ static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
+ alignof(MessageHeader)));
+ }
+ // Accessors for ref_count_; all of them use relaxed atomic operations.
+ int32_t ref_count() const {
+ return __atomic_load_n(&ref_count_, __ATOMIC_RELAXED);
+ }
+ void set_ref_count(int32_t val) {
+ __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
+ }
+
+ void ref_count_sub() {
+ __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+ }
+ void ref_count_add() {
+ __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+ }
+
+ private:
+ // This gets accessed with atomic instructions without any
+ // locks held by various member functions.
+ int32_t ref_count_;
+
+ // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+ // it to 16 if a pointer is 8 bytes by itself.
+#if __SIZEOF_POINTER__ == 8
+#ifdef __clang__
+ // Clang is smart enough to realize this is unused, but GCC doesn't like the
+ // attribute here...
+ __attribute__((unused))
+#endif
+ char padding[4];
+#elif __SIZEOF_POINTER__ == 4
+ // No padding needed to get 8 byte total size.
+#else
+#error Unknown pointer size.
+#endif
+};
+
+inline int RawQueue::index_add1(int index) {
+  // Advances a data_ index by one slot, wrapping to 0 at data_length_. The
+  // compare is used instead of % because it is more efficient on ARM.
+  assert(index <= data_length_);
+  const int next = index + 1;
+  if (next != data_length_) {
+    return next;
+  }
+  return 0;
+}
+// Drops one reference to msg and frees it once the count reaches 0.
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ header->ref_count_sub();
+ if (kRefDebug) {
+ printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count());
+ }
+
+ // The only way it should ever be 0 is if we were the last one to decrement,
+ // in which case nobody else should have it around to re-increment it or
+ // anything in the middle, so this is safe to do not atomically with the
+ // decrement.
+ if (header->ref_count() == 0) {
+ DoFreeMessage(msg);
+ } else {
+ assert(header->ref_count() > 0);
+ }
+}
+// Adds one reference to msg by bumping the count in its MessageHeader.
+inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
+ MessageHeader *const header = MessageHeader::Get(msg);
+ header->ref_count_add();
+ if (kRefDebug) {
+ printf("%p ref inc count: %p\n", this, msg);
+ }
+}
+// Puts msg (or, when recycle_ took it, a replacement) on the free list.
+inline void RawQueue::DoFreeMessage(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (kRefDebug) {
+ printf("%p ref free to %p: %p\n", this, recycle_, msg);
+ }
+ // Hand msg to recycle_ and take one of its messages to free in exchange.
+ if (__builtin_expect(recycle_ != nullptr, 0)) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == nullptr) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ header->ref_count_add();
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;
+ header = MessageHeader::Get(new_msg);
+ }
+ }
+
+ // This works around GCC bug 60272 (fixed in 4.8.3).
+ // new_next should just get replaced with header->next (and the body of the
+ // loop should become empty).
+ // The bug is that the store to new_next after the compare/exchange is
+ // unconditional but it should only be if it fails, which could mean
+ // overwriting what somebody else who preempted us right then changed it to.
+ // TODO(brians): Get rid of this workaround once we get a new enough GCC.
+ MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ header->next = new_next;
+ } while (__builtin_expect(
+ !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
+ __ATOMIC_RELEASE, __ATOMIC_RELAXED),
+ 0));
+}
+// Pops a message off free_messages_ and returns it with a ref count of 1.
+void *RawQueue::GetMessage() {
+ MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ if (__builtin_expect(header == nullptr, 0)) {
+ LOG(FATAL, "overused pool of queue %p (%s)\n", this, name_);
+ }
+ } while (__builtin_expect(
+ !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
+ __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
+ 0));
+ void *msg = reinterpret_cast<uint8_t *>(header + 1);
+ // It might be uninitialized, 0 from a previous use, or 1 from previously
+ // being recycled.
+ header->set_ref_count(1);
+ if (kRefDebug) {
+ printf("%p ref alloc: %p\n", this, msg);
+ }
+ return msg;
+}
+// Sets up an empty queue and a pool of queue_length + kExtraMessages messages.
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+ : readable_(&data_lock_), writable_(&data_lock_) {
+ static_assert(shm_ok<RawQueue::MessageHeader>::value,
+ "the whole point is to stick it in shared memory");
+ static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
+ "need to revalidate size/alignent assumptions");
+
+ if (queue_length < 1) {
+ LOG(FATAL, "queue length %d of %s needs to be at least 1\n", queue_length,
+ name);
+ }
+ // Keep a private copy of the name in shared memory.
+ const size_t name_size = strlen(name) + 1;
+ char *temp = static_cast<char *>(shm_malloc(name_size));
+ memcpy(temp, name, name_size);
+ name_ = temp;
+ length_ = length;
+ hash_ = hash;
+ queue_length_ = queue_length;
+
+ next_ = NULL;
+ recycle_ = NULL;
+
+ if (kFetchDebug) {
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+ name, length, hash, queue_length);
+ }
+ // One more slot than queue_length: data_start_ == data_end_ means empty.
+ data_length_ = queue_length + 1;
+ data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+ data_start_ = 0;
+ data_end_ = 0;
+ messages_ = 0;
+
+ msg_length_ = length + sizeof(MessageHeader);
+
+ // Create all of the messages for the free list and stick them on.
+ {
+ MessageHeader *previous = nullptr;
+ for (int i = 0; i < queue_length + kExtraMessages; ++i) {
+ MessageHeader *const message =
+ static_cast<MessageHeader *>(shm_malloc(msg_length_));
+ free_messages_ = message;
+ message->next = previous;
+ previous = message;
+ }
+ }
+
+ readable_waiting_ = false;
+
+ if (kFetchDebug) {
+ printf("made queue %s\n", name);
+ }
+}
+// Returns the queue matching all four arguments, creating it if none exists.
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length) {
+ if (kFetchDebug) {
+ printf("fetching queue %s\n", name);
+ }
+ if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
+ LOG(FATAL, "mutex_lock(%p) failed\n",
+ &global_core->mem_struct->queues.lock);
+ }
+ RawQueue *current = static_cast<RawQueue *>(
+ global_core->mem_struct->queues.pointer);
+ if (current != NULL) {
+ while (true) {
+ // If we found a matching queue.
+ if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+ current->hash_ == hash && current->queue_length_ == queue_length) {
+ mutex_unlock(&global_core->mem_struct->queues.lock);
+ return current;
+ } else {
+ if (kFetchDebug) {
+ printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+ strcmp(current->name_, name), name);
+ }
+ }
+ // If this is the last one.
+ if (current->next_ == NULL) break;
+ current = current->next_;
+ }
+ }
+ // No match: construct one in shared memory and append it to the list.
+ RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
+ RawQueue(name, length, hash, queue_length);
+ if (current == NULL) { // if we don't already have one
+ global_core->mem_struct->queues.pointer = r;
+ } else {
+ current->next_ = r;
+ }
+
+ mutex_unlock(&global_core->mem_struct->queues.lock);
+ return r;
+}
+// Fetches a queue plus a second one installed as its recycle_ (also *recycle).
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);
+ r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+ if (r == r->recycle_) {
+ fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+ printf("see stderr\n");
+ r->recycle_ = NULL;
+ abort();
+ }
+ *recycle = r->recycle_;
+ return r;
+}
+// Appends msg to the queue; what happens when it is full depends on options.
+bool RawQueue::DoWriteMessage(void *msg, Options<RawQueue> options) {
+ if (kWriteDebug) {
+ printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options.printable());
+ }
+
+ bool signal_readable;
+
+ {
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
+
+ int new_end;
+ while (true) {
+ new_end = index_add1(data_end_);
+ // If there is room in the queue right now.
+ if (new_end != data_start_) break;
+ if (options & kNonBlock) {
+ if (kWriteDebug) {
+ printf("queue: not blocking on %p. returning false\n", this);
+ }
+ DecrementMessageReferenceCount(msg);
+ return false;
+ } else if (options & kOverride) {
+ if (kWriteDebug) {
+ printf("queue: overriding on %p\n", this);
+ }
+ // Avoid leaking the message that we're going to overwrite.
+ DecrementMessageReferenceCount(data_[data_start_]);
+ data_start_ = index_add1(data_start_);
+ } else { // kBlock
+ assert(options & kBlock);
+ if (kWriteDebug) {
+ printf("queue: going to wait for writable_ of %p\n", this);
+ }
+ CHECK(!writable_.Wait());
+ }
+ }
+ data_[data_end_] = msg;
+ ++messages_;
+ data_end_ = new_end;
+ // Remember whether a reader is waiting so we can broadcast after unlocking.
+ signal_readable = readable_waiting_;
+ readable_waiting_ = false;
+ }
+
+ if (signal_readable) {
+ if (kWriteDebug) {
+ printf("queue: broadcasting to readable_ of %p\n", this);
+ }
+ readable_.Broadcast();
+ } else if (kWriteDebug) {
+ printf("queue: skipping broadcast to readable_ of %p\n", this);
+ }
+
+ if (kWriteDebug) {
+ printf("queue: write returning true on queue %p\n", this);
+ }
+ return true;
+}
+// Broadcasts writable_ if the queue is writable now but was not at read start.
+inline void RawQueue::ReadCommonEnd() {
+ if (is_writable()) {
+ if (kReadDebug) {
+ printf("queue: %ssignalling writable_ of %p\n",
+ writable_start_ ? "not " : "", this);
+ }
+ if (!writable_start_) writable_.Broadcast();
+ }
+}
+// Waits (unless kNonBlock) for a readable message; false means none available.
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+ while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+ if (options & kNonBlock) {
+ if (kReadDebug) {
+ printf("queue: not going to block waiting on %p\n", this);
+ }
+ return false;
+ } else { // kBlock
+ assert(options & kBlock);
+ if (kReadDebug) {
+ printf("queue: going to wait for readable_ of %p\n", this);
+ }
+ readable_waiting_ = true;
+ // Wait for a message to become readable.
+ CHECK(!readable_.Wait());
+ if (kReadDebug) {
+ printf("queue: done waiting for readable_ of %p\n", this);
+ }
+ }
+ }
+ // We have to check down here because we might have unlocked the mutex while
+ // Wait()ing above so this value might have changed.
+ writable_start_ = is_writable();
+ if (kReadDebug) {
+ printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
+ this, index, data_start_, data_end_, writable_start_);
+ }
+ return true;
+}
+
+inline int RawQueue::LastMessageIndex() const {
+  // data_end_ is one slot past the newest message, so step back by one and
+  // wrap around to the final slot of data_ if that would go below zero.
+  const int newest = data_end_ - 1;
+  if (newest >= 0) return newest;
+  return data_length_ - 1;
+}
+
+const void *RawQueue::DoReadMessage(Options<RawQueue> options) {
+ // Reads one message as directed by options. TODO(brians): Test this function.
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessage(%x)\n", this, options.printable());
+ }
+ void *msg = NULL;
+
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
+
+ if (!ReadCommonStart(options, nullptr)) {  // nullptr: not tracking an index.
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ if (options & kFromEnd) {
+ if (options & kPeek) {
+ if (kReadDebug) {
+ printf("queue: %p shortcutting c2: %d\n", this, LastMessageIndex());
+ }
+ msg = data_[LastMessageIndex()];
+ IncrementMessageReferenceCount(msg);  // It also stays in the queue.
+ } else {
+ while (true) {
+ if (kReadDebug) {
+ printf("queue: %p start of c2\n", this);
+ }
+ // Each iteration pulls the oldest message out of the buffer.
+ const int pos = data_start_;
+ data_start_ = index_add1(data_start_);
+ // If that was the newest one, it is the message to return.
+ if (data_start_ == data_end_) {
+ if (kReadDebug) {
+ printf("queue: %p reading from c2: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ break;
+ }
+ // This older message won't be in the queue any more, so release it.
+ DecrementMessageReferenceCount(data_[pos]);
+ }
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d2: %d\n", this, data_start_);
+ }
+ msg = data_[data_start_];
+ if (options & kPeek) {
+ IncrementMessageReferenceCount(msg);  // It also stays in the queue.
+ } else {
+ data_start_ = index_add1(data_start_);  // Remove it from the queue.
+ }
+ }
+ ReadCommonEnd();  // Wakes writers if this read made room.
+ if (kReadDebug) {
+ printf("queue: %p read returning %p\n", this, msg);
+ }
+ return msg;
+}
+
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
+ int *index) {  // *index: how many messages were already read.
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+ this, options.printable(), index, *index);
+ }
+ void *msg = NULL;
+
+ IPCMutexLocker locker(&data_lock_);
+ CHECK(!locker.owner_died());
+
+ if (!ReadCommonStart(options, index)) {  // Needs messages_ > *index.
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ // TODO(parker): Handle integer wrap on the index.
+
+ if (options & kFromEnd) {
+ if (kReadDebug) {
+ printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
+ }
+ msg = data_[LastMessageIndex()];
+
+ // We'd skip this if we had kPeek, but kPeek | kFromEnd isn't valid for
+ // reading with an index.
+ *index = messages_;
+ } else {
+ // Where we're going to start reading.
+ int my_start;
+
+ const int unread_messages = messages_ - *index;
+ assert(unread_messages > 0);
+ int current_messages = data_end_ - data_start_;  // Messages now in data_.
+ if (current_messages < 0) current_messages += data_length_;
+ if (kReadIndexDebug) {
+ printf("queue: %p start=%d end=%d current=%d\n",
+ this, data_start_, data_end_, current_messages);
+ }
+ assert(current_messages > 0);
+ // If we're behind the available messages.
+ if (unread_messages > current_messages) {
+ // Catch index up to the oldest message that is still available.
+ *index = messages_ - current_messages;
+ // And that's the one we're going to read.
+ my_start = data_start_;
+ if (kReadIndexDebug) {
+ printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
+ this, *index, messages_, data_start_);
+ }
+ } else {
+ // Just start reading at the first available message that we haven't yet
+ // read.
+ my_start = data_end_ - unread_messages;
+ if (kReadIndexDebug) {
+ printf("queue: %p original read from %d\n", this, my_start);
+ }
+ if (data_start_ < data_end_) {
+ assert(my_start >= 0);
+ }
+ if (my_start < 0) my_start += data_length_;
+ }
+
+ if (kReadDebug) {
+ printf("queue: %p reading from d1: %d\n", this, my_start);
+ }
+ // We have to be either after the start or before the end, even if the queue
+ // is wrapped around (should be both if it's not).
+ assert((my_start >= data_start_) || (my_start < data_end_));
+ // More sanity checking.
+ assert((my_start >= 0) && (my_start < data_length_));
+ msg = data_[my_start];
+ if (!(options & kPeek)) ++(*index);
+ }
+ IncrementMessageReferenceCount(msg);  // The message stays in the queue.
+
+ ReadCommonEnd();
+ return msg;
+}
+
+int RawQueue::FreeMessages() const {
+  // Walks the free_messages_ linked list and counts its nodes.
+  int count = 0;
+  for (const MessageHeader *node = free_messages_; node != nullptr;
+       node = node->next) {
+    ++count;
+  }
+  return count;
+}
+
+} // namespace aos
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
new file mode 100644
index 0000000..f21295f
--- /dev/null
+++ b/aos/ipc_lib/queue.h
@@ -0,0 +1,231 @@
+#ifndef AOS_IPC_LIB_QUEUE_H_
+#define AOS_IPC_LIB_QUEUE_H_
+
+#include "aos/ipc_lib/shared_mem.h"
+#include "aos/mutex/mutex.h"
+#include "aos/condition.h"
+#include "aos/util/options.h"
+#include "aos/logging/logging.h"
+
+// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
+// code to make checking for leaks work better
+// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
+// describes how
+
+// Any pointers returned from these functions can be safely passed to other
+// processes because they are all shared memory pointers.
+// IMPORTANT: Any message pointer must be passed back in some way
+// (FreeMessage and WriteMessage are common ones) or the
+// application will leak shared memory.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
+
+namespace aos {
+
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling Queue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are FIFO buffers of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ // Will never return NULL.
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue will
+ // have the same length and hash as the main one). However, any frees made
+ // using a queue with only (name,length,hash,queue_length) before the
+ // recycle queue has been associated with it will not go on to the recycle
+ // queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ // Will never return NULL.
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length, int recycle_hash,
+ int recycle_queue_length, RawQueue **recycle);
+
+ // Doesn't update the currently read index (the read messages in the queue or
+ // the index). This means the returned message (and any others skipped with
+ // kFromEnd) will be left in the queue.
+ // For reading only.
+ // Not valid for ReadMessageIndex combined with kFromEnd.
+ static constexpr Options<RawQueue>::Option kPeek{0x0001};
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them).
+ // For reading only.
+ // Not valid for ReadMessageIndex combined with kPeek.
+ static constexpr Options<RawQueue>::Option kFromEnd{0x0002};
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static constexpr Options<RawQueue>::Option kNonBlock{0x0004};
+ // Causes things to block.
+ // For reading and writing.
+ static constexpr Options<RawQueue>::Option kBlock{0x0008};
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static constexpr Options<RawQueue>::Option kOverride{0x0010};
+
+ RawQueue(const RawQueue &) = default;
+ RawQueue &operator=(const RawQueue &) = default;
+
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ // Returns true on success. A return value of false means msg has already been
+ // freed.
+ bool WriteMessage(void *msg, Options<RawQueue> options) {
+ static constexpr Options<RawQueue> kWriteFailureOptions =
+ kNonBlock | kBlock | kOverride;
+ if (!options.NoOthersSet(kWriteFailureOptions)) {
+ LOG(FATAL, "illegal write options in %x\n", options.printable());
+ }
+ if (!options.ExactlyOneSet(kWriteFailureOptions)) {
+ LOG(FATAL, "invalid write options %x\n", options.printable());
+ }
+ return DoWriteMessage(msg, options);
+ }
+
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // message. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(Options<RawQueue> options) {
+ CheckReadOptions(options);
+ return DoReadMessage(options);
+ }
+ // The same as ReadMessage, except it will never return the
+ // same message twice (when used with the same index argument). However,
+ // may not return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be.
+ // Calling with both kPeek and kFromEnd in options isn't valid because that
+ // would mean ignoring index, which would make this function the same as
+ // ReadMessage (which should be used instead).
+ const void *ReadMessageIndex(Options<RawQueue> options, int *index) {
+ CheckReadOptions(options);
+ static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
+ if (options.AllSet(kFromEndAndPeek)) {
+ LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
+ }
+ return DoReadMessageIndex(options, index);
+ }
+
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage or WriteMessage.
+ void *GetMessage();
+
+ // It is ok to call this method with a NULL msg.
+ void FreeMessage(const void *msg) {
+ if (msg != NULL) DecrementMessageReferenceCount(msg);
+ }
+
+ // UNSAFE! Returns the number of free messages we have. Only safe to use when
+ // only 1 task is using this object (ie in tests).
+ int FreeMessages() const;
+
+ private:
+ struct MessageHeader;
+
+ // The public wrappers around these are inlined and do argument checking.
+ bool DoWriteMessage(void *msg, Options<RawQueue> options);
+ const void *DoReadMessage(Options<RawQueue> options);
+ const void *DoReadMessageIndex(Options<RawQueue> options, int *index);
+ void CheckReadOptions(Options<RawQueue> options) {
+ static constexpr Options<RawQueue> kValidOptions =
+ kPeek | kFromEnd | kNonBlock | kBlock;
+ if (!options.NoOthersSet(kValidOptions)) {
+ LOG(FATAL, "illegal read options in %x\n", options.printable());
+ }
+ static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
+ if (!options.ExactlyOneSet(kBlockChoices)) {
+ LOG(FATAL, "invalid read options %x\n", options.printable());
+ }
+ }
+
+ // Adds 1 to the given index and handles wrapping correctly.
+ int index_add1(int index);
+
+ bool is_readable() { return data_end_ != data_start_; }
+ bool is_writable() { return index_add1(data_end_) != data_start_; }
+
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ RawQueue *next_;
+
+ RawQueue *recycle_;
+
+ Mutex data_lock_; // protects operations on data_ etc
+ // Always gets broadcasted to because different readers might have different
+ // ideas of what "readable" means (ie ones using separated indices).
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max length into data + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ size_t msg_length_; // sizeof(each message) including the header
+ // A pointer to the first in the linked list of free messages.
+ MessageHeader *free_messages_;
+
+ // Whether the queue was writable before the current read, so ReadCommonEnd
+ // can Broadcast() to writable_ only if that read made it writable.
+ bool writable_start_;
+
+ // True iff somebody is currently Wait()ing on readable_.
+ // Set to true by each reader before calling Wait() and set back to false
+ // before the Broadcast().
+ bool readable_waiting_;
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+ // Only does the actual incrementing of the reference count.
+ void IncrementMessageReferenceCount(const void *msg) const;
+
+ // Must be called with data_lock_ locked.
+ // If index is not NULL, also waits until messages_ is greater than *index.
+ // Returns true with a readable message in data_, or false with kNonBlock.
+ bool ReadCommonStart(Options<RawQueue> options, int *index);
+ // Broadcasts to writable_ if the read made the queue writable.
+ // Called by DoReadMessage{,Index} with their data_lock_ locker still in scope.
+ // Uses the writable_start_ value saved by ReadCommonStart.
+ void ReadCommonEnd();
+ // Returns the index of the last message.
+ // Useful for reading with kPeek.
+ int LastMessageIndex() const;
+
+ // Gets called by Fetch when necessary (with placement new).
+ RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_IPC_LIB_QUEUE_H_
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
new file mode 100644
index 0000000..ab98b1f
--- /dev/null
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -0,0 +1,1046 @@
+#include "aos/ipc_lib/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <chrono>
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
+#include "aos/testing/test_shm.h"
+#include "aos/time/time.h"
+#include "aos/logging/logging.h"
+#include "aos/die.h"
+#include "aos/util/thread.h"
+#include "aos/util/options.h"
+#include "aos/util/death_test_log_implementation.h"
+#include "aos/testing/prevent_exit.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+
+namespace aos {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+namespace this_thread = ::std::this_thread;
+
+// The same constant from queue.cc. This will have to be updated if that one is.
+const int kExtraMessages = 20;
+
+class RawQueueTest : public ::testing::Test {
+ protected:
+  static const size_t kFailureSize = 400;  // Bytes in each failure buffer.
+  static char *fatal_failure;  // kFailureSize bytes, allocated in SetUp().
+ private:
+  enum class ResultType : uint8_t {
+    NotCalled,
+    Called,
+    Returned,
+  };
+  const std::string ResultTypeString(volatile const ResultType &result) {
+    switch (result) {
+      case ResultType::Returned:
+        return "Returned";
+      case ResultType::Called:
+        return "Called";
+      case ResultType::NotCalled:
+        return "NotCalled";
+      default:
+        return std::string("unknown(") +
+               ::std::to_string(static_cast<uint8_t>(result)) + ")";
+    }
+  }
+  static_assert(aos::shm_ok<ResultType>::value,
+                "this will get put in shared memory");
+  template<typename T>
+  struct FunctionToCall {
+    FunctionToCall() : result(ResultType::NotCalled), started() {
+    }
+
+    volatile ResultType result;  // How far the child got; see Hangs_.
+    bool expected;  // Whether the function is expected to hang.
+    void (*function)(T*, char*);
+    T *arg;
+    volatile char failure[kFailureSize];  // Non-empty means function failed.
+    aos_futex started;  // Set by the child right before it calls function.
+  };
+  template<typename T>
+  static void Hangs_(FunctionToCall<T> *const to_call) {
+    this_thread::sleep_for(chrono::milliseconds(10));
+    ASSERT_EQ(1, futex_set(&to_call->started));
+    to_call->result = ResultType::Called;
+    to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+    to_call->result = ResultType::Returned;
+  }
+
+  // How long until a function is considered to have hung.
+  static constexpr chrono::nanoseconds kHangTime = chrono::milliseconds(90);
+  // How long to sleep after forking (for debugging).
+  static constexpr chrono::nanoseconds kForkSleep = chrono::milliseconds(0);
+
+  // Represents a process that has been forked off. The destructor sends it
+  // SIGTERM (unless it is known to be exiting) and wait(2)s for a child.
+  class ForkedProcess {
+   public:
+    ForkedProcess(pid_t pid, aos_futex *done)
+        : pid_(pid), done_(done), exiting_(false) {}
+    ~ForkedProcess() {
+      if (!exiting_) {
+        if (kill(pid_, SIGTERM) == -1) {
+          if (errno == ESRCH) {
+            printf("process %jd was already dead\n",
+                   static_cast<intmax_t>(pid_));
+          } else {
+            PLOG(FATAL, "kill(%jd, SIGTERM) failed",
+                 static_cast<intmax_t>(pid_));
+          }
+        }
+      }
+      const pid_t ret = wait(NULL);  // Reaps any child, not just pid_.
+      if (ret == -1) {
+        LOG(WARNING, "wait(NULL) failed."
+            " child %jd might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret == 0) {
+        LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret != pid_) {
+        LOG(WARNING, "child %jd is now confirmed dead"
+            ", but child %jd might still be alive\n",
+            static_cast<intmax_t>(ret), static_cast<intmax_t>(pid_));
+      }
+    }
+
+    enum class JoinResult {
+      Finished, Hung, Error
+    };
+    JoinResult Join(chrono::nanoseconds timeout = kHangTime) {
+      struct timespec done_timeout;
+      {
+        auto full_timeout = kForkSleep + timeout;
+        ::std::chrono::seconds sec =
+            ::std::chrono::duration_cast<::std::chrono::seconds>(full_timeout);
+        ::std::chrono::nanoseconds nsec =
+            ::std::chrono::duration_cast<::std::chrono::nanoseconds>(
+                full_timeout - sec);
+        done_timeout.tv_sec = sec.count();
+        done_timeout.tv_nsec = nsec.count();
+      }
+
+      switch (futex_wait_timeout(done_, &done_timeout)) {
+        case 2:
+          return JoinResult::Hung;
+        case 0:
+          exiting_ = true;
+          return JoinResult::Finished;
+        default:
+          return JoinResult::Error;
+      }
+    }
+
+   private:
+    const pid_t pid_;
+    aos_futex *const done_;
+    // True iff we know that the process is already exiting.
+    bool exiting_;
+  } __attribute__((unused));
+
+  // State for HangsFork and HangsCheck.
+  typedef uint8_t ChildID;
+  static void ReapExitHandler() {
+    for (auto it = children_.begin(); it != children_.end(); ++it) {
+      delete it->second;
+    }
+  }
+  static std::map<ChildID, ForkedProcess *> children_;
+  std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+  void SetUp() override {
+    ::testing::Test::SetUp();
+
+    SetDieTestMode(true);
+    // Must be kFailureSize bytes: that's the size snprintf() is told it has.
+    fatal_failure = static_cast<char *>(shm_malloc(kFailureSize));
+    static bool registered = false;
+    if (!registered) {
+      atexit(ReapExitHandler);
+      registered = true;
+    }
+  }
+
+ protected:
+  // function gets called with arg in a forked process.
+  // Leaks shared memory.
+  template<typename T> __attribute__((warn_unused_result))
+  std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+    aos_futex *done = static_cast<aos_futex *>(shm_malloc_aligned(
+            sizeof(*done), alignof(aos_futex)));
+    *done = 0;
+    const pid_t pid = fork();
+    switch (pid) {
+      case 0:  // child
+        if (kForkSleep != chrono::milliseconds(0)) {
+          LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
+              static_cast<intmax_t>(getpid()), kForkSleep.count());
+          this_thread::sleep_for(kForkSleep);
+        }
+        ::aos::testing::PreventExit();
+        function(arg);
+        CHECK_NE(-1, futex_set(done));
+        exit(EXIT_SUCCESS);
+      case -1:  // parent failure
+        PLOG(ERROR, "fork() failed");
+        return std::unique_ptr<ForkedProcess>();
+      default:  // parent
+        return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, done));
+    }
+  }
+
+  // Checks whether or not the given function hangs.
+  // expected is whether to return success or failure if the function hangs
+  // NOTE: There are other reasons for it to return a failure than the function
+  // doing the wrong thing.
+  // Leaks shared memory.
+  template<typename T>
+  AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+    AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+    if (!fork_result) {
+      return fork_result;
+    }
+    return HangsCheck(0);
+  }
+  // Starts the first part of Hangs.
+  // Use HangsCheck to get the result.
+  // Returns whether the fork succeeded or not, NOT whether or not the hang
+  // check succeeded.
+  template<typename T>
+  AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+                            bool expected, ChildID id) {
+    static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+                  "this is going into shared memory");
+    FunctionToCall<T> *const to_call =
+        static_cast<FunctionToCall<T> *>(
+            shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+    new (to_call) FunctionToCall<T>();
+    to_call->function = function;
+    to_call->arg = arg;
+    to_call->expected = expected;
+    to_call->failure[0] = '\0';
+    fatal_failure[0] = '\0';
+    children_[id] = ForkExecute(Hangs_, to_call).release();
+    if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+    CHECK_EQ(0, futex_wait(&to_call->started));
+    to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+    return AssertionSuccess();
+  }
+  // Checks whether or not a function hung like it was supposed to.
+  // Use HangsFork first.
+  // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+  // correspond, but they do not nest. Also, id 0 is used by Hangs.
+  // Return value is the same as Hangs.
+  AssertionResult HangsCheck(ChildID id) {
+    std::unique_ptr<ForkedProcess> child(children_[id]);
+    children_.erase(id);
+    const ForkedProcess::JoinResult result = child->Join();
+    if (to_calls_[id]->failure[0] != '\0') {
+      return AssertionFailure() << "function says: "
+          << const_cast<char *>(to_calls_[id]->failure);
+    }
+    if (result == ForkedProcess::JoinResult::Finished) {
+      return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
+          << "something happened and the test only got to "
+          << ResultTypeString(to_calls_[id]->result));
+    } else {
+      if (to_calls_[id]->result == ResultType::Called) {
+        return to_calls_[id]->expected ? AssertionSuccess() :
+            AssertionFailure();
+      } else if (result == ForkedProcess::JoinResult::Error) {
+        return AssertionFailure() << "error joining child";
+      } else {
+        if (to_calls_[id]->result == ResultType::NotCalled) {
+          return AssertionFailure() << "stuff took too long getting started";
+        }
+        return AssertionFailure() << "something weird happened";
+      }
+    }
+  }
+#define EXPECT_HANGS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+  cond(Hangs(function, arg, hangs)); \
+  if (fatal_failure[0] != '\0') { \
+    FAIL() << fatal_failure; \
+  } \
+} while (false)
+
+  struct TestMessage {
+    // Some contents because we don't really want to test empty messages.
+    int16_t data;
+  };
+  struct MessageArgs {
+    RawQueue *const queue;
+    Options<RawQueue> flags;
+    int16_t data;  // -1 means NULL expected
+  };
+  static void WriteTestMessage(MessageArgs *args, char *failure) {
+    TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
+    if (msg == NULL) {
+      snprintf(fatal_failure, kFailureSize,
+               "couldn't get_msg from %p", args->queue);
+      return;
+    }
+    msg->data = args->data;
+    if (!args->queue->WriteMessage(msg, args->flags)) {
+      snprintf(failure, kFailureSize, "%p->WriteMessage(%p, %x) failed",
+               args->queue, msg, args->flags.printable());
+    }
+  }
+  static void ReadTestMessage(MessageArgs *args, char *failure) {
+    const TestMessage *msg = static_cast<const TestMessage *>(
+        args->queue->ReadMessage(args->flags));
+    if (msg == NULL) {
+      if (args->data != -1) {
+        snprintf(failure, kFailureSize,
+                 "expected data of %" PRId16 " but got NULL message",
+                 args->data);
+      }
+    } else {
+      if (args->data != msg->data) {
+        snprintf(failure, kFailureSize,
+                 "expected data of %" PRId16 " but got %" PRId16 " instead",
+                 args->data, msg->data);
+      }
+      args->queue->FreeMessage(msg);
+    }
+  }
+
+  void PushMessage(RawQueue *queue, uint16_t data) {
+    TestMessage *message = static_cast<TestMessage *>(queue->GetMessage());
+    message->data = data;
+    ASSERT_TRUE(queue->WriteMessage(message, RawQueue::kOverride));
+  }
+
+ private:
+  ::aos::testing::TestSharedMemory my_shm_;
+};
+
+char *RawQueueTest::fatal_failure;
+std::map<RawQueueTest::ChildID, RawQueueTest::ForkedProcess *>
+ RawQueueTest::children_;
+constexpr chrono::nanoseconds RawQueueTest::kHangTime;
+constexpr chrono::nanoseconds RawQueueTest::kForkSleep;
+
+typedef RawQueueTest RawQueueDeathTest;
+
+TEST_F(RawQueueTest, Reading) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, RawQueue::kBlock, -1};  // -1: expect a NULL read.
+
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &args);  // Nothing has been written yet.
+ args.flags = RawQueue::kPeek | RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.data = 254;
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);  // Peeking leaves 254 queued.
+ args.flags = RawQueue::kPeek | RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);  // This read removes 254.
+ args.flags = RawQueue::kBlock;
+ args.data = -1;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ args.data = 971;
+ EXPECT_RETURNS_FAILS(ReadTestMessage, &args);  // Empty queue: it hangs.
+}
+TEST_F(RawQueueTest, Writing) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, RawQueue::kBlock, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_HANGS(WriteTestMessage, &args);  // queue_length is 1, so it's full.
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);  // Still holds the original 973.
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);  // Replaces 973 with 971.
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);  // Room again after the read.
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(RawQueueTest, MultiRead) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, RawQueue::kBlock, 1323};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+ AssertionResult one = HangsCheck(1);
+ AssertionResult two = HangsCheck(2);
+ EXPECT_TRUE(one != two) << "'" <<  // Exactly one reader should have hung.
+ one.failure_message() << "' vs '" << two.failure_message() << "'";
+ // TODO(brians) finish this
+}
+
+// There used to be a bug where reading first without an index and then with an
+// index would crash. This test makes sure that's fixed.
+TEST_F(RawQueueTest, ReadIndexAndNot) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+
+ // Write a message, read it (with ReadMessage), and then write another
+ // message (before freeing the read one so the queue allocates a distinct
+ // message to use for it).
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+ const void *read_msg = queue->ReadMessage(RawQueue::kBlock);
+ EXPECT_NE(nullptr, read_msg);
+ msg = static_cast<TestMessage *>(queue->GetMessage());
+ queue->FreeMessage(read_msg);
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+
+ int index = 0;  // A fresh index that has not read anything yet.
+ const void *second_read_msg =
+ queue->ReadMessageIndex(RawQueue::kBlock, &index);
+ EXPECT_NE(nullptr, second_read_msg);
+ EXPECT_NE(read_msg, second_read_msg)
+ << "We already took that message out of the queue.";
+}
+
+TEST_F(RawQueueTest, Recycle) {
+ // TODO(brians) basic test of recycle queue
+ // include all of the ways a message can get into the recycle queue
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);  // Sentinel.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+ 1, 2, 2, 2, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
+ MessageArgs args{queue, RawQueue::kBlock, 973},
+ recycle{recycle_queue, RawQueue::kBlock, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);  // Nothing has been freed yet.
+ args.data = 254;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);  // Overwrites the oldest (973).
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);  // 973 was recycled.
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_TRUE(msg != NULL);
+ msg->data = 341;
+ queue->FreeMessage(msg);  // Freed without ever being written.
+ recycle.data = 341;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ args.data = 254;
+ args.flags = RawQueue::kPeek | RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &recycle);  // Peeked message is still queued.
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.data = 254;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+// Makes sure that when a message doesn't get written with kNonBlock it does get
+// freed.
+TEST_F(RawQueueTest, NonBlockFailFree) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+
+ void *message1 = queue->GetMessage();
+ void *message2 = queue->GetMessage();
+ ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
+ ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));  // Full.
+ EXPECT_EQ(message2, queue->GetMessage());  // It was freed, so it's reused.
+}
+
+// All of the tests from here down are designed to test every branch to
+// make sure it does what it's supposed to. They are generally pretty repetitive
+// and boring, and some of them may duplicate other tests above, but these ones
+// make it a lot easier to figure out what's wrong with bugs not related to race
+// conditions.
+
+TEST_F(RawQueueTest, ReadIndexNotFull) {  // Index reads that follow each push immediately.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());  // Presumably "messages in use" - confirm kExtraMessages.
+ PushMessage(queue, 971);
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+
+ int index = 0;  // Read position; the non-peek reads below are expected to advance it.
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);  // The peek and the real read must hand back the same message.
+ EXPECT_EQ(971, message->data);
+ EXPECT_EQ(1, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);  // The peeked pointer is released separately.
+
+ PushMessage(queue, 1768);
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ PushMessage(queue, 254);
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; the kFromEnd read below is still expected to report the newest message and index 3.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());  // Everything read above has been freed again.
+}
+
+TEST_F(RawQueueTest, ReadIndexNotBehind) {  // Two messages pushed, none consumed before the index reads.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+ PushMessage(queue, 971);
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+ PushMessage(queue, 1768);
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+
+ int index = 0;  // Starting at 0 is expected to yield the first message (971) and advance to 1.
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(971, message->data);
+ EXPECT_EQ(1, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to jump to the newest message (1768) and set index to 2.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexLittleBehindNotFull) {  // Two pushed, one already consumed by a plain read.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // Consumes 971; the result is never freed.
+
+ int index = 0;  // Index 0 is stale here; the read is expected to return 1768 and set index to 2.
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the same newest message and index 2.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehind) {  // Three pushed in total, one consumed by a plain read.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // Consumes 971; the result is never freed.
+ PushMessage(queue, 254);
+
+ int index = 0;  // Index 0 is stale here; reads are expected to resume at 1768 (index 2).
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the newest message (254) and index 3.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehindNotFull) {  // Three pushed in total, two consumed by plain reads.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // The consumed messages are never freed.
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;  // Index 0 is stale here; the read is expected to return 254 and set index to 3.
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the same newest message and index 3.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehind) {  // Four pushed in total, two consumed and then freed.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ {
+ const void *message1, *message2;
+ message1 = queue->ReadMessage(RawQueue::kNonBlock);
+ ASSERT_NE(nullptr, message1);
+ PushMessage(queue, 254);
+ message2 = queue->ReadMessage(RawQueue::kNonBlock);
+ ASSERT_NE(nullptr, message2);
+ PushMessage(queue, 973);
+ EXPECT_EQ(4, kExtraMessages + 2 - queue->FreeMessages());  // Two still held here plus two pushed and unread.
+ queue->FreeMessage(message1);
+ EXPECT_EQ(3, kExtraMessages + 2 - queue->FreeMessages());  // Each free is expected to lower the count by one.
+ queue->FreeMessage(message2);
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+ }
+
+ int index = 0;  // Index 0 is stale here; reads are expected to resume at 254 (index 3).
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the newest message (973) and index 4.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehindNotFull) {  // Four pushed in total, three consumed by plain reads.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // The consumed messages are never freed.
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;  // Index 0 is stale here; the read is expected to return 973 and set index to 4.
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the same newest message and index 4.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehind) {  // Five pushed in total, three consumed by plain reads.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // The consumed messages are never freed.
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 1114);
+
+ int index = 0;  // Index 0 is stale here; reads are expected to resume at 973 (index 4).
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the newest message (1114) and index 5.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehindNotFull) {  // Five pushed in total, four consumed by plain reads.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // The consumed messages are never freed.
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 1114);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;  // Index 0 is stale here; the read is expected to return 1114 and set index to 5.
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ index = 0;  // Reset; kFromEnd is expected to report the same newest message and index 5.
+ peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+ RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+ message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+ RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);  // NOTE(review): peek_message is not freed here, unlike in ReadIndexNotFull.
+}
+
+TEST_F(RawQueueTest, MessageReferenceCounts) {  // GetMessage()/FreeMessage() must each move FreeMessages() by one.
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const void *message1, *message2;
+
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());  // Presumably "messages in use" - confirm kExtraMessages.
+ message1 = queue->GetMessage();
+ EXPECT_NE(nullptr, message1);
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+ message2 = queue->GetMessage();
+ EXPECT_NE(nullptr, message2);
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+ queue->FreeMessage(message1);
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+ queue->FreeMessage(message2);
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());  // Back to the starting count.
+}
+
+// Tests that writing with kNonBlock fails and frees the message.
+TEST_F(RawQueueTest, WriteDontBlock) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ void *message;
+
+ PushMessage(queue, 971);  // One message is already queued before the write under test.
+ int free_before = queue->FreeMessages();
+ message = queue->GetMessage();
+ ASSERT_NE(nullptr, message);
+ EXPECT_NE(free_before, queue->FreeMessages());  // Holding the new message must change the free count.
+ EXPECT_FALSE(queue->WriteMessage(message, RawQueue::kNonBlock));
+ EXPECT_EQ(free_before, queue->FreeMessages());  // The failed write must have given the message back.
+}
+
+// Tests that writing with kOverride pushes the oldest message out of the queue.
+TEST_F(RawQueueTest, WriteOverride) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ TestMessage *message1;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ int free_before = queue->FreeMessages();
+ message1 = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_NE(nullptr, message1);
+ EXPECT_NE(free_before, queue->FreeMessages());
+ message1->data = 254;
+ EXPECT_TRUE(queue->WriteMessage(message1, RawQueue::kOverride));
+ EXPECT_EQ(free_before, queue->FreeMessages());  // One message went in, so one is expected to have been released.
+
+ const TestMessage *message2;
+ message2 =
+ static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+ EXPECT_EQ(1768, message2->data);  // 971 is gone. NOTE(review): message2 is dereferenced without a null check.
+ queue->FreeMessage(message2);
+ EXPECT_EQ(free_before + 1, queue->FreeMessages());
+ message2 =
+ static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+ EXPECT_EQ(254, message2->data);  // The overriding message is the newest one left.
+ queue->FreeMessage(message2);
+ EXPECT_EQ(free_before + 2, queue->FreeMessages());
+}
+
+// Makes sure that ThreadSanitizer doesn't catch any issues freeing from
+// multiple threads at once.
+TEST_F(RawQueueTest, MultiThreadedFree) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ PushMessage(queue, 971);
+ int free_before = queue->FreeMessages();
+
+ const void *const message1 =
+ queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
+ const void *const message2 =
+ queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);  // Second peek at the same queued message.
+ ASSERT_NE(nullptr, message1);
+ ASSERT_NE(nullptr, message2);
+ EXPECT_EQ(free_before, queue->FreeMessages());  // Peeking must not change the free count.
+ util::FunctionThread t1([message1, queue](util::Thread *) {
+ queue->FreeMessage(message1);
+ });
+ util::FunctionThread t2([message2, queue](util::Thread *) {
+ queue->FreeMessage(message2);
+ });
+ t1.Start();  // Both frees are started before either is waited on.
+ t2.Start();
+ t1.WaitUntilDone();
+ t2.WaitUntilDone();
+ EXPECT_EQ(free_before, queue->FreeMessages());  // The message is still queued, so the count is unchanged.
+}
+
+TEST_F(RawQueueDeathTest, OptionsValidation) {  // Each bad flag combination must die with a matching log message.
+ RawQueue *const queue = RawQueue::Fetch("Queue", 1, 1, 1);
+
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());  // Presumably routes the fatal log to where EXPECT_DEATH looks - confirm.
+ queue->WriteMessage(nullptr, RawQueue::kPeek);  // Read-only flag passed to a write.
+ },
+ ".*illegal write option.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->WriteMessage(nullptr, RawQueue::kFromEnd);
+ },
+ ".*illegal write option.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->WriteMessage(nullptr, RawQueue::kPeek | RawQueue::kFromEnd);
+ },
+ ".*illegal write option.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->WriteMessage(nullptr, RawQueue::kNonBlock | RawQueue::kBlock);  // Contradictory blocking flags.
+ },
+ ".*invalid write option.*");
+
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->ReadMessageIndex(
+ RawQueue::kBlock | RawQueue::kFromEnd | RawQueue::kPeek, nullptr);
+ },
+ ".*ReadMessageIndex.*is not allowed.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->ReadMessageIndex(RawQueue::kOverride, nullptr);  // Write-only flag passed to a read.
+ },
+ ".*illegal read option.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->ReadMessageIndex(RawQueue::kOverride | RawQueue::kBlock,
+ nullptr);
+ },
+ ".*illegal read option.*");
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ queue->ReadMessage(RawQueue::kNonBlock | RawQueue::kBlock);  // Contradictory blocking flags.
+ },
+ ".*invalid read option.*");
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/ipc_lib/shared_mem.c b/aos/ipc_lib/shared_mem.c
new file mode 100644
index 0000000..e8edc9e
--- /dev/null
+++ b/aos/ipc_lib/shared_mem.c
@@ -0,0 +1,141 @@
+#include "aos/ipc_lib/shared_mem.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/logging/logging.h"
+#include "aos/ipc_lib/aos_sync.h"
+
+// the path for the shared memory segment. see shm_open(3) for restrictions
+#define AOS_SHM_NAME "/aos_shared_mem"
+// Size of the shared mem segment.
+// This must fit in the tmpfs for /dev/shm/
+#define SIZEOFSHMSEG (4096 * 0x3000)
+
+void init_shared_mem_core(aos_shm_core *shm_core) {  // Clears the time offset, the three locks and both list heads.
+  shm_core->time_offset = 0;
+  memset(&shm_core->msg_alloc_lock, 0, sizeof shm_core->msg_alloc_lock);
+  memset(&shm_core->queues.lock, 0, sizeof shm_core->queues.lock);
+  shm_core->queues.pointer = NULL;
+  memset(&shm_core->queue_types.lock, 0, sizeof shm_core->queue_types.lock);
+  shm_core->queue_types.pointer = NULL;
+}
+
+ptrdiff_t aos_core_get_mem_usage(void) {  // Segment size minus msg_alloc's offset from the segment start.
+  const ptrdiff_t segment_start = (ptrdiff_t)global_core->mem_struct;
+  const ptrdiff_t alloc_position = (ptrdiff_t)global_core->mem_struct->msg_alloc;
+  return global_core->size - (alloc_position - segment_start);
+}
+
+struct aos_core *global_core = NULL;  // NULL until aos_core_create_shared_mem() points it at its static storage.
+
+// TODO(brians): madvise(2) it to put this shm in core dumps.
+void aos_core_create_shared_mem(int create, int lock) {  // Opens (or creates) the segment and maps it at SHM_START.
+  assert(global_core == NULL);  // Must not already be set up in this process.
+  static struct aos_core global_core_data;
+  global_core = &global_core_data;
+
+  {
+    char *shm_name = getenv("AOS_SHM_NAME");  // Environment override for the segment name.
+    if (shm_name == NULL) {
+      global_core->shm_name = AOS_SHM_NAME;
+    } else {
+      printf("AOS_SHM_NAME defined, using %s\n", shm_name);
+      global_core->shm_name = shm_name;
+    }
+  }
+
+  int shm;
+  if (create) {
+    while (1) {  // Unlink and retry for as long as the exclusive create fails with EEXIST.
+      shm = shm_open(global_core->shm_name, O_RDWR | O_CREAT | O_EXCL, 0666);
+      global_core->owner = 1;
+      if (shm == -1 && errno == EEXIST) {
+        printf("shared_mem: going to shm_unlink(%s)\n", global_core->shm_name);
+        if (shm_unlink(global_core->shm_name) == -1) {
+          PLOG(WARNING, "shm_unlink(%s) failed", global_core->shm_name);
+          break;  // shm is still -1, so the check after the loop reports the failure.
+        }
+      } else {
+        break;
+      }
+    }
+  } else {
+    shm = shm_open(global_core->shm_name, O_RDWR, 0);
+    global_core->owner = 0;
+  }
+  if (shm == -1) {
+    PLOG(FATAL, "shm_open(%s, O_RDWR [| O_CREAT | O_EXCL], 0|0666) failed",
+         global_core->shm_name);
+  }
+  if (global_core->owner) {  // Only the creator sizes the segment.
+    if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
+      PLOG(FATAL, "ftruncate(%d, 0x%zx) failed", shm, (size_t)SIZEOFSHMSEG);
+    }
+  }
+  int flags = MAP_SHARED | MAP_FIXED;
+  if (lock) flags |= MAP_LOCKED | MAP_POPULATE;
+  void *shm_address = mmap((void *)SHM_START, SIZEOFSHMSEG,
+                           PROT_READ | PROT_WRITE, flags, shm, 0);
+  if (shm_address == MAP_FAILED) {
+    PLOG(FATAL, "shared_mem: mmap(%p, 0x%zx, stuff, %x, %d, 0) failed",
+         (void *)SHM_START, (size_t)SIZEOFSHMSEG, flags, shm);
+  }
+  if (create) {
+    printf("shared_mem: creating %s, shm at: %p\n", global_core->shm_name,
+           shm_address);
+  } else {
+    printf("shared_mem: not creating, shm at: %p\n", shm_address);
+  }
+  if (close(shm) == -1) {  // The descriptor is closed as soon as the mapping exists.
+    PLOG(WARNING, "close(%d(=shm)) failed", shm);
+  }
+  if (shm_address != (void *)SHM_START) {
+    LOG(FATAL, "shm isn't at hard-coded %p. at %p instead\n",
+        (void *)SHM_START, shm_address);
+  }
+  aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
+  LOG(INFO, "shared_mem: end of create_shared_mem owner=%d\n",
+      global_core->owner);
+}
+
+void aos_core_use_address_as_shared_mem(void *address, size_t size) {  // Records the segment in global_core.
+  aos_shm_core *const header = address;
+  global_core->mem_struct = header;
+  global_core->size = size;
+  global_core->shared_mem = (uint8_t *)address + sizeof(*header);
+  if (!global_core->owner) {  // Non-owners wait until the owner has finished initializing.
+    if (futex_wait(&header->creation_condition) != 0) {
+      LOG(FATAL, "waiting on creation_condition failed\n");
+    }
+    return;
+  }
+  header->msg_alloc = (uint8_t *)address + global_core->size;
+  init_shared_mem_core(header);
+  futex_set(&header->creation_condition);  // Set only after the header has been initialized.
+}
+
+void aos_core_free_shared_mem(void) {  // Unmaps the segment; the owner also unlinks its name.
+  void *const shm_address = (void *)SHM_START;  // Report the address that is actually unmapped.
+  if (munmap(shm_address, SIZEOFSHMSEG) == -1) {
+    PLOG(FATAL, "munmap(%p, 0x%zx) failed", shm_address,
+         (size_t)SIZEOFSHMSEG);
+  }
+  if (global_core->owner) {
+    if (shm_unlink(global_core->shm_name)) {
+      PLOG(FATAL, "shared_mem: shm_unlink(%s) failed", global_core->shm_name);
+    }
+  }
+}
+
+int aos_core_is_init(void) {  // 1 whenever global_core is non-NULL, 0 otherwise.
+  return global_core == NULL ? 0 : 1;
+}
diff --git a/aos/ipc_lib/shared_mem.h b/aos/ipc_lib/shared_mem.h
new file mode 100644
index 0000000..edb9091
--- /dev/null
+++ b/aos/ipc_lib/shared_mem.h
@@ -0,0 +1,39 @@
+#ifndef AOS_IPC_LIB_SHARED_MEM_H_
+#define AOS_IPC_LIB_SHARED_MEM_H_
+
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/ipc_lib/shared_mem_types.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void init_shared_mem_core(aos_shm_core *shm_core);  // Zeroes the time offset and locks, NULLs the list heads.
+
+ptrdiff_t aos_core_get_mem_usage(void);  // Segment size minus msg_alloc's offset from the segment start.
+
+// Takes the specified memory address and uses it as the shared memory.
+// address is the memory address, and size is the size of the memory.
+// global_core needs to point to an instance of struct aos_core, and owner
+// should be set correctly there.
+// The owner should verify that the first sizeof(mutex) of data is set to 0
+// before passing the memory to this function.
+void aos_core_use_address_as_shared_mem(void *address, size_t size);
+
+// create is true to remove any existing shm to create a fresh one or false to
+// fail if it does not already exist.
+// lock is true to lock shared memory into RAM or false to not.
+void aos_core_create_shared_mem(int create, int lock);
+void aos_core_free_shared_mem(void);  // Unmaps the segment; the owner also shm_unlink(3)s it.
+
+// Returns whether or not the shared memory system is active.
+int aos_core_is_init(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // AOS_IPC_LIB_SHARED_MEM_H_
diff --git a/aos/ipc_lib/shared_mem_types.h b/aos/ipc_lib/shared_mem_types.h
new file mode 100644
index 0000000..f8555d5
--- /dev/null
+++ b/aos/ipc_lib/shared_mem_types.h
@@ -0,0 +1,64 @@
+#ifndef AOS_IPC_LIB_SHARED_MEM_TYPES_H_
+#define AOS_IPC_LIB_SHARED_MEM_TYPES_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include "aos/ipc_lib/aos_sync.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct aos_core *global_core __attribute__((weak));
+
+// Where the shared memory segment starts in each process's address space.
+// Has to be the same in all of them so that stuff in shared memory
+// can have regular pointers to other stuff in shared memory.
+#define SHM_START 0x20000000
+
+// A pointer paired with a mutex, stored in shared memory. aos_shm_core uses
+// these for its list heads (presumably lock guards pointer; confirm in users).
+typedef struct aos_global_pointer_t {
+ struct aos_mutex lock;
+ void *pointer;  // Set to NULL by init_shared_mem_core().
+} aos_global_pointer;
+
+typedef struct aos_shm_core_t {
+ // Gets 0-initialized at the start (as part of shared memory); the owner
+ // futex_set()s it once setup is done and non-owners futex_wait() on it.
+ aos_condition creation_condition;
+
+ // An offset from CLOCK_MONOTONIC to times for all the code in nanoseconds.
+ // This is currently only set to non-zero by the log replay code.
+ // There is no synchronization on this to avoid the overhead because it is
+ // only updated with proper memory barriers when only a single task is
+ // running.
+ int64_t time_offset;
+
+ struct aos_mutex msg_alloc_lock;  // Zeroed by init_shared_mem_core().
+ void *msg_alloc;  // The owner starts this at the very end of the segment (address + size).
+
+ // A pointer to the head of the linked list of queues.
+ // pointer points to a ::aos::Queue.
+ aos_global_pointer queues;
+ // A pointer to the head of the linked list of queue message types.
+ // pointer points to a ::aos::type_cache::ShmType.
+ aos_global_pointer queue_types;
+} aos_shm_core;
+
+struct aos_core {  // Per-process bookkeeping for the mapped segment; global_core points at one of these.
+ // Non-0 if we "own" shared_mem and should shm_unlink(3) it when we're done.
+ int owner;
+ void *shared_mem;  // First byte after the aos_shm_core header at the start of the segment.
+ // How large the chunk of shared memory is.
+ ptrdiff_t size;
+ aos_shm_core *mem_struct;  // The start of the segment, viewed as its header.
+ // For the owner to store the name of the file to unlink when closing.
+ const char *shm_name;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // AOS_IPC_LIB_SHARED_MEM_TYPES_H_
diff --git a/aos/ipc_lib/unique_message_ptr.h b/aos/ipc_lib/unique_message_ptr.h
new file mode 100644
index 0000000..e30a4c0
--- /dev/null
+++ b/aos/ipc_lib/unique_message_ptr.h
@@ -0,0 +1,39 @@
+#include <memory>
+
+#include "aos/ipc_lib/queue.h"
+
+namespace aos {
+namespace internal {
+
+template<typename T>
+class queue_free {  // Deleter for ::std::unique_ptr that hands a message back to its RawQueue.
+ public:
+  queue_free(RawQueue *queue) : queue_(queue) {}
+
+  void operator()(const T *message) {
+    queue_->FreeMessage(static_cast<const void *>(message));
+  }
+
+ private:
+  RawQueue *queue_;  // Not const: ::std::unique_ptr move assignment has to assign the deleter.
+};
+
+} // namespace internal
+
+template<typename T>
+class unique_message_ptr : public ::std::unique_ptr<T, ::aos::internal::queue_free<T>> {
+ public:
+  unique_message_ptr(RawQueue *queue, T *message = nullptr)  // message is freed back to queue on destruction.
+      : ::std::unique_ptr<T, ::aos::internal::queue_free<T>>(message, ::aos::internal::queue_free<T>(queue)) {}
+
+  // Perfectly forward this so that the move functionality of ::std::unique_ptr
+  // works.
+  template <typename... Args>
+  unique_message_ptr<T> &operator=(Args &&... args) {
+    ::std::unique_ptr<T, ::aos::internal::queue_free<T>>::operator=(
+        ::std::forward<Args>(args)...);
+    return *this;
+  }
+};
+
+} // namespace aos