added more queue tests and switched to atomic ops for message ref counts
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 8103b6e..39fd558 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -38,6 +38,8 @@
const int RawQueue::kOverride;
struct RawQueue::MessageHeader {
+ // This gets incremented and decremented with atomic instructions without any
+ // locks held.
int ref_count;
int index; // in pool_
// Gets the message header immediately preceding msg.
@@ -60,19 +62,24 @@
bool writable_start;
};
-// TODO(brians) maybe do this with atomic integer instructions so it doesn't
-// have to lock/unlock pool_lock_
void RawQueue::DecrementMessageReferenceCount(const void *msg) {
- // TODO(brians): Test this function.
- MutexLocker locker(&pool_lock_);
   MessageHeader *header = MessageHeader::Get(msg);
-  --header->ref_count;
-  assert(header->ref_count >= 0);
+  const int new_ref_count =
+      __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
   if (kRefDebug) {
-    printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+    printf("ref_dec_count: %p count=%d\n", msg, new_ref_count);
   }
+
+  // The only way new_ref_count can be 0 is if we did the final decrement, in
+  // which case nobody else still has a reference around with which to
+  // re-increment it. Checking the value returned by the atomic decrement,
+  // instead of re-reading ref_count (which another thread could be modifying
+  // concurrently), keeps two threads from both seeing 0 and double-freeing.
-  if (header->ref_count == 0) {
+  if (new_ref_count == 0) {
+    MutexLocker locker(&pool_lock_);
     DoFreeMessage(msg);
+  } else {
+    assert(new_ref_count > 0);
   }
}
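
Note on the pattern above (illustration only, not part of the change): this is the standard last-owner-frees idiom for atomic reference counts. A minimal standalone sketch using std::atomic, with made-up names (Message, Release, FreeMessage):

  #include <atomic>
  #include <cassert>

  struct Message {
    std::atomic<int> ref_count{1};
  };

  void FreeMessage(Message *msg) { delete msg; }

  void Release(Message *msg) {
    // fetch_sub returns the value from before the subtraction, so seeing 1
    // means this call did the final decrement and no other thread can still
    // hold a reference with which to re-increment.
    const int previous = msg->ref_count.fetch_sub(1, std::memory_order_acq_rel);
    assert(previous >= 1);
    if (previous == 1) {
      FreeMessage(msg);  // We were the last owner, so freeing is safe.
    }
  }

The sketch uses acq_rel ordering, the conservative choice that makes all writes to a message happen-before it is freed; the queue code above opts for __ATOMIC_RELAXED instead.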
@@ -332,7 +337,6 @@
void *RawQueue::ReadPeek(int options, int start) const {
void *ret;
if (options & kFromEnd) {
- // TODO(brians): Test this block with ReadMessageIndex.
int pos = data_end_ - 1;
if (pos < 0) { // if it needs to wrap
pos = data_length_ - 1;
@@ -349,9 +353,9 @@
ret = data_[start];
}
MessageHeader *const header = MessageHeader::Get(ret);
- ++header->ref_count;
+ __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
if (kRefDebug) {
- printf("ref inc count: %p\n", ret);
+ printf("ref inc count1: %p\n", ret);
}
return ret;
}
@@ -471,7 +475,6 @@
msg = ReadPeek(options, my_start);
} else {
if (options & kFromEnd) {
- // TODO(brians): Test this block.
if (kReadDebug) {
printf("queue: %p start of c1\n", this);
}
@@ -501,9 +504,9 @@
++(*index);
}
MessageHeader *const header = MessageHeader::Get(msg);
- ++header->ref_count;
+ __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
if (kRefDebug) {
- printf("ref_inc_count: %p\n", msg);
+ printf("ref_inc_count2: %p\n", msg);
}
}
ReadCommonEnd(&read_data);
@@ -526,6 +529,9 @@
}
void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
header->ref_count = 1;
+  static_assert(
+      __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
+      "ref_count is also accessed with plain (non-atomic) loads and stores");
if (kRefDebug) {
printf("%p ref alloc: %p\n", this, msg);
}