Add timeout to Queue read operations

This lets the layer above (the event loop) periodically wake up and poll a quit flag instead of blocking indefinitely in a read.

Change-Id: Id25c5098ded06c5af1d74591e4d606c0b7220c38
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index a87b15b..572b864 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -848,7 +848,7 @@
   return (value & FUTEX_TID_MASK) == tid;
 }
 
-int condition_wait(aos_condition *c, aos_mutex *m) {
+int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
   RunObservers run_observers(c, false);
   const uint32_t tid = get_tid();
   const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
@@ -860,8 +860,22 @@
   while (true) {
     // Wait in the kernel iff the value of it doesn't change (ie somebody else
     // does a wake) from before we unlocked the mutex.
-    int ret = sys_futex_wait_requeue_pi(c, wait_start, nullptr, &m->futex);
+    int ret = sys_futex_wait_requeue_pi(c, wait_start, end_time, &m->futex);
+
     if (ret != 0) {
+      // Timed out waiting.  Signal that back up to the user.
+      if (__builtin_expect(ret == -ETIMEDOUT, true)) {
+        // We have to relock it ourself because the kernel didn't do it.
+        const int r = mutex_do_get(m, false, nullptr, tid);
+        assert(__builtin_expect(r == 0 || r == 1, true));
+        adder.Add();
+
+        // OWNER_DIED takes priority.  Pass it on if we found it.
+        if (r == 1) return r;
+        // Otherwise communicate that we timed out.
+        return -1;
+      }
+
       // If it failed because somebody else did a wake and changed the value
       // before we actually made it to sleep.
       if (__builtin_expect(ret == -EAGAIN, true)) {
@@ -878,7 +892,9 @@
         return r;
       }
       // Try again if it was because of a signal.
-      if (__builtin_expect(ret == -EINTR, true)) continue;
+      if (__builtin_expect((ret == -EINTR), true)) {
+        continue;
+      }
       my_robust_list::robust_head.pending_next = 0;
       PELOG(FATAL, -ret, "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed", c,
             wait_start, &m->futex);
@@ -886,7 +902,8 @@
       // Record that the kernel relocked it for us.
       lock_pthread_mutex(m);
 
-      // We succeeded in waiting, and the kernel took care of locking the mutex
+      // We succeeded in waiting, and the kernel took care of locking the
+      // mutex
       // for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
 
       adder.Add();
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
index ae95c6d..2824516 100644
--- a/aos/ipc_lib/aos_sync.h
+++ b/aos/ipc_lib/aos_sync.h
@@ -1,10 +1,11 @@
 #ifndef AOS_IPC_LIB_SYNC_H_
 #define AOS_IPC_LIB_SYNC_H_
 
-#include <stdlib.h>
 #include <signal.h>
-#include <stdint.h>
+#include <stdbool.h>
 #include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -142,8 +143,10 @@
 // this function returns.
 // NOTE: The relocking of m is not atomic with stopping the actual wait and
 // other process(es) may lock (+unlock) the mutex first.
-// Returns 0 on success or 1 if the previous owner died.
-int condition_wait(aos_condition *c, struct aos_mutex *m)
+// Returns 0 on success, 1 if the previous owner died, or -1 if we timed out.
+// Will only return -1 on timeout if end_time is not null.
+int condition_wait(aos_condition *c, struct aos_mutex *m,
+                   struct timespec *end_time)
     __attribute__((warn_unused_result));
 // If any other processes are condition_waiting on c, wake 1 of them. Does not
 // require m to be locked.
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index c751a7e..295e854 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -5,20 +5,22 @@
 
 #include "aos/ipc_lib/queue.h"
 
+#include <assert.h>
+#include <errno.h>
 #include <stdio.h>
 #include <string.h>
-#include <errno.h>
-#include <assert.h>
 
-#include <memory>
 #include <algorithm>
+#include <memory>
 
-#include "aos/type_traits/type_traits.h"
 #include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
 
 namespace aos {
 namespace {
 
+namespace chrono = ::std::chrono;
+
 static_assert(shm_ok<RawQueue>::value,
               "RawQueue instances go into shared memory");
 
@@ -61,20 +63,16 @@
     __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
   }
 
-  void ref_count_sub() {
-    __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-  }
-  void ref_count_add() {
-    __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-  }
+  void ref_count_sub() { __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
+  void ref_count_add() { __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
 
  private:
   // This gets accessed with atomic instructions without any
   // locks held by various member functions.
   int32_t ref_count_;
 
-  // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
-  // it to 16 if a pointer is 8 bytes by itself.
+// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+// it to 16 if a pointer is 8 bytes by itself.
 #if __SIZEOF_POINTER__ == 8
 #ifdef __clang__
   // Clang is smart enough to realize this is unused, but GCC doesn't like the
@@ -83,7 +81,7 @@
 #endif
   char padding[4];
 #elif __SIZEOF_POINTER__ == 4
-  // No padding needed to get 8 byte total size.
+// No padding needed to get 8 byte total size.
 #else
 #error Unknown pointer size.
 #endif
@@ -135,13 +133,17 @@
   if (__builtin_expect(recycle_ != nullptr, 0)) {
     void *const new_msg = recycle_->GetMessage();
     if (new_msg == nullptr) {
-      fprintf(stderr, "queue: couldn't get a message"
-              " for recycle queue %p\n", recycle_);
+      fprintf(stderr,
+              "queue: couldn't get a message"
+              " for recycle queue %p\n",
+              recycle_);
     } else {
       header->ref_count_add();
       if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
-        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
-                " aborting\n", recycle_, msg);
+        fprintf(stderr,
+                "queue: %p->WriteMessage(%p, kOverride) failed."
+                " aborting\n",
+                recycle_, msg);
         printf("see stderr\n");
         abort();
       }
@@ -210,8 +212,8 @@
   recycle_ = NULL;
 
   if (kFetchDebug) {
-    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
-           name, length, hash, queue_length);
+    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n", name,
+           length, hash, queue_length);
   }
 
   data_length_ = queue_length + 1;
@@ -242,7 +244,7 @@
 }
 
 RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
-                    int queue_length) {
+                          int queue_length) {
   if (kFetchDebug) {
     printf("fetching queue %s\n", name);
   }
@@ -250,8 +252,8 @@
     LOG(FATAL, "mutex_lock(%p) failed\n",
         &global_core->mem_struct->queues.lock);
   }
-  RawQueue *current = static_cast<RawQueue *>(
-      global_core->mem_struct->queues.pointer);
+  RawQueue *current =
+      static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
   if (current != NULL) {
     while (true) {
       // If we found a matching queue.
@@ -284,8 +286,8 @@
 }
 
 RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
-                    int queue_length,
-                    int recycle_hash, int recycle_length, RawQueue **recycle) {
+                          int queue_length, int recycle_hash,
+                          int recycle_length, RawQueue **recycle) {
   RawQueue *r = Fetch(name, length, hash, queue_length);
   r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
   if (r == r->recycle_) {
@@ -368,7 +370,8 @@
   }
 }
 
-bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index,
+                               chrono::nanoseconds timeout) {
   while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
     if (options & kNonBlock) {
       if (kReadDebug) {
@@ -382,7 +385,17 @@
       }
       readable_waiting_ = true;
       // Wait for a message to become readable.
-      CHECK(!readable_.Wait());
+      while (true) {
+        Condition::WaitResult wait_result = readable_.WaitTimed(timeout);
+        if (wait_result == Condition::WaitResult::kOk) {
+          break;
+        }
+        CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+        if (wait_result == Condition::WaitResult::kTimeout) {
+          return false;
+        }
+      }
+
       if (kReadDebug) {
         printf("queue: done waiting for readable_ of %p\n", this);
       }
@@ -392,8 +405,8 @@
   // Wait()ing above so this value might have changed.
   writable_start_ = is_writable();
   if (kReadDebug) {
-    printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
-           this, index, data_start_, data_end_, writable_start_);
+    printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n", this,
+           index, data_start_, data_end_, writable_start_);
   }
   return true;
 }
@@ -416,7 +429,7 @@
   IPCMutexLocker locker(&data_lock_);
   CHECK(!locker.owner_died());
 
-  if (!ReadCommonStart(options, nullptr)) {
+  if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
     if (kReadDebug) {
       printf("queue: %p common returned false\n", this);
     }
@@ -468,18 +481,18 @@
   return msg;
 }
 
-const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
-                                         int *index) {
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options, int *index,
+                                         chrono::nanoseconds timeout) {
   if (kReadDebug) {
-    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
-           this, options.printable(), index, *index);
+    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n", this,
+           options.printable(), index, *index);
   }
   void *msg = NULL;
 
   IPCMutexLocker locker(&data_lock_);
   CHECK(!locker.owner_died());
 
-  if (!ReadCommonStart(options, index)) {
+  if (!ReadCommonStart(options, index, timeout)) {
     if (kReadDebug) {
       printf("queue: %p common returned false\n", this);
     }
@@ -506,8 +519,8 @@
     int current_messages = data_end_ - data_start_;
     if (current_messages < 0) current_messages += data_length_;
     if (kReadIndexDebug) {
-      printf("queue: %p start=%d end=%d current=%d\n",
-             this, data_start_, data_end_, current_messages);
+      printf("queue: %p start=%d end=%d current=%d\n", this, data_start_,
+             data_end_, current_messages);
     }
     assert(current_messages > 0);
     // If we're behind the available messages.
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
index 6fef1d5..b93cc42 100644
--- a/aos/ipc_lib/queue.h
+++ b/aos/ipc_lib/queue.h
@@ -117,13 +117,15 @@
   // Calling with both kPeek and kFromEnd in options isn't valid because that
   // would mean ignoring index, which would make this function the same as
   // ReadMessage (which should be used instead).
-  const void *ReadMessageIndex(Options<RawQueue> options, int *index) {
+  const void *ReadMessageIndex(
+      Options<RawQueue> options, int *index,
+      ::std::chrono::nanoseconds timeout = ::std::chrono::nanoseconds(0)) {
     CheckReadOptions(options);
     static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
     if (options.AllSet(kFromEndAndPeek)) {
       LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
     }
-    return DoReadMessageIndex(options, index);
+    return DoReadMessageIndex(options, index, timeout);
   }
 
   // Retrieves ("allocates") a message that can then be written to the queue.
@@ -153,7 +155,8 @@
   // The public wrappers around these are inlined and do argument checking.
   bool DoWriteMessage(void *msg, Options<RawQueue> options);
   const void *DoReadMessage(Options<RawQueue> options);
-  const void *DoReadMessageIndex(Options<RawQueue> options, int *index);
+  const void *DoReadMessageIndex(Options<RawQueue> options, int *index,
+                                 ::std::chrono::nanoseconds timeout);
   void CheckReadOptions(Options<RawQueue> options) {
     static constexpr Options<RawQueue> kValidOptions =
         kPeek | kFromEnd | kNonBlock | kBlock;
@@ -216,7 +219,8 @@
   // Must be called with data_lock_ locked.
   // *read_data will be initialized.
   // Returns with a readable message in data_ or false.
-  bool ReadCommonStart(Options<RawQueue> options, int *index);
+  bool ReadCommonStart(Options<RawQueue> options, int *index,
+                       ::std::chrono::nanoseconds timeout);
   // Deals with setting/unsetting readable_ and writable_.
   // Must be called after data_lock_ has been unlocked.
   // read_data should be the same thing that was passed in to ReadCommonStart.