copying over queue-api-redo branch from my 2012 repo
This commit is not cleanly separated from the next one where I got
everything to compile again (and possibly even further on from there).
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 4c279e1..f7c116b 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,134 +1,160 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
+#ifndef AOS_COMMON_QUEUE_H_
+#define AOS_COMMON_QUEUE_H_
-#include "shared_mem.h"
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
// code to make checking for leaks work better
// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
// describes how
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// initializing an aos_type_sig and then calling aos_fetch_queue on it.
-// This aos_queue* can then be used to get a message and write it or to read a
-// message.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and aos_type_sig will result in a different queue, which means that
-// if you only recompile some code that uses differently sized messages, it will
-// simply use a different queue than the old code.
-//
// Any pointers returned from these functions can be safely passed to other
// processes because they are all shared memory pointers.
// IMPORTANT: Any message pointer must be passed back in some way
-// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// (FreeMessage and WriteMessage are common ones) or the
// application will leak shared memory.
-// NOTE: Taking a message from read_msg and then passing it to write_msg might
-// work, but it is not guaranteed to.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
-typedef struct aos_type_sig_t {
- size_t length; // sizeof(message)
- int hash; // can differentiate multiple otherwise identical queues
- int queue_length; // how many messages the queue can hold
-} aos_type_sig;
+namespace aos {
-// Structures that are opaque to users (defined in queue_internal.h).
-typedef struct aos_queue_list_t aos_queue_list;
-typedef struct aos_queue_t aos_queue;
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling Queue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are a FIFO list of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class Queue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ static Queue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue will
+ // have the same length and hash as the main one). However, any frees made
+ // using a queue with only (name,length,hash,queue_length) before the
+ // recycle queue has been associated with it will not go on to the recycle
+ // queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ static Queue *Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_queue_length,
+ Queue **recycle);
-// Retrieves (and creates if necessary) a queue. Each combination of name and
-// signature refers to a completely independent queue.
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
-// Same as above, except sets up the returned queue so that it will put messages
-// on *recycle (retrieved with recycle_sig) when they are freed (after they have
-// been released by all other readers/writers and are not in the queue).
-// The length of recycle_sig determines how many freed messages will be kept.
-// Other code can retrieve recycle_sig and sig separately. However, any frees
-// made using aos_fetch_queue with only sig before the recycle queue has been
-// associated with it will not go on to the recyce queue.
-// Will return NULL for both queues if sig->length != recycle_sig->length or
-// sig->hash == recycle_sig->hash (just to be safe).
-// NOTE: calling this function with the same sig but multiple recycle_sig s
-// will result in each freed message being put onto an undefined recycle_sig.
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
- const aos_type_sig *recycle_sig, aos_queue **recycle);
+ // Constants for passing to options arguments.
+ // The non-conflicting ones can be combined with bitwise-or.
-// Constants for passing to opts arguments.
-// #defines so that c code can use queues
-// The non-conflicting ones can be combined with bitwise-or.
-// TODO(brians) prefix these?
-//
-// Causes the returned message to be left in the queue.
-// For reading only.
-#define PEEK 0x0001
-// Reads the last message in the queue instead of just the next one.
-// NOTE: This removes all of the messages until the last one from the queue
-// (which means that nobody else will read them). However, PEEK means to not
-// remove any from the queue, including the ones that are skipped.
-// For reading only.
-#define FROM_END 0x0002
-// Causes reads to return NULL and writes to fail instead of waiting.
-// For reading and writing.
-#define NON_BLOCK 0x0004
-// Causes things to block.
-// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
-// For reading and writing.
-#define BLOCK 0x0000
-// Causes writes to overwrite the oldest message in the queue instead of
-// blocking.
-// For writing only.
-#define OVERRIDE 0x0008
+ // Causes the returned message to be left in the queue.
+ // For reading only.
+ static const int kPeek = 0x0001;
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them). However, kPeek means to not
+ // remove any from the queue, including the ones that are skipped.
+ // For reading only.
+ static const int kFromEnd = 0x0002;
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static const int kNonBlock = 0x0004;
+ // Causes things to block.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // For reading and writing.
+ static const int kBlock = 0x0000;
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static const int kOverride = 0x0008;
-// Frees a message. Does nothing if msg is NULL.
-int aos_queue_free_msg(aos_queue *queue, const void *msg);
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ bool WriteMessage(void *msg, int options);
-// Writes a message into the queue.
-// NOTE: msg must point to at least the length of this queue's worth of valid
-// data to write
-// IMPORTANT: if this returns -1, then the caller must do something with msg
-// (like free it)
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
-// Exactly the same as aos_queue_write_msg, except it automatically frees the
-// message if writing fails.
-static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
- const int ret = aos_queue_write_msg(queue, msg, opts);
- if (ret != 0) {
- aos_queue_free_msg(queue, msg);
- }
- return ret;
-}
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // message. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(int options);
+ // Exactly the same as ReadMessage, except it will never return the
+ // same message twice with the same index argument. However, it may not
+ // return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be
+ const void *ReadMessageIndex(int options, int *index);
-// Reads a message out of the queue.
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// The return value is const because other people might be viewing the same
-// messsage. Do not cast the const away!
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-const void *aos_queue_read_msg(aos_queue *buf, int opts);
-// Exactly the same as aos_queue_read_msg, except it will never return the same
-// message twice with the same index argument. However, it may not return some
-// messages that pass through the queue.
-// *index should start as 0. index does not have to be in shared memory, but it
-// can be
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ void *GetMessage();
-// Retrieves ("allocates") a message that can then be written to the queue.
-// NOTE: the return value will be completely uninitialized
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// Returns NULL for error.
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-void *aos_queue_get_msg(aos_queue *queue);
+ void FreeMessage(const void *msg) { DecrementMessageReferenceCount(msg); }
-#ifdef __cplusplus
-}
-#endif
+ private:
+ struct MessageHeader;
-#endif
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ Queue *next_;
+ Queue *recycle_;
+
+ Mutex data_lock_; // protects operations on data_ etc
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max length into data + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ Mutex pool_lock_;
+ size_t msg_length_; // sizeof(each message) including the header
+ int mem_length_; // the max number of messages that will ever be allocated
+ int messages_used_;
+ int pool_length_; // the number of allocated messages
+ MessageHeader **pool_; // array of pointers to messages
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+
+ // Should be called with data_lock_ locked.
+ // Returns with a readable message in data_ or false.
+ bool ReadCommonStart(int options, int *index);
+ // Deals with setting/unsetting readable_ and writable_.
+ // Should be called after data_lock_ has been unlocked.
+ // read is whether or not this read call read one off the queue
+ void ReadCommonEnd(bool read);
+ // Handles reading with kPeek.
+ void *ReadPeek(int options, int start);
+
+ // Gets called by Fetch when necessary (with placement new).
+ Queue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_COMMON_QUEUE_H_