Add timeout to Queue read operations

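ReadCommonStart() and DoReadMessageIndex() now take a chrono::nanoseconds
timeout. A blocking read waits with Condition::WaitTimed() instead of
Condition::Wait(), and ReadCommonStart() returns false when that wait
times out. This lets the event loop, one layer up, wake periodically and
poll a quit flag instead of blocking indefinitely. The remaining hunks
are formatting-only (include ordering and line wrapping).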

Change-Id: Id25c5098ded06c5af1d74591e4d606c0b7220c38
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index c751a7e..295e854 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -5,20 +5,22 @@
 
 #include "aos/ipc_lib/queue.h"
 
+#include <assert.h>
+#include <errno.h>
 #include <stdio.h>
 #include <string.h>
-#include <errno.h>
-#include <assert.h>
 
-#include <memory>
 #include <algorithm>
+#include <memory>
 
-#include "aos/type_traits/type_traits.h"
 #include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
 
 namespace aos {
 namespace {
 
+namespace chrono = ::std::chrono;
+
 static_assert(shm_ok<RawQueue>::value,
               "RawQueue instances go into shared memory");
 
@@ -61,20 +63,16 @@
     __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
   }
 
-  void ref_count_sub() {
-    __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-  }
-  void ref_count_add() {
-    __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-  }
+  void ref_count_sub() { __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
+  void ref_count_add() { __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
 
  private:
   // This gets accessed with atomic instructions without any
   // locks held by various member functions.
   int32_t ref_count_;
 
-  // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
-  // it to 16 if a pointer is 8 bytes by itself.
+// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+// it to 16 if a pointer is 8 bytes by itself.
 #if __SIZEOF_POINTER__ == 8
 #ifdef __clang__
   // Clang is smart enough to realize this is unused, but GCC doesn't like the
@@ -83,7 +81,7 @@
 #endif
   char padding[4];
 #elif __SIZEOF_POINTER__ == 4
-  // No padding needed to get 8 byte total size.
+// No padding needed to get 8 byte total size.
 #else
 #error Unknown pointer size.
 #endif
@@ -135,13 +133,17 @@
   if (__builtin_expect(recycle_ != nullptr, 0)) {
     void *const new_msg = recycle_->GetMessage();
     if (new_msg == nullptr) {
-      fprintf(stderr, "queue: couldn't get a message"
-              " for recycle queue %p\n", recycle_);
+      fprintf(stderr,
+              "queue: couldn't get a message"
+              " for recycle queue %p\n",
+              recycle_);
     } else {
       header->ref_count_add();
       if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
-        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
-                " aborting\n", recycle_, msg);
+        fprintf(stderr,
+                "queue: %p->WriteMessage(%p, kOverride) failed."
+                " aborting\n",
+                recycle_, msg);
         printf("see stderr\n");
         abort();
       }
@@ -210,8 +212,8 @@
   recycle_ = NULL;
 
   if (kFetchDebug) {
-    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
-           name, length, hash, queue_length);
+    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n", name,
+           length, hash, queue_length);
   }
 
   data_length_ = queue_length + 1;
@@ -242,7 +244,7 @@
 }
 
 RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
-                    int queue_length) {
+                          int queue_length) {
   if (kFetchDebug) {
     printf("fetching queue %s\n", name);
   }
@@ -250,8 +252,8 @@
     LOG(FATAL, "mutex_lock(%p) failed\n",
         &global_core->mem_struct->queues.lock);
   }
-  RawQueue *current = static_cast<RawQueue *>(
-      global_core->mem_struct->queues.pointer);
+  RawQueue *current =
+      static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
   if (current != NULL) {
     while (true) {
       // If we found a matching queue.
@@ -284,8 +286,8 @@
 }
 
 RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
-                    int queue_length,
-                    int recycle_hash, int recycle_length, RawQueue **recycle) {
+                          int queue_length, int recycle_hash,
+                          int recycle_length, RawQueue **recycle) {
   RawQueue *r = Fetch(name, length, hash, queue_length);
   r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
   if (r == r->recycle_) {
@@ -368,7 +370,8 @@
   }
 }
 
-bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index,
+                               chrono::nanoseconds timeout) {
   while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
     if (options & kNonBlock) {
       if (kReadDebug) {
@@ -382,7 +385,17 @@
       }
       readable_waiting_ = true;
       // Wait for a message to become readable.
-      CHECK(!readable_.Wait());
+      while (true) {
+        Condition::WaitResult wait_result = readable_.WaitTimed(timeout);
+        if (wait_result == Condition::WaitResult::kOk) {
+          break;
+        }
+        CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+        if (wait_result == Condition::WaitResult::kTimeout) {
+          return false;
+        }
+      }
+
       if (kReadDebug) {
         printf("queue: done waiting for readable_ of %p\n", this);
       }
@@ -392,8 +405,8 @@
   // Wait()ing above so this value might have changed.
   writable_start_ = is_writable();
   if (kReadDebug) {
-    printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
-           this, index, data_start_, data_end_, writable_start_);
+    printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n", this,
+           index, data_start_, data_end_, writable_start_);
   }
   return true;
 }
@@ -416,7 +429,7 @@
   IPCMutexLocker locker(&data_lock_);
   CHECK(!locker.owner_died());
 
-  if (!ReadCommonStart(options, nullptr)) {
+  if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
     if (kReadDebug) {
       printf("queue: %p common returned false\n", this);
     }
@@ -468,18 +481,18 @@
   return msg;
 }
 
-const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
-                                         int *index) {
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options, int *index,
+                                         chrono::nanoseconds timeout) {
   if (kReadDebug) {
-    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
-           this, options.printable(), index, *index);
+    printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n", this,
+           options.printable(), index, *index);
   }
   void *msg = NULL;
 
   IPCMutexLocker locker(&data_lock_);
   CHECK(!locker.owner_died());
 
-  if (!ReadCommonStart(options, index)) {
+  if (!ReadCommonStart(options, index, timeout)) {
     if (kReadDebug) {
       printf("queue: %p common returned false\n", this);
     }
@@ -506,8 +519,8 @@
     int current_messages = data_end_ - data_start_;
     if (current_messages < 0) current_messages += data_length_;
     if (kReadIndexDebug) {
-      printf("queue: %p start=%d end=%d current=%d\n",
-             this, data_start_, data_end_, current_messages);
+      printf("queue: %p start=%d end=%d current=%d\n", this, data_start_,
+             data_end_, current_messages);
     }
     assert(current_messages > 0);
     // If we're behind the available messages.