blob: 68cb338aa0fcce3a38eaba7b285baea8abbf9a84 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
4#include <assert.h>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
14#ifndef USE_UNSAFE
15#include "aos/atom_code/ipc_lib/queue.h"
16#endif // USE_UNSAFE
17#include "aos/common/time.h"
18
19
20namespace aos {
21
22// This class is a base class for all messages sent over queues.
23class Message {
24 public:
25 typedef ::aos::time::Time Time;
26 // The time that the message was sent at.
27 Time sent_time;
28
29 Message() : sent_time(0, 0) {}
30
31 // Zeros out the time.
32 void Zero();
33 // Returns the size of the message in bytes.
34 static size_t Size() { return sizeof(Time); }
35
36 // Deserializes the common fields from the buffer.
37 size_t Deserialize(const char *buffer);
38 // Serializes the common fields into the buffer.
39 size_t Serialize(char *buffer) const;
40
41 // Populates sent_time with the current time.
42 void SetTimeToNow() { sent_time = Time::Now(); }
43
44 // Writes the contents of the message to the provided buffer.
45 size_t Print(char *buffer, int length) const;
46};
47
// Forward declarations so that the classes below can name each other (for
// example in friend declarations) before their definitions appear.
template <typename T> class Queue;
template <typename T> class MessageBuilder;
template <typename T> class ScopedMessagePtr;
#ifndef USE_UNSAFE
// The "safe" variants are only declared when USE_UNSAFE is not defined.
template <typename T> class SafeMessageBuilder;
template <typename T> class SafeScopedMessagePtr;
#endif  // USE_UNSAFE
55
56// A ScopedMessagePtr<> manages a queue message pointer.
57// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
58// sent. By design, there is no way to get the ScopedMessagePtr to release the
59// message pointer.
60template <class T>
61class ScopedMessagePtr {
62 public:
63 // Returns a pointer to the message.
64 // This stays valid until Send or the destructor have been called.
65 const T *get() const { return msg_; }
66 T *get() { return msg_; }
67
68 const T &operator*() const {
69 const T *msg = get();
70 assert(msg != NULL);
71 return *msg;
72 }
73
74 T &operator*() {
75 T *msg = get();
76 assert(msg != NULL);
77 return *msg;
78 }
79
80 const T *operator->() const {
81 const T *msg = get();
82 assert(msg != NULL);
83 return msg;
84 }
85
86 T *operator->() {
87 T *msg = get();
88 assert(msg != NULL);
89 return msg;
90 }
91
92 operator bool() {
93 return msg_ != NULL;
94 }
95
96 // Sends the message and removes our reference to it.
97 // If the queue is full, over-ride the oldest message in it with our new
98 // message.
99 // Returns true on success, and false otherwise.
100 // The message will be freed.
101 bool Send();
102
103 // Sends the message and removes our reference to it.
104 // If the queue is full, block until it is no longer full.
105 // Returns true on success, and false otherwise.
106 // The message will be freed.
107 bool SendBlocking();
108
109 // Frees the contained message.
110 ~ScopedMessagePtr() {
111 reset();
112 }
113
114#ifndef SWIG
115 // Implements a move constructor. This only takes rvalue references
116 // because we want to allow someone to say
117 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
118 // but we don't want to allow them to then say
119 // ScopedMessagePtr<X> new_ptr = ptr;
120 // And, if they do actually want to copy the pointer, then it will correctly
121 // clear out the source so there aren't 2 pointers to the message lying
122 // around.
123 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
124 :
125#ifndef USE_UNSAFE
126 queue_(ptr.queue_),
127#endif // USE_UNSAFE
128 msg_(ptr.msg_) {
129 ptr.msg_ = NULL;
130 }
131#endif // SWIG
132
133 private:
134 // Provide access to set_queue and the constructor for init.
135 friend class aos::Queue<typename std::remove_const<T>::type>;
136 // Provide access to the copy constructor for MakeWithBuilder.
137 friend class aos::MessageBuilder<T>;
138
139#ifndef USE_UNSAFE
140 // Only Queue should be able to build a queue.
Brian Silverman08661c72013-09-01 17:24:38 -0700141 ScopedMessagePtr(RawQueue *queue, T *msg)
brians343bc112013-02-10 01:53:46 +0000142 : queue_(queue), msg_(msg) {}
143#else
144 ScopedMessagePtr(T *msg)
145 : msg_(msg) {}
146#endif // USE_UNSAFE
147
148 // Sets the pointer to msg, freeing the old value if it was there.
149 // This is private because nobody should be able to get a pointer to a message
150 // that needs to be scoped without using the queue.
151 void reset(T *msg = NULL);
152
153#ifndef USE_UNSAFE
154 // Sets the queue that owns this message.
Brian Silverman08661c72013-09-01 17:24:38 -0700155 void set_queue(RawQueue *queue) { queue_ = queue; }
brians343bc112013-02-10 01:53:46 +0000156
157 // The queue that the message is a part of.
Brian Silverman08661c72013-09-01 17:24:38 -0700158 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000159#endif // USE_UNSAFE
160 // The message or NULL.
161 T *msg_;
162
163 // Protect evil constructors.
164 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
165};
166
// Primary template for message builders. The .q.h header files automatically
// generate specializations of it that carry all of the handy builder methods.
// A MessageBuilder works on an actual shm message pointer, which makes it more
// efficient and more dangerous than the linux only SafeMessageBuilder.
template <typename T>
class MessageBuilder {
 public:
  // The message type that this builder produces.
  typedef T Message;

  // Sends the message that was built.
  bool Send();
};
177
178// TODO(aschuh): Base class
179// T must be a Message with the same format as the messages generated by
180// the q files.
181template <class T>
182class Queue {
183 public:
184 typedef T Message;
185
186 Queue(const char *queue_name)
187 : queue_name_(queue_name),
188#ifdef USE_UNSAFE
189 queue_msg_(&msg_)
190#else
191 queue_(NULL),
192 queue_msg_(NULL, NULL)
193#endif // USE_UNSAFE
194 {
195 static_assert(shm_ok<T>::value,
196 "The provided message type can't be put in shmem.");
197 }
198
199 // Initializes the queue. This may optionally be called to do any one time
200 // work before sending information, and may be be called mulitple times.
201 // Init will be called when a message is sent, but this will cause sending to
202 // take a different amount of time the first cycle.
203 void Init();
204
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800205 // Removes all internal references to shared memory so shared memory can be
206 // restarted safely. This should only be used in testing.
207 void Clear();
208
brians343bc112013-02-10 01:53:46 +0000209 // Fetches the next message from the queue.
210 // Returns true if there was a new message available and we successfully
211 // fetched it. This removes the message from the queue for all readers.
212 // TODO(aschuh): Fix this to use a different way of fetching messages so other
213 // readers can also FetchNext.
214 bool FetchNext();
215 bool FetchNextBlocking();
216
217 // Fetches the last message from the queue.
218 // Returns true if there was a new message available and we successfully
219 // fetched it.
220 bool FetchLatest();
221
222 // Returns the age of the message.
223 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
224
225 // Returns true if the latest value in the queue is newer than age mseconds.
226 bool IsNewerThanMS(int age) {
227 // TODO(aschuh): Log very verbosely if something is _ever_ stale;
228 if (get() != NULL) {
229 return Age() < time::Time::InMS(age);
230 } else {
231 return false;
232 }
233 }
234
235 // Returns a pointer to the current message.
236 // This pointer will be valid until a new message is fetched.
237 const T *get() const { return queue_msg_.get(); }
238
239 // Returns a reference to the message.
240 // The message will be valid until a new message is fetched.
241 const T &operator*() const {
242 const T *msg = get();
243 assert(msg != NULL);
244 return *msg;
245 }
246
247 // Returns a pointer to the current message.
248 // This pointer will be valid until a new message is fetched.
249 const T *operator->() const {
250 const T *msg = get();
251 assert(msg != NULL);
252 return msg;
253 }
254
255#ifndef USE_UNSAFE
256 // Returns a scoped_ptr containing a message.
257 // GCC will optimize away the copy constructor, so this is safe.
258 SafeScopedMessagePtr<T> SafeMakeMessage();
259
260 // Returns a message builder that contains a pre-allocated message.
261 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
262#endif // USE_UNSAFE
263
264#ifndef SWIG
265 // Returns a scoped_ptr containing a message.
266 // GCC will optimize away the copy constructor, so this is safe.
267 ScopedMessagePtr<T> MakeMessage();
268
269 // Returns a message builder that contains a pre-allocated message.
270 aos::MessageBuilder<T> MakeWithBuilder();
271#endif // SWIG
272
273 const char *name() const { return queue_name_; }
274
275 private:
276 const char *queue_name_;
277
278#ifdef USE_UNSAFE
279 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
280 T msg_;
281#else
282 T *MakeRawMessage();
283 // Pointer to the queue that this object fetches from.
Brian Silverman08661c72013-09-01 17:24:38 -0700284 RawQueue *queue_;
brians343bc112013-02-10 01:53:46 +0000285#endif
286 // Scoped pointer holding the latest message or NULL.
287 ScopedMessagePtr<const T> queue_msg_;
288 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
289};
290
// Common base class shared by all queue groups. It just remembers the group's
// name and hash and hands them back on request.
class QueueGroup {
 public:
  // Builds a queue group from its name and a unique hash of the name and
  // type. The name is copied into the object.
  QueueGroup(const char *name, uint32_t hash)
      : name_(name),
        hash_(hash) {}

  // The unique hash representing this instance of the queue group.
  uint32_t hash() const { return hash_; }
  // The name of the queue group, as a C string owned by this object.
  const char *name() const { return name_.c_str(); }

 private:
  std::string name_;
  uint32_t hash_;
};
307
308} // namespace aos
309
310#ifdef USE_UNSAFE
311#include "aos/crio/queue-tmpl.h"
312#else
313#include "aos/atom_code/queue-tmpl.h"
314#endif
315#undef USE_UNSAFE
316
317#endif // AOS_COMMON_QUEUE_H_