blob: 81d036cfdcfc2be119e7808cbf0371b104475e77 [file] [log] [blame]
Brian Silverman08661c72013-09-01 17:24:38 -07001#include "aos/atom_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
9
10#include "aos/common/logging/logging.h"
11#include "aos/common/type_traits.h"
Brian Silverman08661c72013-09-01 17:24:38 -070012#include "aos/atom_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070013
14namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070015namespace {
16
Brian Silverman08661c72013-09-01 17:24:38 -070017static_assert(shm_ok<RawQueue>::value,
18 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070019
20const bool kReadDebug = false;
21const bool kWriteDebug = false;
22const bool kRefDebug = false;
23const bool kFetchDebug = false;
24
25// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070026// to hold (for readers who are slow about freeing them or who leak one when
27// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070028const int kExtraMessages = 20;
29
30} // namespace
31
// Out-of-line definitions for the option-flag constants declared (with
// initializers) in the header.  Pre-C++17, static const members that are
// odr-used still need exactly one definition in a translation unit.
const int RawQueue::kPeek;
const int RawQueue::kFromEnd;
const int RawQueue::kNonBlock;
const int RawQueue::kBlock;
const int RawQueue::kOverride;
37
38struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070039 int ref_count;
40 int index; // in pool_
41 static MessageHeader *Get(const void *msg) {
42 return reinterpret_cast<MessageHeader *>(
43 static_cast<uint8_t *>(const_cast<void *>(msg)) -
44 sizeof(MessageHeader));
45 }
46 void Swap(MessageHeader *other) {
47 MessageHeader temp;
48 memcpy(&temp, other, sizeof(temp));
49 memcpy(other, this, sizeof(*other));
50 memcpy(this, &temp, sizeof(*this));
51 }
52};
Brian Silverman08661c72013-09-01 17:24:38 -070053static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
Brian Silvermana6d1b562013-09-01 14:39:39 -070054 " is to stick it in shared memory");
55
56// TODO(brians) maybe do this with atomic integer instructions so it doesn't
57// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070058void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070059 MutexLocker locker(&pool_lock_);
60 MessageHeader *header = MessageHeader::Get(msg);
61 --header->ref_count;
62 assert(header->ref_count >= 0);
63 if (kRefDebug) {
64 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
65 }
66 if (header->ref_count == 0) {
67 DoFreeMessage(msg);
68 }
69}
70
// Constructs a queue entirely in shared memory.  name is copied into shared
// memory so other processes can read it; length is the payload size, hash and
// queue_length complete the identity used for matching in Fetch().
RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
    : readable_(&data_lock_), writable_(&data_lock_) {
  // Copy the name into shared memory (including the terminating NUL).
  const size_t name_size = strlen(name) + 1;
  char *temp = static_cast<char *>(shm_malloc(name_size));
  memcpy(temp, name, name_size);
  name_ = temp;
  length_ = length;
  hash_ = hash;
  queue_length_ = queue_length;

  next_ = NULL;
  recycle_ = NULL;

  if (kFetchDebug) {
    printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
           name, length, hash, queue_length);
  }

  // One extra slot so that a full ring buffer can be told apart from an empty
  // one (data_start_ == data_end_ always means empty).
  data_length_ = queue_length + 1;
  if (data_length_ < 2) {  // TODO(brians) when could this happen?
    data_length_ = 2;
  }
  data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
  data_start_ = 0;
  data_end_ = 0;
  messages_ = 0;

  // The message pool holds enough for every queue slot plus spares for
  // readers that are slow to free messages (see kExtraMessages).
  mem_length_ = queue_length + kExtraMessages;
  pool_length_ = 0;
  messages_used_ = 0;
  // Each allocation carries a MessageHeader in front of the payload.
  msg_length_ = length + sizeof(MessageHeader);
  pool_ = static_cast<MessageHeader **>(
      shm_malloc(sizeof(MessageHeader *) * mem_length_));

  if (kFetchDebug) {
    printf("made queue %s\n", name);
  }
}
Brian Silverman08661c72013-09-01 17:24:38 -0700109RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700110 int queue_length) {
111 if (kFetchDebug) {
112 printf("fetching queue %s\n", name);
113 }
114 if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
115 return NULL;
116 }
Brian Silverman08661c72013-09-01 17:24:38 -0700117 RawQueue *current = static_cast<RawQueue *>(
Brian Silvermana6d1b562013-09-01 14:39:39 -0700118 global_core->mem_struct->queues.queue_list);
Brian Silverman08661c72013-09-01 17:24:38 -0700119 RawQueue *last = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700120 while (current != NULL) {
121 // if we found a matching queue
122 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
123 current->hash_ == hash && current->queue_length_ == queue_length) {
124 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
125 return current;
126 } else {
127 if (kFetchDebug) {
128 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
129 strcmp(current->name_, name), name);
130 }
131 }
132 current = current->next_;
133 }
134
Brian Silverman08661c72013-09-01 17:24:38 -0700135 void *temp = shm_malloc(sizeof(RawQueue));
136 current = new (temp) RawQueue(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700137 if (last == NULL) { // if we don't have one to tack the new one on to
138 global_core->mem_struct->queues.queue_list = current;
139 } else {
140 last->next_ = current;
141 }
142
143 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
144 return current;
145}
Brian Silverman08661c72013-09-01 17:24:38 -0700146RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700147 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700148 int recycle_hash, int recycle_length, RawQueue **recycle) {
149 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700150 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
151 if (r == r->recycle_) {
152 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
153 printf("see stderr\n");
154 abort();
155 }
156 *recycle = r->recycle_;
157 return r;
158}
159
// Returns a message whose reference count hit 0 to this queue's pool.  If
// recycle_ is set, the freed payload is instead pushed onto the recycle
// queue, and a fresh message is pulled from recycle_'s pool to take its
// place here.  Called with pool_lock_ held (see
// DecrementMessageReferenceCount).
void RawQueue::DoFreeMessage(const void *msg) {
  MessageHeader *header = MessageHeader::Get(msg);
  if (pool_[header->index] != header) {  // if something's messed up
    fprintf(stderr, "queue: something is very very wrong with queue %p."
            " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
            this, pool_, header->index, header);
    printf("queue: see stderr\n");
    abort();
  }
  if (kRefDebug) {
    printf("ref free: %p\n", msg);
  }
  --messages_used_;

  if (recycle_ != NULL) {
    void *const new_msg = recycle_->GetMessage();
    if (new_msg == NULL) {
      // Best-effort: if the recycle queue's pool is exhausted, just free the
      // message normally below instead of recycling it.
      fprintf(stderr, "queue: couldn't get a message"
              " for recycle queue %p\n", recycle_);
    } else {
      // Take a message from recycle_ and switch its
      // header with the one being freed, which effectively
      // switches which queue each message belongs to.
      MessageHeader *const new_header = MessageHeader::Get(new_msg);
      // also switch the messages between the pools
      pool_[header->index] = new_header;
      {
        MutexLocker locker(&recycle_->pool_lock_);
        recycle_->pool_[new_header->index] = header;
        // swap the information in both headers
        header->Swap(new_header);
        // don't unlock the other pool until all of its messages are valid
      }
      // use the header for new_msg which is now for this pool
      header = new_header;
      // kOverride means this can't block: if the recycle queue is full, its
      // oldest message gets dropped to make room.
      if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
        fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
                " aborting\n", recycle_, msg);
        printf("see stderr\n");
        abort();
      }
      msg = new_msg;
    }
  }

  // From here on, header/msg refer to the message that belongs back in this
  // queue's pool.  Keep the free entries of pool_ contiguous at the end by
  // swapping the freed entry with the last in-use one.
  // where the one we're freeing was
  int index = header->index;
  header->index = -1;
  if (index != messages_used_) {  // if we're not freeing the one on the end
    // put the last one where the one we're freeing was
    header = pool_[index] = pool_[messages_used_];
    // put the one we're freeing at the end
    pool_[messages_used_] = MessageHeader::Get(msg);
    // update the former last one's index
    header->index = index;
  }
}
217
// Writes msg into the ring buffer, honoring the option bits:
//   kNonBlock  - return false immediately if the queue is full
//   kOverride  - drop the oldest message to make room if full
//   kBlock     - wait on writable_ until a reader frees a slot
// Aborts if msg does not point into the shared memory region.  Ownership of
// one reference to msg passes to the queue on success.
bool RawQueue::WriteMessage(void *msg, int options) {
  if (kWriteDebug) {
    printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
  }
  // Sanity check: msg must lie within [mem_struct, mem_struct + size].
  if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
      msg > static_cast<void *>((
          reinterpret_cast<char *>(global_core->mem_struct) +
          global_core->size))) {
    fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
            msg, this);
    printf("see stderr\n");
    abort();
  }
  {
    MutexLocker locker(&data_lock_);
    int new_end = (data_end_ + 1) % data_length_;
    // new_end catching up to data_start_ means the ring buffer is full.
    while (new_end == data_start_) {
      if (options & kNonBlock) {
        if (kWriteDebug) {
          printf("queue: not blocking on %p. returning -1\n", this);
        }
        return false;
      } else if (options & kOverride) {
        if (kWriteDebug) {
          printf("queue: overriding on %p\n", this);
        }
        // avoid leaking the message that we're going to overwrite
        DecrementMessageReferenceCount(data_[data_start_]);
        data_start_ = (data_start_ + 1) % data_length_;
      } else {  // kBlock
        if (kWriteDebug) {
          printf("queue: going to wait for writable_ of %p\n", this);
        }
        // writable_ is tied to data_lock_ (see the constructor); a reader
        // signals it after consuming a message.
        writable_.Wait();
      }
      // Re-derive new_end: data_start_/data_end_ may have changed.
      new_end = (data_end_ + 1) % data_length_;
    }
    data_[data_end_] = msg;
    ++messages_;
    data_end_ = new_end;
  }
  if (kWriteDebug) {
    printf("queue: setting readable of %p\n", this);
  }
  // Wake up a blocked reader now that a message is available.
  readable_.Signal();
  if (kWriteDebug) {
    printf("queue: write returning true on queue %p\n", this);
  }
  return true;
}
268
Brian Silverman08661c72013-09-01 17:24:38 -0700269void RawQueue::ReadCommonEnd(bool read) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700270 if (read) {
271 writable_.Signal();
272 }
273}
// Waits (unless kNonBlock is set) until there is something for the caller to
// read.  index, when non-NULL, is the caller's message counter: waiting also
// continues while the caller has already seen every message written so far
// (messages_ <= *index).  Must be called with data_lock_ held; the lock is
// released around the wait and re-taken before returning.  Returns false
// only in the kNonBlock case when nothing is readable.
bool RawQueue::ReadCommonStart(int options, int *index) {
  while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
    if (options & kNonBlock) {
      if (kReadDebug) {
        printf("queue: not going to block waiting on %p\n", this);
      }
      return false;
    } else {  // kBlock
      if (kReadDebug) {
        printf("queue: going to wait for readable of %p\n", this);
      }
      data_lock_.Unlock();
      // wait for a message to become readable
      readable_.Wait();
      if (kReadDebug) {
        printf("queue: done waiting for readable of %p\n", this);
      }
      data_lock_.Lock();
    }
  }
  if (kReadDebug) {
    printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
  }
  return true;
}
Brian Silverman08661c72013-09-01 17:24:38 -0700299void *RawQueue::ReadPeek(int options, int start) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700300 void *ret;
301 if (options & kFromEnd) {
302 int pos = data_end_ - 1;
303 if (pos < 0) { // if it needs to wrap
304 pos = data_length_ - 1;
305 }
306 if (kReadDebug) {
307 printf("queue: reading from line %d: %d\n", __LINE__, pos);
308 }
309 ret = data_[pos];
310 } else {
311 if (kReadDebug) {
312 printf("queue: reading from line %d: %d\n", __LINE__, start);
313 }
314 ret = data_[start];
315 }
316 MessageHeader *const header = MessageHeader::Get(ret);
317 ++header->ref_count;
318 if (kRefDebug) {
319 printf("ref inc count: %p\n", ret);
320 }
321 return ret;
322}
// Reads one message from the queue, honoring kPeek/kFromEnd/kNonBlock/kBlock.
// Returns NULL if kNonBlock is set and nothing is available.  The returned
// message carries a reference that the caller must release with
// DecrementMessageReferenceCount (via FreeMessage).
const void *RawQueue::ReadMessage(int options) {
  if (kReadDebug) {
    printf("queue: %p->ReadMessage(%d)\n", this, options);
  }
  void *msg = NULL;
  MutexLocker locker(&data_lock_);
  if (!ReadCommonStart(options, NULL)) {
    if (kReadDebug) {
      printf("queue: common returned false for %p\n", this);
    }
    return NULL;
  }
  if (options & kPeek) {
    // Peeking takes its own reference and doesn't advance data_start_.
    msg = ReadPeek(options, data_start_);
  } else {
    if (options & kFromEnd) {
      while (1) {
        if (kReadDebug) {
          printf("queue: start of c2 of %p\n", this);
        }
        // This loop pulls each message out of the buffer.
        const int pos = data_start_;
        data_start_ = (data_start_ + 1) % data_length_;
        // if this is the last one
        if (data_start_ == data_end_) {
          if (kReadDebug) {
            printf("queue: reading from c2: %d\n", pos);
          }
          msg = data_[pos];
          break;
        }
        // it's not going to be in the queue any more
        DecrementMessageReferenceCount(data_[pos]);
      }
    } else {
      if (kReadDebug) {
        printf("queue: reading from d2: %d\n", data_start_);
      }
      // Consume the oldest message; its queue reference transfers to the
      // caller.
      msg = data_[data_start_];
      data_start_ = (data_start_ + 1) % data_length_;
    }
  }
  // Only a consuming read (not kPeek) frees a slot for writers.
  ReadCommonEnd(!(options & kPeek));
  if (kReadDebug) {
    printf("queue: read returning %p\n", msg);
  }
  return msg;
}
// Reads the message after the one the caller saw last, tracked through
// *index (a count of messages ever written to this queue).  *index is
// updated to reflect the message returned.  Unlike ReadMessage, this never
// removes messages from the queue; each returned message gets its own
// reference for the caller.
const void *RawQueue::ReadMessageIndex(int options, int *index) {
  if (kReadDebug) {
    printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
           this, options, index, *index);
  }
  void *msg = NULL;
  {
    MutexLocker locker(&data_lock_);
    if (!ReadCommonStart(options, index)) {
      if (kReadDebug) {
        printf("queue: common returned false for %p\n", this);
      }
      return NULL;
    }
    // TODO(parker): Handle integer wrap on the index.
    // offset = how many messages the caller is behind; translate that into a
    // position relative to data_end_.
    const int offset = messages_ - *index;
    int my_start = data_end_ - offset;
    if (offset >= data_length_) {  // if we're behind the available messages
      // catch index up to the last available message
      *index += data_start_ - my_start;
      // and that's the one we're going to read
      my_start = data_start_;
    }
    if (my_start < 0) {  // if we want to read off the end of the buffer
      // unwrap where we're going to read from
      my_start += data_length_;
    }
    if (options & kPeek) {
      // ReadPeek takes the caller's reference itself.
      msg = ReadPeek(options, my_start);
    } else {
      if (options & kFromEnd) {
        if (kReadDebug) {
          printf("queue: start of c1 of %p\n", this);
        }
        // Jump straight to the newest message, wrapping if needed.
        int pos = data_end_ - 1;
        if (pos < 0) {  // if it wrapped
          pos = data_length_ - 1;  // unwrap it
        }
        if (kReadDebug) {
          printf("queue: reading from c1: %d\n", pos);
        }
        msg = data_[pos];
        // Caller has now seen everything written so far.
        *index = messages_;
      } else {
        if (kReadDebug) {
          printf("queue: reading from d1: %d\n", my_start);
        }
        msg = data_[my_start];
        ++(*index);
      }
      MessageHeader *const header = MessageHeader::Get(msg);
      ++header->ref_count;
      if (kRefDebug) {
        printf("ref_inc_count: %p\n", msg);
      }
    }
  }
  // this function never consumes one off the queue
  ReadCommonEnd(false);
  return msg;
}
432
Brian Silverman08661c72013-09-01 17:24:38 -0700433void *RawQueue::GetMessage() {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700434 MutexLocker locker(&pool_lock_);
435 MessageHeader *header;
436 if (pool_length_ - messages_used_ > 0) {
437 header = pool_[messages_used_];
438 } else {
439 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700440 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700441 }
442 header = pool_[pool_length_] =
443 static_cast<MessageHeader *>(shm_malloc(msg_length_));
444 ++pool_length_;
445 }
446 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
447 header->ref_count = 1;
448 if (kRefDebug) {
449 printf("ref alloc: %p\n", msg);
450 }
451 header->index = messages_used_;
452 ++messages_used_;
453 return msg;
454}
455
456} // namespace aos