blob: bb043e145114137d8187cc096b26dd769e7cc48e [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001namespace aos {
2
3template <class T>
4bool ScopedMessagePtr<T>::Send() {
5 assert(msg_ != NULL);
6 msg_->SetTimeToNow();
7 assert(queue_ != NULL);
8 bool return_value = aos_queue_write_msg_free(queue_, msg_, OVERRIDE) == 0;
9 msg_ = NULL;
10 return return_value;
11}
12
13template <class T>
14bool ScopedMessagePtr<T>::SendBlocking() {
15 assert(msg_ != NULL);
16 msg_->SetTimeToNow();
17 assert(queue_ != NULL);
18 bool return_value = aos_queue_write_msg_free(queue_, msg_, BLOCK) == 0;
19 msg_ = NULL;
20 return return_value;
21}
22
23template <class T>
24void ScopedMessagePtr<T>::reset(T *msg) {
25 if (queue_ != NULL && msg_ != NULL) {
26 aos_queue_free_msg(queue_, msg_);
27 }
28 msg_ = msg;
29}
30
31// A SafeScopedMessagePtr<> manages a message pointer.
32// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
33// sent. By design, there is no way to get the ScopedMessagePtr to release the
34// message pointer. When the message gets sent, it allocates a queue message,
35// copies the data into it, and then sends it. Copies copy the message.
36template <class T>
37class SafeScopedMessagePtr {
38 public:
39 // Returns a pointer to the message.
40 // This stays valid until Send or the destructor have been called.
41 T *get() { return msg_; }
42
43 T &operator*() {
44 T *msg = get();
45 assert(msg != NULL);
46 return *msg;
47 }
48
49 T *operator->() {
50 T *msg = get();
51 assert(msg != NULL);
52 return msg;
53 }
54
55#ifndef SWIG
56 operator bool() {
57 return msg_ != NULL;
58 }
59
60 const T *get() const { return msg_; }
61
62 const T &operator*() const {
63 const T *msg = get();
64 assert(msg != NULL);
65 return *msg;
66 }
67
68 const T *operator->() const {
69 const T *msg = get();
70 assert(msg != NULL);
71 return msg;
72 }
73#endif // SWIG
74
75 // Sends the message and removes our reference to it.
76 // If the queue is full, over-rides the oldest message in it with our new
77 // message.
78 // Returns true on success, and false otherwise.
79 // The message will be freed.
80 bool Send() {
81 assert(msg_ != NULL);
82 assert(queue_ != NULL);
83 msg_->SetTimeToNow();
84 T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
85 if (shm_msg == NULL) {
86 return false;
87 }
88 *shm_msg = *msg_;
89 bool return_value =
90 aos_queue_write_msg_free(queue_, shm_msg, OVERRIDE) == 0;
91 reset();
92 return return_value;
93 }
94
95 // Sends the message and removes our reference to it.
96 // If the queue is full, blocks until it is no longer full.
97 // Returns true on success, and false otherwise.
98 // Frees the message.
99 bool SendBlocking() {
100 assert(msg_ != NULL);
101 assert(queue_ != NULL);
102 msg_->SetTimeToNow();
103 T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
104 if (shm_msg == NULL) {
105 return false;
106 }
107 *shm_msg = *msg_;
108 bool return_value = aos_queue_write_msg_free(queue_, shm_msg, BLOCK) == 0;
109 reset();
110 return return_value;
111 }
112
113 // Frees the contained message.
114 ~SafeScopedMessagePtr() {
115 reset();
116 }
117
118#ifndef SWIG
119 // Implements a move constructor to take the message pointer from the
120 // temporary object to save work.
121 SafeScopedMessagePtr(SafeScopedMessagePtr<T> &&ptr)
122 : queue_(ptr.queue_),
123 msg_(ptr.msg_) {
124 ptr.msg_ = NULL;
125 }
126#endif // SWIG
127
128 // Copy constructor actually copies the data.
129 SafeScopedMessagePtr(const SafeScopedMessagePtr<T> &ptr)
130 : queue_(ptr.queue_),
131 msg_(NULL) {
132 reset(new T(*ptr.get()));
133 }
134#ifndef SWIG
135 // Equal operator copies the data.
136 void operator=(const SafeScopedMessagePtr<T> &ptr) {
137 queue_ = ptr.queue_;
138 reset(new T(*ptr.get()));
139 }
140#endif // SWIG
141
142 private:
143 // Provide access to private constructor.
144 friend class aos::Queue<typename std::remove_const<T>::type>;
145 friend class aos::SafeMessageBuilder<T>;
146
147 // Only Queue should be able to build a message pointer.
148 SafeScopedMessagePtr(aos_queue *queue)
149 : queue_(queue), msg_(new T()) {}
150
151 // Sets the pointer to msg, freeing the old value if it was there.
152 // This is private because nobody should be able to get a pointer to a message
153 // that needs to be scoped without using the queue.
154 void reset(T *msg = NULL) {
155 if (msg_) {
156 delete msg_;
157 }
158 msg_ = msg;
159 }
160
161 // Sets the queue that owns this message.
162 void set_queue(aos_queue *queue) { queue_ = queue; }
163
164 // The queue that the message is a part of.
165 aos_queue *queue_;
166 // The message or NULL.
167 T *msg_;
168};
169
170template <class T>
171void Queue<T>::Init() {
172 if (queue_ == NULL) {
173 // Signature of the message.
Brian Silverman8efe23e2013-07-07 23:31:37 -0700174 aos_type_sig kQueueSignature{sizeof(T), static_cast<int>(T::kHash),
175 T::kQueueLength};
brians343bc112013-02-10 01:53:46 +0000176
177 queue_ = aos_fetch_queue(queue_name_, &kQueueSignature);
178 queue_msg_.set_queue(queue_);
179 }
180}
181
182template <class T>
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800183void Queue<T>::Clear() {
Austin Schuhfa033692013-02-24 01:00:55 -0800184 if (queue_ != NULL) {
Austin Schuhdc1c84a2013-02-23 16:33:10 -0800185 queue_msg_.reset();
186 queue_ = NULL;
187 queue_msg_.set_queue(NULL);
188 }
189}
190
191template <class T>
brians343bc112013-02-10 01:53:46 +0000192bool Queue<T>::FetchNext() {
193 Init();
194 // TODO(aschuh): Use aos_queue_read_msg_index so that multiple readers
195 // reading don't randomly get only part of the messages.
196 // Document here the tradoffs that are part of each method.
197 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
198 NON_BLOCK));
199 // Only update the internal pointer if we got a new message.
200 if (msg != NULL) {
201 queue_msg_.reset(msg);
202 }
203 return msg != NULL;
204}
205
206template <class T>
207bool Queue<T>::FetchNextBlocking() {
208 Init();
209 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_, BLOCK));
210 queue_msg_.reset(msg);
211 assert (msg != NULL);
212 return true;
213}
214
215template <class T>
216bool Queue<T>::FetchLatest() {
217 Init();
218 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
219 FROM_END | NON_BLOCK | PEEK));
220 // Only update the internal pointer if we got a new message.
221 if (msg != NULL && msg != queue_msg_.get()) {
222 queue_msg_.reset(msg);
223 return true;
224 }
225 // The message has to get freed if we didn't use it (and aos_queue_free_msg is
226 // ok to call on NULL).
227 aos_queue_free_msg(queue_, msg);
228 return false;
229}
230
231template <class T>
232SafeScopedMessagePtr<T> Queue<T>::SafeMakeMessage() {
233 Init();
234 SafeScopedMessagePtr<T> safe_msg(queue_);
235 safe_msg->Zero();
236 return safe_msg;
237}
238
239template <class T>
240ScopedMessagePtr<T> Queue<T>::MakeMessage() {
241 Init();
242 return ScopedMessagePtr<T>(queue_, MakeRawMessage());
243}
244
245template <class T>
246T *Queue<T>::MakeRawMessage() {
brians35b15c42013-02-26 04:39:09 +0000247 T *ret = static_cast<T *>(aos_queue_get_msg(queue_));
248 assert(ret != NULL);
249 return ret;
brians343bc112013-02-10 01:53:46 +0000250}
251
252template <class T>
253aos::MessageBuilder<T> Queue<T>::MakeWithBuilder() {
254 Init();
255 return aos::MessageBuilder<T>(queue_, MakeRawMessage());
256}
257
258
259// This builder uses the safe message pointer so that it can be safely copied
260// and used by SWIG or in places where it could be leaked.
261template <class T>
262class SafeMessageBuilder {
263 public:
264 typedef T Message;
265 bool Send();
266};
267
268template <class T>
269aos::SafeMessageBuilder<T> Queue<T>::SafeMakeWithBuilder() {
270 Init();
271 return aos::SafeMessageBuilder<T>(queue_);
272}
273
274
275} // namespace aos