blob: 8a2f15bd7922a50bfd39c7f36bec023bbcb41d3d [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
9
10#include "aos/common/logging/logging.h"
11#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080012#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070013
14namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070015namespace {
16
Brian Silverman08661c72013-09-01 17:24:38 -070017static_assert(shm_ok<RawQueue>::value,
18 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070019
20const bool kReadDebug = false;
21const bool kWriteDebug = false;
22const bool kRefDebug = false;
23const bool kFetchDebug = false;
24
25// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070026// to hold (for readers who are slow about freeing them or who leak one when
27// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070028const int kExtraMessages = 20;
29
30} // namespace
31
Brian Silverman08661c72013-09-01 17:24:38 -070032const int RawQueue::kPeek;
33const int RawQueue::kFromEnd;
34const int RawQueue::kNonBlock;
35const int RawQueue::kBlock;
36const int RawQueue::kOverride;
37
38struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070039 int ref_count;
40 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080041 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070042 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080043 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
44 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
45 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070046 }
47 void Swap(MessageHeader *other) {
48 MessageHeader temp;
49 memcpy(&temp, other, sizeof(temp));
50 memcpy(other, this, sizeof(*other));
51 memcpy(this, &temp, sizeof(*this));
52 }
53};
Brian Silverman5f8c4922014-02-11 21:22:38 -080054static_assert(shm_ok<RawQueue::MessageHeader>::value,
55 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070056
Brian Silverman797e71e2013-09-06 17:29:39 -070057struct RawQueue::ReadData {
58 bool writable_start;
59};
60
Brian Silvermana6d1b562013-09-01 14:39:39 -070061// TODO(brians) maybe do this with atomic integer instructions so it doesn't
62// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070063void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070064 MutexLocker locker(&pool_lock_);
65 MessageHeader *header = MessageHeader::Get(msg);
66 --header->ref_count;
67 assert(header->ref_count >= 0);
68 if (kRefDebug) {
69 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
70 }
71 if (header->ref_count == 0) {
72 DoFreeMessage(msg);
73 }
74}
75
Brian Silverman08661c72013-09-01 17:24:38 -070076RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080077 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070078 const size_t name_size = strlen(name) + 1;
79 char *temp = static_cast<char *>(shm_malloc(name_size));
80 memcpy(temp, name, name_size);
81 name_ = temp;
82 length_ = length;
83 hash_ = hash;
84 queue_length_ = queue_length;
85
86 next_ = NULL;
87 recycle_ = NULL;
88
89 if (kFetchDebug) {
90 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
91 name, length, hash, queue_length);
92 }
93
94 data_length_ = queue_length + 1;
95 if (data_length_ < 2) { // TODO(brians) when could this happen?
96 data_length_ = 2;
97 }
98 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
99 data_start_ = 0;
100 data_end_ = 0;
101 messages_ = 0;
102
103 mem_length_ = queue_length + kExtraMessages;
104 pool_length_ = 0;
105 messages_used_ = 0;
106 msg_length_ = length + sizeof(MessageHeader);
107 pool_ = static_cast<MessageHeader **>(
108 shm_malloc(sizeof(MessageHeader *) * mem_length_));
109
110 if (kFetchDebug) {
111 printf("made queue %s\n", name);
112 }
113}
Brian Silverman08661c72013-09-01 17:24:38 -0700114RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700115 int queue_length) {
116 if (kFetchDebug) {
117 printf("fetching queue %s\n", name);
118 }
119 if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
120 return NULL;
121 }
Brian Silverman08661c72013-09-01 17:24:38 -0700122 RawQueue *current = static_cast<RawQueue *>(
Brian Silvermana6d1b562013-09-01 14:39:39 -0700123 global_core->mem_struct->queues.queue_list);
Brian Silverman797e71e2013-09-06 17:29:39 -0700124 if (current != NULL) {
125 while (true) {
126 // If we found a matching queue.
127 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
128 current->hash_ == hash && current->queue_length_ == queue_length) {
129 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
130 return current;
131 } else {
132 if (kFetchDebug) {
133 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
134 strcmp(current->name_, name), name);
135 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700136 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700137 // If this is the last one.
138 if (current->next_ == NULL) break;
139 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700140 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700141 }
142
Brian Silverman797e71e2013-09-06 17:29:39 -0700143 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
144 RawQueue(name, length, hash, queue_length);
145 if (current == NULL) { // if we don't already have one
146 global_core->mem_struct->queues.queue_list = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700147 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700148 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700149 }
150
151 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700152 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700153}
Brian Silverman08661c72013-09-01 17:24:38 -0700154RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700155 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700156 int recycle_hash, int recycle_length, RawQueue **recycle) {
157 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700158 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
159 if (r == r->recycle_) {
160 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
161 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700162 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700163 abort();
164 }
165 *recycle = r->recycle_;
166 return r;
167}
168
Brian Silverman08661c72013-09-01 17:24:38 -0700169void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700170 MessageHeader *header = MessageHeader::Get(msg);
171 if (pool_[header->index] != header) { // if something's messed up
172 fprintf(stderr, "queue: something is very very wrong with queue %p."
173 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
174 this, pool_, header->index, header);
175 printf("queue: see stderr\n");
176 abort();
177 }
178 if (kRefDebug) {
179 printf("ref free: %p\n", msg);
180 }
181 --messages_used_;
182
183 if (recycle_ != NULL) {
184 void *const new_msg = recycle_->GetMessage();
185 if (new_msg == NULL) {
186 fprintf(stderr, "queue: couldn't get a message"
187 " for recycle queue %p\n", recycle_);
188 } else {
189 // Take a message from recycle_ and switch its
190 // header with the one being freed, which effectively
191 // switches which queue each message belongs to.
192 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700193 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700194 pool_[header->index] = new_header;
195 {
196 MutexLocker locker(&recycle_->pool_lock_);
197 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700198 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700199 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700200 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700201 }
202 // use the header for new_msg which is now for this pool
203 header = new_header;
204 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
205 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
206 " aborting\n", recycle_, msg);
207 printf("see stderr\n");
208 abort();
209 }
210 msg = new_msg;
211 }
212 }
213
Brian Silverman797e71e2013-09-06 17:29:39 -0700214 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700215 int index = header->index;
216 header->index = -1;
217 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700218 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700219 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700220 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700221 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700222 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700223 header->index = index;
224 }
225}
226
Brian Silverman08661c72013-09-01 17:24:38 -0700227bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700228 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700229 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700230 }
231 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
232 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700233 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700234 global_core->size))) {
235 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
236 msg, this);
237 printf("see stderr\n");
238 abort();
239 }
240 {
241 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700242 bool writable_waited = false;
243
244 int new_end;
245 while (true) {
246 new_end = (data_end_ + 1) % data_length_;
247 // If there is room in the queue right now.
248 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700249 if (options & kNonBlock) {
250 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700251 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700252 }
253 return false;
254 } else if (options & kOverride) {
255 if (kWriteDebug) {
256 printf("queue: overriding on %p\n", this);
257 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700258 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700259 DecrementMessageReferenceCount(data_[data_start_]);
260 data_start_ = (data_start_ + 1) % data_length_;
261 } else { // kBlock
262 if (kWriteDebug) {
263 printf("queue: going to wait for writable_ of %p\n", this);
264 }
Brian Silverman08661c72013-09-01 17:24:38 -0700265 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700266 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700267 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700268 }
269 data_[data_end_] = msg;
270 ++messages_;
271 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700272
273 if (kWriteDebug) {
274 printf("queue: broadcasting to readable_ of %p\n", this);
275 }
276 readable_.Broadcast();
277
278 // If we got a signal on writable_ here and it's still writable, then we
279 // need to signal the next person in line (if any).
280 if (writable_waited && is_writable()) {
281 if (kWriteDebug) {
282 printf("queue: resignalling writable_ of %p\n", this);
283 }
284 writable_.Signal();
285 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700286 }
287 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700288 printf("queue: write returning true on queue %p\n", this);
289 }
290 return true;
291}
292
Brian Silverman797e71e2013-09-06 17:29:39 -0700293void RawQueue::ReadCommonEnd(ReadData *read_data) {
294 if (is_writable()) {
295 if (kReadDebug) {
296 printf("queue: %ssignalling writable_ of %p\n",
297 read_data->writable_start ? "not " : "", this);
298 }
299 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700300 }
301}
Brian Silverman797e71e2013-09-06 17:29:39 -0700302bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
303 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700304 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
305 if (options & kNonBlock) {
306 if (kReadDebug) {
307 printf("queue: not going to block waiting on %p\n", this);
308 }
309 return false;
310 } else { // kBlock
311 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700312 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700313 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700314 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700315 readable_.Wait();
316 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700317 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700318 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700319 }
320 }
321 if (kReadDebug) {
322 printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
323 }
324 return true;
325}
Brian Silverman08661c72013-09-01 17:24:38 -0700326void *RawQueue::ReadPeek(int options, int start) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700327 void *ret;
328 if (options & kFromEnd) {
329 int pos = data_end_ - 1;
330 if (pos < 0) { // if it needs to wrap
331 pos = data_length_ - 1;
332 }
333 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700334 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700335 }
336 ret = data_[pos];
337 } else {
338 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700339 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700340 }
341 ret = data_[start];
342 }
343 MessageHeader *const header = MessageHeader::Get(ret);
344 ++header->ref_count;
345 if (kRefDebug) {
346 printf("ref inc count: %p\n", ret);
347 }
348 return ret;
349}
Brian Silverman08661c72013-09-01 17:24:38 -0700350const void *RawQueue::ReadMessage(int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700351 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700352 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700353 }
354 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700355
Brian Silvermana6d1b562013-09-01 14:39:39 -0700356 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700357
358 ReadData read_data;
359 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700360 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700361 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700362 }
363 return NULL;
364 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700365
Brian Silvermana6d1b562013-09-01 14:39:39 -0700366 if (options & kPeek) {
367 msg = ReadPeek(options, data_start_);
368 } else {
369 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700370 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700371 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700372 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700373 }
374 // This loop pulls each message out of the buffer.
375 const int pos = data_start_;
376 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700377 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700378 if (data_start_ == data_end_) {
379 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700380 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700381 }
382 msg = data_[pos];
383 break;
384 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700385 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700386 DecrementMessageReferenceCount(data_[pos]);
387 }
388 } else {
389 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700390 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700391 }
392 msg = data_[data_start_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700393 // TODO(brians): Doesn't this need to increment the ref count?
Brian Silvermana6d1b562013-09-01 14:39:39 -0700394 data_start_ = (data_start_ + 1) % data_length_;
395 }
396 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700397 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700398 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700399 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700400 }
401 return msg;
402}
Brian Silverman08661c72013-09-01 17:24:38 -0700403const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700404 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700405 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700406 this, options, index, *index);
407 }
408 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700409
410 MutexLocker locker(&data_lock_);
411
412 ReadData read_data;
413 if (!ReadCommonStart(options, index, &read_data)) {
414 if (kReadDebug) {
415 printf("queue: %p common returned false\n", this);
416 }
417 return NULL;
418 }
419
420 // TODO(parker): Handle integer wrap on the index.
421
422 // How many unread messages we have.
423 const int offset = messages_ - *index;
424 // Where we're going to start reading.
425 int my_start = data_end_ - offset;
426 if (my_start < 0) { // If we want to read off the end of the buffer.
427 // Unwrap it.
428 my_start += data_length_;
429 }
430 if (offset >= data_length_) { // If we're behind the available messages.
431 // Catch index up to the last available message.
432 *index += data_start_ - my_start;
433 // And that's the one we're going to read.
434 my_start = data_start_;
435 }
436 if (options & kPeek) {
437 msg = ReadPeek(options, my_start);
438 } else {
439 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700440 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700441 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700442 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700443 int pos = data_end_ - 1;
444 if (pos < 0) { // If it wrapped.
445 pos = data_length_ - 1; // Unwrap it.
446 }
447 if (kReadDebug) {
448 printf("queue: %p reading from c1: %d\n", this, pos);
449 }
450 msg = data_[pos];
451 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700452 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700453 if (kReadDebug) {
454 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700455 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700456 msg = data_[my_start];
457 ++(*index);
458 }
459 MessageHeader *const header = MessageHeader::Get(msg);
460 ++header->ref_count;
461 if (kRefDebug) {
462 printf("ref_inc_count: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700463 }
464 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700465 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700466 return msg;
467}
468
Brian Silverman08661c72013-09-01 17:24:38 -0700469void *RawQueue::GetMessage() {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700470 MutexLocker locker(&pool_lock_);
471 MessageHeader *header;
472 if (pool_length_ - messages_used_ > 0) {
473 header = pool_[messages_used_];
474 } else {
475 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700476 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700477 }
478 header = pool_[pool_length_] =
479 static_cast<MessageHeader *>(shm_malloc(msg_length_));
480 ++pool_length_;
481 }
482 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
483 header->ref_count = 1;
484 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700485 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700486 }
487 header->index = messages_used_;
488 ++messages_used_;
489 return msg;
490}
491
492} // namespace aos