blob: 018f03af6976522dd99a09f76b0d1c6f86d79513 [file] [log] [blame]
Brian Silverman08661c72013-09-01 17:24:38 -07001#include "aos/atom_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
9
10#include "aos/common/logging/logging.h"
11#include "aos/common/type_traits.h"
Brian Silverman08661c72013-09-01 17:24:38 -070012#include "aos/atom_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070013
14namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070015namespace {
16
Brian Silverman08661c72013-09-01 17:24:38 -070017static_assert(shm_ok<RawQueue>::value,
18 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070019
20const bool kReadDebug = false;
21const bool kWriteDebug = false;
22const bool kRefDebug = false;
23const bool kFetchDebug = false;
24
25// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070026// to hold (for readers who are slow about freeing them or who leak one when
27// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070028const int kExtraMessages = 20;
29
30} // namespace
31
Brian Silverman08661c72013-09-01 17:24:38 -070032const int RawQueue::kPeek;
33const int RawQueue::kFromEnd;
34const int RawQueue::kNonBlock;
35const int RawQueue::kBlock;
36const int RawQueue::kOverride;
37
38struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070039 int ref_count;
40 int index; // in pool_
41 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080042 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
43 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
44 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070045 }
46 void Swap(MessageHeader *other) {
47 MessageHeader temp;
48 memcpy(&temp, other, sizeof(temp));
49 memcpy(other, this, sizeof(*other));
50 memcpy(this, &temp, sizeof(*this));
51 }
52};
Brian Silverman08661c72013-09-01 17:24:38 -070053static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
Brian Silvermana6d1b562013-09-01 14:39:39 -070054 " is to stick it in shared memory");
55
Brian Silverman797e71e2013-09-06 17:29:39 -070056struct RawQueue::ReadData {
57 bool writable_start;
58};
59
Brian Silvermana6d1b562013-09-01 14:39:39 -070060// TODO(brians) maybe do this with atomic integer instructions so it doesn't
61// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070062void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070063 MutexLocker locker(&pool_lock_);
64 MessageHeader *header = MessageHeader::Get(msg);
65 --header->ref_count;
66 assert(header->ref_count >= 0);
67 if (kRefDebug) {
68 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
69 }
70 if (header->ref_count == 0) {
71 DoFreeMessage(msg);
72 }
73}
74
Brian Silverman08661c72013-09-01 17:24:38 -070075RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
76 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070077 const size_t name_size = strlen(name) + 1;
78 char *temp = static_cast<char *>(shm_malloc(name_size));
79 memcpy(temp, name, name_size);
80 name_ = temp;
81 length_ = length;
82 hash_ = hash;
83 queue_length_ = queue_length;
84
85 next_ = NULL;
86 recycle_ = NULL;
87
88 if (kFetchDebug) {
89 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
90 name, length, hash, queue_length);
91 }
92
93 data_length_ = queue_length + 1;
94 if (data_length_ < 2) { // TODO(brians) when could this happen?
95 data_length_ = 2;
96 }
97 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
98 data_start_ = 0;
99 data_end_ = 0;
100 messages_ = 0;
101
102 mem_length_ = queue_length + kExtraMessages;
103 pool_length_ = 0;
104 messages_used_ = 0;
105 msg_length_ = length + sizeof(MessageHeader);
106 pool_ = static_cast<MessageHeader **>(
107 shm_malloc(sizeof(MessageHeader *) * mem_length_));
108
109 if (kFetchDebug) {
110 printf("made queue %s\n", name);
111 }
112}
Brian Silverman08661c72013-09-01 17:24:38 -0700113RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700114 int queue_length) {
115 if (kFetchDebug) {
116 printf("fetching queue %s\n", name);
117 }
118 if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
119 return NULL;
120 }
Brian Silverman08661c72013-09-01 17:24:38 -0700121 RawQueue *current = static_cast<RawQueue *>(
Brian Silvermana6d1b562013-09-01 14:39:39 -0700122 global_core->mem_struct->queues.queue_list);
Brian Silverman797e71e2013-09-06 17:29:39 -0700123 if (current != NULL) {
124 while (true) {
125 // If we found a matching queue.
126 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
127 current->hash_ == hash && current->queue_length_ == queue_length) {
128 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
129 return current;
130 } else {
131 if (kFetchDebug) {
132 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
133 strcmp(current->name_, name), name);
134 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700135 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700136 // If this is the last one.
137 if (current->next_ == NULL) break;
138 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700139 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700140 }
141
Brian Silverman797e71e2013-09-06 17:29:39 -0700142 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
143 RawQueue(name, length, hash, queue_length);
144 if (current == NULL) { // if we don't already have one
145 global_core->mem_struct->queues.queue_list = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700146 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700147 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700148 }
149
150 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700151 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700152}
Brian Silverman08661c72013-09-01 17:24:38 -0700153RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700154 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700155 int recycle_hash, int recycle_length, RawQueue **recycle) {
156 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700157 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
158 if (r == r->recycle_) {
159 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
160 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700161 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700162 abort();
163 }
164 *recycle = r->recycle_;
165 return r;
166}
167
Brian Silverman08661c72013-09-01 17:24:38 -0700168void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700169 MessageHeader *header = MessageHeader::Get(msg);
170 if (pool_[header->index] != header) { // if something's messed up
171 fprintf(stderr, "queue: something is very very wrong with queue %p."
172 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
173 this, pool_, header->index, header);
174 printf("queue: see stderr\n");
175 abort();
176 }
177 if (kRefDebug) {
178 printf("ref free: %p\n", msg);
179 }
180 --messages_used_;
181
182 if (recycle_ != NULL) {
183 void *const new_msg = recycle_->GetMessage();
184 if (new_msg == NULL) {
185 fprintf(stderr, "queue: couldn't get a message"
186 " for recycle queue %p\n", recycle_);
187 } else {
188 // Take a message from recycle_ and switch its
189 // header with the one being freed, which effectively
190 // switches which queue each message belongs to.
191 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700192 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700193 pool_[header->index] = new_header;
194 {
195 MutexLocker locker(&recycle_->pool_lock_);
196 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700197 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700198 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700199 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700200 }
201 // use the header for new_msg which is now for this pool
202 header = new_header;
203 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
204 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
205 " aborting\n", recycle_, msg);
206 printf("see stderr\n");
207 abort();
208 }
209 msg = new_msg;
210 }
211 }
212
Brian Silverman797e71e2013-09-06 17:29:39 -0700213 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700214 int index = header->index;
215 header->index = -1;
216 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700217 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700218 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700219 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700220 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700221 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700222 header->index = index;
223 }
224}
225
Brian Silverman08661c72013-09-01 17:24:38 -0700226bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700227 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700228 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700229 }
230 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
231 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700232 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700233 global_core->size))) {
234 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
235 msg, this);
236 printf("see stderr\n");
237 abort();
238 }
239 {
240 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700241 bool writable_waited = false;
242
243 int new_end;
244 while (true) {
245 new_end = (data_end_ + 1) % data_length_;
246 // If there is room in the queue right now.
247 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700248 if (options & kNonBlock) {
249 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700250 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700251 }
252 return false;
253 } else if (options & kOverride) {
254 if (kWriteDebug) {
255 printf("queue: overriding on %p\n", this);
256 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700257 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700258 DecrementMessageReferenceCount(data_[data_start_]);
259 data_start_ = (data_start_ + 1) % data_length_;
260 } else { // kBlock
261 if (kWriteDebug) {
262 printf("queue: going to wait for writable_ of %p\n", this);
263 }
Brian Silverman08661c72013-09-01 17:24:38 -0700264 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700265 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700266 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700267 }
268 data_[data_end_] = msg;
269 ++messages_;
270 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700271
272 if (kWriteDebug) {
273 printf("queue: broadcasting to readable_ of %p\n", this);
274 }
275 readable_.Broadcast();
276
277 // If we got a signal on writable_ here and it's still writable, then we
278 // need to signal the next person in line (if any).
279 if (writable_waited && is_writable()) {
280 if (kWriteDebug) {
281 printf("queue: resignalling writable_ of %p\n", this);
282 }
283 writable_.Signal();
284 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700285 }
286 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700287 printf("queue: write returning true on queue %p\n", this);
288 }
289 return true;
290}
291
Brian Silverman797e71e2013-09-06 17:29:39 -0700292void RawQueue::ReadCommonEnd(ReadData *read_data) {
293 if (is_writable()) {
294 if (kReadDebug) {
295 printf("queue: %ssignalling writable_ of %p\n",
296 read_data->writable_start ? "not " : "", this);
297 }
298 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700299 }
300}
Brian Silverman797e71e2013-09-06 17:29:39 -0700301bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
302 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700303 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
304 if (options & kNonBlock) {
305 if (kReadDebug) {
306 printf("queue: not going to block waiting on %p\n", this);
307 }
308 return false;
309 } else { // kBlock
310 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700311 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700312 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700313 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700314 readable_.Wait();
315 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700316 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700317 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700318 }
319 }
320 if (kReadDebug) {
321 printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
322 }
323 return true;
324}
Brian Silverman08661c72013-09-01 17:24:38 -0700325void *RawQueue::ReadPeek(int options, int start) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700326 void *ret;
327 if (options & kFromEnd) {
328 int pos = data_end_ - 1;
329 if (pos < 0) { // if it needs to wrap
330 pos = data_length_ - 1;
331 }
332 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700333 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700334 }
335 ret = data_[pos];
336 } else {
337 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700338 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700339 }
340 ret = data_[start];
341 }
342 MessageHeader *const header = MessageHeader::Get(ret);
343 ++header->ref_count;
344 if (kRefDebug) {
345 printf("ref inc count: %p\n", ret);
346 }
347 return ret;
348}
Brian Silverman08661c72013-09-01 17:24:38 -0700349const void *RawQueue::ReadMessage(int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700350 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700351 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700352 }
353 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700354
Brian Silvermana6d1b562013-09-01 14:39:39 -0700355 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700356
357 ReadData read_data;
358 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700359 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700360 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700361 }
362 return NULL;
363 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700364
Brian Silvermana6d1b562013-09-01 14:39:39 -0700365 if (options & kPeek) {
366 msg = ReadPeek(options, data_start_);
367 } else {
368 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700369 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700370 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700371 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700372 }
373 // This loop pulls each message out of the buffer.
374 const int pos = data_start_;
375 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700376 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700377 if (data_start_ == data_end_) {
378 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700379 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700380 }
381 msg = data_[pos];
382 break;
383 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700384 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700385 DecrementMessageReferenceCount(data_[pos]);
386 }
387 } else {
388 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700389 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700390 }
391 msg = data_[data_start_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700392 // TODO(brians): Doesn't this need to increment the ref count?
Brian Silvermana6d1b562013-09-01 14:39:39 -0700393 data_start_ = (data_start_ + 1) % data_length_;
394 }
395 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700396 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700397 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700398 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700399 }
400 return msg;
401}
Brian Silverman08661c72013-09-01 17:24:38 -0700402const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700403 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700404 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700405 this, options, index, *index);
406 }
407 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700408
409 MutexLocker locker(&data_lock_);
410
411 ReadData read_data;
412 if (!ReadCommonStart(options, index, &read_data)) {
413 if (kReadDebug) {
414 printf("queue: %p common returned false\n", this);
415 }
416 return NULL;
417 }
418
419 // TODO(parker): Handle integer wrap on the index.
420
421 // How many unread messages we have.
422 const int offset = messages_ - *index;
423 // Where we're going to start reading.
424 int my_start = data_end_ - offset;
425 if (my_start < 0) { // If we want to read off the end of the buffer.
426 // Unwrap it.
427 my_start += data_length_;
428 }
429 if (offset >= data_length_) { // If we're behind the available messages.
430 // Catch index up to the last available message.
431 *index += data_start_ - my_start;
432 // And that's the one we're going to read.
433 my_start = data_start_;
434 }
435 if (options & kPeek) {
436 msg = ReadPeek(options, my_start);
437 } else {
438 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700439 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700440 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700441 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700442 int pos = data_end_ - 1;
443 if (pos < 0) { // If it wrapped.
444 pos = data_length_ - 1; // Unwrap it.
445 }
446 if (kReadDebug) {
447 printf("queue: %p reading from c1: %d\n", this, pos);
448 }
449 msg = data_[pos];
450 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700451 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700452 if (kReadDebug) {
453 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700454 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700455 msg = data_[my_start];
456 ++(*index);
457 }
458 MessageHeader *const header = MessageHeader::Get(msg);
459 ++header->ref_count;
460 if (kRefDebug) {
461 printf("ref_inc_count: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700462 }
463 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700464 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700465 return msg;
466}
467
Brian Silverman08661c72013-09-01 17:24:38 -0700468void *RawQueue::GetMessage() {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700469 MutexLocker locker(&pool_lock_);
470 MessageHeader *header;
471 if (pool_length_ - messages_used_ > 0) {
472 header = pool_[messages_used_];
473 } else {
474 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700475 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700476 }
477 header = pool_[pool_length_] =
478 static_cast<MessageHeader *>(shm_malloc(msg_length_));
479 ++pool_length_;
480 }
481 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
482 header->ref_count = 1;
483 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700484 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700485 }
486 header->index = messages_used_;
487 ++messages_used_;
488 return msg;
489}
490
491} // namespace aos