make all accesses to ref_count explicitly atomic
The accesses weren't all explicitly atomic before purely out of laziness.
I think clang failed the static_assert that checked that plain loads and
stores of ref_count were always lock-free, but that might only have
happened when it was generating code for the wrong platform. Regardless,
making every access explicitly atomic is still a good idea.
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 9489265..148deed 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -45,16 +45,44 @@
// allocated aligned to 8 bytes and its size has to maintain that alignment for
// the message that follows immediately.
struct RawQueue::MessageHeader {
- // This gets incremented and decremented with atomic instructions without any
- // locks held.
- int32_t ref_count;
MessageHeader *next;
+
// Gets the message header immediately preceding msg.
static MessageHeader *Get(const void *msg) {
return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
alignof(MessageHeader)));
}
+
+ int32_t ref_count() const {
+ return __atomic_load_n(&ref_count_, __ATOMIC_RELAXED);
+ }
+ void set_ref_count(int32_t val) {
+ __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
+ }
+
+ void ref_count_sub() {
+ // TODO(brians): Take the #ifdef out once clang can handle the
+ // __atomic_*_fetch variants which could be more efficient.
+#ifdef __clang__
+ __atomic_fetch_sub(&ref_count_, 1, __ATOMIC_RELAXED);
+#else
+ __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+#endif
+ }
+ void ref_count_add() {
+#ifdef __clang__
+ __atomic_fetch_add(&ref_count_, 1, __ATOMIC_RELAXED);
+#else
+ __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
+#endif
+ }
+
+ private:
+ // Various member functions access this with atomic instructions,
+ // without holding any locks.
+ int32_t ref_count_;
+
// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
// it to 16 if a pointer is 8 bytes by itself.
#if __SIZEOF_POINTER__ == 8
@@ -79,25 +107,25 @@
void RawQueue::DecrementMessageReferenceCount(const void *msg) {
MessageHeader *header = MessageHeader::Get(msg);
- __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
+ header->ref_count_sub();
if (kRefDebug) {
- printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count);
+ printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count());
}
// The only way it should ever be 0 is if we were the last one to decrement,
// in which case nobody else should have it around to re-increment it or
// anything in the middle, so this is safe to do not atomically with the
// decrement.
- if (header->ref_count == 0) {
+ if (header->ref_count() == 0) {
DoFreeMessage(msg);
} else {
- assert(header->ref_count > 0);
+ assert(header->ref_count() > 0);
}
}
inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
MessageHeader *const header = MessageHeader::Get(msg);
- __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
+ header->ref_count_add();
if (kRefDebug) {
printf("%p ref inc count: %p\n", this, msg);
}
@@ -115,9 +143,7 @@
fprintf(stderr, "queue: couldn't get a message"
" for recycle queue %p\n", recycle_);
} else {
- // Nobody else has a reference to the message at this point, so no need to
- // be fancy about it.
- ++header->ref_count;
+ header->ref_count_add();
if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
" aborting\n", recycle_, msg);
@@ -158,10 +184,7 @@
void *msg = reinterpret_cast<uint8_t *>(header + 1);
// It might be uninitialized, 0 from a previous use, or 1 from previously
// being recycled.
- header->ref_count = 1;
- static_assert(
- __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
- "we access this using not specifically atomic loads and stores");
+ header->set_ref_count(1);
if (kRefDebug) {
printf("%p ref alloc: %p\n", this, msg);
}