blob: f1d9f820dcdaa8148f6a11e5410da21a7e7af858 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#include <string>
#include <type_traits>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
Brian Silverman1b86a192014-02-10 22:36:54 -080014#ifndef SWIG
15#include "aos/common/queue_types.h"
16#endif // SWIG
brians343bc112013-02-10 01:53:46 +000017#ifndef USE_UNSAFE
Brian Silverman14fd0fb2014-01-14 21:42:01 -080018#include "aos/linux_code/ipc_lib/queue.h"
brians343bc112013-02-10 01:53:46 +000019#endif // USE_UNSAFE
20#include "aos/common/time.h"
21
brians343bc112013-02-10 01:53:46 +000022namespace aos {
23
24// This class is a base class for all messages sent over queues.
25class Message {
26 public:
27 typedef ::aos::time::Time Time;
28 // The time that the message was sent at.
29 Time sent_time;
30
31 Message() : sent_time(0, 0) {}
32
33 // Zeros out the time.
Brian Silvermanc4546d22014-02-10 18:03:09 -080034 // Overriden to zero the whole message.
brians343bc112013-02-10 01:53:46 +000035 void Zero();
Brian Silvermanc4546d22014-02-10 18:03:09 -080036 // Returns the size of the common fields.
37 // Overriden to return the size of the whole message.
brians343bc112013-02-10 01:53:46 +000038 static size_t Size() { return sizeof(Time); }
39
40 // Deserializes the common fields from the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080041 // Overriden to deserialize the whole message.
brians343bc112013-02-10 01:53:46 +000042 size_t Deserialize(const char *buffer);
43 // Serializes the common fields into the buffer.
Brian Silvermanc4546d22014-02-10 18:03:09 -080044 // Overriden to serialize the whole message.
brians343bc112013-02-10 01:53:46 +000045 size_t Serialize(char *buffer) const;
46
47 // Populates sent_time with the current time.
48 void SetTimeToNow() { sent_time = Time::Now(); }
49
50 // Writes the contents of the message to the provided buffer.
51 size_t Print(char *buffer, int length) const;
Brian Silvermanc4546d22014-02-10 18:03:09 -080052
53#ifndef SWIG
54 const MessageType &GetType() const;
55#endif // SWIG
brians343bc112013-02-10 01:53:46 +000056};
57
// Forward declarations so that the class templates below can refer to each
// other (for example in friend declarations) before they are defined.
template <class T> class Queue;
template <class T> class MessageBuilder;
template <class T> class ScopedMessagePtr;
#ifndef USE_UNSAFE
// The "safe" variants are only declared when USE_UNSAFE is not defined.
template <class T> class SafeMessageBuilder;
template <class T> class SafeScopedMessagePtr;
#endif  // USE_UNSAFE
65
66// A ScopedMessagePtr<> manages a queue message pointer.
67// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
68// sent. By design, there is no way to get the ScopedMessagePtr to release the
69// message pointer.
70template <class T>
71class ScopedMessagePtr {
72 public:
73 // Returns a pointer to the message.
74 // This stays valid until Send or the destructor have been called.
75 const T *get() const { return msg_; }
76 T *get() { return msg_; }
77
78 const T &operator*() const {
79 const T *msg = get();
80 assert(msg != NULL);
81 return *msg;
82 }
83
84 T &operator*() {
85 T *msg = get();
86 assert(msg != NULL);
87 return *msg;
88 }
89
90 const T *operator->() const {
91 const T *msg = get();
92 assert(msg != NULL);
93 return msg;
94 }
95
96 T *operator->() {
97 T *msg = get();
98 assert(msg != NULL);
99 return msg;
100 }
101
102 operator bool() {
103 return msg_ != NULL;
104 }
105
106 // Sends the message and removes our reference to it.
107 // If the queue is full, over-ride the oldest message in it with our new
108 // message.
109 // Returns true on success, and false otherwise.
110 // The message will be freed.
111 bool Send();
112
113 // Sends the message and removes our reference to it.
114 // If the queue is full, block until it is no longer full.
115 // Returns true on success, and false otherwise.
116 // The message will be freed.
117 bool SendBlocking();
118
119 // Frees the contained message.
120 ~ScopedMessagePtr() {
121 reset();
122 }
123
124#ifndef SWIG
125 // Implements a move constructor. This only takes rvalue references
126 // because we want to allow someone to say
127 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
128 // but we don't want to allow them to then say
129 // ScopedMessagePtr<X> new_ptr = ptr;
Brian Silvermanc4546d22014-02-10 18:03:09 -0800130 // And, if they do actually want to move the pointer, then it will correctly
brians343bc112013-02-10 01:53:46 +0000131 // clear out the source so there aren't 2 pointers to the message lying
132 // around.
133 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
134 :
135#ifndef USE_UNSAFE
136 queue_(ptr.queue_),
137#endif // USE_UNSAFE
138 msg_(ptr.msg_) {
139 ptr.msg_ = NULL;
140 }
141#endif // SWIG
142
143 private:
144 // Provide access to set_queue and the constructor for init.
145 friend class aos::Queue<typename std::remove_const<T>::type>;
146 // Provide access to the copy constructor for MakeWithBuilder.
147 friend class aos::MessageBuilder<T>;
148
149#ifndef USE_UNSAFE
150 // Only Queue should be able to build a queue.
Brian Silverman08661c72013-09-01 17:24:38 -0700151 ScopedMessagePtr(RawQueue *queue, T *msg)
brians343bc112013-02-10 01:53:46 +0000152 : queue_(queue), msg_(msg) {}
153#else
154 ScopedMessagePtr(T *msg)
155 : msg_(msg) {}
156#endif // USE_UNSAFE
157
158 // Sets the pointer to msg, freeing the old value if it was there.
159 // This is private because nobody should be able to get a pointer to a message
160 // that needs to be scoped without using the queue.
161 void reset(T *msg = NULL);
162
163#ifndef USE_UNSAFE
164 // Sets the queue that owns this message.
Brian Silverman08661c72013-09-01 17:24:38 -0700165 void set_queue(RawQueue *queue) { queue_ = queue; }
brians343bc112013-02-10 01:53:46 +0000166
167 // The queue that the message is a part of.
Brian Silverman08661c72013-09-01 17:24:38 -0700168 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000169#endif // USE_UNSAFE
170 // The message or NULL.
171 T *msg_;
172
173 // Protect evil constructors.
174 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
175};
176
// Specializations for the Builders will be automatically generated in the .q.h
// header files with all of the handy builder methods.
// This builder uses an actual shm message pointer, which is more efficient and
// more dangerous than the linux only SafeMessageBuilder.
template <class T>
class MessageBuilder {
 public:
  // The message type that this builder builds.
  typedef T Message;
  // Sends the message being built.
  // NOTE(review): presumably returns true on success like
  // ScopedMessagePtr::Send; confirm against the generated specializations.
  bool Send();
};
187
188// TODO(aschuh): Base class
189// T must be a Message with the same format as the messages generated by
190// the q files.
191template <class T>
192class Queue {
193 public:
194 typedef T Message;
195
196 Queue(const char *queue_name)
197 : queue_name_(queue_name),
198#ifdef USE_UNSAFE
199 queue_msg_(&msg_)
200#else
201 queue_(NULL),
202 queue_msg_(NULL, NULL)
203#endif // USE_UNSAFE
204 {
205 static_assert(shm_ok<T>::value,
206 "The provided message type can't be put in shmem.");
207 }
208
209 // Initializes the queue. This may optionally be called to do any one time
Brian Silvermanc4546d22014-02-10 18:03:09 -0800210 // work before sending information, and may be be called multiple times.
brians343bc112013-02-10 01:53:46 +0000211 // Init will be called when a message is sent, but this will cause sending to
212 // take a different amount of time the first cycle.
213 void Init();
214
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800215 // Removes all internal references to shared memory so shared memory can be
216 // restarted safely. This should only be used in testing.
217 void Clear();
218
brians343bc112013-02-10 01:53:46 +0000219 // Fetches the next message from the queue.
220 // Returns true if there was a new message available and we successfully
221 // fetched it. This removes the message from the queue for all readers.
222 // TODO(aschuh): Fix this to use a different way of fetching messages so other
223 // readers can also FetchNext.
224 bool FetchNext();
225 bool FetchNextBlocking();
226
227 // Fetches the last message from the queue.
228 // Returns true if there was a new message available and we successfully
229 // fetched it.
230 bool FetchLatest();
231
232 // Returns the age of the message.
233 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
234
235 // Returns true if the latest value in the queue is newer than age mseconds.
236 bool IsNewerThanMS(int age) {
Brian Silvermanc4546d22014-02-10 18:03:09 -0800237 // TODO(aschuh): Log very verbosely if something is _ever_ stale.
brians343bc112013-02-10 01:53:46 +0000238 if (get() != NULL) {
239 return Age() < time::Time::InMS(age);
240 } else {
241 return false;
242 }
243 }
244
245 // Returns a pointer to the current message.
246 // This pointer will be valid until a new message is fetched.
247 const T *get() const { return queue_msg_.get(); }
248
249 // Returns a reference to the message.
250 // The message will be valid until a new message is fetched.
251 const T &operator*() const {
252 const T *msg = get();
253 assert(msg != NULL);
254 return *msg;
255 }
256
257 // Returns a pointer to the current message.
258 // This pointer will be valid until a new message is fetched.
259 const T *operator->() const {
260 const T *msg = get();
261 assert(msg != NULL);
262 return msg;
263 }
264
265#ifndef USE_UNSAFE
266 // Returns a scoped_ptr containing a message.
267 // GCC will optimize away the copy constructor, so this is safe.
268 SafeScopedMessagePtr<T> SafeMakeMessage();
269
270 // Returns a message builder that contains a pre-allocated message.
271 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
272#endif // USE_UNSAFE
273
274#ifndef SWIG
275 // Returns a scoped_ptr containing a message.
276 // GCC will optimize away the copy constructor, so this is safe.
277 ScopedMessagePtr<T> MakeMessage();
278
279 // Returns a message builder that contains a pre-allocated message.
280 aos::MessageBuilder<T> MakeWithBuilder();
281#endif // SWIG
282
283 const char *name() const { return queue_name_; }
284
285 private:
286 const char *queue_name_;
287
288#ifdef USE_UNSAFE
289 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
290 T msg_;
291#else
292 T *MakeRawMessage();
293 // Pointer to the queue that this object fetches from.
Brian Silverman08661c72013-09-01 17:24:38 -0700294 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000295#endif
296 // Scoped pointer holding the latest message or NULL.
297 ScopedMessagePtr<const T> queue_msg_;
298 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
299};
300
// Base class for all queue groups.
// Stores the group's name and the hash that identifies it.
class QueueGroup {
 public:
  // Constructs a queue group given its name and a unique hash of the name and
  // type.
  // The name is copied. A NULL name is stored as the empty string, because
  // constructing a std::string from a null pointer is undefined behavior.
  QueueGroup(const char *name, uint32_t hash)
      : name_(name != NULL ? name : ""), hash_(hash) {}

  // Returns the name of the queue group.
  // The returned pointer refers to storage owned by this object.
  const char *name() const { return name_.c_str(); }
  // Returns a unique hash representing this instance of the queue group.
  uint32_t hash() const { return hash_; }

 private:
  std::string name_;
  uint32_t hash_;
};
317
318} // namespace aos
319
320#ifdef USE_UNSAFE
321#include "aos/crio/queue-tmpl.h"
322#else
Brian Silverman14fd0fb2014-01-14 21:42:01 -0800323#include "aos/linux_code/queue-tmpl.h"
brians343bc112013-02-10 01:53:46 +0000324#endif
325#undef USE_UNSAFE
326
327#endif // AOS_COMMON_QUEUE_H_