copied everything over from 2012 and removed all of the actual robot code except the drivetrain stuff


git-svn-id: https://robotics.mvla.net/svn/frc971/2013/trunk/src@4078 f308d9b7-e957-4cde-b6ac-9a88185e7312
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
new file mode 100644
index 0000000..5cfd2ac
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.c
@@ -0,0 +1,510 @@
+#include "aos/atom_code/ipc_lib/queue.h"
+#include "aos/atom_code/ipc_lib/queue_internal.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#define READ_DEBUG 0
+#define WRITE_DEBUG 0
+#define REF_DEBUG 0
+
+static inline aos_msg_header *get_header(void *msg) { // msg is the data that directly follows a header
+	return (aos_msg_header *)msg - 1;
+}
+static inline aos_queue *aos_core_alloc_queue(void) { // (void) makes this a real prototype, so stray arguments get rejected
+	return shm_malloc_aligned(sizeof(aos_queue), sizeof(int)); // room for 1 aos_queue, aligned to sizeof(int)
+}
+static inline void *aos_alloc_msg(aos_msg_pool *pool) { // allocates 1 block of pool->msg_length bytes
+	return shm_malloc(pool->msg_length);
+}
+
+// Actually frees msg (whose ref_count just hit 0) back into pool; the caller must hold pool->pool_lock.
+// If queue has a recycle queue, msg first trades places with a fresh message from the recycle
+// queue's pool and then gets written to the recycle queue instead of simply becoming unused.
+// Returns 0 on success and -1 if the recycle queue's pool_lock couldn't be locked.
+static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
+	aos_msg_header *header = get_header(msg);
+	if (pool->pool[header->index] != header) { // if something's messed up
+		fprintf(stderr, "queue: something is very very wrong with queue %p."
+				" pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
+				(void *)queue, (void *)pool->pool, header->index, (void *)header);
+		printf("queue: see stderr\n");
+		abort();
+	}
+#if REF_DEBUG
+	printf("ref_free_count: %p\n", msg);
+#endif
+	--pool->used;
+
+	if (queue->recycle != NULL) {
+		void *const new_msg = aos_queue_get_msg(queue->recycle);
+		if (new_msg == NULL) {
+			fprintf(stderr, "queue: couldn't get a message"
+					" for recycle queue %p\n", (void *)queue->recycle);
+		} else {
+			// Take a message from recycle_queue and switch its
+			// header with the one being freed, which effectively
+			// switches which queue each message belongs to.
+			aos_msg_header *const new_header = get_header(new_msg);
+			// lock the other pool first so that failing to lock it leaves this pool untouched
+			if (mutex_lock(&queue->recycle->pool.pool_lock)) {
+				++pool->used; // undo the decrement: nothing got freed (new_msg stays allocated)
+				return -1;
+			}
+			// also switch the messages between the pools
+			pool->pool[header->index] = new_header;
+			queue->recycle->pool.pool[new_header->index] = header;
+			// swap the information in both headers
+			header_swap(header, new_header);
+			// don't unlock the other pool until all of its messages are valid
+			mutex_unlock(&queue->recycle->pool.pool_lock);
+			// use the header for new_msg which is now for this pool
+			header = new_header;
+			if (aos_queue_write_msg_free(queue->recycle,
+						(void *)msg, OVERRIDE) != 0) {
+				printf("queue: warning aos_queue_write_msg("
+						"%p(=queue(=%p)->recycle), %p, OVERRIDE)"
+						" failed\n",
+						(void *)queue->recycle, (void *)queue, msg);
+			}
+			msg = new_msg;
+		}
+	}
+
+	// where the one we're freeing was
+	int index = header->index;
+	header->index = -1;
+	if (index != pool->used) { // if we're not freeing the one on the end
+		// put the last one where the one we're freeing was
+		header = pool->pool[index] = pool->pool[pool->used];
+		// put the one we're freeing at the end
+		pool->pool[pool->used] = get_header(msg);
+		// update the former last one's index
+		header->index = index;
+	}
+	return 0;
+}
+// Drops one reference to msg, freeing it once nothing references it any more.
+// Returns 0 on success (or if msg is NULL) and -1 if locking or freeing failed.
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
+static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
+	if (msg == NULL) {
+		return 0;
+	}
+
+	if (mutex_lock(&pool->pool_lock)) {
+		return -1;
+	}
+	aos_msg_header *const hdr = get_header(msg);
+	--hdr->ref_count;
+	assert(hdr->ref_count >= 0);
+#if REF_DEBUG
+	printf("ref_dec_count: %p count=%d\n", msg, hdr->ref_count);
+#endif
+	// the last reference is gone, so give the message back to its pool
+	const int result = (hdr->ref_count == 0) ? aos_free_msg(pool, msg, queue) : 0;
+	mutex_unlock(&pool->pool_lock);
+	return result;
+}
+
+// Checks whether two type signatures are the same.
+// Returns 1 if length, queue_length, and hash are all equal and 0 otherwise.
+static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
+	// each flag is 1 when that field agrees between the two signatures
+	const int same_length = (sig1->length == sig2->length);
+	const int same_queue_length =
+			(sig1->queue_length == sig2->queue_length);
+	const int same_hash = (sig1->hash == sig2->hash);
+
+	if (same_length && same_queue_length && same_hash) {
+		// nothing differs
+		return 1;
+	}
+	// at least one field differs
+	return 0;
+}
+// Allocates a queue for the messages described by sig and sets up its (empty) message pool and ring buffer.
+static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
+	aos_queue *const queue = aos_core_alloc_queue();
+	queue->recycle = NULL;
+	// the pool starts out with no messages; each one has room for a header plus sig->length bytes
+	aos_msg_pool *const pool = &queue->pool;
+	pool->pool_lock = 0;
+	pool->used = pool->length = 0;
+	pool->msg_length = sig->length + sizeof(aos_msg_header);
+	pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
+	pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
+	// the ring buffer gets 1 more slot than queue_length and never fewer than 2
+	aos_ring_buf *const buf = &queue->buf;
+	buf->buff_lock = 0;
+	buf->start = buf->end = buf->msgs = 0;
+	buf->writable = 1;
+	buf->readable = 0;
+	buf->length = sig->queue_length + 1;
+	if (buf->length < 2) { // TODO(brians) when could this happen?
+		buf->length = 2;
+	}
+	buf->data = shm_malloc(buf->length * sizeof(void *));
+	return queue;
+}
+// Looks up the queue called name whose signature matches sig, creating
+// it (and adding it to the end of the global list) if there isn't one yet.
+// The whole lookup-or-create happens while holding queues.alloc_lock.
+aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
+	mutex_grab(&global_core->mem_struct->queues.alloc_lock);
+	aos_queue_list *tail = NULL;
+	aos_queue_list *node;
+	for (node = global_core->mem_struct->queues.queue_list; node != NULL;
+			node = node->next) {
+		// both the name and the signature have to match
+		if (strcmp(node->name, name) == 0 && sigcmp(&node->sig, sig)) {
+			mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+			return node->queue;
+		}
+		tail = node;
+	}
+
+	// nothing matched, so add a new entry after the last one
+	node = shm_malloc(sizeof(aos_queue_list));
+	if (tail != NULL) {
+		tail->next = node;
+	} else {
+		global_core->mem_struct->queues.queue_list = node;
+	}
+	node->sig = *sig;
+	const size_t name_size = strlen(name) + 1; // includes the terminator
+	node->name = shm_malloc(name_size);
+	memcpy(node->name, name, name_size);
+	node->queue = aos_create_queue(sig);
+	node->next = NULL;
+	mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+	return node->queue;
+}
+aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
+		const aos_type_sig *recycle_sig, aos_queue **recycle) { // returns the main queue and stores its recycle queue in *recycle
+	if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) { // the lengths must match and the hashes must differ
+		*recycle = NULL;
+		return NULL;
+	}
+	aos_queue *const r = aos_fetch_queue(name, sig);
+	r->recycle = aos_fetch_queue(name, recycle_sig); // same name, so only the signature tells the 2 queues apart
+	if (r == r->recycle) {
+		fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", (void *)r->recycle, (void *)r); // %p is only defined for void *
+		printf("see stderr\n");
+		abort();
+	}
+	*recycle = r->recycle;
+	return r;
+}
+
+int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) { // puts msg at the end of queue; returns 0 on success and -1 on failure
+#if WRITE_DEBUG
+  printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
+#endif
+  int rv = 0; // nothing below changes this, so a write that completes always returns 0
+  if (msg == NULL || msg < (void *)global_core->mem_struct ||
+      msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) { // NULL or outside of [mem_struct, mem_struct + size]
+    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+            msg, queue);
+    printf("see stderr\n");
+    abort();
+  }
+  aos_ring_buf *const buf = &queue->buf;
+  if (mutex_lock(&buf->buff_lock)) {
+#if WRITE_DEBUG
+    printf("queue: locking buff_lock of %p failed\n", buf);
+#endif
+    return -1;
+  }
+  int new_end = (buf->end + 1) % buf->length; // where end will be after this write
+  while (new_end == buf->start) { // start == end means empty, so end can't advance onto start: the buffer is full
+    if (opts & NON_BLOCK) { // give up right away instead of waiting for space
+#if WRITE_DEBUG
+      printf("queue: not blocking on %p. returning -1\n", queue);
+#endif
+      mutex_unlock(&buf->buff_lock);
+      return -1;
+    } else if (opts & OVERRIDE) { // make space by dropping the message at start
+#if WRITE_DEBUG
+      printf("queue: overriding on %p\n", queue);
+#endif
+      // avoid leaking the message that we're going to overwrite
+      msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
+      buf->start = (buf->start + 1) % buf->length;
+    } else { // BLOCK
+      mutex_unlock(&buf->buff_lock); // don't hold the lock while waiting for writable
+#if WRITE_DEBUG
+      printf("queue: going to wait for writable(=%p) of %p\n",
+          &buf->writable, queue);
+#endif
+      if (condition_wait(&buf->writable)) {
+#if WRITE_DEBUG
+        printf("queue: waiting for writable(=%p) of %p failed\n",
+            &buf->writable, queue);
+#endif
+        return -1; // buff_lock is already unlocked here
+      }
+#if WRITE_DEBUG
+      printf("queue: going to re-lock buff_lock of %p to write\n", queue);
+#endif
+      if (mutex_lock(&buf->buff_lock)) {
+#if WRITE_DEBUG
+        printf("queue: error locking buff_lock of %p\n", queue);
+#endif
+        return -1;
+      }
+    }
+    new_end = (buf->end + 1) % buf->length; // recompute because end could have moved while the lock was released
+  }
+  buf->data[buf->end] = msg;
+  ++buf->msgs; // running count of the writes done by this function (never decremented in this file)
+  buf->end = new_end;
+  mutex_unlock(&buf->buff_lock);
+#if WRITE_DEBUG
+  printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
+#endif
+  condition_set(&buf->readable); // NOTE(review): from here on start and end get read without holding buff_lock
+  if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
+    condition_unset(&buf->writable);
+  }
+#if WRITE_DEBUG
+  printf("queue: write returning %d on queue %p\n", rv, queue);
+#endif
+  return rv;
+}
+
+int aos_queue_free_msg(aos_queue *queue, const void *msg) { // gives up 1 reference to msg; returns what msg_ref_dec returns
+	// TODO(brians) get rid of this
+	void *writable_msg; // the same pointer as msg, copied so the const goes away without a cast
+	memcpy(&writable_msg, &msg, sizeof(writable_msg));
+	return msg_ref_dec(writable_msg, &queue->pool, queue);
+}
+// Updates readable/writable after a read call; call it once buff_lock has been unlocked.
+// read is nonzero if the call actually took a message off of the queue.
+static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
+	if (!read) {
+		return; // nothing was removed, so neither condition changes
+	}
+	condition_set(&buf->writable); // a message was just removed, so there's space to write
+	if (buf->start == buf->end) { // the buffer is empty now
+		condition_unset(&buf->readable);
+	}
+}
+// Locks buff_lock and then waits until buf has a message to read (and, if index != NULL, until buf->msgs > *index).
+// Returns 0 with buff_lock locked, or -1 with buff_lock unlocked (locking/waiting failed, or NON_BLOCK and nothing to read).
+static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
+		aos_queue *const queue, int *index) {
+#if !READ_DEBUG
+	(void)queue; // queue is only used by the debugging output
+#endif
+	if (mutex_lock(&buf->buff_lock)) {
+#if READ_DEBUG
+		printf("queue: couldn't lock buff_lock of %p\n", queue);
+#endif
+		return -1;
+	}
+	while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) { // empty, or nothing written since *index
+		mutex_unlock(&buf->buff_lock); // both the NON_BLOCK return and the wait happen without the lock
+		if (opts & NON_BLOCK) {
+#if READ_DEBUG
+			printf("queue: not going to block waiting on %p\n", queue);
+#endif
+			return -1;
+		} else { // BLOCK
+#if READ_DEBUG
+			printf("queue: going to wait for readable(=%p) of %p\n",
+					&buf->readable, queue);
+#endif
+			// wait for a message to become readable
+			if ((index == NULL) ? condition_wait(&buf->readable) :
+					condition_wait_force(&buf->readable)) { // NOTE(review): how the _force variant differs isn't visible in this file
+#if READ_DEBUG
+				printf("queue: waiting for readable(=%p) of %p failed\n",
+						&buf->readable, queue);
+#endif
+				return -1;
+			}
+		}
+#if READ_DEBUG
+		printf("queue: going to re-lock buff_lock of %p to read\n", queue);
+#endif
+		if (mutex_lock(&buf->buff_lock)) { // the loop condition gets checked again with the lock held
+#if READ_DEBUG
+			printf("couldn't re-lock buff_lock of %p\n", queue);
+#endif
+			return -1;
+		}
+	}
+#if READ_DEBUG
+	printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
+#endif
+	return 0;
+}
+// Handles reading with PEEK: picks a message without removing it from buf and increments its ref_count.
+static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
+	void *ret;
+	if (opts & FROM_END) { // use the slot right before end (the one written most recently)
+		int pos = buf->end - 1;
+		if (pos < 0) { // if it needs to wrap
+			pos = buf->length - 1;
+		}
+#if READ_DEBUG
+		printf("queue: reading from line %d: %d\n", __LINE__, pos);
+#endif
+		ret = buf->data[pos];
+	} else { // use the position that the caller passed in
+#if READ_DEBUG
+		printf("queue: reading from line %d: %d\n", __LINE__, start);
+#endif
+		ret = buf->data[start];
+	}
+	aos_msg_header *const header = get_header(ret);
+	header->ref_count ++; // NOTE(review): pool_lock isn't taken here, unlike in msg_ref_dec
+#if REF_DEBUG
+	printf("ref inc count: %p\n", ret);
+#endif
+	return ret;
+}
+const void *aos_queue_read_msg(aos_queue *queue, int opts) { // returns the message that was read, or NULL if aos_queue_read_msg_common failed
+#if READ_DEBUG
+	printf("queue: read_msg(%p, %d)\n", queue, opts);
+#endif
+	void *msg = NULL;
+	aos_ring_buf *const buf = &queue->buf;
+	if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) { // on success this returns with buff_lock locked
+#if READ_DEBUG
+		printf("queue: common returned -1 for %p\n", queue);
+#endif
+		return NULL;
+	}
+	if (opts & PEEK) { // leave the message in the queue
+		msg = read_msg_peek(buf, opts, buf->start);
+	} else {
+		if (opts & FROM_END) { // take the newest message and drop all of the older ones
+			while (1) {
+#if READ_DEBUG
+				printf("queue: start of c2 of %p\n", queue);
+#endif
+				// This loop pulls each message out of the buffer.
+				const int pos = buf->start;
+				buf->start = (buf->start + 1) % buf->length;
+				// if this is the last one
+				if (buf->start == buf->end) {
+#if READ_DEBUG
+					printf("queue: reading from c2: %d\n", pos);
+#endif
+					msg = buf->data[pos];
+					break;
+				}
+				// it's not going to be in the queue any more
+				msg_ref_dec(buf->data[pos], &queue->pool, queue);
+			}
+		} else { // take the oldest message
+#if READ_DEBUG
+			printf("queue: reading from d2: %d\n", buf->start);
+#endif
+			msg = buf->data[buf->start]; // ref_count isn't touched on this path
+			buf->start = (buf->start + 1) % buf->length;
+		}
+	}
+	mutex_unlock(&buf->buff_lock);
+	aos_read_msg_common_end(buf, !(opts & PEEK)); // a PEEK doesn't take anything off of the queue
+#if READ_DEBUG
+	printf("queue: read returning %p\n", msg);
+#endif
+	return msg;
+}
+const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) { // reads relative to *index without removing anything; NULL on failure
+#if READ_DEBUG
+	printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
+#endif
+	void *msg = NULL;
+	aos_ring_buf *const buf = &queue->buf;
+	if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) { // on success buff_lock is locked and buf->msgs > *index
+#if READ_DEBUG
+		printf("queue: common returned -1\n");
+#endif
+		return NULL;
+	}
+        // TODO(parker): Handle integer wrap on the index.
+	const int offset = buf->msgs - *index; // how many messages have been written since *index
+	int my_start = buf->end - offset; // that many slots back from end (can be negative until it's unwrapped below)
+	if (offset >= buf->length) { // if we're behind the available messages
+		// catch index up to the last available message
+		*index += buf->start - my_start;
+		// and that's the one we're going to read
+		my_start = buf->start;
+	}
+	if (my_start < 0) { // if we want to read off the end of the buffer
+		// unwrap where we're going to read from
+		my_start += buf->length;
+	}
+	if (opts & PEEK) { // *index is left alone on this path
+		msg = read_msg_peek(buf, opts, my_start);
+	} else {
+		if (opts & FROM_END) {
+#if READ_DEBUG
+			printf("queue: start of c1 of %p\n", queue);
+#endif
+			int pos = buf->end - 1;
+			if (pos < 0) { // if it wrapped
+				pos = buf->length - 1; // unwrap it
+			}
+#if READ_DEBUG
+			printf("queue: reading from c1: %d\n", pos);
+#endif
+			msg = buf->data[pos];
+			*index = buf->msgs; // everything written so far now counts as read
+		} else {
+#if READ_DEBUG
+			printf("queue: reading from d1: %d\n", my_start);
+#endif
+			msg = buf->data[my_start];
+			++(*index); // the next call starts at the following message
+		}
+		aos_msg_header *const header = get_header(msg);
+		++header->ref_count; // the message stays in the queue, so count the caller's copy as another reference
+#if REF_DEBUG
+		printf("ref_inc_count: %p\n", msg);
+#endif
+	}
+	mutex_unlock(&buf->buff_lock);
+	// this function never consumes one off the queue
+	aos_read_msg_common_end(buf, 0);
+	return msg;
+}
+// Takes an unused message out of pool, allocating a new one if all of the existing ones are in use.
+// Returns the message data with ref_count 1, or NULL if locking failed or the pool is at mem_length.
+static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
+	if (mutex_lock(&pool->pool_lock)) {
+		return NULL;
+	}
+	aos_msg_header *header;
+	if (pool->length - pool->used > 0) {
+		header = pool->pool[pool->used]; // reuse one that was allocated earlier
+	} else if (pool->length >= pool->mem_length) {
+		//TODO(brians) log this if it isn't the log queue
+		fprintf(stderr, "queue: overused_pool\n");
+		mutex_unlock(&pool->pool_lock);
+		return NULL;
+	} else {
+		header = pool->pool[pool->length] = aos_alloc_msg(pool);
+		++pool->length;
+	}
+	// callers get a pointer to the data, which starts right after the header
+	void *const data = (uint8_t *)header + sizeof(aos_msg_header);
+	header->ref_count = 1;
+#if REF_DEBUG
+	printf("ref alloc: %p\n", data);
+#endif
+	header->index = pool->used;
+	++pool->used;
+	mutex_unlock(&pool->pool_lock);
+	return data;
+}
+void *aos_queue_get_msg(aos_queue *queue) { // gets a message from queue's pool (NULL if aos_pool_get_msg can't supply one)
+	return aos_pool_get_msg(&queue->pool);
+}
+