Commented and reorganized some queue code.
Having ReadPeek separate didn't really make sense because it did the
same thing as the non-peek path except that it leaves the read index unchanged.
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 211882f..2b25b93 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -12,6 +12,9 @@
#include "aos/common/type_traits.h"
#include "aos/linux_code/ipc_lib/core_lib.h"
+#undef assert
+#define assert(...)
+
namespace aos {
namespace {
@@ -43,7 +46,7 @@
struct RawQueue::MessageHeader {
// This gets incremented and decremented with atomic instructions without any
// locks held.
- int ref_count;
+ int32_t ref_count;
MessageHeader *next;
// Gets the message header immediately preceding msg.
static MessageHeader *Get(const void *msg) {
@@ -51,14 +54,10 @@
static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
alignof(MessageHeader)));
}
- void Swap(MessageHeader *other) {
- MessageHeader temp;
- memcpy(&temp, other, sizeof(temp));
- memcpy(other, this, sizeof(*other));
- memcpy(this, &temp, sizeof(*this));
- }
+ // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
+ // it to 16 if a pointer is 8 bytes by itself.
#if __SIZEOF_POINTER__ == 8
- char padding[16 - sizeof(next) - sizeof(ref_count)];
+ char padding[4];
#elif __SIZEOF_POINTER__ == 4
// No padding needed to get 8 byte total size.
#else
@@ -88,7 +87,7 @@
}
}
-void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
+inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
MessageHeader *const header = MessageHeader::Get(msg);
__atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
if (kRefDebug) {
@@ -103,6 +102,10 @@
static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
"need to revalidate size/alignent assumptions");
+ if (queue_length < 1) {
+ LOG(FATAL, "queue length %d needs to be at least 1\n", queue_length);
+ }
+
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
memcpy(temp, name, name_size);
@@ -120,9 +123,6 @@
}
data_length_ = queue_length + 1;
- if (data_length_ < 2) { // TODO(brians) when could this happen?
- data_length_ = 2;
- }
data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
data_start_ = 0;
data_end_ = 0;
@@ -130,13 +130,16 @@
msg_length_ = length + sizeof(MessageHeader);
- MessageHeader *previous = nullptr;
- for (int i = 0; i < queue_length + kExtraMessages; ++i) {
- MessageHeader *const message =
- static_cast<MessageHeader *>(shm_malloc(msg_length_));
- free_messages_ = message;
- message->next = previous;
- previous = message;
+ // Create all of the messages for the free list and stick them on.
+ {
+ MessageHeader *previous = nullptr;
+ for (int i = 0; i < queue_length + kExtraMessages; ++i) {
+ MessageHeader *const message =
+ static_cast<MessageHeader *>(shm_malloc(msg_length_));
+ free_messages_ = message;
+ message->next = previous;
+ previous = message;
+ }
}
if (kFetchDebug) {
@@ -149,7 +152,8 @@
printf("fetching queue %s\n", name);
}
if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
- return NULL;
+ LOG(FATAL, "mutex_lock(%p) failed\n",
+ &global_core->mem_struct->queues.lock);
}
RawQueue *current = static_cast<RawQueue *>(
global_core->mem_struct->queues.pointer);
@@ -198,19 +202,21 @@
return r;
}
-void RawQueue::DoFreeMessage(const void *msg) {
+inline void RawQueue::DoFreeMessage(const void *msg) {
MessageHeader *header = MessageHeader::Get(msg);
if (kRefDebug) {
- printf("%p ref free->%p: %p\n", this, recycle_, msg);
+ printf("%p ref free to %p: %p\n", this, recycle_, msg);
}
- if (recycle_ != NULL) {
+ if (__builtin_expect(recycle_ != nullptr, 0)) {
void *const new_msg = recycle_->GetMessage();
- if (new_msg == NULL) {
+ if (new_msg == nullptr) {
fprintf(stderr, "queue: couldn't get a message"
" for recycle queue %p\n", recycle_);
} else {
- IncrementMessageReferenceCount(msg);
+ // Nobody else has a reference to the message at this point, so a plain
+ // (non-atomic) increment is enough.
+ ++header->ref_count;
if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
" aborting\n", recycle_, msg);
@@ -222,11 +228,20 @@
}
}
- header->next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
- while (
- !__atomic_compare_exchange_n(&free_messages_, &header->next, header,
- true, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
- }
+ // This works around GCC bug 60272 (fixed in 4.8.3).
+ // Once that is fixed, new_next should just be replaced with header->next
+ // (and the body of the loop should become empty).
+ // The bug is that the store to the expected-value argument (new_next here)
+ // after the compare/exchange is unconditional when it should only happen on
+ // failure, so it could overwrite what somebody who preempted us just wrote.
+ // TODO(brians): Get rid of this workaround once we get a new enough GCC.
+ MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ header->next = new_next;
+ } while (__builtin_expect(
+ !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
+ __ATOMIC_RELEASE, __ATOMIC_RELAXED),
+ 0));
}
bool RawQueue::WriteMessage(void *msg, int options) {
@@ -297,7 +312,7 @@
return true;
}
-void RawQueue::ReadCommonEnd(ReadData *read_data) {
+inline void RawQueue::ReadCommonEnd(ReadData *read_data) {
if (is_writable()) {
if (kReadDebug) {
printf("queue: %ssignalling writable_ of %p\n",
@@ -306,6 +321,7 @@
if (!read_data->writable_start) writable_.Signal();
}
}
+
bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
read_data->writable_start = is_writable();
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
@@ -331,27 +347,15 @@
}
return true;
}
-void *RawQueue::ReadPeek(int options, int start) const {
- void *ret;
- if (options & kFromEnd) {
- int pos = data_end_ - 1;
- if (pos < 0) { // if it needs to wrap
- pos = data_length_ - 1;
- }
- if (kReadDebug) {
- printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
- }
- ret = data_[pos];
- } else {
- assert(start != -1);
- if (kReadDebug) {
- printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
- }
- ret = data_[start];
+
+inline int RawQueue::LastMessageIndex() const {
+ int pos = data_end_ - 1;
+ if (pos < 0) { // If it wrapped around.
+ pos = data_length_ - 1;
}
- IncrementMessageReferenceCount(ret);
- return ret;
+ return pos;
}
+
const void *RawQueue::ReadMessage(int options) {
// TODO(brians): Test this function.
if (kReadDebug) {
@@ -369,10 +373,14 @@
return NULL;
}
- if (options & kPeek) {
- msg = ReadPeek(options, data_start_);
- } else {
- if (options & kFromEnd) {
+ if (options & kFromEnd) {
+ if (options & kPeek) {
+ if (kReadDebug) {
+ printf("queue: %p shortcutting c2: %d\n", this, LastMessageIndex());
+ }
+ msg = data_[LastMessageIndex()];
+ IncrementMessageReferenceCount(msg);
+ } else {
while (true) {
if (kReadDebug) {
printf("queue: %p start of c2\n", this);
@@ -391,11 +399,15 @@
// This message is not going to be in the queue any more.
DecrementMessageReferenceCount(data_[pos]);
}
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d2: %d\n", this, data_start_);
+ }
+ msg = data_[data_start_];
+ if (options & kPeek) {
+ IncrementMessageReferenceCount(msg);
} else {
- if (kReadDebug) {
- printf("queue: %p reading from d2: %d\n", this, data_start_);
- }
- msg = data_[data_start_];
data_start_ = (data_start_ + 1) % data_length_;
}
}
@@ -405,6 +417,7 @@
}
return msg;
}
+
const void *RawQueue::ReadMessageIndex(int options, int *index) {
if (kReadDebug) {
printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
@@ -424,12 +437,19 @@
// TODO(parker): Handle integer wrap on the index.
- // Where we're going to start reading.
- int my_start;
-
if (options & kFromEnd) {
- my_start = -1;
+ if (kReadDebug) {
+ printf("queue: %p start of c1\n", this);
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
+ }
+ msg = data_[LastMessageIndex()];
+ if (!(options & kPeek)) *index = messages_;
} else {
+ // Where we're going to start reading.
+ int my_start;
+
const int unread_messages = messages_ - *index;
assert(unread_messages > 0);
int current_messages = data_end_ - data_start_;
@@ -462,42 +482,20 @@
if (my_start < 0) my_start += data_length_;
}
}
- }
- if (options & kPeek) {
- msg = ReadPeek(options, my_start);
- } else {
- if (options & kFromEnd) {
- if (kReadDebug) {
- printf("queue: %p start of c1\n", this);
- }
- int pos = data_end_ - 1;
- if (kReadIndexDebug) {
- printf("queue: %p end pos start %d\n", this, pos);
- }
- if (pos < 0) { // If it wrapped.
- pos = data_length_ - 1; // Unwrap it.
- }
- if (kReadDebug) {
- printf("queue: %p reading from c1: %d\n", this, pos);
- }
- msg = data_[pos];
- *index = messages_;
- } else {
- if (kReadDebug) {
- printf("queue: %p reading from d1: %d\n", this, my_start);
- }
- // This assert checks that we're either within both endpoints (duh) or
- // not between them (if the queue is wrapped around).
- assert((my_start >= data_start_ && my_start < data_end_) ||
- ((my_start >= data_start_) == (my_start > data_end_)));
- // More sanity checking.
- assert((my_start >= 0) && (my_start < data_length_));
- msg = data_[my_start];
- ++(*index);
+ if (kReadDebug) {
+ printf("queue: %p reading from d1: %d\n", this, my_start);
}
- IncrementMessageReferenceCount(msg);
+ // We have to be either after the start or before the end, even if the queue
+ // is wrapped around.
+ assert((my_start >= data_start_) || (my_start < data_end_));
+ // More sanity checking.
+ assert((my_start >= 0) && (my_start < data_length_));
+ msg = data_[my_start];
+ if (!(options & kPeek)) ++(*index);
}
+ IncrementMessageReferenceCount(msg);
+
ReadCommonEnd(&read_data);
return msg;
}
@@ -510,9 +508,10 @@
if (__builtin_expect(header == nullptr, 0)) {
LOG(FATAL, "overused pool of queue %p\n", this);
}
- } while (
+ } while (__builtin_expect(
!__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
- __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
+ __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
+ 0));
void *msg = reinterpret_cast<uint8_t *>(header + 1);
// It might be uninitialized, 0 from a previous use, or 1 from previously
// getting recycled.