got all of the code to actually compile again
I don't think it actually works though.
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
index 57e2a5e..81d036c 100644
--- a/aos/atom_code/ipc_lib/queue.cc
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -1,4 +1,4 @@
-#include "aos/common/queue.h"
+#include "aos/atom_code/ipc_lib/queue.h"
#include <stdio.h>
#include <string.h>
@@ -9,12 +9,13 @@
#include "aos/common/logging/logging.h"
#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
namespace aos {
-
namespace {
-static_assert(shm_ok<Queue>::value, "Queue instances go into shared memory");
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
const bool kReadDebug = false;
const bool kWriteDebug = false;
@@ -22,12 +23,19 @@
const bool kFetchDebug = false;
// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
const int kExtraMessages = 20;
} // namespace
-struct Queue::MessageHeader {
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+struct RawQueue::MessageHeader {
int ref_count;
int index; // in pool_
static MessageHeader *Get(const void *msg) {
@@ -42,12 +50,12 @@
memcpy(this, &temp, sizeof(*this));
}
};
-static_assert(shm_ok<Queue::MessageHeader>::value, "the whole point"
+static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
" is to stick it in shared memory");
// TODO(brians) maybe do this with atomic integer instructions so it doesn't
// have to lock/unlock pool_lock_
-void Queue::DecrementMessageReferenceCount(const void *msg) {
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
MutexLocker locker(&pool_lock_);
MessageHeader *header = MessageHeader::Get(msg);
--header->ref_count;
@@ -60,7 +68,8 @@
}
}
-Queue::Queue(const char *name, size_t length, int hash, int queue_length) {
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+ : readable_(&data_lock_), writable_(&data_lock_) {
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
memcpy(temp, name, name_size);
@@ -97,7 +106,7 @@
printf("made queue %s\n", name);
}
}
-Queue *Queue::Fetch(const char *name, size_t length, int hash,
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length) {
if (kFetchDebug) {
printf("fetching queue %s\n", name);
@@ -105,9 +114,9 @@
if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
return NULL;
}
- Queue *current = static_cast<Queue *>(
+ RawQueue *current = static_cast<RawQueue *>(
global_core->mem_struct->queues.queue_list);
- Queue *last = NULL;
+ RawQueue *last = NULL;
while (current != NULL) {
// if we found a matching queue
if (strcmp(current->name_, name) == 0 && current->length_ == length &&
@@ -123,8 +132,8 @@
current = current->next_;
}
- void *temp = shm_malloc(sizeof(Queue));
- current = new (temp) Queue(name, length, hash, queue_length);
+ void *temp = shm_malloc(sizeof(RawQueue));
+ current = new (temp) RawQueue(name, length, hash, queue_length);
if (last == NULL) { // if we don't have one to tack the new one on to
global_core->mem_struct->queues.queue_list = current;
} else {
@@ -134,10 +143,10 @@
mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
return current;
}
-Queue *Queue::Fetch(const char *name, size_t length, int hash,
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length,
- int recycle_hash, int recycle_length, Queue **recycle) {
- Queue *r = Fetch(name, length, hash, queue_length);
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);
r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
if (r == r->recycle_) {
fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
@@ -148,7 +157,7 @@
return r;
}
-void Queue::DoFreeMessage(const void *msg) {
+void RawQueue::DoFreeMessage(const void *msg) {
MessageHeader *header = MessageHeader::Get(msg);
if (pool_[header->index] != header) { // if something's messed up
fprintf(stderr, "queue: something is very very wrong with queue %p."
@@ -206,13 +215,13 @@
}
}
-bool Queue::WriteMessage(void *msg, int options) {
+bool RawQueue::WriteMessage(void *msg, int options) {
if (kWriteDebug) {
printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
}
if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
msg > static_cast<void *>((
- reinterpret_cast<uintptr_t>(global_core->mem_struct) +
+ reinterpret_cast<char *>(global_core->mem_struct) +
global_core->size))) {
fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
msg, this);
@@ -239,7 +248,7 @@
if (kWriteDebug) {
printf("queue: going to wait for writable_ of %p\n", this);
}
- writable_.Wait(&data_lock_);
+ writable_.Wait();
}
new_end = (data_end_ + 1) % data_length_;
}
@@ -257,12 +266,12 @@
return true;
}
-void Queue::ReadCommonEnd(bool read) {
+void RawQueue::ReadCommonEnd(bool read) {
if (read) {
writable_.Signal();
}
}
-bool Queue::ReadCommonStart(int options, int *index) {
+bool RawQueue::ReadCommonStart(int options, int *index) {
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
if (options & kNonBlock) {
if (kReadDebug) {
@@ -287,7 +296,7 @@
}
return true;
}
-void *Queue::ReadPeek(int options, int start) {
+void *RawQueue::ReadPeek(int options, int start) {
void *ret;
if (options & kFromEnd) {
int pos = data_end_ - 1;
@@ -311,7 +320,7 @@
}
return ret;
}
-const void *Queue::ReadMessage(int options) {
+const void *RawQueue::ReadMessage(int options) {
if (kReadDebug) {
printf("queue: %p->ReadMessage(%d)\n", this, options);
}
@@ -359,7 +368,7 @@
}
return msg;
}
-const void *Queue::ReadMessageIndex(int options, int *index) {
+const void *RawQueue::ReadMessageIndex(int options, int *index) {
if (kReadDebug) {
printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
this, options, index, *index);
@@ -421,14 +430,14 @@
return msg;
}
-void *Queue::GetMessage() {
+void *RawQueue::GetMessage() {
MutexLocker locker(&pool_lock_);
MessageHeader *header;
if (pool_length_ - messages_used_ > 0) {
header = pool_[messages_used_];
} else {
if (pool_length_ >= mem_length_) {
- LOG(FATAL, "overused pool %p from queue %p\n", pool, queue);
+ LOG(FATAL, "overused pool of queue %p\n", this);
}
header = pool_[pool_length_] =
static_cast<MessageHeader *>(shm_malloc(msg_length_));