blob: 5cfd2ac6c5a1b41f2d01756b62bd3e81244281da [file] [log] [blame]
#include "aos/atom_code/ipc_lib/queue.h"
#include "aos/atom_code/ipc_lib/queue_internal.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#define READ_DEBUG 0
#define WRITE_DEBUG 0
#define REF_DEBUG 0
static inline aos_msg_header *get_header(void *msg) {
return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
}
static inline aos_queue *aos_core_alloc_queue() {
return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
}
static inline void *aos_alloc_msg(aos_msg_pool *pool) {
return shm_malloc(pool->msg_length);
}
// actually free the given message
static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
#if REF_DEBUG
if (pool->pool_lock == 0) {
//LOG(WARNING, "unprotected\n");
}
#endif
aos_msg_header *header = get_header(msg);
if (pool->pool[header->index] != header) { // if something's messed up
fprintf(stderr, "queue: something is very very wrong with queue %p."
" pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
queue, pool->pool, header->index, header);
printf("queue: see stderr\n");
abort();
}
#if REF_DEBUG
printf("ref_free_count: %p\n", msg);
#endif
--pool->used;
if (queue->recycle != NULL) {
void *const new_msg = aos_queue_get_msg(queue->recycle);
if (new_msg == NULL) {
fprintf(stderr, "queue: couldn't get a message"
" for recycle queue %p\n", queue->recycle);
} else {
// Take a message from recycle_queue and switch its
// header with the one being freed, which effectively
// switches which queue each message belongs to.
aos_msg_header *const new_header = get_header(new_msg);
// also switch the messages between the pools
pool->pool[header->index] = new_header;
if (mutex_lock(&queue->recycle->pool.pool_lock)) {
return -1;
}
queue->recycle->pool.pool[new_header->index] = header;
// swap the information in both headers
header_swap(header, new_header);
// don't unlock the other pool until all of its messages are valid
mutex_unlock(&queue->recycle->pool.pool_lock);
// use the header for new_msg which is now for this pool
header = new_header;
if (aos_queue_write_msg_free(queue->recycle,
(void *)msg, OVERRIDE) != 0) {
printf("queue: warning aos_queue_write_msg("
"%p(=queue(=%p)->recycle), %p, OVERRIDE)"
" failed\n",
queue->recycle, queue, msg);
}
msg = new_msg;
}
}
// where the one we're freeing was
int index = header->index;
header->index = -1;
if (index != pool->used) { // if we're not freeing the one on the end
// put the last one where the one we're freeing was
header = pool->pool[index] = pool->pool[pool->used];
// put the one we're freeing at the end
pool->pool[pool->used] = get_header(msg);
// update the former last one's index
header->index = index;
}
return 0;
}
// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
if (msg == NULL) {
return 0;
}
int rv = 0;
if (mutex_lock(&pool->pool_lock)) {
return -1;
}
aos_msg_header *const header = get_header(msg);
header->ref_count --;
assert(header->ref_count >= 0);
#if REF_DEBUG
printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
#endif
if (header->ref_count == 0) {
rv = aos_free_msg(pool, msg, queue);
}
mutex_unlock(&pool->pool_lock);
return rv;
}
static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
if (sig1->length != sig2->length) {
//LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
return 0;
}
if (sig1->queue_length != sig2->queue_length) {
//LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
return 0;
}
if (sig1->hash != sig2->hash) {
//LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
return 0;
}
//LOG(DEBUG, "signature match\n");
return 1;
}
static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
aos_queue *const queue = aos_core_alloc_queue();
aos_msg_pool *const pool = &queue->pool;
pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
pool->length = 0;
pool->used = 0;
pool->msg_length = sig->length + sizeof(aos_msg_header);
pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
aos_ring_buf *const buf = &queue->buf;
buf->length = sig->queue_length + 1;
if (buf->length < 2) { // TODO(brians) when could this happen?
buf->length = 2;
}
buf->data = shm_malloc(buf->length * sizeof(void *));
buf->start = 0;
buf->end = 0;
buf->msgs = 0;
buf->writable = 1;
buf->readable = 0;
buf->buff_lock = 0;
pool->pool_lock = 0;
queue->recycle = NULL;
return queue;
}
aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
//LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
mutex_grab(&global_core->mem_struct->queues.alloc_lock);
aos_queue_list *list = global_core->mem_struct->queues.queue_list;
aos_queue_list *last = NULL;
while (list != NULL) {
// if we found a matching queue
if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
return list->queue;
} else {
//LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
}
last = list;
list = list->next;
}
list = shm_malloc(sizeof(aos_queue_list));
if (last == NULL) {
global_core->mem_struct->queues.queue_list = list;
} else {
last->next = list;
}
list->sig = *sig;
const size_t name_size = strlen(name) + 1;
list->name = shm_malloc(name_size);
memcpy(list->name, name, name_size);
//LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
list->queue = aos_create_queue(sig);
//LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
list->next = NULL;
mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
return list->queue;
}
aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
const aos_type_sig *recycle_sig, aos_queue **recycle) {
if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
*recycle = NULL;
return NULL;
}
aos_queue *const r = aos_fetch_queue(name, sig);
r->recycle = aos_fetch_queue(name, recycle_sig);
if (r == r->recycle) {
fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
printf("see stderr\n");
abort();
}
*recycle = r->recycle;
return r;
}
int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
#if WRITE_DEBUG
printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
#endif
int rv = 0;
if (msg == NULL || msg < (void *)global_core->mem_struct ||
msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
msg, queue);
printf("see stderr\n");
abort();
}
aos_ring_buf *const buf = &queue->buf;
if (mutex_lock(&buf->buff_lock)) {
#if WRITE_DEBUG
printf("queue: locking buff_lock of %p failed\n", buf);
#endif
return -1;
}
int new_end = (buf->end + 1) % buf->length;
while (new_end == buf->start) {
if (opts & NON_BLOCK) {
#if WRITE_DEBUG
printf("queue: not blocking on %p. returning -1\n", queue);
#endif
mutex_unlock(&buf->buff_lock);
return -1;
} else if (opts & OVERRIDE) {
#if WRITE_DEBUG
printf("queue: overriding on %p\n", queue);
#endif
// avoid leaking the message that we're going to overwrite
msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
buf->start = (buf->start + 1) % buf->length;
} else { // BLOCK
mutex_unlock(&buf->buff_lock);
#if WRITE_DEBUG
printf("queue: going to wait for writable(=%p) of %p\n",
&buf->writable, queue);
#endif
if (condition_wait(&buf->writable)) {
#if WRITE_DEBUG
printf("queue: waiting for writable(=%p) of %p failed\n",
&buf->writable, queue);
#endif
return -1;
}
#if WRITE_DEBUG
printf("queue: going to re-lock buff_lock of %p to write\n", queue);
#endif
if (mutex_lock(&buf->buff_lock)) {
#if WRITE_DEBUG
printf("queue: error locking buff_lock of %p\n", queue);
#endif
return -1;
}
}
new_end = (buf->end + 1) % buf->length;
}
buf->data[buf->end] = msg;
++buf->msgs;
buf->end = new_end;
mutex_unlock(&buf->buff_lock);
#if WRITE_DEBUG
printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
#endif
condition_set(&buf->readable);
if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
condition_unset(&buf->writable);
}
#if WRITE_DEBUG
printf("queue: write returning %d on queue %p\n", rv, queue);
#endif
return rv;
}
int aos_queue_free_msg(aos_queue *queue, const void *msg) {
// TODO(brians) get rid of this
void *msg_temp;
memcpy(&msg_temp, &msg, sizeof(msg_temp));
return msg_ref_dec(msg_temp, &queue->pool, queue);
}
// Deals with setting/unsetting readable and writable.
// Should be called after buff_lock has been unlocked.
// read is whether or not this read call read one off the queue
static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
if (read) {
condition_set(&buf->writable);
if (buf->start == buf->end) {
condition_unset(&buf->readable);
}
}
}
// Returns with buff_lock locked and a readable message in buf.
// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
aos_queue *const queue, int *index) {
#if !READ_DEBUG
(void)queue;
#endif
if (mutex_lock(&buf->buff_lock)) {
#if READ_DEBUG
printf("queue: couldn't lock buff_lock of %p\n", queue);
#endif
return -1;
}
while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
mutex_unlock(&buf->buff_lock);
if (opts & NON_BLOCK) {
#if READ_DEBUG
printf("queue: not going to block waiting on %p\n", queue);
#endif
return -1;
} else { // BLOCK
#if READ_DEBUG
printf("queue: going to wait for readable(=%p) of %p\n",
&buf->readable, queue);
#endif
// wait for a message to become readable
if ((index == NULL) ? condition_wait(&buf->readable) :
condition_wait_force(&buf->readable)) {
#if READ_DEBUG
printf("queue: waiting for readable(=%p) of %p failed\n",
&buf->readable, queue);
#endif
return -1;
}
}
#if READ_DEBUG
printf("queue: going to re-lock buff_lock of %p to read\n", queue);
#endif
if (mutex_lock(&buf->buff_lock)) {
#if READ_DEBUG
printf("couldn't re-lock buff_lock of %p\n", queue);
#endif
return -1;
}
}
#if READ_DEBUG
printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
#endif
return 0;
}
// handles reading with PEEK
static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
void *ret;
if (opts & FROM_END) {
int pos = buf->end - 1;
if (pos < 0) { // if it needs to wrap
pos = buf->length - 1;
}
#if READ_DEBUG
printf("queue: reading from line %d: %d\n", __LINE__, pos);
#endif
ret = buf->data[pos];
} else {
#if READ_DEBUG
printf("queue: reading from line %d: %d\n", __LINE__, start);
#endif
ret = buf->data[start];
}
aos_msg_header *const header = get_header(ret);
header->ref_count ++;
#if REF_DEBUG
printf("ref inc count: %p\n", ret);
#endif
return ret;
}
const void *aos_queue_read_msg(aos_queue *queue, int opts) {
#if READ_DEBUG
printf("queue: read_msg(%p, %d)\n", queue, opts);
#endif
void *msg = NULL;
aos_ring_buf *const buf = &queue->buf;
if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
#if READ_DEBUG
printf("queue: common returned -1 for %p\n", queue);
#endif
return NULL;
}
if (opts & PEEK) {
msg = read_msg_peek(buf, opts, buf->start);
} else {
if (opts & FROM_END) {
while (1) {
#if READ_DEBUG
printf("queue: start of c2 of %p\n", queue);
#endif
// This loop pulls each message out of the buffer.
const int pos = buf->start;
buf->start = (buf->start + 1) % buf->length;
// if this is the last one
if (buf->start == buf->end) {
#if READ_DEBUG
printf("queue: reading from c2: %d\n", pos);
#endif
msg = buf->data[pos];
break;
}
// it's not going to be in the queue any more
msg_ref_dec(buf->data[pos], &queue->pool, queue);
}
} else {
#if READ_DEBUG
printf("queue: reading from d2: %d\n", buf->start);
#endif
msg = buf->data[buf->start];
buf->start = (buf->start + 1) % buf->length;
}
}
mutex_unlock(&buf->buff_lock);
aos_read_msg_common_end(buf, !(opts & PEEK));
#if READ_DEBUG
printf("queue: read returning %p\n", msg);
#endif
return msg;
}
const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
#if READ_DEBUG
printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
#endif
void *msg = NULL;
aos_ring_buf *const buf = &queue->buf;
if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
#if READ_DEBUG
printf("queue: common returned -1\n");
#endif
return NULL;
}
// TODO(parker): Handle integer wrap on the index.
const int offset = buf->msgs - *index;
int my_start = buf->end - offset;
if (offset >= buf->length) { // if we're behind the available messages
// catch index up to the last available message
*index += buf->start - my_start;
// and that's the one we're going to read
my_start = buf->start;
}
if (my_start < 0) { // if we want to read off the end of the buffer
// unwrap where we're going to read from
my_start += buf->length;
}
if (opts & PEEK) {
msg = read_msg_peek(buf, opts, my_start);
} else {
if (opts & FROM_END) {
#if READ_DEBUG
printf("queue: start of c1 of %p\n", queue);
#endif
int pos = buf->end - 1;
if (pos < 0) { // if it wrapped
pos = buf->length - 1; // unwrap it
}
#if READ_DEBUG
printf("queue: reading from c1: %d\n", pos);
#endif
msg = buf->data[pos];
*index = buf->msgs;
} else {
#if READ_DEBUG
printf("queue: reading from d1: %d\n", my_start);
#endif
msg = buf->data[my_start];
++(*index);
}
aos_msg_header *const header = get_header(msg);
++header->ref_count;
#if REF_DEBUG
printf("ref_inc_count: %p\n", msg);
#endif
}
mutex_unlock(&buf->buff_lock);
// this function never consumes one off the queue
aos_read_msg_common_end(buf, 0);
return msg;
}
static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
if (mutex_lock(&pool->pool_lock)) {
return NULL;
}
void *msg;
if (pool->length - pool->used > 0) {
msg = pool->pool[pool->used];
} else {
if (pool->length >= pool->mem_length) {
//TODO(brians) log this if it isn't the log queue
fprintf(stderr, "queue: overused_pool\n");
msg = NULL;
goto exit;
}
msg = pool->pool[pool->length] = aos_alloc_msg(pool);
++pool->length;
}
aos_msg_header *const header = msg;
msg = (uint8_t *)msg + sizeof(aos_msg_header);
header->ref_count = 1;
#if REF_DEBUG
printf("ref alloc: %p\n", msg);
#endif
header->index = pool->used;
++pool->used;
exit:
mutex_unlock(&pool->pool_lock);
return msg;
}
void *aos_queue_get_msg(aos_queue *queue) {
return aos_pool_get_msg(&queue->pool);
}