blob: 9f6bc9da04ee6820dc50c92b048cae7a0aa7d7ef [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
4#include <assert.h>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
14#ifndef USE_UNSAFE
Brian Silverman14fd0fb2014-01-14 21:42:01 -080015#include "aos/linux_code/ipc_lib/queue.h"
brians343bc112013-02-10 01:53:46 +000016#endif // USE_UNSAFE
17#include "aos/common/time.h"
18
brians343bc112013-02-10 01:53:46 +000019namespace aos {
20
Brian Silverman96395be2014-02-11 18:35:57 -080021class MessageType;
22
brians343bc112013-02-10 01:53:46 +000023// This class is a base class for all messages sent over queues.
24class Message {
25 public:
26 typedef ::aos::time::Time Time;
27 // The time that the message was sent at.
28 Time sent_time;
29
30 Message() : sent_time(0, 0) {}
31
32 // Zeros out the time.
Brian Silvermanc4546d22014-02-10 18:03:09 -080033 // Overriden to zero the whole message.
brians343bc112013-02-10 01:53:46 +000034 void Zero();
Brian Silvermanc4546d22014-02-10 18:03:09 -080035 // Returns the size of the common fields.
36 // Overriden to return the size of the whole message.
brians343bc112013-02-10 01:53:46 +000037 static size_t Size() { return sizeof(Time); }
38
39 // Deserializes the common fields from the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080040 // Overriden to deserialize the whole message.
brians343bc112013-02-10 01:53:46 +000041 size_t Deserialize(const char *buffer);
42 // Serializes the common fields into the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080043 // Overriden to serialize the whole message.
brians343bc112013-02-10 01:53:46 +000044 size_t Serialize(char *buffer) const;
45
46 // Populates sent_time with the current time.
47 void SetTimeToNow() { sent_time = Time::Now(); }
48
49 // Writes the contents of the message to the provided buffer.
50 size_t Print(char *buffer, int length) const;
Brian Silvermanc4546d22014-02-10 18:03:09 -080051
Brian Silverman96395be2014-02-11 18:35:57 -080052 const MessageType *GetType() const;
brians343bc112013-02-10 01:53:46 +000053};
54
55template <class T> class Queue;
56template <class T> class MessageBuilder;
57template <class T> class ScopedMessagePtr;
58#ifndef USE_UNSAFE
59template <class T> class SafeMessageBuilder;
60template <class T> class SafeScopedMessagePtr;
61#endif // USE_UNSAFE
62
63// A ScopedMessagePtr<> manages a queue message pointer.
64// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
65// sent. By design, there is no way to get the ScopedMessagePtr to release the
66// message pointer.
67template <class T>
68class ScopedMessagePtr {
69 public:
70 // Returns a pointer to the message.
71 // This stays valid until Send or the destructor have been called.
72 const T *get() const { return msg_; }
73 T *get() { return msg_; }
74
75 const T &operator*() const {
76 const T *msg = get();
77 assert(msg != NULL);
78 return *msg;
79 }
80
81 T &operator*() {
82 T *msg = get();
83 assert(msg != NULL);
84 return *msg;
85 }
86
87 const T *operator->() const {
88 const T *msg = get();
89 assert(msg != NULL);
90 return msg;
91 }
92
93 T *operator->() {
94 T *msg = get();
95 assert(msg != NULL);
96 return msg;
97 }
98
99 operator bool() {
100 return msg_ != NULL;
101 }
102
103 // Sends the message and removes our reference to it.
104 // If the queue is full, over-ride the oldest message in it with our new
105 // message.
106 // Returns true on success, and false otherwise.
107 // The message will be freed.
108 bool Send();
109
110 // Sends the message and removes our reference to it.
111 // If the queue is full, block until it is no longer full.
112 // Returns true on success, and false otherwise.
113 // The message will be freed.
114 bool SendBlocking();
115
116 // Frees the contained message.
117 ~ScopedMessagePtr() {
118 reset();
119 }
120
121#ifndef SWIG
122 // Implements a move constructor. This only takes rvalue references
123 // because we want to allow someone to say
124 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
125 // but we don't want to allow them to then say
126 // ScopedMessagePtr<X> new_ptr = ptr;
Brian Silvermanc4546d22014-02-10 18:03:09 -0800127 // And, if they do actually want to move the pointer, then it will correctly
brians343bc112013-02-10 01:53:46 +0000128 // clear out the source so there aren't 2 pointers to the message lying
129 // around.
130 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
131 :
132#ifndef USE_UNSAFE
133 queue_(ptr.queue_),
134#endif // USE_UNSAFE
135 msg_(ptr.msg_) {
136 ptr.msg_ = NULL;
137 }
138#endif // SWIG
139
140 private:
141 // Provide access to set_queue and the constructor for init.
142 friend class aos::Queue<typename std::remove_const<T>::type>;
143 // Provide access to the copy constructor for MakeWithBuilder.
144 friend class aos::MessageBuilder<T>;
145
146#ifndef USE_UNSAFE
147 // Only Queue should be able to build a queue.
Brian Silverman08661c72013-09-01 17:24:38 -0700148 ScopedMessagePtr(RawQueue *queue, T *msg)
brians343bc112013-02-10 01:53:46 +0000149 : queue_(queue), msg_(msg) {}
150#else
151 ScopedMessagePtr(T *msg)
152 : msg_(msg) {}
153#endif // USE_UNSAFE
154
155 // Sets the pointer to msg, freeing the old value if it was there.
156 // This is private because nobody should be able to get a pointer to a message
157 // that needs to be scoped without using the queue.
158 void reset(T *msg = NULL);
159
160#ifndef USE_UNSAFE
161 // Sets the queue that owns this message.
Brian Silverman08661c72013-09-01 17:24:38 -0700162 void set_queue(RawQueue *queue) { queue_ = queue; }
brians343bc112013-02-10 01:53:46 +0000163
164 // The queue that the message is a part of.
Brian Silverman08661c72013-09-01 17:24:38 -0700165 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000166#endif // USE_UNSAFE
167 // The message or NULL.
168 T *msg_;
169
170 // Protect evil constructors.
171 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
172};
173
// Builder for messages of type T.
// The .q.h header files automatically generate a specialization for each
// message type, and those specializations carry all of the handy builder
// methods.
// This builder works on an actual shm message pointer, which makes it more
// efficient and more dangerous than the linux only SafeMessageBuilder.
template <typename T>
class MessageBuilder {
 public:
  // The type of message that this builder creates.
  typedef T Message;

  // Sends the message that has been built.
  bool Send();
};
184
185// TODO(aschuh): Base class
186// T must be a Message with the same format as the messages generated by
187// the q files.
188template <class T>
189class Queue {
190 public:
191 typedef T Message;
192
193 Queue(const char *queue_name)
194 : queue_name_(queue_name),
195#ifdef USE_UNSAFE
196 queue_msg_(&msg_)
197#else
198 queue_(NULL),
199 queue_msg_(NULL, NULL)
200#endif // USE_UNSAFE
201 {
202 static_assert(shm_ok<T>::value,
203 "The provided message type can't be put in shmem.");
204 }
205
206 // Initializes the queue. This may optionally be called to do any one time
Brian Silvermanc4546d22014-02-10 18:03:09 -0800207 // work before sending information, and may be be called multiple times.
brians343bc112013-02-10 01:53:46 +0000208 // Init will be called when a message is sent, but this will cause sending to
209 // take a different amount of time the first cycle.
210 void Init();
211
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800212 // Removes all internal references to shared memory so shared memory can be
213 // restarted safely. This should only be used in testing.
214 void Clear();
215
brians343bc112013-02-10 01:53:46 +0000216 // Fetches the next message from the queue.
217 // Returns true if there was a new message available and we successfully
218 // fetched it. This removes the message from the queue for all readers.
219 // TODO(aschuh): Fix this to use a different way of fetching messages so other
220 // readers can also FetchNext.
221 bool FetchNext();
222 bool FetchNextBlocking();
223
224 // Fetches the last message from the queue.
225 // Returns true if there was a new message available and we successfully
226 // fetched it.
227 bool FetchLatest();
228
229 // Returns the age of the message.
230 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
231
232 // Returns true if the latest value in the queue is newer than age mseconds.
233 bool IsNewerThanMS(int age) {
Brian Silvermanc4546d22014-02-10 18:03:09 -0800234 // TODO(aschuh): Log very verbosely if something is _ever_ stale.
brians343bc112013-02-10 01:53:46 +0000235 if (get() != NULL) {
236 return Age() < time::Time::InMS(age);
237 } else {
238 return false;
239 }
240 }
241
242 // Returns a pointer to the current message.
243 // This pointer will be valid until a new message is fetched.
244 const T *get() const { return queue_msg_.get(); }
245
246 // Returns a reference to the message.
247 // The message will be valid until a new message is fetched.
248 const T &operator*() const {
249 const T *msg = get();
250 assert(msg != NULL);
251 return *msg;
252 }
253
254 // Returns a pointer to the current message.
255 // This pointer will be valid until a new message is fetched.
256 const T *operator->() const {
257 const T *msg = get();
258 assert(msg != NULL);
259 return msg;
260 }
261
262#ifndef USE_UNSAFE
263 // Returns a scoped_ptr containing a message.
264 // GCC will optimize away the copy constructor, so this is safe.
265 SafeScopedMessagePtr<T> SafeMakeMessage();
266
267 // Returns a message builder that contains a pre-allocated message.
268 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
269#endif // USE_UNSAFE
270
271#ifndef SWIG
272 // Returns a scoped_ptr containing a message.
273 // GCC will optimize away the copy constructor, so this is safe.
274 ScopedMessagePtr<T> MakeMessage();
275
276 // Returns a message builder that contains a pre-allocated message.
277 aos::MessageBuilder<T> MakeWithBuilder();
278#endif // SWIG
279
280 const char *name() const { return queue_name_; }
281
282 private:
283 const char *queue_name_;
284
285#ifdef USE_UNSAFE
286 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
287 T msg_;
288#else
289 T *MakeRawMessage();
290 // Pointer to the queue that this object fetches from.
Brian Silverman08661c72013-09-01 17:24:38 -0700291 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000292#endif
293 // Scoped pointer holding the latest message or NULL.
294 ScopedMessagePtr<const T> queue_msg_;
295 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
296};
297
// Base class for all queue groups.
// Stores the name of the group and the hash that identifies it.
class QueueGroup {
 public:
  // Constructs a queue group given its name and a unique hash of the name and
  // type.
  // The name is copied.  A NULL name is stored as the empty string, because
  // constructing a std::string from a NULL pointer is undefined behavior.
  QueueGroup(const char *name, uint32_t hash)
      : name_(name != NULL ? name : ""), hash_(hash) {}

  // Returns the name of the queue group.
  // The pointer refers to storage owned by this object.
  const char *name() const { return name_.c_str(); }
  // Returns a unique hash representing this instance of the queue group.
  uint32_t hash() const { return hash_; }

 private:
  std::string name_;
  uint32_t hash_;
};
314
315} // namespace aos
316
317#ifdef USE_UNSAFE
318#include "aos/crio/queue-tmpl.h"
319#else
Brian Silverman14fd0fb2014-01-14 21:42:01 -0800320#include "aos/linux_code/queue-tmpl.h"
brians343bc112013-02-10 01:53:46 +0000321#endif
322#undef USE_UNSAFE
323
324#endif // AOS_COMMON_QUEUE_H_