blob: 6e07993622e27637a61d29a79b3e388ab18d0590 [file] [log] [blame]
John Park398c74a2018-10-20 21:17:39 -07001#ifndef AOS_IPC_LIB_QUEUE_H_
2#define AOS_IPC_LIB_QUEUE_H_
brians343bc112013-02-10 01:53:46 +00003
John Park398c74a2018-10-20 21:17:39 -07004#include "aos/ipc_lib/shared_mem.h"
John Park33858a32018-09-28 23:05:48 -07005#include "aos/mutex/mutex.h"
6#include "aos/condition.h"
7#include "aos/util/options.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008#include "glog/logging.h"
brians343bc112013-02-10 01:53:46 +00009
10// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
11// code to make checking for leaks work better
12// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
13// describes how
14
brians343bc112013-02-10 01:53:46 +000015// Any pointers returned from these functions can be safely passed to other
16// processes because they are all shared memory pointers.
17// IMPORTANT: Any message pointer must be passed back in some way
Brian Silvermana6d1b562013-09-01 14:39:39 -070018// (FreeMessage and WriteMessage are common ones) or the
brians343bc112013-02-10 01:53:46 +000019// application will leak shared memory.
Brian Silvermana6d1b562013-09-01 14:39:39 -070020// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
21// might work, but it is not guaranteed to.
brians343bc112013-02-10 01:53:46 +000022
Brian Silvermana6d1b562013-09-01 14:39:39 -070023namespace aos {
brians343bc112013-02-10 01:53:46 +000024
// Queues are the primary way to use shared memory. Basic use consists of
// calling RawQueue::Fetch and then reading and/or writing messages.
// Queues (as the name suggests) are a FIFO list of messages. Each combination
// of name and type signature will result in a different queue, which means
// that if you only recompile some code that uses differently sized messages,
// it will simply use a different queue than the old code.
Brian Silverman08661c72013-09-01 17:24:38 -070031class RawQueue {
Brian Silvermana6d1b562013-09-01 14:39:39 -070032 public:
33 // Retrieves (and creates if necessary) a queue. Each combination of name and
34 // signature refers to a completely independent queue.
35 // length is how large each message will be
36 // hash can differentiate multiple otherwise identical queues
37 // queue_length is how many messages the queue will be able to hold
Brian Silverman227ad482014-03-23 11:21:32 -070038 // Will never return NULL.
Brian Silverman08661c72013-09-01 17:24:38 -070039 static RawQueue *Fetch(const char *name, size_t length, int hash,
Brian Silverman5d3ab7f2015-02-21 15:54:21 -050040 int queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -070041 // Same as above, except sets up the returned queue so that it will put
42 // messages on *recycle when they are freed (after they have been released by
43 // all other readers/writers and are not in the queue).
44 // recycle_queue_length determines how many freed messages will be kept.
45 // Other code can retrieve the 2 queues separately (the recycle queue will
46 // have the same length and hash as the main one). However, any frees made
47 // using a queue with only (name,length,hash,queue_length) before the
48 // recycle queue has been associated with it will not go on to the recycle
49 // queue.
50 // NOTE: calling this function with the same (name,length,hash,queue_length)
51 // but multiple recycle_queue_lengths will result in each freed message being
52 // put onto an undefined one of the recycle queues.
Brian Silverman227ad482014-03-23 11:21:32 -070053 // Will never return NULL.
Brian Silverman08661c72013-09-01 17:24:38 -070054 static RawQueue *Fetch(const char *name, size_t length, int hash,
Brian Silverman5d3ab7f2015-02-21 15:54:21 -050055 int queue_length, int recycle_hash,
56 int recycle_queue_length, RawQueue **recycle);
brians343bc112013-02-10 01:53:46 +000057
Brian Silvermaneb51cbb2014-03-14 22:57:08 -070058 // Doesn't update the currently read index (the read messages in the queue or
59 // the index). This means the returned message (and any others skipped with
60 // kFromEnd) will be left in the queue.
Brian Silvermana6d1b562013-09-01 14:39:39 -070061 // For reading only.
Brian Silverman7faaec72014-05-26 16:25:38 -070062 // Not valid for ReadMessageIndex combined with kFromEnd.
63 static constexpr Options<RawQueue>::Option kPeek{0x0001};
Brian Silvermana6d1b562013-09-01 14:39:39 -070064 // Reads the last message in the queue instead of just the next one.
65 // NOTE: This removes all of the messages until the last one from the queue
Brian Silvermaneb51cbb2014-03-14 22:57:08 -070066 // (which means that nobody else will read them).
Brian Silvermana6d1b562013-09-01 14:39:39 -070067 // For reading only.
Brian Silverman7faaec72014-05-26 16:25:38 -070068 // Not valid for ReadMessageIndex combined with kPeek.
69 static constexpr Options<RawQueue>::Option kFromEnd{0x0002};
Brian Silvermana6d1b562013-09-01 14:39:39 -070070 // Causes reads to return NULL and writes to fail instead of waiting.
71 // For reading and writing.
Brian Silverman7faaec72014-05-26 16:25:38 -070072 static constexpr Options<RawQueue>::Option kNonBlock{0x0004};
Brian Silvermana6d1b562013-09-01 14:39:39 -070073 // Causes things to block.
Brian Silvermana6d1b562013-09-01 14:39:39 -070074 // For reading and writing.
Brian Silverman7faaec72014-05-26 16:25:38 -070075 static constexpr Options<RawQueue>::Option kBlock{0x0008};
Brian Silvermana6d1b562013-09-01 14:39:39 -070076 // Causes writes to overwrite the oldest message in the queue instead of
77 // blocking.
78 // For writing only.
Brian Silverman7faaec72014-05-26 16:25:38 -070079 static constexpr Options<RawQueue>::Option kOverride{0x0010};
brians343bc112013-02-10 01:53:46 +000080
Brian Silvermanb2db29b2018-01-03 20:29:07 -080081 RawQueue(const RawQueue &) = default;
82 RawQueue &operator=(const RawQueue &) = default;
83
Brian Silvermana6d1b562013-09-01 14:39:39 -070084 // Writes a message into the queue.
85 // This function takes ownership of msg.
86 // NOTE: msg must point to a valid message from this queue
Brian Silverman227ad482014-03-23 11:21:32 -070087 // Returns true on success. A return value of false means msg has already been
88 // freed.
Brian Silverman7faaec72014-05-26 16:25:38 -070089 bool WriteMessage(void *msg, Options<RawQueue> options) {
90 static constexpr Options<RawQueue> kWriteFailureOptions =
91 kNonBlock | kBlock | kOverride;
92 if (!options.NoOthersSet(kWriteFailureOptions)) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070093 LOG(FATAL) << "illegal write options in " << std::hex
94 << options.printable();
Brian Silverman7faaec72014-05-26 16:25:38 -070095 }
96 if (!options.ExactlyOneSet(kWriteFailureOptions)) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070097 LOG(FATAL) << "invalid write options " << std::hex << options.printable();
Brian Silverman7faaec72014-05-26 16:25:38 -070098 }
99 return DoWriteMessage(msg, options);
100 }
brians343bc112013-02-10 01:53:46 +0000101
Brian Silvermana6d1b562013-09-01 14:39:39 -0700102 // Reads a message out of the queue.
103 // The return value will have at least the length of this queue's worth of
104 // valid data where it's pointing to.
105 // The return value is const because other people might be viewing the same
106 // messsage. Do not cast the const away!
107 // IMPORTANT: The return value (if not NULL) must eventually be passed to
108 // FreeMessage.
Brian Silverman7faaec72014-05-26 16:25:38 -0700109 const void *ReadMessage(Options<RawQueue> options) {
110 CheckReadOptions(options);
111 return DoReadMessage(options);
112 }
Brian Silverman227ad482014-03-23 11:21:32 -0700113 // The same as ReadMessage, except it will never return the
114 // same message twice (when used with the same index argument). However,
115 // may not return some messages that pass through the queue.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700116 // *index should start as 0. index does not have to be in shared memory, but
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700117 // it can be.
Brian Silverman7faaec72014-05-26 16:25:38 -0700118 // Calling with both kPeek and kFromEnd in options isn't valid because that
119 // would mean ignoring index, which would make this function the same as
120 // ReadMessage (which should be used instead).
Austin Schuh0ad2b6f2019-06-09 21:27:07 -0700121 const void *ReadMessageIndex(
122 Options<RawQueue> options, int *index,
123 ::std::chrono::nanoseconds timeout = ::std::chrono::nanoseconds(0)) {
Brian Silverman7faaec72014-05-26 16:25:38 -0700124 CheckReadOptions(options);
125 static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
126 if (options.AllSet(kFromEndAndPeek)) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700127 LOG(FATAL) << "ReadMessageIndex(kFromEnd | kPeek) is not allowed";
Brian Silverman7faaec72014-05-26 16:25:38 -0700128 }
Austin Schuh0ad2b6f2019-06-09 21:27:07 -0700129 return DoReadMessageIndex(options, index, timeout);
Brian Silverman7faaec72014-05-26 16:25:38 -0700130 }
brians343bc112013-02-10 01:53:46 +0000131
Brian Silvermana6d1b562013-09-01 14:39:39 -0700132 // Retrieves ("allocates") a message that can then be written to the queue.
133 // NOTE: the return value will be completely uninitialized
134 // The return value will have at least the length of this queue's worth of
135 // valid memory where it's pointing to.
136 // Returns NULL for error.
137 // IMPORTANT: The return value (if not NULL) must eventually be passed to
Brian Silverman227ad482014-03-23 11:21:32 -0700138 // FreeMessage or WriteMessage.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700139 void *GetMessage();
brians343bc112013-02-10 01:53:46 +0000140
Brian Silverman797e71e2013-09-06 17:29:39 -0700141 // It is ok to call this method with a NULL msg.
142 void FreeMessage(const void *msg) {
143 if (msg != NULL) DecrementMessageReferenceCount(msg);
144 }
brians343bc112013-02-10 01:53:46 +0000145
Brian Silvermanc2e04222014-03-22 12:43:44 -0700146 // UNSAFE! Returns the number of free messages we have. Only safe to use when
147 // only 1 task is using this object (ie in tests).
148 int FreeMessages() const;
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700149
Austin Schuhd681bbd2019-02-02 12:03:32 -0800150 // Returns the name of the queue.
151 const char *name() const { return name_; }
152
Brian Silvermana6d1b562013-09-01 14:39:39 -0700153 private:
154 struct MessageHeader;
Brian Silverman797e71e2013-09-06 17:29:39 -0700155
Brian Silverman7faaec72014-05-26 16:25:38 -0700156 // The public wrappers around these are inlined and do argument checking.
157 bool DoWriteMessage(void *msg, Options<RawQueue> options);
158 const void *DoReadMessage(Options<RawQueue> options);
Austin Schuh0ad2b6f2019-06-09 21:27:07 -0700159 const void *DoReadMessageIndex(Options<RawQueue> options, int *index,
160 ::std::chrono::nanoseconds timeout);
Brian Silverman7faaec72014-05-26 16:25:38 -0700161 void CheckReadOptions(Options<RawQueue> options) {
162 static constexpr Options<RawQueue> kValidOptions =
163 kPeek | kFromEnd | kNonBlock | kBlock;
164 if (!options.NoOthersSet(kValidOptions)) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165 LOG(FATAL) << "illegal read options in " << std::hex
166 << options.printable();
Brian Silverman7faaec72014-05-26 16:25:38 -0700167 }
168 static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
169 if (!options.ExactlyOneSet(kBlockChoices)) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700170 LOG(FATAL) << "invalid read options " << std::hex << options.printable();
Brian Silverman7faaec72014-05-26 16:25:38 -0700171 }
172 }
173
Brian Silverman4d0789d2014-03-23 17:03:07 -0700174 // Adds 1 to the given index and handles wrapping correctly.
175 int index_add1(int index);
176
Brian Silverman797e71e2013-09-06 17:29:39 -0700177 bool is_readable() { return data_end_ != data_start_; }
Brian Silverman4d0789d2014-03-23 17:03:07 -0700178 bool is_writable() { return index_add1(data_end_) != data_start_; }
brians343bc112013-02-10 01:53:46 +0000179
Brian Silvermana6d1b562013-09-01 14:39:39 -0700180 // These next 4 allow finding the right one.
181 const char *name_;
182 size_t length_;
183 int hash_;
184 int queue_length_;
185 // The next one in the linked list of queues.
Brian Silverman08661c72013-09-01 17:24:38 -0700186 RawQueue *next_;
brians343bc112013-02-10 01:53:46 +0000187
Brian Silverman08661c72013-09-01 17:24:38 -0700188 RawQueue *recycle_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700189
190 Mutex data_lock_; // protects operations on data_ etc
Brian Silverman797e71e2013-09-06 17:29:39 -0700191 // Always gets broadcasted to because different readers might have different
192 // ideas of what "readable" means (ie ones using separated indices).
Brian Silvermana6d1b562013-09-01 14:39:39 -0700193 Condition readable_;
194 Condition writable_;
195 int data_length_; // max length into data + 1
196 int data_start_; // is an index into data
197 int data_end_; // is an index into data
198 int messages_; // that have passed through
199 void **data_; // array of messages (with headers)
200
Brian Silvermana6d1b562013-09-01 14:39:39 -0700201 size_t msg_length_; // sizeof(each message) including the header
Brian Silvermanc2e04222014-03-22 12:43:44 -0700202 // A pointer to the first in the linked list of free messages.
203 MessageHeader *free_messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700204
Brian Silverman42d52372014-03-23 15:29:13 -0700205 // Keeps track of if the queue was writable before a read so we can Signal() a
206 // reader if we transition it.
207 bool writable_start_;
208
Brian Silverman35109802014-04-09 14:31:53 -0700209 // True iff somebody is currently Wait()ing on readable_.
210 // Set to true by each reader before calling Wait() and set back to false
211 // before the Broadcast().
212 bool readable_waiting_;
213
Brian Silvermana6d1b562013-09-01 14:39:39 -0700214 // Actually frees the given message.
215 void DoFreeMessage(const void *msg);
216 // Calls DoFreeMessage if appropriate.
217 void DecrementMessageReferenceCount(const void *msg);
Brian Silverman430e7fa2014-03-21 16:58:33 -0700218 // Only does the actual incrementing of the reference count.
219 void IncrementMessageReferenceCount(const void *msg) const;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700220
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700221 // Must be called with data_lock_ locked.
Brian Silverman797e71e2013-09-06 17:29:39 -0700222 // *read_data will be initialized.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700223 // Returns with a readable message in data_ or false.
Austin Schuh0ad2b6f2019-06-09 21:27:07 -0700224 bool ReadCommonStart(Options<RawQueue> options, int *index,
225 ::std::chrono::nanoseconds timeout);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700226 // Deals with setting/unsetting readable_ and writable_.
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700227 // Must be called after data_lock_ has been unlocked.
Brian Silverman797e71e2013-09-06 17:29:39 -0700228 // read_data should be the same thing that was passed in to ReadCommonStart.
Brian Silverman42d52372014-03-23 15:29:13 -0700229 void ReadCommonEnd();
Brian Silverman227ad482014-03-23 11:21:32 -0700230 // Returns the index of the last message.
231 // Useful for reading with kPeek.
232 int LastMessageIndex() const;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700233
234 // Gets called by Fetch when necessary (with placement new).
Brian Silverman08661c72013-09-01 17:24:38 -0700235 RawQueue(const char *name, size_t length, int hash, int queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700236};
237
238} // namespace aos
239
John Park398c74a2018-10-20 21:17:39 -0700240#endif // AOS_IPC_LIB_QUEUE_H_