copying over queue-api-redo branch from my 2012 repo

This commit is not cleanly separated from the next one where I got
everything to compile again (and possibly even further on from there).
diff --git a/aos/atom_code/camera/Buffers.cpp b/aos/atom_code/camera/Buffers.cpp
index f580f89..bf16915 100644
--- a/aos/atom_code/camera/Buffers.cpp
+++ b/aos/atom_code/camera/Buffers.cpp
@@ -15,7 +15,6 @@
 };
 const std::string Buffers::kFDServerName("/tmp/aos_fd_server");
 const std::string Buffers::kQueueName("CameraBufferQueue");
-const aos_type_sig Buffers::kSignature{sizeof(Message), 971, 1};
 
 int Buffers::CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t)) {
   union af_unix_sockaddr {
@@ -66,7 +65,7 @@
 
 void Buffers::Release() {
   if (message_ != NULL) {
-    aos_queue_free_msg(queue_, message_);
+    queue_->FreeMessage(message_);
     message_ = NULL;
   }
 }
@@ -77,11 +76,12 @@
   // TODO(brians) make sure the camera reader process hasn't died
   do {
     if (block) {
-      message_ = static_cast<const Message *>(aos_queue_read_msg(queue_, PEEK | BLOCK));
+      message_ = static_cast<const Message *>(queue_->ReadMessage(
+              Queue::kPeek | Queue::kBlock));
     } else {
       static int index = 0;
-      message_ = static_cast<const Message *>(aos_queue_read_msg_index(queue_, BLOCK,
-                                                                       &index));
+      message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
+              Queue::kBlock, &index));
     }
   } while (block && message_ == NULL);
   if (message_ != NULL) {
@@ -137,7 +137,7 @@
 }
 Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
   MMap();
-  queue_ = aos_fetch_queue(kQueueName.c_str(), &kSignature);
+  queue_ = Queue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
 }
 
 Buffers::~Buffers() {
@@ -157,6 +157,5 @@
   }
 }
 
-} // namespace camera
-} // namespace aos
-
+}  // namespace camera
+}  // namespace aos
diff --git a/aos/atom_code/camera/Buffers.h b/aos/atom_code/camera/Buffers.h
index 6b54188..07177bc 100644
--- a/aos/atom_code/camera/Buffers.h
+++ b/aos/atom_code/camera/Buffers.h
@@ -53,9 +53,8 @@
   // The current one. Sometimes NULL.
   const Message *message_;
   static const std::string kQueueName;
-  static const aos_type_sig kSignature;
   // NULL for the Reader one.
-  aos_queue *queue_;
+  Queue *queue_;
   // Make the actual mmap calls.
   // Called by Buffers() automatically.
   void MMap();
diff --git a/aos/atom_code/camera/Reader.cpp b/aos/atom_code/camera/Reader.cpp
index 5f30cfe..c87d173 100644
--- a/aos/atom_code/camera/Reader.cpp
+++ b/aos/atom_code/camera/Reader.cpp
@@ -31,8 +31,7 @@
   // the bound socket listening for fd requests
   int server_fd_;
 
-  static const aos_type_sig kRecycleSignature;
-  aos_queue *queue_, *recycle_queue_;
+  Queue *queue_, *recycle_queue_;
   // the number of buffers currently queued in v4l2
   uint32_t queued_;
  public:
@@ -52,10 +51,11 @@
               dev_name, errno, strerror(errno));
     }
 
-    queue_ = aos_fetch_queue_recycle(Buffers::kQueueName.c_str(), &Buffers::kSignature,
-                                     &kRecycleSignature, &recycle_queue_);
+    queue_ = Queue::Fetch(Buffers::kQueueName.c_str(),
+                          sizeof(Buffers::Message), 971, 1,
+                          1, Buffers::kNumBuffers, &recycle_queue_);
     // read off any existing recycled messages
-    while (aos_queue_read_msg(recycle_queue_, NON_BLOCK) != NULL);
+    while (recycle_queue_->ReadMessage(Queue::kNonBlock) != NULL);
     queued_ = 0;
 
     InitServer();
@@ -140,10 +140,11 @@
       read = static_cast<const Buffers::Message *>(
           // we block waiting for one if we can't dequeue one without leaving
           // the driver <= 2 (to be safe)
-          aos_queue_read_msg(recycle_queue_, (queued_ <= 2) ? BLOCK : NON_BLOCK));
+          recycle_queue_->ReadMessage((queued_ <= 2) ?
+                                      Queue::kBlock : Queue::kNonBlock));
       if (read != NULL) {
         buf.index = read->index;
-        aos_queue_free_msg(recycle_queue_, read);
+        recycle_queue_->FreeMessage(read);
         QueueBuffer(&buf);
       }
     } while (read != NULL);
@@ -163,7 +164,7 @@
     }
 
     Buffers::Message *const msg = static_cast<Buffers::Message *>(
-        aos_queue_get_msg(queue_));
+        queue_->GetMessage());
     if (msg == NULL) {
       LOG(WARNING,
           "couldn't get a message to send buf #%" PRIu32 " from queue %p."
@@ -175,7 +176,7 @@
     msg->bytesused = buf.bytesused;
     memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
     msg->sequence = buf.sequence;
-    if (aos_queue_write_msg_free(queue_, msg, OVERRIDE) == -1) {
+    if (!queue->WriteMessage(msg, Queue::kOverride)) {
       LOG(WARNING,
           "sending message %p with buf #%" PRIu32 " to queue %p failed."
           " re-queueing now\n", msg, buf.index, queue_);
@@ -405,8 +406,6 @@
   }
 };
 const char *const Reader::dev_name = "/dev/video0";
-const aos_type_sig Reader::kRecycleSignature{
-  sizeof(Buffers::Message), 1, Buffers::kNumBuffers};
 
 } // namespace camera
 } // namespace aos
diff --git a/aos/atom_code/core/LogStreamer.cpp b/aos/atom_code/core/LogStreamer.cpp
index d3d4928..1294128 100644
--- a/aos/atom_code/core/LogStreamer.cpp
+++ b/aos/atom_code/core/LogStreamer.cpp
@@ -31,7 +31,7 @@
 
   int index = 0;
   while (true) {
-    const LogMessage *const msg = ReadNext(BLOCK, &index);
+    const LogMessage *const msg = ReadNext(Queue::kBlock, &index);
     if (msg == NULL) continue;
 
     internal::PrintMessage(stdout, *msg);
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
index 53587d8..26581a1 100644
--- a/aos/atom_code/ipc_lib/core_lib.c
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -6,7 +6,6 @@
 
 void init_shared_mem_core(aos_shm_core *shm_core) {
 	clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
-	shm_core->queues.alloc_flag = 0;
 	shm_core->msg_alloc_lock = 0;
 	shm_core->queues.queue_list = NULL;
 	shm_core->queues.alloc_lock = 0;
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
index f72ae4c..233660b 100644
--- a/aos/atom_code/ipc_lib/core_lib.h
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -18,12 +18,10 @@
 extern "C" {
 #endif  // __cplusplus
 
-struct aos_queue_list_t;
-typedef struct aos_queue_hash_t {
-	int alloc_flag;
+typedef struct aos_queue_global_t {
 	mutex alloc_lock;
-	struct aos_queue_list_t *queue_list;
-} aos_queue_hash;
+	void *queue_list;  // an aos::Queue* declared in C code
+} aos_queue_global;
 
 typedef struct aos_shm_core_t {
   // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
@@ -34,7 +32,7 @@
   mutex creation_condition;
   mutex msg_alloc_lock;
   void *msg_alloc;
-  aos_queue_hash queues;
+  aos_queue_global queues;
 } aos_shm_core;
 
 void init_shared_mem_core(aos_shm_core *shm_core);
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
deleted file mode 100644
index 6c1ce71..0000000
--- a/aos/atom_code/ipc_lib/queue.c
+++ /dev/null
@@ -1,478 +0,0 @@
-#include "aos/atom_code/ipc_lib/queue.h"
-#include "aos/atom_code/ipc_lib/queue_internal.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#include "aos/common/logging/logging.h"
-
-#define READ_DEBUG 0
-#define WRITE_DEBUG 0
-#define REF_DEBUG 0
-
-static inline aos_msg_header *get_header(void *msg) {
-	return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
-}
-static inline aos_queue *aos_core_alloc_queue() {
-	return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
-}
-static inline void *aos_alloc_msg(aos_msg_pool *pool) {
-	return shm_malloc(pool->msg_length);
-}
-
-// actually free the given message
-static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
-#if REF_DEBUG
-	if (pool->pool_lock == 0) {
-		//LOG(WARNING, "unprotected\n");
-	}
-#endif
-	aos_msg_header *header = get_header(msg);
-	if (pool->pool[header->index] != header) { // if something's messed up
-		fprintf(stderr, "queue: something is very very wrong with queue %p."
-				" pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
-				queue, pool->pool, header->index, header);
-		printf("queue: see stderr\n");
-		abort();
-	}
-#if REF_DEBUG
-	printf("ref free: %p\n", msg);
-#endif
-	--pool->used;
-
-	if (queue->recycle != NULL) {
-		void *const new_msg = aos_queue_get_msg(queue->recycle);
-		if (new_msg == NULL) {
-			fprintf(stderr, "queue: couldn't get a message"
-					" for recycle queue %p\n", queue->recycle);
-		} else {
-			// Take a message from recycle_queue and switch its
-			// header with the one being freed, which effectively
-			// switches which queue each message belongs to.
-			aos_msg_header *const new_header = get_header(new_msg);
-			// also switch the messages between the pools
-			pool->pool[header->index] = new_header;
-			if (mutex_lock(&queue->recycle->pool.pool_lock)) {
-				return -1;
-			}
-			queue->recycle->pool.pool[new_header->index] = header;
-			// swap the information in both headers
-			header_swap(header, new_header);
-			// don't unlock the other pool until all of its messages are valid
-			mutex_unlock(&queue->recycle->pool.pool_lock);
-			// use the header for new_msg which is now for this pool
-			header = new_header;
-			if (aos_queue_write_msg_free(queue->recycle,
-						(void *)msg, OVERRIDE) != 0) {
-				printf("queue: warning aos_queue_write_msg("
-						"%p(=queue(=%p)->recycle), %p, OVERRIDE)"
-						" failed\n",
-						queue->recycle, queue, msg);
-			}
-			msg = new_msg;
-		}
-	}
-
-	// where the one we're freeing was
-	int index = header->index;
-	header->index = -1;
-	if (index != pool->used) { // if we're not freeing the one on the end
-		// put the last one where the one we're freeing was
-		header = pool->pool[index] = pool->pool[pool->used];
-		// put the one we're freeing at the end
-		pool->pool[pool->used] = get_header(msg);
-		// update the former last one's index
-		header->index = index;
-	}
-	return 0;
-}
-// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
-static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
-	if (msg == NULL) {
-		return 0;
-	}
-
-	int rv = 0;
-	if (mutex_lock(&pool->pool_lock)) {
-		return -1;
-	}
-	aos_msg_header *const header = get_header(msg);
-	header->ref_count --;
-	assert(header->ref_count >= 0);
-#if REF_DEBUG
-	printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
-#endif
-	if (header->ref_count == 0) {
-		rv = aos_free_msg(pool, msg, queue);
-	}
-	mutex_unlock(&pool->pool_lock);
-	return rv;
-}
-
-static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
-	if (sig1->length != sig2->length) {
-		//LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
-		return 0;
-	}
-	if (sig1->queue_length != sig2->queue_length) {
-		//LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
-		return 0;
-	}
-	if (sig1->hash != sig2->hash) {
-		//LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
-		return 0;
-	}
-	//LOG(DEBUG, "signature match\n");
-	return 1;
-}
-static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
-	aos_queue *const queue = aos_core_alloc_queue();
-	aos_msg_pool *const pool = &queue->pool;
-	pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
-	pool->length = 0;
-	pool->used = 0;
-	pool->msg_length = sig->length + sizeof(aos_msg_header);
-	pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
-	aos_ring_buf *const buf = &queue->buf;
-	buf->length = sig->queue_length + 1;
-	if (buf->length < 2) { // TODO(brians) when could this happen?
-		buf->length = 2;
-	}
-	buf->data = shm_malloc(buf->length * sizeof(void *));
-	buf->start = 0;
-	buf->end = 0;
-	buf->msgs = 0;
-	memset(&buf->writable, 0, sizeof(buf->writable));
-	memset(&buf->readable, 0, sizeof(buf->readable));
-	buf->buff_lock = 0;
-	pool->pool_lock = 0;
-	queue->recycle = NULL;
-	return queue;
-}
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
-	//LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
-	mutex_grab(&global_core->mem_struct->queues.alloc_lock);
-	aos_queue_list *list = global_core->mem_struct->queues.queue_list;
-	aos_queue_list *last = NULL;
-	while (list != NULL) {
-		// if we found a matching queue
-		if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
-			mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-			return list->queue;
-		} else {
-			//LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
-		}
-		last = list;
-		list = list->next;
-	}
-	list = shm_malloc(sizeof(aos_queue_list));
-	if (last == NULL) {
-		global_core->mem_struct->queues.queue_list = list;
-	} else {
-		last->next = list;
-	}
-	list->sig = *sig;
-	const size_t name_size = strlen(name) + 1;
-	list->name = shm_malloc(name_size);
-	memcpy(list->name, name, name_size);
-	//LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
-	list->queue = aos_create_queue(sig);
-	//LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
-	list->next = NULL;
-	mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
-	return list->queue;
-}
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
-		const aos_type_sig *recycle_sig, aos_queue **recycle) {
-	if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
-		*recycle = NULL;
-		return NULL;
-	}
-	aos_queue *const r = aos_fetch_queue(name, sig);
-	r->recycle = aos_fetch_queue(name, recycle_sig);
-	if (r == r->recycle) {
-		fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
-		printf("see stderr\n");
-		abort();
-	}
-	*recycle = r->recycle;
-	return r;
-}
-
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
-#if WRITE_DEBUG
-  printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
-#endif
-  int rv = 0;
-  if (msg == NULL || msg < (void *)global_core->mem_struct ||
-      msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
-    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
-            msg, queue);
-    printf("see stderr\n");
-    abort();
-  }
-  aos_ring_buf *const buf = &queue->buf;
-  if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
-    printf("queue: locking buff_lock of %p failed\n", buf);
-#endif
-    return -1;
-  }
-  int new_end = (buf->end + 1) % buf->length;
-  while (new_end == buf->start) {
-    if (opts & NON_BLOCK) {
-#if WRITE_DEBUG
-      printf("queue: not blocking on %p. returning -1\n", queue);
-#endif
-      mutex_unlock(&buf->buff_lock);
-      return -1;
-    } else if (opts & OVERRIDE) {
-#if WRITE_DEBUG
-      printf("queue: overriding on %p\n", queue);
-#endif
-      // avoid leaking the message that we're going to overwrite
-      msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
-      buf->start = (buf->start + 1) % buf->length;
-    } else { // BLOCK
-#if WRITE_DEBUG
-      printf("queue: going to wait for writable(=%p) of %p\n",
-          &buf->writable, queue);
-#endif
-       condition_wait(&buf->writable, &buf->buff_lock);
-#if WRITE_DEBUG
-       printf("queue: done waiting for writable(=%p) of %p\n",
-              &buf->writable, queue);
-#endif
-     }
-     new_end = (buf->end + 1) % buf->length;
-   }
-  buf->data[buf->end] = msg;
-  ++buf->msgs;
-  buf->end = new_end;
-  mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
-  printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
-#endif
-  condition_signal(&buf->readable);
-#if WRITE_DEBUG
-  printf("queue: write returning %d on queue %p\n", rv, queue);
-#endif
-  return rv;
-}
-
-int aos_queue_free_msg(aos_queue *queue, const void *msg) {
-	// TODO(brians) get rid of this
-	void *msg_temp;
-	memcpy(&msg_temp, &msg, sizeof(msg_temp));
-  	return msg_ref_dec(msg_temp, &queue->pool, queue);
-}
-// Deals with setting/unsetting readable and writable.
-// Should be called after buff_lock has been unlocked.
-// read is whether or not this read call read one off the queue
-static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
-	if (read) {
-		condition_signal(&buf->writable);
-	}
-}
-// Returns with buff_lock locked and a readable message in buf.
-// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
-static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
-		aos_queue *const queue, int *index) {
-#if !READ_DEBUG
-	(void)queue;
-#endif
-	if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
-		printf("queue: couldn't lock buff_lock of %p\n", queue);
-#endif
-		return -1;
-	}
-	while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
-		if (opts & NON_BLOCK) {
-			mutex_unlock(&buf->buff_lock);
-#if READ_DEBUG
-			printf("queue: not going to block waiting on %p\n", queue);
-#endif
-			return -1;
-		} else { // BLOCK
-#if READ_DEBUG
-			printf("queue: going to wait for readable(=%p) of %p\n",
-					&buf->readable, queue);
-#endif
-			// wait for a message to become readable
-			condition_wait(&buf->readable, &buf->buff_lock);
-#if READ_DEBUG
-			printf("queue: done waiting for readable(=%p) of %p\n",
-					&buf->readable, queue);
-#endif
-		}
-	}
-#if READ_DEBUG
-	printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
-#endif
-	return 0;
-}
-// handles reading with PEEK
-static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
-	void *ret;
-	if (opts & FROM_END) {
-		int pos = buf->end - 1;
-		if (pos < 0) { // if it needs to wrap
-			pos = buf->length - 1;
-		}
-#if READ_DEBUG
-		printf("queue: reading from line %d: %d\n", __LINE__, pos);
-#endif
-		ret = buf->data[pos];
-	} else {
-#if READ_DEBUG
-		printf("queue: reading from line %d: %d\n", __LINE__, start);
-#endif
-		ret = buf->data[start];
-	}
-	aos_msg_header *const header = get_header(ret);
-	header->ref_count ++;
-#if REF_DEBUG
-	printf("ref inc count: %p\n", ret);
-#endif
-	return ret;
-}
-const void *aos_queue_read_msg(aos_queue *queue, int opts) {
-#if READ_DEBUG
-	printf("queue: read_msg(%p, %d)\n", queue, opts);
-#endif
-	void *msg = NULL;
-	aos_ring_buf *const buf = &queue->buf;
-	if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
-#if READ_DEBUG
-		printf("queue: common returned -1 for %p\n", queue);
-#endif
-		return NULL;
-	}
-	if (opts & PEEK) {
-		msg = read_msg_peek(buf, opts, buf->start);
-	} else {
-		if (opts & FROM_END) {
-			while (1) {
-#if READ_DEBUG
-				printf("queue: start of c2 of %p\n", queue);
-#endif
-				// This loop pulls each message out of the buffer.
-				const int pos = buf->start;
-				buf->start = (buf->start + 1) % buf->length;
-				// if this is the last one
-				if (buf->start == buf->end) {
-#if READ_DEBUG
-					printf("queue: reading from c2: %d\n", pos);
-#endif
-					msg = buf->data[pos];
-					break;
-				}
-				// it's not going to be in the queue any more
-				msg_ref_dec(buf->data[pos], &queue->pool, queue);
-			}
-		} else {
-#if READ_DEBUG
-			printf("queue: reading from d2: %d\n", buf->start);
-#endif
-			msg = buf->data[buf->start];
-			buf->start = (buf->start + 1) % buf->length;
-		}
-	}
-	mutex_unlock(&buf->buff_lock);
-	aos_read_msg_common_end(buf, !(opts & PEEK));
-#if READ_DEBUG
-	printf("queue: read returning %p\n", msg);
-#endif
-	return msg;
-}
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
-#if READ_DEBUG
-	printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
-#endif
-	void *msg = NULL;
-	aos_ring_buf *const buf = &queue->buf;
-	if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
-#if READ_DEBUG
-		printf("queue: common returned -1\n");
-#endif
-		return NULL;
-	}
-        // TODO(parker): Handle integer wrap on the index.
-	const int offset = buf->msgs - *index;
-	int my_start = buf->end - offset;
-	if (offset >= buf->length) { // if we're behind the available messages
-		// catch index up to the last available message
-		*index += buf->start - my_start;
-		// and that's the one we're going to read
-		my_start = buf->start;
-	}
-	if (my_start < 0) { // if we want to read off the end of the buffer
-		// unwrap where we're going to read from
-		my_start += buf->length;
-	}
-	if (opts & PEEK) {
-		msg = read_msg_peek(buf, opts, my_start);
-	} else {
-		if (opts & FROM_END) {
-#if READ_DEBUG
-			printf("queue: start of c1 of %p\n", queue);
-#endif
-			int pos = buf->end - 1;
-			if (pos < 0) { // if it wrapped
-				pos = buf->length - 1; // unwrap it
-			}
-#if READ_DEBUG
-			printf("queue: reading from c1: %d\n", pos);
-#endif
-			msg = buf->data[pos];
-			*index = buf->msgs;
-		} else {
-#if READ_DEBUG
-			printf("queue: reading from d1: %d\n", my_start);
-#endif
-			msg = buf->data[my_start];
-			++(*index);
-		}
-		aos_msg_header *const header = get_header(msg);
-		++header->ref_count;
-#if REF_DEBUG
-		printf("ref_inc_count: %p\n", msg);
-#endif
-	}
-	mutex_unlock(&buf->buff_lock);
-	// this function never consumes one off the queue
-	aos_read_msg_common_end(buf, 0);
-	return msg;
-}
-static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
-	if (mutex_lock(&pool->pool_lock)) {
-		return NULL;
-	}
-	void *msg;
-	if (pool->length - pool->used > 0) {
-		msg = pool->pool[pool->used];
-	} else {
-		if (pool->length >= pool->mem_length) {
-			LOG(FATAL, "overused pool %p\n", pool);
-		}
-		msg = pool->pool[pool->length] = aos_alloc_msg(pool);
-		++pool->length;
-	}
-	aos_msg_header *const header = msg;
-	msg = (uint8_t *)msg + sizeof(aos_msg_header);
-	header->ref_count = 1;
-#if REF_DEBUG
-	printf("ref alloc: %p\n", msg);
-#endif
-	header->index = pool->used;
-	++pool->used;
-	mutex_unlock(&pool->pool_lock);
-	return msg;
-}
-void *aos_queue_get_msg(aos_queue *queue) {
-	return aos_pool_get_msg(&queue->pool);
-}
-
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..57e2a5e
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -0,0 +1,447 @@
+#include "aos/common/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+namespace {
+
+static_assert(shm_ok<Queue>::value, "Queue instances go into shared memory");
+
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them).
+const int kExtraMessages = 20;
+
+}  // namespace
+
+struct Queue::MessageHeader {
+  int ref_count;
+  int index;  // in pool_
+  static MessageHeader *Get(const void *msg) {
+    return reinterpret_cast<MessageHeader *>(
+        static_cast<uint8_t *>(const_cast<void *>(msg)) -
+        sizeof(MessageHeader));
+  }
+  void Swap(MessageHeader *other) {
+    MessageHeader temp;
+    memcpy(&temp, other, sizeof(temp));
+    memcpy(other, this, sizeof(*other));
+    memcpy(this, &temp, sizeof(*this));
+  }
+};
+static_assert(shm_ok<Queue::MessageHeader>::value, "the whole point"
+              " is to stick it in shared memory");
+
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+//   have to lock/unlock pool_lock_
+void Queue::DecrementMessageReferenceCount(const void *msg) {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *header = MessageHeader::Get(msg);
+  --header->ref_count;
+  assert(header->ref_count >= 0);
+  if (kRefDebug) {
+    printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+  }
+  if (header->ref_count == 0) {
+    DoFreeMessage(msg);
+  }
+}
+
+Queue::Queue(const char *name, size_t length, int hash, int queue_length) {
+  const size_t name_size = strlen(name) + 1;
+  char *temp = static_cast<char *>(shm_malloc(name_size));
+  memcpy(temp, name, name_size);
+  name_ = temp;
+  length_ = length;
+  hash_ = hash;
+  queue_length_ = queue_length;
+
+  next_ = NULL;
+  recycle_ = NULL;
+
+  if (kFetchDebug) {
+    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+           name, length, hash, queue_length);
+  }
+
+  data_length_ = queue_length + 1;
+  if (data_length_ < 2) {  // TODO(brians) when could this happen?
+    data_length_ = 2;
+  }
+  data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+  data_start_ = 0;
+  data_end_ = 0;
+  messages_ = 0;
+
+  mem_length_ = queue_length + kExtraMessages;
+  pool_length_ = 0;
+  messages_used_ = 0;
+  msg_length_ = length + sizeof(MessageHeader);
+  pool_ = static_cast<MessageHeader **>(
+      shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+  if (kFetchDebug) {
+    printf("made queue %s\n", name);
+  }
+}
+Queue *Queue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length) {
+  if (kFetchDebug) {
+    printf("fetching queue %s\n", name);
+  }
+  if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+    return NULL;
+  }
+  Queue *current = static_cast<Queue *>(
+      global_core->mem_struct->queues.queue_list);
+  Queue *last = NULL;
+  while (current != NULL) {
+    // if we found a matching queue
+    if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+        current->hash_ == hash && current->queue_length_ == queue_length) {
+      mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+      return current;
+    } else {
+      if (kFetchDebug) {
+        printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+               strcmp(current->name_, name), name);
+      }
+    }
+    current = current->next_;
+  }
+
+  void *temp = shm_malloc(sizeof(Queue));
+  current = new (temp) Queue(name, length, hash, queue_length);
+  if (last == NULL) {  // if we don't have one to tack the new one on to
+    global_core->mem_struct->queues.queue_list = current;
+  } else {
+    last->next_ = current;
+  }
+
+  mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+  return current;
+}
+Queue *Queue::Fetch(const char *name, size_t length, int hash,
+                    int queue_length,
+                    int recycle_hash, int recycle_length, Queue **recycle) {
+  Queue *r = Fetch(name, length, hash, queue_length);
+  r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+  if (r == r->recycle_) {
+    fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+    printf("see stderr\n");
+    abort();
+  }
+  *recycle = r->recycle_;
+  return r;
+}
+
+void Queue::DoFreeMessage(const void *msg) {
+  MessageHeader *header = MessageHeader::Get(msg);
+  if (pool_[header->index] != header) {  // if something's messed up
+    fprintf(stderr, "queue: something is very very wrong with queue %p."
+            " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+            this, pool_, header->index, header);
+    printf("queue: see stderr\n");
+    abort();
+  }
+  if (kRefDebug) {
+    printf("ref free: %p\n", msg);
+  }
+  --messages_used_;
+
+  if (recycle_ != NULL) {
+    void *const new_msg = recycle_->GetMessage();
+    if (new_msg == NULL) {
+      fprintf(stderr, "queue: couldn't get a message"
+              " for recycle queue %p\n", recycle_);
+    } else {
+      // Take a message from recycle_ and switch its
+      // header with the one being freed, which effectively
+      // switches which queue each message belongs to.
+      MessageHeader *const new_header = MessageHeader::Get(new_msg);
+      // also switch the messages between the pools
+      pool_[header->index] = new_header;
+      {
+        MutexLocker locker(&recycle_->pool_lock_);
+        recycle_->pool_[new_header->index] = header;
+        // swap the information in both headers
+        header->Swap(new_header);
+        // don't unlock the other pool until all of its messages are valid
+      }
+      // use the header for new_msg which is now for this pool
+      header = new_header;
+      if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+                " aborting\n", recycle_, msg);
+        printf("see stderr\n");
+        abort();
+      }
+      msg = new_msg;
+    }
+  }
+
+  // where the one we're freeing was
+  int index = header->index;
+  header->index = -1;
+  if (index != messages_used_) {  // if we're not freeing the one on the end
+    // put the last one where the one we're freeing was
+    header = pool_[index] = pool_[messages_used_];
+    // put the one we're freeing at the end
+    pool_[messages_used_] = MessageHeader::Get(msg);
+    // update the former last one's index
+    header->index = index;
+  }
+}
+
+bool Queue::WriteMessage(void *msg, int options) {
+  if (kWriteDebug) {
+    printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
+  }
+  if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
+      msg > static_cast<void *>((
+              reinterpret_cast<uintptr_t>(global_core->mem_struct) +
+              global_core->size))) {
+    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+            msg, this);
+    printf("see stderr\n");
+    abort();
+  }
+  {
+    MutexLocker locker(&data_lock_);
+    int new_end = (data_end_ + 1) % data_length_;
+    while (new_end == data_start_) {
+      if (options & kNonBlock) {
+        if (kWriteDebug) {
+          printf("queue: not blocking on %p. returning -1\n", this);
+        }
+        return false;
+      } else if (options & kOverride) {
+        if (kWriteDebug) {
+          printf("queue: overriding on %p\n", this);
+        }
+        // avoid leaking the message that we're going to overwrite
+        DecrementMessageReferenceCount(data_[data_start_]);
+        data_start_ = (data_start_ + 1) % data_length_;
+      } else {  // kBlock
+        if (kWriteDebug) {
+          printf("queue: going to wait for writable_ of %p\n", this);
+        }
+        writable_.Wait(&data_lock_);
+      }
+      new_end = (data_end_ + 1) % data_length_;
+    }
+    data_[data_end_] = msg;
+    ++messages_;
+    data_end_ = new_end;
+  }
+  if (kWriteDebug) {
+    printf("queue: setting readable of %p\n", this);
+  }
+  readable_.Signal();
+  if (kWriteDebug) {
+    printf("queue: write returning true on queue %p\n", this);
+  }
+  return true;
+}
+
+void Queue::ReadCommonEnd(bool read) {
+  if (read) {
+    writable_.Signal();
+  }
+}
+bool Queue::ReadCommonStart(int options, int *index) {
+  while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+    if (options & kNonBlock) {
+      if (kReadDebug) {
+        printf("queue: not going to block waiting on %p\n", this);
+      }
+      return false;
+    } else {  // kBlock
+      if (kReadDebug) {
+        printf("queue: going to wait for readable of %p\n", this);
+      }
+      data_lock_.Unlock();
+      // wait for a message to become readable
+      readable_.Wait();
+      if (kReadDebug) {
+        printf("queue: done waiting for readable of %p\n", this);
+      }
+      data_lock_.Lock();
+    }
+  }
+  if (kReadDebug) {
+    printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+  }
+  return true;
+}
+void *Queue::ReadPeek(int options, int start) {
+  void *ret;
+  if (options & kFromEnd) {
+    int pos = data_end_ - 1;
+    if (pos < 0) {  // if it needs to wrap
+      pos = data_length_ - 1;
+    }
+    if (kReadDebug) {
+      printf("queue: reading from line %d: %d\n", __LINE__, pos);
+    }
+    ret = data_[pos];
+  } else {
+    if (kReadDebug) {
+      printf("queue: reading from line %d: %d\n", __LINE__, start);
+    }
+    ret = data_[start];
+  }
+  MessageHeader *const header = MessageHeader::Get(ret);
+  ++header->ref_count;
+  if (kRefDebug) {
+    printf("ref inc count: %p\n", ret);
+  }
+  return ret;
+}
+const void *Queue::ReadMessage(int options) {
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessage(%d)\n", this, options);
+  }
+  void *msg = NULL;
+  MutexLocker locker(&data_lock_);
+  if (!ReadCommonStart(options, NULL)) {
+    if (kReadDebug) {
+      printf("queue: common returned false for %p\n", this);
+    }
+    return NULL;
+  }
+  if (options & kPeek) {
+    msg = ReadPeek(options, data_start_);
+  } else {
+    if (options & kFromEnd) {
+      while (1) {
+        if (kReadDebug) {
+          printf("queue: start of c2 of %p\n", this);
+        }
+        // This loop pulls each message out of the buffer.
+        const int pos = data_start_;
+        data_start_ = (data_start_ + 1) % data_length_;
+        // if this is the last one
+        if (data_start_ == data_end_) {
+          if (kReadDebug) {
+            printf("queue: reading from c2: %d\n", pos);
+          }
+          msg = data_[pos];
+          break;
+        }
+        // it's not going to be in the queue any more
+        DecrementMessageReferenceCount(data_[pos]);
+      }
+    } else {
+      if (kReadDebug) {
+        printf("queue: reading from d2: %d\n", data_start_);
+      }
+      msg = data_[data_start_];
+      data_start_ = (data_start_ + 1) % data_length_;
+    }
+  }
+  ReadCommonEnd(!(options & kPeek));
+  if (kReadDebug) {
+    printf("queue: read returning %p\n", msg);
+  }
+  return msg;
+}
+const void *Queue::ReadMessageIndex(int options, int *index) {
+  if (kReadDebug) {
+    printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
+           this, options, index, *index);
+  }
+  void *msg = NULL;
+  {
+    MutexLocker locker(&data_lock_);
+    if (!ReadCommonStart(options, index)) {
+      if (kReadDebug) {
+        printf("queue: common returned false for %p\n", this);
+      }
+      return NULL;
+    }
+    // TODO(parker): Handle integer wrap on the index.
+    const int offset = messages_ - *index;
+    int my_start = data_end_ - offset;
+    if (offset >= data_length_) {  // if we're behind the available messages
+      // catch index up to the last available message
+      *index += data_start_ - my_start;
+      // and that's the one we're going to read
+      my_start = data_start_;
+    }
+    if (my_start < 0) {  // if we want to read off the end of the buffer
+      // unwrap where we're going to read from
+      my_start += data_length_;
+    }
+    if (options & kPeek) {
+      msg = ReadPeek(options, my_start);
+    } else {
+      if (options & kFromEnd) {
+        if (kReadDebug) {
+          printf("queue: start of c1 of %p\n", this);
+        }
+        int pos = data_end_ - 1;
+        if (pos < 0) {  // if it wrapped
+          pos = data_length_ - 1;  // unwrap it
+        }
+        if (kReadDebug) {
+          printf("queue: reading from c1: %d\n", pos);
+        }
+        msg = data_[pos];
+        *index = messages_;
+      } else {
+        if (kReadDebug) {
+          printf("queue: reading from d1: %d\n", my_start);
+        }
+        msg = data_[my_start];
+        ++(*index);
+      }
+      MessageHeader *const header = MessageHeader::Get(msg);
+      ++header->ref_count;
+      if (kRefDebug) {
+        printf("ref_inc_count: %p\n", msg);
+      }
+    }
+  }
+  // this function never consumes one off the queue
+  ReadCommonEnd(false);
+  return msg;
+}
+
+void *Queue::GetMessage() {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *header;
+  if (pool_length_ - messages_used_ > 0) {
+    header = pool_[messages_used_];
+  } else {
+    if (pool_length_ >= mem_length_) {
+      LOG(FATAL, "overused pool %p from queue %p\n", pool, queue);
+    }
+    header = pool_[pool_length_] =
+        static_cast<MessageHeader *>(shm_malloc(msg_length_));
+    ++pool_length_;
+  }
+  void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+  header->ref_count = 1;
+  if (kRefDebug) {
+    printf("ref alloc: %p\n", msg);
+  }
+  header->index = messages_used_;
+  ++messages_used_;
+  return msg;
+}
+
+}  // namespace aos
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 4c279e1..f7c116b 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,134 +1,160 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
+#ifndef AOS_COMMON_QUEUE_H_
+#define AOS_COMMON_QUEUE_H_
 
-#include "shared_mem.h"
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
 
 // TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
 // code to make checking for leaks work better
 // <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
 // describes how
 
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// initializing an aos_type_sig and then calling aos_fetch_queue on it.
-// This aos_queue* can then be used to get a message and write it or to read a
-// message.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and aos_type_sig will result in a different queue, which means that
-// if you only recompile some code that uses differently sized messages, it will
-// simply use a different queue than the old code.
-//
 // Any pointers returned from these functions can be safely passed to other
 // processes because they are all shared memory pointers.
 // IMPORTANT: Any message pointer must be passed back in some way
-// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// (FreeMessage and WriteMessage are common ones) or the
 // application will leak shared memory.
-// NOTE: Taking a message from read_msg and then passing it to write_msg might
-// work, but it is not guaranteed to.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
 
-typedef struct aos_type_sig_t {
-	size_t length; // sizeof(message)
-	int hash; // can differentiate multiple otherwise identical queues
-	int queue_length; // how many messages the queue can hold
-} aos_type_sig;
+namespace aos {
 
-// Structures that are opaque to users (defined in queue_internal.h).
-typedef struct aos_queue_list_t aos_queue_list;
-typedef struct aos_queue_t aos_queue;
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling Queue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are a FIFO stack of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class Queue {
+ public:
+  // Retrieves (and creates if necessary) a queue. Each combination of name and
+  // signature refers to a completely independent queue.
+  // length is how large each message will be
+  // hash can differentiate multiple otherwise identical queues
+  // queue_length is how many messages the queue will be able to hold
+  static Queue *Fetch(const char *name, size_t length, int hash,
+                      int queue_length);
+  // Same as above, except sets up the returned queue so that it will put
+  // messages on *recycle when they are freed (after they have been released by
+  // all other readers/writers and are not in the queue).
+  // recycle_queue_length determines how many freed messages will be kept.
+  // Other code can retrieve the 2 queues separately (the recycle queue will
+  // have the same length and hash as the main one). However, any frees made
+  // using a queue with only (name,length,hash,queue_length) before the
+  // recycle queue has been associated with it will not go on to the recycle
+  // queue.
+  // NOTE: calling this function with the same (name,length,hash,queue_length)
+  // but multiple recycle_queue_lengths will result in each freed message being
+  // put onto an undefined one of the recycle queues.
+  static Queue *Fetch(const char *name, size_t length, int hash,
+                      int queue_length,
+                      int recycle_hash, int recycle_queue_length,
+                      Queue **recycle);
 
-// Retrieves (and creates if necessary) a queue. Each combination of name and
-// signature refers to a completely independent queue.
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
-// Same as above, except sets up the returned queue so that it will put messages
-// on *recycle (retrieved with recycle_sig) when they are freed (after they have
-// been released by all other readers/writers and are not in the queue).
-// The length of recycle_sig determines how many freed messages will be kept.
-// Other code can retrieve recycle_sig and sig separately. However, any frees
-// made using aos_fetch_queue with only sig before the recycle queue has been
-// associated with it will not go on to the recyce queue.
-// Will return NULL for both queues if sig->length != recycle_sig->length or
-// sig->hash == recycle_sig->hash (just to be safe).
-// NOTE: calling this function with the same sig but multiple recycle_sig s
-// will result in each freed message being put onto an undefined recycle_sig.
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
-                                   const aos_type_sig *recycle_sig, aos_queue **recycle);
+  // Constants for passing to options arguments.
+  // The non-conflicting ones can be combined with bitwise-or.
 
-// Constants for passing to opts arguments.
-// #defines so that c code can use queues
-// The non-conflicting ones can be combined with bitwise-or.
-// TODO(brians) prefix these?
-//
-// Causes the returned message to be left in the queue.
-// For reading only.
-#define PEEK      0x0001
-// Reads the last message in the queue instead of just the next one.
-// NOTE: This removes all of the messages until the last one from the queue
-// (which means that nobody else will read them). However, PEEK means to not
-// remove any from the queue, including the ones that are skipped.
-// For reading only.
-#define FROM_END  0x0002
-// Causes reads to return NULL and writes to fail instead of waiting.
-// For reading and writing.
-#define NON_BLOCK 0x0004
-// Causes things to block.
-// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
-// For reading and writing.
-#define BLOCK     0x0000
-// Causes writes to overwrite the oldest message in the queue instead of
-// blocking.
-// For writing only.
-#define OVERRIDE  0x0008
+  // Causes the returned message to be left in the queue.
+  // For reading only.
+  static const int kPeek = 0x0001;
+  // Reads the last message in the queue instead of just the next one.
+  // NOTE: This removes all of the messages until the last one from the queue
+  // (which means that nobody else will read them). However, PEEK means to not
+  // remove any from the queue, including the ones that are skipped.
+  // For reading only.
+  static const int kFromEnd = 0x0002;
+  // Causes reads to return NULL and writes to fail instead of waiting.
+  // For reading and writing.
+  static const int kNonBlock = 0x0004;
+  // Causes things to block.
+  // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+  // For reading and writing.
+  static const int kBlock = 0x0000;
+  // Causes writes to overwrite the oldest message in the queue instead of
+  // blocking.
+  // For writing only.
+  static const int kOverride = 0x0008;
 
-// Frees a message. Does nothing if msg is NULL.
-int aos_queue_free_msg(aos_queue *queue, const void *msg);
+  // Writes a message into the queue.
+  // This function takes ownership of msg.
+  // NOTE: msg must point to a valid message from this queue
+  bool WriteMessage(void *msg, int options);
 
-// Writes a message into the queue.
-// NOTE: msg must point to at least the length of this queue's worth of valid
-// data to write
-// IMPORTANT: if this returns -1, then the caller must do something with msg
-// (like free it)
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
-// Exactly the same as aos_queue_write_msg, except it automatically frees the
-// message if writing fails.
-static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
-  const int ret = aos_queue_write_msg(queue, msg, opts);
-  if (ret != 0) {
-    aos_queue_free_msg(queue, msg);
-  }
-  return ret;
-}
+  // Reads a message out of the queue.
+  // The return value will have at least the length of this queue's worth of
+  // valid data where it's pointing to.
+  // The return value is const because other people might be viewing the same
+  // messsage. Do not cast the const away!
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage.
+  const void *ReadMessage(int options);
+  // Exactly the same as aos_queue_read_msg, except it will never return the
+  // same message twice with the same index argument. However, it may not
+  // return some messages that pass through the queue.
+  // *index should start as 0. index does not have to be in shared memory, but
+  // it can be
+  const void *ReadMessageIndex(int options, int *index);
 
-// Reads a message out of the queue.
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// The return value is const because other people might be viewing the same
-// messsage. Do not cast the const away!
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-const void *aos_queue_read_msg(aos_queue *buf, int opts);
-// Exactly the same as aos_queue_read_msg, except it will never return the same
-// message twice with the same index argument. However, it may not return some
-// messages that pass through the queue.
-// *index should start as 0. index does not have to be in shared memory, but it
-// can be
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+  // Retrieves ("allocates") a message that can then be written to the queue.
+  // NOTE: the return value will be completely uninitialized
+  // The return value will have at least the length of this queue's worth of
+  // valid memory where it's pointing to.
+  // Returns NULL for error.
+  // IMPORTANT: The return value (if not NULL) must eventually be passed to
+  // FreeMessage.
+  void *GetMessage();
 
-// Retrieves ("allocates") a message that can then be written to the queue.
-// NOTE: the return value will be completely uninitialized
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// Returns NULL for error.
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-void *aos_queue_get_msg(aos_queue *queue);
+  void FreeMessage(const void *msg) { DecrementMessageReferenceCount(msg); }
 
-#ifdef __cplusplus
-}
-#endif
+ private:
+  struct MessageHeader;
 
-#endif
+  // These next 4 allow finding the right one.
+  const char *name_;
+  size_t length_;
+  int hash_;
+  int queue_length_;
+  // The next one in the linked list of queues.
+  Queue *next_;
 
+  Queue *recycle_;
+
+  Mutex data_lock_;  // protects operations on data_ etc
+  Condition readable_;
+  Condition writable_;
+  int data_length_;  // max length into data + 1
+  int data_start_;  // is an index into data
+  int data_end_;  // is an index into data
+  int messages_;  // that have passed through
+  void **data_;  // array of messages (with headers)
+
+  Mutex pool_lock_;
+  size_t msg_length_;  // sizeof(each message) including the header
+  int mem_length_;  // the max number of messages that will ever be allocated
+  int messages_used_;
+  int pool_length_;  // the number of allocated messages
+  MessageHeader **pool_;  // array of pointers to messages
+
+  // Actually frees the given message.
+  void DoFreeMessage(const void *msg);
+  // Calls DoFreeMessage if appropriate.
+  void DecrementMessageReferenceCount(const void *msg);
+
+  // Should be called with data_lock_ locked.
+  // Returns with a readable message in data_ or false.
+  bool ReadCommonStart(int options, int *index);
+  // Deals with setting/unsetting readable_ and writable_.
+  // Should be called after data_lock_ has been unlocked.
+  // read is whether or not this read call read one off the queue
+  void ReadCommonEnd(bool read);
+  // Handles reading with kPeek.
+  void *ReadPeek(int options, int start);
+
+  // Gets called by Fetch when necessary (with placement new).
+  Queue(const char *name, size_t length, int hash, int queue_length);
+};
+
+}  // namespace aos
+
+#endif  // AOS_COMMONG_QUEUE_H_
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
deleted file mode 100644
index e592649..0000000
--- a/aos/atom_code/ipc_lib/queue_internal.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef AOS_IPC_LIB_QUEUE_INTERNAL_H_
-#define AOS_IPC_LIB_QUEUE_INTERNAL_H_
-
-#include "shared_mem.h"
-#include "aos_sync.h"
-
-// Should only be used by queue.c. Contains definitions of the structures
-// it uses.
-
-// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
-#define EXTRA_MESSAGES 20
-
-typedef struct aos_msg_header_t {
-	int ref_count;
-	int index; // in the pool
-} aos_msg_header;
-static inline void header_swap(aos_msg_header *l, aos_msg_header *r) {
-  aos_msg_header tmp;
-  tmp.ref_count = l->ref_count;
-  tmp.index = l->index;
-  l->ref_count = r->ref_count;
-  l->index = r->index;
-  r->ref_count = tmp.ref_count;
-  r->index = tmp.index;
-}
-
-struct aos_queue_list_t {
-	char *name;
-	aos_type_sig sig;
-	aos_queue *queue;
-	aos_queue_list *next;
-};
-
-typedef struct aos_ring_buf_t {
-	mutex buff_lock; // the main lock protecting operations on this buffer
-  // conditions
-	condition_variable writable;
-	condition_variable readable;
-	int length; // max index into data + 1
-	int start; // is an index into data
-	int end; // is an index into data
-	int msgs; // that have passed through
-	void **data; // array of messages (w/ headers)
-} aos_ring_buf;
-
-typedef struct aos_msg_pool_t {
-	mutex pool_lock;
-	size_t msg_length;
-	int mem_length; // the number of messages
-	int used; // number of messages
-	int length; // number of allocated messages
-	void **pool; // array of messages
-} aos_msg_pool;
-
-struct aos_queue_t {
-	aos_msg_pool pool;
-	aos_ring_buf buf;
-  aos_queue *recycle;
-};
-
-#endif
diff --git a/aos/atom_code/ipc_lib/queue_test.cpp b/aos/atom_code/ipc_lib/queue_test.cc
similarity index 80%
rename from aos/atom_code/ipc_lib/queue_test.cpp
rename to aos/atom_code/ipc_lib/queue_test.cc
index e9ac8d5..b437c86 100644
--- a/aos/atom_code/ipc_lib/queue_test.cpp
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -1,3 +1,5 @@
+#include "aos/common/queue.h"
+
 #include <unistd.h>
 #include <sys/mman.h>
 #include <inttypes.h>
@@ -11,9 +13,12 @@
 #include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
 #include "aos/common/type_traits.h"
 
-using testing::AssertionResult;
-using testing::AssertionSuccess;
-using testing::AssertionFailure;
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+
+namespace aos {
+namespace testing {
 
 // IMPORTANT: Some of the functions that do test predicate functions allocate
 // shared memory (and don't free it).
@@ -44,16 +49,19 @@
         return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
     }
   }
-  static_assert(aos::shm_ok<ResultType>::value, "this will get put in shared memory");
+  static_assert(aos::shm_ok<ResultType>::value,
+                "this will get put in shared memory");
   // Gets allocated in shared memory so it has to be volatile.
-  template<typename T> struct FunctionToCall {
+  template<typename T>
+  struct FunctionToCall {
     ResultType result;
     bool expected;
     void (*function)(T*, char*);
     T *arg;
     volatile char failure[kFailureSize];
   };
-  template<typename T> static void Hangs_(volatile FunctionToCall<T> *const to_call) {
+  template<typename T>
+  static void Hangs_(volatile FunctionToCall<T> *const to_call) {
     to_call->result = ResultType::Called;
     to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
     to_call->result = ResultType::Returned;
@@ -124,7 +132,7 @@
 
   void SetUp() {
     SharedMemTestSetup::SetUp();
-    fatal_failure = reinterpret_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+    fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
     static bool registered = false;
     if (!registered) {
       atexit(ReapExitHandler);
@@ -138,7 +146,7 @@
   // the attribute is in the middle to make gcc happy
   template<typename T> __attribute__((warn_unused_result))
       std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
-    mutex *lock = reinterpret_cast<mutex *>(shm_malloc_aligned(
+    mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
             sizeof(*lock), sizeof(int)));
     *lock = 1;
     const pid_t pid = fork();
@@ -182,7 +190,7 @@
                                                  bool expected, ChildID id) {
     static_assert(aos::shm_ok<FunctionToCall<T>>::value,
                   "this is going into shared memory");
-    volatile FunctionToCall<T> *const to_call = reinterpret_cast<FunctionToCall<T> *>(
+    volatile FunctionToCall<T> *const to_call = static_cast<FunctionToCall<T> *>(
         shm_malloc_aligned(sizeof(*to_call), sizeof(int)));
     to_call->result = ResultType::NotCalled;
     to_call->function = function;
@@ -237,38 +245,37 @@
     int16_t data; // don't really want to test empty messages
   };
   struct MessageArgs {
-    aos_queue *const queue;
+    Queue *const queue;
     int flags;
     int16_t data; // -1 means NULL expected
   };
   static void WriteTestMessage(MessageArgs *args, char *failure) {
-    TestMessage *msg = reinterpret_cast<TestMessage *>(aos_queue_get_msg(args->queue));
+    TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
     if (msg == NULL) {
       snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", args->queue);
       return;
     }
     msg->data = args->data;
-    if (aos_queue_write_msg_free(args->queue, msg, args->flags) == -1) {
+    if (!args->queue->WriteMessage(msg, args->flags)) {
       snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
                args->queue, msg, args->flags);
     }
   }
   static void ReadTestMessage(MessageArgs *args, char *failure) {
-    const TestMessage *msg = reinterpret_cast<const TestMessage *>(
-        aos_queue_read_msg(args->queue, args->flags));
+    const TestMessage *msg = static_cast<const TestMessage *>(
+        args->queue->ReadMessage(args->flags));
     if (msg == NULL) {
       if (args->data != -1) {
-        snprintf(failure, kFailureSize,
-                 "expected data of %" PRId16 " but got NULL message",
+        snprintf(failure, kFailureSize, "expected data of %"PRId16" but got NULL message",
                  args->data);
       }
     } else {
       if (args->data != msg->data) {
         snprintf(failure, kFailureSize,
-                 "expected data of %" PRId16 " but got %" PRId16 " instead",
+                 "expected data of %"PRId16" but got %"PRId16" instead",
                  args->data, msg->data);
       }
-      aos_queue_free_msg(args->queue, msg);
+      args->queue->FreeMessage(msg);
     }
   }
 };
@@ -276,81 +283,75 @@
 std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
 
 TEST_F(QueueTest, Reading) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, -1};
 
-  EXPECT_EQ(BLOCK, 0);
-  EXPECT_EQ(BLOCK | FROM_END, FROM_END);
-
-  args.flags = NON_BLOCK;
+  args.flags = Queue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK | PEEK;
+  args.flags = Queue::kNonBlock | Queue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   EXPECT_HANGS(ReadTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = Queue::kPeek;
   EXPECT_HANGS(ReadTestMessage, &args);
   args.data = 254;
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = Queue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = Queue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = PEEK | NON_BLOCK;
+  args.flags = Queue::kPeek | Queue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   args.data = -1;
   EXPECT_HANGS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = Queue::kNonBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.flags = 0;
   args.data = 971;
   EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
 }
 TEST_F(QueueTest, Writing) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, 973};
 
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_HANGS(WriteTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = Queue::kNonBlock;
   EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = Queue::kNonBlock;
   EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
-  args.flags = PEEK;
+  args.flags = Queue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
   args.data = 971;
-  args.flags = OVERRIDE;
+  args.flags = Queue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = OVERRIDE;
+  args.flags = Queue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = NON_BLOCK;
+  args.flags = Queue::kNonBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  args.flags = OVERRIDE;
+  args.flags = Queue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
   args.flags = 0;
   EXPECT_RETURNS(ReadTestMessage, &args);
 }
 
 TEST_F(QueueTest, MultiRead) {
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
-  aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+  Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
   MessageArgs args{queue, 0, 1323};
 
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
   ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
   EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
@@ -360,45 +361,45 @@
 TEST_F(QueueTest, Recycle) {
   // TODO(brians) basic test of recycle queue
   // include all of the ways a message can get into the recycle queue
-  static const aos_type_sig signature{sizeof(TestMessage), 1, 2},
-               recycle_signature{sizeof(TestMessage), 2, 2};
-  aos_queue *recycle_queue = reinterpret_cast<aos_queue *>(23);
-  aos_queue *const queue = aos_fetch_queue_recycle("Queue", &signature,
-                                                   &recycle_signature, &recycle_queue);
-  ASSERT_NE(reinterpret_cast<aos_queue *>(23), recycle_queue);
+  Queue *recycle_queue = reinterpret_cast<Queue *>(23);
+  Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 2, 2, 2,
+                                    &recycle_queue);
+  ASSERT_NE(reinterpret_cast<Queue *>(23), recycle_queue);
   MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
 
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_RETURNS(WriteTestMessage, &args);
   EXPECT_HANGS(ReadTestMessage, &recycle);
   args.data = 254;
   EXPECT_RETURNS(WriteTestMessage, &args);
   EXPECT_HANGS(ReadTestMessage, &recycle);
   args.data = 971;
-  args.flags = OVERRIDE;
+  args.flags = Queue::kOverride;
   EXPECT_RETURNS(WriteTestMessage, &args);
-  recycle.flags = BLOCK;
+  recycle.flags = Queue::kBlock;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 
   EXPECT_HANGS(ReadTestMessage, &recycle);
 
-  TestMessage *msg = static_cast<TestMessage *>(aos_queue_get_msg(queue));
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
   ASSERT_TRUE(msg != NULL);
   msg->data = 341;
-  aos_queue_free_msg(queue, msg);
+  queue->FreeMessage(msg);
   recycle.data = 341;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 
   EXPECT_HANGS(ReadTestMessage, &recycle);
 
   args.data = 254;
-  args.flags = PEEK;
+  args.flags = Queue::kPeek;
   EXPECT_RETURNS(ReadTestMessage, &args);
-  recycle.flags = BLOCK;
+  recycle.flags = Queue::kBlock;
   EXPECT_HANGS(ReadTestMessage, &recycle);
-  args.flags = BLOCK;
+  args.flags = Queue::kBlock;
   EXPECT_RETURNS(ReadTestMessage, &args);
   recycle.data = 254;
   EXPECT_RETURNS(ReadTestMessage, &recycle);
 }
 
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
index b1a2608..d8f2d18 100644
--- a/aos/atom_code/ipc_lib/shared_mem.h
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -1,14 +1,14 @@
 #ifndef _SHARED_MEM_H_
 #define _SHARED_MEM_H_
 
-#ifdef __cplusplus
-extern "C" {
-#endif
-
 #include "core_lib.h"
 #include <stddef.h>
 #include <unistd.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 // Where the shared memory segment starts in each process's address space.
 // Has to be the same in all of them so that stuff in shared memory
 // can have regular pointers to other stuff in shared memory.
diff --git a/aos/atom_code/logging/atom_logging.cc b/aos/atom_code/logging/atom_logging.cc
index 08a65cb..6ea3b24 100644
--- a/aos/atom_code/logging/atom_logging.cc
+++ b/aos/atom_code/logging/atom_logging.cc
@@ -17,6 +17,8 @@
 #include "aos/atom_code/thread_local.h"
 #include "aos/atom_code/ipc_lib/queue.h"
 
+using ::aos::Queue;
+
 namespace aos {
 namespace logging {
 namespace {
@@ -53,8 +55,7 @@
   return process_name + '.' + thread_name;
 }
 
-static const aos_type_sig message_sig = {sizeof(LogMessage), 1323, 1500};
-static aos_queue *queue;
+static Queue *queue;
 
 }  // namespace
 namespace internal {
@@ -83,7 +84,7 @@
 
 class AtomQueueLogImplementation : public LogImplementation {
   virtual void DoLog(log_level level, const char *format, va_list ap) {
-    LogMessage *message = static_cast<LogMessage *>(aos_queue_get_msg(queue));
+    LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
     if (message == NULL) {
       LOG(FATAL, "queue get message failed\n");
     }
@@ -99,7 +100,7 @@
 void Register() {
   Init();
 
-  queue = aos_fetch_queue("LoggingQueue", &message_sig);
+  queue = Queue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
   if (queue == NULL) {
     Die("logging: couldn't fetch queue\n");
   }
@@ -108,33 +109,32 @@
 }
 
 const LogMessage *ReadNext(int flags, int *index) {
-  return static_cast<const LogMessage *>(
-      aos_queue_read_msg_index(queue, flags, index));
+  return static_cast<const LogMessage *>(queue->ReadMessageIndex(flags, index));
 }
 
 const LogMessage *ReadNext() {
-  return ReadNext(BLOCK);
+  return ReadNext(Queue::kBlock);
 }
 
 const LogMessage *ReadNext(int flags) {
   const LogMessage *r = NULL;
   do {
-    r = static_cast<const LogMessage *>(aos_queue_read_msg(queue, flags));
+    r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
     // not blocking means return a NULL if that's what it gets
-  } while ((flags & BLOCK) && r == NULL);
+  } while ((flags & Queue::kBlock) && r == NULL);
   return r;
 }
 
 LogMessage *Get() {
-  return static_cast<LogMessage *>(aos_queue_get_msg(queue));
+  return static_cast<LogMessage *>(queue->GetMessage());
 }
 
 void Free(const LogMessage *msg) {
-  aos_queue_free_msg(queue, msg);
+  queue->FreeMessage(msg);
 }
 
 void Write(LogMessage *msg) {
-  if (aos_queue_write_msg_free(queue, msg, OVERRIDE) < 0) {
+  if (!queue->WriteMessage(msg, Queue::kOverride)) {
     LOG(FATAL, "writing failed");
   }
 }