Add timeout to Queue read operations
This lets the event loop one layer up poll a quit flag instead of blocking indefinitely on a read.
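For example, a caller one layer up could read with a periodic wakeup.
This is a rough, untested sketch (includes elided): "queue", "index",
and "should_quit" are hypothetical caller-side names, queue is assumed
to be a RawQueue* obtained from RawQueue::Fetch, and a timed-out read
is assumed to return NULL the same way the kNonBlock path does:

  ::std::atomic<bool> should_quit{false};
  int index = 0;
  while (!should_quit.load()) {
    // Wake up at least every 100ms so the quit flag gets checked even
    // when no messages arrive.  Leaving the timeout at its default of
    // 0 keeps the old block-forever behavior.
    const void *msg = queue->ReadMessageIndex(
        RawQueue::kBlock, &index, ::std::chrono::milliseconds(100));
    if (msg == NULL) continue;  // timed out; re-check should_quit
    // ... process msg, then release it as usual ...
  }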
Change-Id: Id25c5098ded06c5af1d74591e4d606c0b7220c38
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index a87b15b..572b864 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -848,7 +848,7 @@
return (value & FUTEX_TID_MASK) == tid;
}
-int condition_wait(aos_condition *c, aos_mutex *m) {
+int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
RunObservers run_observers(c, false);
const uint32_t tid = get_tid();
const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
@@ -860,8 +860,22 @@
while (true) {
// Wait in the kernel iff the value of it doesn't change (ie somebody else
// does a wake) from before we unlocked the mutex.
- int ret = sys_futex_wait_requeue_pi(c, wait_start, nullptr, &m->futex);
+ int ret = sys_futex_wait_requeue_pi(c, wait_start, end_time, &m->futex);
+
if (ret != 0) {
+ // Timed out waiting. Signal that back up to the user.
+ if (__builtin_expect(ret == -ETIMEDOUT, true)) {
+ // We have to relock it ourselves because the kernel didn't do it.
+ const int r = mutex_do_get(m, false, nullptr, tid);
+ assert(__builtin_expect(r == 0 || r == 1, true));
+ adder.Add();
+
+ // OWNER_DIED takes priority. Pass it on if we found it.
+ if (r == 1) return r;
+ // Otherwise communicate that we timed out.
+ return -1;
+ }
+
// If it failed because somebody else did a wake and changed the value
// before we actually made it to sleep.
if (__builtin_expect(ret == -EAGAIN, true)) {
@@ -878,7 +892,9 @@
return r;
}
// Try again if it was because of a signal.
- if (__builtin_expect(ret == -EINTR, true)) continue;
+ if (__builtin_expect((ret == -EINTR), true)) {
+ continue;
+ }
my_robust_list::robust_head.pending_next = 0;
PELOG(FATAL, -ret, "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed", c,
wait_start, &m->futex);
@@ -886,7 +902,8 @@
// Record that the kernel relocked it for us.
lock_pthread_mutex(m);
- // We succeeded in waiting, and the kernel took care of locking the mutex
+ // We succeeded in waiting, and the kernel took care of locking the
+ // mutex
// for us and setting FUTEX_WAITERS iff it needed to (for REQUEUE_PI).
adder.Add();
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
index ae95c6d..2824516 100644
--- a/aos/ipc_lib/aos_sync.h
+++ b/aos/ipc_lib/aos_sync.h
@@ -1,10 +1,11 @@
#ifndef AOS_IPC_LIB_SYNC_H_
#define AOS_IPC_LIB_SYNC_H_
-#include <stdlib.h>
#include <signal.h>
-#include <stdint.h>
+#include <stdbool.h>
#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
#ifdef __cplusplus
extern "C" {
@@ -142,8 +143,10 @@
// this function returns.
// NOTE: The relocking of m is not atomic with stopping the actual wait and
// other process(es) may lock (+unlock) the mutex first.
-// Returns 0 on success or 1 if the previous owner died.
-int condition_wait(aos_condition *c, struct aos_mutex *m)
+// Returns 0 on success, 1 if the previous owner died, or -1 if we timed out.
+// Will only return -1 on timeout if end_time is not null.
+int condition_wait(aos_condition *c, struct aos_mutex *m,
+ struct timespec *end_time)
__attribute__((warn_unused_result));
// If any other processes are condition_waiting on c, wake 1 of them. Does not
// require m to be locked.
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index c751a7e..295e854 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -5,20 +5,22 @@
#include "aos/ipc_lib/queue.h"
+#include <assert.h>
+#include <errno.h>
#include <stdio.h>
#include <string.h>
-#include <errno.h>
-#include <assert.h>
-#include <memory>
#include <algorithm>
+#include <memory>
-#include "aos/type_traits/type_traits.h"
#include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
namespace aos {
namespace {
+namespace chrono = ::std::chrono;
+
static_assert(shm_ok<RawQueue>::value,
"RawQueue instances go into shared memory");
@@ -61,20 +63,16 @@
__atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
}
- void ref_count_sub() {
- __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
- }
- void ref_count_add() {
- __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
- }
+ void ref_count_sub() { __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
+ void ref_count_add() { __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
private:
// This gets accessed with atomic instructions without any
// locks held by various member functions.
int32_t ref_count_;
- // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
- // it to 16 if a pointer is 8 bytes by itself.
+// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+// it to 16 if a pointer is 8 bytes by itself.
#if __SIZEOF_POINTER__ == 8
#ifdef __clang__
// Clang is smart enough to realize this is unused, but GCC doesn't like the
@@ -83,7 +81,7 @@
#endif
char padding[4];
#elif __SIZEOF_POINTER__ == 4
- // No padding needed to get 8 byte total size.
+// No padding needed to get 8 byte total size.
#else
#error Unknown pointer size.
#endif
@@ -135,13 +133,17 @@
if (__builtin_expect(recycle_ != nullptr, 0)) {
void *const new_msg = recycle_->GetMessage();
if (new_msg == nullptr) {
- fprintf(stderr, "queue: couldn't get a message"
- " for recycle queue %p\n", recycle_);
+ fprintf(stderr,
+ "queue: couldn't get a message"
+ " for recycle queue %p\n",
+ recycle_);
} else {
header->ref_count_add();
if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
- fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
- " aborting\n", recycle_, msg);
+ fprintf(stderr,
+ "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n",
+ recycle_, msg);
printf("see stderr\n");
abort();
}
@@ -210,8 +212,8 @@
recycle_ = NULL;
if (kFetchDebug) {
- printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
- name, length, hash, queue_length);
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n", name,
+ length, hash, queue_length);
}
data_length_ = queue_length + 1;
@@ -242,7 +244,7 @@
}
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length) {
+ int queue_length) {
if (kFetchDebug) {
printf("fetching queue %s\n", name);
}
@@ -250,8 +252,8 @@
LOG(FATAL, "mutex_lock(%p) failed\n",
&global_core->mem_struct->queues.lock);
}
- RawQueue *current = static_cast<RawQueue *>(
- global_core->mem_struct->queues.pointer);
+ RawQueue *current =
+ static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
if (current != NULL) {
while (true) {
// If we found a matching queue.
@@ -284,8 +286,8 @@
}
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length,
- int recycle_hash, int recycle_length, RawQueue **recycle) {
+ int queue_length, int recycle_hash,
+ int recycle_length, RawQueue **recycle) {
RawQueue *r = Fetch(name, length, hash, queue_length);
r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
if (r == r->recycle_) {
@@ -368,7 +370,8 @@
}
}
-bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index) {
+bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index,
+ chrono::nanoseconds timeout) {
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
if (options & kNonBlock) {
if (kReadDebug) {
@@ -382,7 +385,17 @@
}
readable_waiting_ = true;
// Wait for a message to become readable.
- CHECK(!readable_.Wait());
+ while (true) {
+ Condition::WaitResult wait_result = readable_.WaitTimed(timeout);
+ if (wait_result == Condition::WaitResult::kOk) {
+ break;
+ }
+ CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+ if (wait_result == Condition::WaitResult::kTimeout) {
+ return false;
+ }
+ }
+
if (kReadDebug) {
printf("queue: done waiting for readable_ of %p\n", this);
}
@@ -392,8 +405,8 @@
// Wait()ing above so this value might have changed.
writable_start_ = is_writable();
if (kReadDebug) {
- printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n",
- this, index, data_start_, data_end_, writable_start_);
+ printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n", this,
+ index, data_start_, data_end_, writable_start_);
}
return true;
}
@@ -416,7 +429,7 @@
IPCMutexLocker locker(&data_lock_);
CHECK(!locker.owner_died());
- if (!ReadCommonStart(options, nullptr)) {
+ if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -468,18 +481,18 @@
return msg;
}
-const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options,
- int *index) {
+const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options, int *index,
+ chrono::nanoseconds timeout) {
if (kReadDebug) {
- printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
- this, options.printable(), index, *index);
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n", this,
+ options.printable(), index, *index);
}
void *msg = NULL;
IPCMutexLocker locker(&data_lock_);
CHECK(!locker.owner_died());
- if (!ReadCommonStart(options, index)) {
+ if (!ReadCommonStart(options, index, timeout)) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -506,8 +519,8 @@
int current_messages = data_end_ - data_start_;
if (current_messages < 0) current_messages += data_length_;
if (kReadIndexDebug) {
- printf("queue: %p start=%d end=%d current=%d\n",
- this, data_start_, data_end_, current_messages);
+ printf("queue: %p start=%d end=%d current=%d\n", this, data_start_,
+ data_end_, current_messages);
}
assert(current_messages > 0);
// If we're behind the available messages.
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
index 6fef1d5..b93cc42 100644
--- a/aos/ipc_lib/queue.h
+++ b/aos/ipc_lib/queue.h
@@ -117,13 +117,15 @@
// Calling with both kPeek and kFromEnd in options isn't valid because that
// would mean ignoring index, which would make this function the same as
// ReadMessage (which should be used instead).
- const void *ReadMessageIndex(Options<RawQueue> options, int *index) {
+ const void *ReadMessageIndex(
+ Options<RawQueue> options, int *index,
+ ::std::chrono::nanoseconds timeout = ::std::chrono::nanoseconds(0)) {
CheckReadOptions(options);
static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
if (options.AllSet(kFromEndAndPeek)) {
LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
}
- return DoReadMessageIndex(options, index);
+ return DoReadMessageIndex(options, index, timeout);
}
// Retrieves ("allocates") a message that can then be written to the queue.
@@ -153,7 +155,8 @@
// The public wrappers around these are inlined and do argument checking.
bool DoWriteMessage(void *msg, Options<RawQueue> options);
const void *DoReadMessage(Options<RawQueue> options);
- const void *DoReadMessageIndex(Options<RawQueue> options, int *index);
+ const void *DoReadMessageIndex(Options<RawQueue> options, int *index,
+ ::std::chrono::nanoseconds timeout);
void CheckReadOptions(Options<RawQueue> options) {
static constexpr Options<RawQueue> kValidOptions =
kPeek | kFromEnd | kNonBlock | kBlock;
@@ -216,7 +219,8 @@
// Must be called with data_lock_ locked.
// *read_data will be initialized.
// Returns with a readable message in data_ or false.
- bool ReadCommonStart(Options<RawQueue> options, int *index);
+ bool ReadCommonStart(Options<RawQueue> options, int *index,
+ ::std::chrono::nanoseconds timeout);
// Deals with setting/unsetting readable_ and writable_.
// Must be called after data_lock_ has been unlocked.
// read_data should be the same thing that was passed in to ReadCommonStart.