Add timeout to Queue read operations

This lets the event loop, one layer up, poll a quit flag.

Change-Id: Id25c5098ded06c5af1d74591e4d606c0b7220c38
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index a87b15b..572b864 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -848,7 +848,7 @@
   return (value & FUTEX_TID_MASK) == tid;
 }
 
-int condition_wait(aos_condition *c, aos_mutex *m) {
+int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
   RunObservers run_observers(c, false);
   const uint32_t tid = get_tid();
   const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
@@ -860,8 +860,22 @@
   while (true) {
     // Wait in the kernel iff the value of it doesn't change (ie somebody else
     // does a wake) from before we unlocked the mutex.
-    int ret = sys_futex_wait_requeue_pi(c, wait_start, nullptr, &m->futex);
+    int ret = sys_futex_wait_requeue_pi(c, wait_start, end_time, &m->futex);
+
     if (ret != 0) {
+      // Timed out waiting.  Signal that back up to the user.
+      if (__builtin_expect(ret == -ETIMEDOUT, true)) {
+        // We have to relock it ourselves because the kernel didn't do it.
+        const int r = mutex_do_get(m, false, nullptr, tid);
+        assert(__builtin_expect(r == 0 || r == 1, true));
+        adder.Add();
+
+        // OWNER_DIED takes priority.  Pass it on if we found it.
+        if (r == 1) return r;
+        // Otherwise communicate that we timed out.
+        return -1;
+      }
+
       // If it failed because somebody else did a wake and changed the value
       // before we actually made it to sleep.
       if (__builtin_expect(ret == -EAGAIN, true)) {
@@ -878,7 +892,9 @@
         return r;
       }
       // Try again if it was because of a signal.
-      if (__builtin_expect(ret == -EINTR, true)) continue;
+      if (__builtin_expect((ret == -EINTR), true)) {
+        continue;
+      }
       my_robust_list::robust_head.pending_next = 0;
       PELOG(FATAL, -ret, "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed", c,
             wait_start, &m->futex);
@@ -886,7 +902,8 @@
       // Record that the kernel relocked it for us.
       lock_pthread_mutex(m);
 
-      // We succeeded in waiting, and the kernel took care of locking the mutex
+      // We succeeded in waiting, and the kernel took care of locking the
+      // mutex
       // for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
 
       adder.Add();