John Park | 398c74a | 2018-10-20 21:17:39 -0700 | [diff] [blame] | 1 | #ifndef AOS_IPC_LIB_QUEUE_H_ |
| 2 | #define AOS_IPC_LIB_QUEUE_H_ |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 3 | |
John Park | 398c74a | 2018-10-20 21:17:39 -0700 | [diff] [blame] | 4 | #include "aos/ipc_lib/shared_mem.h" |
John Park | 33858a3 | 2018-09-28 23:05:48 -0700 | [diff] [blame] | 5 | #include "aos/mutex/mutex.h" |
| 6 | #include "aos/condition.h" |
| 7 | #include "aos/util/options.h" |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 8 | #include "glog/logging.h" |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 9 | |
| 10 | // TODO(brians) add valgrind client requests to the queue and shared_mem_malloc |
| 11 | // code to make checking for leaks work better |
| 12 | // <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools> |
| 13 | // describes how |
| 14 | |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 15 | // Any pointers returned from these functions can be safely passed to other |
| 16 | // processes because they are all shared memory pointers. |
| 17 | // IMPORTANT: Any message pointer must be passed back in some way |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 18 | // (FreeMessage and WriteMessage are common ones) or the |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 19 | // application will leak shared memory. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 20 | // NOTE: Taking a message from ReadMessage and then passing it to WriteMessage |
| 21 | // might work, but it is not guaranteed to. |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 22 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 23 | namespace aos { |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 24 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 25 | // Queues are the primary way to use shared memory. Basic use consists of |
| 26 | // calling Queue::Fetch and then reading and/or writing messages. |
| 27 | // Queues (as the name suggests) are a FIFO stack of messages. Each combination |
| 28 | // of name and type signature will result in a different queue, which means |
| 29 | // that if you only recompile some code that uses differently sized messages, |
| 30 | // it will simply use a different queue than the old code. |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 31 | class RawQueue { |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 32 | public: |
| 33 | // Retrieves (and creates if necessary) a queue. Each combination of name and |
| 34 | // signature refers to a completely independent queue. |
| 35 | // length is how large each message will be |
| 36 | // hash can differentiate multiple otherwise identical queues |
| 37 | // queue_length is how many messages the queue will be able to hold |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 38 | // Will never return NULL. |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 39 | static RawQueue *Fetch(const char *name, size_t length, int hash, |
Brian Silverman | 5d3ab7f | 2015-02-21 15:54:21 -0500 | [diff] [blame] | 40 | int queue_length); |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 41 | // Same as above, except sets up the returned queue so that it will put |
| 42 | // messages on *recycle when they are freed (after they have been released by |
| 43 | // all other readers/writers and are not in the queue). |
| 44 | // recycle_queue_length determines how many freed messages will be kept. |
| 45 | // Other code can retrieve the 2 queues separately (the recycle queue will |
| 46 | // have the same length and hash as the main one). However, any frees made |
| 47 | // using a queue with only (name,length,hash,queue_length) before the |
| 48 | // recycle queue has been associated with it will not go on to the recycle |
| 49 | // queue. |
| 50 | // NOTE: calling this function with the same (name,length,hash,queue_length) |
| 51 | // but multiple recycle_queue_lengths will result in each freed message being |
| 52 | // put onto an undefined one of the recycle queues. |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 53 | // Will never return NULL. |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 54 | static RawQueue *Fetch(const char *name, size_t length, int hash, |
Brian Silverman | 5d3ab7f | 2015-02-21 15:54:21 -0500 | [diff] [blame] | 55 | int queue_length, int recycle_hash, |
| 56 | int recycle_queue_length, RawQueue **recycle); |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 57 | |
Brian Silverman | eb51cbb | 2014-03-14 22:57:08 -0700 | [diff] [blame] | 58 | // Doesn't update the currently read index (the read messages in the queue or |
| 59 | // the index). This means the returned message (and any others skipped with |
| 60 | // kFromEnd) will be left in the queue. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 61 | // For reading only. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 62 | // Not valid for ReadMessageIndex combined with kFromEnd. |
| 63 | static constexpr Options<RawQueue>::Option kPeek{0x0001}; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 64 | // Reads the last message in the queue instead of just the next one. |
| 65 | // NOTE: This removes all of the messages until the last one from the queue |
Brian Silverman | eb51cbb | 2014-03-14 22:57:08 -0700 | [diff] [blame] | 66 | // (which means that nobody else will read them). |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 67 | // For reading only. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 68 | // Not valid for ReadMessageIndex combined with kPeek. |
| 69 | static constexpr Options<RawQueue>::Option kFromEnd{0x0002}; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 70 | // Causes reads to return NULL and writes to fail instead of waiting. |
| 71 | // For reading and writing. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 72 | static constexpr Options<RawQueue>::Option kNonBlock{0x0004}; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 73 | // Causes things to block. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 74 | // For reading and writing. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 75 | static constexpr Options<RawQueue>::Option kBlock{0x0008}; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 76 | // Causes writes to overwrite the oldest message in the queue instead of |
| 77 | // blocking. |
| 78 | // For writing only. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 79 | static constexpr Options<RawQueue>::Option kOverride{0x0010}; |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 80 | |
Brian Silverman | b2db29b | 2018-01-03 20:29:07 -0800 | [diff] [blame] | 81 | RawQueue(const RawQueue &) = default; |
| 82 | RawQueue &operator=(const RawQueue &) = default; |
| 83 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 84 | // Writes a message into the queue. |
| 85 | // This function takes ownership of msg. |
| 86 | // NOTE: msg must point to a valid message from this queue |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 87 | // Returns true on success. A return value of false means msg has already been |
| 88 | // freed. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 89 | bool WriteMessage(void *msg, Options<RawQueue> options) { |
| 90 | static constexpr Options<RawQueue> kWriteFailureOptions = |
| 91 | kNonBlock | kBlock | kOverride; |
| 92 | if (!options.NoOthersSet(kWriteFailureOptions)) { |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 93 | LOG(FATAL) << "illegal write options in " << std::hex |
| 94 | << options.printable(); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 95 | } |
| 96 | if (!options.ExactlyOneSet(kWriteFailureOptions)) { |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 97 | LOG(FATAL) << "invalid write options " << std::hex << options.printable(); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 98 | } |
| 99 | return DoWriteMessage(msg, options); |
| 100 | } |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 101 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 102 | // Reads a message out of the queue. |
| 103 | // The return value will have at least the length of this queue's worth of |
| 104 | // valid data where it's pointing to. |
| 105 | // The return value is const because other people might be viewing the same |
| 106 | // messsage. Do not cast the const away! |
| 107 | // IMPORTANT: The return value (if not NULL) must eventually be passed to |
| 108 | // FreeMessage. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 109 | const void *ReadMessage(Options<RawQueue> options) { |
| 110 | CheckReadOptions(options); |
| 111 | return DoReadMessage(options); |
| 112 | } |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 113 | // The same as ReadMessage, except it will never return the |
| 114 | // same message twice (when used with the same index argument). However, |
| 115 | // may not return some messages that pass through the queue. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 116 | // *index should start as 0. index does not have to be in shared memory, but |
Brian Silverman | cd2d84c | 2014-03-13 23:30:58 -0700 | [diff] [blame] | 117 | // it can be. |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 118 | // Calling with both kPeek and kFromEnd in options isn't valid because that |
| 119 | // would mean ignoring index, which would make this function the same as |
| 120 | // ReadMessage (which should be used instead). |
Austin Schuh | 0ad2b6f | 2019-06-09 21:27:07 -0700 | [diff] [blame] | 121 | const void *ReadMessageIndex( |
| 122 | Options<RawQueue> options, int *index, |
| 123 | ::std::chrono::nanoseconds timeout = ::std::chrono::nanoseconds(0)) { |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 124 | CheckReadOptions(options); |
| 125 | static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek; |
| 126 | if (options.AllSet(kFromEndAndPeek)) { |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 127 | LOG(FATAL) << "ReadMessageIndex(kFromEnd | kPeek) is not allowed"; |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 128 | } |
Austin Schuh | 0ad2b6f | 2019-06-09 21:27:07 -0700 | [diff] [blame] | 129 | return DoReadMessageIndex(options, index, timeout); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 130 | } |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 131 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 132 | // Retrieves ("allocates") a message that can then be written to the queue. |
| 133 | // NOTE: the return value will be completely uninitialized |
| 134 | // The return value will have at least the length of this queue's worth of |
| 135 | // valid memory where it's pointing to. |
| 136 | // Returns NULL for error. |
| 137 | // IMPORTANT: The return value (if not NULL) must eventually be passed to |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 138 | // FreeMessage or WriteMessage. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 139 | void *GetMessage(); |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 140 | |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 141 | // It is ok to call this method with a NULL msg. |
| 142 | void FreeMessage(const void *msg) { |
| 143 | if (msg != NULL) DecrementMessageReferenceCount(msg); |
| 144 | } |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 145 | |
Brian Silverman | c2e0422 | 2014-03-22 12:43:44 -0700 | [diff] [blame] | 146 | // UNSAFE! Returns the number of free messages we have. Only safe to use when |
| 147 | // only 1 task is using this object (ie in tests). |
| 148 | int FreeMessages() const; |
Brian Silverman | eb51cbb | 2014-03-14 22:57:08 -0700 | [diff] [blame] | 149 | |
Austin Schuh | d681bbd | 2019-02-02 12:03:32 -0800 | [diff] [blame] | 150 | // Returns the name of the queue. |
| 151 | const char *name() const { return name_; } |
| 152 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 153 | private: |
| 154 | struct MessageHeader; |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 155 | |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 156 | // The public wrappers around these are inlined and do argument checking. |
| 157 | bool DoWriteMessage(void *msg, Options<RawQueue> options); |
| 158 | const void *DoReadMessage(Options<RawQueue> options); |
Austin Schuh | 0ad2b6f | 2019-06-09 21:27:07 -0700 | [diff] [blame] | 159 | const void *DoReadMessageIndex(Options<RawQueue> options, int *index, |
| 160 | ::std::chrono::nanoseconds timeout); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 161 | void CheckReadOptions(Options<RawQueue> options) { |
| 162 | static constexpr Options<RawQueue> kValidOptions = |
| 163 | kPeek | kFromEnd | kNonBlock | kBlock; |
| 164 | if (!options.NoOthersSet(kValidOptions)) { |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 165 | LOG(FATAL) << "illegal read options in " << std::hex |
| 166 | << options.printable(); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 167 | } |
| 168 | static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock; |
| 169 | if (!options.ExactlyOneSet(kBlockChoices)) { |
Alex Perry | cb7da4b | 2019-08-28 19:35:56 -0700 | [diff] [blame^] | 170 | LOG(FATAL) << "invalid read options " << std::hex << options.printable(); |
Brian Silverman | 7faaec7 | 2014-05-26 16:25:38 -0700 | [diff] [blame] | 171 | } |
| 172 | } |
| 173 | |
Brian Silverman | 4d0789d | 2014-03-23 17:03:07 -0700 | [diff] [blame] | 174 | // Adds 1 to the given index and handles wrapping correctly. |
| 175 | int index_add1(int index); |
| 176 | |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 177 | bool is_readable() { return data_end_ != data_start_; } |
Brian Silverman | 4d0789d | 2014-03-23 17:03:07 -0700 | [diff] [blame] | 178 | bool is_writable() { return index_add1(data_end_) != data_start_; } |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 179 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 180 | // These next 4 allow finding the right one. |
| 181 | const char *name_; |
| 182 | size_t length_; |
| 183 | int hash_; |
| 184 | int queue_length_; |
| 185 | // The next one in the linked list of queues. |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 186 | RawQueue *next_; |
brians | 343bc11 | 2013-02-10 01:53:46 +0000 | [diff] [blame] | 187 | |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 188 | RawQueue *recycle_; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 189 | |
| 190 | Mutex data_lock_; // protects operations on data_ etc |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 191 | // Always gets broadcasted to because different readers might have different |
| 192 | // ideas of what "readable" means (ie ones using separated indices). |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 193 | Condition readable_; |
| 194 | Condition writable_; |
| 195 | int data_length_; // max length into data + 1 |
| 196 | int data_start_; // is an index into data |
| 197 | int data_end_; // is an index into data |
| 198 | int messages_; // that have passed through |
| 199 | void **data_; // array of messages (with headers) |
| 200 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 201 | size_t msg_length_; // sizeof(each message) including the header |
Brian Silverman | c2e0422 | 2014-03-22 12:43:44 -0700 | [diff] [blame] | 202 | // A pointer to the first in the linked list of free messages. |
| 203 | MessageHeader *free_messages_; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 204 | |
Brian Silverman | 42d5237 | 2014-03-23 15:29:13 -0700 | [diff] [blame] | 205 | // Keeps track of if the queue was writable before a read so we can Signal() a |
| 206 | // reader if we transition it. |
| 207 | bool writable_start_; |
| 208 | |
Brian Silverman | 3510980 | 2014-04-09 14:31:53 -0700 | [diff] [blame] | 209 | // True iff somebody is currently Wait()ing on readable_. |
| 210 | // Set to true by each reader before calling Wait() and set back to false |
| 211 | // before the Broadcast(). |
| 212 | bool readable_waiting_; |
| 213 | |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 214 | // Actually frees the given message. |
| 215 | void DoFreeMessage(const void *msg); |
| 216 | // Calls DoFreeMessage if appropriate. |
| 217 | void DecrementMessageReferenceCount(const void *msg); |
Brian Silverman | 430e7fa | 2014-03-21 16:58:33 -0700 | [diff] [blame] | 218 | // Only does the actual incrementing of the reference count. |
| 219 | void IncrementMessageReferenceCount(const void *msg) const; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 220 | |
Brian Silverman | eb51cbb | 2014-03-14 22:57:08 -0700 | [diff] [blame] | 221 | // Must be called with data_lock_ locked. |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 222 | // *read_data will be initialized. |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 223 | // Returns with a readable message in data_ or false. |
Austin Schuh | 0ad2b6f | 2019-06-09 21:27:07 -0700 | [diff] [blame] | 224 | bool ReadCommonStart(Options<RawQueue> options, int *index, |
| 225 | ::std::chrono::nanoseconds timeout); |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 226 | // Deals with setting/unsetting readable_ and writable_. |
Brian Silverman | eb51cbb | 2014-03-14 22:57:08 -0700 | [diff] [blame] | 227 | // Must be called after data_lock_ has been unlocked. |
Brian Silverman | 797e71e | 2013-09-06 17:29:39 -0700 | [diff] [blame] | 228 | // read_data should be the same thing that was passed in to ReadCommonStart. |
Brian Silverman | 42d5237 | 2014-03-23 15:29:13 -0700 | [diff] [blame] | 229 | void ReadCommonEnd(); |
Brian Silverman | 227ad48 | 2014-03-23 11:21:32 -0700 | [diff] [blame] | 230 | // Returns the index of the last message. |
| 231 | // Useful for reading with kPeek. |
| 232 | int LastMessageIndex() const; |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 233 | |
| 234 | // Gets called by Fetch when necessary (with placement new). |
Brian Silverman | 08661c7 | 2013-09-01 17:24:38 -0700 | [diff] [blame] | 235 | RawQueue(const char *name, size_t length, int hash, int queue_length); |
Brian Silverman | a6d1b56 | 2013-09-01 14:39:39 -0700 | [diff] [blame] | 236 | }; |
| 237 | |
| 238 | } // namespace aos |
| 239 | |
John Park | 398c74a | 2018-10-20 21:17:39 -0700 | [diff] [blame] | 240 | #endif // AOS_IPC_LIB_QUEUE_H_ |