improved the readability of the queue code a bit

Factor the duplicated reference-count-increment code out into an
IncrementMessageReferenceCount helper, take pool_lock_ inside
DoFreeMessage instead of in its caller, simplify the index bookkeeping
when a freed message gets swapped with the last used one in pool_, and
assert that messages reused from the pool are actually free.
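DoFreeMessage keeps the used prefix of pool_ contiguous with the usual
swap-with-last trick; this change just makes the index bookkeeping
around it easier to follow. A minimal standalone sketch of that idea
(simplified, no locking; Pool and Node are hypothetical names, not the
real types):

  #include <vector>

  struct Node {
    int index;  // position in the used prefix of the pool, or -1 if free
  };

  struct Pool {
    std::vector<Node *> slots;
    int used = 0;

    // O(1) free: move the last used node into the freed node's slot so
    // that slots[0, used) always holds exactly the in-use nodes.
    void Free(Node *node) {
      --used;
      if (node->index != used) {
        Node *const last = slots[used];
        slots[node->index] = last;
        last->index = node->index;
        slots[used] = node;
      }
      node->index = -1;
    }
  };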
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 39fd558..955adea 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -37,6 +37,9 @@
const int RawQueue::kBlock;
const int RawQueue::kOverride;
+// This header is stored in memory immediately before each queue message. It
+// is always allocated aligned to 8 bytes, and its size has to maintain that
+// alignment for the message that follows immediately.
struct RawQueue::MessageHeader {
// This gets incremented and decremented with atomic instructions without any
// locks held.
@@ -74,13 +77,20 @@
// anything in the middle, so this is safe to do not atomically with the
// decrement.
if (header->ref_count == 0) {
- MutexLocker locker(&pool_lock_);
DoFreeMessage(msg);
} else {
assert(header->ref_count > 0);
}
}
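+// Atomically increments the reference count of msg. Safe to call without
+// holding any locks because ref_count is only ever modified with atomic
+// instructions.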
+void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
+ MessageHeader *const header = MessageHeader::Get(msg);
+ __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
+ if (kRefDebug) {
+ printf("ref inc count: %p\n", msg);
+ }
+}
+
RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
: readable_(&data_lock_), writable_(&data_lock_) {
const size_t name_size = strlen(name) + 1;
@@ -175,6 +185,8 @@
}
void RawQueue::DoFreeMessage(const void *msg) {
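+  // pool_ may only be touched with pool_lock_ held, so take it here rather
+  // than in every caller.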
+ MutexLocker locker(&pool_lock_);
+
MessageHeader *header = MessageHeader::Get(msg);
if (pool_[header->index] != header) { // if something's messed up
fprintf(stderr, "queue: something is very very wrong with queue %p."
@@ -219,17 +231,18 @@
}
}
- // Where the one we're freeing was.
- int index = header->index;
- header->index = -1;
- if (index != messages_used_) { // if we're not freeing the one on the end
+ // If we're not freeing the one on the end.
+ if (header->index != messages_used_) {
+    // The header that is currently at the end of the used part of the pool.
+ MessageHeader *const other_header = pool_[messages_used_];
// Put the last one where the one we're freeing was.
- header = pool_[index] = pool_[messages_used_];
- // Put the one we're freeing at the end.
- pool_[messages_used_] = MessageHeader::Get(msg);
+ pool_[header->index] = other_header;
// Update the former last one's index.
- header->index = index;
+ other_header->index = header->index;
+ // Put the one we're freeing at the end.
+ pool_[messages_used_] = header;
}
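+  // Mark the freed message as no longer being in the used part of the pool.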
+ header->index = -1;
}
bool RawQueue::WriteMessage(void *msg, int options) {
@@ -352,11 +365,7 @@
}
ret = data_[start];
}
- MessageHeader *const header = MessageHeader::Get(ret);
- __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
- if (kRefDebug) {
- printf("ref inc count1: %p\n", ret);
- }
+ IncrementMessageReferenceCount(ret);
return ret;
}
const void *RawQueue::ReadMessage(int options) {
@@ -503,11 +512,7 @@
msg = data_[my_start];
++(*index);
}
- MessageHeader *const header = MessageHeader::Get(msg);
- __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
- if (kRefDebug) {
- printf("ref_inc_count2: %p\n", msg);
- }
+ IncrementMessageReferenceCount(msg);
}
ReadCommonEnd(&read_data);
return msg;
@@ -517,8 +522,10 @@
// TODO(brians): Test this function.
MutexLocker locker(&pool_lock_);
MessageHeader *header;
- if (pool_length_ - messages_used_ > 0) {
+ if (pool_length_ > messages_used_) {
header = pool_[messages_used_];
+ assert(header->index == -1);
} else {
if (pool_length_ >= mem_length_) {
LOG(FATAL, "overused pool of queue %p\n", this);
@@ -527,6 +534,7 @@
static_cast<MessageHeader *>(shm_malloc(msg_length_));
++pool_length_;
}
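+  // Record where in pool_ this message now lives.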
+ header->index = messages_used_;
void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
header->ref_count = 1;
static_assert(
@@ -535,7 +543,6 @@
if (kRefDebug) {
printf("%p ref alloc: %p\n", this, msg);
}
- header->index = messages_used_;
++messages_used_;
return msg;
}