Switched RawQueue from the mutex-protected message pool to a lockless free list.
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index b92ceb9..211882f 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -44,7 +44,7 @@
// This gets incremented and decremented with atomic instructions without any
// locks held.
int ref_count;
- int index; // in pool_
+ MessageHeader *next;
// Gets the message header immediately preceding msg.
static MessageHeader *Get(const void *msg) {
return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
@@ -57,9 +57,14 @@
memcpy(other, this, sizeof(*other));
memcpy(this, &temp, sizeof(*this));
}
+#if __SIZEOF_POINTER__ == 8
+ char padding[16 - sizeof(next) - sizeof(ref_count)];
+#elif __SIZEOF_POINTER__ == 4
+ // No padding needed to get 8 byte total size.
+#else
+#error Unknown pointer size.
+#endif
};
-static_assert(shm_ok<RawQueue::MessageHeader>::value,
- "the whole point is to stick it in shared memory");
struct RawQueue::ReadData {
bool writable_start;
@@ -69,7 +74,7 @@
MessageHeader *header = MessageHeader::Get(msg);
__atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
if (kRefDebug) {
- printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+ printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count);
}
// The only way it should ever be 0 is if we were the last one to decrement,
@@ -87,12 +92,17 @@
MessageHeader *const header = MessageHeader::Get(msg);
__atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
if (kRefDebug) {
- printf("ref inc count: %p\n", msg);
+ printf("%p ref inc count: %p\n", this, msg);
}
}
RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
: readable_(&data_lock_), writable_(&data_lock_) {
+ static_assert(shm_ok<RawQueue::MessageHeader>::value,
+ "the whole point is to stick it in shared memory");
+ static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
+ "need to revalidate size/alignent assumptions");
+
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
memcpy(temp, name, name_size);
@@ -118,15 +128,15 @@
data_end_ = 0;
messages_ = 0;
- pool_length_ = queue_length + kExtraMessages;
- messages_used_ = 0;
msg_length_ = length + sizeof(MessageHeader);
- pool_ = static_cast<MessageHeader **>(
- shm_malloc(sizeof(MessageHeader *) * pool_length_));
- for (int i = 0; i < pool_length_; ++i) {
- pool_[i] =
+
+ MessageHeader *previous = nullptr;
+ for (int i = 0; i < queue_length + kExtraMessages; ++i) {
+ MessageHeader *const message =
static_cast<MessageHeader *>(shm_malloc(msg_length_));
- pool_[i]->index = i;
+ free_messages_ = message;
+ message->next = previous;
+ previous = message;
}
if (kFetchDebug) {
@@ -189,20 +199,10 @@
}
void RawQueue::DoFreeMessage(const void *msg) {
- MutexLocker locker(&pool_lock_);
-
MessageHeader *header = MessageHeader::Get(msg);
- if (pool_[header->index] != header) { // if something's messed up
- fprintf(stderr, "queue: something is very very wrong with queue %p."
- " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
- this, pool_, header->index, header);
- printf("queue: see stderr\n");
- abort();
- }
if (kRefDebug) {
- printf("ref free: %p\n", msg);
+ printf("%p ref free->%p: %p\n", this, recycle_, msg);
}
- --messages_used_;
if (recycle_ != NULL) {
void *const new_msg = recycle_->GetMessage();
@@ -210,21 +210,7 @@
fprintf(stderr, "queue: couldn't get a message"
" for recycle queue %p\n", recycle_);
} else {
- // Take a message from recycle_ and switch its
- // header with the one being freed, which effectively
- // switches which queue each message belongs to.
- MessageHeader *const new_header = MessageHeader::Get(new_msg);
- // Also switch the messages between the pools.
- pool_[header->index] = new_header;
- {
- MutexLocker locker(&recycle_->pool_lock_);
- recycle_->pool_[new_header->index] = header;
- // Swap the information in both headers.
- header->Swap(new_header);
- // Don't unlock the other pool until all of its messages are valid.
- }
- // use the header for new_msg which is now for this pool
- header = new_header;
+ IncrementMessageReferenceCount(msg);
if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
" aborting\n", recycle_, msg);
@@ -232,20 +218,14 @@
abort();
}
msg = new_msg;
+ header = MessageHeader::Get(new_msg);
}
}
- // If we're not freeing the one on the end.
- if (header->index != messages_used_) {
- // The one that is on the end before we change it.
- MessageHeader *const other_header = pool_[messages_used_];
- // Put the last one where the one we're freeing was.
- pool_[header->index] = other_header;
- // Update the former last one's index.
- other_header->index = header->index;
- // Put the one we're freeing at the end.
- pool_[messages_used_] = header;
- header->index = messages_used_;
+ header->next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ while (
+ !__atomic_compare_exchange_n(&free_messages_, &header->next, header,
+ true, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
}
}
@@ -524,13 +504,18 @@
void *RawQueue::GetMessage() {
// TODO(brians): Test this function.
- MutexLocker locker(&pool_lock_);
- MessageHeader *const header = pool_[messages_used_];
- if (messages_used_ >= pool_length_) {
- LOG(FATAL, "overused pool of queue %p\n", this);
- }
- assert(header->index == messages_used_);
- void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+
+ MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
+ do {
+ if (__builtin_expect(header == nullptr, 0)) {
+ LOG(FATAL, "overused pool of queue %p\n", this);
+ }
+ } while (
+ !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
+ __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
+ void *msg = reinterpret_cast<uint8_t *>(header + 1);
+ // ref_count might be uninitialized, 0 from a previous use, or 1 from
+ // previously getting recycled, so always reset it here.
header->ref_count = 1;
static_assert(
__atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
@@ -538,8 +523,17 @@
if (kRefDebug) {
printf("%p ref alloc: %p\n", this, msg);
}
- ++messages_used_;
return msg;
}
+int RawQueue::FreeMessages() const {
+ int r = 0;
+ MessageHeader *header = free_messages_;
+ while (header != nullptr) {
+ ++r;
+ header = header->next;
+ }
+ return r;
+}
+
} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.h b/aos/linux_code/ipc_lib/queue.h
index 70f3943..33ed9f4 100644
--- a/aos/linux_code/ipc_lib/queue.h
+++ b/aos/linux_code/ipc_lib/queue.h
@@ -113,9 +113,9 @@
if (msg != NULL) DecrementMessageReferenceCount(msg);
}
- // Returns the number of messages from this queue that are currently used (in
- // the queue and/or given out as references).
- int messages_used() const { return messages_used_; }
+ // UNSAFE! Returns the number of messages currently on the free list. Only
+ // safe to call when only 1 task is using this object (i.e. in tests).
+ int FreeMessages() const;
private:
struct MessageHeader;
@@ -145,12 +145,9 @@
int messages_; // that have passed through
void **data_; // array of messages (with headers)
- Mutex pool_lock_;
size_t msg_length_; // sizeof(each message) including the header
- int messages_used_;
- // The number of messages in pool_.
- int pool_length_;
- MessageHeader **pool_; // array of pointers to messages
+ // A pointer to the first message in the linked list of free messages.
+ MessageHeader *free_messages_;
// Actually frees the given message.
void DoFreeMessage(const void *msg);
diff --git a/aos/linux_code/ipc_lib/raw_queue_test.cc b/aos/linux_code/ipc_lib/raw_queue_test.cc
index 6778df3..e91655a 100644
--- a/aos/linux_code/ipc_lib/raw_queue_test.cc
+++ b/aos/linux_code/ipc_lib/raw_queue_test.cc
@@ -25,6 +25,9 @@
namespace aos {
namespace testing {
+// Must match kExtraMessages in queue.cc; update this one whenever that one changes.
+const int kExtraMessages = 20;
+
class RawQueueTest : public ::testing::Test {
protected:
static const size_t kFailureSize = 400;
@@ -472,9 +475,9 @@
RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
const TestMessage *message, *peek_message;
- EXPECT_EQ(0, queue->messages_used());
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
PushMessage(queue, 971);
- EXPECT_EQ(1, queue->messages_used());
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
int index = 0;
peek_message = static_cast<const TestMessage *>(
@@ -489,7 +492,7 @@
queue->FreeMessage(peek_message);
PushMessage(queue, 1768);
- EXPECT_EQ(2, queue->messages_used());
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
peek_message = static_cast<const TestMessage *>(
queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
message = static_cast<const TestMessage *>(
@@ -525,18 +528,18 @@
queue->FreeMessage(message);
queue->FreeMessage(peek_message);
- EXPECT_EQ(2, queue->messages_used());
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
}
TEST_F(RawQueueTest, ReadIndexNotBehind) {
RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
const TestMessage *message, *peek_message;
- EXPECT_EQ(0, queue->messages_used());
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
PushMessage(queue, 971);
- EXPECT_EQ(1, queue->messages_used());
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
PushMessage(queue, 1768);
- EXPECT_EQ(2, queue->messages_used());
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
int index = 0;
peek_message = static_cast<const TestMessage *>(
@@ -689,11 +692,11 @@
message2 = queue->ReadMessage(RawQueue::kNonBlock);
ASSERT_NE(nullptr, message2);
PushMessage(queue, 973);
- EXPECT_EQ(4, queue->messages_used());
+ EXPECT_EQ(4, kExtraMessages + 2 - queue->FreeMessages());
queue->FreeMessage(message1);
- EXPECT_EQ(3, queue->messages_used());
+ EXPECT_EQ(3, kExtraMessages + 2 - queue->FreeMessages());
queue->FreeMessage(message2);
- EXPECT_EQ(2, queue->messages_used());
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
}
int index = 0;
@@ -861,17 +864,17 @@
RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
const void *message1, *message2;
- EXPECT_EQ(0, queue->messages_used());
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
message1 = queue->GetMessage();
EXPECT_NE(nullptr, message1);
- EXPECT_EQ(1, queue->messages_used());
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
message2 = queue->GetMessage();
EXPECT_NE(nullptr, message2);
- EXPECT_EQ(2, queue->messages_used());
+ EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
queue->FreeMessage(message1);
- EXPECT_EQ(1, queue->messages_used());
+ EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
queue->FreeMessage(message2);
- EXPECT_EQ(0, queue->messages_used());
+ EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
}
} // namespace testing