Add timeout to Queue read operations
This lets us poll a quit flag up a layer in the event loop.
Change-Id: Id25c5098ded06c5af1d74591e4d606c0b7220c38
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index c751a7e..295e854 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -5,20 +5,22 @@
#include "aos/ipc_lib/queue.h"
+#include <assert.h>
+#include <errno.h>
#include <stdio.h>
#include <string.h>
-#include <errno.h>
-#include <assert.h>
-#include <memory>
#include <algorithm>
+#include <memory>
-#include "aos/type_traits/type_traits.h"
#include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
namespace aos {
namespace {
+namespace chrono = ::std::chrono;
+
static_assert(shm_ok<RawQueue>::value,
"RawQueue instances go into shared memory");
@@ -61,20 +63,16 @@
__atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
}
- void ref_count_sub() {
- __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
- }
- void ref_count_add() {
- __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
- }
+ void ref_count_sub() { __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
+ void ref_count_add() { __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
private:
// This gets accessed with atomic instructions without any
// locks held by various member functions.
int32_t ref_count_;
- // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
- // it to 16 if a pointer is 8 bytes by itself.
+// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+// it to 16 if a pointer is 8 bytes by itself.
#if __SIZEOF_POINTER__ == 8
#ifdef __clang__
// Clang is smart enough to realize this is unused, but GCC doesn't like the
@@ -83,7 +81,7 @@
#endif
char padding[4];
#elif __SIZEOF_POINTER__ == 4
- // No padding needed to get 8 byte total size.
+// No padding needed to get 8 byte total size.
#else
#error Unknown pointer size.
#endif
@@ -135,13 +133,17 @@
if (__builtin_expect(recycle_ != nullptr, 0)) {
void *const new_msg = recycle_->GetMessage();
if (new_msg == nullptr) {
- fprintf(stderr, "queue: couldn't get a message"
- " for recycle queue %p\n", recycle_);
+ fprintf(stderr,
+ "queue: couldn't get a message"
+ " for recycle queue %p\n",
+ recycle_);
} else {
header->ref_count_add();
if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
- fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
- " aborting\n", recycle_, msg);
+ fprintf(stderr,
+ "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n",
+ recycle_, msg);
printf("see stderr\n");
abort();
}
@@ -210,8 +212,8 @@
recycle_ = NULL;
if (kFetchDebug) {
- printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
- name, length, hash, queue_length);
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n", name,
+ length, hash, queue_length);
}
data_length_ = queue_length + 1;
@@ -242,7 +244,7 @@
}
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length) {
+ int queue_length) {
if (kFetchDebug) {
printf("fetching queue %s\n", name);
}
@@ -250,8 +252,8 @@
LOG(FATAL, "mutex_lock(%p) failed\n",
&global_core->mem_struct->queues.lock);
}
- RawQueue *current = static_cast<RawQueue *>(
- global_core->mem_struct->queues.pointer);
+ RawQueue *current =
+ static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
if (current != NULL) {
while (true) {
// If we found a matching queue.
@@ -284,8 +286,8 @@
}
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length,
- int recycle_hash, int recycle_length, RawQueue **recycle) {
+ int queue_length, int recycle_hash,
+ int recycle_length, RawQueue **recycle) {
RawQueue *r = Fetch(name, length, hash, queue_length);
r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
if (r == r->recycle_) {
@@ -368,7 +370,8 @@
}
}
-bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index,
+ chrono::nanoseconds timeout) {
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
if (options & kNonBlock) {
if (kReadDebug) {
@@ -382,7 +385,17 @@
}
readable_waiting_ = true;
// Wait for a message to become readable.
- CHECK(!readable_.Wait());
+ while (true) {
+ Condition::WaitResult wait_result = readable_.WaitTimed(timeout);
+ if (wait_result == Condition::WaitResult::kOk) {
+ break;
+ }
+ CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+ if (wait_result == Condition::WaitResult::kTimeout) {
+ return false;
+ }
+ }
+
if (kReadDebug) {
printf("queue: done waiting for readable_ of %p\n", this);
}
@@ -392,8 +405,8 @@
// Wait()ing above so this value might have changed.
writable_start_ = is_writable();
if (kReadDebug) {
- printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
- this, index, data_start_, data_end_, writable_start_);
+ printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n", this,
+ index, data_start_, data_end_, writable_start_);
}
return true;
}
@@ -416,7 +429,7 @@
IPCMutexLocker locker(&data_lock_);
CHECK(!locker.owner_died());
- if (!ReadCommonStart(options, nullptr)) {
+ if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -468,18 +481,18 @@
return msg;
}
-const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
- int *index) {
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options, int *index,
+ chrono::nanoseconds timeout) {
if (kReadDebug) {
- printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
- this, options.printable(), index, *index);
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n", this,
+ options.printable(), index, *index);
}
void *msg = NULL;
IPCMutexLocker locker(&data_lock_);
CHECK(!locker.owner_died());
- if (!ReadCommonStart(options, index)) {
+ if (!ReadCommonStart(options, index, timeout)) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -506,8 +519,8 @@
int current_messages = data_end_ - data_start_;
if (current_messages < 0) current_messages += data_length_;
if (kReadIndexDebug) {
- printf("queue: %p start=%d end=%d current=%d\n",
- this, data_start_, data_end_, current_messages);
+ printf("queue: %p start=%d end=%d current=%d\n", this, data_start_,
+ data_end_, current_messages);
}
assert(current_messages > 0);
// If we're behind the available messages.