Removed linux_code

Change-Id: I7327828d2c9efdf03172d1b90f49d5c51fbba86e
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
new file mode 100644
index 0000000..20972e4
--- /dev/null
+++ b/aos/ipc_lib/BUILD
@@ -0,0 +1,168 @@
+package(default_visibility = ["//visibility:public"])
+
+# Library built from aos_sync.cc / aos_sync.h. Links against libpthread.
+cc_library(
+    name = "aos_sync",
+    srcs = [
+        "aos_sync.cc",
+    ],
+    hdrs = [
+        "aos_sync.h",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    linkopts = [
+        "-lpthread",
+    ],
+    deps = [
+        "//aos:once",
+        "//aos:macros",
+        "//aos/logging",
+        "//aos/util:compiler_memory_barrier",
+    ],
+)
+
+# Library built from core_lib.c / core_lib.h; depends on :aos_sync and
+# :shared_mem_types.
+cc_library(
+    name = "core_lib",
+    srcs = [
+        "core_lib.c",
+    ],
+    hdrs = [
+        "core_lib.h",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    deps = [
+        ":aos_sync",
+        ":shared_mem_types",
+    ],
+)
+
+# Library built from shared_mem.c / shared_mem.h. Links against librt.
+cc_library(
+    name = "shared_mem",
+    srcs = [
+        "shared_mem.c",
+    ],
+    hdrs = [
+        "shared_mem.h",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    linkopts = [
+        "-lrt",
+    ],
+    deps = [
+        ":aos_sync",
+        ":core_lib",
+        ":shared_mem_types",
+        "//aos/logging",
+    ],
+)
+
+# Header-only target exposing shared_mem_types.h; depends on :aos_sync.
+cc_library(
+    # TODO(Brian): This should be shared_mem{,.h}, and the other one should be
+    # shared_mem_init{,.cc,.h}.
+    name = "shared_mem_types",
+    hdrs = [
+        "shared_mem_types.h",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    deps = [
+        ":aos_sync",
+    ],
+)
+
+# Library built from queue.cc / queue.h on top of :core_lib and :shared_mem.
+# Links against librt.
+cc_library(
+    name = "queue",
+    srcs = [
+        "queue.cc",
+    ],
+    hdrs = [
+        "queue.h",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    linkopts = [
+        "-lrt",
+    ],
+    deps = [
+        ":core_lib",
+        ":shared_mem",
+        "//aos:condition",
+        "//aos/mutex:mutex",
+        "//aos/logging",
+        "//aos/util:options",
+    ],
+)
+
+# Test built from raw_queue_test.cc against :queue and :core_lib.
+# NOTE(review): unlike the libraries above, this target does not declare
+# compatible_with = ["//tools:armhf-debian"] -- confirm that is intentional.
+cc_test(
+    name = "raw_queue_test",
+    srcs = [
+        "raw_queue_test.cc",
+    ],
+    deps = [
+        ":core_lib",
+        ":queue",
+        "//aos:die",
+        "//aos/time:time",
+        "//aos/logging",
+        "//aos/util:death_test_log_implementation",
+        "//aos/util:thread",
+        "//aos/testing:googletest",
+        "//aos/testing:prevent_exit",
+        "//aos/testing:test_shm",
+    ],
+)
+
+# Test built from ipc_stress_test.cc.
+# Tagged "manual", so Bazel leaves it out of wildcard target patterns (such as
+# //...) and it only builds/runs when named explicitly.
+cc_test(
+    name = "ipc_stress_test",
+    srcs = [
+        "ipc_stress_test.cc",
+    ],
+    compatible_with = [
+        "//tools:armhf-debian",
+    ],
+    tags = [
+        "manual",
+    ],
+    deps = [
+        ":core_lib",
+        "//aos:die",
+        "//aos/mutex:mutex",
+        "//aos/time:time",
+        "//aos/libc:aos_strsignal",
+        "//aos/libc:dirname",
+        "//aos/logging",
+        "//aos/testing:googletest",
+        "//aos/testing:test_shm",
+    ],
+)
+
+# NOTE(review): this target lists no srcs or hdrs, only a dependency on :queue.
+# Confirm whether a header (e.g. scoped_message_ptr.h) was meant to be listed
+# in hdrs here.
+cc_library(
+    name = "scoped_message_ptr",
+    deps = [
+        ":queue",
+    ],
+)
+
+# Binary built from ipc_comparison.cc; uses :queue plus the aos condition,
+# event, mutex, logging and init libraries, and gflags.
+cc_binary(
+    name = "ipc_comparison",
+    srcs = [
+        "ipc_comparison.cc",
+    ],
+    deps = [
+        ":queue",
+        "//aos:condition",
+        "//aos:event",
+        "//aos/mutex:mutex",
+        "//aos/logging",
+        "//aos/logging:implementations",
+        "//aos:init",
+        "//third_party/gflags",
+    ],
+)
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
new file mode 100644
index 0000000..c1be351
--- /dev/null
+++ b/aos/ipc_lib/aos_sync.cc
@@ -0,0 +1,1030 @@
+#if !AOS_DEBUG
+#undef NDEBUG
+#define NDEBUG
+#endif
+
+#include "aos/ipc_lib/aos_sync.h"
+
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <stddef.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sched.h>
+
+#ifdef AOS_SANITIZER_thread
+#include <sanitizer/tsan_interface_atomic.h>
+#endif
+
+#include <algorithm>
+#include <type_traits>
+
+#include "aos/logging/logging.h"
+#include "aos/macros.h"
+#include "aos/util/compiler_memory_barrier.h"
+#include "aos/once.h"
+
+using ::aos::linux_code::ipc_lib::FutexAccessorObserver;
+
+// This code was originally based on <https://www.akkadia.org/drepper/futex.pdf>,
+// but it has since evolved a lot. However, that still has useful information.
+//
+// Finding information about actually using futexes is really REALLY hard, so
+//   here's a list of the stuff that I've used:
+// futex(7) has a really high-level overview.
+// <http://locklessinc.com/articles/futex_cheat_sheet/> describes some of the
+//   operations in a bit more detail than most places.
+// <http://locklessinc.com/articles/mutex_cv_futex/> is the basis of our
+//   implementations (before PI).
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
+//   (fairly recent compared to everything else...).
+// <https://www.kernel.org/doc/Documentation/pi-futex.txt>,
+//   <https://www.kernel.org/doc/Documentation/futex-requeue-pi.txt>,
+//   <https://www.kernel.org/doc/Documentation/robust-futexes.txt>,
+//   and <https://www.kernel.org/doc/Documentation/robust-futex-ABI.txt> are all
+//   useful references.
+// The kernel source (kernel/futex.c) has some useful comments about what the
+//   various operations do (except figuring out which argument goes where in the
+//   syscall is still confusing).
+// futex(2) is basically useless except for describing the order of the
+//   arguments (it only has high-level descriptions of what some of the
+//   operations do, and some of them are wrong in Wheezy).
+// glibc's nptl pthreads implementation is the intended user of most of these
+//   things, so it is also a good place to look for examples. However, it is all
+//   very hard to read because it supports ~20 different kinds of mutexes and
+//   several variations of condition variables, and some of the pieces of code
+//   are only written in assembly.
+// set_robust_list(2) is wrong in Wheezy (it doesn't actually take a TID
+//   argument).
+//
+// Can't use PRIVATE futex operations because they use the pid (or something) as
+//   part of the hash.
+//
+// ThreadSanitizer understands how these mutexes etc work. It appears to be able
+// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
+// atomic primitives.
+//
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+
+// Values for an aos_mutex.futex (kernel-mandated):
+// 0 = unlocked
+// TID = locked, not contended
+// |FUTEX_WAITERS = there are waiters (aka contended)
+// |FUTEX_OWNER_DIED = old owner died
+//
+// Values for an aos_futex being used directly:
+// 0 = unset
+// 1 = set
+//
+// The value of an aos_condition is just a generation counter.
+
+// Whether or not to use the REQUEUE_PI operation. Using it is better (less
+// syscalls and the highest priority waiter is always the one that gets woken),
+// but there's a kernel bug that results in random memory corruption while using
+// them.
+// The alternative is to just wake everybody and have them all race to relock
+// the mutex (classic thundering herd).
+// Currently just whether or not we're on ARM because we only run this on
+// ARM kernels with the patch to fix that issue applied. This will likely change
+// to something based on kernel version at some point.
+#ifdef __arm__
+#define USE_REQUEUE_PI 1
+#else
+#define USE_REQUEUE_PI 0
+#endif
+
+#ifdef AOS_SANITIZER_thread
+extern "C" void AnnotateHappensBefore(const char *file, int line,
+                                      uintptr_t addr);
+extern "C" void AnnotateHappensAfter(const char *file, int line,
+                                     uintptr_t addr);
+#define ANNOTATE_HAPPENS_BEFORE(address)    \
+  AnnotateHappensBefore(__FILE__, __LINE__, \
+                        reinterpret_cast<uintptr_t>(address))
+#define ANNOTATE_HAPPENS_AFTER(address) \
+  AnnotateHappensAfter(__FILE__, __LINE__, reinterpret_cast<uintptr_t>(address))
+#else
+#define ANNOTATE_HAPPENS_BEFORE(address)
+#define ANNOTATE_HAPPENS_AFTER(address)
+#endif
+
+namespace {
+
+const bool kRobustListDebug = false;
+const bool kLockDebug = false;
+const bool kPrintOperations = false;
+
+// These sys_futex_* functions are wrappers around syscall(SYS_futex). They each
+// take a specific set of arguments for a given futex operation. They return the
+// result or a negated errno value. -1..-4095 mean errors and not successful
+// results, which is guaranteed by the kernel.
+//
+// They each have optimized versions for ARM EABI (the syscall interface is
+// different for non-EABI ARM, so that is the right thing to test for) that
+// don't go through syscall(2) or errno.
+// These use register variables to get the values in the right registers to
+// actually make the syscall.
+
+// The actual macro that we key off of to use the inline versions or not.
+#if defined(__ARM_EABI__)
+#define ARM_EABI_INLINE_SYSCALL 1
+#else
+#define ARM_EABI_INLINE_SYSCALL 0
+#endif
+
+// Used for FUTEX_WAIT, FUTEX_LOCK_PI, and FUTEX_TRYLOCK_PI.
+//
+// Performs futex(addr1, op, val1, timeout).
+//   op:      the futex operation to perform.
+//   addr1:   the futex word to operate on.
+//   val1:    the operation's value argument.
+//   timeout: passed straight through to the kernel; may be NULL.
+// Returns the syscall's result, or a negated errno value on failure.
+inline int sys_futex_wait(int op, aos_futex *addr1, int val1,
+                          const struct timespec *timeout) {
+#if ARM_EABI_INLINE_SYSCALL
+  // Pin each argument to the register the syscall expects it in: arguments in
+  // r0-r3 and the syscall number in r7. The result is read back out of r0.
+  register aos_futex *addr1_reg __asm__("r0") = addr1;
+  register int op_reg __asm__("r1") = op;
+  register int val1_reg __asm__("r2") = val1;
+  register const struct timespec *timeout_reg __asm__("r3") = timeout;
+  register int syscall_number __asm__("r7") = SYS_futex;
+  register int result __asm__("r0");
+  // The "memory" clobber keeps the compiler from caching memory values across
+  // the syscall.
+  __asm__ volatile("swi #0"
+                   : "=r"(result)
+                   : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+                     "r"(timeout_reg), "r"(syscall_number)
+                   : "memory");
+  return result;
+#else
+  // syscall(2) reports failure as -1 with the code in errno, so convert that
+  // to the negated-errno convention used by all of these wrappers.
+  const int r = syscall(SYS_futex, addr1, op, val1, timeout);
+  if (r == -1) return -errno;
+  return r;
+#endif
+}
+
+// Performs futex(addr1, FUTEX_WAKE, val1).
+// Per futex(2), val1 is the maximum number of waiters on addr1 to wake.
+// Returns the syscall's result, or a negated errno value on failure.
+inline int sys_futex_wake(aos_futex *addr1, int val1) {
+#if ARM_EABI_INLINE_SYSCALL
+  // Arguments go in r0-r2, the syscall number in r7, and the result comes back
+  // in r0.
+  register aos_futex *addr1_reg __asm__("r0") = addr1;
+  register int op_reg __asm__("r1") = FUTEX_WAKE;
+  register int val1_reg __asm__("r2") = val1;
+  register int syscall_number __asm__("r7") = SYS_futex;
+  register int result __asm__("r0");
+  __asm__ volatile("swi #0"
+                   : "=r"(result)
+                   : "r"(addr1_reg), "r"(op_reg), "r"(val1_reg),
+                     "r"(syscall_number)
+                   : "memory");
+  return result;
+#else
+  // Convert syscall(2)'s -1/errno convention into a negated errno value.
+  const int r = syscall(SYS_futex, addr1, FUTEX_WAKE, val1);
+  if (r == -1) return -errno;
+  return r;
+#endif
+}
+
+// Performs futex(addr1, FUTEX_CMP_REQUEUE_PI, num_wake, num_requeue, m, val).
+//   addr1:       the futex the waiters are currently sleeping on.
+//   num_wake:    passed as the "number to wake" argument.
+//   num_requeue: passed as the "number to requeue" argument.
+//   m:           the futex to requeue the remaining waiters onto.
+//   val:         the value the kernel compares against *addr1; the caller in
+//                this file treats -EAGAIN as "the value changed".
+// Returns the syscall's result, or a negated errno value on failure.
+inline int sys_futex_cmp_requeue_pi(aos_futex *addr1, int num_wake,
+    int num_requeue, aos_futex *m, uint32_t val) {
+#if ARM_EABI_INLINE_SYSCALL
+  // Six arguments, so they occupy r0-r5. The syscall number goes in r7 and
+  // the result comes back in r0.
+  register aos_futex *addr1_reg __asm__("r0") = addr1;
+  register int op_reg __asm__("r1") = FUTEX_CMP_REQUEUE_PI;
+  register int num_wake_reg __asm__("r2") = num_wake;
+  register int num_requeue_reg __asm__("r3") = num_requeue;
+  register aos_futex *m_reg __asm__("r4") = m;
+  register uint32_t val_reg __asm__("r5") = val;
+  register int syscall_number __asm__("r7") = SYS_futex;
+  register int result __asm__("r0");
+  __asm__ volatile("swi #0"
+                   : "=r"(result)
+                   : "r"(addr1_reg), "r"(op_reg), "r"(num_wake_reg),
+                     "r"(num_requeue_reg), "r"(m_reg), "r"(val_reg),
+                     "r"(syscall_number)
+                   : "memory");
+  return result;
+#else
+  // Convert syscall(2)'s -1/errno convention into a negated errno value.
+  const int r = syscall(SYS_futex, addr1, FUTEX_CMP_REQUEUE_PI, num_wake,
+                        num_requeue, m, val);
+  if (r == -1) return -errno;
+  return r;
+#endif
+}
+
+// Performs futex(addr1, FUTEX_WAIT_REQUEUE_PI, start_val, timeout, m).
+//   addr1:     the condition variable's futex word to wait on.
+//   start_val: the value argument handed to the kernel for addr1.
+//   timeout:   passed straight through to the kernel.
+//   m:         the futex passed as the second address (the requeue target).
+// Returns the syscall's result, or a negated errno value on failure.
+inline int sys_futex_wait_requeue_pi(aos_condition *addr1,
+                                     uint32_t start_val,
+                                     const struct timespec *timeout,
+                                     aos_futex *m) {
+#if ARM_EABI_INLINE_SYSCALL
+  // Five arguments in r0-r4, the syscall number in r7, the result in r0.
+  register aos_condition *addr1_reg __asm__("r0") = addr1;
+  register int op_reg __asm__("r1") = FUTEX_WAIT_REQUEUE_PI;
+  register uint32_t start_val_reg __asm__("r2") = start_val;
+  register const struct timespec *timeout_reg __asm__("r3") = timeout;
+  register aos_futex *m_reg __asm__("r4") = m;
+  register int syscall_number __asm__("r7") = SYS_futex;
+  register int result __asm__("r0");
+  __asm__ volatile("swi #0"
+                   : "=r"(result)
+                   : "r"(addr1_reg), "r"(op_reg), "r"(start_val_reg),
+                     "r"(timeout_reg), "r"(m_reg), "r"(syscall_number)
+                   : "memory");
+  return result;
+#else
+  // Convert syscall(2)'s -1/errno convention into a negated errno value.
+  const int r =
+      syscall(SYS_futex, addr1, FUTEX_WAIT_REQUEUE_PI, start_val, timeout, m);
+  if (r == -1) return -errno;
+  return r;
+#endif
+}
+
+// Performs futex(addr1, FUTEX_UNLOCK_PI).
+// Returns the syscall's result, or a negated errno value on failure.
+inline int sys_futex_unlock_pi(aos_futex *addr1) {
+#if ARM_EABI_INLINE_SYSCALL
+  // Only two arguments (r0-r1) are needed. The syscall number goes in r7 and
+  // the result comes back in r0.
+  register aos_futex *addr1_reg __asm__("r0") = addr1;
+  register int op_reg __asm__("r1") = FUTEX_UNLOCK_PI;
+  register int syscall_number __asm__("r7") = SYS_futex;
+  register int result __asm__("r0");
+  __asm__ volatile("swi #0"
+                   : "=r"(result)
+                   : "r"(addr1_reg), "r"(op_reg), "r"(syscall_number)
+                   : "memory");
+  return result;
+#else
+  // Convert syscall(2)'s -1/errno convention into a negated errno value.
+  const int r = syscall(SYS_futex, addr1, FUTEX_UNLOCK_PI);
+  if (r == -1) return -errno;
+  return r;
+#endif
+}
+
+// Atomically replaces *f with after iff it currently holds before.
+// Returns whatever value *f held before the operation, whether or not the
+// swap happened.
+inline uint32_t compare_and_swap_val(aos_futex *f, uint32_t before,
+                                     uint32_t after) {
+#ifndef AOS_SANITIZER_thread
+  const uint32_t previous = __sync_val_compare_and_swap(f, before, after);
+  return previous;
+#else
+  // This is a workaround for <https://llvm.org/bugs/show_bug.cgi?id=23176>.
+  // Basically, most of the atomic operations are broken under tsan, but this
+  // particular one isn't.
+  // TODO(Brian): Remove this #ifdef (and the one in compare_and_swap) once we
+  // don't have to worry about tsan with this bug any more.
+  //
+  // On failure the tsan primitive writes the value it observed back into
+  // *expected, so observed ends up holding the previous value either way.
+  uint32_t observed = before;
+  int32_t *const target = reinterpret_cast<int32_t *>(f);
+  int32_t *const expected = reinterpret_cast<int32_t *>(&observed);
+  __tsan_atomic32_compare_exchange_strong(target, expected, after,
+                                          __tsan_memory_order_seq_cst,
+                                          __tsan_memory_order_seq_cst);
+  return observed;
+#endif
+}
+
+// Atomically replaces *f with after iff it currently holds before.
+// Returns true when the swap happened and false when it did not.
+inline bool compare_and_swap(aos_futex *f, uint32_t before, uint32_t after) {
+#ifndef AOS_SANITIZER_thread
+  const bool swapped = __sync_bool_compare_and_swap(f, before, after);
+  return swapped;
+#else
+  // Under tsan, go through compare_and_swap_val: the swap succeeded exactly
+  // when the previous value was the expected one.
+  const uint32_t previous = compare_and_swap_val(f, before, after);
+  return previous == before;
+#endif
+}
+
+#ifdef AOS_SANITIZER_thread
+
+// Simple macro for checking something which should always be true.
+// Using the standard CHECK macro isn't safe because failures often result in
+// reentering the mutex locking code, which doesn't work.
+//
+// On failure this prints the file, line, and the stringified expression to
+// stderr and then calls abort(). Because the expression text is part of the
+// message, rewording a checked expression changes the failure output.
+// The do/while(false) wrapper makes the macro behave as a single statement.
+#define SIMPLE_CHECK(expr)                                                   \
+  do {                                                                       \
+    if (!(expr)) {                                                           \
+      fprintf(stderr, "%s: %d: SIMPLE_CHECK(" #expr ") failed!\n", __FILE__, \
+              __LINE__);                                                     \
+      abort();                                                               \
+    }                                                                        \
+  } while (false)
+
+// Forcibly initializes the pthread mutex for *m.
+// This sequence of operations is only safe for the simpler kinds of mutexes in
+// glibc's pthreads implementation on Linux.
+//
+// Any failure of the underlying pthread calls aborts via SIMPLE_CHECK.
+void init_pthread_mutex(aos_mutex *m) {
+  // Re-initialize the mutex so the destroy won't fail if it's locked.
+  // tsan ignores this.
+  SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, nullptr));
+  // Destroy the mutex so tsan will forget about it if some now-dead thread
+  // locked it.
+  SIMPLE_CHECK(0 == pthread_mutex_destroy(&m->pthread_mutex));
+
+  // Now actually initialize it, making sure it's process-shareable so it works
+  // correctly across shared memory.
+  pthread_mutexattr_t attr;
+  SIMPLE_CHECK(0 == pthread_mutexattr_init(&attr));
+  // Use the named constant rather than relying on `true` happening to convert
+  // to the same integer value as PTHREAD_PROCESS_SHARED.
+  SIMPLE_CHECK(0 ==
+               pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED));
+  SIMPLE_CHECK(0 == pthread_mutex_init(&m->pthread_mutex, &attr));
+  SIMPLE_CHECK(0 == pthread_mutexattr_destroy(&attr));
+}
+
+// Locks the pthread mutex for *m.
+// If a stack trace ever reveals the pthread_mutex_lock call in here blocking,
+// there is a bug in our mutex code or the way somebody is calling it.
+//
+// The pthread mutex is initialized lazily the first time it is needed, tracked
+// by m->pthread_mutex_init. A failing pthread_mutex_lock aborts via
+// SIMPLE_CHECK.
+void lock_pthread_mutex(aos_mutex *m) {
+  if (!m->pthread_mutex_init) {
+    init_pthread_mutex(m);
+    m->pthread_mutex_init = true;
+  }
+  SIMPLE_CHECK(0 == pthread_mutex_lock(&m->pthread_mutex));
+}
+
+// Takes the pthread mutex for *m even if somebody else currently holds it.
+// When it is already held, the mutex is (somewhat hackily) re-initialized out
+// from underneath the holder before being locked here.
+void force_lock_pthread_mutex(aos_mutex *m) {
+  // Lazily set up the pthread mutex the first time it's needed.
+  const bool already_initialized = m->pthread_mutex_init;
+  if (!already_initialized) {
+    init_pthread_mutex(m);
+    m->pthread_mutex_init = true;
+  }
+
+  // Probe the current state; anything other than "got it" or "busy" is fatal.
+  const int trylock_result = pthread_mutex_trylock(&m->pthread_mutex);
+  SIMPLE_CHECK(trylock_result == 0 || trylock_result == EBUSY);
+
+  if (trylock_result == EBUSY) {
+    // Somebody (should always be somebody else who died with it held) already
+    // has it, so make tsan forget about that.
+    init_pthread_mutex(m);
+  } else {
+    // We're good, so unlock it and then go for a real lock down below.
+    SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
+  }
+
+  lock_pthread_mutex(m);
+}
+
+// Unlocks the pthread mutex for *m.
+// The mutex must already have been initialized by one of the lock functions
+// (asserted in debug builds). A failing pthread_mutex_unlock aborts via
+// SIMPLE_CHECK.
+void unlock_pthread_mutex(aos_mutex *m) {
+  assert(m->pthread_mutex_init);
+  SIMPLE_CHECK(0 == pthread_mutex_unlock(&m->pthread_mutex));
+}
+
+#else
+
+// Empty implementations of all these so the code below doesn't need #ifdefs.
+// These are the versions used when AOS_SANITIZER_thread is not defined; each
+// ignores its argument and does nothing.
+static inline void lock_pthread_mutex(aos_mutex *) {}
+static inline void force_lock_pthread_mutex(aos_mutex *) {}
+static inline void unlock_pthread_mutex(aos_mutex *) {}
+
+#endif
+
+// Asks the kernel for the calling thread's TID via the gettid syscall.
+// This always makes the syscall; nothing is cached here.
+pid_t do_get_tid() {
+  const long raw = syscall(SYS_gettid);
+  const pid_t r = static_cast<pid_t>(raw);
+  assert(r > 0);
+  return r;
+}
+
+// Sanity-checks a cached TID against the real one.
+// Functions call this before LOG(FATAL)ing with error messages that would be
+// incorrect if the error was really caused by a process forking without
+// initialize_in_new_thread getting called in the fork. If tid does not match
+// the kernel's answer, that situation is reported with its own LOG(FATAL).
+void check_cached_tid(pid_t tid) {
+  const pid_t current = do_get_tid();
+  if (tid == current) {
+    // The cache is accurate, so the caller's own message applies.
+    return;
+  }
+  LOG(FATAL,
+      "task %jd forked into %jd without letting aos_sync know"
+      " so we're not really sure what's going on\n",
+      static_cast<intmax_t>(tid), static_cast<intmax_t>(current));
+}
+
+// Starts off at 0 in each new thread (because that's what it gets initialized
+// to in most of them or it gets reset to 0 after a fork by atfork_child()).
+thread_local pid_t my_tid = 0;
+
+// Gets called before the fork(2) wrapper function returns in the child.
+// Registered as the child handler with pthread_atfork by InstallAtforkHook.
+// Clearing the cached TID forces get_tid() to redo its initialization in the
+// child.
+void atfork_child() {
+  // The next time get_tid() is called, it will set everything up again.
+  my_tid = 0;
+}
+
+// Registers atfork_child to run in the child after every fork(2).
+// Returns nullptr always; the pointer return type only exists so this can be
+// handed to ::aos::Once.
+// Dies with a FATAL log if the registration fails.
+void *InstallAtforkHook() {
+  // pthread_atfork reports failure through its return value (an error number)
+  // rather than through errno, so pass that value to PELOG explicitly instead
+  // of letting PLOG print whatever errno happens to hold.
+  const int error = pthread_atfork(NULL, NULL, atfork_child);
+  if (error != 0) {
+    // %p requires a void *, so convert the function pointer explicitly.
+    PELOG(FATAL, error, "pthread_atfork(NULL, NULL, %p) failed",
+          reinterpret_cast<void *>(atfork_child));
+  }
+  return nullptr;
+}
+
+// This gets called to set everything up in a new thread by get_tid().
+void initialize_in_new_thread();
+
+// Returns the calling thread's TID from the thread-local cache.
+// A cached value of 0 means this thread has not been set up yet (or was just
+// forked), in which case all of the 1-time per-thread initialization runs
+// first.
+inline uint32_t get_tid() {
+  static_assert(sizeof(my_tid) <= sizeof(uint32_t), "pid_t is too big");
+  const bool needs_setup = (my_tid == 0);
+  if (__builtin_expect(needs_setup, false)) {
+    initialize_in_new_thread();
+  }
+  return static_cast<uint32_t>(my_tid);
+}
+
+// Contains all of the stuff for dealing with the robust list. Nothing outside
+// this namespace should touch anything inside it except Init, Adder, and
+// Remover.
+namespace my_robust_list {
+
+static_assert(offsetof(aos_mutex, next) == 0,
+              "Our math all assumes that the beginning of a mutex and its next "
+              "pointer are at the same place in memory.");
+
+// Our version of robust_list_head.
+// This is copied from the kernel header because that's a pretty stable ABI (and
+// any changes will be backwards compatible anyways) and we want ours to have
+// different types.
+// The uintptr_ts are &next of the elements in the list (with stuff |ed in).
+// The static_asserts following this struct verify that the layout matches the
+// kernel's robust_list_head, so do not reorder or resize these fields.
+struct aos_robust_list_head {
+  // Corresponds to robust_list_head::list: the first entry in the list.
+  uintptr_t next;
+  // Corresponds to robust_list_head::futex_offset: the offset from a list
+  // entry to its futex word.
+  long futex_offset;
+  // Corresponds to robust_list_head::list_op_pending: the entry currently
+  // being added or removed, or 0 when there is none.
+  uintptr_t pending_next;
+};
+
+static_assert(offsetof(aos_robust_list_head, next) ==
+                  offsetof(robust_list_head, list),
+              "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(offsetof(aos_robust_list_head, futex_offset) ==
+                  offsetof(robust_list_head, futex_offset),
+              "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(offsetof(aos_robust_list_head, pending_next) ==
+                  offsetof(robust_list_head, list_op_pending),
+              "Our aos_robust_list_head doesn't match the kernel's");
+static_assert(sizeof(aos_robust_list_head) == sizeof(robust_list_head),
+              "Our aos_robust_list_head doesn't match the kernel's");
+
+thread_local aos_robust_list_head robust_head;
+
+// Extra offset between mutex values and where we point to for their robust list
+// entries (from SetRobustListOffset).
+uintptr_t robust_list_offset = 0;
+
+// The value to OR each pointer's value with whenever putting it into the robust
+// list (technically only if it's PI, but all of ours are, so...).
+static const uintptr_t kRobustListOr = 1;
+
+// Returns the value which goes into a next variable to represent the head.
+// This is the address of the calling thread's robust_head.next (robust_head is
+// thread_local), with nothing ORed or added in.
+inline uintptr_t robust_head_next_value() {
+  return reinterpret_cast<uintptr_t>(&robust_head.next);
+}
+// Returns true iff next represents the head.
+inline bool next_is_head(uintptr_t next) {
+  return next == robust_head_next_value();
+}
+// Returns the (pseudo-)mutex corresponding to the head.
+// This does NOT have a previous pointer, so be careful with the return value.
+// Only the next field (at offset 0 of aos_mutex) is backed by real storage.
+inline aos_mutex *robust_head_mutex() {
+  return reinterpret_cast<aos_mutex *>(robust_head_next_value());
+}
+
+// Converts a mutex into the value stored in a robust list next field to refer
+// to it: the address of m->next, shifted by robust_list_offset, with
+// kRobustListOr ORed in.
+inline uintptr_t mutex_to_next(aos_mutex *m) {
+  return (reinterpret_cast<uintptr_t>(&m->next) + robust_list_offset) |
+         kRobustListOr;
+}
+// Converts a robust list next value back into the mutex it refers to, undoing
+// what mutex_to_next does (clears kRobustListOr and subtracts
+// robust_list_offset).
+// If next refers to the head, the result is the head pseudo-mutex.
+inline aos_mutex *next_to_mutex(uintptr_t next) {
+  if (__builtin_expect(robust_list_offset != 0, false) && next_is_head(next)) {
+    // We don't offset the head pointer, so be careful.
+    return reinterpret_cast<aos_mutex *>(next);
+  }
+  return reinterpret_cast<aos_mutex *>(
+      (next & ~kRobustListOr) - robust_list_offset);
+}
+
+// Sets up the robust list for each thread.
+// Resets the calling thread's robust_head to an empty list and registers it
+// with the kernel via set_robust_list(2). Dies with a FATAL log if the
+// registration fails.
+void Init() {
+  // It starts out just pointing back to itself.
+  robust_head.next = robust_head_next_value();
+  // The kernel finds each entry's futex word by adding this offset to the
+  // address of the entry (which is the mutex's next field).
+  robust_head.futex_offset = static_cast<ssize_t>(offsetof(aos_mutex, futex)) -
+                             static_cast<ssize_t>(offsetof(aos_mutex, next));
+  robust_head.pending_next = 0;
+  if (syscall(SYS_set_robust_list, robust_head_next_value(), sizeof(robust_head)) !=
+      0) {
+    // sizeof yields a size_t, which needs %zu (not %zd).
+    PLOG(FATAL, "set_robust_list(%p, %zu) failed",
+         reinterpret_cast<void *>(robust_head.next), sizeof(robust_head));
+  }
+  if (kRobustListDebug) {
+    // get_tid() returns a uint32_t, so print it with the unsigned specifier.
+    printf("%" PRIu32 ": init done\n", get_tid());
+  }
+}
+
+// Updating the offset with locked mutexes is important during robustness
+// testing, because there are mutexes which are locked before this is set to a
+// non-0 value and then unlocked after it is changed back. However, to make sure
+// the code works correctly when manipulating the next pointer of the last of
+// those mutexes, all of their next values have to be adjusted appropriately.
+//
+// offset is the new value for robust_list_offset. Only the calling thread's
+// list is walked (robust_head is thread_local).
+void SetRobustListOffset(uintptr_t offset) {
+  // Unsigned arithmetic, so a decrease wraps around and adding it below still
+  // produces the right result.
+  const uintptr_t offset_change = offset - robust_list_offset;
+  robust_list_offset = offset;
+  aos_mutex *m = robust_head_mutex();
+  // Update the offset contained in each of the mutexes which is already locked.
+  // A next value which refers to the head is never offset, so the walk stops
+  // there without touching it.
+  while (!next_is_head(m->next)) {
+    m->next += offset_change;
+    // robust_list_offset was already updated above, so this decodes the
+    // freshly adjusted value correctly.
+    m = next_to_mutex(m->next);
+  }
+}
+
+// Returns true iff the calling thread's robust list has at least one entry,
+// ie its head no longer points back at itself.
+bool HaveLockedMutexes() {
+  const bool list_is_empty = next_is_head(robust_head.next);
+  return !list_is_empty;
+}
+
+// Handles adding a mutex to the robust list.
+// The idea is to create one of these at the beginning of a function that needs
+// to do this and then call Add() iff it should actually be added.
+//
+// For the whole lifetime of the object, robust_head.pending_next refers to the
+// mutex; the destructor clears it again.
+class Adder {
+ public:
+  // Marks m as the pending robust list operation for the calling thread.
+  // No other Adder or Remover may be active on this thread (asserted in debug
+  // builds).
+  Adder(aos_mutex *m) : m_(m) {
+    assert(robust_head.pending_next == 0);
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": maybe add %p\n", get_tid(), m_);
+    }
+    robust_head.pending_next = mutex_to_next(m);
+    // Keep the compiler from moving the caller's subsequent lock attempt ahead
+    // of the pending_next store.
+    aos_compiler_memory_barrier();
+  }
+  // Clears the pending operation, whether or not Add() was called.
+  ~Adder() {
+    assert(robust_head.pending_next == mutex_to_next(m_));
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": done maybe add %p, n=%p p=%p\n", get_tid(), m_,
+             next_to_mutex(m_->next), m_->previous);
+    }
+    // Make sure everything done to the list is written before pending_next is
+    // cleared.
+    aos_compiler_memory_barrier();
+    robust_head.pending_next = 0;
+  }
+
+  // Links the mutex in at the front of the calling thread's robust list.
+  void Add() {
+    assert(robust_head.pending_next == mutex_to_next(m_));
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": adding %p\n", get_tid(), m_);
+    }
+    const uintptr_t old_head_next_value = robust_head.next;
+
+    // Point the new entry at the old first entry before publishing it as the
+    // new first entry, so the forward chain is never broken.
+    m_->next = old_head_next_value;
+    aos_compiler_memory_barrier();
+    robust_head.next = mutex_to_next(m_);
+
+    // The previous pointers are bookkeeping for Remover.
+    m_->previous = robust_head_mutex();
+    if (!next_is_head(old_head_next_value)) {
+      // robust_head's psuedo-mutex doesn't have a previous pointer to update.
+      next_to_mutex(old_head_next_value)->previous = m_;
+    }
+    aos_compiler_memory_barrier();
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": done adding %p\n", get_tid(), m_);
+    }
+  }
+
+ private:
+  // The mutex this object is (maybe) adding.
+  aos_mutex *const m_;
+
+  DISALLOW_COPY_AND_ASSIGN(Adder);
+};
+
+// Handles removing a mutex from the robust list.
+// The idea is to create one of these at the beginning of a function that needs
+// to do this.
+//
+// The constructor does the unlinking immediately. robust_head.pending_next
+// keeps referring to the mutex until the destructor clears it.
+class Remover {
+ public:
+  // Marks m as the pending operation and unlinks it from the calling thread's
+  // robust list. No other Adder or Remover may be active on this thread
+  // (asserted in debug builds).
+  Remover(aos_mutex *m) {
+    assert(robust_head.pending_next == 0);
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": beginning to remove %p, n=%p p=%p\n", get_tid(), m,
+             next_to_mutex(m->next), m->previous);
+    }
+    robust_head.pending_next = mutex_to_next(m);
+    // Make sure pending_next is set before the list gets modified.
+    aos_compiler_memory_barrier();
+
+    aos_mutex *const previous = m->previous;
+    const uintptr_t next_value = m->next;
+
+    // Splice m out of the forward chain. previous may be the head
+    // pseudo-mutex, whose next field is robust_head.next.
+    previous->next = m->next;
+    if (!next_is_head(next_value)) {
+      // robust_head's psuedo-mutex doesn't have a previous pointer to update.
+      next_to_mutex(next_value)->previous = previous;
+    }
+
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": done removing %p\n", get_tid(), m);
+    }
+  }
+  // Clears the pending operation.
+  ~Remover() {
+    assert(robust_head.pending_next != 0);
+    // Make sure whatever the caller did while this object was alive is written
+    // before pending_next is cleared.
+    aos_compiler_memory_barrier();
+    robust_head.pending_next = 0;
+    if (kRobustListDebug) {
+      printf("%" PRId32 ": done with removal\n", get_tid());
+    }
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Remover);
+};
+
+}  // namespace my_robust_list
+
+// Does the per-thread setup: caches the TID, installs the atfork hook, and
+// registers this thread's robust list with the kernel.
+// Called by get_tid() whenever the cached TID is 0.
+void initialize_in_new_thread() {
+  // No synchronization necessary in most of this because it's all thread-local!
+
+  // Set this first: my_robust_list::Init() may call get_tid() (for its debug
+  // output), which would otherwise end up back in here.
+  my_tid = do_get_tid();
+
+  // NOTE(review): presumably ::aos::Once guarantees InstallAtforkHook only
+  // runs once per process no matter how many threads get here -- confirm
+  // against aos/once.h.
+  static ::aos::Once<void> atfork_hook_installed(InstallAtforkHook);
+  atfork_hook_installed.Get();
+
+  my_robust_list::Init();
+}
+
+FutexAccessorObserver before_observer = nullptr, after_observer = nullptr;
+
+// RAII class which invokes before_observer (if one is set) when constructed
+// and after_observer (if one is set) when destroyed. Both get handed the same
+// address and write flag.
+class RunObservers {
+ public:
+  // address is the object about to be accessed (any cv-qualification is
+  // dropped) and write is passed through to the observers unchanged.
+  template <class T>
+  RunObservers(T *address, bool write)
+      : address_(Unqualify(address)), write_(write) {
+    // Having no observer installed is the expected case.
+    if (__builtin_expect(before_observer == nullptr, true)) return;
+    before_observer(address_, write_);
+  }
+  ~RunObservers() {
+    if (__builtin_expect(after_observer == nullptr, true)) return;
+    after_observer(address_, write_);
+  }
+
+ private:
+  // Strips const/volatile from the pointee and erases its type.
+  template <class T>
+  static void *Unqualify(T *address) {
+    typedef typename ::std::remove_cv<T>::type Unqualified;
+    return static_cast<void *>(const_cast<Unqualified *>(address));
+  }
+
+  void *const address_;
+  const bool write_;
+
+  DISALLOW_COPY_AND_ASSIGN(RunObservers);
+};
+
+// Completes a lock operation on a mutex whose futex this thread now owns.
+// Returns 0 normally. If the futex has FUTEX_OWNER_DIED set, that bit gets
+// cleared and 1 is returned instead.
+inline int mutex_finish_lock(aos_mutex *m) {
+  const uint32_t futex_value = __atomic_load_n(&m->futex, __ATOMIC_ACQUIRE);
+  const bool owner_died = (futex_value & FUTEX_OWNER_DIED) != 0;
+
+  if (__builtin_expect(!owner_died, true)) {
+    // The common case: nothing to clean up.
+    lock_pthread_mutex(m);
+    return 0;
+  }
+
+  // The previous owner died while holding it, so clear the flag and take the
+  // pthread mutex out from underneath the dead owner.
+  __atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);
+  force_lock_pthread_mutex(m);
+  return 1;
+}
+
+// Split out separately from mutex_get so condition_wait can call it and use its
+// own my_robust_list::Adder.
+//
+//   m:            the mutex to lock.
+//   signals_fail: if true, being interrupted by a signal (EINTR) makes this
+//                 return 2; if false, the wait is retried.
+//   timeout:      handed to FUTEX_LOCK_PI; NULL for no timeout.
+//   tid:          the calling thread's TID, which is the value stored in the
+//                 futex to claim it.
+// Returns:
+//   0 if the mutex was locked,
+//   1 if it was locked and the previous owner died (see mutex_finish_lock),
+//   2 if a signal interrupted the wait and signals_fail is true,
+//   3 if timeout is non-NULL and the wait timed out.
+// Any other failure is logged as FATAL.
+inline int mutex_do_get(aos_mutex *m, bool signals_fail,
+                        const struct timespec *timeout, uint32_t tid) {
+  RunObservers run_observers(m, true);
+  if (kPrintOperations) {
+    printf("%" PRId32 ": %p do_get\n", tid, m);
+  }
+
+  while (true) {
+    // If the atomic 0->TID transition fails.
+    if (!compare_and_swap(&m->futex, 0, tid)) {
+      // Wait in the kernel, which handles atomically ORing in FUTEX_WAITERS
+      // before actually sleeping.
+      const int ret = sys_futex_wait(FUTEX_LOCK_PI, &m->futex, 1, timeout);
+      if (ret != 0) {
+        if (timeout != NULL && ret == -ETIMEDOUT) {
+          return 3;
+        }
+        if (__builtin_expect(ret == -EINTR, true)) {
+          if (signals_fail) {
+            return 2;
+          } else {
+            // Go around again, starting with the userspace fast path.
+            continue;
+          }
+        }
+        // NOTE(review): presumably this clears the pending robust list entry
+        // so the FATAL logging below starts from a clean state if it uses
+        // mutexes itself -- confirm.
+        my_robust_list::robust_head.pending_next = 0;
+        if (ret == -EDEADLK) {
+          LOG(FATAL, "multiple lock of %p by %" PRId32 "\n", m, tid);
+        }
+        PELOG(FATAL, -ret, "FUTEX_LOCK_PI(%p(=%" PRIu32 "), 1, %p) failed",
+              &m->futex, __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST), timeout);
+      } else {
+        if (kLockDebug) {
+          printf("%" PRId32 ": %p kernel lock done\n", tid, m);
+        }
+        // The kernel already handled setting the value to our TID (ish).
+        break;
+      }
+    } else {
+      if (kLockDebug) {
+        printf("%" PRId32 ": %p fast lock done\n", tid, m);
+      }
+      lock_pthread_mutex(m);
+      // Fastpath succeeded, so no need to call into the kernel.
+      // Because this is the fastpath, it's a good idea to avoid even having to
+      // load the value again down below.
+      return 0;
+    }
+  }
+
+  // Only reached after the kernel handed us the lock, which is the case where
+  // FUTEX_OWNER_DIED might need handling.
+  return mutex_finish_lock(m);
+}
+
+// Shared implementation behind every function which locks a mutex.
+// signals_fail selects what happens when a signal interrupts the wait
+// syscall: true makes the call fail, false makes it try again.
+// timeout may be NULL, meaning wait forever.
+// Returns whatever mutex_do_get returns.
+inline int mutex_get(aos_mutex *m, bool signals_fail,
+                     const struct timespec *timeout) {
+  // Look up the TID first: the first call in a thread sets up the robust list
+  // which the Adder is about to touch.
+  const uint32_t tid = get_tid();
+  my_robust_list::Adder adder(m);
+  const int result = mutex_do_get(m, signals_fail, timeout, tid);
+  switch (result) {
+    case 0:
+    case 1:
+      // We hold the mutex now, so it belongs in the robust list.
+      adder.Add();
+      break;
+    default:
+      // Not locked, so there's nothing to record.
+      break;
+  }
+  return result;
+}
+
+// The common implementation for broadcast and signal.
+// number_requeue is the number of waiters to requeue (probably INT_MAX or 0). 1
+// will always be woken.
+//
+//   c: the condition variable (a generation counter) to wake waiters on.
+//   m: the mutex waiters get requeued onto; only used when USE_REQUEUE_PI.
+// Any unexpected futex failure is logged as FATAL.
+void condition_wake(aos_condition *c, aos_mutex *m, int number_requeue) {
+  RunObservers run_observers(c, true);
+  // Make it so that anybody just going to sleep won't.
+  // This is where we might accidentally wake more than just 1 waiter with 1
+  // signal():
+  //   1 already sleeping will be woken but n might never actually make it to
+  //     sleep in the kernel because of this.
+  uint32_t new_value = __atomic_add_fetch(c, 1, __ATOMIC_SEQ_CST);
+
+  if (USE_REQUEUE_PI) {
+    while (true) {
+      // This really wants to be FUTEX_REQUEUE_PI, but the kernel doesn't have
+      // that... However, the code to support that is in the kernel, so it might
+      // be a good idea to patch it to support that and use it iff it's there.
+      const int ret =
+          sys_futex_cmp_requeue_pi(c, 1, number_requeue, &m->futex, new_value);
+      if (ret < 0) {
+        // If the value got changed out from under us (aka somebody else did a
+        // condition_wake).
+        if (__builtin_expect(ret == -EAGAIN, true)) {
+          // If we're doing a broadcast, the other guy might have done a signal
+          // instead, so we have to try again.
+          // If we're doing a signal, we have to go again to make sure that 2
+          // signals wake 2 processes.
+          new_value = __atomic_load_n(c, __ATOMIC_RELAXED);
+          continue;
+        }
+        my_robust_list::robust_head.pending_next = 0;
+        PELOG(FATAL, -ret, "FUTEX_CMP_REQUEUE_PI(%p, 1, %d, %p, *%p) failed",
+              c, number_requeue, &m->futex, c);
+      } else {
+        return;
+      }
+    }
+  } else {
+    // Without requeueing, just wake everybody who would have been requeued
+    // too. Always wake at least 1, and stay below the range of values which
+    // would look like an error code when returned.
+    const int number_wake =
+        ::std::min(::std::max(number_requeue, 1), INT_MAX - 4096);
+    const int ret = sys_futex_wake(c, number_wake);
+    // -4095..-1 are errors; anything else is the number of waiters woken.
+    if (__builtin_expect(
+            static_cast<unsigned int>(ret) > static_cast<unsigned int>(-4096),
+            false)) {
+      my_robust_list::robust_head.pending_next = 0;
+      // Report the count which was actually passed to the kernel.
+      PELOG(FATAL, -ret, "FUTEX_WAKE(%p, %d) failed", c, number_wake);
+    }
+  }
+}
+
+}  // namespace
+
+int mutex_lock(aos_mutex *m) {
+  return mutex_get(m, true, NULL);  // signals_fail = true, no timeout.
+}
+int mutex_lock_timeout(aos_mutex *m, const struct timespec *timeout) {
+  return mutex_get(m, true, timeout);  // signals_fail = true.
+}
+int mutex_grab(aos_mutex *m) {
+  return mutex_get(m, false, NULL);  // Retries on signals, no timeout.
+}
+
+void mutex_unlock(aos_mutex *m) {  // LOG(FATAL)s unless the calling TID owns m.
+  RunObservers run_observers(m, true);
+  const uint32_t tid = get_tid();
+  if (kPrintOperations) {
+    printf("%" PRId32 ": %p unlock\n", tid, m);
+  }
+
+  const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST);
+  if (__builtin_expect((value & FUTEX_TID_MASK) != tid, false)) {
+    my_robust_list::robust_head.pending_next = 0;
+    check_cached_tid(tid);
+    if ((value & FUTEX_TID_MASK) == 0) {  // No owner TID recorded at all.
+      LOG(FATAL, "multiple unlock of aos_mutex %p by %" PRId32 "\n", m, tid);
+    } else {  // A different TID is recorded as the owner.
+      LOG(FATAL, "aos_mutex %p is locked by %" PRId32 ", not %" PRId32 "\n",
+          m, value & FUTEX_TID_MASK, tid);
+    }
+  }
+
+  my_robust_list::Remover remover(m);
+  unlock_pthread_mutex(m);
+
+  // If the atomic TID->0 transition fails (ie FUTEX_WAITERS is set),
+  if (!compare_and_swap(&m->futex, tid, 0)) {
+    // The kernel handles everything else.
+    const int ret = sys_futex_unlock_pi(&m->futex);
+    if (ret != 0) {
+      my_robust_list::robust_head.pending_next = 0;
+      PELOG(FATAL, -ret, "FUTEX_UNLOCK_PI(%p) failed", &m->futex);
+    }
+  } else {
+    // There aren't any waiters, so no need to call into the kernel.
+  }
+}
+
+int mutex_trylock(aos_mutex *m) {  // Returns 4 when m could not be taken.
+  RunObservers run_observers(m, true);
+  const uint32_t tid = get_tid();
+  if (kPrintOperations) {
+    printf("%" PRId32 ": %p trylock\n", tid, m);
+  }
+  my_robust_list::Adder adder(m);
+
+  // Try an atomic 0->TID transition.
+  uint32_t c = compare_and_swap_val(&m->futex, 0, tid);
+
+  if (c != 0) {  // c is treated as the value that was found in the futex.
+    if (__builtin_expect((c & FUTEX_OWNER_DIED) == 0, true)) {
+      // Somebody else had it locked; we failed.
+      return 4;
+    } else {
+      // FUTEX_OWNER_DIED was set, so we have to call into the kernel to deal
+      // with resetting it.
+      const int ret = sys_futex_wait(FUTEX_TRYLOCK_PI, &m->futex, 0, NULL);
+      if (ret == 0) {
+        adder.Add();
+        // Only clear the owner died if somebody else didn't do the recovery
+        // and then unlock before our TRYLOCK happened.
+        return mutex_finish_lock(m);
+      } else {
+        // EWOULDBLOCK means that somebody else beat us to it.
+        if (__builtin_expect(ret == -EWOULDBLOCK, true)) {
+          return 4;
+        }
+        my_robust_list::robust_head.pending_next = 0;
+        PELOG(FATAL, -ret, "FUTEX_TRYLOCK_PI(%p, 0, NULL) failed", &m->futex);
+      }
+    }
+  }
+
+  lock_pthread_mutex(m);
+  adder.Add();
+  return 0;  // Reached after the 0->TID swap above found 0.
+}
+
+bool mutex_islocked(const aos_mutex *m) {  // True iff the caller's TID owns m.
+  const uint32_t tid = get_tid();
+
+  const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
+  return (value & FUTEX_TID_MASK) == tid;  // Bits outside the mask are ignored.
+}
+
+int condition_wait(aos_condition *c, aos_mutex *m) {  // Returns 0 or 1.
+  RunObservers run_observers(c, false);
+  const uint32_t tid = get_tid();
+  const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
+
+  mutex_unlock(m);  // wait_start above was read before this unlock.
+
+  my_robust_list::Adder adder(m);
+
+  while (true) {
+    // Wait in the kernel iff the value of it doesn't change (ie somebody else
+    // does a wake) from before we unlocked the mutex.
+    int ret;
+    if (USE_REQUEUE_PI) {
+      ret = sys_futex_wait_requeue_pi(c, wait_start, nullptr, &m->futex);
+    } else {
+      ret = sys_futex_wait(FUTEX_WAIT, c, wait_start, nullptr);
+    }
+    if (ret != 0) {
+      // If it failed because somebody else did a wake and changed the value
+      // before we actually made it to sleep.
+      if (__builtin_expect(ret == -EAGAIN, true)) {
+        // There's no need to unconditionally set FUTEX_WAITERS here if we're
+        // using REQUEUE_PI because the kernel automatically does that in the
+        // REQUEUE_PI iff it requeued anybody.
+        // If we're not using REQUEUE_PI, then everything is just normal locks
+        // etc, so there's no need to do anything special there either.
+
+        // We have to relock it ourself because the kernel didn't do it.
+        const int r = mutex_do_get(m, false, nullptr, tid);
+        assert(__builtin_expect(r == 0 || r == 1, true));
+        adder.Add();
+        return r;
+      }
+      // Try again if it was because of a signal.
+      if (__builtin_expect(ret == -EINTR, true)) continue;
+      my_robust_list::robust_head.pending_next = 0;
+      if (USE_REQUEUE_PI) {
+        PELOG(FATAL, -ret, "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed",
+              c, wait_start, &m->futex);
+      } else {
+        PELOG(FATAL, -ret, "FUTEX_WAIT(%p, %" PRIu32 ", nullptr) failed",
+              c, wait_start);
+      }
+    } else {
+      if (USE_REQUEUE_PI) {
+        // Record that the kernel relocked it for us.
+        lock_pthread_mutex(m);
+      } else {
+        // We have to take the lock ourself because the kernel won't, but
+        // there's no need for it to be anything special because all waiters
+        // just relock it like usual.
+        const int r = mutex_do_get(m, false, nullptr, tid);
+        assert(__builtin_expect(r == 0 || r == 1, true));
+        adder.Add();
+        return r;
+      }
+
+      // We succeeded in waiting, and the kernel took care of locking the mutex
+      // for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
+
+      adder.Add();
+
+      const uint32_t value = __atomic_load_n(&m->futex, __ATOMIC_RELAXED);
+      if (__builtin_expect((value & FUTEX_OWNER_DIED) != 0, false)) {
+        __atomic_and_fetch(&m->futex, ~FUTEX_OWNER_DIED, __ATOMIC_RELAXED);
+        return 1;  // FUTEX_OWNER_DIED was set; it has just been cleared.
+      } else {
+        return 0;
+      }
+    }
+  }
+}
+
+void condition_signal(aos_condition *c, aos_mutex *m) {
+  condition_wake(c, m, 0);  // Requeue nobody; condition_wake still wakes 1.
+}
+
+void condition_broadcast(aos_condition *c, aos_mutex *m) {
+  condition_wake(c, m, INT_MAX);  // Ask for as many waiters as possible.
+}
+
+int futex_wait_timeout(aos_futex *m, const struct timespec *timeout) {
+  RunObservers run_observers(m, false);
+  const int ret = sys_futex_wait(FUTEX_WAIT, m, 0, timeout);  // 0 = expected.
+  if (ret != 0) {
+    if (ret == -EINTR) {
+      return 1;  // Interrupted by a signal.
+    } else if (ret == -ETIMEDOUT) {
+      return 2;  // Timed out.
+    } else if (ret != -EWOULDBLOCK) {
+      errno = -ret;
+      return -1;
+    }
+  }  // -EWOULDBLOCK falls through to the success path below.
+  ANNOTATE_HAPPENS_AFTER(m);
+  return 0;
+}
+
+int futex_wait(aos_futex *m) { return futex_wait_timeout(m, NULL); }  // No timeout.
+
+int futex_set_value(aos_futex *m, uint32_t value) {
+  RunObservers run_observers(m, false);
+  ANNOTATE_HAPPENS_BEFORE(m);
+  __atomic_store_n(m, value, __ATOMIC_SEQ_CST);  // Store first, then wake.
+  const int r = sys_futex_wake(m, INT_MAX - 4096);
+  if (__builtin_expect(
+          static_cast<unsigned int>(r) > static_cast<unsigned int>(-4096),
+          false)) {  // True exactly when r is in [-4095, -1].
+    errno = -r;
+    return -1;
+  } else {
+    return r;  // sys_futex_wake's non-error result, passed through.
+  }
+}
+
+int futex_set(aos_futex *m) {
+  return futex_set_value(m, 1);  // 1 is the "set" value.
+}
+
+int futex_unset(aos_futex *m) {
+  return !__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);  // 1 iff it was 0.
+}
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+// Sets functions to run before and after all futex operations.
+// This is important when doing robustness testing because the memory has to be
+// made writable for the whole futex operation, otherwise it never succeeds.
+void SetFutexAccessorObservers(FutexAccessorObserver before,
+                               FutexAccessorObserver after) {
+  before_observer = before;  // Plain assignments; no locking is done here.
+  after_observer = after;
+}
+
+// Sets an extra offset between mutexes and the value we use for them in the
+// robust list (only the forward pointers). This is used to work around a kernel
+// bug by keeping a second set of mutexes which is always writable so the kernel
+// won't go into an infinite loop when trying to unlock them.
+void SetRobustListOffset(ptrdiff_t offset) {
+  my_robust_list::SetRobustListOffset(offset);  // Public forwarder only.
+}
+
+// Returns true iff there are any mutexes locked by the current thread.
+// This is mainly useful for testing.
+bool HaveLockedMutexes() {
+  return my_robust_list::HaveLockedMutexes();  // Public forwarder only.
+}
+
+}  // namespace ipc_lib
+}  // namespace linux_code
+}  // namespace aos
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
new file mode 100644
index 0000000..ae95c6d
--- /dev/null
+++ b/aos/ipc_lib/aos_sync.h
@@ -0,0 +1,186 @@
+#ifndef AOS_IPC_LIB_SYNC_H_
+#define AOS_IPC_LIB_SYNC_H_
+
+#include <stdlib.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif  // __cplusplus
+
+// TODO(brians) add client requests to make helgrind useful with this code
+// <http://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.client-requests>
+// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
+// list the interesting ones
+
+// Have to remember to align structs containing it (recursively) to sizeof(int).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// The value should not be changed after multiple processes have started
+// accessing an instance except through the functions declared in this file.
+typedef uint32_t aos_futex __attribute__((aligned(sizeof(int))));
+
+// For use with the condition_ functions.
+// No initialization is necessary.
+typedef aos_futex aos_condition;
+
+// For use with the mutex_ functions.
+// futex must be initialized to 0.
+// No initialization is necessary for next and previous.
+// Under ThreadSanitizer, pthread_mutex_init must be initialized to false.
+// The recommended way to initialize one of these is by memset(3)ing the whole
+// thing to 0 or using C++ () initialization to avoid depending on the
+// implementation.
+struct aos_mutex {
+  // 2 links to get O(1) adds and removes.
+  // This is &next of another element.
+  // next (might) have stuff |ed into it to indicate PI futexes and might also
+  // have an offset (see SetRobustListOffset); previous is an actual pointer
+  // without any of that.
+  // next has to stay the first element of this structure.
+  uintptr_t next;
+  struct aos_mutex *previous;
+  aos_futex futex;  // The word the mutex_ functions operate on; starts at 0.
+#ifdef AOS_SANITIZER_thread
+  // Internal pthread mutex which is kept in sync with the actual mutex so tsan
+  // can understand what's happening and help catch bugs.
+  pthread_mutex_t pthread_mutex;
+#ifndef __cplusplus
+  // TODO(brian): Remove this once the stupid C code is gone...
+#define bool uint8_t
+#endif
+  bool pthread_mutex_init;  // Must start out false (see the comment above).
+#ifndef __cplusplus
+#undef bool
+#endif
+#endif
+};
+
+// The mutex_ functions are designed to be used as mutexes. A mutex can only be
+// unlocked from the same task which originally locked it. Also, if a task dies
+// while holding a mutex, the next person who locks it will be notified. After a
+// fork(2), any mutexes held will be held ONLY in the parent process. Attempting
+// to unlock them from the child will give errors.
+// Priority inheritance (aka priority inversion protection) is enabled.
+
+// All of these return 1 if the previous owner died with it held, 2 if
+// interrupted by a signal, 3 if timed out, or 4 if an optional lock fails. Some
+// of them (obviously) can never return some of those values.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
+int mutex_lock(struct aos_mutex *m) __attribute__((warn_unused_result));
+// Returns 3 if it timed out or 2 if interrupted by a signal.
+int mutex_lock_timeout(struct aos_mutex *m, const struct timespec *timeout)
+    __attribute__((warn_unused_result));
+// Ignores signals (retries until something other than getting a signal
+// happens).
+int mutex_grab(struct aos_mutex *m) __attribute__((warn_unused_result));
+// LOG(FATAL)s for multiple unlocking.
+void mutex_unlock(struct aos_mutex *m);
+// Does not block waiting for the mutex.
+int mutex_trylock(struct aos_mutex *m) __attribute__((warn_unused_result));
+#ifdef __cplusplus
+// Returns whether or not the mutex is locked by this thread.
+// There aren't very many valid uses for this function; the main ones are
+// checking mutexes as they are destroyed to catch problems with that early and
+// stack-based recursive mutex locking.
+bool mutex_islocked(const aos_mutex *m);
+#endif
+
+// The futex_ functions are similar to the mutex_ ones but different.
+// They are designed for signalling when something happens (possibly to
+// multiple listeners). An aos_futex manipulated with them can only be set or
+// unset. Also, they can be set/unset/waited on from any task independently of
+// who did something first and have no priority inversion protection.
+// They return -1 for other error (which will be in errno from futex(2)).
+// They have no spurious wakeups (because everybody always gets woken up).
+//
+// Another name for this kind of synchronization mechanism is a "notification".
+// Python calls it an "event".
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value part of the futex_* functions, the
+// obvious implementation has basically the same race condition that condition
+// variables are designed to prevent between somebody else grabbing the mutex
+// and changing whether it's set or not and the futex_ function changing the
+// futex's value. A futex is effectively a resettable condition variable with
+// the condition being "has it been set"; if you have some other condition (for
+// example messages are available to read on a queue), use the condition_
+// functions or there will be race conditions.
+
+// Wait for the futex to be set. Will return immediately if it's already set
+// (after a syscall).
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1 with an error in errno. Can return 0 spuriously.
+int futex_wait(aos_futex *m) __attribute__((warn_unused_result));
+// The same as futex_wait except returns 2 if it times out.
+int futex_wait_timeout(aos_futex *m, const struct timespec *timeout)
+  __attribute__((warn_unused_result));
+
+// Set the futex and wake up anybody waiting on it.
+// Returns the number that were woken or -1 with an error in errno.
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(aos_futex *m);
+// Same as above except lets something other than 1 be used as the final value.
+int futex_set_value(aos_futex *m, aos_futex value);
+// Unsets the futex (sets the value to 0).
+// Returns 0 if it was set before and 1 if it wasn't.
+// Can not fail.
+int futex_unset(aos_futex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+// They do have the potential for spurious wakeups.
+
+// Wait for the condition variable to be signalled. m will be unlocked
+// atomically with actually starting to wait. m is guaranteed to be locked when
+// this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+// Returns 0 on success or 1 if the previous owner died.
+int condition_wait(aos_condition *c, struct aos_mutex *m)
+    __attribute__((warn_unused_result));
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+// NOTE: There is a small chance that this will wake more than just 1 waiter.
+void condition_signal(aos_condition *c, struct aos_mutex *m);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(aos_condition *c, struct aos_mutex *m);
+
+#ifdef __cplusplus
+}
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+typedef void (*FutexAccessorObserver)(void *address, bool write);
+
+// Set functions which get called before and after all futex operations.
+void SetFutexAccessorObservers(FutexAccessorObserver before,
+                               FutexAccessorObserver after);
+
+// Set the offset to use for putting addresses into the robust list.
+// This is necessary to work around a kernel bug where it hangs when trying to
+// deal with a futex on the robust list when its memory has been changed to
+// read-only.
+void SetRobustListOffset(ptrdiff_t offset);
+
+// Returns true if there are any mutexes still locked by this task.
+// This is mainly useful for verifying tests don't mess up other ones by leaving
+// now-freed but still locked mutexes around.
+bool HaveLockedMutexes();
+
+}  // namespace ipc_lib
+}  // namespace linux_code
+}  // namespace aos
+
+#endif  // __cplusplus
+
+#endif  // AOS_IPC_LIB_SYNC_H_
diff --git a/aos/ipc_lib/core_lib.c b/aos/ipc_lib/core_lib.c
new file mode 100644
index 0000000..a1e3315
--- /dev/null
+++ b/aos/ipc_lib/core_lib.c
@@ -0,0 +1,51 @@
+#include "aos/ipc_lib/core_lib.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+
+#include "aos/ipc_lib/shared_mem_types.h"
+
+static uint8_t aos_8max(uint8_t l, uint8_t r) {  // max() for uint8_t.
+  return (l > r) ? l : r;
+}
+void *shm_malloc_aligned(size_t length, uint8_t alignment) {
+  // abort()s if shared memory runs out. Size-based minimum alignments from
+  // <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+  if (length <= 1) {
+    alignment = aos_8max(alignment, 1);
+  } else if (length <= 2) {
+    alignment = aos_8max(alignment, 2);
+  } else if (length <= 4) {
+    alignment = aos_8max(alignment, 4);
+  } else if (length <= 8) {
+    alignment = aos_8max(alignment, 8);
+  } else if (length <= 16) {
+    alignment = aos_8max(alignment, 16);
+  } else {
+    alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+  }  // alignment is now at least 1, so the % below never divides by 0.
+
+  void *msg = NULL;
+  aos_shm_core *shm_core = global_core->mem_struct;
+  int result =
+      mutex_grab(&shm_core->msg_alloc_lock);
+#ifdef NDEBUG
+  (void)result;
+#else
+  assert(result == 0);
+#endif
+  shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+  const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+  shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+  msg = shm_core->msg_alloc;  // msg_alloc only moves downward in this function.
+  if (msg <= global_core->shared_mem) {  // Checked after msg_alloc has moved.
+    fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+    printf("if you didn't see the stderr output just then, you should have\n");
+    abort();  // NOTE(review): msg_alloc_lock is still held at this point.
+  }
+  //printf("alloc %p\n", msg);
+  mutex_unlock(&shm_core->msg_alloc_lock);
+  return msg;
+}
+
diff --git a/aos/ipc_lib/core_lib.h b/aos/ipc_lib/core_lib.h
new file mode 100644
index 0000000..d62f602
--- /dev/null
+++ b/aos/ipc_lib/core_lib.h
@@ -0,0 +1,33 @@
+#ifndef _AOS_CORE_LIB_H_
+#define _AOS_CORE_LIB_H_
+
+#include <stdint.h>
+
+#include "aos/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif  // __cplusplus
+
+// alloc_size was taken out of clang in r197866. It appears that it never
+// actually did anything.
+#if defined(__clang__)
+#define attribute_alloc_size(n)
+#else
+#define attribute_alloc_size(n) __attribute__((alloc_size(n)))
+#endif
+
+void *shm_malloc_aligned(size_t length, uint8_t alignment)
+    attribute_alloc_size(1);
+static void *shm_malloc(size_t length) attribute_alloc_size(1);
+static inline void *shm_malloc(size_t length) {
+  return shm_malloc_aligned(length, 0);  // 0 = no alignment request of its own.
+}
+
+#undef attribute_alloc_size
+
+#ifdef __cplusplus
+}
+#endif  // __cplusplus
+
+#endif
diff --git a/aos/ipc_lib/ipc_comparison.cc b/aos/ipc_lib/ipc_comparison.cc
new file mode 100644
index 0000000..8774c73
--- /dev/null
+++ b/aos/ipc_lib/ipc_comparison.cc
@@ -0,0 +1,961 @@
+#include "third_party/gflags/include/gflags/gflags.h"
+
+#include <fcntl.h>
+#include <mqueue.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdint.h>
+#include <sys/eventfd.h>
+#include <sys/msg.h>
+#include <sys/sem.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/event.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/mutex/mutex.h"
+#include "aos/time/time.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/queue.h"
+
+DEFINE_string(method, "", "Which IPC method to use");
+DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
+DEFINE_int32(client_cpu, 0, "CPU to pin client to");
+DEFINE_int32(server_cpu, 0, "CPU to pin server to");
+DEFINE_int32(client_priority, 1,
+             "Realtime priority for client. Negative for don't change");
+DEFINE_int32(server_priority, 1,
+             "Realtime priority for server. Negative for don't change");
+
+namespace aos {
+
+namespace chrono = ::std::chrono;
+
+// A generic interface for an object which can send some data to another thread
+// and back.
+//
+// One side is called the "server". It should constantly Wait, do something with
+// the result, and then call Pong.
+// The other side is called the "client". It should repeatedly call Ping.
+class PingPongerInterface {
+ public:
+  // A 1024-byte chunk, 128-byte aligned to be on its own cache line anywhere sane.
+  typedef uint8_t Data[1024] __attribute__((aligned(128)));
+
+  virtual ~PingPongerInterface() {}
+
+  // Returns where the "client" side should write data in preparation to send to
+  // the server.
+  // The result is valid until the next Ping call.
+  virtual Data *PingData() = 0;
+
+  // Sends the data returned from the most recent PingData call to the "server"
+  // side and returns its response.
+  // PingData must be called exactly once before each call of this method.
+  // The result is valid until the next PingData call.
+  virtual const Data *Ping() = 0;
+
+  // Waits for a Ping call and then returns the associated data.
+  // The result is valid until the beginning of the next Pong call.
+  virtual const Data *Wait() = 0;
+
+  // Returns where the "server" side should write data in preparation to send
+  // back to the "client".
+  // The result is valid until the next Pong call.
+  virtual Data *PongData() = 0;
+
+  // Completes an in-progress Ping.
+  // Sends the data returned from the most recent PongData call back to the
+  // "client" side's in-progress Ping.
+  // PongData must be called exactly once before each call of this method.
+  virtual void Pong() = 0;
+};
+
+// Base class for implementations which simply use a pair of Data objects for
+// all Pings and Pongs.
+class StaticPingPonger : public PingPongerInterface {
+ public:
+  Data *PingData() override { return &ping_data_; }  // Same buffer every call.
+  Data *PongData() override { return &pong_data_; }  // Same buffer every call.
+
+ private:
+  Data ping_data_, pong_data_;
+};
+
+// Implements ping-pong by sending the data over file descriptors.
+class FDPingPonger : public StaticPingPonger {
+ protected:
+  // Leaves every descriptor at -1; subclasses must call Init themselves.
+  FDPingPonger() {}
+
+  // Subclasses must call this in their constructor.
+  // Does not take ownership of any of the file descriptors, any/all of which
+  // may be the same.
+  // {server,client}_read must be open for reading and {server,client}_write
+  // must be open for writing.
+  void Init(int server_read, int server_write, int client_read,
+            int client_write) {
+    server_read_ = server_read;
+    server_write_ = server_write;
+    client_read_ = client_read;
+    client_write_ = client_write;
+  }
+
+ private:
+  const Data *Ping() override {
+    WriteFully(client_write_, *PingData());
+    ReadFully(client_read_, &read_by_client_);  // Blocks for the whole reply.
+    return &read_by_client_;
+  }
+
+  const Data *Wait() override {
+    ReadFully(server_read_, &read_by_server_);
+    return &read_by_server_;
+  }
+
+  void Pong() override { WriteFully(server_write_, *PongData()); }
+
+  void ReadFully(int fd, Data *data) {  // Reads exactly sizeof(Data) bytes.
+    size_t remaining = sizeof(*data);
+    uint8_t *current = &(*data)[0];
+    while (remaining > 0) {  // NOTE(review): a 0 (EOF) read would spin here.
+      const ssize_t result = PCHECK(read(fd, current, remaining));
+      CHECK_LE(static_cast<size_t>(result), remaining);
+      remaining -= result;
+      current += result;
+    }
+  }
+
+  void WriteFully(int fd, const Data &data) {  // Writes all of data.
+    size_t remaining = sizeof(data);
+    const uint8_t *current = &data[0];
+    while (remaining > 0) {  // Short writes are continued from where they ended.
+      const ssize_t result = PCHECK(write(fd, current, remaining));
+      CHECK_LE(static_cast<size_t>(result), remaining);
+      remaining -= result;
+      current += result;
+    }
+  }
+
+  Data read_by_client_, read_by_server_;
+  int server_read_ = -1, server_write_ = -1, client_read_ = -1,
+      client_write_ = -1;
+};
+
+class PipePingPonger : public FDPingPonger {  // Ping-pong over two pipe(2)s.
+ public:
+  PipePingPonger() {
+    PCHECK(pipe(to_server));
+    PCHECK(pipe(from_server));
+    Init(to_server[0], from_server[1], from_server[0], to_server[1]);
+  }
+  ~PipePingPonger() {
+    PCHECK(close(to_server[0]));
+    PCHECK(close(to_server[1]));
+    PCHECK(close(from_server[0]));
+    PCHECK(close(from_server[1]));
+  }
+
+ private:
+  int to_server[2], from_server[2];  // [0] is the read end, [1] the write end.
+};
+
+class NamedPipePingPonger : public FDPingPonger {  // Ping-pong over two FIFOs.
+ public:
+  NamedPipePingPonger() {
+    OpenFifo("/tmp/to_server", &client_write_, &server_read_);
+    OpenFifo("/tmp/from_server", &server_write_, &client_read_);
+
+    Init(server_read_, server_write_, client_read_, client_write_);
+  }
+  ~NamedPipePingPonger() {
+    PCHECK(close(server_read_));
+    PCHECK(close(client_write_));
+    PCHECK(close(client_read_));
+    PCHECK(close(server_write_));
+  }
+
+ private:
+  void OpenFifo(const char *name, int *write, int *read) {
+    {
+      const int ret = unlink(name);  // Remove any FIFO left at this path.
+      if (ret == -1 && errno != ENOENT) {
+        PLOG(FATAL, "unlink(%s)", name);
+      }
+      PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
+      // Have to open it nonblocking because the other end isn't open yet...
+      *read = PCHECK(open(name, O_RDONLY | O_NONBLOCK));
+      *write = PCHECK(open(name, O_WRONLY));
+      {
+        const int flags = PCHECK(fcntl(*read, F_GETFL));
+        PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));  // Back to blocking.
+      }
+    }
+  }
+
+  int server_read_, server_write_, client_read_, client_write_;  // Own copies.
+};
+
+class UnixPingPonger : public FDPingPonger {  // Ping-pong over socketpair(2)s.
+ public:
+  UnixPingPonger(int type) {  // type: the socket type given to socketpair.
+    PCHECK(socketpair(AF_UNIX, type, 0, to_server));
+    PCHECK(socketpair(AF_UNIX, type, 0, from_server));
+    Init(to_server[0], from_server[1], from_server[0], to_server[1]);
+  }
+  ~UnixPingPonger() {
+    PCHECK(close(to_server[0]));
+    PCHECK(close(to_server[1]));
+    PCHECK(close(from_server[0]));
+    PCHECK(close(from_server[1]));
+  }
+
+ private:
+  int to_server[2], from_server[2];  // [0] is read from, [1] is written to.
+};
+
+class TCPPingPonger : public FDPingPonger {  // Ping-pong over loopback TCP.
+ public:
+  TCPPingPonger(bool nodelay) {  // nodelay: set TCP_NODELAY on both sockets.
+    server_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
+    if (nodelay) {
+      const int yes = 1;
+      PCHECK(setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
+    }
+    {
+      sockaddr_in server_address;
+      memset(&server_address, 0, sizeof(server_address));  // sin_port stays 0.
+      server_address.sin_family = AF_INET;
+      server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
+                  sizeof(server_address)));
+    }
+    PCHECK(listen(server_, 1));
+
+    client_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
+    if (nodelay) {
+      const int yes = 1;
+      PCHECK(setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
+    }
+    {  // Connect the client to whatever address the listener ended up with.
+      sockaddr_in listen_address;
+      socklen_t length = sizeof(listen_address);
+      PCHECK(getsockname(server_, reinterpret_cast<sockaddr *>(&listen_address),
+                         &length));
+      PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&listen_address),
+                     length));
+    }
+    server_connection_ = PCHECK(accept(server_, nullptr, nullptr));
+
+    Init(server_connection_, server_connection_, client_, client_);
+  }
+  ~TCPPingPonger() {
+    PCHECK(close(client_));
+    PCHECK(close(server_connection_));
+    PCHECK(close(server_));
+  }
+
+ private:
+  int server_, client_, server_connection_;  // Listener, client, accepted fd.
+};
+
+class UDPPingPonger : public FDPingPonger {  // Ping-pong over loopback UDP.
+ public:
+  UDPPingPonger() {
+    CreatePair(&server_read_, &client_write_);
+    CreatePair(&client_read_, &server_write_);
+
+    Init(server_read_, server_write_, client_read_, client_write_);
+  }
+  ~UDPPingPonger() {
+    PCHECK(close(server_read_));
+    PCHECK(close(client_write_));
+    PCHECK(close(client_read_));
+    PCHECK(close(server_write_));
+  }
+
+ private:
+  void CreatePair(int *server, int *client) {  // One bound, one connected.
+    *server = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
+    {
+      sockaddr_in server_address;
+      memset(&server_address, 0, sizeof(server_address));
+      server_address.sin_family = AF_INET;
+      server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      // sin_port stays 0; getsockname below reads back the port actually used.
+      PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
+                  sizeof(server_address)));
+    }
+    *client = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
+    {
+      sockaddr_in bound_address;
+      socklen_t length = sizeof(bound_address);
+      PCHECK(getsockname(*server, reinterpret_cast<sockaddr *>(&bound_address),
+                         &length));
+      PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&bound_address),
+                     length));
+    }
+  }
+
+  int server_read_, server_write_, client_read_, client_write_;  // Own copies.
+};
+
+// Implements ping-pong without copying the data using a condition variable-like
+// interface.
+class ConditionVariablePingPonger : public StaticPingPonger {
+ protected:
+  // Represents a condition variable bundled with a mutex.
+  //
+  // Wait may return spuriously.
+  class ConditionVariableInterface {
+   public:
+    virtual ~ConditionVariableInterface() {}
+
+    // Locks the mutex.
+    virtual void Lock() = 0;
+
+    // Unlocks the mutex.
+    virtual void Unlock() = 0;
+
+    // Waits on the condition variable.
+    //
+    // The mutex must be locked when this is called.
+    virtual void Wait() = 0;
+
+    // Signals the condition variable.
+    //
+    // The mutex does not have to be locked during this.
+    virtual void Signal() = 0;
+  };
+
+  ConditionVariablePingPonger(
+      ::std::unique_ptr<ConditionVariableInterface> ping,
+      ::std::unique_ptr<ConditionVariableInterface> pong)
+      : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
+
+ private:
+  const Data *Ping() override {
+    ping_->Lock();
+    to_server_ = PingData();  // Publish the request pointer under ping_.
+    ping_->Unlock();
+    ping_->Signal();
+    pong_->Lock();
+    while (from_server_ == nullptr) {  // Loop because Wait may return spuriously.
+      pong_->Wait();
+    }
+    const Data *r = from_server_;
+    from_server_ = nullptr;  // Consume the reply so the next Ping waits again.
+    pong_->Unlock();
+    return r;
+  }
+
+  const Data *Wait() override {
+    ping_->Lock();
+    while (to_server_ == nullptr) {  // Loop because Wait may return spuriously.
+      ping_->Wait();
+    }
+    const Data *r = to_server_;
+    to_server_ = nullptr;  // Consume the request so the next Wait blocks again.
+    ping_->Unlock();
+    return r;
+  }
+
+  void Pong() override {
+    pong_->Lock();
+    from_server_ = PongData();  // Publish the reply pointer under pong_.
+    pong_->Unlock();
+    pong_->Signal();
+  }
+
+  const Data *to_server_ = nullptr, *from_server_ = nullptr;
+  const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
+};
+
+// Implements ping-pong without copying the data using a semaphore-like
+// interface: only the PingData()/PongData() pointers are handed across.
+class SemaphorePingPonger : public StaticPingPonger {
+ protected:
+  // Represents a semaphore, which need only count to 1.
+  //
+  // The behavior when calling Get/Put in anything other than alternating order
+  // is undefined.
+  //
+  // Get may NOT return spuriously.
+  class SemaphoreInterface {
+   public:
+    virtual ~SemaphoreInterface() {}
+
+    virtual void Get() = 0;  // Takes the semaphore, waiting for a Put().
+    virtual void Put() = 0;  // Gives the semaphore to a Get().
+  };
+
+  SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
+                      ::std::unique_ptr<SemaphoreInterface> pong)
+      : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
+
+ private:
+  const Data *Ping() override {
+    to_server_ = PingData();  // Written before the Put() that announces it.
+    ping_->Put();
+    pong_->Get();  // No predicate loop: Get may not return spuriously.
+    return from_server_;
+  }
+
+  const Data *Wait() override {
+    ping_->Get();
+    return to_server_;
+  }
+
+  void Pong() override {
+    from_server_ = PongData();  // Written before the Put() that announces it.
+    pong_->Put();
+  }
+
+  const Data *to_server_ = nullptr, *from_server_ = nullptr;
+  const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
+};
+
+
+class AOSMutexPingPonger : public ConditionVariablePingPonger {  // aos_mutex
+ public:
+  AOSMutexPingPonger()
+      : ConditionVariablePingPonger(
+            ::std::unique_ptr<ConditionVariableInterface>(
+                new AOSConditionVariable()),
+            ::std::unique_ptr<ConditionVariableInterface>(
+                new AOSConditionVariable())) {}
+
+ private:
+  // Adapts an aos Mutex plus the Condition built on it to the interface.
+  class AOSConditionVariable : public ConditionVariableInterface {
+   public:
+    AOSConditionVariable() : condition_(&mutex_) {}
+
+   private:
+    void Lock() override { CHECK(!mutex_.Lock()); }  // Lock() must be false.
+    void Unlock() override { mutex_.Unlock(); }
+    void Wait() override { CHECK(!condition_.Wait()); }  // Wait() must be false.
+    void Signal() override { condition_.Broadcast(); }
+
+    Mutex mutex_;  // Declared first: condition_ is constructed from &mutex_.
+    Condition condition_;
+  };
+};
+
+class AOSEventPingPonger : public SemaphorePingPonger {  // --method=aos_event
+ public:
+  AOSEventPingPonger()
+      : SemaphorePingPonger(
+            ::std::unique_ptr<SemaphoreInterface>(
+                new AOSEventSemaphore()),
+            ::std::unique_ptr<SemaphoreInterface>(
+                new AOSEventSemaphore())) {}
+
+ private:
+  class AOSEventSemaphore : public SemaphoreInterface {  // Wraps one aos Event.
+   private:
+    void Get() override {
+      event_.Wait();
+      event_.Clear();  // Presumably re-arms it so the next Wait() blocks again.
+    }
+    void Put() override { event_.Set(); }
+
+    Event event_;
+  };
+};
+
+// Ping-pongs through pthread mutex/condition-variable pairs. |pshared| makes
+// them PTHREAD_PROCESS_SHARED and |pi| makes the mutexes PTHREAD_PRIO_INHERIT.
+class PthreadMutexPingPonger : public ConditionVariablePingPonger {
+ public:
+  PthreadMutexPingPonger(int pshared, bool pi)
+      : ConditionVariablePingPonger(
+            ::std::unique_ptr<ConditionVariableInterface>(
+                new PthreadConditionVariable(pshared, pi)),
+            ::std::unique_ptr<ConditionVariableInterface>(
+                new PthreadConditionVariable(pshared, pi))) {}
+
+ private:
+  class PthreadConditionVariable : public ConditionVariableInterface {
+   public:
+    PthreadConditionVariable(bool pshared, bool pi) {
+      pthread_condattr_t cond_attr;
+      PRCHECK(pthread_condattr_init(&cond_attr));
+      if (pshared) {
+        PRCHECK(
+            pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
+      }
+      PRCHECK(pthread_cond_init(&condition_, &cond_attr));
+      PRCHECK(pthread_condattr_destroy(&cond_attr));
+
+      pthread_mutexattr_t mutex_attr;
+      PRCHECK(pthread_mutexattr_init(&mutex_attr));
+      if (pshared) {
+        PRCHECK(
+            pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED));
+      }
+      if (pi) {
+        PRCHECK(
+            pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
+      }
+      // The attributes must actually be passed here; with nullptr the mutex
+      // is a default one and the pshared/pi variants measure nothing new.
+      PRCHECK(pthread_mutex_init(&mutex_, &mutex_attr));
+      PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
+    }
+    ~PthreadConditionVariable() {
+      PRCHECK(pthread_mutex_destroy(&mutex_));
+      PRCHECK(pthread_cond_destroy(&condition_));
+    }
+
+   private:
+    void Lock() override { PRCHECK(pthread_mutex_lock(&mutex_)); }
+    void Unlock() override { PRCHECK(pthread_mutex_unlock(&mutex_)); }
+    void Wait() override { PRCHECK(pthread_cond_wait(&condition_, &mutex_)); }
+    void Signal() override { PRCHECK(pthread_cond_broadcast(&condition_)); }
+
+    pthread_cond_t condition_;
+    pthread_mutex_t mutex_;
+  };
+};
+
+class EventFDPingPonger : public SemaphorePingPonger {  // --method=eventfd
+ public:
+  EventFDPingPonger()
+      : SemaphorePingPonger(
+            ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
+            ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
+
+ private:
+  class EventFDSemaphore : public SemaphoreInterface {
+   public:
+    EventFDSemaphore() : fd_(PCHECK(eventfd(0, 0))) {}  // Counter starts at 0.
+    ~EventFDSemaphore() { PCHECK(close(fd_)); }
+
+   private:
+    void Get() override {
+      uint64_t value;
+      if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
+        PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
+      }
+      CHECK_EQ(1u, value);  // Exactly one Put() should have been pending.
+    }
+    void Put() override {
+      uint64_t value = 1;  // Amount added to the eventfd counter.
+      if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
+        PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
+      }
+    }
+
+    const int fd_;
+  };
+};
+
+// Ping-pongs through two System V semaphore sets holding one semaphore each.
+class SysvSemaphorePingPonger : public SemaphorePingPonger {
+ public:
+  SysvSemaphorePingPonger()
+      : SemaphorePingPonger(
+            ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
+            ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
+
+ private:
+  class SysvSemaphore : public SemaphoreInterface {
+   public:
+    SysvSemaphore()
+        : sem_id_(PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
+
+   private:
+    // Decrements the semaphore by one.
+    void Get() override { Adjust(-1); }
+    // Increments the semaphore by one.
+    void Put() override { Adjust(1); }
+
+    // Applies a single semop() of |delta| to semaphore 0 of the set.
+    void Adjust(short delta) {
+      struct sembuf operation;
+      operation.sem_num = 0;
+      operation.sem_op = delta;
+      operation.sem_flg = 0;
+      PCHECK(semop(sem_id_, &operation, 1));
+    }
+
+    const int sem_id_;
+  };
+};
+
+class PosixSemaphorePingPonger : public SemaphorePingPonger {  // sem_t base.
+ protected:
+  PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)  // Not owned.
+      : SemaphorePingPonger(
+            ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
+            ::std::unique_ptr<SemaphoreInterface>(
+                new PosixSemaphore(pong_sem))) {}
+
+ private:
+  class PosixSemaphore : public SemaphoreInterface {
+   public:
+    PosixSemaphore(sem_t *sem)
+        : sem_(sem) {}
+
+   private:
+    void Get() override { PCHECK(sem_wait(sem_)); }
+    void Put() override { PCHECK(sem_post(sem_)); }
+
+    sem_t *const sem_;  // Never destroyed or closed in this class.
+  };
+};
+
+class SysvQueuePingPonger : public StaticPingPonger {  // --method=sysv_queue
+ public:
+  SysvQueuePingPonger()
+      : ping_(PCHECK(msgget(IPC_PRIVATE, 0600))),
+        pong_(PCHECK(msgget(IPC_PRIVATE, 0600))) {}
+
+  const Data *Ping() override {  // Sends the ping and blocks for the pong.
+    {
+      Message to_send;
+      memcpy(&to_send.data, PingData(), sizeof(Data));
+      PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));  // Size excludes mtype.
+    }
+    {
+      Message received;
+      PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
+      memcpy(&pong_received_, &received.data, sizeof(Data));
+    }
+    return &pong_received_;
+  }
+
+  const Data *Wait() override {  // Blocks for the next ping and copies it out.
+    {
+      Message received;
+      PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
+      memcpy(&ping_received_, &received.data, sizeof(Data));
+    }
+    return &ping_received_;
+  }
+
+  virtual void Pong() override {  // Copies PongData() into a message and sends.
+    Message to_send;
+    memcpy(&to_send.data, PongData(), sizeof(Data));
+    PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
+  }
+
+ private:
+  struct Message {  // Layout msgsnd()/msgrcv() expect: a long type, then bytes.
+    long mtype = 1;  // Matches the msgtyp of 1 passed to msgrcv().
+    char data[sizeof(Data)];
+  };
+
+  Data ping_received_, pong_received_;  // Copies handed back to the callers.
+
+  const int ping_, pong_;  // Message queue ids from msgget().
+};
+
+class PosixQueuePingPonger : public StaticPingPonger {  // --method=posix_queue
+ public:
+  PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
+  ~PosixQueuePingPonger() {
+    PCHECK(mq_close(ping_));
+    PCHECK(mq_close(pong_));
+  }
+
+  const Data *Ping() override {  // Sends the ping, then blocks for the pong.
+    PCHECK(mq_send(ping_, static_cast<char *>(static_cast<void *>(PingData())),
+                   sizeof(Data), 1));
+    PCHECK(mq_receive(pong_,
+                      static_cast<char *>(static_cast<void *>(&pong_received_)),
+                      sizeof(Data), nullptr));
+    return &pong_received_;
+  }
+
+  const Data *Wait() override {  // Receives the next ping into ping_received_.
+    PCHECK(mq_receive(ping_,
+                      static_cast<char *>(static_cast<void *>(&ping_received_)),
+                      sizeof(Data), nullptr));
+    return &ping_received_;
+  }
+
+  virtual void Pong() override {  // Sends PongData() on the pong queue.
+    PCHECK(mq_send(pong_, static_cast<char *>(static_cast<void *>(PongData())),
+                   sizeof(Data), 1));
+  }
+
+ private:
+  mqd_t Open(const char *name) {  // Recreates queue |name| from scratch.
+    if (mq_unlink(name) == -1 && errno != ENOENT) {  // A missing queue is fine.
+      PLOG(FATAL, "mq_unlink(%s) failed", name);
+    }
+    struct mq_attr attr;
+    attr.mq_flags = 0;
+    attr.mq_maxmsg = 1;  // At most one message queued at a time.
+    attr.mq_msgsize = sizeof(Data);
+    attr.mq_curmsgs = 0;
+    const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
+    if (r == reinterpret_cast<mqd_t>(-1)) {
+      PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
+    }
+    return r;
+  }
+
+  const mqd_t ping_, pong_;
+  Data ping_received_, pong_received_;  // Receive buffers returned to callers.
+};
+
+class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
+ public:
+  PosixUnnamedSemaphorePingPonger(int pshared)  // Passed straight to sem_init().
+      : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
+    PCHECK(sem_init(&ping_sem_, pshared, 0));  // Both start with a value of 0.
+    PCHECK(sem_init(&pong_sem_, pshared, 0));
+  }
+  ~PosixUnnamedSemaphorePingPonger() {
+    PCHECK(sem_destroy(&ping_sem_));
+    PCHECK(sem_destroy(&pong_sem_));
+  }
+
+ private:
+  sem_t ping_sem_, pong_sem_;  // The base only stores pointers to these.
+};
+
+class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
+ public:
+  PosixNamedSemaphorePingPonger()  // Opens "/ping" and "/pong" for the base.
+      : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
+                                 pong_sem_ = Open("/pong")) {}
+  ~PosixNamedSemaphorePingPonger() {
+    PCHECK(sem_close(ping_sem_));
+    PCHECK(sem_close(pong_sem_));
+  }
+
+ private:
+  sem_t *Open(const char *name) {  // Recreates |name| with an initial value 0.
+    if (sem_unlink(name) == -1 && errno != ENOENT) {  // A missing one is fine.
+      PLOG(FATAL, "sem_unlink(%s) failed", name);  // Name the call that failed.
+    }
+    sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
+    if (r == SEM_FAILED) {
+      PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
+    }
+    return r;
+  }
+
+  sem_t *ping_sem_, *pong_sem_;  // Kept so the destructor can sem_close() them.
+};
+
+class AOSQueuePingPonger : public PingPongerInterface {  // --method=aos_queue
+ public:
+  AOSQueuePingPonger()
+      : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
+        pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
+
+  Data *PingData() override {
+    CHECK_EQ(nullptr, ping_to_send_);  // Only one unsent ping at a time.
+    ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
+    return ping_to_send_;
+  }
+
+  const Data *Ping() override {
+    CHECK_NE(nullptr, ping_to_send_);  // PingData() must have been called.
+    CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
+    ping_to_send_ = nullptr;
+    pong_queue_->FreeMessage(pong_received_);  // Previous pong; null at first.
+    pong_received_ =
+        static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
+    return pong_received_;
+  }
+
+  const Data *Wait() override {
+    ping_queue_->FreeMessage(ping_received_);  // Previous ping; null at first.
+    ping_received_ =
+        static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
+    return ping_received_;
+  }
+
+  Data *PongData() override {
+    CHECK_EQ(nullptr, pong_to_send_);  // Only one unsent pong at a time.
+    pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
+    return pong_to_send_;
+  }
+
+  void Pong() override {
+    CHECK_NE(nullptr, pong_to_send_);  // PongData() must have been called.
+    CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
+    pong_to_send_ = nullptr;
+  }
+
+ private:
+  RawQueue *const ping_queue_;
+  RawQueue *const pong_queue_;
+
+  Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;  // From GetMessage().
+  const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
+};
+
+int Main(int /*argc*/, char **argv) {  // Runs the benchmark named by --method.
+  ::std::unique_ptr<PingPongerInterface> ping_ponger;
+  if (FLAGS_method == "pipe") {
+    ping_ponger.reset(new PipePingPonger());
+  } else if (FLAGS_method == "named_pipe") {
+    ping_ponger.reset(new NamedPipePingPonger());
+  } else if (FLAGS_method == "aos_mutex") {
+    ping_ponger.reset(new AOSMutexPingPonger());
+  } else if (FLAGS_method == "aos_event") {
+    ping_ponger.reset(new AOSEventPingPonger());
+  } else if (FLAGS_method == "pthread_mutex") {
+    ping_ponger.reset(new PthreadMutexPingPonger(false, false));
+  } else if (FLAGS_method == "pthread_mutex_pshared") {
+    ping_ponger.reset(new PthreadMutexPingPonger(true, false));
+  } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
+    ping_ponger.reset(new PthreadMutexPingPonger(true, true));
+  } else if (FLAGS_method == "pthread_mutex_pi") {
+    ping_ponger.reset(new PthreadMutexPingPonger(false, true));
+  } else if (FLAGS_method == "aos_queue") {
+    ping_ponger.reset(new AOSQueuePingPonger());
+  } else if (FLAGS_method == "eventfd") {
+    ping_ponger.reset(new EventFDPingPonger());
+  } else if (FLAGS_method == "sysv_semaphore") {
+    ping_ponger.reset(new SysvSemaphorePingPonger());
+  } else if (FLAGS_method == "sysv_queue") {
+    ping_ponger.reset(new SysvQueuePingPonger());
+  } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
+    ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
+  } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
+    ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
+  } else if (FLAGS_method == "posix_semaphore_named") {
+    ping_ponger.reset(new PosixNamedSemaphorePingPonger());
+  } else if (FLAGS_method == "posix_queue") {
+    ping_ponger.reset(new PosixQueuePingPonger());
+  } else if (FLAGS_method == "unix_stream") {
+    ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
+  } else if (FLAGS_method == "unix_datagram") {
+    ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
+  } else if (FLAGS_method == "unix_seqpacket") {
+    ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
+  } else if (FLAGS_method == "tcp") {
+    ping_ponger.reset(new TCPPingPonger(false));
+  } else if (FLAGS_method == "tcp_nodelay") {
+    ping_ponger.reset(new TCPPingPonger(true));
+  } else if (FLAGS_method == "udp") {
+    ping_ponger.reset(new UDPPingPonger());
+  } else {
+    fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
+    ::gflags::ShowUsageWithFlags(argv[0]);
+    return 1;
+  }
+
+  ::std::atomic<bool> done{false};  // Set by the client to stop the server.
+
+  ::std::thread server([&ping_ponger, &done]() {
+    if (FLAGS_server_priority > 0) {
+      SetCurrentThreadRealtimePriority(FLAGS_server_priority);
+    }
+    PinCurrentThreadToCPU(FLAGS_server_cpu);
+
+    while (!done) {
+      const PingPongerInterface::Data *data = ping_ponger->Wait();
+      PingPongerInterface::Data *response = ping_ponger->PongData();
+      for (size_t i = 0; i < sizeof(*data); ++i) {
+        (*response)[i] = (*data)[i] + 1;  // The client checks for this +1.
+      }
+      ping_ponger->Pong();
+    }
+  });
+
+  if (FLAGS_client_priority > 0) {
+    SetCurrentThreadRealtimePriority(FLAGS_client_priority);
+  }
+  PinCurrentThreadToCPU(FLAGS_client_cpu);
+
+  // Warm everything up. These round trips are not timed or checked.
+  for (int i = 0; i < 1000; ++i) {
+    PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
+    memset(*warmup_data, i % 255, sizeof(*warmup_data));
+    ping_ponger->Ping();
+  }
+
+  const monotonic_clock::time_point start = monotonic_clock::now();
+
+  for (int32_t i = 0; i < FLAGS_messages; ++i) {
+    PingPongerInterface::Data *to_send = ping_ponger->PingData();
+    memset(*to_send, i % 123, sizeof(*to_send));
+    const PingPongerInterface::Data *received = ping_ponger->Ping();
+    for (size_t ii = 0; ii < sizeof(*received); ++ii) {
+      CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);  // Always <= 123 here.
+    }
+  }
+
+  const monotonic_clock::time_point end = monotonic_clock::now();
+
+  // Try to make sure the server thread gets past its check of done so our
+  // Ping() down below doesn't hang. Kind of lame, but doing better would
+  // require complicating the interface to each implementation which isn't worth
+  // it here.
+  ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
+  done = true;
+  ping_ponger->PingData();  // One last round trip releases the server's Wait().
+  ping_ponger->Ping();
+  server.join();
+
+  LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
+      chrono::duration_cast<chrono::duration<double>>(end - start).count(),
+      FLAGS_messages);
+  // NOTE(review): this divides by zero when run with --messages=0.
+  const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
+  if (per_message >= chrono::seconds(1)) {
+    LOG(INFO, "More than 1 second per message ?!?\n");
+  } else {
+    LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
+        static_cast<int>(per_message.count()));
+  }
+
+  return 0;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {  // Sets up usage, flags and logging.
+  ::gflags::SetUsageMessage(
+      ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
+      " --method=METHOD\n"
+      "METHOD can be one of the following:\n"
+      "\tpipe\n"
+      "\tnamed_pipe\n"
+      "\taos_mutex\n"
+      "\taos_event\n"
+      "\tpthread_mutex\n"
+      "\tpthread_mutex_pshared\n"
+      "\tpthread_mutex_pshared_pi\n"
+      "\tpthread_mutex_pi\n"
+      "\taos_queue\n"
+      "\teventfd\n"
+      "\tsysv_semaphore\n"
+      "\tsysv_queue\n"
+      "\tposix_semaphore_unnamed_shared\n"
+      "\tposix_semaphore_unnamed_unshared\n"
+      "\tposix_semaphore_named\n"
+      "\tposix_queue\n"
+      "\tunix_stream\n"
+      "\tunix_datagram\n"
+      "\tunix_seqpacket\n"
+      "\ttcp\n"
+      "\ttcp_nodelay\n"
+      "\tudp\n");  // Keep this list in sync with the dispatch in aos::Main().
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::aos::InitNRT();
+  ::aos::logging::AddImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));  // LOG() -> stdout.
+
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/ipc_stress_test.cc b/aos/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..aa5d9c5
--- /dev/null
+++ b/aos/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,256 @@
+#include <errno.h>
+#include <libgen.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <string>
+
+#include "aos/die.h"
+#include "aos/libc/aos_strsignal.h"
+#include "aos/libc/dirname.h"
+#include "aos/logging/logging.h"
+#include "aos/mutex/mutex.h"
+#include "aos/time/time.h"
+#include "aos/type_traits/type_traits.h"
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/testing/test_shm.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// They have to be run in separate processes because (in addition to various
+// parts of our code not being thread-safe...) gtest does not like multiple
+// threads.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+namespace chrono = ::std::chrono;
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least for me).
+static const size_t kTestMaxArgs = 10;  // Slots per row; unlisted ones are null.
+static const char * kTests[][kTestMaxArgs] = {
+  {"queue_test"},
+  {"condition_test"},
+  {"mutex_test"},
+  {"raw_queue_test"},
+};
+static const size_t kTestsLength = sizeof(kTests) / sizeof(kTests[0]);
+// These arguments get inserted before any per-test arguments.
+static const char *kDefaultArgs[] = {
+  "--gtest_repeat=30",
+  "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 100;
+// How long to test for. Each tester stops starting new runs after this.
+static constexpr monotonic_clock::duration kTestTime = chrono::seconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+  Shared(const monotonic_clock::time_point stop_time)
+    : stop_time(stop_time), total_iterations(0) {}
+
+  // Synchronizes access to stdout/stderr to avoid interleaving failure
+  // messages.
+  Mutex output_mutex;
+
+  // When to stop.
+  monotonic_clock::time_point stop_time;
+
+  // The total number of iterations. Updated by each child as it finishes.
+  int total_iterations;
+  // Synchronizes writes to total_iterations.
+  Mutex total_iterations_mutex;
+
+  const char *path;  // Directory holding the test binaries; set in Main().
+};
+static_assert(shm_ok<Shared>::value,
+              "it's going to get shared between forked processes");
+
+// Runs in each freshly forked runner: wires up the pipe and execs one test.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const char *(*test)[kTestMaxArgs], int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    PDie("close(%d) of read end of pipe failed", pipes[0]);
+  }
+  if (close(STDIN_FILENO) == -1) {
+    PDie("close(STDIN_FILENO(=%d)) failed", STDIN_FILENO);
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    PDie("dup2(%d, STDOUT_FILENO(=%d)) failed", pipes[1], STDOUT_FILENO);
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    PDie("dup2(%d, STDERR_FILENO(=%d)) failed", pipes[1], STDERR_FILENO);
+  }
+
+  size_t size = kTestMaxArgs;
+  size_t default_size = sizeof(kDefaultArgs) / sizeof(kDefaultArgs[0]);
+  // There's no chance to free this because we either exec or Die.
+  const char **args = new const char *[size + default_size + 1];
+  // The actual executable to run. args[0] aliases it, so it must stay alive.
+  ::std::string executable;
+  int i = 0;
+  for (size_t test_i = 0; test_i < size; ++test_i) {
+    const char *c = (*test)[test_i];
+    if (i == 0) {
+      executable = ::std::string(shared->path) + "/" + c;
+      args[0] = executable.c_str();
+      for (const char *default_arg : kDefaultArgs) {
+        args[++i] = default_arg;  // The literal itself; a temporary would dangle.
+      }
+    } else {
+      args[i] = c;  // nullptr for unused slots, which already ends argv early.
+    }
+    ++i;
+  }
+  args[i] = NULL;  // i == size + default_size here, the last allocated slot.
+  execv(executable.c_str(), const_cast<char *const *>(args));
+  PDie("execv(%s, %p) failed", executable.c_str(), args);
+}
+
+void DoRun(Shared *shared) {  // Body of one tester process; see Run().
+  int iterations = 0;
+  // An iterator pointing to a random one of the tests.
+  // We randomize based on PID because otherwise they all end up running the
+  // same test at the same time for the whole test.
+  const char *(*test)[kTestMaxArgs] = &kTests[getpid() % kTestsLength];
+  int pipes[2];
+  while (monotonic_clock::now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      PDie("pipe(%p) failed", &pipes);
+    }
+    switch (fork()) {
+      case 0:  // in runner
+        DoRunTest(shared, test, pipes);
+      case -1:
+        PDie("fork() failed");
+    }
+
+    if (close(pipes[1]) == -1) {
+      PDie("close(%d) of write end of pipe failed", pipes[1]);
+    }
+
+    ::std::string output;  // Everything the runner wrote to stdout/stderr.
+    char buffer[2048];
+    while (true) {
+      ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+      if (ret == 0) {  // EOF
+        if (close(pipes[0]) == -1) {
+          PDie("close(%d) of pipe at EOF failed", pipes[0]);
+        }
+        break;
+      } else if (ret == -1) {
+        PDie("read(%d, %p, %zu) failed", pipes[0], &buffer, sizeof(buffer));
+      }
+      output += ::std::string(buffer, ret);
+    }
+
+    int status;
+    while (true) {
+      if (wait(&status) == -1) {
+        if (errno == EINTR) continue;
+        PDie("wait(%p) in child failed", &status);
+      } else {
+        break;
+      }
+    }
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                (*test)[0], WEXITSTATUS(status));
+        fputs(output.c_str(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n", (*test)[0],
+              WTERMSIG(status), aos_strsignal(WTERMSIG(status)));
+      fputs(output.c_str(), stderr);
+    } else {
+      CHECK(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", (*test)[0]);
+    }
+
+    ++test;
+    // Wrap after the last test; comparing with kTests + 1 ran off the array.
+    if (test == kTests + kTestsLength) test = kTests;
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+
+void Run(Shared *shared) {  // Forks one tester; only the child runs DoRun().
+  const pid_t child = fork();
+  if (child == -1) {
+    PDie("fork() of child failed");
+  } else if (child == 0) {  // in child
+    DoRun(shared);
+    _exit(EXIT_SUCCESS);
+  }
+}
+
+int Main(int argc, char **argv) {  // Forks kTesters testers and reaps them all.
+  if (argc < 1) {
+    fputs("need an argument\n", stderr);
+    return EXIT_FAILURE;
+  }
+
+  ::aos::testing::TestSharedMemory my_shm_;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(monotonic_clock::now() + kTestTime);
+
+  if (asprintf(const_cast<char **>(&shared->path),
+               "%s/../tests", ::aos::libc::Dirname(argv[0]).c_str()) == -1) {
+    PDie("asprintf failed");
+  }
+
+  for (int i = 0; i < kTesters; ++i) {
+    Run(shared);
+  }
+
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    if (wait(&status) == -1) {
+      if (errno != EINTR) {
+        PDie("wait(%p) failed", &status);
+      }
+      // Interrupted before reaping anything: |status| is unset, so retry.
+      --i;
+      continue;
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+      error = true;
+    }
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {  // Entry point; all the work is in aos::Main.
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
new file mode 100644
index 0000000..c751a7e
--- /dev/null
+++ b/aos/ipc_lib/queue.cc
@@ -0,0 +1,563 @@
+#if !AOS_DEBUG
+#undef NDEBUG
+#define NDEBUG
+#endif
+
+#include "aos/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+#include <algorithm>
+
+#include "aos/type_traits/type_traits.h"
+#include "aos/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+static_assert(shm_ok<RawQueue>::value,
+              "RawQueue instances go into shared memory");
+
+// Compile-time switches for the printf tracing scattered through this file.
+constexpr bool kReadDebug = false;
+constexpr bool kWriteDebug = false;
+constexpr bool kRefDebug = false;
+constexpr bool kFetchDebug = false;
+constexpr bool kReadIndexDebug = false;
+
+// How many messages beyond queue_length each queue's pool gets, to cover
+// readers that are slow to free messages or that leak one when killed.
+constexpr int kExtraMessages = 20;
+
+}  // namespace
+
+constexpr Options<RawQueue>::Option RawQueue::kPeek;  // Declared in queue.h.
+constexpr Options<RawQueue>::Option RawQueue::kFromEnd;
+constexpr Options<RawQueue>::Option RawQueue::kNonBlock;
+constexpr Options<RawQueue>::Option RawQueue::kBlock;
+constexpr Options<RawQueue>::Option RawQueue::kOverride;
+
+// This is what gets stuck in before each queue message in memory. It is always
+// allocated aligned to 8 bytes and its size has to maintain that alignment for
+// the message that follows immediately.
+struct RawQueue::MessageHeader {
+  MessageHeader *next;
+
+  // Gets the message header immediately preceding msg.
+  static MessageHeader *Get(const void *msg) {
+    return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
+        static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
+        alignof(MessageHeader)));
+  }
+
+  int32_t ref_count() const {
+    return __atomic_load_n(&ref_count_, __ATOMIC_RELAXED);
+  }
+  void set_ref_count(int32_t val) {
+    __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
+  }
+
+  // Atomically drops one reference and returns the count that is left.
+  int32_t ref_count_sub() {
+    return __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+  }
+  void ref_count_add() {
+    __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+  }
+
+ private:
+  // This gets accessed with atomic instructions without any
+  // locks held by various member functions.
+  int32_t ref_count_;
+
+  // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+  // it to 16 if a pointer is 8 bytes by itself.
+#if __SIZEOF_POINTER__ == 8
+#ifdef __clang__
+  // Clang is smart enough to realize this is unused, but GCC doesn't like the
+  // attribute here...
+  __attribute__((unused))
+#endif
+  char padding[4];
+#elif __SIZEOF_POINTER__ == 4
+  // No padding needed to get 8 byte total size.
+#else
+#error Unknown pointer size.
+#endif
+};
+
+// Returns the slot after index in the circular data_ array, wrapping back to 0
+// at data_length_.
+inline int RawQueue::index_add1(int index) {
+  // Doing it this way instead of with % is more efficient on ARM.
+  // Valid indices are [0, data_length_); anything else would skip the wrap.
+  assert(index >= 0 && index < data_length_);
+  const int r = index + 1;
+  return (r == data_length_) ? 0 : r;
+}
+
+// Drops one reference to msg and frees it if that was the last one.
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+  MessageHeader *header = MessageHeader::Get(msg);
+  // Decide based on the value returned by the atomic decrement itself. Reading
+  // the count again afterwards would let two concurrent decrementers both
+  // observe 0 and free the same message twice.
+  const int32_t remaining = header->ref_count_sub();
+  if (kRefDebug) {
+    printf("%p ref dec count: %p count=%d\n", this, msg, remaining);
+  }
+
+  if (remaining == 0) {
+    DoFreeMessage(msg);
+  } else {
+    assert(remaining > 0);
+  }
+}
+
+// Adds one reference to msg by atomically incrementing the count in the
+// MessageHeader that sits directly in front of it.
+inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
+  MessageHeader::Get(msg)->ref_count_add();
+  if (!kRefDebug) return;
+  printf("%p ref inc count: %p\n", this, msg);
+}
+
+// Pushes msg onto free_messages_. If recycle_ is set, msg is written to it
+// and a replacement obtained from recycle_->GetMessage() is pushed instead.
+inline void RawQueue::DoFreeMessage(const void *msg) {
+  MessageHeader *header = MessageHeader::Get(msg);
+  if (kRefDebug) {
+    printf("%p ref free to %p: %p\n", this, recycle_, msg);
+  }
+
+  if (__builtin_expect(recycle_ != nullptr, 0)) {
+    void *const new_msg = recycle_->GetMessage();
+    if (new_msg == nullptr) {
+      fprintf(stderr, "queue: couldn't get a message"
+              " for recycle queue %p\n", recycle_);
+    } else {
+      header->ref_count_add();  // Reference for the copy handed to recycle_.
+      if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+                " aborting\n", recycle_, msg);
+        printf("see stderr\n");
+        abort();
+      }
+      msg = new_msg;  // From here on the replacement is what gets freed.
+      header = MessageHeader::Get(new_msg);
+    }
+  }
+
+  // new_next works around GCC bug 60272 (fixed in 4.8.3); it should just be
+  // header->next with an empty loop body. The bug makes the store to new_next
+  // after the compare/exchange unconditional instead of only on failure, which
+  // could overwrite what somebody who preempted us right then changed it to.
+  // TODO(brians): Get rid of this workaround once we get a new enough GCC.
+  MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+  do {
+    header->next = new_next;
+  } while (__builtin_expect(
+      !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
+                                   __ATOMIC_RELEASE, __ATOMIC_RELAXED),
+      0));
+}
+
+// Pops a message off free_messages_ and hands it out with a ref count of 1.
+void *RawQueue::GetMessage() {
+  MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+  do {  // NOTE(review): next is read before the CAS; looks ABA-prone, confirm.
+    if (__builtin_expect(header == nullptr, 0)) {
+      LOG(FATAL, "overused pool of queue %p (%s)\n", this, name_);
+    }
+  } while (__builtin_expect(
+      !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
+                                   __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
+      0));
+  void *msg = reinterpret_cast<uint8_t *>(header + 1);
+  // May be uninitialized, 0 from a previous use, or 1 from being recycled.
+  header->set_ref_count(1);
+  if (kRefDebug) {
+    printf("%p ref alloc: %p\n", this, msg);
+  }
+  return msg;
+}
+
+// Sets up an empty queue called name whose messages carry length bytes of
+// payload, and preallocates its pool of free messages.
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+    : readable_(&data_lock_), writable_(&data_lock_) {
+  static_assert(shm_ok<RawQueue::MessageHeader>::value,
+                "the whole point is to stick it in shared memory");
+  static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
+                "need to revalidate size/alignent assumptions");
+
+  if (queue_length < 1) {
+    LOG(FATAL, "queue length %d of %s needs to be at least 1\n", queue_length,
+        name);
+  }
+
+  // Keep a private copy of the name in shm_malloc()ed storage.
+  const size_t name_bytes = strlen(name) + 1;
+  char *const name_copy = static_cast<char *>(shm_malloc(name_bytes));
+  memcpy(name_copy, name, name_bytes);
+  name_ = name_copy;
+  length_ = length;
+  hash_ = hash;
+  queue_length_ = queue_length;
+  next_ = nullptr;
+  recycle_ = nullptr;
+
+  if (kFetchDebug) {
+    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+           name, length, hash, queue_length);
+  }
+
+  // One slot always stays empty so that start == end means "empty".
+  data_length_ = queue_length + 1;
+  data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+  data_start_ = data_end_ = 0;
+  messages_ = 0;
+  msg_length_ = length + sizeof(MessageHeader);
+
+  // Allocate every message up front and chain them into the free list; the
+  // most recently allocated one ends up as the head.
+  MessageHeader *head = nullptr;
+  for (int left = queue_length + kExtraMessages; left > 0; --left) {
+    MessageHeader *const fresh =
+        static_cast<MessageHeader *>(shm_malloc(msg_length_));
+    fresh->next = head;
+    head = fresh;
+    free_messages_ = head;
+  }
+
+  readable_waiting_ = false;
+
+  if (kFetchDebug) {
+    printf("made queue %s\n", name);
+  }
+}
+
+// Looks up the queue matching (name, length, hash, queue_length) in the global
+// list, creating it and appending it to the list if there is no such queue yet.
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length) {
+  if (kFetchDebug) {
+    printf("fetching queue %s\n", name);
+  }
+  if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
+    LOG(FATAL, "mutex_lock(%p) failed\n",
+        &global_core->mem_struct->queues.lock);
+  }
+
+  // Walk the list, remembering the last node so a new queue can follow it.
+  RawQueue *tail = nullptr;
+  for (RawQueue *candidate = static_cast<RawQueue *>(
+           global_core->mem_struct->queues.pointer);
+       candidate != nullptr; candidate = candidate->next_) {
+    const bool same_queue =
+        strcmp(candidate->name_, name) == 0 && candidate->length_ == length &&
+        candidate->hash_ == hash && candidate->queue_length_ == queue_length;
+    if (same_queue) {
+      mutex_unlock(&global_core->mem_struct->queues.lock);
+      return candidate;
+    }
+    if (kFetchDebug) {
+      printf("rejected queue %s strcmp=%d target=%s\n", candidate->name_,
+             strcmp(candidate->name_, name), name);
+    }
+    tail = candidate;
+  }
+
+  RawQueue *const created = new (shm_malloc(sizeof(RawQueue)))
+      RawQueue(name, length, hash, queue_length);
+  if (tail == nullptr) {  // This is the very first queue.
+    global_core->mem_struct->queues.pointer = created;
+  } else {
+    tail->next_ = created;
+  }
+  mutex_unlock(&global_core->mem_struct->queues.lock);
+  return created;
+}
+
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length,
+                    int recycle_hash, int recycle_length, RawQueue **recycle) {
+  RawQueue *const q = Fetch(name, length, hash, queue_length);
+  q->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+  if (q->recycle_ != q) {  // Normal case: two distinct queues, now linked.
+    *recycle = q->recycle_;
+    return q;
+  }
+  fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", q->recycle_, q);
+  printf("see stderr\n");
+  q->recycle_ = NULL;  // Undo the self-link before dying.
+  abort();
+}
+
+bool RawQueue::DoWriteMessage(void *msg, Options<RawQueue> options) {
+  if (kWriteDebug) {
+    printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options.printable());
+  }
+  // Appends msg to data_ inside the locked scope, then wakes waiting readers.
+  bool signal_readable;
+
+  {
+    IPCMutexLocker locker(&data_lock_);
+    CHECK(!locker.owner_died());
+
+    int new_end;
+    while (true) {
+      new_end = index_add1(data_end_);
+      // If there is room in the queue right now.
+      if (new_end != data_start_) break;
+      if (options & kNonBlock) {
+        if (kWriteDebug) {
+          printf("queue: not blocking on %p. returning false\n", this);
+        }
+        DecrementMessageReferenceCount(msg);  // We own msg, so release it.
+        return false;
+      } else if (options & kOverride) {
+        if (kWriteDebug) {
+          printf("queue: overriding on %p\n", this);
+        }
+        // Avoid leaking the message that we're going to overwrite.
+        DecrementMessageReferenceCount(data_[data_start_]);
+        data_start_ = index_add1(data_start_);  // Drop the oldest entry.
+      } else {  // kBlock
+        assert(options & kBlock);
+        if (kWriteDebug) {
+          printf("queue: going to wait for writable_ of %p\n", this);
+        }
+        CHECK(!writable_.Wait());
+      }
+    }
+    data_[data_end_] = msg;
+    ++messages_;
+    data_end_ = new_end;
+    // Remember whether a reader asked to be woken, and clear the request.
+    signal_readable = readable_waiting_;
+    readable_waiting_ = false;
+  }
+
+  if (signal_readable) {
+    if (kWriteDebug) {
+      printf("queue: broadcasting to readable_ of %p\n", this);
+    }
+    readable_.Broadcast();
+  } else if (kWriteDebug) {
+    printf("queue: skipping broadcast to readable_ of %p\n", this);
+  }
+
+  if (kWriteDebug) {
+    printf("queue: write returning true on queue %p\n", this);
+  }
+  return true;
+}
+
+// Wakes writers blocked on writable_ if this read is what made room for them.
+inline void RawQueue::ReadCommonEnd() {
+  if (!is_writable()) return;
+  if (kReadDebug) {
+    printf("queue: %ssignalling writable_ of %p\n",
+           writable_start_ ? "not " : "", this);
+  }
+  if (!writable_start_) writable_.Broadcast();
+}
+
+// Waits until there is a message to read (past *index if index is given).
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+  while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+    if (options & kNonBlock) {
+      if (kReadDebug) {
+        printf("queue: not going to block waiting on %p\n", this);
+      }
+      return false;
+    } else {  // kBlock
+      assert(options & kBlock);
+      if (kReadDebug) {
+        printf("queue: going to wait for readable_ of %p\n", this);
+      }
+      readable_waiting_ = true;  // Ask the next writer to Broadcast().
+      // Wait for a message to become readable.
+      CHECK(!readable_.Wait());
+      if (kReadDebug) {
+        printf("queue: done waiting for readable_ of %p\n", this);
+      }
+    }
+  }
+  // Check down here: the mutex may have been released while Wait()ing above.
+  writable_start_ = is_writable();
+  if (kReadDebug) {
+    printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
+           this, index, data_start_, data_end_, writable_start_);
+  }
+  return true;
+}
+
+// Index in data_ of the newest message, i.e. the slot just before data_end_.
+inline int RawQueue::LastMessageIndex() const {
+  const int before_end = data_end_ - 1;
+  // A negative value means data_end_ was 0, so the newest slot is the last
+  // one in the array.
+  return (before_end < 0) ? data_length_ - 1 : before_end;
+}
+
+const void *RawQueue::DoReadMessage(Options<RawQueue> options) {
+  // TODO(brians): Test this function.
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessage(%x)\n", this, options.printable());
+  }
+  void *msg = NULL;
+  // NOTE(review): locker presumably holds data_lock_ until scope exit.
+  IPCMutexLocker locker(&data_lock_);
+  CHECK(!locker.owner_died());
+
+  if (!ReadCommonStart(options, nullptr)) {
+    if (kReadDebug) {
+      printf("queue: %p common returned false\n", this);
+    }
+    return NULL;
+  }
+  // kFromEnd selects the newest message; kPeek leaves the queue untouched.
+  if (options & kFromEnd) {
+    if (options & kPeek) {
+      if (kReadDebug) {
+        printf("queue: %p shortcutting c2: %d\n", this, LastMessageIndex());
+      }
+      msg = data_[LastMessageIndex()];
+      IncrementMessageReferenceCount(msg);  // Stays queued, so add a ref.
+    } else {
+      while (true) {
+        if (kReadDebug) {
+          printf("queue: %p start of c2\n", this);
+        }
+        // This loop pulls each message out of the buffer.
+        const int pos = data_start_;
+        data_start_ = index_add1(data_start_);
+        // If this is the last one.
+        if (data_start_ == data_end_) {
+          if (kReadDebug) {
+            printf("queue: %p reading from c2: %d\n", this, pos);
+          }
+          msg = data_[pos];  // The queue's reference passes to the caller.
+          break;
+        }
+        // This message is not going to be in the queue any more.
+        DecrementMessageReferenceCount(data_[pos]);
+      }
+    }
+  } else {
+    if (kReadDebug) {
+      printf("queue: %p reading from d2: %d\n", this, data_start_);
+    }
+    msg = data_[data_start_];
+    if (options & kPeek) {
+      IncrementMessageReferenceCount(msg);  // Stays queued, so add a ref.
+    } else {
+      data_start_ = index_add1(data_start_);  // Dequeue; caller gets the ref.
+    }
+  }
+  ReadCommonEnd();
+  if (kReadDebug) {
+    printf("queue: %p read returning %p\n", this, msg);
+  }
+  return msg;
+}
+
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
+                                         int *index) {
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+           this, options.printable(), index, *index);
+  }
+  void *msg = NULL;
+
+  IPCMutexLocker locker(&data_lock_);
+  CHECK(!locker.owner_died());
+
+  if (!ReadCommonStart(options, index)) {
+    if (kReadDebug) {
+      printf("queue: %p common returned false\n", this);
+    }
+    return NULL;
+  }
+  // *index is compared against messages_, the count of messages ever written.
+  // TODO(parker): Handle integer wrap on the index.
+
+  if (options & kFromEnd) {
+    if (kReadDebug) {
+      printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
+    }
+    msg = data_[LastMessageIndex()];
+
+    // We'd skip this if we had kPeek, but kPeek | kFromEnd isn't valid for
+    // reading with an index.
+    *index = messages_;
+  } else {
+    // Where we're going to start reading.
+    int my_start;
+
+    const int unread_messages = messages_ - *index;
+    assert(unread_messages > 0);
+    int current_messages = data_end_ - data_start_;
+    if (current_messages < 0) current_messages += data_length_;
+    if (kReadIndexDebug) {
+      printf("queue: %p start=%d end=%d current=%d\n",
+             this, data_start_, data_end_, current_messages);
+    }
+    assert(current_messages > 0);
+    // If we're behind the available messages.
+    if (unread_messages > current_messages) {
+      // Catch index up to the oldest message that is still available.
+      *index = messages_ - current_messages;
+      // And that's the one we're going to read.
+      my_start = data_start_;
+      if (kReadIndexDebug) {
+        printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
+               this, *index, messages_, data_start_);
+      }
+    } else {
+      // Just start reading at the first available message that we haven't yet
+      // read.
+      my_start = data_end_ - unread_messages;
+      if (kReadIndexDebug) {
+        printf("queue: %p original read from %d\n", this, my_start);
+      }
+      if (data_start_ < data_end_) {
+        assert(my_start >= 0);
+      }
+      if (my_start < 0) my_start += data_length_;
+    }
+
+    if (kReadDebug) {
+      printf("queue: %p reading from d1: %d\n", this, my_start);
+    }
+    // We have to be either after the start or before the end, even if the queue
+    // is wrapped around (should be both if it's not).
+    assert((my_start >= data_start_) || (my_start < data_end_));
+    // More sanity checking.
+    assert((my_start >= 0) && (my_start < data_length_));
+    msg = data_[my_start];
+    if (!(options & kPeek)) ++(*index);
+  }
+  IncrementMessageReferenceCount(msg);  // Indexed reads never dequeue.
+
+  ReadCommonEnd();
+  return msg;
+}
+
+// Walks the free list and returns how many messages are on it.
+int RawQueue::FreeMessages() const {
+  int count = 0;
+  for (const MessageHeader *node = free_messages_; node != nullptr;
+       node = node->next) {
+    ++count;
+  }
+  return count;
+}
+
+}  // namespace aos
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
new file mode 100644
index 0000000..f21295f
--- /dev/null
+++ b/aos/ipc_lib/queue.h
@@ -0,0 +1,231 @@
+#ifndef AOS_IPC_LIB_QUEUE_H_
+#define AOS_IPC_LIB_QUEUE_H_
+
+#include "aos/ipc_lib/shared_mem.h"
+#include "aos/mutex/mutex.h"
+#include "aos/condition.h"
+#include "aos/util/options.h"
+#include "aos/logging/logging.h"
+
+// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
+// code to make checking for leaks work better
+// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
+// describes how
+
+// Any pointers returned from these functions can be safely passed to other
+// processes because they are all shared memory pointers.
+// IMPORTANT: Any message pointer must be passed back in some way
+// (FreeMessage and WriteMessage are common ones) or the
+// application will leak shared memory.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
+
+namespace aos {
+
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling RawQueue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) hold messages in FIFO order. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+  // Retrieves (and creates if necessary) a queue. Each combination of name and
+  // signature refers to a completely independent queue.
+  // length is how large each message will be
+  // hash can differentiate multiple otherwise identical queues
+  // queue_length is how many messages the queue will be able to hold
+  // Will never return NULL.
+  static RawQueue *Fetch(const char *name, size_t length, int hash,
+                         int queue_length);
+  // Same as above, except sets up the returned queue so that it will put
+  // messages on *recycle when they are freed (after they have been released by
+  // all other readers/writers and are not in the queue).
+  // recycle_queue_length determines how many freed messages will be kept.
+  // Other code can retrieve the 2 queues separately (the recycle queue has
+  // the same name and length as the main one, but uses recycle_hash). Any
+  // frees made using a queue with only (name,length,hash,queue_length) before
+  // the recycle queue has been associated with it will not go on to the
+  // recycle queue.
+  // NOTE: calling this function with the same (name,length,hash,queue_length)
+  // but multiple recycle_queue_lengths will result in each freed message being
+  // put onto an undefined one of the recycle queues.
+  // Will never return NULL.
+  static RawQueue *Fetch(const char *name, size_t length, int hash,
+                         int queue_length, int recycle_hash,
+                         int recycle_queue_length, RawQueue **recycle);
+
+  // Doesn't update the currently read index (the read messages in the queue or
+  // the index). This means the returned message (and any others skipped with
+  // kFromEnd) will be left in the queue.
+  // For reading only.
+  // Not valid for ReadMessageIndex combined with kFromEnd.
+  static constexpr Options<RawQueue>::Option kPeek{0x0001};
+  // Reads the last message in the queue instead of just the next one.
+  // NOTE: This removes all of the messages until the last one from the queue
+  // (which means that nobody else will read them).
+  // For reading only.
+  // Not valid for ReadMessageIndex combined with kPeek.
+  static constexpr Options<RawQueue>::Option kFromEnd{0x0002};
+  // Causes reads to return NULL and writes to fail instead of waiting.
+  // For reading and writing.
+  static constexpr Options<RawQueue>::Option kNonBlock{0x0004};
+  // Causes things to block.
+  // For reading and writing.
+  static constexpr Options<RawQueue>::Option kBlock{0x0008};
+  // Causes writes to overwrite the oldest message in the queue instead of
+  // blocking.
+  // For writing only.
+  static constexpr Options<RawQueue>::Option kOverride{0x0010};
+
+  RawQueue(const RawQueue &) = default;
+  RawQueue &operator=(const RawQueue &) = default;
+
+  // Writes a message into the queue.
+  // This function takes ownership of msg.
+  // NOTE: msg must point to a valid message from this queue
+  // Returns true on success. A return value of false means msg has already been
+  // freed.
+  bool WriteMessage(void *msg, Options<RawQueue> options) {
+    static constexpr Options<RawQueue> kWriteFailureOptions =
+        kNonBlock | kBlock | kOverride;
+    if (!options.NoOthersSet(kWriteFailureOptions)) {
+      LOG(FATAL, "illegal write options in %x\n", options.printable());
+    }
+    if (!options.ExactlyOneSet(kWriteFailureOptions)) {
+      LOG(FATAL, "invalid write options %x\n", options.printable());
+    }
+    return DoWriteMessage(msg, options);
+  }
+
+  // Reads a message out of the queue.
+  // The return value will have at least the length of this queue's worth of
+  // valid data where it's pointing to.
+  // The return value is const because other people might be viewing the same
+  // message. Do not cast the const away!
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage.
+  const void *ReadMessage(Options<RawQueue> options) {
+    CheckReadOptions(options);
+    return DoReadMessage(options);
+  }
+  // The same as ReadMessage, except it will never return the
+  // same message twice (when used with the same index argument). However,
+  // may not return some messages that pass through the queue.
+  // *index should start as 0. index does not have to be in shared memory, but
+  // it can be.
+  // Calling with both kPeek and kFromEnd in options isn't valid because that
+  // would mean ignoring index, which would make this function the same as
+  // ReadMessage (which should be used instead).
+  const void *ReadMessageIndex(Options<RawQueue> options, int *index) {
+    CheckReadOptions(options);
+    static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
+    if (options.AllSet(kFromEndAndPeek)) {
+      LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
+    }
+    return DoReadMessageIndex(options, index);
+  }
+
+  // Retrieves ("allocates") a message that can then be written to the queue.
+  // NOTE: the return value will be completely uninitialized
+  // The return value will have at least the length of this queue's worth of
+  // valid memory where it's pointing to.
+  // Returns NULL for error (queue.cc LOG(FATAL)s on an empty pool instead).
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage or WriteMessage.
+  void *GetMessage();
+
+  // It is ok to call this method with a NULL msg.
+  void FreeMessage(const void *msg) {
+    if (msg != NULL) DecrementMessageReferenceCount(msg);
+  }
+
+  // UNSAFE! Returns the number of free messages we have. Only safe to use when
+  // only 1 task is using this object (ie in tests).
+  int FreeMessages() const;
+
+ private:
+  struct MessageHeader;
+
+  // The public wrappers around these are inlined and do argument checking.
+  bool DoWriteMessage(void *msg, Options<RawQueue> options);
+  const void *DoReadMessage(Options<RawQueue> options);
+  const void *DoReadMessageIndex(Options<RawQueue> options, int *index);
+  void CheckReadOptions(Options<RawQueue> options) {
+    static constexpr Options<RawQueue> kValidOptions =
+        kPeek | kFromEnd | kNonBlock | kBlock;
+    if (!options.NoOthersSet(kValidOptions)) {
+      LOG(FATAL, "illegal read options in %x\n", options.printable());
+    }
+    static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
+    if (!options.ExactlyOneSet(kBlockChoices)) {
+      LOG(FATAL, "invalid read options %x\n", options.printable());
+    }
+  }
+
+  // Adds 1 to the given index and handles wrapping correctly.
+  int index_add1(int index);
+
+  bool is_readable() { return data_end_ != data_start_; }
+  bool is_writable() { return index_add1(data_end_) != data_start_; }
+
+  // These next 4 allow finding the right one.
+  const char *name_;
+  size_t length_;
+  int hash_;
+  int queue_length_;
+  // The next one in the linked list of queues.
+  RawQueue *next_;
+
+  RawQueue *recycle_;
+
+  Mutex data_lock_;  // protects operations on data_ etc
+  // Always gets broadcasted to because different readers might have different
+  // ideas of what "readable" means (ie ones using separated indices).
+  Condition readable_;
+  Condition writable_;
+  int data_length_;  // max length into data + 1
+  int data_start_;  // is an index into data
+  int data_end_;  // is an index into data
+  int messages_;  // that have passed through
+  void **data_;  // array of messages (with headers)
+
+  size_t msg_length_;  // sizeof(each message) including the header
+  // A pointer to the first in the linked list of free messages.
+  MessageHeader *free_messages_;
+
+  // Keeps track of if the queue was writable before a read so ReadCommonEnd
+  // can Broadcast() on writable_ if the read made it writable.
+  bool writable_start_;
+
+  // True iff somebody is currently Wait()ing on readable_.
+  // Set to true by each reader before calling Wait() and set back to false
+  // before the Broadcast().
+  bool readable_waiting_;
+
+  // Actually frees the given message.
+  void DoFreeMessage(const void *msg);
+  // Calls DoFreeMessage if appropriate.
+  void DecrementMessageReferenceCount(const void *msg);
+  // Only does the actual incrementing of the reference count.
+  void IncrementMessageReferenceCount(const void *msg) const;
+
+  // Must be called with data_lock_ locked.
+  // Records in writable_start_ whether the queue is writable on success.
+  // Returns with a readable message in data_ or false.
+  bool ReadCommonStart(Options<RawQueue> options, int *index);
+  // Broadcasts on writable_ if the queue went from full to writable.
+  // NOTE(review): the callers in queue.cc invoke this with data_lock_ still
+  // held (the locker is still in scope); confirm that is intended.
+  void ReadCommonEnd();
+  // Returns the index of the last message.
+  // Useful for reading with kPeek.
+  int LastMessageIndex() const;
+
+  // Gets called by Fetch when necessary (with placement new).
+  RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_QUEUE_H_
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
new file mode 100644
index 0000000..ab98b1f
--- /dev/null
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -0,0 +1,1046 @@
+#include "aos/ipc_lib/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <chrono>
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
+#include "aos/testing/test_shm.h"
+#include "aos/time/time.h"
+#include "aos/logging/logging.h"
+#include "aos/die.h"
+#include "aos/util/thread.h"
+#include "aos/util/options.h"
+#include "aos/util/death_test_log_implementation.h"
+#include "aos/testing/prevent_exit.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+
+namespace aos {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+namespace this_thread = ::std::this_thread;
+
+// The same constant from queue.cc. This will have to be updated if that one is.
+const int kExtraMessages = 20;
+
+class RawQueueTest : public ::testing::Test {
+ protected:
+  static const size_t kFailureSize = 400;
+  static char *fatal_failure;
+ private:
+  enum class ResultType : uint8_t {
+    NotCalled,
+    Called,
+    Returned,
+  };
+  const std::string ResultTypeString(volatile const ResultType &result) {
+    // Read the volatile exactly once so that the value reported by the
+    // fallback below is the same one the switch dispatched on.
+    const ResultType snapshot = result;
+    switch (snapshot) {
+      case ResultType::NotCalled: return "NotCalled";
+      case ResultType::Called: return "Called";
+      case ResultType::Returned: return "Returned";
+    }
+    // Not one of the enumerators; report the raw value instead.
+    return std::string("unknown(") +
+           ::std::to_string(static_cast<uint8_t>(snapshot)) + ")";
+  }
+  static_assert(aos::shm_ok<ResultType>::value,
+                "this will get put in shared memory");
+  template<typename T>
+  struct FunctionToCall {
+    FunctionToCall() : result(ResultType::NotCalled), started() {
+    }
+
+    volatile ResultType result;  // How far Hangs_ got; written by Hangs_.
+    bool expected;  // Whether the function is expected to hang.
+    void (*function)(T*, char*);  // Invoked as function(arg, failure).
+    T *arg;
+    volatile char failure[kFailureSize];  // Non-empty => HangsCheck fails.
+    aos_futex started;  // Set by Hangs_ just before it calls function.
+  };
+  template<typename T>
+  static void Hangs_(FunctionToCall<T> *const to_call) {
+    this_thread::sleep_for(chrono::milliseconds(10));
+    ASSERT_EQ(1, futex_set(&to_call->started));  // Releases HangsFork's wait.
+    to_call->result = ResultType::Called;
+    to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+    to_call->result = ResultType::Returned;  // Only reached if it didn't hang.
+  }
+
+  // How long until a function is considered to have hung.
+  static constexpr chrono::nanoseconds kHangTime = chrono::milliseconds(90);
+  // How long to sleep after forking (for debugging).
+  static constexpr chrono::nanoseconds kForkSleep = chrono::milliseconds(0);
+
+  // Represents a process that has been forked off. The destructor kills the
+  // process and wait(2)s for it.
+  class ForkedProcess {
+   public:
+    ForkedProcess(pid_t pid, aos_futex *done)
+        : pid_(pid), done_(done), exiting_(false) {}
+    ~ForkedProcess() {
+      if (!exiting_) {  // Join() never saw it finish, so ask it to stop.
+        if (kill(pid_, SIGTERM) == -1) {
+          if (errno == ESRCH) {
+            printf("process %jd was already dead\n",
+                   static_cast<intmax_t>(pid_));
+          } else {
+            PLOG(FATAL, "kill(%jd, SIGTERM) failed",
+                 static_cast<intmax_t>(pid_));
+          }
+        }
+      }
+      const pid_t ret = wait(NULL);  // NOTE(review): may reap another child.
+      if (ret == -1) {
+        LOG(WARNING, "wait(NULL) failed."
+            " child %jd might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret == 0) {
+        LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret != pid_) {
+        LOG(WARNING, "child %jd is now confirmed dead"
+            ", but child %jd might still be alive\n",
+            static_cast<intmax_t>(ret), static_cast<intmax_t>(pid_));
+      }
+    }
+
+    enum class JoinResult {
+      Finished, Hung, Error
+    };
+    JoinResult Join(chrono::nanoseconds timeout = kHangTime) {
+      struct timespec done_timeout;  // kForkSleep + timeout as sec/nsec.
+      {
+        auto full_timeout = kForkSleep + timeout;
+        ::std::chrono::seconds sec =
+            ::std::chrono::duration_cast<::std::chrono::seconds>(full_timeout);
+        ::std::chrono::nanoseconds nsec =
+            ::std::chrono::duration_cast<::std::chrono::nanoseconds>(
+                full_timeout - sec);
+        done_timeout.tv_sec = sec.count();
+        done_timeout.tv_nsec = nsec.count();
+      }
+
+      switch (futex_wait_timeout(done_, &done_timeout)) {
+        case 2:  // Presumably the "timed out" code -- confirm in aos_sync.h.
+          return JoinResult::Hung;
+        case 0:
+          exiting_ = true;  // The destructor can skip the kill().
+          return JoinResult::Finished;
+        default:
+          return JoinResult::Error;
+      }
+    }
+
+   private:
+    const pid_t pid_;
+    aos_futex *const done_;  // Waited on by Join().
+    // True iff we know that the process is already exiting.
+    bool exiting_;
+  } __attribute__((unused));
+
+  // State for HangsFork and HangsCheck.
+  typedef uint8_t ChildID;
+  static void ReapExitHandler() {
+    for (const auto &entry : children_) {
+      delete entry.second;  // ~ForkedProcess kills and waits for the child.
+    }
+  }
+  static std::map<ChildID, ForkedProcess *> children_;
+  std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+  void SetUp() override {
+    ::testing::Test::SetUp();
+    SetDieTestMode(true);
+    // Test bodies snprintf() up to kFailureSize bytes into this buffer, so it
+    // must be that large (sizeof(fatal_failure) is only the pointer's size).
+    fatal_failure = static_cast<char *>(shm_malloc(kFailureSize));
+    static bool registered = false;  // Register the atexit handler only once.
+    if (!registered) {
+      atexit(ReapExitHandler);
+      registered = true;
+    }
+  }
+
+ protected:
+  // Forks a child that runs function(arg), sets a done futex and exits.
+  // Returns the child's handle, or null if fork() failed. Leaks shared memory.
+  template<typename T> __attribute__((warn_unused_result))
+  std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+    aos_futex *done = static_cast<aos_futex *>(shm_malloc_aligned(
+            sizeof(*done), alignof(aos_futex)));
+    *done = 0;  // Cleared before forking; the child sets it when finished.
+    const pid_t pid = fork();
+    switch (pid) {
+      case 0:  // child
+        if (kForkSleep != chrono::milliseconds(0)) {
+          LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
+              static_cast<intmax_t>(getpid()), kForkSleep.count());
+          this_thread::sleep_for(kForkSleep);
+        }
+        ::aos::testing::PreventExit();
+        function(arg);
+        CHECK_NE(-1, futex_set(done));  // Lets ForkedProcess::Join() return.
+        exit(EXIT_SUCCESS);
+      case -1:  // parent failure
+        PLOG(ERROR, "fork() failed");
+        return std::unique_ptr<ForkedProcess>();
+      default:  // parent
+        return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, done));
+    }
+  }
+
+  // Checks whether or not the given function hangs.
+  // expected is whether to return success or failure if the function hangs
+  // NOTE: There are other reasons for it to return a failure than the function
+  // doing the wrong thing.
+  // Leaks shared memory.
+  template<typename T>
+  AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+    // Child slot 0 is reserved for this fork-then-check convenience wrapper.
+    const ChildID kId = 0;
+    const AssertionResult forked = HangsFork<T>(function, arg, expected, kId);
+    if (forked) return HangsCheck(kId);
+    return forked;
+  }
+  // Starts the first part of Hangs.
+  // Use HangsCheck to get the result.
+  // Returns whether the fork succeeded or not, NOT whether or not the hang
+  // check succeeded.
+  template<typename T>
+  AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+                            bool expected, ChildID id) {
+    typedef FunctionToCall<T> Call;
+    static_assert(aos::shm_ok<Call>::value,
+                  "this is going into shared memory");
+    void *const storage = shm_malloc_aligned(sizeof(Call), alignof(Call));
+    Call *const to_call = new (storage) Call();  // Shared with the child.
+    to_call->function = function;
+    to_call->arg = arg;
+    to_call->expected = expected;
+    to_call->failure[0] = '\0';
+    fatal_failure[0] = '\0';
+    ForkedProcess *const child = ForkExecute(Hangs_, to_call).release();
+    children_[id] = child;
+    if (child == nullptr) return AssertionFailure() << "ForkExecute failed";
+    CHECK_EQ(0, futex_wait(&to_call->started));  // Wait for Hangs_ to start.
+    to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+    return AssertionSuccess();
+  }
+  // Checks whether or not a function hung like it was supposed to.
+  // Use HangsFork first.
+  // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+  // correspond, but they do not nest. Also, id 0 is used by Hangs.
+  // Return value is the same as Hangs.
+  AssertionResult HangsCheck(ChildID id) {
+    std::unique_ptr<ForkedProcess> child(children_[id]);
+    children_.erase(id);
+    if (!child) return AssertionFailure() << "HangsFork was not called first";
+    FunctionToCall<void> *const to_call = to_calls_[id];
+    const ForkedProcess::JoinResult result = child->Join();
+    if (to_call->failure[0] != '\0') {
+      return AssertionFailure() << "function says: "
+          << const_cast<char *>(to_call->failure);
+    }
+    if (result == ForkedProcess::JoinResult::Finished) {
+      return !to_call->expected ? AssertionSuccess() : (AssertionFailure()
+          << "something happened and the test only got to "
+          << ResultTypeString(to_call->result));
+    }
+    // From here on the child either timed out or could not be joined.
+    if (to_call->result == ResultType::Called) {
+      return to_call->expected ? AssertionSuccess() : (AssertionFailure()
+          << "function hung when it was expected to return");
+    }
+    if (result == ForkedProcess::JoinResult::Error) {
+      return AssertionFailure() << "error joining child";
+    }
+    return AssertionFailure() << (to_call->result == ResultType::NotCalled
+        ? "stuff took too long getting started" : "something weird happened");
+  }
+#define EXPECT_HANGS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+  cond(Hangs(function, arg, hangs)); \
+  if (fatal_failure[0] != '\0') { \
+    FAIL() << fatal_failure; \
+  } \
+} while (false)
+
+  struct TestMessage {
+    // Some contents because we don't really want to test empty messages.
+    int16_t data;
+  };
+  struct MessageArgs {
+    RawQueue *const queue;  // Queue the helper reads from or writes to.
+    Options<RawQueue> flags;  // Passed to ReadMessage/WriteMessage as-is.
+    int16_t data;  // Written, or expected on read; -1 means NULL expected
+  };
+  static void WriteTestMessage(MessageArgs *args, char *failure) {
+    RawQueue *const queue = args->queue;
+    TestMessage *const msg = static_cast<TestMessage *>(queue->GetMessage());
+    if (msg == NULL) {
+      // Reported through fatal_failure, which EXPECT_HANGS_COND FAIL()s on.
+      snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", queue);
+      return;
+    }
+    msg->data = args->data;
+    if (queue->WriteMessage(msg, args->flags)) return;
+    snprintf(failure, kFailureSize, "%p->WriteMessage(%p, %x) failed",
+             queue, msg, args->flags.printable());
+  }
+  static void ReadTestMessage(MessageArgs *args, char *failure) {
+    const void *const raw = args->queue->ReadMessage(args->flags);
+    const TestMessage *const msg = static_cast<const TestMessage *>(raw);
+    if (msg == NULL) {  // Fine only when the caller announced it with -1.
+      if (args->data != -1) {
+        snprintf(failure, kFailureSize,
+                 "expected data of %" PRId16 " but got NULL message",
+                 args->data);
+      }
+      return;  // Nothing was read, so there is nothing to free.
+    }
+    if (msg->data != args->data) {
+      snprintf(failure, kFailureSize,
+               "expected data of %" PRId16 " but got %" PRId16 " instead",
+               args->data, msg->data);
+    }
+    args->queue->FreeMessage(msg);
+  }
+
+  void PushMessage(RawQueue *queue, uint16_t data) {
+    TestMessage *const fresh = static_cast<TestMessage *>(queue->GetMessage());
+    fresh->data = data;  // Converted to TestMessage's int16_t field.
+    ASSERT_TRUE(queue->WriteMessage(fresh, RawQueue::kOverride));
+  }
+
+ private:
+  ::aos::testing::TestSharedMemory my_shm_;
+};
+
+char *RawQueueTest::fatal_failure;
+std::map<RawQueueTest::ChildID, RawQueueTest::ForkedProcess *>
+    RawQueueTest::children_;
+constexpr chrono::nanoseconds RawQueueTest::kHangTime;
+constexpr chrono::nanoseconds RawQueueTest::kForkSleep;
+
+typedef RawQueueTest RawQueueDeathTest;
+
+TEST_F(RawQueueTest, Reading) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, -1};  // -1: expect no message.
+
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &args);  // Nothing has been written yet.
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &args);
+  args.data = 254;
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);  // Peeks should not consume it...
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);  // ...but this plain read does.
+  args.flags = RawQueue::kBlock;
+  args.data = -1;
+  EXPECT_HANGS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  args.data = 971;
+  EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(RawQueueTest, Writing) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, 973};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_HANGS(WriteTestMessage, &args);  // Expected to block: no room left.
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);  // Still holds the first 973.
+  args.data = 971;
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);  // kOverride must not block.
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(RawQueueTest, MultiRead) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, 1323};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);  // One message for two readers.
+  args.flags = RawQueue::kBlock;
+  ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+  ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+  AssertionResult one = HangsCheck(1);
+  AssertionResult two = HangsCheck(2);
+  EXPECT_TRUE(one != two) << "'" <<  // Exactly one reader should have hung.
+      one.failure_message() << "' vs '" << two.failure_message() << "'";
+  // TODO(brians) finish this
+}
+
+// There used to be a bug where reading first without an index and then with an
+// index would crash. This test makes sure that's fixed.
+TEST_F(RawQueueTest, ReadIndexAndNot) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+
+  // Write a message, read it (with ReadMessage), and then write another
+  // message (before freeing the read one so the queue allocates a distinct
+  // message to use for it).
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_NE(nullptr, msg);
+  ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+  const void *read_msg = queue->ReadMessage(RawQueue::kBlock);
+  EXPECT_NE(nullptr, read_msg);
+  msg = static_cast<TestMessage *>(queue->GetMessage());
+  queue->FreeMessage(read_msg);  // read_msg is only compared, not used, below.
+  ASSERT_NE(nullptr, msg);
+  ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+
+  int index = 0;  // Cursor for the indexed read; starts before any message.
+  const void *second_read_msg =
+      queue->ReadMessageIndex(RawQueue::kBlock, &index);
+  EXPECT_NE(nullptr, second_read_msg);
+  EXPECT_NE(read_msg, second_read_msg)
+      << "We already took that message out of the queue.";
+}
+
+TEST_F(RawQueueTest, Recycle) {
+  // TODO(brians) basic test of recycle queue
+  // include all of the ways a message can get into the recycle queue
+  RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);  // Sentinel.
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+                                          1, 2, 2, 2, &recycle_queue);
+  ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);  // Overwritten.
+  MessageArgs args{queue, RawQueue::kBlock, 973},
+      recycle{recycle_queue, RawQueue::kBlock, 973};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  EXPECT_HANGS(ReadTestMessage, &recycle);  // Nothing recycled yet.
+  args.data = 254;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+  args.data = 971;
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);  // Expected to push out the 973.
+  recycle.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_TRUE(msg != NULL);
+  msg->data = 341;
+  queue->FreeMessage(msg);  // Freed without ever being written.
+  recycle.data = 341;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+
+  args.data = 254;
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  recycle.flags = RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &recycle);  // A peek should recycle nothing.
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  recycle.data = 254;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+// Makes sure that when a message doesn't get written with kNonBlock it does get
+// freed.
+TEST_F(RawQueueTest, NonBlockFailFree) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+
+  void *message1 = queue->GetMessage();
+  void *message2 = queue->GetMessage();
+  ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
+  ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));
+  EXPECT_EQ(message2, queue->GetMessage());  // Handed out again => was freed.
+}
+
+// All of the tests from here down are designed to test every branch to
+// make sure it does what it's supposed to. They are generally pretty repetitive
+// and boring, and some of them may duplicate other tests above, but these ones
+// make it a lot easier to figure out what's wrong with bugs not related to race
+// conditions.
+
+TEST_F(RawQueueTest, ReadIndexNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());  // None in use.
+  PushMessage(queue, 971);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+
+  int index = 0;  // Cursor that ReadMessageIndex reads and updates.
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(971, message->data);
+  EXPECT_EQ(1, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);  // The peek is freed separately.
+
+  PushMessage(queue, 1768);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  PushMessage(queue, 254);
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+}
+
+TEST_F(RawQueueTest, ReadIndexNotBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());  // None in use.
+  PushMessage(queue, 971);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  PushMessage(queue, 1768);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+
+  int index = 0;  // Cursor that ReadMessageIndex reads and updates.
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(971, message->data);
+  EXPECT_EQ(1, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexLittleBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // Takes 971.
+
+  int index = 0;  // Behind the plain reader; expected to skip to 1768.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));  // Takes 971.
+  PushMessage(queue, 254);
+
+  int index = 0;  // Behind the plain reader; expected to skip to 1768.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;  // Two plain reads happened; expected to skip to 254.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  {
+    const void *message1, *message2;
+    message1 = queue->ReadMessage(RawQueue::kNonBlock);
+    ASSERT_NE(nullptr, message1);
+    PushMessage(queue, 254);
+    message2 = queue->ReadMessage(RawQueue::kNonBlock);
+    ASSERT_NE(nullptr, message2);
+    PushMessage(queue, 973);
+    EXPECT_EQ(4, kExtraMessages + 2 - queue->FreeMessages());  // 4 in use.
+    queue->FreeMessage(message1);
+    EXPECT_EQ(3, kExtraMessages + 2 - queue->FreeMessages());
+    queue->FreeMessage(message2);
+    EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  }
+
+  int index = 0;  // Two plain reads happened; expected to skip to 254.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;  // Three plain reads happened; expected to skip to 973.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 1114);
+
+  int index = 0;  // Three plain reads happened; expected to skip to 973.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 1114);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;  // Four plain reads happened; expected to skip to 1114.
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;  // Reset; kFromEnd should still land on the newest message.
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);  // NOTE(review): peek_message is not freed.
+}
+
+TEST_F(RawQueueTest, MessageReferenceCounts) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const void *message1, *message2;
+  // GetMessage() should take one free message; FreeMessage() should return it.
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+  message1 = queue->GetMessage();
+  EXPECT_NE(nullptr, message1);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  message2 = queue->GetMessage();
+  EXPECT_NE(nullptr, message2);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  queue->FreeMessage(message1);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  queue->FreeMessage(message2);
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+}
+
+// Tests that writing with kNonBlock fails and frees the message.
+TEST_F(RawQueueTest, WriteDontBlock) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  void *message;
+  // Push one message first so that (presumably) the queue is already full.
+  PushMessage(queue, 971);
+  int free_before = queue->FreeMessages();
+  message = queue->GetMessage();
+  ASSERT_NE(nullptr, message);
+  EXPECT_NE(free_before, queue->FreeMessages());
+  EXPECT_FALSE(queue->WriteMessage(message, RawQueue::kNonBlock));
+  EXPECT_EQ(free_before, queue->FreeMessages());
+}
+
+// Tests that writing with kOverride pushes the last message out of the queue.
+TEST_F(RawQueueTest, WriteOverride) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  // Two pushes, then an override write: 971 is expected to be the one dropped.
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  int free_before = queue->FreeMessages();
+  TestMessage *message1 = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_NE(nullptr, message1);
+  EXPECT_NE(free_before, queue->FreeMessages());
+  message1->data = 254;
+  EXPECT_TRUE(queue->WriteMessage(message1, RawQueue::kOverride));
+  EXPECT_EQ(free_before, queue->FreeMessages());
+  // Assert each read before dereferencing so a failed read fails, not crashes.
+  const TestMessage *message2 =
+      static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+  ASSERT_NE(nullptr, message2);
+  EXPECT_EQ(1768, message2->data);
+  queue->FreeMessage(message2);
+  EXPECT_EQ(free_before + 1, queue->FreeMessages());
+  message2 =
+      static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+  ASSERT_NE(nullptr, message2);
+  EXPECT_EQ(254, message2->data);
+  queue->FreeMessage(message2);
+  EXPECT_EQ(free_before + 2, queue->FreeMessages());
+}
+
+// Makes sure that ThreadSanitizer doesn't catch any issues freeing from
+// multiple threads at once.
+TEST_F(RawQueueTest, MultiThreadedFree) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  PushMessage(queue, 971);
+  int free_before = queue->FreeMessages();
+  // Peek twice so that each thread below has its own reference to free.
+  const void *const message1 =
+      queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
+  const void *const message2 =
+      queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
+  ASSERT_NE(nullptr, message1);
+  ASSERT_NE(nullptr, message2);
+  EXPECT_EQ(free_before, queue->FreeMessages());
+  util::FunctionThread t1([message1, queue](util::Thread *) {
+    queue->FreeMessage(message1);
+  });
+  util::FunctionThread t2([message2, queue](util::Thread *) {
+    queue->FreeMessage(message2);
+  });
+  t1.Start();
+  t2.Start();
+  t1.WaitUntilDone();
+  t2.WaitUntilDone();
+  EXPECT_EQ(free_before, queue->FreeMessages());
+}
+
+TEST_F(RawQueueDeathTest, OptionsValidation) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", 1, 1, 1);
+  // Each bad WriteMessage() option set must die with a matching log message.
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kPeek);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kFromEnd);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kPeek | RawQueue::kFromEnd);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kNonBlock | RawQueue::kBlock);
+      },
+      ".*invalid write option.*");
+  // Likewise for bad ReadMessageIndex() and ReadMessage() option sets.
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(
+            RawQueue::kBlock | RawQueue::kFromEnd | RawQueue::kPeek, nullptr);
+      },
+      ".*ReadMessageIndex.*is not allowed.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(RawQueue::kOverride, nullptr);
+      },
+      ".*illegal read option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(RawQueue::kOverride | RawQueue::kBlock,
+                                nullptr);
+      },
+      ".*illegal read option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessage(RawQueue::kNonBlock | RawQueue::kBlock);
+      },
+      ".*invalid read option.*");
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/ipc_lib/shared_mem.c b/aos/ipc_lib/shared_mem.c
new file mode 100644
index 0000000..e8edc9e
--- /dev/null
+++ b/aos/ipc_lib/shared_mem.c
@@ -0,0 +1,141 @@
+#include "aos/ipc_lib/shared_mem.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/logging/logging.h"
+#include "aos/ipc_lib/aos_sync.h"
+
+// The path for the shared memory segment. See shm_open(3) for restrictions.
+#define AOS_SHM_NAME "/aos_shared_mem"
+// Size of the shared memory segment in bytes (4096 * 0x3000 = 48 MiB).
+// This must fit in the tmpfs mounted at /dev/shm/.
+#define SIZEOFSHMSEG (4096 * 0x3000)
+
+// Zeroes time_offset and the three mutexes in *shm_core, NULLs both list heads.
+void init_shared_mem_core(aos_shm_core *shm_core) {
+  shm_core->queues.pointer = shm_core->queue_types.pointer = NULL;
+  memset(&shm_core->queues.lock, 0, sizeof(shm_core->queues.lock));
+  memset(&shm_core->queue_types.lock, 0, sizeof(shm_core->queue_types.lock));
+  memset(&shm_core->msg_alloc_lock, 0, sizeof(shm_core->msg_alloc_lock));
+  memset(&shm_core->time_offset, 0, sizeof(shm_core->time_offset));
+}
+
+// Returns the segment size minus msg_alloc's offset from the segment start.
+ptrdiff_t aos_core_get_mem_usage(void) {
+  const aos_shm_core *const core = global_core->mem_struct;
+  return global_core->size - ((ptrdiff_t)core->msg_alloc - (ptrdiff_t)core);
+}
+
+struct aos_core *global_core = NULL;
+
+// TODO(brians): madvise(2) it to put this shm in core dumps.
+// Opens the POSIX shm object (create == 0) or creates it fresh (create != 0,
+// unlinking any existing one of the same name), maps it at SHM_START, and
+// hands it to aos_core_use_address_as_shared_mem(). lock != 0 locks it in RAM.
+void aos_core_create_shared_mem(int create, int lock) {
+  assert(global_core == NULL);
+  static struct aos_core global_core_data;
+  global_core = &global_core_data;
+
+  // The AOS_SHM_NAME environment variable overrides the default segment name.
+  const char *const env_shm_name = getenv("AOS_SHM_NAME");
+  global_core->shm_name = (env_shm_name != NULL) ? env_shm_name : AOS_SHM_NAME;
+  if (env_shm_name != NULL) {
+    printf("AOS_SHM_NAME defined, using %s\n", env_shm_name);
+  }
+
+  int shm;
+  if (create) {
+    while (1) {
+      shm = shm_open(global_core->shm_name, O_RDWR | O_CREAT | O_EXCL, 0666);
+      global_core->owner = 1;
+      if (shm == -1 && errno == EEXIST) {
+        printf("shared_mem: going to shm_unlink(%s)\n", global_core->shm_name);
+        if (shm_unlink(global_core->shm_name) == -1) {
+          PLOG(WARNING, "shm_unlink(%s) failed", global_core->shm_name);
+          break;
+        }
+      } else {
+        break;
+      }
+    }
+  } else {
+    shm = shm_open(global_core->shm_name, O_RDWR, 0);
+    global_core->owner = 0;
+  }
+  if (shm == -1) {
+    PLOG(FATAL, "shm_open(%s, O_RDWR [| O_CREAT | O_EXCL, 0|0666) failed",
+         global_core->shm_name);
+  }
+  if (global_core->owner) {
+    if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
+      PLOG(FATAL, "ftruncate(%d, 0x%zx) failed", shm, (size_t)SIZEOFSHMSEG);
+    }
+  }
+  int flags = MAP_SHARED | MAP_FIXED;
+  if (lock) flags |= MAP_LOCKED | MAP_POPULATE;
+  void *shm_address = mmap((void *)SHM_START, SIZEOFSHMSEG,
+                           PROT_READ | PROT_WRITE, flags, shm, 0);
+  if (shm_address == MAP_FAILED) {
+    PLOG(FATAL, "shared_mem: mmap(%p, 0x%zx, stuff, %x, %d, 0) failed",
+         (void *)SHM_START, (size_t)SIZEOFSHMSEG, flags, shm);
+  }
+  if (create) {
+    printf("shared_mem: creating %s, shm at: %p\n", global_core->shm_name,
+           shm_address);
+  } else {
+    printf("shared_mem: not creating, shm at: %p\n", shm_address);
+  }
+  if (close(shm) == -1) {
+    PLOG(WARNING, "close(%d(=shm)) failed", shm);
+  }
+  if (shm_address != (void *)SHM_START) {
+    LOG(FATAL, "shm isn't at hard-coded %p. at %p instead\n",
+        (void *)SHM_START, shm_address);
+  }
+  aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
+  LOG(INFO, "shared_mem: end of create_shared_mem owner=%d\n",
+          global_core->owner);
+}
+
+// Records address/size in global_core. The owner then initializes the core
+// struct and futex_set()s creation_condition; a non-owner futex_wait()s on it.
+void aos_core_use_address_as_shared_mem(void *address, size_t size) {
+  aos_shm_core *const core = address;
+  global_core->mem_struct = core;
+  global_core->size = size;
+  global_core->shared_mem = (uint8_t *)address + sizeof(*core);
+  if (global_core->owner) {
+    core->msg_alloc = (uint8_t *)address + global_core->size;
+    init_shared_mem_core(core);
+    futex_set(&core->creation_condition);
+  } else if (futex_wait(&core->creation_condition) != 0) {
+    LOG(FATAL, "waiting on creation_condition failed\n");
+  }
+}
+
+// Unmaps the segment from SHM_START; the owner additionally shm_unlink()s it.
+void aos_core_free_shared_mem(void) {
+  // Log the same address that is handed to munmap(2).
+  void *const shm_address = (void *)SHM_START;
+  if (munmap(shm_address, SIZEOFSHMSEG) == -1) {
+    PLOG(FATAL, "munmap(%p, 0x%zx) failed", shm_address,
+         (size_t)SIZEOFSHMSEG);
+  }
+  if (global_core->owner && shm_unlink(global_core->shm_name)) {
+    PLOG(FATAL, "shared_mem: shm_unlink(%s) failed", global_core->shm_name);
+  }
+}
+
+int aos_core_is_init(void) {  // Non-zero once global_core has been set.
+  return (global_core == NULL) ? 0 : 1;
+}
diff --git a/aos/ipc_lib/shared_mem.h b/aos/ipc_lib/shared_mem.h
new file mode 100644
index 0000000..edb9091
--- /dev/null
+++ b/aos/ipc_lib/shared_mem.h
@@ -0,0 +1,39 @@
+#ifndef AOS_IPC_LIB_SHARED_MEM_H_
+#define AOS_IPC_LIB_SHARED_MEM_H_
+
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/ipc_lib/shared_mem_types.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Zeroes time_offset and the locks in *shm_core and NULLs its list heads.
+void init_shared_mem_core(aos_shm_core *shm_core);
+
+// Returns global_core->size minus msg_alloc's offset from the segment start.
+ptrdiff_t aos_core_get_mem_usage(void);
+
+// Uses the size bytes starting at address as the shared memory.
+// global_core needs to point to an instance of struct aos_core, and owner
+// should be set correctly there. The owner should verify that the first
+// sizeof(mutex) of data is set to 0 before passing the memory to this function.
+void aos_core_use_address_as_shared_mem(void *address, size_t size);
+
+// create is true to remove any existing shm to create a fresh one or false to
+// fail if it does not already exist.
+// lock is true to lock shared memory into RAM or false to not.
+void aos_core_create_shared_mem(int create, int lock);
+void aos_core_free_shared_mem(void);
+
+// Returns whether or not the shared memory system is active.
+int aos_core_is_init(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // AOS_IPC_LIB_SHARED_MEM_H_
diff --git a/aos/ipc_lib/shared_mem_types.h b/aos/ipc_lib/shared_mem_types.h
new file mode 100644
index 0000000..f8555d5
--- /dev/null
+++ b/aos/ipc_lib/shared_mem_types.h
@@ -0,0 +1,64 @@
+#ifndef AOS_IPC_LIB_SHARED_MEM_TYPES_H_
+#define AOS_IPC_LIB_SHARED_MEM_TYPES_H_
+
+#include <stddef.h>
+
+#include "aos/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct aos_core *global_core __attribute__((weak));
+
+// Where the shared memory segment starts in each process's address space.
+// Has to be the same in all of them so that stuff in shared memory
+// can have regular pointers to other stuff in shared memory.
+#define SHM_START 0x20000000
+
+// A pointer that everything attached to the shared memory shares, stored
+// together with a mutex.
+typedef struct aos_global_pointer_t {
+  struct aos_mutex lock;  // NOTE(review): presumably guards pointer; confirm.
+  void *pointer;
+} aos_global_pointer;
+
+typedef struct aos_shm_core_t {  // Lives at the start of the shared memory.
+  // Gets 0-initialized at the start (as part of shared memory) and
+  // the owner sets as soon as it finishes setting stuff up.
+  aos_condition creation_condition;
+
+  // An offset from CLOCK_MONOTONIC to times for all the code in nanoseconds.
+  // This is currently only set to non-zero by the log replay code.
+  // There is no synchronization on this to avoid the overhead because it is
+  // only updated with proper memory barriers when only a single task is
+  // running.
+  int64_t time_offset;
+
+  struct aos_mutex msg_alloc_lock;  // Presumably guards msg_alloc; confirm.
+  void *msg_alloc;  // The owner starts this at the end of the segment.
+
+  // A pointer to the head of the linked list of queues.
+  // pointer points to a ::aos::Queue.
+  aos_global_pointer queues;
+  // A pointer to the head of the linked list of queue message types.
+  // pointer points to a ::aos::type_cache::ShmType.
+  aos_global_pointer queue_types;
+} aos_shm_core;
+
+struct aos_core {  // Per-process bookkeeping for the attached shared memory.
+  // Non-0 if we "own" shared_mem and should shm_unlink(3) it when we're done.
+  int owner;
+  void *shared_mem;  // Points just past *mem_struct inside the segment.
+  // How large the chunk of shared memory is.
+  ptrdiff_t size;
+  aos_shm_core *mem_struct;  // The aos_shm_core at the segment's first byte.
+  // For the owner to store the name of the file to unlink when closing.
+  const char *shm_name;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // AOS_IPC_LIB_SHARED_MEM_TYPES_H_
diff --git a/aos/ipc_lib/unique_message_ptr.h b/aos/ipc_lib/unique_message_ptr.h
new file mode 100644
index 0000000..e30a4c0
--- /dev/null
+++ b/aos/ipc_lib/unique_message_ptr.h
@@ -0,0 +1,39 @@
+#include <memory>
+
+#include "aos/ipc_lib/queue.h"
+
+namespace aos {
+namespace internal {
+
+// Deleter for ::std::unique_ptr that hands a message back to its RawQueue.
+template<typename T>
+class queue_free {
+ public:
+  queue_free(RawQueue *queue) : queue_(queue) {}
+  void operator()(const T *message) {
+    queue_->FreeMessage(static_cast<const void *>(message));
+  }
+ private:
+  // Not const: unique_ptr move assignment has to assign the stored deleter.
+  RawQueue *queue_;
+};
+
+}  // namespace internal
+
+// A ::std::unique_ptr whose deleter frees the message through its RawQueue.
+template<typename T>
+class unique_message_ptr
+    : public ::std::unique_ptr<T, ::aos::internal::queue_free<T>> {
+  typedef ::std::unique_ptr<T, ::aos::internal::queue_free<T>> Base;
+ public:
+  unique_message_ptr(RawQueue *queue, T *message = nullptr)
+      : Base(message, ::aos::internal::queue_free<T>(queue)) {}
+  // Forwards every assignment (including moves) to ::std::unique_ptr.
+  template <typename... Args>
+  unique_message_ptr<T> &operator=(Args &&... args) {
+    Base::operator=(::std::forward<Args>(args)...);
+    return *this;
+  }
+};
+
+}  // namespace aos