blob: 3a8eea82a77c0bf99ea18435bf02bdbbcbc5de67 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001namespace aos {
2
3template <class T>
4bool ScopedMessagePtr<T>::Send() {
5 assert(msg_ != NULL);
6 msg_->SetTimeToNow();
7 assert(queue_ != NULL);
8 bool return_value = aos_queue_write_msg_free(queue_, msg_, OVERRIDE) == 0;
9 msg_ = NULL;
10 return return_value;
11}
12
13template <class T>
14bool ScopedMessagePtr<T>::SendBlocking() {
15 assert(msg_ != NULL);
16 msg_->SetTimeToNow();
17 assert(queue_ != NULL);
18 bool return_value = aos_queue_write_msg_free(queue_, msg_, BLOCK) == 0;
19 msg_ = NULL;
20 return return_value;
21}
22
23template <class T>
24void ScopedMessagePtr<T>::reset(T *msg) {
25 if (queue_ != NULL && msg_ != NULL) {
26 aos_queue_free_msg(queue_, msg_);
27 }
28 msg_ = msg;
29}
30
31// A SafeScopedMessagePtr<> manages a message pointer.
32// It frees it properly when the ScopedMessagePtr<> goes out of scope or gets
33// sent. By design, there is no way to get the ScopedMessagePtr to release the
34// message pointer. When the message gets sent, it allocates a queue message,
35// copies the data into it, and then sends it. Copies copy the message.
36template <class T>
37class SafeScopedMessagePtr {
38 public:
39 // Returns a pointer to the message.
40 // This stays valid until Send or the destructor have been called.
41 T *get() { return msg_; }
42
43 T &operator*() {
44 T *msg = get();
45 assert(msg != NULL);
46 return *msg;
47 }
48
49 T *operator->() {
50 T *msg = get();
51 assert(msg != NULL);
52 return msg;
53 }
54
55#ifndef SWIG
56 operator bool() {
57 return msg_ != NULL;
58 }
59
60 const T *get() const { return msg_; }
61
62 const T &operator*() const {
63 const T *msg = get();
64 assert(msg != NULL);
65 return *msg;
66 }
67
68 const T *operator->() const {
69 const T *msg = get();
70 assert(msg != NULL);
71 return msg;
72 }
73#endif // SWIG
74
75 // Sends the message and removes our reference to it.
76 // If the queue is full, over-rides the oldest message in it with our new
77 // message.
78 // Returns true on success, and false otherwise.
79 // The message will be freed.
80 bool Send() {
81 assert(msg_ != NULL);
82 assert(queue_ != NULL);
83 msg_->SetTimeToNow();
84 T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
85 if (shm_msg == NULL) {
86 return false;
87 }
88 *shm_msg = *msg_;
89 bool return_value =
90 aos_queue_write_msg_free(queue_, shm_msg, OVERRIDE) == 0;
91 reset();
92 return return_value;
93 }
94
95 // Sends the message and removes our reference to it.
96 // If the queue is full, blocks until it is no longer full.
97 // Returns true on success, and false otherwise.
98 // Frees the message.
99 bool SendBlocking() {
100 assert(msg_ != NULL);
101 assert(queue_ != NULL);
102 msg_->SetTimeToNow();
103 T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
104 if (shm_msg == NULL) {
105 return false;
106 }
107 *shm_msg = *msg_;
108 bool return_value = aos_queue_write_msg_free(queue_, shm_msg, BLOCK) == 0;
109 reset();
110 return return_value;
111 }
112
113 // Frees the contained message.
114 ~SafeScopedMessagePtr() {
115 reset();
116 }
117
118#ifndef SWIG
119 // Implements a move constructor to take the message pointer from the
120 // temporary object to save work.
121 SafeScopedMessagePtr(SafeScopedMessagePtr<T> &&ptr)
122 : queue_(ptr.queue_),
123 msg_(ptr.msg_) {
124 ptr.msg_ = NULL;
125 }
126#endif // SWIG
127
128 // Copy constructor actually copies the data.
129 SafeScopedMessagePtr(const SafeScopedMessagePtr<T> &ptr)
130 : queue_(ptr.queue_),
131 msg_(NULL) {
132 reset(new T(*ptr.get()));
133 }
134#ifndef SWIG
135 // Equal operator copies the data.
136 void operator=(const SafeScopedMessagePtr<T> &ptr) {
137 queue_ = ptr.queue_;
138 reset(new T(*ptr.get()));
139 }
140#endif // SWIG
141
142 private:
143 // Provide access to private constructor.
144 friend class aos::Queue<typename std::remove_const<T>::type>;
145 friend class aos::SafeMessageBuilder<T>;
146
147 // Only Queue should be able to build a message pointer.
148 SafeScopedMessagePtr(aos_queue *queue)
149 : queue_(queue), msg_(new T()) {}
150
151 // Sets the pointer to msg, freeing the old value if it was there.
152 // This is private because nobody should be able to get a pointer to a message
153 // that needs to be scoped without using the queue.
154 void reset(T *msg = NULL) {
155 if (msg_) {
156 delete msg_;
157 }
158 msg_ = msg;
159 }
160
161 // Sets the queue that owns this message.
162 void set_queue(aos_queue *queue) { queue_ = queue; }
163
164 // The queue that the message is a part of.
165 aos_queue *queue_;
166 // The message or NULL.
167 T *msg_;
168};
169
170template <class T>
171void Queue<T>::Init() {
172 if (queue_ == NULL) {
173 // Signature of the message.
174 aos_type_sig kQueueSignature{sizeof(T), T::kHash, T::kQueueLength};
175
176 queue_ = aos_fetch_queue(queue_name_, &kQueueSignature);
177 queue_msg_.set_queue(queue_);
178 }
179}
180
181template <class T>
182bool Queue<T>::FetchNext() {
183 Init();
184 // TODO(aschuh): Use aos_queue_read_msg_index so that multiple readers
185 // reading don't randomly get only part of the messages.
186 // Document here the tradoffs that are part of each method.
187 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
188 NON_BLOCK));
189 // Only update the internal pointer if we got a new message.
190 if (msg != NULL) {
191 queue_msg_.reset(msg);
192 }
193 return msg != NULL;
194}
195
196template <class T>
197bool Queue<T>::FetchNextBlocking() {
198 Init();
199 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_, BLOCK));
200 queue_msg_.reset(msg);
201 assert (msg != NULL);
202 return true;
203}
204
205template <class T>
206bool Queue<T>::FetchLatest() {
207 Init();
208 const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
209 FROM_END | NON_BLOCK | PEEK));
210 // Only update the internal pointer if we got a new message.
211 if (msg != NULL && msg != queue_msg_.get()) {
212 queue_msg_.reset(msg);
213 return true;
214 }
215 // The message has to get freed if we didn't use it (and aos_queue_free_msg is
216 // ok to call on NULL).
217 aos_queue_free_msg(queue_, msg);
218 return false;
219}
220
221template <class T>
222SafeScopedMessagePtr<T> Queue<T>::SafeMakeMessage() {
223 Init();
224 SafeScopedMessagePtr<T> safe_msg(queue_);
225 safe_msg->Zero();
226 return safe_msg;
227}
228
229template <class T>
230ScopedMessagePtr<T> Queue<T>::MakeMessage() {
231 Init();
232 return ScopedMessagePtr<T>(queue_, MakeRawMessage());
233}
234
235template <class T>
236T *Queue<T>::MakeRawMessage() {
brians35b15c42013-02-26 04:39:09 +0000237 T *ret = static_cast<T *>(aos_queue_get_msg(queue_));
238 assert(ret != NULL);
239 return ret;
brians343bc112013-02-10 01:53:46 +0000240}
241
242template <class T>
243aos::MessageBuilder<T> Queue<T>::MakeWithBuilder() {
244 Init();
245 return aos::MessageBuilder<T>(queue_, MakeRawMessage());
246}
247
248
249// This builder uses the safe message pointer so that it can be safely copied
250// and used by SWIG or in places where it could be leaked.
251template <class T>
252class SafeMessageBuilder {
253 public:
254 typedef T Message;
255 bool Send();
256};
257
258template <class T>
259aos::SafeMessageBuilder<T> Queue<T>::SafeMakeWithBuilder() {
260 Init();
261 return aos::SafeMessageBuilder<T>(queue_);
262}
263
264
265} // namespace aos