glibusb wants a nice condition variable class
diff --git a/aos/atom_code/atom_code.gyp b/aos/atom_code/atom_code.gyp
index 37caccd..51dcaec 100644
--- a/aos/atom_code/atom_code.gyp
+++ b/aos/atom_code/atom_code.gyp
@@ -7,7 +7,7 @@
         '<(AOS)/atom_code/init.cc',
       ],
       'dependencies': [
-        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
         '<(AOS)/common/common.gyp:die',
         '<(AOS)/build/aos.gyp:logging',
       ],
diff --git a/aos/atom_code/camera/Buffers.cpp b/aos/atom_code/camera/Buffers.cpp
index f580f89..9cd1a7d 100644
--- a/aos/atom_code/camera/Buffers.cpp
+++ b/aos/atom_code/camera/Buffers.cpp
@@ -15,7 +15,6 @@
 };
 const std::string Buffers::kFDServerName("/tmp/aos_fd_server");
 const std::string Buffers::kQueueName("CameraBufferQueue");
-const aos_type_sig Buffers::kSignature{sizeof(Message), 971, 1};
 
 int Buffers::CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t)) {
   union af_unix_sockaddr {
@@ -66,7 +65,7 @@
 
 void Buffers::Release() {
   if (message_ != NULL) {
-    aos_queue_free_msg(queue_, message_);
+    queue_->FreeMessage(message_);
     message_ = NULL;
   }
 }
@@ -77,11 +76,12 @@
   // TODO(brians) make sure the camera reader process hasn't died
   do {
     if (block) {
-      message_ = static_cast<const Message *>(aos_queue_read_msg(queue_, PEEK | BLOCK));
+      message_ = static_cast<const Message *>(queue_->ReadMessage(
+              RawQueue::kPeek | RawQueue::kBlock));
     } else {
       static int index = 0;
-      message_ = static_cast<const Message *>(aos_queue_read_msg_index(queue_, BLOCK,
-                                                                       &index));
+      message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
+              RawQueue::kBlock, &index));
     }
   } while (block && message_ == NULL);
   if (message_ != NULL) {
@@ -137,7 +137,7 @@
 }
 Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
   MMap();
-  queue_ = aos_fetch_queue(kQueueName.c_str(), &kSignature);
+  queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
 }
 
 Buffers::~Buffers() {
@@ -157,6 +157,5 @@
   }
 }
 
-} // namespace camera
-} // namespace aos
-
+}  // namespace camera
+}  // namespace aos
diff --git a/aos/atom_code/camera/Buffers.h b/aos/atom_code/camera/Buffers.h
index 6b54188..080e9eb 100644
--- a/aos/atom_code/camera/Buffers.h
+++ b/aos/atom_code/camera/Buffers.h
@@ -53,9 +53,8 @@
   // The current one. Sometimes NULL.
   const Message *message_;
   static const std::string kQueueName;
-  static const aos_type_sig kSignature;
   // NULL for the Reader one.
-  aos_queue *queue_;
+  RawQueue *queue_;
   // Make the actual mmap calls.
   // Called by Buffers() automatically.
   void MMap();
diff --git a/aos/atom_code/camera/Reader.cpp b/aos/atom_code/camera/Reader.cpp
index 5f30cfe..95f6128 100644
--- a/aos/atom_code/camera/Reader.cpp
+++ b/aos/atom_code/camera/Reader.cpp
@@ -17,6 +17,7 @@
 #include "aos/atom_code/camera/V4L2.h"
 #include "aos/atom_code/camera/Buffers.h"
 #include "aos/common/logging/logging.h"
+#include "aos/atom_code/ipc_lib/queue.h"
 
 #define CLEAR(x) memset(&(x), 0, sizeof(x))
 
@@ -31,8 +32,7 @@
   // the bound socket listening for fd requests
   int server_fd_;
 
-  static const aos_type_sig kRecycleSignature;
-  aos_queue *queue_, *recycle_queue_;
+  RawQueue *queue_, *recycle_queue_;
   // the number of buffers currently queued in v4l2
   uint32_t queued_;
  public:
@@ -52,10 +52,11 @@
               dev_name, errno, strerror(errno));
     }
 
-    queue_ = aos_fetch_queue_recycle(Buffers::kQueueName.c_str(), &Buffers::kSignature,
-                                     &kRecycleSignature, &recycle_queue_);
+    queue_ = RawQueue::Fetch(Buffers::kQueueName.c_str(),
+                          sizeof(Buffers::Message), 971, 1,
+                          1, Buffers::kNumBuffers, &recycle_queue_);
     // read off any existing recycled messages
-    while (aos_queue_read_msg(recycle_queue_, NON_BLOCK) != NULL);
+    while (recycle_queue_->ReadMessage(RawQueue::kNonBlock) != NULL);
     queued_ = 0;
 
     InitServer();
@@ -140,10 +141,11 @@
       read = static_cast<const Buffers::Message *>(
           // we block waiting for one if we can't dequeue one without leaving
           // the driver <= 2 (to be safe)
-          aos_queue_read_msg(recycle_queue_, (queued_ <= 2) ? BLOCK : NON_BLOCK));
+          recycle_queue_->ReadMessage((queued_ <= 2) ?
+                                      RawQueue::kBlock : RawQueue::kNonBlock));
       if (read != NULL) {
         buf.index = read->index;
-        aos_queue_free_msg(recycle_queue_, read);
+        recycle_queue_->FreeMessage(read);
         QueueBuffer(&buf);
       }
     } while (read != NULL);
@@ -163,7 +165,7 @@
     }
 
     Buffers::Message *const msg = static_cast<Buffers::Message *>(
-        aos_queue_get_msg(queue_));
+        queue_->GetMessage());
     if (msg == NULL) {
       LOG(WARNING,
           "couldn't get a message to send buf #%" PRIu32 " from queue %p."
@@ -175,7 +177,7 @@
     msg->bytesused = buf.bytesused;
     memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
     msg->sequence = buf.sequence;
-    if (aos_queue_write_msg_free(queue_, msg, OVERRIDE) == -1) {
+    if (!queue_->WriteMessage(msg, RawQueue::kOverride)) {
       LOG(WARNING,
           "sending message %p with buf #%" PRIu32 " to queue %p failed."
           " re-queueing now\n", msg, buf.index, queue_);
@@ -405,8 +407,6 @@
   }
 };
 const char *const Reader::dev_name = "/dev/video0";
-const aos_type_sig Reader::kRecycleSignature{
-  sizeof(Buffers::Message), 1, Buffers::kNumBuffers};
 
 } // namespace camera
 } // namespace aos
diff --git a/aos/atom_code/camera/camera.gyp b/aos/atom_code/camera/camera.gyp
index 9931806..f1743be 100644
--- a/aos/atom_code/camera/camera.gyp
+++ b/aos/atom_code/camera/camera.gyp
@@ -45,11 +45,11 @@
         'Buffers.cpp',
       ],
       'dependencies': [
-        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
         '<(AOS)/build/aos.gyp:logging',
       ],
       'export_dependent_settings': [
-        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
       ],
     },
     {
@@ -74,6 +74,7 @@
         'buffers',
         '<(AOS)/atom_code/atom_code.gyp:init',
         '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
       ],
     },
   ],
diff --git a/aos/atom_code/core/BinaryLogReader.cpp b/aos/atom_code/core/BinaryLogReader.cpp
index 9451fd1..3e96cc5 100644
--- a/aos/atom_code/core/BinaryLogReader.cpp
+++ b/aos/atom_code/core/BinaryLogReader.cpp
@@ -97,7 +97,7 @@
     output->sequence = msg->sequence;
     memcpy(output_strings, msg->name, name_size);
     memcpy(output_strings + name_size, msg->message, message_size);
-    condition_set(&output->marker);
+    futex_set(&output->marker);
 
     logging::atom::Free(msg);
   }
diff --git a/aos/atom_code/core/LogFileCommon.h b/aos/atom_code/core/LogFileCommon.h
index afb86b0..3798b06 100644
--- a/aos/atom_code/core/LogFileCommon.h
+++ b/aos/atom_code/core/LogFileCommon.h
@@ -28,13 +28,13 @@
 // A lot of the fields don't have comments because they're the same as the
 // identically named fields in LogMessage.
 struct __attribute__((aligned)) LogFileMessageHeader {
-  // gets condition_set once this one has been written
+  // gets futex_set once this one has been written
   // for readers keeping up with a live writer
   //
   // gets initialized to 0 by ftruncate
   // 
   // there will be something here after the last log on a "page" set to 2
-  // (by the condition_set) to indicate that the next log is on the next page
+  // (by the futex_set) to indicate that the next log is on the next page
   mutex marker;
   static_assert(sizeof(marker) == 4, "mutex changed size!");
   log_level level;
@@ -132,8 +132,8 @@
         sizeof(mutex) > kPageSize) {
       char *const temp = current_;
       MapNextPage();
-      if (condition_set_value(reinterpret_cast<mutex *>(&temp[position_]), 2) == -1) {
-        fprintf(stderr, "LogFileCommon: condition_set_value(%p, 2) failed with %d: %s."
+      if (futex_set_value(reinterpret_cast<mutex *>(&temp[position_]), 2) == -1) {
+        fprintf(stderr, "LogFileCommon: futex_set_value(%p, 2) failed with %d: %s."
                 " readers will hang\n", &temp[position_], errno, strerror(errno));
       }
       Unmap(temp);
@@ -152,7 +152,7 @@
     do {
       r = reinterpret_cast<LogFileMessageHeader *>(&current_[position_]);
       if (wait) {
-        if (condition_wait(&r->marker) != 0) continue;
+        if (futex_wait(&r->marker) != 0) continue;
       }
       if (r->marker == 2) {
         Unmap(current_);
diff --git a/aos/atom_code/core/LogStreamer.cpp b/aos/atom_code/core/LogStreamer.cpp
index d3d4928..20c5987 100644
--- a/aos/atom_code/core/LogStreamer.cpp
+++ b/aos/atom_code/core/LogStreamer.cpp
@@ -31,7 +31,7 @@
 
   int index = 0;
   while (true) {
-    const LogMessage *const msg = ReadNext(BLOCK, &index);
+    const LogMessage *const msg = ReadNext(RawQueue::kBlock, &index);
     if (msg == NULL) continue;
 
     internal::PrintMessage(stdout, *msg);
diff --git a/aos/atom_code/core/core.gyp b/aos/atom_code/core/core.gyp
index 74b3232..1aaebfb 100644
--- a/aos/atom_code/core/core.gyp
+++ b/aos/atom_code/core/core.gyp
@@ -32,6 +32,7 @@
         '<(AOS)/build/aos.gyp:logging',
         '<(AOS)/atom_code/atom_code.gyp:init',
         '<(AOS)/common/common.gyp:time',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
       ],
     },
     {
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index a9a4779..1980a4d 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -1,135 +1,197 @@
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
 #include <stdio.h>
 #include <linux/futex.h>
 #include <unistd.h>
 #include <sys/syscall.h>
 #include <errno.h>
-#include "aos_sync.h"
-#include "cmpxchg.h"
 #include <stdint.h>
 #include <limits.h>
 #include <string.h>
+#include <inttypes.h>
+
+#include "cmpxchg.h"
 
 // this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
 // should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
-// <http://locklessinc.com/articles/futex_cheat_sheet/> and <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+//   (sys_set_robust_list appears to be the function name)
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and
+//   <http://locklessinc.com/articles/mutex_cv_futex/> are useful
 // <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
 // can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
 //
+// Remember that EAGAIN and EWOULDBLOCK are the same! (i.e., if you get EAGAIN
+// from FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+//
 // Values for a mutex:
 // 0 = unlocked
 // 1 = locked, not contended
 // 2 = locked, probably contended
-// Values for a condition:
+// Values for a "futex":
 // 0 = unset
 // 1 = set
 
-static inline int sys_futex(mutex *addr1, int op, int val1, const struct timespec *timeout,
-		void *addr2, int val3) {
-	return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+static inline int sys_futex(mutex *addr1, int op, int val1,
+    const struct timespec *timeout, void *addr2, int val3) {
+  return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+static inline int sys_futex_requeue(mutex *addr1, int op, int num_wake,
+    int num_requeue, mutex *m) {
+  return syscall(SYS_futex, addr1, op, num_wake, num_requeue, m);
+}
+static inline int sys_futex_op(mutex *addr1, int op, int num_waiters1,
+    int num_waiters2, mutex *addr2, int op_args_etc) {
+  return syscall(SYS_futex, addr1, op, num_waiters1,
+      num_waiters2, addr2, op_args_etc);
 }
 
 static inline int mutex_get(mutex *m, uint8_t signals_fail, const
-		struct timespec *timeout) {
-	int c;
-	c = cmpxchg(m, 0, 1);
-	if (!c) return 0;
-	/* The lock is now contended */
-	if (c == 1) c = xchg(m, 2);
-	while (c) {
-		/* Wait in the kernel */
-		//printf("sync here %d\n", __LINE__);
-		if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
-			if (signals_fail && errno == EINTR) {
-				return 1;
-			}
-			if (timeout != NULL && errno == ETIMEDOUT) {
-				return 2;
-			}
-		}
-		//printf("sync here %d\n", __LINE__);
-		c = xchg(m, 2);
-	}
-	return 0;
+                            struct timespec *timeout) {
+  int c;
+  c = cmpxchg(m, 0, 1);
+  if (!c) return 0;
+  /* The lock is now contended */
+  if (c == 1) c = xchg(m, 2);
+  while (c) {
+    /* Wait in the kernel */
+    //printf("sync here %d\n", __LINE__);
+    if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+      if (signals_fail && errno == EINTR) {
+        return 1;
+      }
+      if (timeout != NULL && errno == ETIMEDOUT) {
+        return 2;
+      }
+    }
+    //printf("sync here %d\n", __LINE__);
+    c = xchg(m, 2);
+  }
+  return 0;
 }
 int mutex_lock(mutex *m) {
-	return mutex_get(m, 1, NULL);
+  return mutex_get(m, 1, NULL);
 }
 int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
-	return mutex_get(m, 1, timeout);
+  return mutex_get(m, 1, timeout);
 }
 int mutex_grab(mutex *m) {
-	return mutex_get(m, 0, NULL);
+  return mutex_get(m, 0, NULL);
 }
 
-int mutex_unlock(mutex *m) {
-	/* Unlock, and if not contended then exit. */
-	//printf("mutex_unlock(%p) => %d \n",m,*m);
-	switch (xchg(m, 0)) {
-		case 0:
-			fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
-			printf("see stderr\n");
-			abort();
-		case 1:
-			//printf("mutex_unlock return(%p) => %d \n",m,*m);
-			return 0;
-		case 2:
-			if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
-				fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
-						m, errno, strerror(errno));
-				return -1;
-			} else {
-				return 0;
-			}
-		default:
-			fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
-					m);
-			printf("see stderr\n");
-			abort();
-	}
+void mutex_unlock(mutex *m) {
+  /* Unlock, and if not contended then exit. */
+  //printf("mutex_unlock(%p) => %d \n",m,*m);
+  switch (xchg(m, 0)) {
+    case 0:
+      fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+      printf("see stderr\n");
+      abort();
+    case 1:
+      //printf("mutex_unlock return(%p) => %d \n",m,*m);
+      break;
+    case 2:
+      if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+        fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+            m, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
+      } else {
+        break;
+      }
+    default:
+      fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+          m);
+      printf("see stderr\n");
+      abort();
+  }
 }
 int mutex_trylock(mutex *m) {
-	/* Try to take the lock, if is currently unlocked */
-	unsigned c = cmpxchg(m, 0, 1);
-	if (!c) return 0;
-	return 1;
+  /* Try to take the lock, if it is currently unlocked */
+  unsigned c = cmpxchg(m, 0, 1);
+  if (!c) return 0;
+  return 1;
 }
 
-int condition_wait(mutex *m) {
-	if (*m) {
-		return 0;
-	}
-	if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
-		if (errno == EINTR) {
-			return 1;
-		} else if (errno != EWOULDBLOCK) {
-			return -1;
-		}
-	}
-	return 0;
+int futex_wait(mutex *m) {
+  if (*m) {
+    return 0;
+  }
+  if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
+    if (errno == EINTR) {
+      return 1;
+    } else if (errno != EWOULDBLOCK) {
+      return -1;
+    }
+  }
+  return 0;
 }
-int condition_wait_force(mutex *m) {
-	while (1) {
-		if (sys_futex(m, FUTEX_WAIT, *m, NULL, NULL, 0) == -1) {
-			if (errno != EWOULDBLOCK) { // if it was an actual problem
-				if (errno == EINTR) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-		} else {
-			return 0;
-		}
-	}
+int futex_set_value(mutex *m, mutex value) {
+  xchg(m, value);
+  return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
 }
-inline int condition_set_value(mutex *m, mutex value) {
-	xchg(m, value);
-	return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+int futex_set(mutex *m) {
+  return futex_set_value(m, 1);
 }
-int condition_set(mutex *m) {
-	return condition_set_value(m, 1);
-}
-int condition_unset(mutex *m) {
-	return !xchg(m, 0);
+int futex_unset(mutex *m) {
+  return !xchg(m, 0);
 }
 
+void condition_wait(mutex *c, mutex *m) {
+  const mutex wait_start = *c;
+
+  mutex_unlock(m);
+
+  while (1) {
+    if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+      // If it failed for some reason other than somebody else doing a wake
+      // before we actually made it to sleep.
+      if (__builtin_expect(*c == wait_start, 0)) {
+        // Try again if it was because of a signal.
+        if (errno == EINTR) continue;
+        fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+                " with %d: %s\n",
+                c, wait_start, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
+      }
+    }
+    // Simplified mutex_lock that always leaves it
+    // contended in case anybody else got requeued.
+    while (xchg(m, 2) != 0) {
+      if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
+        // Try again if it was because of a signal or somebody else unlocked it
+        // before we went to sleep.
+        if (errno == EINTR || errno == EWOULDBLOCK) continue;
+        fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+                " failed with %d: %s\n",
+                m, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
+      }
+    }
+    return;
+  }
+}
+
+void condition_signal(mutex *c) {
+  __sync_fetch_and_add(c, 1);
+  if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+    fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
+        " failed with %d: %s\n",
+        c, errno, strerror(errno));
+    printf("see stderr\n");
+    abort();
+  }
+}
+
+void condition_broadcast(mutex *c, mutex *m) {
+  __sync_fetch_and_add(c, 1);
+  // Wake 1 waiter and requeue the rest.
+  if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
+    fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
+        " failed with %d: %s\n",
+        c, m, errno, strerror(errno));
+    printf("see stderr\n");
+    abort();
+  }
+}
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index 3c66264..6d4bf55 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -1,5 +1,5 @@
-#ifndef AOS_IPC_LIB_SYNC_H_
-#define AOS_IPC_LIB_SYNC_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_SYNC_H_
+#define AOS_ATOM_CODE_IPC_LIB_SYNC_H_
 
 #include <stdlib.h>
 #include <signal.h>
@@ -14,50 +14,83 @@
 // and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
 // list the interesting ones
 
-// Have to align structs containing it to to sizeof(int).
+// Have to align structs containing it to sizeof(int).
 // Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
-// Valid initial values for use with condition_ functions are 0 (unset) and 1 (set).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// No initialization is necessary for use as c with the condition_ functions.
 // The value should not be changed after multiple processes have started
 // accessing an instance except through the functions declared in this file.
 typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
 
 // All return -1 for other error (which will be in errno from futex(2)).
+//
+// There is no priority inversion protection.
+// TODO(brians) look at using
+// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
 
 // Returns 1 if interrupted by a signal.
+//
+// When m is unlocked, one of the highest priority processes blocked on it is
+// woken, but a process that was not already blocked may still lock it first.
 int mutex_lock(mutex *m) __attribute__((warn_unused_result));
 // Returns 2 if it timed out or 1 if interrupted by a signal.
 int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
   __attribute__((warn_unused_result));
 // Ignores signals. Can not fail.
 int mutex_grab(mutex *m);
-// Returns 1 for multiple unlocking and -1 if something bad happened and
-// whoever's waiting didn't get woken up.
-int mutex_unlock(mutex *m);
+// abort(3)s for multiple unlocking or any other error.
+void mutex_unlock(mutex *m);
 // Returns 0 when successful in locking the mutex and 1 if somebody else has it
 // locked.
 int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
 
-// The condition_ functions are similar to the mutex_ ones but different.
+// The futex_ functions are similar to the mutex_ ones but different.
 // They are designed for signalling when something happens (possibly to
 // multiple listeners). A mutex manipulated with them can only be set or unset.
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value of a futex manipulated by the
+// futex_ functions, the obvious implementation has essentially the same race
+// that condition variables are designed to prevent: somebody else can grab
+// the mutex and change whether the condition holds in between checking it and
+// the futex_ function changing the futex's value.
 
-// Wait for the condition to be set. Will return immediately if it's already set.
-// Returns 0 if successful or it was already set, 1 if interrupted by a signal, or -1.
-int condition_wait(mutex *m) __attribute__((warn_unused_result));
-// Will wait for the next condition_set, even if the condition is already set.
-// Returns 0 if successful, 1 if interrupted by a signal, or -1.
-int condition_wait_force(mutex *m) __attribute__((warn_unused_result));
-// Set the condition and wake up anybody waiting on it.
+// Wait for the futex to be set. Will return immediately if it's already set.
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1.
+int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// Set the futex and wake up anybody waiting on it.
 // Returns the number that were woken or -1.
-int condition_set(mutex *m);
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(mutex *m);
 // Same as above except lets something other than 1 be used as the final value.
-int condition_set_value(mutex *m, mutex value);
-// Unsets the condition.
+int futex_set_value(mutex *m, mutex value);
+// Unsets the futex (sets the value to 0).
 // Returns 0 if it was set before and 1 if it wasn't.
-int condition_unset(mutex *m);
+// Can not fail.
+int futex_unset(mutex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+
+// Wait for the condition variable to be signalled. m must be locked by the
+// caller; it is unlocked atomically with actually starting to wait, and it is
+// guaranteed to be locked again when this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+void condition_wait(mutex *c, mutex *m);
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+void condition_signal(mutex *c);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(mutex *c, mutex *m);
 
 #ifdef __cplusplus
 }
 #endif  // __cplusplus
 
-#endif
+#endif  // AOS_ATOM_CODE_IPC_LIB_SYNC_H_
diff --git a/aos/atom_code/ipc_lib/binheap.c b/aos/atom_code/ipc_lib/binheap.c
deleted file mode 100644
index 8eb024d..0000000
--- a/aos/atom_code/ipc_lib/binheap.c
+++ /dev/null
@@ -1,125 +0,0 @@
-#include "binheap.h"
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifndef TESTING_ASSERT
-#define TESTING_ASSERT(...)
-#endif
-#define Error(x) TESTING_ASSERT(0, x)
-
-#define MinData (0)
-
-void Initialize( int Elements, PriorityQueue H )
-{
-	H->Capacity = Elements - 1;
-	H->Size = 0;
-	H->Elements[ 0 ] = MinData;
-}
-
-int Insert( ElementType X, PriorityQueue H )
-{
-	int i;
-
-	if( IsFull( H ) )
-	{
-		return -1;
-	}
-
-	for( i = ++H->Size; H->Elements[ i / 2 ] > X; i /= 2 )
-		H->Elements[ i ] = H->Elements[ i / 2 ];
-	H->Elements[ i ] = X;
-	return 0;
-}
-
-void Remove( ElementType X, PriorityQueue H )
-{
-	int i, Child, removed = 0;
-	ElementType LastElement;
-
-	for ( i = 1; i <= H->Size; ++i )
-	{
-		if( H->Elements[ i ] == X )
-		{
-			removed = i;
-			break;
-		}
-	}
-	if( removed == 0 )
-	{
-		fprintf(stderr, "could not find element %d to remove. not removing any\n", X);
-		return;
-	}
-
-	LastElement = H->Elements[ H->Size-- ];
-
-	for( i = removed; i * 2 <= H->Size; i = Child )
-	{
-		/* Find smaller child */
-		Child = i * 2;
-		if( Child != H->Size && H->Elements[ Child + 1 ]
-				< H->Elements[ Child ] )
-			Child++;
-
-		/* Percolate one level */
-		if( LastElement > H->Elements[ Child ] )
-			H->Elements[ i ] = H->Elements[ Child ];
-		else
-			break;
-	}
-	H->Elements[ i ] = LastElement;
-}
-
-ElementType DeleteMin( PriorityQueue H )
-{
-	int i, Child;
-	ElementType MinElement, LastElement;
-
-	if( IsEmpty( H ) )
-	{
-		Error( "Priority queue is empty" );
-		return H->Elements[ 0 ];
-	}
-	MinElement = H->Elements[ 1 ];
-	LastElement = H->Elements[ H->Size-- ];
-
-	for( i = 1; i * 2 <= H->Size; i = Child )
-	{
-		/* Find smaller child */
-		Child = i * 2;
-		if( Child != H->Size && H->Elements[ Child + 1 ]
-				< H->Elements[ Child ] )
-			Child++;
-
-		/* Percolate one level */
-		if( LastElement > H->Elements[ Child ] )
-			H->Elements[ i ] = H->Elements[ Child ];
-		else
-			break;
-	}
-	H->Elements[ i ] = LastElement;
-	return MinElement;
-}
-
-ElementType GetMin( PriorityQueue H )
-{
-	if( !IsEmpty( H ) )
-		return H->Elements[ 1 ];
-	Error( "Priority Queue is Empty" );
-	return H->Elements[ 0 ];
-}
-
-int IsEmpty( PriorityQueue H )
-{
-	return H->Size == 0;
-}
-
-int IsFull( PriorityQueue H )
-{
-	return H->Size == H->Capacity;
-}
-
-int GetSize( PriorityQueue H )
-{
-	return H->Size;
-}
-
diff --git a/aos/atom_code/ipc_lib/binheap.h b/aos/atom_code/ipc_lib/binheap.h
deleted file mode 100644
index 8c26f9f..0000000
--- a/aos/atom_code/ipc_lib/binheap.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef _BinHeap_H
-#define _BinHeap_H
-
-#include <stdint.h>
-
-typedef uint8_t ElementType;
-struct HeapStruct;
-typedef struct HeapStruct *PriorityQueue;
-
-struct HeapStruct
-{
-	int Capacity;
-	int Size;
-	ElementType *Elements;
-};
-
-// Elements is the number allocated at H->Elements
-void Initialize( int Elements, PriorityQueue H );
-// 0 if successful, -1 if full
-int Insert( ElementType X, PriorityQueue H );
-void Remove( ElementType X, PriorityQueue H );
-ElementType DeleteMin( PriorityQueue H );
-ElementType GetMin( PriorityQueue H );
-int IsEmpty( PriorityQueue H );
-int IsFull( PriorityQueue H );
-int GetSize( PriorityQueue H );
-
-#endif
-
diff --git a/aos/atom_code/ipc_lib/binheap_test.cpp b/aos/atom_code/ipc_lib/binheap_test.cpp
deleted file mode 100644
index 62eecd4..0000000
--- a/aos/atom_code/ipc_lib/binheap_test.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-extern "C" {
-#include "binheap.h"
-}
-
-#include <gtest/gtest.h>
-
-class BinHeapTest : public testing::Test{
-	protected:
-		static const int TEST_ELEMENTS = 57;
-		PriorityQueue queue;
-		virtual void SetUp(){
-			queue = new HeapStruct();
-			queue->Elements = new uint8_t[TEST_ELEMENTS];
-			Initialize(TEST_ELEMENTS, queue);
-		}
-		virtual void TearDown(){
-			delete[] queue->Elements;
-			delete queue;
-		}
-};
-
-std::ostream& operator<< (std::ostream& o, uint8_t c){
-    return o<<(int)c;
-}
-
-testing::AssertionResult Contains(PriorityQueue queue, const uint8_t expected[], size_t length){
-	for(size_t i = 0; i < length; ++i){
-		//printf("expected[%d]=%d\n", i, expected[i]);
-		if(DeleteMin(queue) != expected[i]){
-			return testing::AssertionFailure() << "queue[" << i << "] != " << expected[i];
-		}
-	}
-	if(!IsEmpty(queue))
-		return testing::AssertionFailure() << "queue is longer than " << length;
-	return ::testing::AssertionSuccess();
-}
-
-TEST_F(BinHeapTest, SingleElement){
-	Insert(87, queue);
-	EXPECT_EQ(87, DeleteMin(queue));
-	EXPECT_TRUE(IsEmpty(queue));
-}
-TEST_F(BinHeapTest, MultipleElements){
-	Insert(54, queue);
-	Insert(1, queue);
-	Insert(0, queue);
-	Insert(255, queue);
-	Insert(123, queue);
-	uint8_t expected[] = {0, 1, 54, 123, 255};
-	EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-TEST_F(BinHeapTest, Removals){
-	Insert(54, queue);
-	Insert(1, queue);
-	Insert(0, queue);
-	Insert(255, queue);
-	Insert(123, queue);
-	Remove(255, queue);
-	Remove(0, queue);
-	Insert(222, queue);
-	Insert(67, queue);
-	uint8_t expected[] = {1, 54, 67, 123, 222};
-	EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-
diff --git a/aos/atom_code/ipc_lib/cmpxchg.h b/aos/atom_code/ipc_lib/cmpxchg.h
index acb4a3c..715c57d 100644
--- a/aos/atom_code/ipc_lib/cmpxchg.h
+++ b/aos/atom_code/ipc_lib/cmpxchg.h
@@ -9,10 +9,10 @@
 
 #define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
 /*#define xchg(ptr, n) ({typeof(*ptr) r; \
-		do{ \
-			r = *ptr; \
-		}while(!__sync_bool_compare_and_swap(ptr, r, n)); \
-		r; \
+    do{ \
+      r = *ptr; \
+    }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
+    r; \
 })*/
 
 #  define LOCK "lock;"
@@ -24,7 +24,7 @@
 
 /*static inline void set_64bit(volatile unsigned long *ptr, unsigned long val)
 {
-	*ptr = val;
+  *ptr = val;
 }
 
 #define _set_64bit set_64bit*/
@@ -32,37 +32,37 @@
 /*
  * Note: no "lock" prefix even on SMP: xchg always implies lock anyway
  * Note 2: xchg has side effect, so that attribute volatile is necessary,
- *	  but generally the primitive is invalid, *ptr is output argument. --ANK
+ *    but generally the primitive is invalid, *ptr is output argument. --ANK
  */
 static inline unsigned long __xchg(unsigned long x, volatile void * ptr, int size)
 {
-	switch (size) {
-		case 1:
-			__asm__ __volatile__("xchgb %b0,%1"
-					:"=q" (x)
-					:"m" (*__xg(ptr)), "0" (x)
-					:"memory");
-			break;
-		case 2:
-			__asm__ __volatile__("xchgw %w0,%1"
-					:"=r" (x)
-					:"m" (*__xg(ptr)), "0" (x)
-					:"memory");
-			break;
-		case 4:
-			__asm__ __volatile__("xchgl %k0,%1"
-					:"=r" (x)
-					:"m" (*__xg(ptr)), "0" (x)
-					:"memory");
-			break;
-		case 8:
-			__asm__ __volatile__("xchg %0,%1"
-					:"=r" (x)
-					:"m" (*__xg(ptr)), "0" (x)
-					:"memory");
-			break;
-	}
-	return x;
+  switch (size) {
+    case 1:
+      __asm__ __volatile__("xchgb %b0,%1"
+          :"=q" (x)
+          :"m" (*__xg(ptr)), "0" (x)
+          :"memory");
+      break;
+    case 2:
+      __asm__ __volatile__("xchgw %w0,%1"
+          :"=r" (x)
+          :"m" (*__xg(ptr)), "0" (x)
+          :"memory");
+      break;
+    case 4:
+      __asm__ __volatile__("xchgl %k0,%1"
+          :"=r" (x)
+          :"m" (*__xg(ptr)), "0" (x)
+          :"memory");
+      break;
+    case 8:
+      __asm__ __volatile__("xchg %0,%1"
+          :"=r" (x)
+          :"m" (*__xg(ptr)), "0" (x)
+          :"memory");
+      break;
+  }
+  return x;
 }
 
 /*
@@ -76,78 +76,78 @@
 #define __HAVE_ARCH_CMPXCHG 1
 
 static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
-		unsigned long new, int size)
+    unsigned long new, int size)
 {
-	int32_t prev;
-	switch (size) {
-		case 1:
-			__asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
-				    : "=a"(prev)
-				    : "q"(new), "m"(*__xg(ptr)), "0"(old)
-				    : "memory");
-			return prev;
-		case 2:
-			__asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
-				    : "=a"(prev)
-				    : "r"(new), "m"(*__xg(ptr)), "0"(old)
-				    : "memory");
-			return prev;
-		case 4:
-			__asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
-				    : "=a"(prev)
-				    : "r"(new), "m"(*__xg(ptr)), "0"(old)
-				    : "memory");
-			return prev;
-		case 8:
-			__asm__ __volatile__("lock; cmpxchg %1,%2"
-				    : "=a"(prev)
-				    : "q"(new), "m"(*__xg(ptr)), "0"(old)
-				    : "memory");
-			return prev;
-	}
-	return old;
+  unsigned long prev; /* as wide as the return type so an 8-byte result is not truncated */
+  switch (size) {
+    case 1:
+      __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
+            : "=a"(prev)
+            : "q"(new), "m"(*__xg(ptr)), "0"(old)
+            : "memory");
+      return prev;
+    case 2:
+      __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
+            : "=a"(prev)
+            : "r"(new), "m"(*__xg(ptr)), "0"(old)
+            : "memory");
+      return prev;
+    case 4:
+      __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
+            : "=a"(prev)
+            : "r"(new), "m"(*__xg(ptr)), "0"(old)
+            : "memory");
+      return prev;
+    case 8:
+      __asm__ __volatile__("lock; cmpxchg %1,%2"
+            : "=a"(prev)
+            : "q"(new), "m"(*__xg(ptr)), "0"(old)
+            : "memory");
+      return prev;
+  }
+  return old; /* NOTE(review): unsupported size; callers comparing the result to old will see "success" */
 }
 
 /*
 static inline unsigned long __cmpxchg_local(volatile void *ptr,
-			unsigned long old, unsigned long new, int size)
+      unsigned long old, unsigned long new, int size)
 {
-	unsigned long prev;
-	switch (size) {
-	case 1:
-		__asm__ __volatile__("cmpxchgb %b1,%2"
-				     : "=a"(prev)
-				     : "q"(new), "m"(*__xg(ptr)), "0"(old)
-				     : "memory");
-		return prev;
-	case 2:
-		__asm__ __volatile__("cmpxchgw %w1,%2"
-				     : "=a"(prev)
-				     : "r"(new), "m"(*__xg(ptr)), "0"(old)
-				     : "memory");
-		return prev;
-	case 4:
-		__asm__ __volatile__("cmpxchgl %k1,%2"
-				     : "=a"(prev)
-				     : "r"(new), "m"(*__xg(ptr)), "0"(old)
-				     : "memory");
-		return prev;
-	case 8:
-		__asm__ __volatile__("cmpxchgq %1,%2"
-				     : "=a"(prev)
-				     : "r"(new), "m"(*__xg(ptr)), "0"(old)
-				     : "memory");
-		return prev;
-	}
-	return old;
+  unsigned long prev;
+  switch (size) {
+  case 1:
+    __asm__ __volatile__("cmpxchgb %b1,%2"
+             : "=a"(prev)
+             : "q"(new), "m"(*__xg(ptr)), "0"(old)
+             : "memory");
+    return prev;
+  case 2:
+    __asm__ __volatile__("cmpxchgw %w1,%2"
+             : "=a"(prev)
+             : "r"(new), "m"(*__xg(ptr)), "0"(old)
+             : "memory");
+    return prev;
+  case 4:
+    __asm__ __volatile__("cmpxchgl %k1,%2"
+             : "=a"(prev)
+             : "r"(new), "m"(*__xg(ptr)), "0"(old)
+             : "memory");
+    return prev;
+  case 8:
+    __asm__ __volatile__("cmpxchgq %1,%2"
+             : "=a"(prev)
+             : "r"(new), "m"(*__xg(ptr)), "0"(old)
+             : "memory");
+    return prev;
+  }
+  return old;
 }*/
 
 #define cmpxchg(ptr,o,n)\
-	((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
-					(unsigned long)(n),sizeof(*(ptr))))
+  ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+          (unsigned long)(n),sizeof(*(ptr)))) /* __cmpxchg's report of the prior *(ptr), cast back to ptr's pointee type */
 /*#define cmpxchg_local(ptr,o,n)\
-	((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
-					(unsigned long)(n),sizeof(*(ptr))))*/
+  ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+          (unsigned long)(n),sizeof(*(ptr))))*/
 #endif
 
 #endif
diff --git a/aos/atom_code/ipc_lib/condition.cc b/aos/atom_code/ipc_lib/condition.cc
new file mode 100644
index 0000000..b764026
--- /dev/null
+++ b/aos/atom_code/ipc_lib/condition.cc
@@ -0,0 +1,26 @@
+#include "aos/common/condition.h"
+
+#include <inttypes.h>
+
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+// Condition objects are expected to be usable from shared memory, so the type
+// has to satisfy shm_ok.
+static_assert(shm_ok<Condition>::value,
+              "Condition should work in shared memory");
+
+// impl_ starts out value-initialized; m_ records the Mutex passed in, whose
+// impl_ is later handed to the underlying condition_* calls.
+Condition::Condition(Mutex *m) : impl_{}, m_{m} {}
+
+// Thin forwarders to the C condition_* primitives. Wait and Broadcast also
+// pass the associated Mutex's impl_; Signal only needs this Condition's impl_.
+void Condition::Wait() { condition_wait(&impl_, &m_->impl_); }
+
+void Condition::Signal() { condition_signal(&impl_); }
+
+void Condition::Broadcast() { condition_broadcast(&impl_, &m_->impl_); }
+
+}  // namespace aos
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
index 53587d8..bbd2f5b 100644
--- a/aos/atom_code/ipc_lib/core_lib.c
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -1,49 +1,43 @@
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
 #include <stdio.h>
 #include <stdlib.h>
-#include "shared_mem.h"
-#include "core_lib.h"
-#include <time.h>
 
-void init_shared_mem_core(aos_shm_core *shm_core) {
-	clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
-	shm_core->queues.alloc_flag = 0;
-	shm_core->msg_alloc_lock = 0;
-	shm_core->queues.queue_list = NULL;
-	shm_core->queues.alloc_lock = 0;
-}
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+
 static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
-	return (l > r) ? l : r;
+  return (l < r) ? r : l;  // whichever of the two is larger
 }
 void *shm_malloc_aligned(size_t length, uint8_t alignment) {
-	// minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
-	if (length <= 1) {
-		alignment = aos_8max(alignment, 1);
-	} else if (length <= 2) {
-		alignment = aos_8max(alignment, 2);
-	} else if (length <= 4) {
-		alignment = aos_8max(alignment, 4);
-	} else if (length <= 8) {
-		alignment = aos_8max(alignment, 8);
-	} else if (length <= 16) {
-		alignment = aos_8max(alignment, 16);
-	} else {
-		alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
-	}
+  // minimum alignments by object size, from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+  if (length <= 1) {  // aos_8max keeps a larger caller-requested alignment
+    alignment = aos_8max(alignment, 1);
+  } else if (length <= 2) {
+    alignment = aos_8max(alignment, 2);
+  } else if (length <= 4) {
+    alignment = aos_8max(alignment, 4);
+  } else if (length <= 8) {
+    alignment = aos_8max(alignment, 8);
+  } else if (length <= 16) {
+    alignment = aos_8max(alignment, 16);
+  } else {
+    alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);  // 17-63 bytes: 16; 64+: 64
+  }
 
-	void *msg = NULL;
-	aos_shm_core *shm_core = global_core->mem_struct;
-	mutex_grab(&shm_core->msg_alloc_lock);
-	shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
-	const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
-	shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
-	msg = shm_core->msg_alloc;
-	if (msg <= global_core->shared_mem) {
-		fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
-		printf("if you didn't see the stderr output just then, you should\n");
-		abort();
-	}
-	//printf("alloc %p\n", msg);
-	mutex_unlock(&shm_core->msg_alloc_lock);
-	return msg;
+  // Allocations grow downward from msg_alloc toward shared_mem: step down by length, then down to alignment.
+  aos_shm_core *shm_core = global_core->mem_struct;
+  mutex_grab(&shm_core->msg_alloc_lock);
+  shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+  const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+  shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+  void *const msg = shm_core->msg_alloc;
+  if (msg <= global_core->shared_mem) {
+    mutex_unlock(&shm_core->msg_alloc_lock);  // the lock is in shared memory; don't leave other processes blocked on it
+    fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+    printf("if you didn't see the stderr output just then, you should have\n");
+    abort();
+  }
+  mutex_unlock(&shm_core->msg_alloc_lock);
+  return msg;
 }
 
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
index f72ae4c..5674220 100644
--- a/aos/atom_code/ipc_lib/core_lib.h
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -1,44 +1,14 @@
 #ifndef _AOS_CORE_LIB_H_
 #define _AOS_CORE_LIB_H_
 
-// defined in shared_mem.c
-#ifdef __cplusplus
-extern "C" {
-#endif  // __cplusplus
-extern struct aos_core *global_core;
-#ifdef __cplusplus
-}
-#endif  // __cplusplus
-
-#include "aos_sync.h"
-#include "queue.h"
 #include <stdint.h>
 
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif  // __cplusplus
 
-struct aos_queue_list_t;
-typedef struct aos_queue_hash_t {
-	int alloc_flag;
-	mutex alloc_lock;
-	struct aos_queue_list_t *queue_list;
-} aos_queue_hash;
-
-typedef struct aos_shm_core_t {
-  // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
-  // this shared memory area
-  struct timespec identifier;
-  // gets 0-initialized at the start (as part of shared memory) and
-  // the owner sets as soon as it finishes setting stuff up
-  mutex creation_condition;
-  mutex msg_alloc_lock;
-  void *msg_alloc;
-  aos_queue_hash queues;
-} aos_shm_core;
-
-void init_shared_mem_core(aos_shm_core *shm_core);
-
 void *shm_malloc_aligned(size_t length, uint8_t alignment)
     __attribute__((alloc_size(1)));
 static void *shm_malloc(size_t length) __attribute__((alloc_size(1)));
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
index 4614680..f947d5e 100644
--- a/aos/atom_code/ipc_lib/ipc_lib.gyp
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -1,41 +1,81 @@
 {
   'targets': [
     {
-      'target_name': 'ipc_lib',
+      'target_name': 'aos_sync',
       'type': 'static_library',
       'sources': [
         'aos_sync.c',
-        'binheap.c',
+      ],
+    },
+    {
+      'target_name': 'core_lib',
+      'type': 'static_library',
+      'sources': [
         'core_lib.c',
-        'queue.c',
+      ],
+      'dependencies': [
+        'aos_sync',
+        'shared_mem',
+      ],
+      'export_dependent_settings': [
+        'aos_sync',
+      ],
+    },
+    {
+      'target_name': 'shared_mem',
+      'type': 'static_library',
+      'sources': [
         'shared_mem.c',
       ],
       'dependencies': [
+        'aos_sync',
+      ],
+      'export_dependent_settings': [
+        'aos_sync',
+      ],
+    },
+    {
+      'target_name': 'queue',
+      'type': 'static_library',
+      'sources': [
+        'queue.cc',
+      ],
+      'dependencies': [
+        '<(AOS)/common/common.gyp:condition',
+        '<(AOS)/common/common.gyp:mutex',
+        'core_lib',
         # TODO(brians): fix this once there's a nice logging interface to use
         # '<(AOS)/build/aos.gyp:logging',
       ],
     },
     {
-      'target_name': 'binheap_test',
+      'target_name': 'raw_queue_test',
       'type': 'executable',
       'sources': [
-        'binheap_test.cpp',
+        'queue_test.cc',
       ],
       'dependencies': [
         '<(EXTERNALS):gtest',
-        'ipc_lib',
+        'queue',
+        '<(AOS)/build/aos.gyp:logging',
+        'core_lib',
+        '<(AOS)/common/common.gyp:queue_testutils',
+        '<(AOS)/common/common.gyp:time',
       ],
     },
     {
-      'target_name': 'ipc_queue_test',
+      'target_name': 'ipc_stress_test',
       'type': 'executable',
       'sources': [
-        'queue_test.cpp',
+        'ipc_stress_test.cc',
       ],
       'dependencies': [
         '<(EXTERNALS):gtest',
-        'ipc_lib',
-        '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:queue_testutils',
+        '<(AOS)/common/common.gyp:mutex',
+        'core_lib',
+        '<(AOS)/common/common.gyp:die',
       ],
     },
   ],
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.cc b/aos/atom_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..38c425f
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+#include <assert.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+// Each entry is one test run: element 0 names the test binary (looked up in
+// this executable's directory) and any further elements are its arguments.
+// --gtest_filter is deliberately avoided: it seemed to cause heavy swapping,
+// which made everything disk-bound.
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+  {"queue_test"},
+  {"condition_test"},
+  {"mutex_test"},
+  {"raw_queue_test"},
+};
+// Inserted into every test's command line ahead of its per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+  "--gtest_repeat=30",
+  "--gtest_shuffle",
+};
+
+// Number of tester processes running at the same time.
+static constexpr int kTesters = 100;
+// New test runs keep being started until this much time has passed.
+static constexpr time::Time kTestTime = time::Time::InSeconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+  explicit Shared(const time::Time &stop_time)
+    : stop_time(stop_time), total_iterations(0), path(NULL) {}
+
+  // Synchronizes access to stdout/stderr to avoid interleaving failure
+  // messages.
+  Mutex output_mutex;
+
+  // No new test runs are started once time::Time::Now() reaches this.
+  time::Time stop_time;
+
+  // The total number of iterations. Updated by each child as it finishes.
+  int total_iterations;
+  // Synchronizes writes to total_iterations.
+  Mutex total_iterations_mutex;
+
+  const char *path;  // Directory of the test binaries; heap copy made before forking.
+};
+static_assert(shm_ok<Shared>::value,
+              "it's going to get shared between forked processes");
+
+// Runs in the freshly forked child: routes stdout/stderr into the pipe, then execs the test.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    Die("close(%d) of read end of pipe failed with %d: %s\n",
+        pipes[0], errno, strerror(errno));
+  }
+  if (close(STDIN_FILENO) == -1) {
+    Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+        STDIN_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDOUT_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDERR_FILENO, errno, strerror(errno));
+  }
+
+  const size_t size = test.size();
+  const size_t default_size = kDefaultArgs.size();
+  const char **args = new const char *[size + default_size + 1];
+  // The actual executable to run (must outlive execv since args[0] points into it).
+  ::std::string executable;
+  size_t i = 0;  // Next free slot in args.
+  for (const ::std::string &c : test) {
+    if (i == 0) {  // The first element names the binary; the defaults follow it.
+      executable = ::std::string(shared->path) + "/" + c;
+      args[0] = executable.c_str();
+      for (const ::std::string &ci : kDefaultArgs) {
+        args[++i] = ci.c_str();
+      }
+    } else {
+      args[i] = c.c_str();
+    }
+    ++i;
+  }
+  args[i] = NULL;  // For a non-empty test i == size + default_size; execv needs a NULL-terminated argv.
+  execv(executable.c_str(), const_cast<char *const *>(args));
+  Die("execv(%s, %p) failed with %d: %s\n",
+      executable.c_str(), args, errno, strerror(errno));
+}
+
+void DoRun(Shared *shared) {  // One tester: runs tests back to back until stop_time.
+  int iterations = 0;  // Test runs completed by this tester.
+  // Start at a pid-dependent test so that different testers start on different tests.
+  auto test = kTests.begin() + (getpid() % kTests.size());
+  int pipes[2];
+  while (time::Time::Now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+    }
+    switch (fork()) {
+      case 0:  // In the child, which turns into the test (DoRunTest never returns).
+        DoRunTest(shared, *test, pipes);
+      case -1:
+        Die("fork() failed with %d: %s\n", errno, strerror(errno));
+    }
+
+    if (close(pipes[1]) == -1) {
+      Die("close(%d) of write end of pipe failed with %d: %s\n",
+          pipes[1], errno, strerror(errno));
+    }
+
+    ::std::string output;
+    char buffer[2048];
+    while (true) {
+      const ssize_t ret = read(pipes[0], buffer, sizeof(buffer));
+      if (ret == 0) break;  // EOF: every write end of the pipe has been closed.
+      if (ret == -1) {
+        if (errno == EINTR) continue;  // Interrupted by a signal; just retry.
+        Die("read(%d, %p, %zu) failed with %d: %s\n",
+            pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+      }
+      output.append(buffer, ret);
+    }
+    if (close(pipes[0]) == -1) {
+      Die("close(%d) of pipe at EOF failed with %d: %s\n",
+          pipes[0], errno, strerror(errno));
+    }
+
+    int status;
+    while (true) {
+      if (wait(&status) == -1) {
+        if (errno == EINTR) continue;
+        Die("wait(%p) in child failed with %d: %s\n",
+            &status, errno, strerror(errno));
+      } else {
+        break;
+      }
+    }
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                test->at(0).c_str(), WEXITSTATUS(status));
+        fwrite(output.data(), 1, output.size(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+              test->at(0).c_str(),
+              WTERMSIG(status), strsignal(WTERMSIG(status)));
+      fwrite(output.data(), 1, output.size(), stderr);
+    } else {
+      assert(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", test->at(0).c_str());
+    }
+
+    ++test;  // Rotate through kTests, wrapping around at the end.
+    if (test == kTests.end()) test = kTests.begin();
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+
+void Run(Shared *shared) {
+  const pid_t pid = fork();  // The child becomes one long-lived tester.
+  if (pid == 0) {
+    DoRun(shared);
+    _exit(EXIT_SUCCESS);
+  } else if (pid == -1) {
+    Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+  }
+}
+
+int Main(int argc, char **argv) {
+  assert(argc >= 1);
+
+  ::aos::common::testing::GlobalCoreInstance global_core;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(time::Time::Now() + kTestTime);
+
+  char *temp = strdup(argv[0]);
+  shared->path = strdup(dirname(temp));  // dirname() may modify its argument.
+  free(temp);
+
+  for (int i = 0; i < kTesters; ++i) {  // Start every tester before reaping any.
+    Run(shared);
+  }
+
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    if (wait(&status) == -1) {
+      if (errno == EINTR) {
+        --i;  // Nothing was reaped and status is unset; retry this slot.
+        continue;
+      }
+      Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+      error = true;
+    }
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+}  // namespace aos
+
+int main(int argc, char *argv[]) {
+  return aos::Main(argc, argv);  // All of the real work happens in aos::Main.
+}
diff --git a/aos/atom_code/ipc_lib/mutex.cpp b/aos/atom_code/ipc_lib/mutex.cpp
index 4f908dd..47fc92a 100644
--- a/aos/atom_code/ipc_lib/mutex.cpp
+++ b/aos/atom_code/ipc_lib/mutex.cpp
@@ -26,10 +26,7 @@
 }
 
 void Mutex::Unlock() {
-  if (mutex_unlock(&impl_) != 0) {
-    LOG(FATAL, "mutex_unlock(%p(=%" PRIu32 ")) failed because of %d: %s\n",
-        &impl_, impl_, errno, strerror(errno));
-  }
+  mutex_unlock(&impl_);  // NOTE(review): the failure check was removed here; confirm mutex_unlock reports its own errors.
 }
 
 bool Mutex::TryLock() {
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
deleted file mode 100644
index 2e45326..0000000
--- a/aos/atom_code/ipc_lib/queue.c
+++ /dev/null
@@ -1,508 +0,0 @@
-#include "aos/atom_code/ipc_lib/queue.h"
-#include "aos/atom_code/ipc_lib/queue_internal.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#include "aos/common/logging/logging.h"
-
-#define READ_DEBUG 0
-#define WRITE_DEBUG 0
-#define REF_DEBUG 0
-
-static inline aos_msg_header *get_header(void *msg) {
-	return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
-}
-static inline aos_queue *aos_core_alloc_queue() {
-	return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
-}
-static inline void *aos_alloc_msg(aos_msg_pool *pool) {
-	return shm_malloc(pool->msg_length);
-}
-
-// actually free the given message
-static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
-#if REF_DEBUG
-	if (pool->pool_lock == 0) {
-		//LOG(WARNING, "unprotected\n");
-	}
-#endif
-	aos_msg_header *header = get_header(msg);
-	if (pool->pool[header->index] != header) { // if something's messed up
-		fprintf(stderr, "queue: something is very very wrong with queue %p."
-				" pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
-				queue, pool->pool, header->index, header);
-		printf("queue: see stderr\n");
-		abort();
-	}
-#if REF_DEBUG
-	printf("ref free: %p\n", msg);
-#endif
-	--pool->used;
-
-	if (queue->recycle != NULL) {
-		void *const new_msg = aos_queue_get_msg(queue->recycle);
-		if (new_msg == NULL) {
-			fprintf(stderr, "queue: couldn't get a message"
-					" for recycle queue %p\n", queue->recycle);
-		} else {
-			// Take a message from recycle_queue and switch its
-			// header with the one being freed, which effectively
-			// switches which queue each message belongs to.
-			aos_msg_header *const new_header = get_header(new_msg);
-			// also switch the messages between the pools
-			pool->pool[header->index] = new_header;
-			if (mutex_lock(&queue->recycle->pool.pool_lock)) {
-				return -1;
-			}
-			queue->recycle->pool.pool[new_header->index] = header;
-			// swap the information in both headers
-			header_swap(header, new_header);
-			// don't unlock the other pool until all of its messages are valid
-			mutex_unlock(&queue->recycle->pool.pool_lock);
-			// use the header for new_msg which is now for this pool
-			header = new_header;
-			if (aos_queue_write_msg_free(queue->recycle,
-						(void *)msg, OVERRIDE) != 0) {
-				printf("queue: warning aos_queue_write_msg("
-						"%p(=queue(=%p)->recycle), %p, OVERRIDE)"
-						" failed\n",
-						queue->recycle, queue, msg);
-			}
-			msg = new_msg;
-		}
-	}
-
-	// where the one we're freeing was
-	int index = header->index;
-	header->index = -1;
-	if (index != pool->used) { // if we're not freeing the one on the end
-		// put the last one where the one we're freeing was
-		header = pool->pool[index] = pool->pool[pool->used];
-		// put the one we're freeing at the end
-		pool->pool[pool->used] = get_header(msg);
-		// update the former last one's index
-		header->index = index;
-	}
-	return 0;
-}
-// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
-static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
-	if (msg == NULL) {
-		return 0;
-	}
-
-	int rv = 0;
-	if (mutex_lock(&pool->pool_lock)) {
-		return -1;
-	}
-	aos_msg_header *const header = get_header(msg);
-	header->ref_count --;
-	assert(header->ref_count >= 0);
-#if REF_DEBUG
-	printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
-#endif
-	if (header->ref_count == 0) {
-		rv = aos_free_msg(pool, msg, queue);
-	}
-	mutex_unlock(&pool->pool_lock);
-	return rv;
-}
-
-static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
-	if (sig1->length != sig2->length) {
-		//LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
-		return 0;
-	}
-	if (sig1->queue_length != sig2->queue_length) {
-		//LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
-		return 0;
-	}
-	if (sig1->hash != sig2->hash) {
-		//LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
-		return 0;
-	}
-	//LOG(DEBUG, "signature match\n");
-	return 1;
-}
-static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
-	aos_queue *const queue = aos_core_alloc_queue();
-	aos_msg_pool *const pool = &queue->pool;
-	pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
-	pool->length = 0;
-	pool->used = 0;
-	pool->msg_length = sig->length + sizeof(aos_msg_header);
-	pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
-	aos_ring_buf *const buf = &queue->buf;
-	buf->length = sig->queue_length + 1;
-	if (buf->length < 2) { // TODO(brians) when could this happen?
-		buf->length = 2;
-	}
-	buf->data = shm_malloc(buf->length * sizeof(void *));
-	buf->start = 0;
-	buf->end = 0;
-	buf->msgs = 0;
-	buf->writable = 1;
-	buf->readable = 0;
-	buf->buff_lock = 0;
-	pool->pool_lock = 0;
-	queue->recycle = NULL;
-	return queue;
-}
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
-	//LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
-	mutex_grab(&global_core->mem_struct->queues.alloc_lock);
-	aos_queue_list *list = global_core->mem_struct->queues.queue_list;
-	aos_queue_list *last = NULL;
-	while (list != NULL) {
-		// if we found a matching queue
-		if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
-			mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-			return list->queue;
-		} else {
-			//LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
-		}
-		last = list;
-		list = list->next;
-	}
-	list = shm_malloc(sizeof(aos_queue_list));
-	if (last == NULL) {
-		global_core->mem_struct->queues.queue_list = list;
-	} else {
-		last->next = list;
-	}
-	list->sig = *sig;
-	const size_t name_size = strlen(name) + 1;
-	list->name = shm_malloc(name_size);
-	memcpy(list->name, name, name_size);
-	//LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
-	list->queue = aos_create_queue(sig);
-	//LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
-	list->next = NULL;
-	mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-	return list->queue;
-}
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
-		const aos_type_sig *recycle_sig, aos_queue **recycle) {
-	if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
-		*recycle = NULL;
-		return NULL;
-	}
-	aos_queue *const r = aos_fetch_queue(name, sig);
-	r->recycle = aos_fetch_queue(name, recycle_sig);
-	if (r == r->recycle) {
-		fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
-		printf("see stderr\n");
-		abort();
-	}
-	*recycle = r->recycle;
-	return r;
-}
-
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
-#if WRITE_DEBUG
-  printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
-#endif
-  int rv = 0;
-  if (msg == NULL || msg < (void *)global_core->mem_struct ||
-      msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
-    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
-            msg, queue);
-    printf("see stderr\n");
-    abort();
-  }
-  aos_ring_buf *const buf = &queue->buf;
-  if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
-    printf("queue: locking buff_lock of %p failed\n", buf);
-#endif
-    return -1;
-  }
-  int new_end = (buf->end + 1) % buf->length;
-  while (new_end == buf->start) {
-    if (opts & NON_BLOCK) {
-#if WRITE_DEBUG
-      printf("queue: not blocking on %p. returning -1\n", queue);
-#endif
-      mutex_unlock(&buf->buff_lock);
-      return -1;
-    } else if (opts & OVERRIDE) {
-#if WRITE_DEBUG
-      printf("queue: overriding on %p\n", queue);
-#endif
-      // avoid leaking the message that we're going to overwrite
-      msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
-      buf->start = (buf->start + 1) % buf->length;
-    } else { // BLOCK
-      mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
-      printf("queue: going to wait for writable(=%p) of %p\n",
-          &buf->writable, queue);
-#endif
-      if (condition_wait(&buf->writable)) {
-#if WRITE_DEBUG
-        printf("queue: waiting for writable(=%p) of %p failed\n",
-            &buf->writable, queue);
-#endif
-        return -1;
-      }
-#if WRITE_DEBUG
-      printf("queue: going to re-lock buff_lock of %p to write\n", queue);
-#endif
-      if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
-        printf("queue: error locking buff_lock of %p\n", queue);
-#endif
-        return -1;
-      }
-    }
-    new_end = (buf->end + 1) % buf->length;
-  }
-  buf->data[buf->end] = msg;
-  ++buf->msgs;
-  buf->end = new_end;
-  mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
-  printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
-#endif
-  condition_set(&buf->readable);
-  if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
-    condition_unset(&buf->writable);
-  }
-#if WRITE_DEBUG
-  printf("queue: write returning %d on queue %p\n", rv, queue);
-#endif
-  return rv;
-}
-
-int aos_queue_free_msg(aos_queue *queue, const void *msg) {
-	// TODO(brians) get rid of this
-	void *msg_temp;
-	memcpy(&msg_temp, &msg, sizeof(msg_temp));
-  	return msg_ref_dec(msg_temp, &queue->pool, queue);
-}
-// Deals with setting/unsetting readable and writable.
-// Should be called after buff_lock has been unlocked.
-// read is whether or not this read call read one off the queue
-static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
-	if (read) {
-		condition_set(&buf->writable);
-		if (buf->start == buf->end) {
-			condition_unset(&buf->readable);
-		}
-	}
-}
-// Returns with buff_lock locked and a readable message in buf.
-// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
-static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
-		aos_queue *const queue, int *index) {
-#if !READ_DEBUG
-	(void)queue;
-#endif
-	if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
-		printf("queue: couldn't lock buff_lock of %p\n", queue);
-#endif
-		return -1;
-	}
-	while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
-		mutex_unlock(&buf->buff_lock);
-		if (opts & NON_BLOCK) {
-#if READ_DEBUG
-			printf("queue: not going to block waiting on %p\n", queue);
-#endif
-			return -1;
-		} else { // BLOCK
-#if READ_DEBUG
-			printf("queue: going to wait for readable(=%p) of %p\n",
-					&buf->readable, queue);
-#endif
-			// wait for a message to become readable
-			if ((index == NULL) ? condition_wait(&buf->readable) :
-					condition_wait_force(&buf->readable)) {
-#if READ_DEBUG
-				printf("queue: waiting for readable(=%p) of %p failed\n",
-						&buf->readable, queue);
-#endif
-				return -1;
-			}
-		}
-#if READ_DEBUG
-		printf("queue: going to re-lock buff_lock of %p to read\n", queue);
-#endif
-		if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
-			printf("couldn't re-lock buff_lock of %p\n", queue);
-#endif
-			return -1;
-		}
-	}
-#if READ_DEBUG
-	printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
-#endif
-	return 0;
-}
-// handles reading with PEEK
-static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
-	void *ret;
-	if (opts & FROM_END) {
-		int pos = buf->end - 1;
-		if (pos < 0) { // if it needs to wrap
-			pos = buf->length - 1;
-		}
-#if READ_DEBUG
-		printf("queue: reading from line %d: %d\n", __LINE__, pos);
-#endif
-		ret = buf->data[pos];
-	} else {
-#if READ_DEBUG
-		printf("queue: reading from line %d: %d\n", __LINE__, start);
-#endif
-		ret = buf->data[start];
-	}
-	aos_msg_header *const header = get_header(ret);
-	header->ref_count ++;
-#if REF_DEBUG
-	printf("ref inc count: %p\n", ret);
-#endif
-	return ret;
-}
-const void *aos_queue_read_msg(aos_queue *queue, int opts) {
-#if READ_DEBUG
-	printf("queue: read_msg(%p, %d)\n", queue, opts);
-#endif
-	void *msg = NULL;
-	aos_ring_buf *const buf = &queue->buf;
-	if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
-#if READ_DEBUG
-		printf("queue: common returned -1 for %p\n", queue);
-#endif
-		return NULL;
-	}
-	if (opts & PEEK) {
-		msg = read_msg_peek(buf, opts, buf->start);
-	} else {
-		if (opts & FROM_END) {
-			while (1) {
-#if READ_DEBUG
-				printf("queue: start of c2 of %p\n", queue);
-#endif
-				// This loop pulls each message out of the buffer.
-				const int pos = buf->start;
-				buf->start = (buf->start + 1) % buf->length;
-				// if this is the last one
-				if (buf->start == buf->end) {
-#if READ_DEBUG
-					printf("queue: reading from c2: %d\n", pos);
-#endif
-					msg = buf->data[pos];
-					break;
-				}
-				// it's not going to be in the queue any more
-				msg_ref_dec(buf->data[pos], &queue->pool, queue);
-			}
-		} else {
-#if READ_DEBUG
-			printf("queue: reading from d2: %d\n", buf->start);
-#endif
-			msg = buf->data[buf->start];
-			buf->start = (buf->start + 1) % buf->length;
-		}
-	}
-	mutex_unlock(&buf->buff_lock);
-	aos_read_msg_common_end(buf, !(opts & PEEK));
-#if READ_DEBUG
-	printf("queue: read returning %p\n", msg);
-#endif
-	return msg;
-}
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
-#if READ_DEBUG
-	printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
-#endif
-	void *msg = NULL;
-	aos_ring_buf *const buf = &queue->buf;
-	if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
-#if READ_DEBUG
-		printf("queue: common returned -1\n");
-#endif
-		return NULL;
-	}
-        // TODO(parker): Handle integer wrap on the index.
-	const int offset = buf->msgs - *index;
-	int my_start = buf->end - offset;
-	if (offset >= buf->length) { // if we're behind the available messages
-		// catch index up to the last available message
-		*index += buf->start - my_start;
-		// and that's the one we're going to read
-		my_start = buf->start;
-	}
-	if (my_start < 0) { // if we want to read off the end of the buffer
-		// unwrap where we're going to read from
-		my_start += buf->length;
-	}
-	if (opts & PEEK) {
-		msg = read_msg_peek(buf, opts, my_start);
-	} else {
-		if (opts & FROM_END) {
-#if READ_DEBUG
-			printf("queue: start of c1 of %p\n", queue);
-#endif
-			int pos = buf->end - 1;
-			if (pos < 0) { // if it wrapped
-				pos = buf->length - 1; // unwrap it
-			}
-#if READ_DEBUG
-			printf("queue: reading from c1: %d\n", pos);
-#endif
-			msg = buf->data[pos];
-			*index = buf->msgs;
-		} else {
-#if READ_DEBUG
-			printf("queue: reading from d1: %d\n", my_start);
-#endif
-			msg = buf->data[my_start];
-			++(*index);
-		}
-		aos_msg_header *const header = get_header(msg);
-		++header->ref_count;
-#if REF_DEBUG
-		printf("ref_inc_count: %p\n", msg);
-#endif
-	}
-	mutex_unlock(&buf->buff_lock);
-	// this function never consumes one off the queue
-	aos_read_msg_common_end(buf, 0);
-	return msg;
-}
-static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
-	if (mutex_lock(&pool->pool_lock)) {
-		return NULL;
-	}
-	void *msg;
-	if (pool->length - pool->used > 0) {
-		msg = pool->pool[pool->used];
-	} else {
-		if (pool->length >= pool->mem_length) {
-			LOG(FATAL, "overused pool %p\n", pool);
-		}
-		msg = pool->pool[pool->length] = aos_alloc_msg(pool);
-		++pool->length;
-	}
-	aos_msg_header *const header = msg;
-	msg = (uint8_t *)msg + sizeof(aos_msg_header);
-	header->ref_count = 1;
-#if REF_DEBUG
-	printf("ref alloc: %p\n", msg);
-#endif
-	header->index = pool->used;
-	++pool->used;
-	mutex_unlock(&pool->pool_lock);
-	return msg;
-}
-void *aos_queue_get_msg(aos_queue *queue) {
-	return aos_pool_get_msg(&queue->pool);
-}
-
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..7ab7b6c
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -0,0 +1,491 @@
+#include "aos/atom_code/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+// RawQueue objects are constructed directly in shared memory (placement new on
+// shm_malloc'ed storage in Fetch), so the type has to satisfy shm_ok.
+static_assert(shm_ok<RawQueue>::value,
+              "RawQueue instances go into shared memory");
+
+// Compile-time switches for the printf tracing in the read, write,
+// reference-counting, and Fetch/constructor code paths respectively. Leave
+// them false except while debugging this file.
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
+const int kExtraMessages = 20;
+
+}  // namespace
+
+// Out-of-class definitions for the option constants that are declared and
+// initialized inside the class. They provide storage in case a constant is
+// odr-used (e.g. bound to a const reference), and must not repeat the
+// initializers.
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+// Bookkeeping that lives in shared memory immediately in front of every
+// message. The pointers handed out by GetMessage/ReadMessage* point just past
+// it.
+struct RawQueue::MessageHeader {
+  // Outstanding references; the message is freed when this drops to 0.
+  int ref_count;
+  // Position of this header in pool_ (-1 while the message is unused).
+  int index;
+  // Recovers the header from a message pointer handed out by this class.
+  static MessageHeader *Get(const void *msg) {
+    // Checked here instead of at namespace scope: naming this private nested
+    // type is only allowed from a member context, and inside a member function
+    // body the type is complete.
+    static_assert(shm_ok<MessageHeader>::value, "the whole point"
+                  " is to stick it in shared memory");
+    return reinterpret_cast<MessageHeader *>(
+        static_cast<uint8_t *>(const_cast<void *>(msg)) -
+        sizeof(MessageHeader));
+  }
+  // Exchanges the contents (ref_count and index) of the two headers.
+  void Swap(MessageHeader *other) {
+    const MessageHeader temp = *other;
+    *other = *this;
+    *this = temp;
+  }
+};
+
+// State carried from ReadCommonStart to ReadCommonEnd for a single read.
+struct RawQueue::ReadData {
+  // Whether the queue was writable when the read started. ReadCommonEnd only
+  // signals writable_ if it was not (i.e. the queue went from full to writable
+  // during this read).
+  bool writable_start;
+};
+
+// Drops one reference to msg and hands it to DoFreeMessage once nothing
+// references it any more. ref_count is protected by pool_lock_.
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+//   have to lock/unlock pool_lock_
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *const header = MessageHeader::Get(msg);
+  const int remaining = --header->ref_count;
+  assert(remaining >= 0);
+  if (kRefDebug) {
+    printf("ref_dec_count: %p count=%d\n", msg, remaining);
+  }
+  if (remaining != 0) return;
+  DoFreeMessage(msg);
+}
+
+// Only called by Fetch (placement new on shm_malloc'ed memory, with the queue
+// list lock held). Copies name into shared memory and allocates the ring
+// buffer (data_) and the message pool (pool_) for this queue.
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+  : readable_(&data_lock_), writable_(&data_lock_) {
+  // name_ has to be in shared memory too because other processes compare
+  // against it in Fetch.
+  const size_t name_size = strlen(name) + 1;
+  char *temp = static_cast<char *>(shm_malloc(name_size));
+  memcpy(temp, name, name_size);
+  name_ = temp;
+  length_ = length;
+  hash_ = hash;
+  queue_length_ = queue_length;
+
+  next_ = NULL;
+  recycle_ = NULL;
+
+  if (kFetchDebug) {
+    // %zu because length is a size_t.
+    printf("initializing name=%s, length=%zu, hash=%d, queue_length=%d\n",
+           name, length, hash, queue_length);
+  }
+
+  // data_ is a ring buffer that always keeps one slot empty so that a full
+  // buffer ((data_end_ + 1) % data_length_ == data_start_) can be told apart
+  // from an empty one (data_end_ == data_start_).
+  data_length_ = queue_length + 1;
+  // A queue_length <= 0 would leave no usable slot, so always keep at least 1.
+  if (data_length_ < 2) {
+    data_length_ = 2;
+  }
+  data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+  data_start_ = 0;
+  data_end_ = 0;
+  messages_ = 0;
+
+  // The pool may hold kExtraMessages more messages than the queue, for ones
+  // that are still held by readers or are being filled in by writers.
+  mem_length_ = queue_length + kExtraMessages;
+  pool_length_ = 0;
+  messages_used_ = 0;
+  msg_length_ = length + sizeof(MessageHeader);
+  pool_ = static_cast<MessageHeader **>(
+      shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+  if (kFetchDebug) {
+    printf("made queue %s\n", name);
+  }
+}
+// Returns the queue identified by (name, length, hash, queue_length), creating
+// it in shared memory and appending it to the global queue list if it doesn't
+// exist yet. Returns NULL if the queue list's lock can't be taken.
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length) {
+  if (kFetchDebug) {
+    printf("fetching queue %s\n", name);
+  }
+  if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+    return NULL;
+  }
+
+  // Walk the list looking for an exact match, remembering the last node so a
+  // new queue can be linked in after it.
+  RawQueue *tail = NULL;
+  for (RawQueue *candidate = static_cast<RawQueue *>(
+           global_core->mem_struct->queues.queue_list);
+       candidate != NULL; candidate = candidate->next_) {
+    const bool matches = strcmp(candidate->name_, name) == 0 &&
+                         candidate->length_ == length &&
+                         candidate->hash_ == hash &&
+                         candidate->queue_length_ == queue_length;
+    if (matches) {
+      mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+      return candidate;
+    }
+    if (kFetchDebug) {
+      printf("rejected queue %s strcmp=%d target=%s\n", candidate->name_,
+             strcmp(candidate->name_, name), name);
+    }
+    tail = candidate;
+  }
+
+  // No match, so construct a new queue in shared memory and link it in.
+  RawQueue *const created = new (shm_malloc(sizeof(RawQueue)))
+      RawQueue(name, length, hash, queue_length);
+  if (tail == NULL) {  // the list was empty
+    global_core->mem_struct->queues.queue_list = created;
+  } else {
+    tail->next_ = created;
+  }
+
+  mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+  return created;
+}
+// Fetches (or creates) the queue for (name, length, hash, queue_length) and
+// attaches the queue for (name, length, recycle_hash, recycle_length) to it as
+// its recycle queue, which is also returned through *recycle.
+// Returns NULL (and sets *recycle to NULL) if the main queue can't be fetched.
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length,
+                    int recycle_hash, int recycle_length, RawQueue **recycle) {
+  RawQueue *r = Fetch(name, length, hash, queue_length);
+  if (r == NULL) {
+    // The other Fetch returns NULL if it can't lock the queue list. There is
+    // no queue to attach a recycle queue to, so don't dereference r.
+    *recycle = NULL;
+    return NULL;
+  }
+  r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+  // Identical parameters give back the same queue, and a queue must not
+  // recycle into itself.
+  if (r == r->recycle_) {
+    fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+    printf("see stderr\n");
+    r->recycle_ = NULL;
+    abort();
+  }
+  *recycle = r->recycle_;
+  return r;
+}
+
+// Actually frees msg once its reference count has hit 0. Called from
+// DecrementMessageReferenceCount with pool_lock_ held.
+// If this queue has a recycle queue, the freed message's storage is written
+// onto recycle_, and an unused message taken from recycle_'s pool takes its
+// place in this pool. Either way, the freed header ends up at
+// pool_[messages_used_], so pool_[0, messages_used_) are always the in-use
+// messages.
+void RawQueue::DoFreeMessage(const void *msg) {
+  MessageHeader *header = MessageHeader::Get(msg);
+  if (pool_[header->index] != header) {  // if something's messed up
+    fprintf(stderr, "queue: something is very very wrong with queue %p."
+            " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+            this, pool_, header->index, header);
+    printf("queue: see stderr\n");
+    abort();
+  }
+  if (kRefDebug) {
+    printf("ref free: %p\n", msg);
+  }
+  --messages_used_;
+
+  if (recycle_ != NULL) {
+    void *const new_msg = recycle_->GetMessage();
+    if (new_msg == NULL) {
+      // Falls through to freeing msg normally into this pool.
+      fprintf(stderr, "queue: couldn't get a message"
+              " for recycle queue %p\n", recycle_);
+    } else {
+      // Take a message from recycle_ and switch its
+      // header with the one being freed, which effectively
+      // switches which queue each message belongs to.
+      MessageHeader *const new_header = MessageHeader::Get(new_msg);
+      // Also switch the messages between the pools.
+      pool_[header->index] = new_header;
+      {
+        MutexLocker locker(&recycle_->pool_lock_);
+        recycle_->pool_[new_header->index] = header;
+        // Swap the information in both headers.
+        header->Swap(new_header);
+        // Don't unlock the other pool until all of its messages are valid.
+      }
+      // use the header for new_msg which is now for this pool
+      header = new_header;
+      // msg now belongs to recycle_ (with the reference count of 1 that
+      // GetMessage gave new_msg), so hand it to that queue.
+      if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+                " aborting\n", recycle_, msg);
+        printf("see stderr\n");
+        abort();
+      }
+      // From here on, free new_msg's storage into this pool instead.
+      msg = new_msg;
+    }
+  }
+
+  // Where the one we're freeing was.
+  int index = header->index;
+  // Marks the freed header as unused.
+  header->index = -1;
+  if (index != messages_used_) {  // if we're not freeing the one on the end
+    // Put the last one where the one we're freeing was.
+    header = pool_[index] = pool_[messages_used_];
+    // Put the one we're freeing at the end.
+    pool_[messages_used_] = MessageHeader::Get(msg);
+    // Update the former last one's index.
+    header->index = index;
+  }
+}
+
+// Appends msg (which must come from this queue's pool) to the ring buffer.
+// When the queue is full, this waits (kBlock), fails (kNonBlock), or drops the
+// oldest message (kOverride), depending on options. Returns false only for
+// kNonBlock on a full queue, in which case msg is untouched and the caller
+// still owns it.
+bool RawQueue::WriteMessage(void *msg, int options) {
+  if (kWriteDebug) {
+    printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
+  }
+  // Sanity check that msg points inside the shared memory region (assumed to
+  // be global_core->size bytes starting at mem_struct). The end pointer is one
+  // past the region, so msg must be strictly less than it.
+  if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
+      msg >= static_cast<void *>((
+              reinterpret_cast<char *>(global_core->mem_struct) +
+              global_core->size))) {
+    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+            msg, this);
+    printf("see stderr\n");
+    abort();
+  }
+  {
+    MutexLocker locker(&data_lock_);
+    bool writable_waited = false;
+
+    int new_end;
+    while (true) {
+      new_end = (data_end_ + 1) % data_length_;
+      // If there is room in the queue right now.
+      if (new_end != data_start_) break;
+      if (options & kNonBlock) {
+        if (kWriteDebug) {
+          printf("queue: not blocking on %p. returning false\n", this);
+        }
+        return false;
+      } else if (options & kOverride) {
+        if (kWriteDebug) {
+          printf("queue: overriding on %p\n", this);
+        }
+        // Avoid leaking the message that we're going to overwrite.
+        DecrementMessageReferenceCount(data_[data_start_]);
+        data_start_ = (data_start_ + 1) % data_length_;
+      } else {  // kBlock
+        if (kWriteDebug) {
+          printf("queue: going to wait for writable_ of %p\n", this);
+        }
+        writable_.Wait();
+        writable_waited = true;
+      }
+    }
+    // The queue now holds the reference that the caller passed in with msg.
+    data_[data_end_] = msg;
+    ++messages_;
+    data_end_ = new_end;
+
+    if (kWriteDebug) {
+      printf("queue: broadcasting to readable_ of %p\n", this);
+    }
+    readable_.Broadcast();
+
+    // If we got a signal on writable_ here and it's still writable, then we
+    // need to signal the next person in line (if any).
+    if (writable_waited && is_writable()) {
+      if (kWriteDebug) {
+        printf("queue: resignalling writable_ of %p\n", this);
+      }
+      writable_.Signal();
+    }
+  }
+  if (kWriteDebug) {
+    printf("queue: write returning true on queue %p\n", this);
+  }
+  return true;
+}
+
+// Wakes one waiting writer if the queue is writable now but was not when this
+// read started (as recorded by ReadCommonStart in *read_data).
+void RawQueue::ReadCommonEnd(ReadData *read_data) {
+  if (!is_writable()) return;
+  const bool was_writable = read_data->writable_start;
+  if (kReadDebug) {
+    printf("queue: %ssignalling writable_ of %p\n",
+           was_writable ? "not " : "", this);
+  }
+  if (!was_writable) writable_.Signal();
+}
+// Called with data_lock_ held. Records whether the queue is currently writable
+// in *read_data, then waits (unless kNonBlock) until there is a message to
+// read: any message when index is NULL, otherwise one newer than *index.
+// Returns false only if kNonBlock was passed and nothing is readable.
+bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
+  read_data->writable_start = is_writable();
+  while (!is_readable() || (index != NULL && messages_ <= *index)) {
+    if (options & kNonBlock) {
+      if (kReadDebug) {
+        printf("queue: not going to block waiting on %p\n", this);
+      }
+      return false;
+    }
+    // kBlock: wait for a message to become readable.
+    if (kReadDebug) {
+      printf("queue: going to wait for readable_ of %p\n", this);
+    }
+    readable_.Wait();
+    if (kReadDebug) {
+      printf("queue: done waiting for readable_ of %p\n", this);
+    }
+  }
+  if (kReadDebug) {
+    printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+  }
+  return true;
+}
+// Called with data_lock_ held. Returns the message at index start of data_ (or
+// the newest one with kFromEnd) without removing it from the queue, after
+// adding a reference for the caller.
+void *RawQueue::ReadPeek(int options, int start) {
+  void *ret;
+  if (options & kFromEnd) {
+    int pos = data_end_ - 1;
+    if (pos < 0) {  // if it needs to wrap
+      pos = data_length_ - 1;
+    }
+    if (kReadDebug) {
+      printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
+    }
+    ret = data_[pos];
+  } else {
+    if (kReadDebug) {
+      printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
+    }
+    ret = data_[start];
+  }
+  MessageHeader *const header = MessageHeader::Get(ret);
+  {
+    // ref_count is decremented under pool_lock_ (not data_lock_) by
+    // DecrementMessageReferenceCount, so it must be incremented under the same
+    // lock or the two read-modify-writes can race.
+    MutexLocker pool_locker(&pool_lock_);
+    ++header->ref_count;
+  }
+  if (kRefDebug) {
+    printf("ref inc count: %p\n", ret);
+  }
+  return ret;
+}
+// Reads the next message (or the newest one with kFromEnd). With kPeek, the
+// message stays in the queue. Returns NULL only if kNonBlock was passed and
+// nothing was readable. A non-NULL return carries one reference that the caller
+// must release with FreeMessage.
+const void *RawQueue::ReadMessage(int options) {
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessage(%x)\n", this, options);
+  }
+  void *msg = NULL;
+
+  // Held for the rest of the function, including ReadCommonEnd.
+  MutexLocker locker(&data_lock_);
+
+  ReadData read_data;
+  if (!ReadCommonStart(options, NULL, &read_data)) {
+    if (kReadDebug) {
+      printf("queue: %p common returned false\n", this);
+    }
+    return NULL;
+  }
+
+  if (options & kPeek) {
+    // Leaves the message in the queue; ReadPeek adds the caller's reference.
+    msg = ReadPeek(options, data_start_);
+  } else {
+    if (options & kFromEnd) {
+      // Removes every message, keeping only the newest one for the caller.
+      while (true) {
+        if (kReadDebug) {
+          printf("queue: %p start of c2\n", this);
+        }
+        // This loop pulls each message out of the buffer.
+        const int pos = data_start_;
+        data_start_ = (data_start_ + 1) % data_length_;
+        // If this is the last one.
+        if (data_start_ == data_end_) {
+          if (kReadDebug) {
+            printf("queue: %p reading from c2: %d\n", this, pos);
+          }
+          msg = data_[pos];
+          break;
+        }
+        // This message is not going to be in the queue any more.
+        DecrementMessageReferenceCount(data_[pos]);
+      }
+    } else {
+      if (kReadDebug) {
+        printf("queue: %p reading from d2: %d\n", this, data_start_);
+      }
+      msg = data_[data_start_];
+      // No ref count increment is needed here: the message is removed from
+      // data_, so the reference the queue held for it passes to the caller.
+      data_start_ = (data_start_ + 1) % data_length_;
+    }
+  }
+  ReadCommonEnd(&read_data);
+  if (kReadDebug) {
+    printf("queue: %p read returning %p\n", this, msg);
+  }
+  return msg;
+}
+// Like ReadMessage, but never removes messages from the queue and uses *index
+// (the messages_ count this reader has seen up to) so the same reader never
+// gets the same message twice. A reader that has fallen behind skips ahead to
+// the oldest message still in the queue.
+const void *RawQueue::ReadMessageIndex(int options, int *index) {
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+           this, options, index, *index);
+  }
+  void *msg = NULL;
+
+  MutexLocker locker(&data_lock_);
+
+  ReadData read_data;
+  if (!ReadCommonStart(options, index, &read_data)) {
+    if (kReadDebug) {
+      printf("queue: %p common returned false\n", this);
+    }
+    return NULL;
+  }
+
+  // TODO(parker): Handle integer wrap on the index.
+
+  // How many messages have been written since this reader's last one
+  // (ReadCommonStart guarantees at least 1).
+  const int unread_messages = messages_ - *index;
+  // How many messages are actually in data_ right now (ReadCommonStart
+  // guarantees at least 1).
+  int current_messages = data_end_ - data_start_;
+  if (current_messages < 0) current_messages += data_length_;
+  // Where we're going to start reading.
+  int my_start;
+  if (unread_messages > current_messages) {
+    // Some unread messages are no longer in data_ (removed by non-index reads
+    // or overwritten with kOverride), so catch *index up to the oldest message
+    // that is still there, and read that one.
+    *index = messages_ - current_messages;
+    my_start = data_start_;
+  } else {
+    // The next unread message is still in data_; unwrap its position.
+    my_start = data_end_ - unread_messages;
+    if (my_start < 0) my_start += data_length_;
+  }
+  if (options & kPeek) {
+    msg = ReadPeek(options, my_start);
+  } else {
+    if (options & kFromEnd) {
+      if (kReadDebug) {
+        printf("queue: %p start of c1\n", this);
+      }
+      int pos = data_end_ - 1;
+      if (pos < 0) {  // If it wrapped.
+        pos = data_length_ - 1;  // Unwrap it.
+      }
+      if (kReadDebug) {
+        printf("queue: %p reading from c1: %d\n", this, pos);
+      }
+      msg = data_[pos];
+      *index = messages_;
+    } else {
+      if (kReadDebug) {
+        printf("queue: %p reading from d1: %d\n", this, my_start);
+      }
+      msg = data_[my_start];
+      ++(*index);
+    }
+    // The message stays in the queue, so the caller needs its own reference.
+    MessageHeader *const header = MessageHeader::Get(msg);
+    {
+      // ref_count is decremented under pool_lock_ (see
+      // DecrementMessageReferenceCount), so increment it under that lock too.
+      MutexLocker pool_locker(&pool_lock_);
+      ++header->ref_count;
+    }
+    if (kRefDebug) {
+      printf("ref_inc_count: %p\n", msg);
+    }
+  }
+  ReadCommonEnd(&read_data);
+  return msg;
+}
+
+// Hands out an unused message from the pool with a reference count of 1,
+// allocating a new one in shared memory if every allocated message is in use.
+// Returns NULL if that allocation fails (assuming shm_malloc reports failure
+// by returning NULL).
+void *RawQueue::GetMessage() {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *header;
+  if (pool_length_ - messages_used_ > 0) {
+    // pool_[0, messages_used_) are the in-use messages, so this is the first
+    // free one (DoFreeMessage maintains that ordering).
+    header = pool_[messages_used_];
+  } else {
+    if (pool_length_ >= mem_length_) {
+      LOG(FATAL, "overused pool of queue %p\n", this);
+    }
+    header = static_cast<MessageHeader *>(shm_malloc(msg_length_));
+    if (header == NULL) {
+      // Honor the documented "Returns NULL for error" instead of
+      // dereferencing NULL below; nothing has been recorded in pool_ yet.
+      fprintf(stderr, "queue: allocating a %zu-byte message for %p failed\n",
+              msg_length_, this);
+      return NULL;
+    }
+    pool_[pool_length_] = header;
+    ++pool_length_;
+  }
+  void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+  header->ref_count = 1;
+  if (kRefDebug) {
+    printf("%p ref alloc: %p\n", this, msg);
+  }
+  header->index = messages_used_;
+  ++messages_used_;
+  return msg;
+}
+
+}  // namespace aos
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 4c279e1..5158558 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,134 +1,171 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
+#define AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
 
-#include "shared_mem.h"
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
 
 // TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
 // code to make checking for leaks work better
 // <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
 // describes how
 
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// initializing an aos_type_sig and then calling aos_fetch_queue on it.
-// This aos_queue* can then be used to get a message and write it or to read a
-// message.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and aos_type_sig will result in a different queue, which means that
-// if you only recompile some code that uses differently sized messages, it will
-// simply use a different queue than the old code.
-//
 // Any pointers returned from these functions can be safely passed to other
 // processes because they are all shared memory pointers.
 // IMPORTANT: Any message pointer must be passed back in some way
-// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// (FreeMessage and WriteMessage are common ones) or the
 // application will leak shared memory.
-// NOTE: Taking a message from read_msg and then passing it to write_msg might
-// work, but it is not guaranteed to.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
 
-typedef struct aos_type_sig_t {
-	size_t length; // sizeof(message)
-	int hash; // can differentiate multiple otherwise identical queues
-	int queue_length; // how many messages the queue can hold
-} aos_type_sig;
+namespace aos {
 
-// Structures that are opaque to users (defined in queue_internal.h).
-typedef struct aos_queue_list_t aos_queue_list;
-typedef struct aos_queue_t aos_queue;
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling RawQueue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are FIFO buffers of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+  // Retrieves (and creates if necessary) a queue. Each combination of name and
+  // signature refers to a completely independent queue.
+  // length is how large each message will be
+  // hash can differentiate multiple otherwise identical queues
+  // queue_length is how many messages the queue will be able to hold
+  // Returns NULL if the shared list of queues could not be locked.
+  static RawQueue *Fetch(const char *name, size_t length, int hash,
+                      int queue_length);
+  // Same as above, except sets up the returned queue so that it will put
+  // messages on *recycle when they are freed (after they have been released by
+  // all other readers/writers and are not in the queue).
+  // recycle_queue_length determines how many freed messages will be kept.
+  // Other code can retrieve the 2 queues separately (the recycle queue has the
+  // same name and length as the main one, but recycle_hash and
+  // recycle_queue_length). However, any frees made using a queue with only
+  // (name,length,hash,queue_length) before the recycle queue has been
+  // associated with it will not go on to the recycle queue.
+  // Aborts if (recycle_hash,recycle_queue_length) == (hash,queue_length),
+  // because that would make the queue its own recycle queue.
+  // NOTE: calling this function with the same (name,length,hash,queue_length)
+  // but multiple recycle_queue_lengths will result in each freed message being
+  // put onto an undefined one of the recycle queues.
+  static RawQueue *Fetch(const char *name, size_t length, int hash,
+                      int queue_length,
+                      int recycle_hash, int recycle_queue_length,
+                      RawQueue **recycle);
 
-// Retrieves (and creates if necessary) a queue. Each combination of name and
-// signature refers to a completely independent queue.
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
-// Same as above, except sets up the returned queue so that it will put messages
-// on *recycle (retrieved with recycle_sig) when they are freed (after they have
-// been released by all other readers/writers and are not in the queue).
-// The length of recycle_sig determines how many freed messages will be kept.
-// Other code can retrieve recycle_sig and sig separately. However, any frees
-// made using aos_fetch_queue with only sig before the recycle queue has been
-// associated with it will not go on to the recyce queue.
-// Will return NULL for both queues if sig->length != recycle_sig->length or
-// sig->hash == recycle_sig->hash (just to be safe).
-// NOTE: calling this function with the same sig but multiple recycle_sig s
-// will result in each freed message being put onto an undefined recycle_sig.
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
-                                   const aos_type_sig *recycle_sig, aos_queue **recycle);
+  // Constants for passing to options arguments.
+  // The non-conflicting ones can be combined with bitwise-or.
 
-// Constants for passing to opts arguments.
-// #defines so that c code can use queues
-// The non-conflicting ones can be combined with bitwise-or.
-// TODO(brians) prefix these?
-//
-// Causes the returned message to be left in the queue.
-// For reading only.
-#define PEEK      0x0001
-// Reads the last message in the queue instead of just the next one.
-// NOTE: This removes all of the messages until the last one from the queue
-// (which means that nobody else will read them). However, PEEK means to not
-// remove any from the queue, including the ones that are skipped.
-// For reading only.
-#define FROM_END  0x0002
-// Causes reads to return NULL and writes to fail instead of waiting.
-// For reading and writing.
-#define NON_BLOCK 0x0004
-// Causes things to block.
-// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
-// For reading and writing.
-#define BLOCK     0x0000
-// Causes writes to overwrite the oldest message in the queue instead of
-// blocking.
-// For writing only.
-#define OVERRIDE  0x0008
+  // Causes the returned message to be left in the queue.
+  // For reading only.
+  static const int kPeek = 0x0001;
+  // Reads the last message in the queue instead of just the next one.
+  // NOTE: This removes all of the messages until the last one from the queue
+  // (which means that nobody else will read them). However, kPeek means to not
+  // remove any from the queue, including the ones that are skipped.
+  // For reading only.
+  static const int kFromEnd = 0x0002;
+  // Causes reads to return NULL and writes to fail instead of waiting.
+  // For reading and writing.
+  static const int kNonBlock = 0x0004;
+  // Causes things to block.
+  // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+  // For reading and writing.
+  static const int kBlock = 0x0000;
+  // Causes writes to overwrite the oldest message in the queue instead of
+  // blocking.
+  // For writing only.
+  static const int kOverride = 0x0008;
 
-// Frees a message. Does nothing if msg is NULL.
-int aos_queue_free_msg(aos_queue *queue, const void *msg);
+  // Writes a message into the queue.
+  // This function takes ownership of msg if it succeeds.
+  // NOTE: msg must point to a valid message from this queue
+  // Returns true on success. Returns false only if kNonBlock was passed and
+  // the queue is full; msg is then untouched and the caller still owns it
+  // (and must eventually pass it to FreeMessage or WriteMessage).
+  bool WriteMessage(void *msg, int options);
 
-// Writes a message into the queue.
-// NOTE: msg must point to at least the length of this queue's worth of valid
-// data to write
-// IMPORTANT: if this returns -1, then the caller must do something with msg
-// (like free it)
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
-// Exactly the same as aos_queue_write_msg, except it automatically frees the
-// message if writing fails.
-static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
-  const int ret = aos_queue_write_msg(queue, msg, opts);
-  if (ret != 0) {
-    aos_queue_free_msg(queue, msg);
+  // Reads a message out of the queue.
+  // The return value will have at least the length of this queue's worth of
+  // valid data where it's pointing to.
+  // The return value is const because other people might be viewing the same
+  // message. Do not cast the const away!
+  // Returns NULL only if kNonBlock was passed and nothing was readable.
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage.
+  const void *ReadMessage(int options);
+  // Exactly the same as ReadMessage, except it will never return the
+  // same message twice with the same index argument. However, it may not
+  // return some messages that pass through the queue.
+  // *index should start as 0. index does not have to be in shared memory, but
+  // it can be
+  const void *ReadMessageIndex(int options, int *index);
 
+  // Retrieves ("allocates") a message that can then be written to the queue.
+  // NOTE: the return value will be completely uninitialized
+  // The return value will have at least the length of this queue's worth of
+  // valid memory where it's pointing to.
+  // Returns NULL for error.
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage.
+  void *GetMessage();
 
+  // Releases the caller's reference to msg (from GetMessage or one of the
+  // ReadMessage* methods). It is ok to call this method with a NULL msg.
+  void FreeMessage(const void *msg) {
+    if (msg != NULL) DecrementMessageReferenceCount(msg);
   }
-  return ret;
-}
 
-// Reads a message out of the queue.
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// The return value is const because other people might be viewing the same
-// messsage. Do not cast the const away!
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-const void *aos_queue_read_msg(aos_queue *buf, int opts);
-// Exactly the same as aos_queue_read_msg, except it will never return the same
-// message twice with the same index argument. However, it may not return some
-// messages that pass through the queue.
-// *index should start as 0. index does not have to be in shared memory, but it
-// can be
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+ private:
+  struct MessageHeader;
+  struct ReadData;
 
-// Retrieves ("allocates") a message that can then be written to the queue.
-// NOTE: the return value will be completely uninitialized
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// Returns NULL for error.
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-void *aos_queue_get_msg(aos_queue *queue);
+  // data_ always keeps one slot empty so that a full buffer can be told apart
+  // from an empty one.
+  bool is_readable() { return data_end_ != data_start_; }
+  bool is_writable() { return ((data_end_ + 1) % data_length_) != data_start_; }
 
-#ifdef __cplusplus
-}
-#endif
+  // These next 4 allow finding the right one.
+  const char *name_;
+  size_t length_;
+  int hash_;
+  int queue_length_;
+  // The next one in the linked list of queues.
+  RawQueue *next_;
 
-#endif
+  // Queue that freed messages get written to, or NULL (set up by Fetch).
+  RawQueue *recycle_;
 
+  Mutex data_lock_;  // protects operations on data_ etc
+  // Always gets broadcasted to because different readers might have different
+  // ideas of what "readable" means (ie ones using separated indices).
+  Condition readable_;
+  Condition writable_;
+  int data_length_;  // max length into data + 1
+  int data_start_;  // is an index into data
+  int data_end_;  // is an index into data
+  int messages_;  // that have passed through
+  void **data_;  // array of messages (with headers)
 
+  Mutex pool_lock_;
+  size_t msg_length_;  // sizeof(each message) including the header
+  int mem_length_;  // the max number of messages that will ever be allocated
+  // Number of messages currently in use; pool_[0, messages_used_) are them.
+  int messages_used_;
+  int pool_length_;  // the number of allocated messages
+  MessageHeader **pool_;  // array of pointers to messages
 
+  // Actually frees the given message.
+  void DoFreeMessage(const void *msg);
+  // Calls DoFreeMessage if appropriate.
+  void DecrementMessageReferenceCount(const void *msg);
 
+  // Should be called with data_lock_ locked.
+  // *read_data will be initialized.
+  // Returns with a readable message in data_ or false.
+  bool ReadCommonStart(int options, int *index, ReadData *read_data);
+  // Signals writable_ if this read made the queue writable.
+  // Should be called with data_lock_ still locked.
+  // read_data should be the same thing that was passed in to ReadCommonStart.
+  void ReadCommonEnd(ReadData *read_data);
+  // Handles reading with kPeek.
+  void *ReadPeek(int options, int start);
 
+  // Gets called by Fetch when necessary (with placement new).
+  RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+}  // namespace aos
+
+#endif  // AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
deleted file mode 100644
index e6b23ef..0000000
--- a/aos/atom_code/ipc_lib/queue_internal.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef AOS_IPC_LIB_QUEUE_INTERNAL_H_
-#define AOS_IPC_LIB_QUEUE_INTERNAL_H_
-
-#include "shared_mem.h"
-#include "aos_sync.h"
-
-// Should only be used by queue.c. Contains definitions of the structures
-// it uses.
-
-// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
-#define EXTRA_MESSAGES 20
-
-typedef struct aos_msg_header_t {
-	int ref_count;
-	int index; // in the pool
-} aos_msg_header;
-static inline void header_swap(aos_msg_header *l, aos_msg_header *r) {
-  aos_msg_header tmp;
-  tmp.ref_count = l->ref_count;
-  tmp.index = l->index;
-  l->ref_count = r->ref_count;
-  l->index = r->index;
-  r->ref_count = tmp.ref_count;
-  r->index = tmp.index;
-}
-
-struct aos_queue_list_t {
-	char *name;
-	aos_type_sig sig;
-	aos_queue *queue;
-	aos_queue_list *next;
-};
-
-typedef struct aos_ring_buf_t {
-	mutex buff_lock; // the main lock protecting operations on this buffer
-  // conditions
-	mutex writable;
-	mutex readable;
-	int length; // max index into data + 1
-	int start; // is an index into data
-	int end; // is an index into data
-	int msgs; // that have passed through
-	void **data; // array of messages (w/ headers)
-} aos_ring_buf;
-
-typedef struct aos_msg_pool_t {
-	mutex pool_lock;
-	size_t msg_length;
-	int mem_length; // the number of messages
-	int used; // number of messages
-	int length; // number of allocated messages
-	void **pool; // array of messages
-} aos_msg_pool;
-
-struct aos_queue_t {
-	aos_msg_pool pool;
-	aos_ring_buf buf;
-  aos_queue *recycle;
-};
-
-#endif
diff --git a/aos/atom_code/ipc_lib/queue_test.cpp b/aos/atom_code/ipc_lib/queue_test.cc
similarity index 61%
rename from aos/atom_code/ipc_lib/queue_test.cpp
rename to aos/atom_code/ipc_lib/queue_test.cc
index e9ac8d5..9e5f3ae 100644
--- a/aos/atom_code/ipc_lib/queue_test.cpp
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -1,3 +1,5 @@
+#include "aos/common/queue.h"
+
 #include <unistd.h>
 #include <sys/mman.h>
 #include <inttypes.h>
@@ -8,25 +10,25 @@
 
 #include "gtest/gtest.h"
 
-#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
 #include "aos/common/type_traits.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/time.h"
+#include "aos/common/logging/logging.h"
 
-using testing::AssertionResult;
-using testing::AssertionSuccess;
-using testing::AssertionFailure;
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+using ::aos::common::testing::GlobalCoreInstance;
 
-// IMPORTANT: Some of the functions that do test predicate functions allocate
-// shared memory (and don't free it).
-class QueueTest : public SharedMemTestSetup {
+namespace aos {
+namespace testing {
+
+class QueueTest : public ::testing::Test {
  protected:
   static const size_t kFailureSize = 400;
   static char *fatal_failure;
  private:
-  // This gets registered right after the fork, so it will get run before any
-  // exit handlers that had already been registered.
-  static void ExitExitHandler() {
-    _exit(EXIT_SUCCESS);
-  }
   enum class ResultType : uint8_t {
     NotCalled,
     Called,
@@ -44,32 +46,37 @@
         return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
     }
   }
-  static_assert(aos::shm_ok<ResultType>::value, "this will get put in shared memory");
-  // Gets allocated in shared memory so it has to be volatile.
-  template<typename T> struct FunctionToCall {
-    ResultType result;
+  static_assert(aos::shm_ok<ResultType>::value,
+                "this will get put in shared memory");
+  template<typename T>
+  struct FunctionToCall {
+    FunctionToCall() : result(ResultType::NotCalled) {
+      started.Lock();
+    }
+
+    volatile ResultType result;
     bool expected;
     void (*function)(T*, char*);
     T *arg;
     volatile char failure[kFailureSize];
+    Mutex started;
   };
-  template<typename T> static void Hangs_(volatile FunctionToCall<T> *const to_call) {
+  template<typename T>
+  static void Hangs_(FunctionToCall<T> *const to_call) {
+    to_call->started.Unlock();
     to_call->result = ResultType::Called;
     to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
     to_call->result = ResultType::Returned;
   }
 
-  static const long kMsToNs = 1000000;
-  // The number of ms after which a function is considered to have hung.
-  // Must be < 1000.
-  static const long kHangTime = 10;
-  static const unsigned int kForkSleep = 0; // how many seconds to sleep after forking
+  // How long until a function is considered to have hung.
+  static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+  // How long to sleep after forking (for debugging).
+  static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
 
   // Represents a process that has been forked off. The destructor kills the
   // process and wait(2)s for it.
   class ForkedProcess {
-    const pid_t pid_;
-    mutex *const lock_;
    public:
     ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
     ~ForkedProcess() {
@@ -84,24 +91,25 @@
       }
       const pid_t ret = wait(NULL);
       if (ret == -1) {
-        fprintf(stderr, "wait(NULL) failed."
-                " child %jd might still be alive\n",
-                static_cast<intmax_t>(pid_));
+        LOG(WARNING, "wait(NULL) failed."
+            " child %jd might still be alive\n",
+            static_cast<intmax_t>(pid_));
       } else if (ret == 0) {
-        fprintf(stderr, "child %jd wasn't waitable. it might still be alive\n",
-                static_cast<intmax_t>(pid_));
+        LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+            static_cast<intmax_t>(pid_));
       } else if (ret != pid_) {
-        fprintf(stderr, "child %d is dead, but child %jd might still be alive\n",
-               ret, static_cast<intmax_t>(pid_));
+        LOG(WARNING, "child %d is now confirmed dead"
+            ", but child %jd might still be alive\n",
+            ret, static_cast<intmax_t>(pid_));
       }
     }
 
     enum class JoinResult {
       Finished, Hung, Error
     };
-    JoinResult Join(long timeout = kHangTime) {
-      timespec ts{kForkSleep, timeout * kMsToNs};
-      switch (mutex_lock_timeout(lock_, &ts)) {
+    JoinResult Join(time::Time timeout = kHangTime) {
+      timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
+      switch (mutex_lock_timeout(lock_, &lock_timeout)) {
         case 2:
           return JoinResult::Hung;
         case 0:
@@ -110,9 +118,13 @@
           return JoinResult::Error;
       }
     }
+
+   private:
+    const pid_t pid_;
+    mutex *const lock_;
   } __attribute__((unused));
 
-  // Member variables for HangsFork and HangsCheck.
+  // State for HangsFork and HangsCheck.
   typedef uint8_t ChildID;
   static void ReapExitHandler() {
     for (auto it = children_.begin(); it != children_.end(); ++it) {
@@ -120,11 +132,11 @@
     }
   }
   static std::map<ChildID, ForkedProcess *> children_;
-  std::map<ChildID, volatile FunctionToCall<void> *> to_calls_;
+  std::map<ChildID, FunctionToCall<void> *> to_calls_;
 
-  void SetUp() {
-    SharedMemTestSetup::SetUp();
-    fatal_failure = reinterpret_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+  void SetUp() override {
+    ::testing::Test::SetUp();
+    fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
     static bool registered = false;
     if (!registered) {
       atexit(ReapExitHandler);
@@ -133,30 +145,30 @@
   }
 
  protected:
-  // Function gets called with arg in a forked process.
+  // function gets called with arg in a forked process.
   // Leaks shared memory.
-  // the attribute is in the middle to make gcc happy
   template<typename T> __attribute__((warn_unused_result))
-      std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
-    mutex *lock = reinterpret_cast<mutex *>(shm_malloc_aligned(
+  std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+    mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
             sizeof(*lock), sizeof(int)));
-    *lock = 1;
+    if (mutex_lock(lock) != 0) abort();  // not assert(): must run with NDEBUG
     const pid_t pid = fork();
     switch (pid) {
-      case 0: // child
-        if (kForkSleep != 0) {
-          printf("pid %jd sleeping for %u\n", static_cast<intmax_t>(getpid()),
-                 kForkSleep);
-          sleep(kForkSleep);
+      case 0:  // child
+        if (kForkSleep != time::Time(0, 0)) {
+          LOG(INFO, "pid %jd sleeping for %ds%dns\n",
+              static_cast<intmax_t>(getpid()),
+              kForkSleep.sec(), kForkSleep.nsec());
+          time::SleepFor(kForkSleep);
         }
-        atexit(ExitExitHandler);
+        ::aos::common::testing::PreventExit();
         function(arg);
         mutex_unlock(lock);
         exit(EXIT_SUCCESS);
-      case -1: // parent failure
-        printf("fork() failed with %d: %s\n", errno, strerror(errno));
+      case -1:  // parent failure
+        LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
         return std::unique_ptr<ForkedProcess>();
-      default: // parent
+      default:  // parent
         return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
     }
   }
@@ -166,8 +178,8 @@
   // NOTE: There are other reasons for it to return a failure than the function
   // doing the wrong thing.
   // Leaks shared memory.
-  template<typename T> AssertionResult Hangs(void (*function)(T*, char*), T *arg,
-                                             bool expected) {
+  template<typename T>
+  AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
     AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
     if (!fork_result) {
       return fork_result;
@@ -178,21 +190,24 @@
   // Use HangsCheck to get the result.
   // Returns whether the fork succeeded or not, NOT whether or not the hang
   // check succeeded.
-  template<typename T> AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
-                                                 bool expected, ChildID id) {
+  template<typename T>
+  AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+                            bool expected, ChildID id) {
     static_assert(aos::shm_ok<FunctionToCall<T>>::value,
                   "this is going into shared memory");
-    volatile FunctionToCall<T> *const to_call = reinterpret_cast<FunctionToCall<T> *>(
-        shm_malloc_aligned(sizeof(*to_call), sizeof(int)));
-    to_call->result = ResultType::NotCalled;
+    FunctionToCall<T> *const to_call =
+        static_cast<FunctionToCall<T> *>(
+            shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+    new (to_call) FunctionToCall<T>();
     to_call->function = function;
     to_call->arg = arg;
     to_call->expected = expected;
     to_call->failure[0] = '\0';
-    static_cast<volatile char *>(fatal_failure)[0] = '\0';
+    static_cast<char *>(fatal_failure)[0] = '\0';
     children_[id] = ForkExecute(Hangs_, to_call).release();
     if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
-    to_calls_[id] = reinterpret_cast<volatile FunctionToCall<void> *>(to_call);
+    to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+    to_call->started.Lock();
     return AssertionSuccess();
   }
   // Checks whether or not a function hung like it was supposed to.
@@ -214,8 +229,12 @@
           << ResultTypeString(to_calls_[id]->result));
     } else {
       if (to_calls_[id]->result == ResultType::Called) {
-        return to_calls_[id]->expected ? AssertionSuccess() : AssertionFailure();
+        return to_calls_[id]->expected ? AssertionSuccess() :
+            AssertionFailure();
+      } else if (result == ForkedProcess::JoinResult::Error) {
+        return AssertionFailure() << "error joining child";
       } else {
+        abort();
         return AssertionFailure() << "something weird happened";
       }
     }
@@ -234,28 +253,30 @@
 } while (false)
 
   struct TestMessage {
-    int16_t data; // don't really want to test empty messages
+    // Some contents because we don't really want to test empty messages.
+    int16_t data;
   };
   struct MessageArgs {
-    aos_queue *const queue;
+    RawQueue *const queue;
     int flags;
-    int16_t data; // -1 means NULL expected
+    int16_t data;  // -1 means NULL expected
   };
   static void WriteTestMessage(MessageArgs *args, char *failure) {
-    TestMessage *msg = reinterpret_cast<TestMessage *>(aos_queue_get_msg(args->queue));
+    TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
     if (msg == NULL) {
-      snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", args->queue);
+      snprintf(fatal_failure, kFailureSize,
+               "couldn't get_msg from %p", args->queue);
       return;
     }
     msg->data = args->data;
-    if (aos_queue_write_msg_free(args->queue, msg, args->flags) == -1) {
+    if (!args->queue->WriteMessage(msg, args->flags)) {
       snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
                args->queue, msg, args->flags);
     }
   }
   static void ReadTestMessage(MessageArgs *args, char *failure) {
-    const TestMessage *msg = reinterpret_cast<const TestMessage *>(
-        aos_queue_read_msg(args->queue, args->flags));
+    const TestMessage *msg = static_cast<const TestMessage *>(
+        args->queue->ReadMessage(args->flags));
     if (msg == NULL) {
       if (args->data != -1) {
         snprintf(failure, kFailureSize,
@@ -268,89 +289,88 @@
                  "expected data of %" PRId16 " but got %" PRId16 " instead",
                  args->data, msg->data);
       }
-      aos_queue_free_msg(args->queue, msg);
+      args->queue->FreeMessage(msg);
     }
   }
+
+ private:
+  GlobalCoreInstance my_core;
 };
 char *QueueTest::fatal_failure;
 std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
+constexpr time::Time QueueTest::kHangTime;
+constexpr time::Time QueueTest::kForkSleep;
 
 TEST_F(QueueTest, Reading) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, -1};
 
-  EXPECT_EQ(BLOCK, 0);
-  EXPECT_EQ(BLOCK | FROM_END, FROM_END);
-
-  args.flags = NON_BLOCK;
+  args.flags = RawQueue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK | PEEK;
+  args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   EXPECT_HANGS(ReadTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = RawQueue::kPeek;
   EXPECT_HANGS(ReadTestMessage, &args);
   args.data = 254;
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = RawQueue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = RawQueue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = PEEK | NON_BLOCK;
+  args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   args.data = -1;
   EXPECT_HANGS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = RawQueue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   args.data = 971;
   EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
 }
 TEST_F(QueueTest, Writing) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, 973};
 
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_HANGS(WriteTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = RawQueue::kNonBlock;
   EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = RawQueue::kNonBlock;
   EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = RawQueue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.data = 971;
-  args.flags = OVERRIDE;
+  args.flags = RawQueue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = OVERRIDE;
+  args.flags = RawQueue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = RawQueue::kNonBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = OVERRIDE;
+  args.flags = RawQueue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
 }
 
 TEST_F(QueueTest, MultiRead) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, 1323};
 
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
   ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
   EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
@@ -360,45 +380,45 @@
 TEST_F(QueueTest, Recycle) {
   // TODO(brians) basic test of recycle queue
   // include all of the ways a message can get into the recycle queue
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 2},
-               recycle_signature{sizeof(TestMessage), 2, 2};
-  aos_queue *recycle_queue = reinterpret_cast<aos_queue *>(23);
-  aos_queue *const queue = aos_fetch_queue_recycle("Queue", &signature,
-                                                   &recycle_signature, &recycle_queue);
-  ASSERT_NE(reinterpret_cast<aos_queue *>(23), recycle_queue);
+  RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+                                          1, 2, 2, 2, &recycle_queue);
+  ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
   MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
 
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
   EXPECT_HANGS(ReadTestMessage, &recycle);
   args.data = 254;
   EXPECT_RETURNS(WriteTestMessage, &args);
   EXPECT_HANGS(ReadTestMessage, &recycle);
   args.data = 971;
-  args.flags = OVERRIDE;
+  args.flags = RawQueue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  recycle.flags = BLOCK;
+  recycle.flags = RawQueue::kBlock;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 
   EXPECT_HANGS(ReadTestMessage, &recycle);
 
-  TestMessage *msg = static_cast<TestMessage *>(aos_queue_get_msg(queue));
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
   ASSERT_TRUE(msg != NULL);
   msg->data = 341;
-  aos_queue_free_msg(queue, msg);
+  queue->FreeMessage(msg);
   recycle.data = 341;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 
   EXPECT_HANGS(ReadTestMessage, &recycle);
 
   args.data = 254;
-  args.flags = PEEK;
+  args.flags = RawQueue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  recycle.flags = BLOCK;
+  recycle.flags = RawQueue::kBlock;
   EXPECT_HANGS(ReadTestMessage, &recycle);
-  args.flags = BLOCK;
+  args.flags = RawQueue::kBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   recycle.data = 254;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 }
 
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/atom_code/ipc_lib/shared_mem.c b/aos/atom_code/ipc_lib/shared_mem.c
index f631b78..e2c2c9e 100644
--- a/aos/atom_code/ipc_lib/shared_mem.c
+++ b/aos/atom_code/ipc_lib/shared_mem.c
@@ -1,4 +1,4 @@
-#include "shared_mem.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
 
 #include <stdio.h>
 #include <string.h>
@@ -8,22 +8,31 @@
 #include <sys/types.h>
 #include <errno.h>
 
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
 // the path for the shared memory segment. see shm_open(3) for restrictions
 #define AOS_SHM_NAME "/aos_shared_mem"
 // Size of the shared mem segment.
 // set to the maximum number that worked
 #define SIZEOFSHMSEG (4096 * 27813)
 
+void init_shared_mem_core(aos_shm_core *shm_core) {
+  clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+  shm_core->msg_alloc_lock = 0;
+  shm_core->queues.queue_list = NULL;
+  shm_core->queues.alloc_lock = 0;
+}
+
 ptrdiff_t aos_core_get_mem_usage(void) {
   return global_core->size -
       ((ptrdiff_t)global_core->mem_struct->msg_alloc -
        (ptrdiff_t)global_core->mem_struct);
 }
 
-struct aos_core global_core_data;
 struct aos_core *global_core = NULL;
 
 int aos_core_create_shared_mem(enum aos_core_create to_create) {
+  static struct aos_core global_core_data;
   global_core = &global_core_data;
   int shm;
 before:
@@ -46,8 +55,8 @@
   }
   if (shm == -1) {
     fprintf(stderr, "shared_mem:"
-        " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
-        " failed with %d: %s\n", errno, strerror(errno));
+            " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+            " failed with %d: %s\n", errno, strerror(errno));
     return -1;
   }
   if (global_core->owner) {
@@ -62,8 +71,8 @@
       MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
   if (shm_address == MAP_FAILED) {
     fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
-        " with %d: %s\n",
-        (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
+            " with %d: %s\n",
+            (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
     return -1;
   }
   printf("shared_mem: shm at: %p\n", shm_address);
@@ -88,9 +97,9 @@
     init_shared_mem_core(global_core->mem_struct);
   }
   if (global_core->owner) {
-    condition_set(&global_core->mem_struct->creation_condition);
+    futex_set(&global_core->mem_struct->creation_condition);
   } else {
-    if (condition_wait(&global_core->mem_struct->creation_condition) != 0) {
+    if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
       fprintf(stderr, "waiting on creation_condition failed\n");
       return -1;
     }
@@ -116,4 +125,3 @@
   }
   return 0;
 }
-
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
index b1a2608..c0d21ac 100644
--- a/aos/atom_code/ipc_lib/shared_mem.h
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -1,19 +1,40 @@
 #ifndef _SHARED_MEM_H_
 #define _SHARED_MEM_H_
 
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-#include "core_lib.h"
-#include <stddef.h>
-#include <unistd.h>
+extern struct aos_core *global_core;
 
 // Where the shared memory segment starts in each process's address space.
 // Has to be the same in all of them so that stuff in shared memory
 // can have regular pointers to other stuff in shared memory.
 #define SHM_START 0x20000000
 
+typedef struct aos_queue_global_t {
+  mutex alloc_lock;
+  void *queue_list;  // an aos::RawQueue* (void* because this header is also C)
+} aos_queue_global;
+
+typedef struct aos_shm_core_t {
+  // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+  // this shared memory area
+  struct timespec identifier;
+  // gets 0-initialized at the start (as part of shared memory) and
+  // the owner sets as soon as it finishes setting stuff up
+  mutex creation_condition;
+  mutex msg_alloc_lock;
+  void *msg_alloc;
+  aos_queue_global queues;
+} aos_shm_core;
+
 enum aos_core_create {
   create,
   reference
@@ -26,6 +47,8 @@
   aos_shm_core *mem_struct;
 };
 
+void init_shared_mem_core(aos_shm_core *shm_core);
+
 ptrdiff_t aos_core_get_mem_usage(void);
 
 // Takes the specified memory address and uses it as the shared memory.
diff --git a/aos/atom_code/ipc_lib/sharedmem_test_setup.h b/aos/atom_code/ipc_lib/sharedmem_test_setup.h
deleted file mode 100644
index e461c43..0000000
--- a/aos/atom_code/ipc_lib/sharedmem_test_setup.h
+++ /dev/null
@@ -1,147 +0,0 @@
-// defines a fixture (SharedMemTestSetup) that sets up shared memory
-
-extern "C" {
-#include "shared_mem.h"
-  extern struct aos_core *global_core;
-}
-
-#include <signal.h>
-
-#include <gtest/gtest.h>
-#include <sys/types.h>
-
-// TODO(brians) read logs from here
-class SharedMemTestSetup : public testing::Test{
- protected:
-  pid_t core;
-  int start[2];
-  int memcheck[2];
-  static void signal_handler(int){
-     if(aos_core_free_shared_mem()){
-      exit(- 1);
-    }
-    exit(0);
-  }
-  static int get_mem_usage(){
-    return global_core->size - ((uint8_t *)global_core->mem_struct->msg_alloc - (uint8_t *)SHM_START);
-  }
-  bool checking_mem;
-
-  virtual void BeforeLocalShmSetup() {}
-  virtual void SetUp(){
-    ASSERT_EQ(0, pipe(start)) << "couldn't create start pipes";
-    ASSERT_EQ(0, pipe(memcheck)) << "couldn't create memcheck pipes";
-    checking_mem = false;
-    if((core = fork()) == 0){
-      close(start[0]);
-      close(memcheck[1]);
-      struct sigaction act;
-      act.sa_handler = signal_handler;
-      sigaction(SIGINT, &act, NULL);
-      if(aos_core_create_shared_mem(create)){
-        exit(-1);
-      }
-      write_pipe(start[1], "a", 1);
-      int usage = 0;
-      while(1){
-        char buf1;
-        read_pipe(memcheck[0], &buf1, 1);
-        if(usage == 0)
-          usage = get_mem_usage();
-        if(usage == get_mem_usage())
-          buf1 = 1;
-        else
-          buf1 = 0;
-        write_pipe(start[1], &buf1, 1);
-      }
-    }
-    close(start[1]);
-    close(memcheck[0]);
-    ASSERT_NE(-1, core) << "fork failed";
-    char buf;
-    read_pipe(start[0], &buf, 1);
-
-    BeforeLocalShmSetup();
-
-    ASSERT_EQ(0, aos_core_create_shared_mem(reference)) << "couldn't create shared mem reference";
-  }
-  virtual void TearDown(){
-    if(checking_mem){
-      write_pipe(memcheck[1], "a", 1);
-      char buf;
-      read_pipe(start[0], &buf, 1);
-      EXPECT_EQ(1, buf) << "memory got leaked";
-    }
-    EXPECT_EQ(0, aos_core_free_shared_mem()) << "issues freeing shared mem";
-    if(core > 0){
-      kill(core, SIGINT);
-      siginfo_t status;
-      ASSERT_EQ(0, waitid(P_PID, core, &status, WEXITED)) << "waiting for the core to finish failed";
-      EXPECT_EQ(CLD_EXITED, status.si_code) << "core died";
-      EXPECT_EQ(0, status.si_status) << "core exited with an error";
-    }
-  }
-  // if any more shared memory gets allocated after calling this and not freed by the end, it's an error
-  void AllSharedMemAllocated(){
-    checking_mem = true;
-    write_pipe(memcheck[1], "a", 1);
-    char buf;
-    read_pipe(start[0], &buf, 1);
-  }
- private:
-  // Wrapper functions for pipes because they should never have errors.
-  void read_pipe(int fd, void *buf, size_t count) {
-    if (read(fd, buf, count) < 0) abort();
-  }
-  void write_pipe(int fd, const void *buf, size_t count) {
-    if (write(fd, buf, count) < 0) abort();
-  }
-};
-class ExecVeTestSetup : public SharedMemTestSetup {
- protected:
-  std::vector<std::string> files;
-  std::vector<pid_t> pids;
-  virtual void BeforeLocalShmSetup(){
-    std::vector<std::string>::iterator it;
-    pid_t child;
-    for(it = files.begin(); it < files.end(); ++it){
-      if((child = fork()) == 0){
-        char *null = NULL;
-        execve(it->c_str(), &null, &null);
-        ADD_FAILURE() << "execve failed";
-        perror("execve");
-        exit(0);
-      }
-      if(child > 0)
-        pids.push_back(child);
-      else
-        ADD_FAILURE() << "fork failed return=" << child;
-    }
-    usleep(150000);
-  }
-  virtual void TearDown(){
-    std::vector<pid_t>::iterator it;
-    siginfo_t status;
-    for(it = pids.begin(); it < pids.end(); ++it){
-      printf("attempting to SIGINT(%d) %d\n", SIGINT, *it);
-      if(*it > 0){
-        kill(*it, SIGINT);
-        ASSERT_EQ(0, waitid(P_PID, *it, &status, WEXITED)) << "waiting for the AsyncAction(pid=" << *it << ") to finish failed";
-        EXPECT_EQ(CLD_EXITED, status.si_code) << "child died (killed by signal is " << (int)CLD_KILLED << ")";
-        EXPECT_EQ(0, status.si_status) << "child exited with an error";
-      }else{
-        FAIL();
-      }
-    }
-
-    SharedMemTestSetup::TearDown();
-  }
-  // call this _before_ ExecVeTestSetup::SetUp()
-  void AddProcess(const std::string file){
-    files.push_back(file);
-  }
-  void PercolatePause(){
-    usleep(50000);
-  }
-};
-
diff --git a/aos/atom_code/logging/atom_logging.cc b/aos/atom_code/logging/atom_logging.cc
index 08a65cb..854da17 100644
--- a/aos/atom_code/logging/atom_logging.cc
+++ b/aos/atom_code/logging/atom_logging.cc
@@ -53,8 +53,7 @@
   return process_name + '.' + thread_name;
 }
 
-static const aos_type_sig message_sig = {sizeof(LogMessage), 1323, 1500};
-static aos_queue *queue;
+RawQueue *queue;
 
 }  // namespace
 namespace internal {
@@ -83,7 +82,7 @@
 
 class AtomQueueLogImplementation : public LogImplementation {
   virtual void DoLog(log_level level, const char *format, va_list ap) {
-    LogMessage *message = static_cast<LogMessage *>(aos_queue_get_msg(queue));
+    LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
     if (message == NULL) {
       LOG(FATAL, "queue get message failed\n");
     }
@@ -99,7 +98,7 @@
 void Register() {
   Init();
 
-  queue = aos_fetch_queue("LoggingQueue", &message_sig);
+  queue = RawQueue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
   if (queue == NULL) {
     Die("logging: couldn't fetch queue\n");
   }
@@ -108,33 +107,32 @@
 }
 
 const LogMessage *ReadNext(int flags, int *index) {
-  return static_cast<const LogMessage *>(
-      aos_queue_read_msg_index(queue, flags, index));
+  return static_cast<const LogMessage *>(queue->ReadMessageIndex(flags, index));
 }
 
 const LogMessage *ReadNext() {
-  return ReadNext(BLOCK);
+  return ReadNext(RawQueue::kBlock);
 }
 
 const LogMessage *ReadNext(int flags) {
   const LogMessage *r = NULL;
   do {
-    r = static_cast<const LogMessage *>(aos_queue_read_msg(queue, flags));
+    r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
     // not blocking means return a NULL if that's what it gets
-  } while ((flags & BLOCK) && r == NULL);
+  } while ((flags & RawQueue::kBlock) && r == NULL);
   return r;
 }
 
 LogMessage *Get() {
-  return static_cast<LogMessage *>(aos_queue_get_msg(queue));
+  return static_cast<LogMessage *>(queue->GetMessage());
 }
 
 void Free(const LogMessage *msg) {
-  aos_queue_free_msg(queue, msg);
+  queue->FreeMessage(msg);
 }
 
 void Write(LogMessage *msg) {
-  if (aos_queue_write_msg_free(queue, msg, OVERRIDE) < 0) {
+  if (!queue->WriteMessage(msg, RawQueue::kOverride)) {
     LOG(FATAL, "writing failed");
   }
 }
diff --git a/aos/atom_code/queue-tmpl.h b/aos/atom_code/queue-tmpl.h
index bb043e1..0cc9392 100644
--- a/aos/atom_code/queue-tmpl.h
+++ b/aos/atom_code/queue-tmpl.h
@@ -5,7 +5,7 @@
   assert(msg_ != NULL);
   msg_->SetTimeToNow();
   assert(queue_ != NULL);
-  bool return_value = aos_queue_write_msg_free(queue_, msg_, OVERRIDE) == 0;
+  bool return_value = queue_->WriteMessage(msg_, RawQueue::kOverride);
   msg_ = NULL;
   return return_value;
 }
@@ -15,7 +15,7 @@
   assert(msg_ != NULL);
   msg_->SetTimeToNow();
   assert(queue_ != NULL);
-  bool return_value = aos_queue_write_msg_free(queue_, msg_, BLOCK) == 0;
+  bool return_value = queue_->WriteMessage(msg_, RawQueue::kBlock);
   msg_ = NULL;
   return return_value;
 }
@@ -23,7 +23,7 @@
 template <class T>
 void ScopedMessagePtr<T>::reset(T *msg) {
   if (queue_ != NULL && msg_ != NULL) {
-    aos_queue_free_msg(queue_, msg_);
+    queue_->FreeMessage(msg_);
   }
   msg_ = msg;
 }
@@ -81,13 +81,12 @@
     assert(msg_ != NULL);
     assert(queue_ != NULL);
     msg_->SetTimeToNow();
-    T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+    T *shm_msg = static_cast<T *>(queue_->GetMessage());
     if (shm_msg == NULL) {
       return false;
     }
     *shm_msg = *msg_;
-    bool return_value =
-        aos_queue_write_msg_free(queue_, shm_msg, OVERRIDE) == 0;
+    bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kOverride);
     reset();
     return return_value;
   }
@@ -100,12 +99,12 @@
     assert(msg_ != NULL);
     assert(queue_ != NULL);
     msg_->SetTimeToNow();
-    T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+    T *shm_msg = static_cast<T *>(queue_->GetMessage());
     if (shm_msg == NULL) {
       return false;
     }
     *shm_msg = *msg_;
-    bool return_value = aos_queue_write_msg_free(queue_, shm_msg, BLOCK) == 0;
+    bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kBlock);
     reset();
     return return_value;
   }
@@ -145,7 +144,7 @@
   friend class aos::SafeMessageBuilder<T>;
 
   // Only Queue should be able to build a message pointer.
-  SafeScopedMessagePtr(aos_queue *queue)
+  SafeScopedMessagePtr(RawQueue *queue)
       : queue_(queue), msg_(new T()) {}
 
   // Sets the pointer to msg, freeing the old value if it was there.
@@ -159,10 +158,10 @@
   }
 
   // Sets the queue that owns this message.
-  void set_queue(aos_queue *queue) { queue_ = queue; }
+  void set_queue(RawQueue *queue) { queue_ = queue; }
 
   // The queue that the message is a part of.
-  aos_queue *queue_;
+  RawQueue *queue_;
   // The message or NULL.
   T *msg_;
 };
@@ -170,11 +169,9 @@
 template <class T>
 void Queue<T>::Init() {
   if (queue_ == NULL) {
-    // Signature of the message.
-    aos_type_sig kQueueSignature{sizeof(T), static_cast<int>(T::kHash),
-      T::kQueueLength};
-
-    queue_ = aos_fetch_queue(queue_name_, &kQueueSignature);
+    queue_ = RawQueue::Fetch(queue_name_, sizeof(T),
+                             static_cast<int>(T::kHash),
+                             T::kQueueLength);
     queue_msg_.set_queue(queue_);
   }
 }
@@ -191,11 +188,11 @@
 template <class T>
 bool Queue<T>::FetchNext() {
   Init();
-  // TODO(aschuh): Use aos_queue_read_msg_index so that multiple readers
+  // TODO(aschuh): Use RawQueue::ReadMessageIndex so that multiple readers
   // reading don't randomly get only part of the messages.
   // Document here the tradoffs that are part of each method.
-  const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
-        NON_BLOCK));
+  const T *msg = static_cast<const T *>(
+      queue_->ReadMessage(RawQueue::kNonBlock));
   // Only update the internal pointer if we got a new message.
   if (msg != NULL) {
     queue_msg_.reset(msg);
@@ -206,7 +203,7 @@
 template <class T>
 bool Queue<T>::FetchNextBlocking() {
   Init();
-  const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_, BLOCK));
+  const T *msg = static_cast<const T *>(queue_->ReadMessage(RawQueue::kBlock));
   queue_msg_.reset(msg);
   assert (msg != NULL);
   return true;
@@ -215,16 +212,16 @@
 template <class T>
 bool Queue<T>::FetchLatest() {
   Init();
-  const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
-        FROM_END | NON_BLOCK | PEEK));
+  const T *msg = static_cast<const T *>(queue_->ReadMessage(
+          RawQueue::kFromEnd | RawQueue::kNonBlock | RawQueue::kPeek));
   // Only update the internal pointer if we got a new message.
   if (msg != NULL && msg != queue_msg_.get()) {
     queue_msg_.reset(msg);
     return true;
   }
-  // The message has to get freed if we didn't use it (and aos_queue_free_msg is
-  // ok to call on NULL).
-  aos_queue_free_msg(queue_, msg);
+  // The message has to get freed if we didn't use it (and RawQueue::FreeMessage
+  // is ok to call on NULL).
+  queue_->FreeMessage(msg);
   return false;
 }
 
@@ -244,7 +241,7 @@
 
 template <class T>
 T *Queue<T>::MakeRawMessage() {
-  T *ret = static_cast<T *>(aos_queue_get_msg(queue_));
+  T *ret = static_cast<T *>(queue_->GetMessage());
   assert(ret != NULL);
   return ret;
 }
diff --git a/aos/build/aos.gyp b/aos/build/aos.gyp
index 874b6a7..c91ab43 100644
--- a/aos/build/aos.gyp
+++ b/aos/build/aos.gyp
@@ -17,10 +17,10 @@
             '<(AOS)/atom_code/logging/atom_logging.cc',
           ],
           'dependencies': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ],
           'export_dependent_settings': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ]
         }],
       ],
diff --git a/aos/build/aos_all.gyp b/aos/build/aos_all.gyp
index 4bbd4fa..ba30a35 100644
--- a/aos/build/aos_all.gyp
+++ b/aos/build/aos_all.gyp
@@ -13,7 +13,8 @@
         '../atom_code/camera/camera.gyp:CameraHTTPStreamer',
         '../atom_code/camera/camera.gyp:CameraReader',
         '../atom_code/core/core.gyp:*',
-        '../atom_code/ipc_lib/ipc_lib.gyp:*',
+        '../atom_code/ipc_lib/ipc_lib.gyp:raw_queue_test',
+        '../atom_code/ipc_lib/ipc_lib.gyp:ipc_stress_test',
         '../atom_code/starter/starter.gyp:starter_exe',
         '../atom_code/starter/starter.gyp:netconsole',
         '../common/common.gyp:queue_test',
@@ -43,6 +44,7 @@
         '<(AOS)/common/common.gyp:type_traits_test',
         '<(AOS)/common/common.gyp:time_test',
         '<(AOS)/common/common.gyp:mutex_test',
+        '<(AOS)/common/common.gyp:condition_test',
         '<(AOS)/common/common.gyp:once_test',
         '<(AOS)/common/logging/logging.gyp:logging_impl_test',
       ],
diff --git a/aos/build/queues/output/message_dec.rb b/aos/build/queues/output/message_dec.rb
index a643392..fcc3c60 100644
--- a/aos/build/queues/output/message_dec.rb
+++ b/aos/build/queues/output/message_dec.rb
@@ -161,13 +161,13 @@
 		cons_ifdef_statement = CPP::PreprocessorIf.new(cons, unsafe_cons)
 		cons_ifdef_statement.name = "!defined(__VXWORKS__) && !defined(__TEST_VXWORKS__)"
 		template.add_member(:private,cons_ifdef_statement)
-		cons.args << "aos_queue *queue"
+		cons.args << "RawQueue *queue"
 		cons.args << "#{t} *msg"
 		unsafe_cons.args << "#{t} *msg"
 		cons.add_cons("msg_ptr_","queue","msg")
 		unsafe_cons.add_cons("msg_ptr_","msg")
 		cons = safetemplate.add_member(:private,CPP::Constructor.new(safetemplate))
-		cons.args << "aos_queue *queue"
+		cons.args << "RawQueue *queue"
 		cons.add_cons("msg_ptr_","queue")
 		safetemplate.public
 		template.public
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 442e4ac..f3b4e69 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -21,11 +21,14 @@
         'queue_testutils.cc',
       ],
       'dependencies': [
-        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
         '<(AOS)/build/aos.gyp:logging',
         'once',
         '<(EXTERNALS):gtest',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
       ],
+      'export_dependent_settings': [
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
+      ],
     },
     {
       'target_name': 'time',
@@ -54,10 +57,10 @@
         },
         {
           'dependencies': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ],
           'export_dependent_settings': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ],
         }]
       ],
@@ -216,6 +219,24 @@
       ],
     },
     {
+      'target_name': 'condition',
+      'type': 'static_library',
+      'sources': [
+        '<(AOS)/atom_code/ipc_lib/condition.cc',
+      ],
+      'dependencies': [
+        'mutex',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
+        # TODO(aschuh): Fix this dependency loop by
+        # providing a logging interface.
+        # '<(AOS)/build/aos.gyp:logging',
+      ],
+      'export_dependent_settings': [
+        'mutex',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
+      ],
+    },
+    {
       'target_name': 'mutex',
       'type': 'static_library',
       'conditions': [
@@ -228,10 +249,10 @@
             '<(AOS)/atom_code/ipc_lib/mutex.cpp',
           ],
           'dependencies': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
           ],
           'export_dependent_settings': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
           ],
         }],
       ],
@@ -254,6 +275,23 @@
       ],
     },
     {
+      'target_name': 'condition_test',
+      'type': 'executable',
+      'sources': [
+        'condition_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        'condition',
+        '<(AOS)/common/util/util.gyp:thread',
+        'time',
+        'mutex',
+        '<(AOS)/build/aos.gyp:logging',
+        'queue_testutils',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:core_lib',
+      ],
+    },
+    {
       'target_name': 'die_test',
       'type': 'executable',
       'sources': [
diff --git a/aos/common/condition.h b/aos/common/condition.h
new file mode 100644
index 0000000..c407070
--- /dev/null
+++ b/aos/common/condition.h
@@ -0,0 +1,79 @@
+#ifndef AOS_COMMON_CONDITION_H_
+#define AOS_COMMON_CONDITION_H_
+
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
+namespace aos {
+
+// A condition variable (IPC mechanism where 1 process/task can notify all
+// others that are waiting for something to happen) without the race condition
+// where a notification is sent after some process has checked if the thing has
+// happened but before it has started listening for notifications.
+//
+// This implementation will print debugging information and abort the process
+// if anything weird happens.
+//
+// A simple example of the use of a condition variable (adapted from
+// pthread_cond(3)):
+//
+// int x, y;
+// Mutex mutex;
+// Condition condition(&mutex);
+//
+// // Waiting until x is greater than y:
+// {
+//   MutexLocker locker(&mutex);
+//   while (!(x > y)) condition.Wait();
+//   // do whatever
+// }
+//
+// // Modifying x and/or y:
+// {
+//   MutexLocker locker(&mutex);
+//   // modify x and y
+//   if (x > y) condition.Broadcast();
+// }
+//
+// Notice the loop around the Wait(). This is very important because some other
+// process can lock the mutex and modify the shared state (possibly undoing
+// whatever the Wait()er was waiting for) in between the Broadcast()er unlocking
+// the mutex and the Wait()er(s) relocking it.
+//
+// Multiple condition variables may be associated with the same mutex but
+// exactly 1 mutex must be associated with each condition variable.
+class Condition {
+ public:
+  // m is the mutex that will be associated with this condition variable. This
+  // object will hold on to a pointer to it but does not take ownership.
+  explicit Condition(Mutex *m);
+
+  // Waits for the condition variable to be signalled, atomically unlocking the
+  // mutex associated with this condition variable at the same time. The mutex
+  // associated with this condition variable must be locked when this is called
+  // and will be locked when this method returns.
+  // NOTE: The relocking of the mutex is not performed atomically with waking
+  // up.
+  void Wait();
+
+  // Signals at most 1 other process currently Wait()ing on this condition
+  // variable. Calling this does not require the mutex associated with this
+  // condition variable to be locked.
+  // One of the processes with the highest priority level will be woken.
+  void Signal();
+  // Wakes all processes that are currently Wait()ing on this condition
+  // variable. Calling this does not require the mutex associated with this
+  // condition variable to be locked.
+  void Broadcast();
+
+  // Retrieves the mutex associated with this condition variable.
+  Mutex *m() { return m_; }
+
+ private:
+  mutex impl_;
+  Mutex *m_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_COMMON_CONDITION_H_
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
new file mode 100644
index 0000000..fca2820
--- /dev/null
+++ b/aos/common/condition_test.cc
@@ -0,0 +1,315 @@
+#include "aos/common/condition.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "gtest/gtest.h"
+
+#include "aos/common/time.h"
+#include "aos/common/mutex.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/macros.h"
+
+using ::aos::time::Time;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class ConditionTest : public ::testing::Test {
+ public:
+  struct Shared {
+    Shared() : condition(&mutex) {}
+
+    Mutex mutex;
+    Condition condition;
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  ConditionTest() : shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+
+  GlobalCoreInstance my_core;
+
+  Shared *const shared_;
+
+  void Settle() {
+    time::SleepFor(::Time::InSeconds(0.008));
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ConditionTest);
+};
+
+class ConditionTestProcess {
+ public:
+  enum class Action {
+    kWaitLockStart,  // lock, delay, wait, unlock
+    kWait,  // delay, lock, wait, unlock
+    kWaitNoUnlock,  // delay, lock, wait
+  };
+
+  // This amount gets added to any passed in delay to make the test repeatable.
+  static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
+  static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.09);
+
+  // delay is how long to wait before doing action to condition.
+  // timeout is how long to wait after delay before deciding that it's hung.
+  ConditionTestProcess(const ::Time &delay, Action action, Condition *condition,
+                       const ::Time &timeout = kDefaultTimeout)
+    : delay_(kMinimumDelay + delay), action_(action), condition_(condition),
+      timeout_(delay_ + timeout), child_(-1),
+      shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+  ~ConditionTestProcess() {
+    assert(child_ == -1);
+  }
+
+  void Start() {
+    ASSERT_FALSE(shared_->started);
+
+    child_ = fork();
+    if (child_ == 0) {  // in child
+      ::aos::common::testing::PreventExit();
+      Run();
+      exit(EXIT_SUCCESS);
+    } else {  // in parent
+      assert(child_ != -1);
+
+      shared_->ready.Lock();
+
+      shared_->started = true;
+    }
+  }
+
+  bool IsFinished() {
+    return shared_->finished;
+  }
+
+  ::testing::AssertionResult Hung() {
+    if (!shared_->started) {
+      ADD_FAILURE();
+      return ::testing::AssertionFailure() << "not started yet";
+    }
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "already returned";
+    }
+    if (shared_->delayed) {
+      // Already past the deadline without having finished, so it's hung.
+      if (::Time::Now() > shared_->start_time + timeout_) {
+        Kill();
+        return ::testing::AssertionSuccess() << "already been too long";
+      }
+    } else {
+      shared_->done_delaying.Lock();
+    }
+    time::SleepFor(::Time::InSeconds(0.01));
+    if (!shared_->finished) time::SleepUntil(shared_->start_time + timeout_);
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "completed within timeout";
+    } else {
+      Kill();
+      return ::testing::AssertionSuccess() << "took too long";
+    }
+  }
+  ::testing::AssertionResult Test() {
+    Start();
+    return Hung();
+  }
+
+ private:
+  struct Shared {
+    Shared()
+      : started(false), delayed(false), start_time(0, 0), finished(false) {
+      done_delaying.Lock();
+      ready.Lock();
+    }
+
+    volatile bool started;
+    volatile bool delayed;
+    Mutex done_delaying;
+    ::Time start_time;
+    volatile bool finished;
+    Mutex ready;
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  void Run() {
+    if (action_ == Action::kWaitLockStart) {
+      shared_->ready.Unlock();
+      condition_->m()->Lock();
+    }
+    time::SleepFor(delay_);
+    shared_->start_time = ::Time::Now();
+    shared_->delayed = true;
+    shared_->done_delaying.Unlock();
+    if (action_ != Action::kWaitLockStart) {
+      shared_->ready.Unlock();
+      condition_->m()->Lock();
+    }
+    condition_->Wait();
+    shared_->finished = true;
+    if (action_ != Action::kWaitNoUnlock) {
+      condition_->m()->Unlock();
+    }
+  }
+
+  void Join() {
+    assert(child_ != -1);
+    int status;
+    do {
+      // Keep waitpid(2) outside of assert(3) so it still runs with NDEBUG.
+      const pid_t waited = waitpid(child_, &status, 0);
+      assert(waited == child_);
+      (void)waited;
+    } while (!(WIFEXITED(status) || WIFSIGNALED(status)));
+    child_ = -1;
+  }
+  void Kill() {
+    assert(child_ != -1);
+    // Keep kill(2) outside of assert(3) so it still runs with NDEBUG.
+    const int kill_result = kill(child_, SIGTERM);
+    assert(kill_result == 0);
+    (void)kill_result;
+    Join();
+  }
+
+  const ::Time delay_;
+  const Action action_;
+  Condition *const condition_;
+  const ::Time timeout_;
+
+  pid_t child_;
+
+  Shared *const shared_;
+
+  DISALLOW_COPY_AND_ASSIGN(ConditionTestProcess);
+};
+constexpr ::Time ConditionTestProcess::kMinimumDelay;
+constexpr ::Time ConditionTestProcess::kDefaultTimeout;
+
+// Makes sure that the testing framework and everything work for a really simple
+// Wait() and then Signal().
+TEST_F(ConditionTest, Basic) {
+  ConditionTestProcess child(::Time(0, 0),
+                             ConditionTestProcess::Action::kWait,
+                             &shared_->condition);
+  child.Start();
+  Settle();
+  EXPECT_FALSE(child.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(child.Hung());
+}
+
+// Makes sure that the worker child locks before it tries to Wait() etc.
+TEST_F(ConditionTest, Locking) {
+  ConditionTestProcess child(::Time(0, 0),
+                             ConditionTestProcess::Action::kWait,
+                             &shared_->condition);
+  shared_->mutex.Lock();
+  child.Start();
+  Settle();
+  // This Signal() shouldn't do anything because the child should still be
+  // waiting to lock the mutex.
+  shared_->condition.Signal();
+  Settle();
+  shared_->mutex.Unlock();
+  EXPECT_TRUE(child.Hung());
+}
+
+// Tests that the worker child only catches a Signal() after the mutex gets
+// unlocked.
+TEST_F(ConditionTest, LockFirst) {
+  ConditionTestProcess child(::Time(0, 0),
+                             ConditionTestProcess::Action::kWait,
+                             &shared_->condition);
+  shared_->mutex.Lock();
+  child.Start();
+  Settle();
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_FALSE(child.IsFinished());
+  shared_->mutex.Unlock();
+  Settle();
+  EXPECT_FALSE(child.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(child.Hung());
+}
+
+// Tests that the mutex gets relocked after Wait() returns.
+TEST_F(ConditionTest, Relocking) {
+  ConditionTestProcess child(::Time(0, 0),
+                             ConditionTestProcess::Action::kWaitNoUnlock,
+                             &shared_->condition);
+  child.Start();
+  Settle();
+  shared_->condition.Signal();
+  EXPECT_FALSE(child.Hung());
+  EXPECT_FALSE(shared_->mutex.TryLock());
+}
+
+// Tests that Signal() stops exactly 1 Wait()er.
+TEST_F(ConditionTest, SignalOne) {
+  ConditionTestProcess child1(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  ConditionTestProcess child2(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  ConditionTestProcess child3(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  auto number_finished = [&]() { return (child1.IsFinished() ? 1 : 0) +
+    (child2.IsFinished() ? 1 : 0) + (child3.IsFinished() ? 1 : 0); };
+  child1.Start();
+  child2.Start();
+  child3.Start();
+  Settle();
+  EXPECT_EQ(0, number_finished());
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_EQ(1, number_finished());
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_EQ(2, number_finished());
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_EQ(3, number_finished());
+  EXPECT_FALSE(child1.Hung());
+  EXPECT_FALSE(child2.Hung());
+  EXPECT_FALSE(child3.Hung());
+}
+
+// Tests that Broadcast() wakes multiple Wait()ers.
+TEST_F(ConditionTest, Broadcast) {
+  ConditionTestProcess child1(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  ConditionTestProcess child2(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  ConditionTestProcess child3(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  child1.Start();
+  child2.Start();
+  child3.Start();
+  Settle();
+  shared_->condition.Broadcast();
+  EXPECT_FALSE(child1.Hung());
+  EXPECT_FALSE(child2.Hung());
+  EXPECT_FALSE(child3.Hung());
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/common/macros.h b/aos/common/macros.h
index 88fc52e..2018b36 100644
--- a/aos/common/macros.h
+++ b/aos/common/macros.h
@@ -6,8 +6,8 @@
 // A macro to disallow the copy constructor and operator= functions
 // This should be used in the private: declarations for a class
 #define DISALLOW_COPY_AND_ASSIGN(TypeName) \
-  TypeName(const TypeName&);               \
-  void operator=(const TypeName&)
+  TypeName(const TypeName&) = delete;      \
+  void operator=(const TypeName&) = delete
 // A macro to wrap arguments to macros that contain commas.
 // Useful for DISALLOW_COPY_AND_ASSIGNing templated types with multiple template
 // arguments.
diff --git a/aos/common/messages/QueueHolder.h b/aos/common/messages/QueueHolder.h
index 23ddc95..8d8ba51 100644
--- a/aos/common/messages/QueueHolder.h
+++ b/aos/common/messages/QueueHolder.h
@@ -71,7 +71,7 @@
 #define aos_check_rv __attribute__((warn_unused_result))
 template<typename T> class QueueHolderNoBuilder {
 #ifndef __VXWORKS__
-  aos_queue *const queue_;
+  RawQueue *const queue_;
   static_assert(shm_ok<T>::value, "T must be able to"
                 " go through shared memory and memcpy");
   T t_;
@@ -80,7 +80,7 @@
 #endif
  public:
 #ifndef __VXWORKS__
-  explicit QueueHolderNoBuilder(aos_queue *queue) : queue_(queue) {}
+  explicit QueueHolderNoBuilder(RawQueue *queue) : queue_(queue) {}
 #else
   QueueHolderNoBuilder() {}
 #endif
@@ -158,7 +158,7 @@
   QueueBuilder<T> builder_;
  public:
 #ifndef __VXWORKS__
-  explicit QueueHolder(aos_queue *queue) : QueueHolderNoBuilder<T>(queue),
+  explicit QueueHolder(RawQueue *queue) : QueueHolderNoBuilder<T>(queue),
     builder_(*this) {}
 #else
   QueueHolder() : builder_(*this) {}
@@ -171,7 +171,6 @@
   }
 };
 
-} // namespace aos
+}  // namespace aos
 
 #endif
-
diff --git a/aos/common/messages/messages.gyp b/aos/common/messages/messages.gyp
index 3b8d389..b76dbba 100644
--- a/aos/common/messages/messages.gyp
+++ b/aos/common/messages/messages.gyp
@@ -19,10 +19,10 @@
       'conditions': [
         ['OS!="crio"', {
           'dependencies': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ],
           'export_dependent_settings': [
-            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+            '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
           ],
         }],
       ],
diff --git a/aos/common/mutex.h b/aos/common/mutex.h
index 035889b..b6b277c 100644
--- a/aos/common/mutex.h
+++ b/aos/common/mutex.h
@@ -11,6 +11,8 @@
 
 namespace aos {
 
+class Condition;
+
 // An abstraction of a mutex that has implementations both for the
 // atom and for the cRIO.
 // If there are multiple tasks or processes contending for the mutex,
@@ -42,6 +44,9 @@
   typedef mutex ImplementationType;
 #endif
   ImplementationType impl_;
+
+  friend class Condition;  // for access to impl_
+
 #ifdef __VXWORKS__
   DISALLOW_COPY_AND_ASSIGN(Mutex);
 #endif
@@ -64,6 +69,20 @@
   Mutex *mutex_;
   DISALLOW_COPY_AND_ASSIGN(MutexLocker);
 };
+// The inverse of MutexLocker: unlocks on construction, relocks on destruction.
+class MutexUnlocker {
+ public:
+  explicit MutexUnlocker(Mutex *mutex) : mutex_(mutex) {
+    mutex_->Unlock();
+  }
+  ~MutexUnlocker() {
+    mutex_->Lock();
+  }
+
+ private:
+  Mutex *mutex_;
+  DISALLOW_COPY_AND_ASSIGN(MutexUnlocker);
+};
 
 }  // namespace aos
 
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index a5d5e86..652cd9e 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -1,7 +1,16 @@
 #include "aos/common/mutex.h"
 
+#include <sched.h>
+#include <math.h>
+#include <pthread.h>
+#ifdef __VXWORKS__
+#include <taskLib.h>
+#endif
+
 #include "gtest/gtest.h"
 
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
 namespace aos {
 namespace testing {
 
@@ -51,5 +60,17 @@
   EXPECT_TRUE(test_mutex.TryLock());
 }
 
+TEST_F(MutexTest, MutexUnlocker) {
+  test_mutex.Lock();
+  {
+    aos::MutexUnlocker unlocker(&test_mutex);
+    // If this fails, then something weird is going on and the next line might
+    // hang, so fail immediately.
+    ASSERT_TRUE(test_mutex.TryLock());
+    test_mutex.Unlock();
+  }
+  EXPECT_FALSE(test_mutex.TryLock());
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/common/queue.h b/aos/common/queue.h
index 612429a..68cb338 100644
--- a/aos/common/queue.h
+++ b/aos/common/queue.h
@@ -138,7 +138,7 @@
 
 #ifndef USE_UNSAFE
   // Only Queue should be able to build a queue.
-  ScopedMessagePtr(aos_queue *queue, T *msg)
+  ScopedMessagePtr(RawQueue *queue, T *msg)
       : queue_(queue), msg_(msg) {}
 #else
   ScopedMessagePtr(T *msg)
@@ -152,10 +152,10 @@
 
 #ifndef USE_UNSAFE
   // Sets the queue that owns this message.
-  void set_queue(aos_queue *queue) { queue_ = queue; }
+  void set_queue(RawQueue *queue) { queue_ = queue; }
 
   // The queue that the message is a part of.
-  aos_queue *queue_;
+  RawQueue *queue_;
 #endif  // USE_UNSAFE
   // The message or NULL.
   T *msg_;
@@ -281,7 +281,7 @@
 #else
   T *MakeRawMessage();
   // Pointer to the queue that this object fetches from.
-  aos_queue *queue_;
+  RawQueue *queue_;
 #endif
   // Scoped pointer holding the latest message or NULL.
   ScopedMessagePtr<const T> queue_msg_;
diff --git a/aos/common/queue_test.cc b/aos/common/queue_test.cc
index 65a1c25..32d1d23 100644
--- a/aos/common/queue_test.cc
+++ b/aos/common/queue_test.cc
@@ -48,7 +48,7 @@
   usleep(50000);
   my_test_queue.MakeWithBuilder().test_bool(true).test_int(0x971).Send();
   t.Join();
-  EXPECT_EQ(true, t.threaded_test_queue.IsNewerThanMS(20));
+  EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(55));
 }
 
 // Tests that we can send a message with the message pointer and get it back.
diff --git a/aos/common/queue_testutils.cc b/aos/common/queue_testutils.cc
index 1d47e62..8d195c5 100644
--- a/aos/common/queue_testutils.cc
+++ b/aos/common/queue_testutils.cc
@@ -1,6 +1,10 @@
 #include "aos/common/queue_testutils.h"
 
 #include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <assert.h>
 
 #include "gtest/gtest.h"
 
@@ -110,15 +114,23 @@
 
 Once<void> enable_test_logging_once(DoEnableTestLogging);
 
+const size_t kCoreSize = 0x100000;
+
+void TerminateExitHandler() {
+  _exit(EXIT_SUCCESS);
+}
+
 }  // namespace
 
 GlobalCoreInstance::GlobalCoreInstance() {
-  const size_t kCoreSize = 0x100000;
   global_core = &global_core_data_;
-  global_core->owner = 1;
-  void *memory = malloc(kCoreSize);
-  assert(memory != NULL);
-  memset(memory, 0, kCoreSize);
+  global_core->owner = true;
+  // Use mmap(2) manually instead of through malloc(3) so that we can pass
+  // MAP_SHARED which allows forked processes to communicate using the
+  // "shared" memory.
+  void *memory = mmap(NULL, kCoreSize, PROT_READ | PROT_WRITE,
+                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+  assert(memory != MAP_FAILED);
 
   assert(aos_core_use_address_as_shared_mem(memory, kCoreSize) == 0);
 
@@ -126,7 +138,10 @@
 }
 
 GlobalCoreInstance::~GlobalCoreInstance() {
-  free(global_core->mem_struct);
+  // Keep munmap(2) outside of assert(3) so it still runs with NDEBUG.
+  const int unmap_result = munmap(global_core->mem_struct, kCoreSize);
+  assert(unmap_result == 0);
+  (void)unmap_result;
   global_core = NULL;
 }
 
@@ -134,6 +149,13 @@
   enable_test_logging_once.Get();
 }
 
+void PreventExit() {
+  // Keep atexit(3) outside of assert(3) so it still runs with NDEBUG.
+  const int atexit_result = atexit(TerminateExitHandler);
+  assert(atexit_result == 0);
+  (void)atexit_result;
+}
+
 }  // namespace testing
 }  // namespace common
 }  // namespace aos
diff --git a/aos/common/queue_testutils.h b/aos/common/queue_testutils.h
index aabdd2d..2d26262 100644
--- a/aos/common/queue_testutils.h
+++ b/aos/common/queue_testutils.h
@@ -1,11 +1,20 @@
-#include "aos/common/queue.h"
+#ifndef AOS_COMMON_QUEUE_TESTUTILS_H_
+#define AOS_COMMON_QUEUE_TESTUTILS_H_
+
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+
+// This file has some general helper functions for dealing with testing things
+// that use shared memory etc.
 
 namespace aos {
 namespace common {
 namespace testing {
 
+// Manages creating and cleaning up "shared memory" which works within this
+// process and any that it fork(2)s.
 class GlobalCoreInstance {
  public:
+  // Calls EnableTestLogging().
   GlobalCoreInstance();
   ~GlobalCoreInstance();
 
@@ -20,6 +29,14 @@
 // initialized), however it can be called more than that.
 void EnableTestLogging();
 
+// Registers an exit handler (using atexit(3)) which will call _exit(2).
+// Intended to be called in a freshly fork(2)ed process where it will run before
+// any other exit handlers that were already registered and prevent them from
+// being run.
+void PreventExit();
+
 }  // namespace testing
 }  // namespace common
 }  // namespace aos
+
+#endif  // AOS_COMMON_QUEUE_TESTUTILS_H_