reordered functions and made writable_start a member variable
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 2b25b93..67faf5b 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -65,10 +65,6 @@
#endif
};
-struct RawQueue::ReadData {
- bool writable_start;
-};
-
void RawQueue::DecrementMessageReferenceCount(const void *msg) {
MessageHeader *header = MessageHeader::Get(msg);
__atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
@@ -95,6 +91,73 @@
}
}
+inline void RawQueue::DoFreeMessage(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (kRefDebug) {
+ printf("%p ref free to %p: %p\n", this, recycle_, msg);
+ }
+
+ if (__builtin_expect(recycle_ != nullptr, 0)) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == nullptr) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ // Nobody else has a reference to the message at this point, so no need to
+ // be fancy about it.
+ ++header->ref_count;
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;
+ header = MessageHeader::Get(new_msg);
+ }
+ }
+
+ // This works around GCC bug 60272 (fixed in 4.8.3).
+ // new_next should just get replaced with header->next (and the body of the
+ // loop should become empty).
+ // The bug is that the store to new_next after the compare/exchange is
+ // unconditional but it should only be if it fails, which could mean
+ // overwriting what somebody else who preempted us right then changed it to.
+ // TODO(brians): Get rid of this workaround once we get a new enough GCC.
+ MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ header->next = new_next;
+ } while (__builtin_expect(
+ !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
+ __ATOMIC_RELEASE, __ATOMIC_RELAXED),
+ 0));
+}
+
+void *RawQueue::GetMessage() {
+ // TODO(brians): Test this function.
+
+ MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ if (__builtin_expect(header == nullptr, 0)) {
+ LOG(FATAL, "overused pool of queue %p\n", this);
+ }
+ } while (__builtin_expect(
+ !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
+ __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
+ 0));
+ void *msg = reinterpret_cast<uint8_t *>(header + 1);
+ // It might be uninitialized, 0 from a previous use, or 1 from previously
+ // being recycled.
+ header->ref_count = 1;
+ static_assert(
+ __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
+ "we access this using not specifically atomic loads and stores");
+ if (kRefDebug) {
+ printf("%p ref alloc: %p\n", this, msg);
+ }
+ return msg;
+}
+
RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
: readable_(&data_lock_), writable_(&data_lock_) {
static_assert(shm_ok<RawQueue::MessageHeader>::value,
@@ -146,6 +209,7 @@
printf("made queue %s\n", name);
}
}
+
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length) {
if (kFetchDebug) {
@@ -187,6 +251,7 @@
mutex_unlock(&global_core->mem_struct->queues.lock);
return r;
}
+
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length,
int recycle_hash, int recycle_length, RawQueue **recycle) {
@@ -202,48 +267,6 @@
return r;
}
-inline void RawQueue::DoFreeMessage(const void *msg) {
- MessageHeader *header = MessageHeader::Get(msg);
- if (kRefDebug) {
- printf("%p ref free to %p: %p\n", this, recycle_, msg);
- }
-
- if (__builtin_expect(recycle_ != nullptr, 0)) {
- void *const new_msg = recycle_->GetMessage();
- if (new_msg == nullptr) {
- fprintf(stderr, "queue: couldn't get a message"
- " for recycle queue %p\n", recycle_);
- } else {
- // Nobody else has a reference to the message at this point, so no need to
- // be fancy about it.
- ++header->ref_count;
- if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
- fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
- " aborting\n", recycle_, msg);
- printf("see stderr\n");
- abort();
- }
- msg = new_msg;
- header = MessageHeader::Get(new_msg);
- }
- }
-
- // This works around GCC bug 60272 (fixed in 4.8.3).
- // new_next should just get replaced with header->next (and the body of the
- // loop should become empty).
- // The bug is that the store to new_next after the compare/exchange is
- // unconditional but it should only be if it fails, which could mean
- // overwriting what somebody else who preempted us right then changed it to.
- // TODO(brians): Get rid of this workaround once we get a new enough GCC.
- MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
- do {
- header->next = new_next;
- } while (__builtin_expect(
- !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
- __ATOMIC_RELEASE, __ATOMIC_RELAXED),
- 0));
-}
-
bool RawQueue::WriteMessage(void *msg, int options) {
// TODO(brians): Test this function.
if (kWriteDebug) {
@@ -312,18 +335,18 @@
return true;
}
-inline void RawQueue::ReadCommonEnd(ReadData *read_data) {
+inline void RawQueue::ReadCommonEnd() {
if (is_writable()) {
if (kReadDebug) {
printf("queue: %ssignalling writable_ of %p\n",
- read_data->writable_start ? "not " : "", this);
+ writable_start_ ? "not " : "", this);
}
- if (!read_data->writable_start) writable_.Signal();
+ if (!writable_start_) writable_.Signal();
}
}
-bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
- read_data->writable_start = is_writable();
+bool RawQueue::ReadCommonStart(int options, int *index) {
+ writable_start_ = is_writable();
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
if (options & kNonBlock) {
if (kReadDebug) {
@@ -365,8 +388,7 @@
MutexLocker locker(&data_lock_);
- ReadData read_data;
- if (!ReadCommonStart(options, NULL, &read_data)) {
+ if (!ReadCommonStart(options, nullptr)) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -411,7 +433,7 @@
data_start_ = (data_start_ + 1) % data_length_;
}
}
- ReadCommonEnd(&read_data);
+ ReadCommonEnd();
if (kReadDebug) {
printf("queue: %p read returning %p\n", this, msg);
}
@@ -427,8 +449,7 @@
MutexLocker locker(&data_lock_);
- ReadData read_data;
- if (!ReadCommonStart(options, index, &read_data)) {
+ if (!ReadCommonStart(options, index)) {
if (kReadDebug) {
printf("queue: %p common returned false\n", this);
}
@@ -439,9 +460,6 @@
if (options & kFromEnd) {
if (kReadDebug) {
- printf("queue: %p start of c1\n", this);
- }
- if (kReadDebug) {
printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
}
msg = data_[LastMessageIndex()];
@@ -477,17 +495,16 @@
printf("queue: %p original read from %d\n", this, my_start);
}
if (data_start_ < data_end_) {
- assert(my_start >= data_start_);
- } else {
- if (my_start < 0) my_start += data_length_;
+ assert(my_start >= 0);
}
+ if (my_start < 0) my_start += data_length_;
}
if (kReadDebug) {
printf("queue: %p reading from d1: %d\n", this, my_start);
}
// We have to be either after the start or before the end, even if the queue
- // is wrapped around.
+ // is wrapped around (should be both if it's not).
assert((my_start >= data_start_) || (my_start < data_end_));
// More sanity checking.
assert((my_start >= 0) && (my_start < data_length_));
@@ -496,32 +513,7 @@
}
IncrementMessageReferenceCount(msg);
- ReadCommonEnd(&read_data);
- return msg;
-}
-
-void *RawQueue::GetMessage() {
- // TODO(brians): Test this function.
-
- MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
- do {
- if (__builtin_expect(header == nullptr, 0)) {
- LOG(FATAL, "overused pool of queue %p\n", this);
- }
- } while (__builtin_expect(
- !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
- __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
- 0));
- void *msg = reinterpret_cast<uint8_t *>(header + 1);
- // It might be uninitialized, 0 from a previous use, or 1 from previously
- // getting recycled.
- header->ref_count = 1;
- static_assert(
- __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
- "we access this using not specifically atomic loads and stores");
- if (kRefDebug) {
- printf("%p ref alloc: %p\n", this, msg);
- }
+ ReadCommonEnd();
return msg;
}