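Copy the condition-api-fix branch over from my 2012 repo: add condition variables (condition_wait/condition_signal/condition_broadcast) and rename the old condition_* futex helpers to futex_*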
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index a9a4779..4c14a35 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -1,13 +1,16 @@
+#include "aos_sync.h"
+
 #include <stdio.h>
 #include <linux/futex.h>
 #include <unistd.h>
 #include <sys/syscall.h>
 #include <errno.h>
-#include "aos_sync.h"
-#include "cmpxchg.h"
 #include <stdint.h>
 #include <limits.h>
 #include <string.h>
+#include <inttypes.h>
+
+#include "cmpxchg.h"
 
 // this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
 // should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
@@ -23,113 +26,183 @@
 // 0 = unset
 // 1 = set
 
-static inline int sys_futex(mutex *addr1, int op, int val1, const struct timespec *timeout,
-		void *addr2, int val3) {
-	return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+static inline int sys_futex(mutex *uaddr, int futex_op, int val,
+    const struct timespec *timeout, void *uaddr2, int val3) {
+  return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+}
+static inline int sys_futex_requeue(mutex *uaddr, int futex_op, int nr_wake,
+    int nr_requeue, mutex *uaddr2) {
+  return syscall(SYS_futex, uaddr, futex_op, nr_wake, nr_requeue, uaddr2);
+}
+static inline int sys_futex_op(mutex *uaddr, int futex_op, int nr_wake,
+    int nr_wake2, mutex *uaddr2, int encoded_op) {
+  return syscall(SYS_futex, uaddr, futex_op, nr_wake, nr_wake2, uaddr2,
+      encoded_op);
 }
 
 static inline int mutex_get(mutex *m, uint8_t signals_fail, const
-		struct timespec *timeout) {
-	int c;
-	c = cmpxchg(m, 0, 1);
-	if (!c) return 0;
-	/* The lock is now contended */
-	if (c == 1) c = xchg(m, 2);
-	while (c) {
-		/* Wait in the kernel */
-		//printf("sync here %d\n", __LINE__);
-		if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
-			if (signals_fail && errno == EINTR) {
-				return 1;
-			}
-			if (timeout != NULL && errno == ETIMEDOUT) {
-				return 2;
-			}
-		}
-		//printf("sync here %d\n", __LINE__);
-		c = xchg(m, 2);
-	}
-	return 0;
+    struct timespec *timeout) {
+  int c;
+  c = cmpxchg(m, 0, 1);  // Fast path: 0 (unlocked) -> 1 (locked).
+  if (!c) return 0;
+  /* The lock is now contended */
+  if (c == 1) c = xchg(m, 2);
+  while (c) {
+    /* Wait in the kernel; only sleeps while *m is still 2. */
+    // NOTE(review): timeout is relative and restarts on every retry.
+    if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+      if (signals_fail && errno == EINTR) {
+        return 1;  // Interrupted by a signal.
+      }
+      if (timeout != NULL && errno == ETIMEDOUT) {
+        return 2;  // Timed out.
+      }
+    }
+    // Retry; if *m was 0 we now hold it, left at 2 (contended).
+    c = xchg(m, 2);
+  }
+  return 0;
 }
 int mutex_lock(mutex *m) {
-	return mutex_get(m, 1, NULL);
+  return mutex_get(m, 1, NULL);  // Fails with 1 on a signal; no timeout.
 }
 int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
-	return mutex_get(m, 1, timeout);
+  return mutex_get(m, 1, timeout);  // 1 on a signal, 2 on timing out.
 }
 int mutex_grab(mutex *m) {
-	return mutex_get(m, 0, NULL);
+  return mutex_get(m, 0, NULL);  // Keeps retrying through signals.
 }
 
-int mutex_unlock(mutex *m) {
-	/* Unlock, and if not contended then exit. */
-	//printf("mutex_unlock(%p) => %d \n",m,*m);
-	switch (xchg(m, 0)) {
-		case 0:
-			fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
-			printf("see stderr\n");
-			abort();
-		case 1:
-			//printf("mutex_unlock return(%p) => %d \n",m,*m);
-			return 0;
-		case 2:
-			if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
-				fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
-						m, errno, strerror(errno));
-				return -1;
-			} else {
-				return 0;
-			}
-		default:
-			fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
-					m);
-			printf("see stderr\n");
-			abort();
-	}
+void mutex_unlock(mutex *m) {
+  /* Unlock, and if not contended then exit. */
+  // Old value: 0 = not locked, 1 = locked, 2 = locked and contended.
+  switch (xchg(m, 0)) {
+    case 0:
+      fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+      printf("see stderr\n");
+      abort();
+    case 1:
+      // Nobody was waiting, so there is nobody to wake.
+      break;
+    case 2:
+      if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+        fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+            m, errno, strerror(errno));
+        printf("see stderr\n");
+        abort();
+      } else {
+        break;
+      }
+    default:
+      fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+          m);
+      printf("see stderr\n");
+      abort();
+  }
 }
 int mutex_trylock(mutex *m) {
-	/* Try to take the lock, if is currently unlocked */
-	unsigned c = cmpxchg(m, 0, 1);
-	if (!c) return 0;
-	return 1;
+  // Takes the lock only if it is currently unlocked (0); never sleeps.
+  const unsigned previous = cmpxchg(m, 0, 1);
+  // Any nonzero previous value means somebody else already holds it.
+  return previous != 0;
 }
 
-int condition_wait(mutex *m) {
-	if (*m) {
-		return 0;
-	}
-	if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
-		if (errno == EINTR) {
-			return 1;
-		} else if (errno != EWOULDBLOCK) {
-			return -1;
-		}
-	}
-	return 0;
+int futex_wait(mutex *m) {
+  if (*m) return 0;  // Already set; nothing to wait for.
+  // Only sleeps while the value still reads 0 (unset).
+  if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) != -1) return 0;
+  switch (errno) {
+    case EINTR:
+      return 1;  // Interrupted by a signal.
+    case EWOULDBLOCK:
+      return 0;  // Value changed from 0 before we made it to sleep.
+    default:
+      return -1;
+  }
 }
-int condition_wait_force(mutex *m) {
-	while (1) {
-		if (sys_futex(m, FUTEX_WAIT, *m, NULL, NULL, 0) == -1) {
-			if (errno != EWOULDBLOCK) { // if it was an actual problem
-				if (errno == EINTR) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-		} else {
-			return 0;
-		}
-	}
+int futex_set_value(mutex *m, mutex value) {
+  (void)xchg(m, value);  // Publish the new value before waking anybody.
+  return sys_futex(m, FUTEX_WAKE, INT_MAX /* everybody */, NULL, NULL, 0);
 }
-inline int condition_set_value(mutex *m, mutex value) {
-	xchg(m, value);
-	return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+int futex_set(mutex *m) {
+  return futex_set_value(m, 1);  // 1 is the standard "set" value.
 }
-int condition_set(mutex *m) {
-	return condition_set_value(m, 1);
-}
-int condition_unset(mutex *m) {
-	return !xchg(m, 0);
+int futex_unset(mutex *m) {
+  return xchg(m, 0) == 0;  // 0 if it was set before, 1 if it was not.
 }
 
+// Unlocks m, sleeps until c is signalled or broadcast, then re-locks m.
+// Aborts on misuse (a different m for the same c) or unexpected errors.
+void condition_wait(condition_variable *c, mutex *m) {
+  if (__builtin_expect(c->m != m, 0)) {
+    if (c->m != NULL) {
+      fprintf(stderr, "sync: c(=%p)->m(=%p) != m(=%p)\n", c, c->m, m);
+      printf("see stderr\n");
+      abort();
+    }
+    // First wait on c: associate m with it (broadcast requeues onto it).
+    (void)cmpxchg(&c->m, NULL, m);
+    if (c->m != m) {
+      fprintf(stderr, "sync: c(=%p)->m(=%p) != m(=%p)"
+          " after trying to set it\n", c, c->m, m);
+      printf("see stderr\n");
+      abort();
+    }
+  }
+
+  // Any signal/broadcast after this read changes c->wait, so the
+  // FUTEX_WAIT below can not miss it even though m is unlocked first.
+  const mutex wait_start = c->wait;
+  mutex_unlock(m);
+
+  while (sys_futex(&c->wait, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+    // EWOULDBLOCK: somebody signalled before we actually made it to sleep.
+    if (errno == EWOULDBLOCK) break;
+    // EINTR: retry unless somebody also signalled while we were interrupted.
+    if (errno == EINTR && c->wait == wait_start) continue;
+    if (errno == EINTR) break;
+    fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+        " with %d: %s\n",
+        &c->wait, wait_start, errno, strerror(errno));
+    printf("see stderr\n");
+    abort();
+  }
+
+  // Simplified mutex_lock that always leaves m contended (2) in case
+  // anybody else got requeued onto it by condition_broadcast.
+  while (xchg(m, 2) != 0) {
+    // EINTR (signal) and EWOULDBLOCK (m changed first) just mean retry.
+    if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1 &&
+        errno != EINTR && errno != EWOULDBLOCK) {
+      fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+          " failed with %d: %s\n", m, errno, strerror(errno));
+      printf("see stderr\n");
+      abort();
+    }
+  }
+}
+
+void condition_signal(condition_variable *c) {
+  // Change c->wait first so that a waiter which has read the old value but
+  // not gone to sleep yet gets EWOULDBLOCK instead of missing this signal.
+  __sync_fetch_and_add(&c->wait, 1);
+  if (sys_futex(&c->wait, FUTEX_WAKE, 1, NULL, NULL, 0) != -1) return;
+  fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
+      " failed with %d: %s\n", &c->wait, errno, strerror(errno));
+  printf("see stderr\n");
+  abort();
+}
+
+void condition_broadcast(condition_variable *c) {
+  // NULL c->m: nobody has waited yet (and requeueing onto NULL would fail).
+  if (__builtin_expect(c->m == NULL, 0)) return;
+  __sync_fetch_and_add(&c->wait, 1);
+  // Wake 1 waiter directly and requeue the rest onto c->m.
+  if (sys_futex_requeue(&c->wait, FUTEX_REQUEUE, 1, INT_MAX, c->m) != -1) {
+    return;
+  }
+  fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
+      " failed with %d: %s\n", &c->wait, c->m, errno, strerror(errno));
+  printf("see stderr\n");
+  abort();
+}
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index 3c66264..d28ae3a 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -14,13 +14,23 @@
 // and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
 // list the interesting ones
 
-// Have to align structs containing it to to sizeof(int).
+// Structs containing it have to be aligned to sizeof(int).
 // Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
-// Valid initial values for use with condition_ functions are 0 (unset) and 1 (set).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
 // The value should not be changed after multiple processes have started
 // accessing an instance except through the functions declared in this file.
 typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
 
+// Have to align structs containing it to sizeof(int).
+// A condition variable for use with the condition_ functions.
+// It must be 0-initialized (wait = 0, m = NULL).
+typedef struct {
+  // Futex that waiters sleep on; condition_signal/broadcast increment it.
+  mutex wait;
+  // The mutex passed to condition_wait (set by its first call), or NULL.
+  mutex *m;
+} condition_variable;
+
 // All return -1 for other error (which will be in errno from futex(2)).
 
 // Returns 1 if interrupted by a signal.
@@ -30,31 +40,41 @@
   __attribute__((warn_unused_result));
 // Ignores signals. Can not fail.
 int mutex_grab(mutex *m);
-// Returns 1 for multiple unlocking and -1 if something bad happened and
-// whoever's waiting didn't get woken up.
-int mutex_unlock(mutex *m);
+// abort(3)s on multiple unlocking, a garbage value, or a failed wake.
+void mutex_unlock(mutex *m);
 // Returns 0 when successful in locking the mutex and 1 if somebody else has it
 // locked.
 int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
 
-// The condition_ functions are similar to the mutex_ ones but different.
+// The futex_ functions are similar to the mutex_ ones but different.
 // They are designed for signalling when something happens (possibly to
 // multiple listeners). A mutex manipulated with them can only be set or unset.
 
-// Wait for the condition to be set. Will return immediately if it's already set.
+// Wait for the futex to be set. Will return immediately if it's already set.
 // Returns 0 if successful or it was already set, 1 if interrupted by a signal, or -1.
-int condition_wait(mutex *m) __attribute__((warn_unused_result));
-// Will wait for the next condition_set, even if the condition is already set.
-// Returns 0 if successful, 1 if interrupted by a signal, or -1.
-int condition_wait_force(mutex *m) __attribute__((warn_unused_result));
-// Set the condition and wake up anybody waiting on it.
+int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// Set the futex to 1 and wake up everybody waiting on it.
 // Returns the number that were woken or -1.
-int condition_set(mutex *m);
+int futex_set(mutex *m);
 // Same as above except lets something other than 1 be used as the final value.
-int condition_set_value(mutex *m, mutex value);
-// Unsets the condition.
+int futex_set_value(mutex *m, mutex value);
+// Unsets the futex (sets it to 0).
 // Returns 0 if it was set before and 1 if it wasn't.
-int condition_unset(mutex *m);
+int futex_unset(mutex *m);
+
+// The condition_ functions implement condition variables with an API similar
+// to pthreads'; re-check the predicate in a loop around condition_wait.
+
+// Wait for the condition variable to be signalled. m must be locked; it is
+// unlocked atomically with starting to wait and locked again before this
+// returns. The same m argument must be used for all calls with a given c.
+void condition_wait(condition_variable *c, mutex *m);
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require the m used with condition_wait on this c to be locked.
+void condition_signal(condition_variable *c);
+// Wakes all processes that are condition_waiting on c (1 directly, the rest as
+// m gets unlocked). Does not require m to be locked.
+void condition_broadcast(condition_variable *c);
 
 #ifdef __cplusplus
 }
diff --git a/aos/atom_code/ipc_lib/aos_sync_test.cc b/aos/atom_code/ipc_lib/aos_sync_test.cc
new file mode 100644
index 0000000..016602b
--- /dev/null
+++ b/aos/atom_code/ipc_lib/aos_sync_test.cc
@@ -0,0 +1,5 @@
+#include "aos/aos_core.h"
+
+#include "gtest/gtest.h"
+
+
diff --git a/aos/atom_code/ipc_lib/mutex.cpp b/aos/atom_code/ipc_lib/mutex.cpp
index 4f908dd..47fc92a 100644
--- a/aos/atom_code/ipc_lib/mutex.cpp
+++ b/aos/atom_code/ipc_lib/mutex.cpp
@@ -26,10 +26,7 @@
 }
 
 void Mutex::Unlock() {
-  if (mutex_unlock(&impl_) != 0) {
-    LOG(FATAL, "mutex_unlock(%p(=%" PRIu32 ")) failed because of %d: %s\n",
-        &impl_, impl_, errno, strerror(errno));
-  }
+  mutex_unlock(&impl_);  // abort(3)s on any failure; nothing to check.
 }
 
 bool Mutex::TryLock() {
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
index 2e45326..6c1ce71 100644
--- a/aos/atom_code/ipc_lib/queue.c
+++ b/aos/atom_code/ipc_lib/queue.c
@@ -144,8 +144,8 @@
 	buf->start = 0;
 	buf->end = 0;
 	buf->msgs = 0;
-	buf->writable = 1;
-	buf->readable = 0;
+	memset(&buf->writable, 0, sizeof(buf->writable));
+	memset(&buf->readable, 0, sizeof(buf->readable));
 	buf->buff_lock = 0;
 	pool->pool_lock = 0;
 	queue->recycle = NULL;
@@ -236,30 +236,18 @@
       msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
       buf->start = (buf->start + 1) % buf->length;
     } else { // BLOCK
-      mutex_unlock(&buf->buff_lock);
 #if WRITE_DEBUG
       printf("queue: going to wait for writable(=%p) of %p\n",
           &buf->writable, queue);
 #endif
-      if (condition_wait(&buf->writable)) {
+       condition_wait(&buf->writable, &buf->buff_lock);
 #if WRITE_DEBUG
-        printf("queue: waiting for writable(=%p) of %p failed\n",
-            &buf->writable, queue);
+       printf("queue: done waiting for writable(=%p) of %p\n",
+              &buf->writable, queue);
 #endif
-        return -1;
-      }
-#if WRITE_DEBUG
-      printf("queue: going to re-lock buff_lock of %p to write\n", queue);
-#endif
-      if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
-        printf("queue: error locking buff_lock of %p\n", queue);
-#endif
-        return -1;
-      }
-    }
-    new_end = (buf->end + 1) % buf->length;
-  }
+     }
+     new_end = (buf->end + 1) % buf->length;
+   }
   buf->data[buf->end] = msg;
   ++buf->msgs;
   buf->end = new_end;
@@ -267,10 +255,7 @@
 #if WRITE_DEBUG
   printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
 #endif
-  condition_set(&buf->readable);
-  if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
-    condition_unset(&buf->writable);
-  }
+  condition_signal(&buf->readable);
 #if WRITE_DEBUG
   printf("queue: write returning %d on queue %p\n", rv, queue);
 #endif
@@ -288,10 +273,7 @@
 // read is whether or not this read call read one off the queue
 static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
 	if (read) {
-		condition_set(&buf->writable);
-		if (buf->start == buf->end) {
-			condition_unset(&buf->readable);
-		}
+		condition_signal(&buf->writable);  // One slot freed: wake one writer.
 	}
 }
 // Returns with buff_lock locked and a readable message in buf.
@@ -308,8 +290,8 @@
 		return -1;
 	}
 	while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
-		mutex_unlock(&buf->buff_lock);
 		if (opts & NON_BLOCK) {
+			mutex_unlock(&buf->buff_lock);
 #if READ_DEBUG
 			printf("queue: not going to block waiting on %p\n", queue);
 #endif
@@ -320,23 +302,11 @@
 					&buf->readable, queue);
 #endif
 			// wait for a message to become readable
-			if ((index == NULL) ? condition_wait(&buf->readable) :
-					condition_wait_force(&buf->readable)) {
+			condition_wait(&buf->readable, &buf->buff_lock);
 #if READ_DEBUG
-				printf("queue: waiting for readable(=%p) of %p failed\n",
-						&buf->readable, queue);
+			printf("queue: done waiting for readable(=%p) of %p\n",
+					&buf->readable, queue);
 #endif
-				return -1;
-			}
-		}
-#if READ_DEBUG
-		printf("queue: going to re-lock buff_lock of %p to read\n", queue);
-#endif
-		if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
-			printf("couldn't re-lock buff_lock of %p\n", queue);
-#endif
-			return -1;
 		}
 	}
 #if READ_DEBUG
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
index e6b23ef..e592649 100644
--- a/aos/atom_code/ipc_lib/queue_internal.h
+++ b/aos/atom_code/ipc_lib/queue_internal.h
@@ -35,8 +35,8 @@
 typedef struct aos_ring_buf_t {
 	mutex buff_lock; // the main lock protecting operations on this buffer
   // conditions
-	mutex writable;
-	mutex readable;
+	condition_variable writable;
+	condition_variable readable;
 	int length; // max index into data + 1
 	int start; // is an index into data
 	int end; // is an index into data
diff --git a/aos/atom_code/ipc_lib/shared_mem.c b/aos/atom_code/ipc_lib/shared_mem.c
index f631b78..cea9a67 100644
--- a/aos/atom_code/ipc_lib/shared_mem.c
+++ b/aos/atom_code/ipc_lib/shared_mem.c
@@ -46,8 +46,8 @@
   }
   if (shm == -1) {
     fprintf(stderr, "shared_mem:"
-        " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
-        " failed with %d: %s\n", errno, strerror(errno));
+            " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+            " failed with %d: %s\n", errno, strerror(errno));
     return -1;
   }
   if (global_core->owner) {
@@ -62,8 +62,8 @@
       MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
   if (shm_address == MAP_FAILED) {
     fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
-        " with %d: %s\n",
-        (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
+            " with %d: %s\n",
+            (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
     return -1;
   }
   printf("shared_mem: shm at: %p\n", shm_address);
@@ -88,9 +88,9 @@
     init_shared_mem_core(global_core->mem_struct);
   }
   if (global_core->owner) {
-    condition_set(&global_core->mem_struct->creation_condition);
+    futex_set(&global_core->mem_struct->creation_condition);
   } else {
-    if (condition_wait(&global_core->mem_struct->creation_condition) != 0) {
+    if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
       fprintf(stderr, "waiting on creation_condition failed\n");
       return -1;
     }
@@ -116,4 +116,3 @@
   }
   return 0;
 }
-