blob: e93c655cbc7e9cd8aeb216e9c6e0a1819328780b [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#ifndef AOS_COMMON_QUEUE_H_
2#define AOS_COMMON_QUEUE_H_
3
4#include <assert.h>
5
6#if defined(__VXWORKS__) || defined(__TEST_VXWORKS__)
7#define USE_UNSAFE
8#else
9#undef USE_UNSAFE
10#endif
11
Brian Silverman3204dd82013-03-12 18:42:01 -070012#include "aos/common/time.h"
brians343bc112013-02-10 01:53:46 +000013#include "aos/common/macros.h"
14#ifndef USE_UNSAFE
15#include "aos/atom_code/ipc_lib/queue.h"
16#endif // USE_UNSAFE
17#include "aos/common/time.h"
18
19
20namespace aos {
21
22// This class is a base class for all messages sent over queues.
23class Message {
24 public:
25 typedef ::aos::time::Time Time;
26 // The time that the message was sent at.
27 Time sent_time;
28
29 Message() : sent_time(0, 0) {}
30
31 // Zeros out the time.
32 void Zero();
33 // Returns the size of the message in bytes.
34 static size_t Size() { return sizeof(Time); }
35
36 // Deserializes the common fields from the buffer.
37 size_t Deserialize(const char *buffer);
38 // Serializes the common fields into the buffer.
39 size_t Serialize(char *buffer) const;
40
41 // Populates sent_time with the current time.
42 void SetTimeToNow() { sent_time = Time::Now(); }
43
44 // Writes the contents of the message to the provided buffer.
45 size_t Print(char *buffer, int length) const;
46};
47
// Forward declarations so that the class templates below can name each other.
template <typename T>
class Queue;
template <typename T>
class MessageBuilder;
template <typename T>
class ScopedMessagePtr;
#ifndef USE_UNSAFE
// The Safe* variants are only declared when USE_UNSAFE is not defined.
template <typename T>
class SafeMessageBuilder;
template <typename T>
class SafeScopedMessagePtr;
#endif  // USE_UNSAFE
55
56// A ScopedMessagePtr<> manages a queue message pointer.
57// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
58// sent. By design, there is no way to get the ScopedMessagePtr to release the
59// message pointer.
60template <class T>
61class ScopedMessagePtr {
62 public:
63 // Returns a pointer to the message.
64 // This stays valid until Send or the destructor have been called.
65 const T *get() const { return msg_; }
66 T *get() { return msg_; }
67
68 const T &operator*() const {
69 const T *msg = get();
70 assert(msg != NULL);
71 return *msg;
72 }
73
74 T &operator*() {
75 T *msg = get();
76 assert(msg != NULL);
77 return *msg;
78 }
79
80 const T *operator->() const {
81 const T *msg = get();
82 assert(msg != NULL);
83 return msg;
84 }
85
86 T *operator->() {
87 T *msg = get();
88 assert(msg != NULL);
89 return msg;
90 }
91
92 operator bool() {
93 return msg_ != NULL;
94 }
95
96 // Sends the message and removes our reference to it.
97 // If the queue is full, over-ride the oldest message in it with our new
98 // message.
99 // Returns true on success, and false otherwise.
100 // The message will be freed.
101 bool Send();
102
103 // Sends the message and removes our reference to it.
104 // If the queue is full, block until it is no longer full.
105 // Returns true on success, and false otherwise.
106 // The message will be freed.
107 bool SendBlocking();
108
109 // Frees the contained message.
110 ~ScopedMessagePtr() {
111 reset();
112 }
113
114#ifndef SWIG
115 // Implements a move constructor. This only takes rvalue references
116 // because we want to allow someone to say
117 // ScopedMessagePtr<X> ptr = queue.MakeMessage();
118 // but we don't want to allow them to then say
119 // ScopedMessagePtr<X> new_ptr = ptr;
120 // And, if they do actually want to copy the pointer, then it will correctly
121 // clear out the source so there aren't 2 pointers to the message lying
122 // around.
123 ScopedMessagePtr(ScopedMessagePtr<T> &&ptr)
124 :
125#ifndef USE_UNSAFE
126 queue_(ptr.queue_),
127#endif // USE_UNSAFE
128 msg_(ptr.msg_) {
129 ptr.msg_ = NULL;
130 }
131#endif // SWIG
132
133 private:
134 // Provide access to set_queue and the constructor for init.
135 friend class aos::Queue<typename std::remove_const<T>::type>;
136 // Provide access to the copy constructor for MakeWithBuilder.
137 friend class aos::MessageBuilder<T>;
138
139#ifndef USE_UNSAFE
140 // Only Queue should be able to build a queue.
141 ScopedMessagePtr(aos_queue *queue, T *msg)
142 : queue_(queue), msg_(msg) {}
143#else
144 ScopedMessagePtr(T *msg)
145 : msg_(msg) {}
146#endif // USE_UNSAFE
147
148 // Sets the pointer to msg, freeing the old value if it was there.
149 // This is private because nobody should be able to get a pointer to a message
150 // that needs to be scoped without using the queue.
151 void reset(T *msg = NULL);
152
153#ifndef USE_UNSAFE
154 // Sets the queue that owns this message.
155 void set_queue(aos_queue *queue) { queue_ = queue; }
156
157 // The queue that the message is a part of.
158 aos_queue *queue_;
159#endif // USE_UNSAFE
160 // The message or NULL.
161 T *msg_;
162
163 // Protect evil constructors.
164 DISALLOW_COPY_AND_ASSIGN(ScopedMessagePtr<T>);
165};
166
// Generic builder template. The .q.h header files automatically generate
// specializations of it that carry all of the handy builder methods.
// This builder works on an actual shm message pointer, which makes it more
// efficient and more dangerous than the linux only SafeMessageBuilder.
template <typename T>
class MessageBuilder {
 public:
  // The type of message this builder is for.
  typedef T Message;

  // Only declared in this primary template.
  bool Send();
};
177
178// TODO(aschuh): Base class
179// T must be a Message with the same format as the messages generated by
180// the q files.
181template <class T>
182class Queue {
183 public:
184 typedef T Message;
185
186 Queue(const char *queue_name)
187 : queue_name_(queue_name),
188#ifdef USE_UNSAFE
189 queue_msg_(&msg_)
190#else
191 queue_(NULL),
192 queue_msg_(NULL, NULL)
193#endif // USE_UNSAFE
194 {
195 static_assert(shm_ok<T>::value,
196 "The provided message type can't be put in shmem.");
197 }
198
199 // Initializes the queue. This may optionally be called to do any one time
200 // work before sending information, and may be be called mulitple times.
201 // Init will be called when a message is sent, but this will cause sending to
202 // take a different amount of time the first cycle.
203 void Init();
204
205 // Fetches the next message from the queue.
206 // Returns true if there was a new message available and we successfully
207 // fetched it. This removes the message from the queue for all readers.
208 // TODO(aschuh): Fix this to use a different way of fetching messages so other
209 // readers can also FetchNext.
210 bool FetchNext();
211 bool FetchNextBlocking();
212
213 // Fetches the last message from the queue.
214 // Returns true if there was a new message available and we successfully
215 // fetched it.
216 bool FetchLatest();
217
218 // Returns the age of the message.
219 const time::Time Age() { return time::Time::Now() - queue_msg_->sent_time; }
220
221 // Returns true if the latest value in the queue is newer than age mseconds.
222 bool IsNewerThanMS(int age) {
223 // TODO(aschuh): Log very verbosely if something is _ever_ stale;
224 if (get() != NULL) {
225 return Age() < time::Time::InMS(age);
226 } else {
227 return false;
228 }
229 }
230
231 // Returns a pointer to the current message.
232 // This pointer will be valid until a new message is fetched.
233 const T *get() const { return queue_msg_.get(); }
234
235 // Returns a reference to the message.
236 // The message will be valid until a new message is fetched.
237 const T &operator*() const {
238 const T *msg = get();
239 assert(msg != NULL);
240 return *msg;
241 }
242
243 // Returns a pointer to the current message.
244 // This pointer will be valid until a new message is fetched.
245 const T *operator->() const {
246 const T *msg = get();
247 assert(msg != NULL);
248 return msg;
249 }
250
251#ifndef USE_UNSAFE
252 // Returns a scoped_ptr containing a message.
253 // GCC will optimize away the copy constructor, so this is safe.
254 SafeScopedMessagePtr<T> SafeMakeMessage();
255
256 // Returns a message builder that contains a pre-allocated message.
257 aos::SafeMessageBuilder<T> SafeMakeWithBuilder();
258#endif // USE_UNSAFE
259
260#ifndef SWIG
261 // Returns a scoped_ptr containing a message.
262 // GCC will optimize away the copy constructor, so this is safe.
263 ScopedMessagePtr<T> MakeMessage();
264
265 // Returns a message builder that contains a pre-allocated message.
266 aos::MessageBuilder<T> MakeWithBuilder();
267#endif // SWIG
268
269 const char *name() const { return queue_name_; }
270
271 private:
272 const char *queue_name_;
273
274#ifdef USE_UNSAFE
275 // The unsafe queue only has 1 entry and no safety, so 1 message is fine.
276 T msg_;
277#else
278 T *MakeRawMessage();
279 // Pointer to the queue that this object fetches from.
280 aos_queue *queue_;
281#endif
282 // Scoped pointer holding the latest message or NULL.
283 ScopedMessagePtr<const T> queue_msg_;
284 DISALLOW_COPY_AND_ASSIGN(Queue<T>);
285};
286
// Common base class shared by all queue groups.
class QueueGroup {
 public:
  // Builds a queue group from its name and a unique hash of the name and
  // type. The name is copied into this object.
  QueueGroup(const char *name, uint32_t hash)
      : name_(name),
        hash_(hash) {}

  // The name of the queue group, as a C string backed by this object.
  const char *name() const {
    return name_.c_str();
  }

  // A unique hash representing this instance of the queue group.
  uint32_t hash() const {
    return hash_;
  }

 private:
  std::string name_;
  uint32_t hash_;
};
303
304} // namespace aos
305
306#ifdef USE_UNSAFE
307#include "aos/crio/queue-tmpl.h"
308#else
309#include "aos/atom_code/queue-tmpl.h"
310#endif
311#undef USE_UNSAFE
312
313#endif // AOS_COMMON_QUEUE_H_