blob: 77fdf0d869bf09c8f5ead8af096e33ef62d4b894 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
4#include <assert.h>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
14#ifndef USE_UNSAFE
Brian Silverman14fd0fb2014-01-14 21:42:01 -080015#include "aos/linux_code/ipc_lib/queue.h"
brians343bc112013-02-10 01:53:46 +000016#endif // USE_UNSAFE
17#include "aos/common/time.h"
18
brians343bc112013-02-10 01:53:46 +000019namespace aos {
20
Brian Silvermanc4546d22014-02-10 18:03:09 -080021#ifndef SWIG
22
// Writes a text representation of a single message field.
//
//   output:       destination for the text.
//   output_bytes: in/out. On entry, the number of bytes available to write in
//                 output; on return, the number of bytes actually written.
//   input:        where the field's data is read from.
//   input_bytes:  in/out. On entry, the number of bytes available to read from
//                 input; on return, the number of bytes actually read.
//   type:         ID of the type to print. It must be a primitive type.
//
// NOTE(review): the meaning of the bool result is not stated here (presumably
// true on success) -- confirm against the implementation.
//
// The implementation of this is meant to be generated by the ruby code.
// TODO(brians): Actually do that.
bool PrintField(char *output,
                size_t *output_bytes,
                void *input,
                size_t *input_bytes,
                uint32_t type);
36
// Describes the layout of one message type.
//
// The type IDs this uses are 2 parts: a 16 bit size and a 16 bit hash. Sizes
// for primitive types are stored with 8192 added.
//
// Serializing/deserializing includes all of the fields too.
//
// Instances own the Field objects that fields[] points at (the destructor
// deletes each of them), so copying is disabled: a copy would not carry the
// trailing fields[] storage and its destructor would delete pointers it does
// not own.
struct MessageType {
  // One member of a message: its type ID and its name.
  struct Field {
    uint32_t type;
    const char *name;
  };

  // Explicitly defaulted because declaring the deleted copy operations below
  // would otherwise suppress the implicit default constructor. Members are
  // left uninitialized, exactly as before.
  MessageType() = default;

  // Not copyable; see the comment at the top of the struct.
  MessageType(const MessageType &) = delete;
  MessageType &operator=(const MessageType &) = delete;

  // Frees every Field referenced from fields[0 .. number_fields).
  ~MessageType() {
    for (int i = 0; i < number_fields; ++i) {
      delete fields[i];
    }
  }

  // Returns -1 for error.
  ssize_t Serialize(char *buffer, size_t max_bytes) const;
  // bytes should start out as the number of bytes available in buffer and gets
  // set to the number actually read before returning.
  // Returns a new instance allocated with new or NULL for error.
  static MessageType *Deserialize(const char *buffer, size_t *bytes);

  // The ID of this type (see the description of type IDs above).
  uint32_t id;
  const char *name;

  // How many entries of fields[] are valid.
  int number_fields;
  // Trailing variable-length array of owned Field pointers. Whoever allocates
  // a MessageType has to reserve room for number_fields pointers after it.
  const Field *fields[];
};
66
67#endif // SWIG
68
brians343bc112013-02-10 01:53:46 +000069// This class is a base class for all messages sent over queues.
70class Message {
71 public:
72 typedef ::aos::time::Time Time;
73 // The time that the message was sent at.
74 Time sent_time;
75
76 Message() : sent_time(0, 0) {}
77
78 // Zeros out the time.
Brian Silvermanc4546d22014-02-10 18:03:09 -080079 // Overriden to zero the whole message.
brians343bc112013-02-10 01:53:46 +000080 void Zero();
Brian Silvermanc4546d22014-02-10 18:03:09 -080081 // Returns the size of the common fields.
82 // Overriden to return the size of the whole message.
brians343bc112013-02-10 01:53:46 +000083 static size_t Size() { return sizeof(Time); }
84
85 // Deserializes the common fields from the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080086 // Overriden to deserialize the whole message.
brians343bc112013-02-10 01:53:46 +000087 size_t Deserialize(const char *buffer);
88 // Serializes the common fields into the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080089 // Overriden to serialize the whole message.
brians343bc112013-02-10 01:53:46 +000090 size_t Serialize(char *buffer) const;
91
92 // Populates sent_time with the current time.
93 void SetTimeToNow() { sent_time = Time::Now(); }
94
95 // Writes the contents of the message to the provided buffer.
96 size_t Print(char *buffer, int length) const;
Brian Silvermanc4546d22014-02-10 18:03:09 -080097
98#ifndef SWIG
99 const MessageType &GetType() const;
100#endif // SWIG
brians343bc112013-02-10 01:53:46 +0000101};
102
// Forward declarations so that the templates below can refer to each other.
// The Safe* variants are only declared when USE_UNSAFE is not defined.
template <typename T>
class Queue;
template <typename T>
class MessageBuilder;
template <typename T>
class ScopedMessagePtr;
#ifndef USE_UNSAFE
template <typename T>
class SafeMessageBuilder;
template <typename T>
class SafeScopedMessagePtr;
#endif  // USE_UNSAFE
110
111// A ScopedMessagePtr<> manages a queue message pointer.
112// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
113// sent. By design, there is no way to get the ScopedMessagePtr to release the
114// message pointer.
115template <class T>
116class ScopedMessagePtr {
117 public:
118 // Returns a pointer to the message.
119 // This stays valid until Send or the destructor have been called.
120 const T *get() const { return msg_; }
121 T *get() { return msg_; }
122
123 const T &operator*() const {
124 const T *msg = get();
125 assert(msg != NULL);
126 return *msg;
127 }
128
129 T &operator*() {
130 T *msg = get();
131 assert(msg != NULL);
132 return *msg;
133 }
134
135 const T *operator->() const {
136 const T *msg = get();
137 assert(msg != NULL);
138 return msg;
139 }
140
141 T *operator->() {
142 T *msg = get();
143 assert(msg != NULL);
144 return msg;
145 }
146
147 operator bool() {
148 return msg_ != NULL;
149 }
150
151 // Sends the message and removes our reference to it.
152 // If the queue is full, over-ride the oldest message in it with our new
153 // message.
154 // Returns true on success, and false otherwise.
155 // The message will be freed.
156 bool Send();
157
158 // Sends the message and removes our reference to it.
159 // If the queue is full, block until it is no longer full.
160 // Returns true on success, and false otherwise.
161 // The message will be freed.
162 bool SendBlocking();
163
164 // Frees the contained message.
165 ~ScopedMessagePtr() {
166 reset();
167 }
168
169#ifndef SWIG
170 // Implements a move constructor. This only takes rvalue references
171 // because we want to allow someone to say
172 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
173 // but we don't want to allow them to then say
174 // ScopedMessagePtr<X> new_ptr = ptr;
Brian Silvermanc4546d22014-02-10 18:03:09 -0800175 // And, if they do actually want to move the pointer, then it will correctly
brians343bc112013-02-10 01:53:46 +0000176 // clear out the source so there aren't 2 pointers to the message lying
177 // around.
178 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
179 :
180#ifndef USE_UNSAFE
181 queue_(ptr.queue_),
182#endif // USE_UNSAFE
183 msg_(ptr.msg_) {
184 ptr.msg_ = NULL;
185 }
186#endif // SWIG
187
188 private:
189 // Provide access to set_queue and the constructor for init.
190 friend class aos::Queue<typename std::remove_const<T>::type>;
191 // Provide access to the copy constructor for MakeWithBuilder.
192 friend class aos::MessageBuilder<T>;
193
194#ifndef USE_UNSAFE
195 // Only Queue should be able to build a queue.
Brian Silverman08661c72013-09-01 17:24:38 -0700196 ScopedMessagePtr(RawQueue *queue, T *msg)
brians343bc112013-02-10 01:53:46 +0000197 : queue_(queue), msg_(msg) {}
198#else
199 ScopedMessagePtr(T *msg)
200 : msg_(msg) {}
201#endif // USE_UNSAFE
202
203 // Sets the pointer to msg, freeing the old value if it was there.
204 // This is private because nobody should be able to get a pointer to a message
205 // that needs to be scoped without using the queue.
206 void reset(T *msg = NULL);
207
208#ifndef USE_UNSAFE
209 // Sets the queue that owns this message.
Brian Silverman08661c72013-09-01 17:24:38 -0700210 void set_queue(RawQueue *queue) { queue_ = queue; }
brians343bc112013-02-10 01:53:46 +0000211
212 // The queue that the message is a part of.
Brian Silverman08661c72013-09-01 17:24:38 -0700213 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000214#endif // USE_UNSAFE
215 // The message or NULL.
216 T *msg_;
217
218 // Protect evil constructors.
219 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
220};
221
// Generic builder template. The .q.h header files automatically generate
// specializations of it that carry all of the handy builder methods.
// This builder uses an actual shm message pointer, which is more efficient and
// more dangerous than the linux only SafeMessageBuilder.
template <typename T>
class MessageBuilder {
 public:
  // The message type this builder works on.
  typedef T Message;

  bool Send();
};
232
233// TODO(aschuh): Base class
234// T must be a Message with the same format as the messages generated by
235// the q files.
236template <class T>
237class Queue {
238 public:
239 typedef T Message;
240
241 Queue(const char *queue_name)
242 : queue_name_(queue_name),
243#ifdef USE_UNSAFE
244 queue_msg_(&msg_)
245#else
246 queue_(NULL),
247 queue_msg_(NULL, NULL)
248#endif // USE_UNSAFE
249 {
250 static_assert(shm_ok<T>::value,
251 "The provided message type can't be put in shmem.");
252 }
253
254 // Initializes the queue. This may optionally be called to do any one time
Brian Silvermanc4546d22014-02-10 18:03:09 -0800255 // work before sending information, and may be be called multiple times.
brians343bc112013-02-10 01:53:46 +0000256 // Init will be called when a message is sent, but this will cause sending to
257 // take a different amount of time the first cycle.
258 void Init();
259
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800260 // Removes all internal references to shared memory so shared memory can be
261 // restarted safely. This should only be used in testing.
262 void Clear();
263
brians343bc112013-02-10 01:53:46 +0000264 // Fetches the next message from the queue.
265 // Returns true if there was a new message available and we successfully
266 // fetched it. This removes the message from the queue for all readers.
267 // TODO(aschuh): Fix this to use a different way of fetching messages so other
268 // readers can also FetchNext.
269 bool FetchNext();
270 bool FetchNextBlocking();
271
272 // Fetches the last message from the queue.
273 // Returns true if there was a new message available and we successfully
274 // fetched it.
275 bool FetchLatest();
276
277 // Returns the age of the message.
278 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
279
280 // Returns true if the latest value in the queue is newer than age mseconds.
281 bool IsNewerThanMS(int age) {
Brian Silvermanc4546d22014-02-10 18:03:09 -0800282 // TODO(aschuh): Log very verbosely if something is _ever_ stale.
brians343bc112013-02-10 01:53:46 +0000283 if (get() != NULL) {
284 return Age() < time::Time::InMS(age);
285 } else {
286 return false;
287 }
288 }
289
290 // Returns a pointer to the current message.
291 // This pointer will be valid until a new message is fetched.
292 const T *get() const { return queue_msg_.get(); }
293
294 // Returns a reference to the message.
295 // The message will be valid until a new message is fetched.
296 const T &operator*() const {
297 const T *msg = get();
298 assert(msg != NULL);
299 return *msg;
300 }
301
302 // Returns a pointer to the current message.
303 // This pointer will be valid until a new message is fetched.
304 const T *operator->() const {
305 const T *msg = get();
306 assert(msg != NULL);
307 return msg;
308 }
309
310#ifndef USE_UNSAFE
311 // Returns a scoped_ptr containing a message.
312 // GCC will optimize away the copy constructor, so this is safe.
313 SafeScopedMessagePtr<T> SafeMakeMessage();
314
315 // Returns a message builder that contains a pre-allocated message.
316 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
317#endif // USE_UNSAFE
318
319#ifndef SWIG
320 // Returns a scoped_ptr containing a message.
321 // GCC will optimize away the copy constructor, so this is safe.
322 ScopedMessagePtr<T> MakeMessage();
323
324 // Returns a message builder that contains a pre-allocated message.
325 aos::MessageBuilder<T> MakeWithBuilder();
326#endif // SWIG
327
328 const char *name() const { return queue_name_; }
329
330 private:
331 const char *queue_name_;
332
333#ifdef USE_UNSAFE
334 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
335 T msg_;
336#else
337 T *MakeRawMessage();
338 // Pointer to the queue that this object fetches from.
Brian Silverman08661c72013-09-01 17:24:38 -0700339 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000340#endif
341 // Scoped pointer holding the latest message or NULL.
342 ScopedMessagePtr<const T> queue_msg_;
343 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
344};
345
// Common base class shared by every queue group.
class QueueGroup {
 public:
  // Builds a queue group from its name and a unique hash of the name and
  // type. The name is copied into the group.
  QueueGroup(const char *name, uint32_t hash)
      : name_(name),
        hash_(hash) {
  }

  // The name of the queue group.
  const char *name() const {
    return name_.c_str();
  }

  // A unique hash representing this instance of the queue group.
  uint32_t hash() const {
    return hash_;
  }

 private:
  std::string name_;
  uint32_t hash_;
};
362
363} // namespace aos
364
365#ifdef USE_UNSAFE
366#include "aos/crio/queue-tmpl.h"
367#else
Brian Silverman14fd0fb2014-01-14 21:42:01 -0800368#include "aos/linux_code/queue-tmpl.h"
brians343bc112013-02-10 01:53:46 +0000369#endif
370#undef USE_UNSAFE
371
372#endif // AOS_COMMON_QUEUE_H_