blob: 6c3382250ac20f27f7e3eee390b9bb5d06f5a6b6 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
4#include <assert.h>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
14#ifndef USE_UNSAFE
Brian Silverman14fd0fb2014-01-14 21:42:01 -080015#include "aos/linux_code/ipc_lib/queue.h"
brians343bc112013-02-10 01:53:46 +000016#endif // USE_UNSAFE
17#include "aos/common/time.h"
18
brians343bc112013-02-10 01:53:46 +000019namespace aos {
20
Brian Silverman96395be2014-02-11 18:35:57 -080021class MessageType;
22
brians343bc112013-02-10 01:53:46 +000023// This class is a base class for all messages sent over queues.
Brian Silverman16c82972014-02-13 15:36:40 -080024// All of the methods are overloaded in (generated) subclasses to do the same
25// thing for the whole thing.
brians343bc112013-02-10 01:53:46 +000026class Message {
27 public:
28 typedef ::aos::time::Time Time;
29 // The time that the message was sent at.
30 Time sent_time;
31
32 Message() : sent_time(0, 0) {}
33
34 // Zeros out the time.
35 void Zero();
Brian Silvermanc4546d22014-02-10 18:03:09 -080036 // Returns the size of the common fields.
brians343bc112013-02-10 01:53:46 +000037 static size_t Size() { return sizeof(Time); }
38
39 // Deserializes the common fields from the buffer.
40 size_t Deserialize(const char *buffer);
41 // Serializes the common fields into the buffer.
42 size_t Serialize(char *buffer) const;
43
44 // Populates sent_time with the current time.
45 void SetTimeToNow() { sent_time = Time::Now(); }
46
47 // Writes the contents of the message to the provided buffer.
48 size_t Print(char *buffer, int length) const;
Brian Silvermanc4546d22014-02-10 18:03:09 -080049
Brian Silverman665e60c2014-02-12 13:57:10 -080050 static const MessageType *GetType();
brians343bc112013-02-10 01:53:46 +000051};
52
53template <class T> class Queue;
54template <class T> class MessageBuilder;
55template <class T> class ScopedMessagePtr;
56#ifndef USE_UNSAFE
57template <class T> class SafeMessageBuilder;
58template <class T> class SafeScopedMessagePtr;
59#endif // USE_UNSAFE
60
61// A ScopedMessagePtr<> manages a queue message pointer.
62// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
63// sent. By design, there is no way to get the ScopedMessagePtr to release the
64// message pointer.
65template <class T>
66class ScopedMessagePtr {
67 public:
68 // Returns a pointer to the message.
69 // This stays valid until Send or the destructor have been called.
70 const T *get() const { return msg_; }
71 T *get() { return msg_; }
72
73 const T &operator*() const {
74 const T *msg = get();
75 assert(msg != NULL);
76 return *msg;
77 }
78
79 T &operator*() {
80 T *msg = get();
81 assert(msg != NULL);
82 return *msg;
83 }
84
85 const T *operator->() const {
86 const T *msg = get();
87 assert(msg != NULL);
88 return msg;
89 }
90
91 T *operator->() {
92 T *msg = get();
93 assert(msg != NULL);
94 return msg;
95 }
96
97 operator bool() {
98 return msg_ != NULL;
99 }
100
101 // Sends the message and removes our reference to it.
102 // If the queue is full, over-ride the oldest message in it with our new
103 // message.
104 // Returns true on success, and false otherwise.
105 // The message will be freed.
106 bool Send();
107
108 // Sends the message and removes our reference to it.
109 // If the queue is full, block until it is no longer full.
110 // Returns true on success, and false otherwise.
111 // The message will be freed.
112 bool SendBlocking();
113
114 // Frees the contained message.
115 ~ScopedMessagePtr() {
116 reset();
117 }
118
119#ifndef SWIG
120 // Implements a move constructor. This only takes rvalue references
121 // because we want to allow someone to say
122 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
123 // but we don't want to allow them to then say
124 // ScopedMessagePtr<X> new_ptr = ptr;
Brian Silvermanc4546d22014-02-10 18:03:09 -0800125 // And, if they do actually want to move the pointer, then it will correctly
brians343bc112013-02-10 01:53:46 +0000126 // clear out the source so there aren't 2 pointers to the message lying
127 // around.
128 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
129 :
130#ifndef USE_UNSAFE
131 queue_(ptr.queue_),
132#endif // USE_UNSAFE
133 msg_(ptr.msg_) {
134 ptr.msg_ = NULL;
135 }
136#endif // SWIG
137
138 private:
139 // Provide access to set_queue and the constructor for init.
140 friend class aos::Queue<typename std::remove_const<T>::type>;
141 // Provide access to the copy constructor for MakeWithBuilder.
142 friend class aos::MessageBuilder<T>;
143
144#ifndef USE_UNSAFE
145 // Only Queue should be able to build a queue.
Brian Silverman08661c72013-09-01 17:24:38 -0700146 ScopedMessagePtr(RawQueue *queue, T *msg)
brians343bc112013-02-10 01:53:46 +0000147 : queue_(queue), msg_(msg) {}
148#else
149 ScopedMessagePtr(T *msg)
150 : msg_(msg) {}
151#endif // USE_UNSAFE
152
153 // Sets the pointer to msg, freeing the old value if it was there.
154 // This is private because nobody should be able to get a pointer to a message
155 // that needs to be scoped without using the queue.
156 void reset(T *msg = NULL);
157
158#ifndef USE_UNSAFE
159 // Sets the queue that owns this message.
Brian Silverman08661c72013-09-01 17:24:38 -0700160 void set_queue(RawQueue *queue) { queue_ = queue; }
brians343bc112013-02-10 01:53:46 +0000161
162 // The queue that the message is a part of.
Brian Silverman08661c72013-09-01 17:24:38 -0700163 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000164#endif // USE_UNSAFE
165 // The message or NULL.
166 T *msg_;
167
168 // Protect evil constructors.
169 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
170};
171
172// Specializations for the Builders will be automatically generated in the .q.h
173// header files with all of the handy builder methods.
174// This builder uses an actual shm message pointer, which is more efficient and
175// more dangerous than the linux only SafeMessageBuilder.
176template <class T>
177class MessageBuilder {
178 public:
179 typedef T Message;
180 bool Send();
181};
182
183// TODO(aschuh): Base class
184// T must be a Message with the same format as the messages generated by
185// the q files.
186template <class T>
187class Queue {
188 public:
189 typedef T Message;
190
191 Queue(const char *queue_name)
192 : queue_name_(queue_name),
193#ifdef USE_UNSAFE
194 queue_msg_(&msg_)
195#else
196 queue_(NULL),
197 queue_msg_(NULL, NULL)
198#endif // USE_UNSAFE
199 {
200 static_assert(shm_ok<T>::value,
201 "The provided message type can't be put in shmem.");
202 }
203
204 // Initializes the queue. This may optionally be called to do any one time
Brian Silvermanc4546d22014-02-10 18:03:09 -0800205 // work before sending information, and may be be called multiple times.
brians343bc112013-02-10 01:53:46 +0000206 // Init will be called when a message is sent, but this will cause sending to
207 // take a different amount of time the first cycle.
208 void Init();
209
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800210 // Removes all internal references to shared memory so shared memory can be
211 // restarted safely. This should only be used in testing.
212 void Clear();
213
brians343bc112013-02-10 01:53:46 +0000214 // Fetches the next message from the queue.
215 // Returns true if there was a new message available and we successfully
216 // fetched it. This removes the message from the queue for all readers.
brians343bc112013-02-10 01:53:46 +0000217 bool FetchNext();
218 bool FetchNextBlocking();
219
220 // Fetches the last message from the queue.
221 // Returns true if there was a new message available and we successfully
222 // fetched it.
223 bool FetchLatest();
224
225 // Returns the age of the message.
226 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
227
228 // Returns true if the latest value in the queue is newer than age mseconds.
229 bool IsNewerThanMS(int age) {
Brian Silvermanc4546d22014-02-10 18:03:09 -0800230 // TODO(aschuh): Log very verbosely if something is _ever_ stale.
brians343bc112013-02-10 01:53:46 +0000231 if (get() != NULL) {
232 return Age() < time::Time::InMS(age);
233 } else {
234 return false;
235 }
236 }
237
238 // Returns a pointer to the current message.
239 // This pointer will be valid until a new message is fetched.
240 const T *get() const { return queue_msg_.get(); }
241
242 // Returns a reference to the message.
243 // The message will be valid until a new message is fetched.
244 const T &operator*() const {
245 const T *msg = get();
246 assert(msg != NULL);
247 return *msg;
248 }
249
250 // Returns a pointer to the current message.
251 // This pointer will be valid until a new message is fetched.
252 const T *operator->() const {
253 const T *msg = get();
254 assert(msg != NULL);
255 return msg;
256 }
257
258#ifndef USE_UNSAFE
259 // Returns a scoped_ptr containing a message.
260 // GCC will optimize away the copy constructor, so this is safe.
261 SafeScopedMessagePtr<T> SafeMakeMessage();
262
263 // Returns a message builder that contains a pre-allocated message.
264 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
265#endif // USE_UNSAFE
266
267#ifndef SWIG
268 // Returns a scoped_ptr containing a message.
269 // GCC will optimize away the copy constructor, so this is safe.
270 ScopedMessagePtr<T> MakeMessage();
271
272 // Returns a message builder that contains a pre-allocated message.
273 aos::MessageBuilder<T> MakeWithBuilder();
274#endif // SWIG
275
276 const char *name() const { return queue_name_; }
277
278 private:
279 const char *queue_name_;
280
281#ifdef USE_UNSAFE
282 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
283 T msg_;
284#else
285 T *MakeRawMessage();
286 // Pointer to the queue that this object fetches from.
Brian Silverman08661c72013-09-01 17:24:38 -0700287 RawQueue *queue_;
Austin Schuh287d98e2014-03-09 00:41:55 -0800288 int index_ = 0;
brians343bc112013-02-10 01:53:46 +0000289#endif
290 // Scoped pointer holding the latest message or NULL.
291 ScopedMessagePtr<const T> queue_msg_;
292 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
293};
294
295// Base class for all queue groups.
296class QueueGroup {
297 public:
298 // Constructs a queue group given its name and a unique hash of the name and
299 // type.
300 QueueGroup(const char *name, uint32_t hash) : name_(name), hash_(hash) {}
301
302 // Returns the name of the queue group.
303 const char *name() const { return name_.c_str(); }
304 // Returns a unique hash representing this instance of the queue group.
305 uint32_t hash() const { return hash_; }
306
307 private:
308 std::string name_;
309 uint32_t hash_;
310};
311
312} // namespace aos
313
314#ifdef USE_UNSAFE
315#include "aos/crio/queue-tmpl.h"
316#else
Brian Silverman14fd0fb2014-01-14 21:42:01 -0800317#include "aos/linux_code/queue-tmpl.h"
brians343bc112013-02-10 01:53:46 +0000318#endif
319#undef USE_UNSAFE
320
321#endif // AOS_COMMON_QUEUE_H_