copying over queue-api-redo branch from my 2012 repo
This commit is not cleanly separated from the next one where I got
everything to compile again (and possibly even further on from there).
diff --git a/aos/atom_code/camera/Buffers.cpp b/aos/atom_code/camera/Buffers.cpp
index f580f89..bf16915 100644
--- a/aos/atom_code/camera/Buffers.cpp
+++ b/aos/atom_code/camera/Buffers.cpp
@@ -15,7 +15,6 @@
};
const std::string Buffers::kFDServerName("/tmp/aos_fd_server");
const std::string Buffers::kQueueName("CameraBufferQueue");
-const aos_type_sig Buffers::kSignature{sizeof(Message), 971, 1};
int Buffers::CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t)) {
union af_unix_sockaddr {
@@ -66,7 +65,7 @@
void Buffers::Release() {
if (message_ != NULL) {
- aos_queue_free_msg(queue_, message_);
+ queue_->FreeMessage(message_);
message_ = NULL;
}
}
@@ -77,11 +76,12 @@
// TODO(brians) make sure the camera reader process hasn't died
do {
if (block) {
- message_ = static_cast<const Message *>(aos_queue_read_msg(queue_, PEEK | BLOCK));
+ message_ = static_cast<const Message *>(queue_->ReadMessage(
+ Queue::kPeek | Queue::kBlock));
} else {
static int index = 0;
- message_ = static_cast<const Message *>(aos_queue_read_msg_index(queue_, BLOCK,
- &index));
+ message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
+ Queue::kBlock, &index));
}
} while (block && message_ == NULL);
if (message_ != NULL) {
@@ -137,7 +137,7 @@
}
Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
MMap();
- queue_ = aos_fetch_queue(kQueueName.c_str(), &kSignature);
+ queue_ = Queue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
}
Buffers::~Buffers() {
@@ -157,6 +157,5 @@
}
}
-} // namespace camera
-} // namespace aos
-
+} // namespace camera
+} // namespace aos
diff --git a/aos/atom_code/camera/Buffers.h b/aos/atom_code/camera/Buffers.h
index 6b54188..07177bc 100644
--- a/aos/atom_code/camera/Buffers.h
+++ b/aos/atom_code/camera/Buffers.h
@@ -53,9 +53,8 @@
// The current one. Sometimes NULL.
const Message *message_;
static const std::string kQueueName;
- static const aos_type_sig kSignature;
// NULL for the Reader one.
- aos_queue *queue_;
+ Queue *queue_;
// Make the actual mmap calls.
// Called by Buffers() automatically.
void MMap();
diff --git a/aos/atom_code/camera/Reader.cpp b/aos/atom_code/camera/Reader.cpp
index 5f30cfe..c87d173 100644
--- a/aos/atom_code/camera/Reader.cpp
+++ b/aos/atom_code/camera/Reader.cpp
@@ -31,8 +31,7 @@
// the bound socket listening for fd requests
int server_fd_;
- static const aos_type_sig kRecycleSignature;
- aos_queue *queue_, *recycle_queue_;
+ Queue *queue_, *recycle_queue_;
// the number of buffers currently queued in v4l2
uint32_t queued_;
public:
@@ -52,10 +51,11 @@
dev_name, errno, strerror(errno));
}
- queue_ = aos_fetch_queue_recycle(Buffers::kQueueName.c_str(), &Buffers::kSignature,
- &kRecycleSignature, &recycle_queue_);
+ queue_ = Queue::Fetch(Buffers::kQueueName.c_str(),
+ sizeof(Buffers::Message), 971, 1,
+ 1, Buffers::kNumBuffers, &recycle_queue_);
// read off any existing recycled messages
- while (aos_queue_read_msg(recycle_queue_, NON_BLOCK) != NULL);
+ while (recycle_queue_->ReadMessage(Queue::kNonBlock) != NULL);
queued_ = 0;
InitServer();
@@ -140,10 +140,11 @@
read = static_cast<const Buffers::Message *>(
// we block waiting for one if we can't dequeue one without leaving
// the driver <= 2 (to be safe)
- aos_queue_read_msg(recycle_queue_, (queued_ <= 2) ? BLOCK : NON_BLOCK));
+ recycle_queue_->ReadMessage((queued_ <= 2) ?
+ Queue::kBlock : Queue::kNonBlock));
if (read != NULL) {
buf.index = read->index;
- aos_queue_free_msg(recycle_queue_, read);
+ recycle_queue_->FreeMessage(read);
QueueBuffer(&buf);
}
} while (read != NULL);
@@ -163,7 +164,7 @@
}
Buffers::Message *const msg = static_cast<Buffers::Message *>(
- aos_queue_get_msg(queue_));
+ queue_->GetMessage());
if (msg == NULL) {
LOG(WARNING,
"couldn't get a message to send buf #%" PRIu32 " from queue %p."
@@ -175,7 +176,7 @@
msg->bytesused = buf.bytesused;
memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
msg->sequence = buf.sequence;
- if (aos_queue_write_msg_free(queue_, msg, OVERRIDE) == -1) {
+    if (!queue_->WriteMessage(msg, Queue::kOverride)) {
LOG(WARNING,
"sending message %p with buf #%" PRIu32 " to queue %p failed."
" re-queueing now\n", msg, buf.index, queue_);
@@ -405,8 +406,6 @@
}
};
const char *const Reader::dev_name = "/dev/video0";
-const aos_type_sig Reader::kRecycleSignature{
- sizeof(Buffers::Message), 1, Buffers::kNumBuffers};
} // namespace camera
} // namespace aos
diff --git a/aos/atom_code/core/LogStreamer.cpp b/aos/atom_code/core/LogStreamer.cpp
index d3d4928..1294128 100644
--- a/aos/atom_code/core/LogStreamer.cpp
+++ b/aos/atom_code/core/LogStreamer.cpp
@@ -31,7 +31,7 @@
int index = 0;
while (true) {
- const LogMessage *const msg = ReadNext(BLOCK, &index);
+ const LogMessage *const msg = ReadNext(Queue::kBlock, &index);
if (msg == NULL) continue;
internal::PrintMessage(stdout, *msg);
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
index 53587d8..26581a1 100644
--- a/aos/atom_code/ipc_lib/core_lib.c
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -6,7 +6,6 @@
void init_shared_mem_core(aos_shm_core *shm_core) {
clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
- shm_core->queues.alloc_flag = 0;
shm_core->msg_alloc_lock = 0;
shm_core->queues.queue_list = NULL;
shm_core->queues.alloc_lock = 0;
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
index f72ae4c..233660b 100644
--- a/aos/atom_code/ipc_lib/core_lib.h
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -18,12 +18,10 @@
extern "C" {
#endif // __cplusplus
-struct aos_queue_list_t;
-typedef struct aos_queue_hash_t {
- int alloc_flag;
+typedef struct aos_queue_global_t {
mutex alloc_lock;
- struct aos_queue_list_t *queue_list;
-} aos_queue_hash;
+ void *queue_list; // an aos::Queue* declared in C code
+} aos_queue_global;
typedef struct aos_shm_core_t {
// clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
@@ -34,7 +32,7 @@
mutex creation_condition;
mutex msg_alloc_lock;
void *msg_alloc;
- aos_queue_hash queues;
+ aos_queue_global queues;
} aos_shm_core;
void init_shared_mem_core(aos_shm_core *shm_core);
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
deleted file mode 100644
index 6c1ce71..0000000
--- a/aos/atom_code/ipc_lib/queue.c
+++ /dev/null
@@ -1,478 +0,0 @@
-#include "aos/atom_code/ipc_lib/queue.h"
-#include "aos/atom_code/ipc_lib/queue_internal.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#include "aos/common/logging/logging.h"
-
-#define READ_DEBUG 0
-#define WRITE_DEBUG 0
-#define REF_DEBUG 0
-
-static inline aos_msg_header *get_header(void *msg) {
- return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
-}
-static inline aos_queue *aos_core_alloc_queue() {
- return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
-}
-static inline void *aos_alloc_msg(aos_msg_pool *pool) {
- return shm_malloc(pool->msg_length);
-}
-
-// actually free the given message
-static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
-#if REF_DEBUG
- if (pool->pool_lock == 0) {
- //LOG(WARNING, "unprotected\n");
- }
-#endif
- aos_msg_header *header = get_header(msg);
- if (pool->pool[header->index] != header) { // if something's messed up
- fprintf(stderr, "queue: something is very very wrong with queue %p."
- " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
- queue, pool->pool, header->index, header);
- printf("queue: see stderr\n");
- abort();
- }
-#if REF_DEBUG
- printf("ref free: %p\n", msg);
-#endif
- --pool->used;
-
- if (queue->recycle != NULL) {
- void *const new_msg = aos_queue_get_msg(queue->recycle);
- if (new_msg == NULL) {
- fprintf(stderr, "queue: couldn't get a message"
- " for recycle queue %p\n", queue->recycle);
- } else {
- // Take a message from recycle_queue and switch its
- // header with the one being freed, which effectively
- // switches which queue each message belongs to.
- aos_msg_header *const new_header = get_header(new_msg);
- // also switch the messages between the pools
- pool->pool[header->index] = new_header;
- if (mutex_lock(&queue->recycle->pool.pool_lock)) {
- return -1;
- }
- queue->recycle->pool.pool[new_header->index] = header;
- // swap the information in both headers
- header_swap(header, new_header);
- // don't unlock the other pool until all of its messages are valid
- mutex_unlock(&queue->recycle->pool.pool_lock);
- // use the header for new_msg which is now for this pool
- header = new_header;
- if (aos_queue_write_msg_free(queue->recycle,
- (void *)msg, OVERRIDE) != 0) {
- printf("queue: warning aos_queue_write_msg("
- "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
- " failed\n",
- queue->recycle, queue, msg);
- }
- msg = new_msg;
- }
- }
-
- // where the one we're freeing was
- int index = header->index;
- header->index = -1;
- if (index != pool->used) { // if we're not freeing the one on the end
- // put the last one where the one we're freeing was
- header = pool->pool[index] = pool->pool[pool->used];
- // put the one we're freeing at the end
- pool->pool[pool->used] = get_header(msg);
- // update the former last one's index
- header->index = index;
- }
- return 0;
-}
-// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
-static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
- if (msg == NULL) {
- return 0;
- }
-
- int rv = 0;
- if (mutex_lock(&pool->pool_lock)) {
- return -1;
- }
- aos_msg_header *const header = get_header(msg);
- header->ref_count --;
- assert(header->ref_count >= 0);
-#if REF_DEBUG
- printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
-#endif
- if (header->ref_count == 0) {
- rv = aos_free_msg(pool, msg, queue);
- }
- mutex_unlock(&pool->pool_lock);
- return rv;
-}
-
-static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
- if (sig1->length != sig2->length) {
- //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
- return 0;
- }
- if (sig1->queue_length != sig2->queue_length) {
- //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
- return 0;
- }
- if (sig1->hash != sig2->hash) {
- //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
- return 0;
- }
- //LOG(DEBUG, "signature match\n");
- return 1;
-}
-static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
- aos_queue *const queue = aos_core_alloc_queue();
- aos_msg_pool *const pool = &queue->pool;
- pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
- pool->length = 0;
- pool->used = 0;
- pool->msg_length = sig->length + sizeof(aos_msg_header);
- pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
- aos_ring_buf *const buf = &queue->buf;
- buf->length = sig->queue_length + 1;
- if (buf->length < 2) { // TODO(brians) when could this happen?
- buf->length = 2;
- }
- buf->data = shm_malloc(buf->length * sizeof(void *));
- buf->start = 0;
- buf->end = 0;
- buf->msgs = 0;
- memset(&buf->writable, 0, sizeof(buf->writable));
- memset(&buf->readable, 0, sizeof(buf->readable));
- buf->buff_lock = 0;
- pool->pool_lock = 0;
- queue->recycle = NULL;
- return queue;
-}
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
- //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
- mutex_grab(&global_core->mem_struct->queues.alloc_lock);
- aos_queue_list *list = global_core->mem_struct->queues.queue_list;
- aos_queue_list *last = NULL;
- while (list != NULL) {
- // if we found a matching queue
- if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
- return list->queue;
- } else {
- //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
- }
- last = list;
- list = list->next;
- }
- list = shm_malloc(sizeof(aos_queue_list));
- if (last == NULL) {
- global_core->mem_struct->queues.queue_list = list;
- } else {
- last->next = list;
- }
- list->sig = *sig;
- const size_t name_size = strlen(name) + 1;
- list->name = shm_malloc(name_size);
- memcpy(list->name, name, name_size);
- //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
- list->queue = aos_create_queue(sig);
- //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
- list->next = NULL;
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
- return list->queue;
-}
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
- const aos_type_sig *recycle_sig, aos_queue **recycle) {
- if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
- *recycle = NULL;
- return NULL;
- }
- aos_queue *const r = aos_fetch_queue(name, sig);
- r->recycle = aos_fetch_queue(name, recycle_sig);
- if (r == r->recycle) {
- fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
- printf("see stderr\n");
- abort();
- }
- *recycle = r->recycle;
- return r;
-}
-
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
-#if WRITE_DEBUG
- printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
-#endif
- int rv = 0;
- if (msg == NULL || msg < (void *)global_core->mem_struct ||
- msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
- fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
- msg, queue);
- printf("see stderr\n");
- abort();
- }
- aos_ring_buf *const buf = &queue->buf;
- if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
- printf("queue: locking buff_lock of %p failed\n", buf);
-#endif
- return -1;
- }
- int new_end = (buf->end + 1) % buf->length;
- while (new_end == buf->start) {
- if (opts & NON_BLOCK) {
-#if WRITE_DEBUG
- printf("queue: not blocking on %p. returning -1\n", queue);
-#endif
- mutex_unlock(&buf->buff_lock);
- return -1;
- } else if (opts & OVERRIDE) {
-#if WRITE_DEBUG
- printf("queue: overriding on %p\n", queue);
-#endif
- // avoid leaking the message that we're going to overwrite
- msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
- buf->start = (buf->start + 1) % buf->length;
- } else { // BLOCK
-#if WRITE_DEBUG
- printf("queue: going to wait for writable(=%p) of %p\n",
- &buf->writable, queue);
-#endif
- condition_wait(&buf->writable, &buf->buff_lock);
-#if WRITE_DEBUG
- printf("queue: done waiting for writable(=%p) of %p\n",
- &buf->writable, queue);
-#endif
- }
- new_end = (buf->end + 1) % buf->length;
- }
- buf->data[buf->end] = msg;
- ++buf->msgs;
- buf->end = new_end;
- mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
- printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
-#endif
- condition_signal(&buf->readable);
-#if WRITE_DEBUG
- printf("queue: write returning %d on queue %p\n", rv, queue);
-#endif
- return rv;
-}
-
-int aos_queue_free_msg(aos_queue *queue, const void *msg) {
- // TODO(brians) get rid of this
- void *msg_temp;
- memcpy(&msg_temp, &msg, sizeof(msg_temp));
- return msg_ref_dec(msg_temp, &queue->pool, queue);
-}
-// Deals with setting/unsetting readable and writable.
-// Should be called after buff_lock has been unlocked.
-// read is whether or not this read call read one off the queue
-static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
- if (read) {
- condition_signal(&buf->writable);
- }
-}
-// Returns with buff_lock locked and a readable message in buf.
-// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
-static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
- aos_queue *const queue, int *index) {
-#if !READ_DEBUG
- (void)queue;
-#endif
- if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
- printf("queue: couldn't lock buff_lock of %p\n", queue);
-#endif
- return -1;
- }
- while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
- if (opts & NON_BLOCK) {
- mutex_unlock(&buf->buff_lock);
-#if READ_DEBUG
- printf("queue: not going to block waiting on %p\n", queue);
-#endif
- return -1;
- } else { // BLOCK
-#if READ_DEBUG
- printf("queue: going to wait for readable(=%p) of %p\n",
- &buf->readable, queue);
-#endif
- // wait for a message to become readable
- condition_wait(&buf->readable, &buf->buff_lock);
-#if READ_DEBUG
- printf("queue: done waiting for readable(=%p) of %p\n",
- &buf->readable, queue);
-#endif
- }
- }
-#if READ_DEBUG
- printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
-#endif
- return 0;
-}
-// handles reading with PEEK
-static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
- void *ret;
- if (opts & FROM_END) {
- int pos = buf->end - 1;
- if (pos < 0) { // if it needs to wrap
- pos = buf->length - 1;
- }
-#if READ_DEBUG
- printf("queue: reading from line %d: %d\n", __LINE__, pos);
-#endif
- ret = buf->data[pos];
- } else {
-#if READ_DEBUG
- printf("queue: reading from line %d: %d\n", __LINE__, start);
-#endif
- ret = buf->data[start];
- }
- aos_msg_header *const header = get_header(ret);
- header->ref_count ++;
-#if REF_DEBUG
- printf("ref inc count: %p\n", ret);
-#endif
- return ret;
-}
-const void *aos_queue_read_msg(aos_queue *queue, int opts) {
-#if READ_DEBUG
- printf("queue: read_msg(%p, %d)\n", queue, opts);
-#endif
- void *msg = NULL;
- aos_ring_buf *const buf = &queue->buf;
- if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
-#if READ_DEBUG
- printf("queue: common returned -1 for %p\n", queue);
-#endif
- return NULL;
- }
- if (opts & PEEK) {
- msg = read_msg_peek(buf, opts, buf->start);
- } else {
- if (opts & FROM_END) {
- while (1) {
-#if READ_DEBUG
- printf("queue: start of c2 of %p\n", queue);
-#endif
- // This loop pulls each message out of the buffer.
- const int pos = buf->start;
- buf->start = (buf->start + 1) % buf->length;
- // if this is the last one
- if (buf->start == buf->end) {
-#if READ_DEBUG
- printf("queue: reading from c2: %d\n", pos);
-#endif
- msg = buf->data[pos];
- break;
- }
- // it's not going to be in the queue any more
- msg_ref_dec(buf->data[pos], &queue->pool, queue);
- }
- } else {
-#if READ_DEBUG
- printf("queue: reading from d2: %d\n", buf->start);
-#endif
- msg = buf->data[buf->start];
- buf->start = (buf->start + 1) % buf->length;
- }
- }
- mutex_unlock(&buf->buff_lock);
- aos_read_msg_common_end(buf, !(opts & PEEK));
-#if READ_DEBUG
- printf("queue: read returning %p\n", msg);
-#endif
- return msg;
-}
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
-#if READ_DEBUG
- printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
-#endif
- void *msg = NULL;
- aos_ring_buf *const buf = &queue->buf;
- if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
-#if READ_DEBUG
- printf("queue: common returned -1\n");
-#endif
- return NULL;
- }
- // TODO(parker): Handle integer wrap on the index.
- const int offset = buf->msgs - *index;
- int my_start = buf->end - offset;
- if (offset >= buf->length) { // if we're behind the available messages
- // catch index up to the last available message
- *index += buf->start - my_start;
- // and that's the one we're going to read
- my_start = buf->start;
- }
- if (my_start < 0) { // if we want to read off the end of the buffer
- // unwrap where we're going to read from
- my_start += buf->length;
- }
- if (opts & PEEK) {
- msg = read_msg_peek(buf, opts, my_start);
- } else {
- if (opts & FROM_END) {
-#if READ_DEBUG
- printf("queue: start of c1 of %p\n", queue);
-#endif
- int pos = buf->end - 1;
- if (pos < 0) { // if it wrapped
- pos = buf->length - 1; // unwrap it
- }
-#if READ_DEBUG
- printf("queue: reading from c1: %d\n", pos);
-#endif
- msg = buf->data[pos];
- *index = buf->msgs;
- } else {
-#if READ_DEBUG
- printf("queue: reading from d1: %d\n", my_start);
-#endif
- msg = buf->data[my_start];
- ++(*index);
- }
- aos_msg_header *const header = get_header(msg);
- ++header->ref_count;
-#if REF_DEBUG
- printf("ref_inc_count: %p\n", msg);
-#endif
- }
- mutex_unlock(&buf->buff_lock);
- // this function never consumes one off the queue
- aos_read_msg_common_end(buf, 0);
- return msg;
-}
-static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
- if (mutex_lock(&pool->pool_lock)) {
- return NULL;
- }
- void *msg;
- if (pool->length - pool->used > 0) {
- msg = pool->pool[pool->used];
- } else {
- if (pool->length >= pool->mem_length) {
- LOG(FATAL, "overused pool %p\n", pool);
- }
- msg = pool->pool[pool->length] = aos_alloc_msg(pool);
- ++pool->length;
- }
- aos_msg_header *const header = msg;
- msg = (uint8_t *)msg + sizeof(aos_msg_header);
- header->ref_count = 1;
-#if REF_DEBUG
- printf("ref alloc: %p\n", msg);
-#endif
- header->index = pool->used;
- ++pool->used;
- mutex_unlock(&pool->pool_lock);
- return msg;
-}
-void *aos_queue_get_msg(aos_queue *queue) {
- return aos_pool_get_msg(&queue->pool);
-}
-
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..57e2a5e
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -0,0 +1,447 @@
+#include "aos/common/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+namespace {
+
+static_assert(shm_ok<Queue>::value, "Queue instances go into shared memory");
+
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them).
+const int kExtraMessages = 20;
+
+} // namespace
+
+struct Queue::MessageHeader {
+ int ref_count;
+ int index; // in pool_
+ static MessageHeader *Get(const void *msg) {
+ return reinterpret_cast<MessageHeader *>(
+ static_cast<uint8_t *>(const_cast<void *>(msg)) -
+ sizeof(MessageHeader));
+ }
+ void Swap(MessageHeader *other) {
+ MessageHeader temp;
+ memcpy(&temp, other, sizeof(temp));
+ memcpy(other, this, sizeof(*other));
+ memcpy(this, &temp, sizeof(*this));
+ }
+};
+static_assert(shm_ok<Queue::MessageHeader>::value, "the whole point"
+ " is to stick it in shared memory");
+
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+// have to lock/unlock pool_lock_
+// Releases one reference to msg; NULL is ignored like the C msg_ref_dec did.
+void Queue::DecrementMessageReferenceCount(const void *msg) {
+  if (msg == NULL) return;  // MessageHeader::Get(NULL) would point nowhere
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *const header = MessageHeader::Get(msg);
+  --header->ref_count;
+  assert(header->ref_count >= 0);
+  if (kRefDebug) {
+    printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+  }
+  if (header->ref_count == 0) DoFreeMessage(msg);  // last reference gone
+}
+
+Queue::Queue(const char *name, size_t length, int hash, int queue_length) {
+ const size_t name_size = strlen(name) + 1;
+ char *temp = static_cast<char *>(shm_malloc(name_size));
+ memcpy(temp, name, name_size);
+ name_ = temp;
+ length_ = length;
+ hash_ = hash;
+ queue_length_ = queue_length;
+
+ next_ = NULL;
+ recycle_ = NULL;
+
+ if (kFetchDebug) {
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+ name, length, hash, queue_length);
+ }
+
+ data_length_ = queue_length + 1;
+ if (data_length_ < 2) { // TODO(brians) when could this happen?
+ data_length_ = 2;
+ }
+ data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+ data_start_ = 0;
+ data_end_ = 0;
+ messages_ = 0;
+
+ mem_length_ = queue_length + kExtraMessages;
+ pool_length_ = 0;
+ messages_used_ = 0;
+ msg_length_ = length + sizeof(MessageHeader);
+ pool_ = static_cast<MessageHeader **>(
+ shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+ if (kFetchDebug) {
+ printf("made queue %s\n", name);
+ }
+}
+// Finds the queue matching (name, length, hash, queue_length), creating and
+// appending one to the global list if none exists. NULL if the lock fails.
+Queue *Queue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length) {
+  if (kFetchDebug) printf("fetching queue %s\n", name);
+  if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+    return NULL;
+  }
+  Queue *current = static_cast<Queue *>(
+      global_core->mem_struct->queues.queue_list);
+  Queue *last = NULL;
+  while (current != NULL) {
+    // if we found a matching queue
+    if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+        current->hash_ == hash && current->queue_length_ == queue_length) {
+      mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+      return current;
+    }
+    if (kFetchDebug) {
+      printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+             strcmp(current->name_, name), name);
+    }
+    // Track the tail so a new queue is appended, not put over the list head.
+    last = current;
+    current = current->next_;
+  }
+
+  void *temp = shm_malloc(sizeof(Queue));
+  current = new (temp) Queue(name, length, hash, queue_length);
+  if (last == NULL) {  // if we don't have one to tack the new one on to
+    global_core->mem_struct->queues.queue_list = current;
+  } else {
+    last->next_ = current;
+  }
+  mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+  return current;
+}
+Queue *Queue::Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_length, Queue **recycle) {
+ Queue *r = Fetch(name, length, hash, queue_length);
+ r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+ if (r == r->recycle_) {
+ fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+ printf("see stderr\n");
+ abort();
+ }
+ *recycle = r->recycle_;
+ return r;
+}
+
+void Queue::DoFreeMessage(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (pool_[header->index] != header) { // if something's messed up
+ fprintf(stderr, "queue: something is very very wrong with queue %p."
+ " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+ this, pool_, header->index, header);
+ printf("queue: see stderr\n");
+ abort();
+ }
+ if (kRefDebug) {
+ printf("ref free: %p\n", msg);
+ }
+ --messages_used_;
+
+ if (recycle_ != NULL) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == NULL) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ // Take a message from recycle_ and switch its
+ // header with the one being freed, which effectively
+ // switches which queue each message belongs to.
+ MessageHeader *const new_header = MessageHeader::Get(new_msg);
+ // also switch the messages between the pools
+ pool_[header->index] = new_header;
+ {
+ MutexLocker locker(&recycle_->pool_lock_);
+ recycle_->pool_[new_header->index] = header;
+ // swap the information in both headers
+ header->Swap(new_header);
+ // don't unlock the other pool until all of its messages are valid
+ }
+ // use the header for new_msg which is now for this pool
+ header = new_header;
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;
+ }
+ }
+
+ // where the one we're freeing was
+ int index = header->index;
+ header->index = -1;
+ if (index != messages_used_) { // if we're not freeing the one on the end
+ // put the last one where the one we're freeing was
+ header = pool_[index] = pool_[messages_used_];
+ // put the one we're freeing at the end
+ pool_[messages_used_] = MessageHeader::Get(msg);
+ // update the former last one's index
+ header->index = index;
+ }
+}
+
+// Puts msg on the queue. Returns false only if the queue is full and kNonBlock
+// is set; with kOverride the oldest message is dropped, otherwise it waits.
+bool Queue::WriteMessage(void *msg, int options) {
+  if (kWriteDebug) {
+    printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
+  }
+  if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
+      msg > reinterpret_cast<void *>(
+                reinterpret_cast<uintptr_t>(global_core->mem_struct) +
+                global_core->size)) {
+    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+            msg, this);
+    printf("see stderr\n");
+    abort();
+  }
+  {
+    MutexLocker locker(&data_lock_);
+    int new_end = (data_end_ + 1) % data_length_;
+    while (new_end == data_start_) {
+      if (options & kNonBlock) {
+        if (kWriteDebug) {
+          printf("queue: not blocking on %p. returning false\n", this);
+        }
+        return false;
+      } else if (options & kOverride) {
+        if (kWriteDebug) {
+          printf("queue: overriding on %p\n", this);
+        }
+        // avoid leaking the message that we're going to overwrite
+        DecrementMessageReferenceCount(data_[data_start_]);
+        data_start_ = (data_start_ + 1) % data_length_;
+      } else {  // kBlock
+        if (kWriteDebug) {
+          printf("queue: going to wait for writable_ of %p\n", this);
+        }
+        writable_.Wait(&data_lock_);
+      }
+      new_end = (data_end_ + 1) % data_length_;
+    }
+    data_[data_end_] = msg;
+    ++messages_;
+    data_end_ = new_end;
+  }
+  if (kWriteDebug) printf("queue: setting readable of %p\n", this);
+  readable_.Signal();
+  if (kWriteDebug) {
+    printf("queue: write returning true on queue %p\n", this);
+  }
+  return true;
+}
+
+void Queue::ReadCommonEnd(bool read) {
+ if (read) {
+ writable_.Signal();
+ }
+}
+// Blocks (unless kNonBlock) until a message is readable; needs data_lock_ held.
+bool Queue::ReadCommonStart(int options, int *index) {
+  while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+    if (options & kNonBlock) {
+      if (kReadDebug) {
+        printf("queue: not going to block waiting on %p\n", this);
+      }
+      return false;
+    }
+    if (kReadDebug) {
+      printf("queue: going to wait for readable of %p\n", this);
+    }
+    // Wait with data_lock_ handed to the condition, as WriteMessage does for
+    // writable_, rather than unlocking first and risking a missed Signal().
+    // NOTE(review): assumes Wait(&lock) returns with the lock held - confirm.
+    readable_.Wait(&data_lock_);
+    if (kReadDebug) {
+      printf("queue: done waiting for readable of %p\n", this);
+    }
+  }
+  if (kReadDebug) {
+    printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+  }
+  return true;
+}
+void *Queue::ReadPeek(int options, int start) {
+ void *ret;
+ if (options & kFromEnd) {
+ int pos = data_end_ - 1;
+ if (pos < 0) { // if it needs to wrap
+ pos = data_length_ - 1;
+ }
+ if (kReadDebug) {
+ printf("queue: reading from line %d: %d\n", __LINE__, pos);
+ }
+ ret = data_[pos];
+ } else {
+ if (kReadDebug) {
+ printf("queue: reading from line %d: %d\n", __LINE__, start);
+ }
+ ret = data_[start];
+ }
+ MessageHeader *const header = MessageHeader::Get(ret);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref inc count: %p\n", ret);
+ }
+ return ret;
+}
+const void *Queue::ReadMessage(int options) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessage(%d)\n", this, options);
+ }
+ void *msg = NULL;
+ MutexLocker locker(&data_lock_);
+ if (!ReadCommonStart(options, NULL)) {
+ if (kReadDebug) {
+ printf("queue: common returned false for %p\n", this);
+ }
+ return NULL;
+ }
+ if (options & kPeek) {
+ msg = ReadPeek(options, data_start_);
+ } else {
+ if (options & kFromEnd) {
+ while (1) {
+ if (kReadDebug) {
+ printf("queue: start of c2 of %p\n", this);
+ }
+ // This loop pulls each message out of the buffer.
+ const int pos = data_start_;
+ data_start_ = (data_start_ + 1) % data_length_;
+ // if this is the last one
+ if (data_start_ == data_end_) {
+ if (kReadDebug) {
+ printf("queue: reading from c2: %d\n", pos);
+ }
+ msg = data_[pos];
+ break;
+ }
+ // it's not going to be in the queue any more
+ DecrementMessageReferenceCount(data_[pos]);
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: reading from d2: %d\n", data_start_);
+ }
+ msg = data_[data_start_];
+ data_start_ = (data_start_ + 1) % data_length_;
+ }
+ }
+ ReadCommonEnd(!(options & kPeek));
+ if (kReadDebug) {
+ printf("queue: read returning %p\n", msg);
+ }
+ return msg;
+}
+const void *Queue::ReadMessageIndex(int options, int *index) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
+ this, options, index, *index);
+ }
+ void *msg = NULL;
+ {
+ MutexLocker locker(&data_lock_);
+ if (!ReadCommonStart(options, index)) {
+ if (kReadDebug) {
+ printf("queue: common returned false for %p\n", this);
+ }
+ return NULL;
+ }
+ // TODO(parker): Handle integer wrap on the index.
+ const int offset = messages_ - *index;
+ int my_start = data_end_ - offset;
+ if (offset >= data_length_) { // if we're behind the available messages
+ // catch index up to the last available message
+ *index += data_start_ - my_start;
+ // and that's the one we're going to read
+ my_start = data_start_;
+ }
+ if (my_start < 0) { // if we want to read off the end of the buffer
+ // unwrap where we're going to read from
+ my_start += data_length_;
+ }
+ if (options & kPeek) {
+ msg = ReadPeek(options, my_start);
+ } else {
+ if (options & kFromEnd) {
+ if (kReadDebug) {
+ printf("queue: start of c1 of %p\n", this);
+ }
+ int pos = data_end_ - 1;
+ if (pos < 0) { // if it wrapped
+ pos = data_length_ - 1; // unwrap it
+ }
+ if (kReadDebug) {
+ printf("queue: reading from c1: %d\n", pos);
+ }
+ msg = data_[pos];
+ *index = messages_;
+ } else {
+ if (kReadDebug) {
+ printf("queue: reading from d1: %d\n", my_start);
+ }
+ msg = data_[my_start];
+ ++(*index);
+ }
+ MessageHeader *const header = MessageHeader::Get(msg);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref_inc_count: %p\n", msg);
+ }
+ }
+ }
+ // this function never consumes one off the queue
+ ReadCommonEnd(false);
+ return msg;
+}
+
+// Hands out a message with one reference, reusing a free pool entry when
+// there is one and allocating a new one from shared memory otherwise.
+void *Queue::GetMessage() {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *header;
+  if (pool_length_ - messages_used_ > 0) {
+    header = pool_[messages_used_];
+  } else {
+    if (pool_length_ >= mem_length_) {
+      LOG(FATAL, "overused pool %p from queue %p\n", pool_, this);
+    }
+    header = pool_[pool_length_] =
+        static_cast<MessageHeader *>(shm_malloc(msg_length_));
+    ++pool_length_;
+  }
+  // The payload handed to the caller starts right after the header.
+  void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+  header->ref_count = 1;
+  if (kRefDebug) printf("ref alloc: %p\n", msg);
+  header->index = messages_used_;
+  ++messages_used_;
+  return msg;
+}
+
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 4c279e1..f7c116b 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,134 +1,160 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
+#ifndef AOS_COMMON_QUEUE_H_
+#define AOS_COMMON_QUEUE_H_
-#include "shared_mem.h"
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
// code to make checking for leaks work better
// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
// describes how
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// initializing an aos_type_sig and then calling aos_fetch_queue on it.
-// This aos_queue* can then be used to get a message and write it or to read a
-// message.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and aos_type_sig will result in a different queue, which means that
-// if you only recompile some code that uses differently sized messages, it will
-// simply use a different queue than the old code.
-//
// Any pointers returned from these functions can be safely passed to other
// processes because they are all shared memory pointers.
// IMPORTANT: Any message pointer must be passed back in some way
-// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// (FreeMessage and WriteMessage are common ones) or the
// application will leak shared memory.
-// NOTE: Taking a message from read_msg and then passing it to write_msg might
-// work, but it is not guaranteed to.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
-typedef struct aos_type_sig_t {
- size_t length; // sizeof(message)
- int hash; // can differentiate multiple otherwise identical queues
- int queue_length; // how many messages the queue can hold
-} aos_type_sig;
+namespace aos {
-// Structures that are opaque to users (defined in queue_internal.h).
-typedef struct aos_queue_list_t aos_queue_list;
-typedef struct aos_queue_t aos_queue;
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling Queue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are FIFO buffers of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class Queue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ static Queue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue will
+ // have the same length and hash as the main one). However, any frees made
+ // using a queue with only (name,length,hash,queue_length) before the
+ // recycle queue has been associated with it will not go on to the recycle
+ // queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ static Queue *Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_queue_length,
+ Queue **recycle);
-// Retrieves (and creates if necessary) a queue. Each combination of name and
-// signature refers to a completely independent queue.
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
-// Same as above, except sets up the returned queue so that it will put messages
-// on *recycle (retrieved with recycle_sig) when they are freed (after they have
-// been released by all other readers/writers and are not in the queue).
-// The length of recycle_sig determines how many freed messages will be kept.
-// Other code can retrieve recycle_sig and sig separately. However, any frees
-// made using aos_fetch_queue with only sig before the recycle queue has been
-// associated with it will not go on to the recyce queue.
-// Will return NULL for both queues if sig->length != recycle_sig->length or
-// sig->hash == recycle_sig->hash (just to be safe).
-// NOTE: calling this function with the same sig but multiple recycle_sig s
-// will result in each freed message being put onto an undefined recycle_sig.
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
- const aos_type_sig *recycle_sig, aos_queue **recycle);
+ // Constants for passing to options arguments.
+ // The non-conflicting ones can be combined with bitwise-or.
-// Constants for passing to opts arguments.
-// #defines so that c code can use queues
-// The non-conflicting ones can be combined with bitwise-or.
-// TODO(brians) prefix these?
-//
-// Causes the returned message to be left in the queue.
-// For reading only.
-#define PEEK 0x0001
-// Reads the last message in the queue instead of just the next one.
-// NOTE: This removes all of the messages until the last one from the queue
-// (which means that nobody else will read them). However, PEEK means to not
-// remove any from the queue, including the ones that are skipped.
-// For reading only.
-#define FROM_END 0x0002
-// Causes reads to return NULL and writes to fail instead of waiting.
-// For reading and writing.
-#define NON_BLOCK 0x0004
-// Causes things to block.
-// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
-// For reading and writing.
-#define BLOCK 0x0000
-// Causes writes to overwrite the oldest message in the queue instead of
-// blocking.
-// For writing only.
-#define OVERRIDE 0x0008
+ // Causes the returned message to be left in the queue.
+ // For reading only.
+ static const int kPeek = 0x0001;
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them). However, kPeek means to not
+ // remove any from the queue, including the ones that are skipped.
+ // For reading only.
+ static const int kFromEnd = 0x0002;
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static const int kNonBlock = 0x0004;
+ // Causes things to block.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // For reading and writing.
+ static const int kBlock = 0x0000;
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static const int kOverride = 0x0008;
-// Frees a message. Does nothing if msg is NULL.
-int aos_queue_free_msg(aos_queue *queue, const void *msg);
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ bool WriteMessage(void *msg, int options);
-// Writes a message into the queue.
-// NOTE: msg must point to at least the length of this queue's worth of valid
-// data to write
-// IMPORTANT: if this returns -1, then the caller must do something with msg
-// (like free it)
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
-// Exactly the same as aos_queue_write_msg, except it automatically frees the
-// message if writing fails.
-static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
- const int ret = aos_queue_write_msg(queue, msg, opts);
- if (ret != 0) {
- aos_queue_free_msg(queue, msg);
- }
- return ret;
-}
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // message. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(int options);
+ // Exactly the same as ReadMessage, except it will never return the
+ // same message twice with the same index argument. However, it may not
+ // return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be
+ const void *ReadMessageIndex(int options, int *index);
-// Reads a message out of the queue.
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// The return value is const because other people might be viewing the same
-// messsage. Do not cast the const away!
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-const void *aos_queue_read_msg(aos_queue *buf, int opts);
-// Exactly the same as aos_queue_read_msg, except it will never return the same
-// message twice with the same index argument. However, it may not return some
-// messages that pass through the queue.
-// *index should start as 0. index does not have to be in shared memory, but it
-// can be
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ void *GetMessage();
-// Retrieves ("allocates") a message that can then be written to the queue.
-// NOTE: the return value will be completely uninitialized
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// Returns NULL for error.
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-void *aos_queue_get_msg(aos_queue *queue);
+ void FreeMessage(const void *msg) { DecrementMessageReferenceCount(msg); }
-#ifdef __cplusplus
-}
-#endif
+ private:
+ struct MessageHeader;
-#endif
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ Queue *next_;
+ Queue *recycle_;
+
+ Mutex data_lock_; // protects operations on data_ etc
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max index into data_ + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ Mutex pool_lock_;
+ size_t msg_length_; // sizeof(each message) including the header
+ int mem_length_; // the max number of messages that will ever be allocated
+ int messages_used_;
+ int pool_length_; // the number of allocated messages
+ MessageHeader **pool_; // array of pointers to messages
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+
+ // Should be called with data_lock_ locked.
+ // Returns with a readable message in data_ or false.
+ bool ReadCommonStart(int options, int *index);
+ // Deals with setting/unsetting readable_ and writable_.
+ // Should be called after data_lock_ has been unlocked.
+ // read is whether or not this read call read one off the queue
+ void ReadCommonEnd(bool read);
+ // Handles reading with kPeek.
+ void *ReadPeek(int options, int start);
+
+ // Gets called by Fetch when necessary (with placement new).
+ Queue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_COMMON_QUEUE_H_
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
deleted file mode 100644
index e592649..0000000
--- a/aos/atom_code/ipc_lib/queue_internal.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef AOS_IPC_LIB_QUEUE_INTERNAL_H_
-#define AOS_IPC_LIB_QUEUE_INTERNAL_H_
-
-#include "shared_mem.h"
-#include "aos_sync.h"
-
-// Should only be used by queue.c. Contains definitions of the structures
-// it uses.
-
-// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
-#define EXTRA_MESSAGES 20
-
-typedef struct aos_msg_header_t {
- int ref_count;
- int index; // in the pool
-} aos_msg_header;
-static inline void header_swap(aos_msg_header *l, aos_msg_header *r) {
- aos_msg_header tmp;
- tmp.ref_count = l->ref_count;
- tmp.index = l->index;
- l->ref_count = r->ref_count;
- l->index = r->index;
- r->ref_count = tmp.ref_count;
- r->index = tmp.index;
-}
-
-struct aos_queue_list_t {
- char *name;
- aos_type_sig sig;
- aos_queue *queue;
- aos_queue_list *next;
-};
-
-typedef struct aos_ring_buf_t {
- mutex buff_lock; // the main lock protecting operations on this buffer
- // conditions
- condition_variable writable;
- condition_variable readable;
- int length; // max index into data + 1
- int start; // is an index into data
- int end; // is an index into data
- int msgs; // that have passed through
- void **data; // array of messages (w/ headers)
-} aos_ring_buf;
-
-typedef struct aos_msg_pool_t {
- mutex pool_lock;
- size_t msg_length;
- int mem_length; // the number of messages
- int used; // number of messages
- int length; // number of allocated messages
- void **pool; // array of messages
-} aos_msg_pool;
-
-struct aos_queue_t {
- aos_msg_pool pool;
- aos_ring_buf buf;
- aos_queue *recycle;
-};
-
-#endif
diff --git a/aos/atom_code/ipc_lib/queue_test.cpp b/aos/atom_code/ipc_lib/queue_test.cc
similarity index 80%
rename from aos/atom_code/ipc_lib/queue_test.cpp
rename to aos/atom_code/ipc_lib/queue_test.cc
index e9ac8d5..b437c86 100644
--- a/aos/atom_code/ipc_lib/queue_test.cpp
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -1,3 +1,5 @@
+#include "aos/common/queue.h"
+
#include <unistd.h>
#include <sys/mman.h>
#include <inttypes.h>
@@ -11,9 +13,12 @@
#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
#include "aos/common/type_traits.h"
-using testing::AssertionResult;
-using testing::AssertionSuccess;
-using testing::AssertionFailure;
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+
+namespace aos {
+namespace testing {
// IMPORTANT: Some of the functions that do test predicate functions allocate
// shared memory (and don't free it).
@@ -44,16 +49,19 @@
return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
}
}
- static_assert(aos::shm_ok<ResultType>::value, "this will get put in shared memory");
+ static_assert(aos::shm_ok<ResultType>::value,
+ "this will get put in shared memory");
// Gets allocated in shared memory so it has to be volatile.
- template<typename T> struct FunctionToCall {
+ template<typename T>
+ struct FunctionToCall {
ResultType result;
bool expected;
void (*function)(T*, char*);
T *arg;
volatile char failure[kFailureSize];
};
- template<typename T> static void Hangs_(volatile FunctionToCall<T> *const to_call) {
+ template<typename T>
+ static void Hangs_(volatile FunctionToCall<T> *const to_call) {
to_call->result = ResultType::Called;
to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
to_call->result = ResultType::Returned;
@@ -124,7 +132,7 @@
void SetUp() {
SharedMemTestSetup::SetUp();
- fatal_failure = reinterpret_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+ fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
static bool registered = false;
if (!registered) {
atexit(ReapExitHandler);
@@ -138,7 +146,7 @@
// the attribute is in the middle to make gcc happy
template<typename T> __attribute__((warn_unused_result))
std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
- mutex *lock = reinterpret_cast<mutex *>(shm_malloc_aligned(
+ mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
sizeof(*lock), sizeof(int)));
*lock = 1;
const pid_t pid = fork();
@@ -182,7 +190,7 @@
bool expected, ChildID id) {
static_assert(aos::shm_ok<FunctionToCall<T>>::value,
"this is going into shared memory");
- volatile FunctionToCall<T> *const to_call = reinterpret_cast<FunctionToCall<T> *>(
+ volatile FunctionToCall<T> *const to_call = static_cast<FunctionToCall<T> *>(
shm_malloc_aligned(sizeof(*to_call), sizeof(int)));
to_call->result = ResultType::NotCalled;
to_call->function = function;
@@ -237,38 +245,37 @@
int16_t data; // don't really want to test empty messages
};
struct MessageArgs {
- aos_queue *const queue;
+ Queue *const queue;
int flags;
int16_t data; // -1 means NULL expected
};
static void WriteTestMessage(MessageArgs *args, char *failure) {
- TestMessage *msg = reinterpret_cast<TestMessage *>(aos_queue_get_msg(args->queue));
+ TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
if (msg == NULL) {
snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", args->queue);
return;
}
msg->data = args->data;
- if (aos_queue_write_msg_free(args->queue, msg, args->flags) == -1) {
+ if (!args->queue->WriteMessage(msg, args->flags)) {
snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
args->queue, msg, args->flags);
}
}
static void ReadTestMessage(MessageArgs *args, char *failure) {
- const TestMessage *msg = reinterpret_cast<const TestMessage *>(
- aos_queue_read_msg(args->queue, args->flags));
+ const TestMessage *msg = static_cast<const TestMessage *>(
+ args->queue->ReadMessage(args->flags));
if (msg == NULL) {
if (args->data != -1) {
- snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got NULL message",
+ snprintf(failure, kFailureSize, "expected data of %" PRId16 " but got NULL message",
args->data);
}
} else {
if (args->data != msg->data) {
snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got %" PRId16 " instead",
+ "expected data of %" PRId16 " but got %" PRId16 " instead",
args->data, msg->data);
}
- aos_queue_free_msg(args->queue, msg);
+ args->queue->FreeMessage(msg);  // every non-NULL read must be freed
}
}
};
@@ -276,81 +283,75 @@
std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
TEST_F(QueueTest, Reading) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, -1};
- EXPECT_EQ(BLOCK, 0);
- EXPECT_EQ(BLOCK | FROM_END, FROM_END);
-
- args.flags = NON_BLOCK;
+ args.flags = Queue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = NON_BLOCK | PEEK;
+ args.flags = Queue::kNonBlock | Queue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = PEEK;
+ args.flags = Queue::kPeek;
EXPECT_HANGS(ReadTestMessage, &args);
args.data = 254;
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = PEEK;
+ args.flags = Queue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = PEEK;
+ args.flags = Queue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = PEEK | NON_BLOCK;
+ args.flags = Queue::kPeek | Queue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = -1;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = Queue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = 971;
EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
}
TEST_F(QueueTest, Writing) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 973};
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_HANGS(WriteTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = Queue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = Queue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = PEEK;
+ args.flags = Queue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.data = 971;
- args.flags = OVERRIDE;
+ args.flags = Queue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = OVERRIDE;
+ args.flags = Queue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = Queue::kNonBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = OVERRIDE;
+ args.flags = Queue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
}
TEST_F(QueueTest, MultiRead) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 1323};
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
@@ -360,45 +361,45 @@
TEST_F(QueueTest, Recycle) {
// TODO(brians) basic test of recycle queue
// include all of the ways a message can get into the recycle queue
- static const aos_type_sig signature{sizeof(TestMessage), 1, 2},
- recycle_signature{sizeof(TestMessage), 2, 2};
- aos_queue *recycle_queue = reinterpret_cast<aos_queue *>(23);
- aos_queue *const queue = aos_fetch_queue_recycle("Queue", &signature,
- &recycle_signature, &recycle_queue);
- ASSERT_NE(reinterpret_cast<aos_queue *>(23), recycle_queue);
+ Queue *recycle_queue = reinterpret_cast<Queue *>(23);
+ Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 2, 2, 2,
+ &recycle_queue);
+ ASSERT_NE(reinterpret_cast<Queue *>(23), recycle_queue);
MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 971;
- args.flags = OVERRIDE;
+ args.flags = Queue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- recycle.flags = BLOCK;
+ recycle.flags = Queue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &recycle);
EXPECT_HANGS(ReadTestMessage, &recycle);
- TestMessage *msg = static_cast<TestMessage *>(aos_queue_get_msg(queue));
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
ASSERT_TRUE(msg != NULL);
msg->data = 341;
- aos_queue_free_msg(queue, msg);
+ queue->FreeMessage(msg);
recycle.data = 341;
EXPECT_RETURNS(ReadTestMessage, &recycle);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
- args.flags = PEEK;
+ args.flags = Queue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.flags = BLOCK;
+ recycle.flags = Queue::kBlock;
EXPECT_HANGS(ReadTestMessage, &recycle);
- args.flags = BLOCK;
+ args.flags = Queue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
recycle.data = 254;
EXPECT_RETURNS(ReadTestMessage, &recycle);
}
+} // namespace testing
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
index b1a2608..d8f2d18 100644
--- a/aos/atom_code/ipc_lib/shared_mem.h
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -1,14 +1,14 @@
#ifndef _SHARED_MEM_H_
#define _SHARED_MEM_H_
-#ifdef __cplusplus
-extern "C" {
-#endif
-
#include "core_lib.h"
#include <stddef.h>
#include <unistd.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
// Where the shared memory segment starts in each process's address space.
// Has to be the same in all of them so that stuff in shared memory
// can have regular pointers to other stuff in shared memory.
diff --git a/aos/atom_code/logging/atom_logging.cc b/aos/atom_code/logging/atom_logging.cc
index 08a65cb..6ea3b24 100644
--- a/aos/atom_code/logging/atom_logging.cc
+++ b/aos/atom_code/logging/atom_logging.cc
@@ -17,6 +17,8 @@
#include "aos/atom_code/thread_local.h"
#include "aos/atom_code/ipc_lib/queue.h"
+using ::aos::Queue;
+
namespace aos {
namespace logging {
namespace {
@@ -53,8 +55,7 @@
return process_name + '.' + thread_name;
}
-static const aos_type_sig message_sig = {sizeof(LogMessage), 1323, 1500};
-static aos_queue *queue;
+static Queue *queue;
} // namespace
namespace internal {
@@ -83,7 +84,7 @@
class AtomQueueLogImplementation : public LogImplementation {
virtual void DoLog(log_level level, const char *format, va_list ap) {
- LogMessage *message = static_cast<LogMessage *>(aos_queue_get_msg(queue));
+ LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
if (message == NULL) {
LOG(FATAL, "queue get message failed\n");
}
@@ -99,7 +100,7 @@
void Register() {
Init();
- queue = aos_fetch_queue("LoggingQueue", &message_sig);
+ queue = Queue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
if (queue == NULL) {
Die("logging: couldn't fetch queue\n");
}
@@ -108,33 +109,32 @@
}
const LogMessage *ReadNext(int flags, int *index) {
- return static_cast<const LogMessage *>(
- aos_queue_read_msg_index(queue, flags, index));
+ return static_cast<const LogMessage *>(queue->ReadMessageIndex(flags, index));
}
const LogMessage *ReadNext() {
- return ReadNext(BLOCK);
+ return ReadNext(Queue::kBlock);
}
const LogMessage *ReadNext(int flags) {
const LogMessage *r = NULL;
do {
- r = static_cast<const LogMessage *>(aos_queue_read_msg(queue, flags));
+ r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
// not blocking means return a NULL if that's what it gets
- } while ((flags & BLOCK) && r == NULL);
+ } while (!(flags & Queue::kNonBlock) && r == NULL);  // kBlock is 0
return r;
}
LogMessage *Get() {
- return static_cast<LogMessage *>(aos_queue_get_msg(queue));
+ return static_cast<LogMessage *>(queue->GetMessage());
}
void Free(const LogMessage *msg) {
- aos_queue_free_msg(queue, msg);
+ queue->FreeMessage(msg);
}
void Write(LogMessage *msg) {
- if (aos_queue_write_msg_free(queue, msg, OVERRIDE) < 0) {
+ if (!queue->WriteMessage(msg, Queue::kOverride)) {
LOG(FATAL, "writing failed");
}
}