fixed condition variable and RawQueue bugs

The API for the condition variables was broken, so I changed it and
then fixed RawQueue to use it correctly. Using the tests, I also found a
bug in the condition variable implementation.
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index 92b11c4..3a70411 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -15,15 +15,19 @@
 // this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
 // should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
 //   (sys_set_robust_list appears to be the function name)
-// <http://locklessinc.com/articles/futex_cheat_sheet/> and <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and
+//   <http://locklessinc.com/articles/mutex_cv_futex/> are useful
 // <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
 // can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
 //
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+//
 // Values for a mutex:
 // 0 = unlocked
 // 1 = locked, not contended
 // 2 = locked, probably contended
-// Values for a condition:
+// Values for a "futex":
 // 0 = unset
 // 1 = set
 
@@ -42,7 +46,7 @@
 }
 
 static inline int mutex_get(mutex *m, uint8_t signals_fail, const
-    struct timespec *timeout) {
+                            struct timespec *timeout) {
   int c;
   c = cmpxchg(m, 0, 1);
   if (!c) return 0;
@@ -132,77 +136,61 @@
   return !xchg(m, 0);
 }
 
-void condition_wait(condition_variable *c, mutex *m) {
-  if (__builtin_expect(c->m != m, 0)) {
-    if (c->m != NULL) {
-      fprintf(stderr, "sync: c(=%p)->m(=%p) != m(=%p)\n",
-          c, c->m, m);
-      printf("see stderr\n");
-      abort();
-    }
-    (void)cmpxchg(&c->m, NULL, m);
-    if (c->m != m) {
-      fprintf(stderr, "sync: c(=%p)->m(=%p) != m(=%p)"
-          " after trying to set it\n",
-          c, c->m, m);
-      printf("see stderr\n");
-      abort();
-    }
-  }
-
-  const mutex wait_start = c->wait;
+void condition_wait(mutex *c, mutex *m) {
+  const mutex wait_start = *c;
 
   mutex_unlock(m);
 
-  errno = 0;
-  do {
-    // If the syscall succeeded or somebody did a wake before we
-    // actually made it to sleep.
-    if (sys_futex(&c->wait, FUTEX_WAIT, wait_start, NULL, NULL, 0) == 0 ||
-        c->wait != wait_start) {
-      // Simplified mutex_lock that always leaves it
-      // contended in case anybody else got requeued.
-      while (xchg(&c->m, 2) != 0) {
-        // If the syscall failed and it wasn't
-        // because of a signal or a wake.
-        if (sys_futex(c->m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1 &&
-            errno != EINTR) {
-          fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
-              " failed with %d: %s\n",
-              c->m, errno, strerror(errno));
-          printf("see stderr\n");
-          abort();
-        }
+  while (1) {
+    if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+      // If it failed for some reason other than somebody else doing a wake
+      // before we actually made it to sleep.
+      if (__builtin_expect(*c == wait_start, 0)) {
+        // Try again if it was because of a signal.
+        if (errno == EINTR) continue;
+        fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+                " with %d: %s\n",
+                c, wait_start, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
       }
     }
-  } while (__builtin_expect(errno == EINTR, 1));
-  fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
-      " with %d: %s\n",
-      &c->wait, wait_start, errno, strerror(errno));
-  printf("see stderr\n");
-  abort();
+    // Simplified mutex_lock that always leaves it
+    // contended in case anybody else got requeued.
+    while (xchg(m, 2) != 0) {
+      if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
+        // Try again if it was because of a signal or somebody else unlocked it
+        // before we went to sleep.
+        if (errno == EINTR || errno == EWOULDBLOCK) continue;
+        fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+                " failed with %d: %s\n",
+                m, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
+      }
+    }
+    return;
+  }
 }
 
-void condition_signal(condition_variable *c) {
-  __sync_fetch_and_add(&c->wait, 1);
-  if (sys_futex(&c->wait, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+void condition_signal(mutex *c, mutex *m __attribute__((unused))) {
+  __sync_fetch_and_add(c, 1);
+  if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
     fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
         " failed with %d: %s\n",
-        &c->wait, errno, strerror(errno));
+        c, errno, strerror(errno));
     printf("see stderr\n");
     abort();
   }
 }
 
-void condition_broadcast(condition_variable *c) {
-  // Have to check or else the wake syscall would fail.
-  if (__builtin_expect(c->m == NULL, 0)) return;
-  __sync_fetch_and_add(&c->wait, 1);
+void condition_broadcast(mutex *c, mutex *m) {
+  __sync_fetch_and_add(c, 1);
   // Wake 1 waiter and requeue the rest.
-  if (sys_futex_requeue(&c->wait, FUTEX_REQUEUE, 1, INT_MAX, c->m) == -1) {
+  if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
     fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
         " failed with %d: %s\n",
-        &c->wait, c->m, errno, strerror(errno));
+        c, m, errno, strerror(errno));
     printf("see stderr\n");
     abort();
   }
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index 0cecd68..9b28437 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -17,20 +17,11 @@
 // Have to align structs containing it to sizeof(int).
 // Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
 // Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// No initialization is necessary for use as c with the condition_ functions.
 // The value should not be changed after multiple processes have started
 // accessing an instance except through the functions declared in this file.
 typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
 
-// Have to align structs containing it to sizeof(int).
-// Used with the condition_ functions.
-// It should be 0-initialized.
-typedef struct {
-  // The futex that is actually used to wait.
-  mutex wait;
-  // The mutex associated with this condition variable.
-  mutex *m;
-} condition_variable;
-
 // All return -1 for other error (which will be in errno from futex(2)).
 //
 // There is no priority inversion protection.
@@ -73,18 +64,18 @@
 int futex_unset(mutex *m);
 
 // The condition_ functions implement condition variable support. The API is
-// similar to the pthreads api and works the same way.
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the functions with a given c.
 
 // Wait for the condition variable to be signalled. m will be unlocked
-// atomically with actually starting to wait. The same m argument must be used
-// for all calls with a given c.
-void condition_wait(condition_variable *c, mutex *m);
+// atomically with actually starting to wait.
+void condition_wait(mutex *c, mutex *m);
 // If any other processes are condition_waiting on c, wake 1 of them. Does not
-// require the m used with condition_wait on this c to be locked.
-void condition_signal(condition_variable *c);
-// Wakes all processes that are condition_waiting on c. Does not require the m
-// used with the condition_wait on this c to be locked.
-void condition_broadcast(condition_variable *c);
+// require m to be locked.
+void condition_signal(mutex *c, mutex *m);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(mutex *c, mutex *m);
 
 #ifdef __cplusplus
 }
diff --git a/aos/atom_code/ipc_lib/aos_sync_test.cc b/aos/atom_code/ipc_lib/aos_sync_test.cc
deleted file mode 100644
index 016602b..0000000
--- a/aos/atom_code/ipc_lib/aos_sync_test.cc
+++ /dev/null
@@ -1,5 +0,0 @@
-#include "aos/aos_core.h"
-
-#include "gtest/gtest.h"
-
-
diff --git a/aos/atom_code/ipc_lib/condition.cc b/aos/atom_code/ipc_lib/condition.cc
index f65c67d..c67ea5f 100644
--- a/aos/atom_code/ipc_lib/condition.cc
+++ b/aos/atom_code/ipc_lib/condition.cc
@@ -6,8 +6,6 @@
 
 namespace aos {
 
-static_assert(shm_ok<condition_variable>::value,
-              "all C structs really should work in shared memory");
 static_assert(shm_ok<Condition>::value, "Condition should work"
               " in shared memory");
 
@@ -18,11 +16,11 @@
 }
 
 void Condition::Signal() {
-  condition_signal(&impl_);
+  condition_signal(&impl_, &m_->impl_);
 }
 
 void Condition::Broadcast() {
-  condition_broadcast(&impl_);
+  condition_broadcast(&impl_, &m_->impl_);
 }
 
 }  // namespace aos
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.py b/aos/atom_code/ipc_lib/ipc_stress_test.py
new file mode 100755
index 0000000..8e402ac
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_stress_test.py
@@ -0,0 +1,67 @@
+#!/usr/bin/python
+
+# This is a script that runs all of the tests that deal with the IPC stuff in
+# parallel for a while and makes sure that they don't ever fail.
+
+import subprocess
+import random
+import multiprocessing
+import time
+import os
+import Queue
+import sys
+
+TESTS = [
+  ['queue_test'],
+  ['condition_test'],
+  # The fairness test doesn't work under load.
+  ['mutex_test', '--gtest_filter=-MutexTest.Fairness'],
+  #['ipc_queue_test'],
+]
+TESTS_PATH = '../../../out_atom/Default/tests'
+# The tests spend a lot of their time waiting (ie for things to time out), so I
+# had to use this many to get the highest CPU utilization.
+TESTERS = 35
+TEST_TIME = 10
+
+def run(iterations_queue, output_lock, stop_time):
+  iterations = 0
+  while time.time() < stop_time:
+    test = random.choice(TESTS)
+    try:
+      output = subprocess.check_output(
+          ["%s/%s/%s" %(
+            os.path.dirname(os.path.abspath(__file__)), TESTS_PATH, test[0])] +
+            test[1:],
+          stderr=subprocess.STDOUT,
+          bufsize=-1)
+    except subprocess.CalledProcessError as error:
+      with output_lock:
+        sys.stderr.write("------Test %s failed with exit %d output:------\n%s" %
+            (test, error.returncode, error.output))
+    iterations += 1
+  iterations_queue.put(iterations)
+
+def main():
+  processes = []
+  output_lock = multiprocessing.Lock()
+  iterations_queue = multiprocessing.Queue()
+  stop_time = time.time() + TEST_TIME
+  stop_event = multiprocessing.Event()
+  for _ in xrange(TESTERS):
+    process = multiprocessing.Process(target=run,
+      args=(iterations_queue, output_lock, stop_time))
+    processes.append(process)
+    process.start()
+  for process in processes:
+    process.join()
+  total_iterations = 0
+  try:
+    while True:
+      total_iterations += iterations_queue.get_nowait()
+  except Queue.Empty:
+    pass
+  print("Iterated a total of %d times." % total_iterations)
+
+if __name__ == '__main__':
+  main()
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
index 81d036c..7ab7b6c 100644
--- a/aos/atom_code/ipc_lib/queue.cc
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -53,6 +53,10 @@
 static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
               " is to stick it in shared memory");
 
+struct RawQueue::ReadData {
+  bool writable_start;
+};
+
 // TODO(brians) maybe do this with atomic integer instructions so it doesn't
 //   have to lock/unlock pool_lock_
 void RawQueue::DecrementMessageReferenceCount(const void *msg) {
@@ -116,32 +120,35 @@
   }
   RawQueue *current = static_cast<RawQueue *>(
       global_core->mem_struct->queues.queue_list);
-  RawQueue *last = NULL;
-  while (current != NULL) {
-    // if we found a matching queue
-    if (strcmp(current->name_, name) == 0 && current->length_ == length &&
-        current->hash_ == hash && current->queue_length_ == queue_length) {
-      mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-      return current;
-    } else {
-      if (kFetchDebug) {
-        printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
-               strcmp(current->name_, name), name);
+  if (current != NULL) {
+    while (true) {
+      // If we found a matching queue.
+      if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+          current->hash_ == hash && current->queue_length_ == queue_length) {
+        mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+        return current;
+      } else {
+        if (kFetchDebug) {
+          printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+                 strcmp(current->name_, name), name);
+        }
       }
+      // If this is the last one.
+      if (current->next_ == NULL) break;
+      current = current->next_;
     }
-    current = current->next_;
   }
 
-  void *temp = shm_malloc(sizeof(RawQueue));
-  current = new (temp) RawQueue(name, length, hash, queue_length);
-  if (last == NULL) {  // if we don't have one to tack the new one on to
-    global_core->mem_struct->queues.queue_list = current;
+  RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
+      RawQueue(name, length, hash, queue_length);
+  if (current == NULL) {  // if we don't already have one
+    global_core->mem_struct->queues.queue_list = r;
   } else {
-    last->next_ = current;
+    current->next_ = r;
   }
 
   mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-  return current;
+  return r;
 }
 RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
                     int queue_length,
@@ -151,6 +158,7 @@
   if (r == r->recycle_) {
     fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
     printf("see stderr\n");
+    r->recycle_ = NULL;
     abort();
   }
   *recycle = r->recycle_;
@@ -181,14 +189,14 @@
       // header with the one being freed, which effectively
       // switches which queue each message belongs to.
       MessageHeader *const new_header = MessageHeader::Get(new_msg);
-      // also switch the messages between the pools
+      // Also switch the messages between the pools.
       pool_[header->index] = new_header;
       {
         MutexLocker locker(&recycle_->pool_lock_);
         recycle_->pool_[new_header->index] = header;
-        // swap the information in both headers
+        // Swap the information in both headers.
         header->Swap(new_header);
-        // don't unlock the other pool until all of its messages are valid
+        // Don't unlock the other pool until all of its messages are valid.
       }
       // use the header for new_msg which is now for this pool
       header = new_header;
@@ -202,22 +210,22 @@
     }
   }
 
-  // where the one we're freeing was
+  // Where the one we're freeing was.
   int index = header->index;
   header->index = -1;
   if (index != messages_used_) {  // if we're not freeing the one on the end
-    // put the last one where the one we're freeing was
+    // Put the last one where the one we're freeing was.
     header = pool_[index] = pool_[messages_used_];
-    // put the one we're freeing at the end
+    // Put the one we're freeing at the end.
     pool_[messages_used_] = MessageHeader::Get(msg);
-    // update the former last one's index
+    // Update the former last one's index.
     header->index = index;
   }
 }
 
 bool RawQueue::WriteMessage(void *msg, int options) {
   if (kWriteDebug) {
-    printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
+    printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
   }
   if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
       msg > static_cast<void *>((
@@ -230,18 +238,23 @@
   }
   {
     MutexLocker locker(&data_lock_);
-    int new_end = (data_end_ + 1) % data_length_;
-    while (new_end == data_start_) {
+    bool writable_waited = false;
+
+    int new_end;
+    while (true) {
+      new_end = (data_end_ + 1) % data_length_;
+      // If there is room in the queue right now.
+      if (new_end != data_start_) break;
       if (options & kNonBlock) {
         if (kWriteDebug) {
-          printf("queue: not blocking on %p. returning -1\n", this);
+          printf("queue: not blocking on %p. returning false\n", this);
         }
         return false;
       } else if (options & kOverride) {
         if (kWriteDebug) {
           printf("queue: overriding on %p\n", this);
         }
-        // avoid leaking the message that we're going to overwrite
+        // Avoid leaking the message that we're going to overwrite.
         DecrementMessageReferenceCount(data_[data_start_]);
         data_start_ = (data_start_ + 1) % data_length_;
       } else {  // kBlock
@@ -249,29 +262,44 @@
           printf("queue: going to wait for writable_ of %p\n", this);
         }
         writable_.Wait();
+        writable_waited = true;
       }
-      new_end = (data_end_ + 1) % data_length_;
     }
     data_[data_end_] = msg;
     ++messages_;
     data_end_ = new_end;
+
+    if (kWriteDebug) {
+      printf("queue: broadcasting to readable_ of %p\n", this);
+    }
+    readable_.Broadcast();
+
+    // If we got a signal on writable_ here and it's still writable, then we
+    // need to signal the next person in line (if any).
+    if (writable_waited && is_writable()) {
+      if (kWriteDebug) {
+        printf("queue: resignalling writable_ of %p\n", this);
+      }
+      writable_.Signal();
+    }
   }
   if (kWriteDebug) {
-    printf("queue: setting readable of %p\n", this);
-  }
-  readable_.Signal();
-  if (kWriteDebug) {
     printf("queue: write returning true on queue %p\n", this);
   }
   return true;
 }
 
-void RawQueue::ReadCommonEnd(bool read) {
-  if (read) {
-    writable_.Signal();
+void RawQueue::ReadCommonEnd(ReadData *read_data) {
+  if (is_writable()) {
+    if (kReadDebug) {
+      printf("queue: %ssignalling writable_ of %p\n",
+             read_data->writable_start ? "not " : "", this);
+    }
+    if (!read_data->writable_start) writable_.Signal();
   }
 }
-bool RawQueue::ReadCommonStart(int options, int *index) {
+bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
+  read_data->writable_start = is_writable();
   while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
     if (options & kNonBlock) {
       if (kReadDebug) {
@@ -280,15 +308,13 @@
       return false;
     } else {  // kBlock
       if (kReadDebug) {
-        printf("queue: going to wait for readable of %p\n", this);
+        printf("queue: going to wait for readable_ of %p\n", this);
       }
-      data_lock_.Unlock();
-      // wait for a message to become readable
+      // Wait for a message to become readable.
       readable_.Wait();
       if (kReadDebug) {
-        printf("queue: done waiting for readable of %p\n", this);
+        printf("queue: done waiting for readable_ of %p\n", this);
       }
-      data_lock_.Lock();
     }
   }
   if (kReadDebug) {
@@ -304,12 +330,12 @@
       pos = data_length_ - 1;
     }
     if (kReadDebug) {
-      printf("queue: reading from line %d: %d\n", __LINE__, pos);
+      printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
     }
     ret = data_[pos];
   } else {
     if (kReadDebug) {
-      printf("queue: reading from line %d: %d\n", __LINE__, start);
+      printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
     }
     ret = data_[start];
   }
@@ -322,111 +348,120 @@
 }
 const void *RawQueue::ReadMessage(int options) {
   if (kReadDebug) {
-    printf("queue: %p->ReadMessage(%d)\n", this, options);
+    printf("queue: %p->ReadMessage(%x)\n", this, options);
   }
   void *msg = NULL;
+
   MutexLocker locker(&data_lock_);
-  if (!ReadCommonStart(options, NULL)) {
+
+  ReadData read_data;
+  if (!ReadCommonStart(options, NULL, &read_data)) {
     if (kReadDebug) {
-      printf("queue: common returned false for %p\n", this);
+      printf("queue: %p common returned false\n", this);
     }
     return NULL;
   }
+
   if (options & kPeek) {
     msg = ReadPeek(options, data_start_);
   } else {
     if (options & kFromEnd) {
-      while (1) {
+      while (true) {
         if (kReadDebug) {
-          printf("queue: start of c2 of %p\n", this);
+          printf("queue: %p start of c2\n", this);
         }
         // This loop pulls each message out of the buffer.
         const int pos = data_start_;
         data_start_ = (data_start_ + 1) % data_length_;
-        // if this is the last one
+        // If this is the last one.
         if (data_start_ == data_end_) {
           if (kReadDebug) {
-            printf("queue: reading from c2: %d\n", pos);
+            printf("queue: %p reading from c2: %d\n", this, pos);
           }
           msg = data_[pos];
           break;
         }
-        // it's not going to be in the queue any more
+        // This message is not going to be in the queue any more.
         DecrementMessageReferenceCount(data_[pos]);
       }
     } else {
       if (kReadDebug) {
-        printf("queue: reading from d2: %d\n", data_start_);
+        printf("queue: %p reading from d2: %d\n", this, data_start_);
       }
       msg = data_[data_start_];
+      // TODO(brians): Doesn't this need to increment the ref count?
       data_start_ = (data_start_ + 1) % data_length_;
     }
   }
-  ReadCommonEnd(!(options & kPeek));
+  ReadCommonEnd(&read_data);
   if (kReadDebug) {
-    printf("queue: read returning %p\n", msg);
+    printf("queue: %p read returning %p\n", this, msg);
   }
   return msg;
 }
 const void *RawQueue::ReadMessageIndex(int options, int *index) {
   if (kReadDebug) {
-    printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
+    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
            this, options, index, *index);
   }
   void *msg = NULL;
-  {
-    MutexLocker locker(&data_lock_);
-    if (!ReadCommonStart(options, index)) {
+
+  MutexLocker locker(&data_lock_);
+
+  ReadData read_data;
+  if (!ReadCommonStart(options, index, &read_data)) {
+    if (kReadDebug) {
+      printf("queue: %p common returned false\n", this);
+    }
+    return NULL;
+  }
+
+  // TODO(parker): Handle integer wrap on the index.
+
+  // How many unread messages we have.
+  const int offset = messages_ - *index;
+  // Where we're going to start reading.
+  int my_start = data_end_ - offset;
+  if (my_start < 0) {  // If we want to read off the end of the buffer.
+    // Unwrap it.
+    my_start += data_length_;
+  }
+  if (offset >= data_length_) {  // If we're behind the available messages.
+    // Catch index up to the last available message.
+    *index += data_start_ - my_start;
+    // And that's the one we're going to read.
+    my_start = data_start_;
+  }
+  if (options & kPeek) {
+    msg = ReadPeek(options, my_start);
+  } else {
+    if (options & kFromEnd) {
       if (kReadDebug) {
-        printf("queue: common returned false for %p\n", this);
+        printf("queue: %p start of c1\n", this);
       }
-      return NULL;
-    }
-    // TODO(parker): Handle integer wrap on the index.
-    const int offset = messages_ - *index;
-    int my_start = data_end_ - offset;
-    if (offset >= data_length_) {  // if we're behind the available messages
-      // catch index up to the last available message
-      *index += data_start_ - my_start;
-      // and that's the one we're going to read
-      my_start = data_start_;
-    }
-    if (my_start < 0) {  // if we want to read off the end of the buffer
-      // unwrap where we're going to read from
-      my_start += data_length_;
-    }
-    if (options & kPeek) {
-      msg = ReadPeek(options, my_start);
+      int pos = data_end_ - 1;
+      if (pos < 0) {  // If it wrapped.
+        pos = data_length_ - 1;  // Unwrap it.
+      }
+      if (kReadDebug) {
+        printf("queue: %p reading from c1: %d\n", this, pos);
+      }
+      msg = data_[pos];
+      *index = messages_;
     } else {
-      if (options & kFromEnd) {
-        if (kReadDebug) {
-          printf("queue: start of c1 of %p\n", this);
-        }
-        int pos = data_end_ - 1;
-        if (pos < 0) {  // if it wrapped
-          pos = data_length_ - 1;  // unwrap it
-        }
-        if (kReadDebug) {
-          printf("queue: reading from c1: %d\n", pos);
-        }
-        msg = data_[pos];
-        *index = messages_;
-      } else {
-        if (kReadDebug) {
-          printf("queue: reading from d1: %d\n", my_start);
-        }
-        msg = data_[my_start];
-        ++(*index);
+      if (kReadDebug) {
+        printf("queue: %p reading from d1: %d\n", this, my_start);
       }
-      MessageHeader *const header = MessageHeader::Get(msg);
-      ++header->ref_count;
-      if (kRefDebug) {
-        printf("ref_inc_count: %p\n", msg);
-      }
+      msg = data_[my_start];
+      ++(*index);
+    }
+    MessageHeader *const header = MessageHeader::Get(msg);
+    ++header->ref_count;
+    if (kRefDebug) {
+      printf("ref_inc_count: %p\n", msg);
     }
   }
-  // this function never consumes one off the queue
-  ReadCommonEnd(false);
+  ReadCommonEnd(&read_data);
   return msg;
 }
 
@@ -446,7 +481,7 @@
   void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
   header->ref_count = 1;
   if (kRefDebug) {
-    printf("ref alloc: %p\n", msg);
+    printf("%p ref alloc: %p\n", this, msg);
   }
   header->index = messages_used_;
   ++messages_used_;
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 0ab4227..5158558 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -106,11 +106,17 @@
   // FreeMessage.
   void *GetMessage();
 
-  // It is ok to call this with msg == NULL.
-  void FreeMessage(const void *msg) { DecrementMessageReferenceCount(msg); }
+  // It is ok to call this method with a NULL msg.
+  void FreeMessage(const void *msg) {
+    if (msg != NULL) DecrementMessageReferenceCount(msg);
+  }
 
  private:
   struct MessageHeader;
+  struct ReadData;
+
+  bool is_readable() { return data_end_ != data_start_; }
+  bool is_writable() { return ((data_end_ + 1) % data_length_) != data_start_; }
 
   // These next 4 allow finding the right one.
   const char *name_;
@@ -123,6 +129,8 @@
   RawQueue *recycle_;
 
   Mutex data_lock_;  // protects operations on data_ etc
+  // Always gets broadcast to because different readers might have different
+  // ideas of what "readable" means (ie ones using separated indices).
   Condition readable_;
   Condition writable_;
   int data_length_;  // max length into data + 1
@@ -144,12 +152,13 @@
   void DecrementMessageReferenceCount(const void *msg);
 
   // Should be called with data_lock_ locked.
+  // *read_data will be initialized.
   // Returns with a readable message in data_ or false.
-  bool ReadCommonStart(int options, int *index);
+  bool ReadCommonStart(int options, int *index, ReadData *read_data);
   // Deals with setting/unsetting readable_ and writable_.
   // Should be called after data_lock_ has been unlocked.
-  // read is whether or not this read call read one off the queue
-  void ReadCommonEnd(bool read);
+  // read_data should be the same thing that was passed in to ReadCommonStart.
+  void ReadCommonEnd(ReadData *read_data);
   // Handles reading with kPeek.
   void *ReadPeek(int options, int start);
 
diff --git a/aos/atom_code/ipc_lib/queue_test.cc b/aos/atom_code/ipc_lib/queue_test.cc
index 276e81a..8925c65 100644
--- a/aos/atom_code/ipc_lib/queue_test.cc
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -142,9 +142,8 @@
   }
 
  protected:
-  // Function gets called with arg in a forked process.
+  // function gets called with arg in a forked process.
   // Leaks shared memory.
-  // the attribute is in the middle to make gcc happy
   template<typename T> __attribute__((warn_unused_result))
       std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
     mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
@@ -152,7 +151,7 @@
     *lock = 1;
     const pid_t pid = fork();
     switch (pid) {
-      case 0: // child
+      case 0:  // child
         if (kForkSleep != 0) {
           printf("pid %jd sleeping for %u\n", static_cast<intmax_t>(getpid()),
                  kForkSleep);
@@ -162,10 +161,10 @@
         function(arg);
         mutex_unlock(lock);
         exit(EXIT_SUCCESS);
-      case -1: // parent failure
+      case -1:  // parent failure
         printf("fork() failed with %d: %s\n", errno, strerror(errno));
         return std::unique_ptr<ForkedProcess>();
-      default: // parent
+      default:  // parent
         return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
     }
   }
@@ -243,12 +242,12 @@
 } while (false)
 
   struct TestMessage {
-    int16_t data; // don't really want to test empty messages
+    int16_t data;  // don't really want to test empty messages
   };
   struct MessageArgs {
     RawQueue *const queue;
     int flags;
-    int16_t data; // -1 means NULL expected
+    int16_t data;  // -1 means NULL expected
   };
   static void WriteTestMessage(MessageArgs *args, char *failure) {
     TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
diff --git a/aos/build/aos_all.gyp b/aos/build/aos_all.gyp
index bda36fa..adfaf65 100644
--- a/aos/build/aos_all.gyp
+++ b/aos/build/aos_all.gyp
@@ -42,6 +42,7 @@
         '<(AOS)/common/common.gyp:type_traits_test',
         '<(AOS)/common/common.gyp:time_test',
         '<(AOS)/common/common.gyp:mutex_test',
+        '<(AOS)/common/common.gyp:condition_test',
         '<(AOS)/common/common.gyp:once_test',
         '<(AOS)/common/logging/logging.gyp:logging_impl_test',
       ],
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index c867193..eee1d05 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -271,6 +271,23 @@
       ],
     },
     {
+      'target_name': 'condition_test',
+      'type': 'executable',
+      'sources': [
+        'condition_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        'condition',
+        '<(AOS)/common/util/util.gyp:thread',
+        'time',
+        'mutex',
+        '<(AOS)/build/aos.gyp:logging',
+        'queue_testutils',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:core_lib',
+       ],
+    },
+    {
       'target_name': 'die_test',
       'type': 'executable',
       'sources': [
diff --git a/aos/common/condition.h b/aos/common/condition.h
index 956dc6c..346ae54 100644
--- a/aos/common/condition.h
+++ b/aos/common/condition.h
@@ -14,7 +14,8 @@
 // exactly 1 mutex must be associated with each condition variable.
 class Condition {
  public:
-  // m is the mutex that will be associated with this condition variable.
+  // m is the mutex that will be associated with this condition variable. This
+  // object will hold on to a reference to it but does not take ownership.
   explicit Condition(Mutex *m);
 
   // Waits for the condition variable to be signalled, atomically unlocking m at
@@ -25,16 +26,18 @@
   // Signals at most 1 other process currently Wait()ing on this condition
   // variable. Calling this does not require the mutex associated with this
   // condition variable to be locked.
-  // One of the processes with the highest priority level will be woken if there
-  // are multiple ones.
+  // One of the processes with the highest priority level will be woken.
   void Signal();
   // Wakes all processes that are currently Wait()ing on this condition
   // variable. Calling this does not require the mutex associated with this
   // condition variable to be locked.
   void Broadcast();
 
+  // Retrieves the mutex associated with this condition variable.
+  Mutex *m() { return m_; }
+
  private:
-  condition_variable impl_;
+  mutex impl_;
   Mutex *m_;
 };
 
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
new file mode 100644
index 0000000..ddc12d5
--- /dev/null
+++ b/aos/common/condition_test.cc
@@ -0,0 +1,263 @@
+#include "aos/common/condition.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "gtest/gtest.h"
+
+#include "aos/common/util/thread.h"
+#include "aos/common/time.h"
+#include "aos/common/mutex.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/logging/logging.h"
+
+using ::aos::time::Time;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class ConditionTest : public ::testing::Test {
+ public:
+  struct Shared {
+    Shared() : condition(&mutex) {}
+
+    Mutex mutex;
+    Condition condition;
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  ConditionTest() : shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+
+  GlobalCoreInstance my_core;
+
+  Shared *const shared_;
+
+  void Settle() {
+    time::SleepFor(::Time::InSeconds(0.009));
+  }
+};
+
+class ConditionTestProcess {
+ public:
+  enum class Action {
+    kWaitLockStart,  // lock, delay, wait, unlock
+    kWait,  // delay, lock, wait, unlock
+    kWaitNoUnlock,  // delay, lock, wait
+    kSignal,  // delay, signal
+    kBroadcast,  // delay, broadcast
+  };
+
+  // This amount gets added to any passed in delay to make the test repeatable.
+  static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
+  static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.06);
+
+  // delay is how long to wait before doing action to condition.
+  // timeout is how long to wait after delay before deciding that it's hung.
+  ConditionTestProcess(const ::Time &delay, Action action, Condition *condition,
+                       const ::Time &timeout = kDefaultTimeout)
+    : delay_(kMinimumDelay + delay), action_(action), condition_(condition),
+      timeout_(delay_ + timeout), child_(-1),
+      shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+  ~ConditionTestProcess() {
+    assert(child_ == -1);
+  }
+
+  void Start() {  // Forks the child that executes Run(); must be called once.
+    ASSERT_FALSE(shared_->started);
+
+    child_ = fork();
+    if (child_ == 0) {  // in child
+      Run();
+      exit(EXIT_SUCCESS);
+    } else {  // in parent
+      assert(child_ != -1);  // fork() returns -1 on failure
+
+      shared_->ready.Lock();  // locked in Shared(); Run() unlocks it in the child
+
+      shared_->started = true;
+    }
+  }
+
+  bool IsFinished() {
+    return shared_->finished;
+  }
+
+  // Checks that the child stays blocked: succeeds (after killing it) if it
+  // has not finished by start_time + timeout_, fails (after joining it) if it
+  // finished in time. Also fails if Start() has not been called yet.
+  ::testing::AssertionResult Hung() {
+    if (!shared_->started) {
+      ADD_FAILURE();
+      return ::testing::AssertionFailure() << "not started yet";
+    }
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "already returned";
+    }
+    if (!shared_->delayed) {
+      shared_->done_delaying.Lock();  // wait for the child to set start_time
+    } else if (::Time::Now() > shared_->start_time + timeout_) {
+      Kill();
+      return ::testing::AssertionSuccess() << "already been too long";
+    }
+    time::SleepFor(::Time::InSeconds(0.01));
+    if (!shared_->finished) time::SleepUntil(shared_->start_time + timeout_);
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "completed within timeout";
+    }
+    Kill();
+    return ::testing::AssertionSuccess() << "took too long";
+  }
+  ::testing::AssertionResult Test() {
+    Start();
+    return Hung();
+  }
+
+ private:
+  struct Shared {  // State visible to both the parent and the forked child.
+    Shared()
+      : started(false), delayed(false), start_time(0, 0), finished(false) {
+      done_delaying.Lock();  // both mutexes start out locked; Run() unlocks them
+      ready.Lock();
+    }
+
+    bool started;  // set by the parent at the end of Start()
+    bool delayed;  // set by the child once its delay has elapsed
+    Mutex done_delaying;  // unlocked by the child right after it sets delayed
+    ::Time start_time;  // when the child finished delaying; (0, 0) until then
+    bool finished;  // set by the child after its action returns
+    Mutex ready;  // unlocked by the child just before it locks or signals
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  void Run() {  // Child-side body: performs action_ on condition_ after delay_.
+    if (action_ == Action::kWaitLockStart) {
+      shared_->ready.Unlock();  // releases the parent blocked in Start()
+      condition_->m()->Lock();  // this action takes the mutex before delaying
+    }
+    time::SleepFor(delay_);
+    shared_->start_time = ::Time::Now();
+    shared_->delayed = true;
+    shared_->done_delaying.Unlock();  // start_time is valid from here on
+    switch (action_) {
+      case Action::kWait:
+      case Action::kWaitNoUnlock:
+        shared_->ready.Unlock();
+        condition_->m()->Lock();  // fall through - the mutex is now held
+      case Action::kWaitLockStart:
+        condition_->Wait();
+        break;
+      case Action::kSignal:
+        shared_->ready.Unlock();
+        condition_->Signal();
+        break;
+      case Action::kBroadcast:
+        shared_->ready.Unlock();
+        condition_->Broadcast();
+        break;
+    }
+    shared_->finished = true;
+    if (action_ == Action::kWait || action_ == Action::kWaitLockStart) {
+      condition_->m()->Unlock();  // kWaitNoUnlock deliberately keeps it locked
+    }
+  }
+
+  void Join() {  // Reaps the child with waitpid() and resets child_ to -1.
+    assert(child_ != -1);
+    int status;
+    do {
+      assert(waitpid(child_, &status, 0) == child_);  // NOTE(review): call vanishes under NDEBUG
+    } while (!(WIFEXITED(status) || WIFSIGNALED(status)));
+    child_ = -1;  // lets the destructor's assert pass
+  }
+  void Kill() {  // Sends SIGTERM to the child, then reaps it via Join().
+    assert(child_ != -1);
+    assert(kill(child_, SIGTERM) == 0);  // NOTE(review): call vanishes under NDEBUG
+    Join();
+  }
+
+  const ::Time delay_;
+  const Action action_;
+  Condition *const condition_;
+  const ::Time timeout_;
+
+  pid_t child_;
+
+  Shared *const shared_;
+};
+constexpr ::Time ConditionTestProcess::kMinimumDelay;
+constexpr ::Time ConditionTestProcess::kDefaultTimeout;
+
+// Makes sure that the testing framework and everything work for a really simple
+// Wait() and then Signal().
+TEST_F(ConditionTest, Basic) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  thread.Start();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+}
+
+// Makes sure that the worker thread locks before it tries to Wait() etc.
+TEST_F(ConditionTest, Locking) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  shared_->mutex.Lock();
+  thread.Start();
+  Settle();
+  // This Signal() shouldn't do anything because the thread should still be
+  // waiting to lock the mutex.
+  shared_->condition.Signal();
+  Settle();
+  shared_->mutex.Unlock();
+  EXPECT_TRUE(thread.Hung());
+}
+
+// Tests that the worker thread only catches a Signal() after the mutex gets
+// unlocked.
+TEST_F(ConditionTest, LockFirst) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  shared_->mutex.Lock();
+  thread.Start();
+  Settle();
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->mutex.Unlock();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+}
+
+// Tests that the mutex gets relocked after Wait() returns.
+TEST_F(ConditionTest, Relocking) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWaitNoUnlock,
+                              &shared_->condition);
+  thread.Start();
+  Settle();
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+  EXPECT_FALSE(shared_->mutex.TryLock());
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index 7a5c2ce..d9327a1 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -59,12 +59,13 @@
   }
   EXPECT_TRUE(test_mutex.TryLock());
 }
+
 TEST_F(MutexTest, MutexUnlocker) {
   test_mutex.Lock();
   {
     aos::MutexUnlocker unlocker(&test_mutex);
     // If this fails, then something weird is going on and the next line might
-    // hang.
+    // hang, so fail immediately.
     ASSERT_TRUE(test_mutex.TryLock());
     test_mutex.Unlock();
   }
@@ -122,6 +123,7 @@
 };
 int MutexFairnessWorkerThread::cyclesRun;
 int MutexFairnessWorkerThread::totalCycles;
+
 // Tests the fairness of the implementation. It does this by repeatedly locking
 // and unlocking a mutex in multiple threads and then checking the standard
 // deviation of the number of times each one locks.
diff --git a/aos/common/queue_testutils.cc b/aos/common/queue_testutils.cc
index 1d47e62..629c9be 100644
--- a/aos/common/queue_testutils.cc
+++ b/aos/common/queue_testutils.cc
@@ -1,6 +1,7 @@
 #include "aos/common/queue_testutils.h"
 
 #include <string.h>
+#include <sys/mman.h>
 
 #include "gtest/gtest.h"
 
@@ -110,15 +111,18 @@
 
 Once<void> enable_test_logging_once(DoEnableTestLogging);
 
+const size_t kCoreSize = 0x100000;
+
 }  // namespace
 
 GlobalCoreInstance::GlobalCoreInstance() {
-  const size_t kCoreSize = 0x100000;
   global_core = &global_core_data_;
   global_core->owner = 1;
-  void *memory = malloc(kCoreSize);
-  assert(memory != NULL);
-  memset(memory, 0, kCoreSize);
+  // Use mmap(2) manually so that we can pass MAP_SHARED so that forked
+  // processes can still communicate using the "shared" memory.
+  void *memory = mmap(NULL, kCoreSize, PROT_READ | PROT_WRITE,
+                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+  assert(memory != MAP_FAILED);
 
   assert(aos_core_use_address_as_shared_mem(memory, kCoreSize) == 0);
 
@@ -126,7 +130,7 @@
 }
 
 GlobalCoreInstance::~GlobalCoreInstance() {
-  free(global_core->mem_struct);
+  assert(munmap(global_core->mem_struct, kCoreSize) == 0);
   global_core = NULL;
 }