blob: 8bcfa19c8dca822594dcb72ca6dbf42b6ea6982a [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
25
26// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070027// to hold (for readers who are slow about freeing them or who leak one when
28// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070029const int kExtraMessages = 20;
30
31} // namespace
32
Brian Silverman08661c72013-09-01 17:24:38 -070033const int RawQueue::kPeek;
34const int RawQueue::kFromEnd;
35const int RawQueue::kNonBlock;
36const int RawQueue::kBlock;
37const int RawQueue::kOverride;
38
39struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070040 int ref_count;
41 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080042 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070043 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080044 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
45 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
46 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070047 }
48 void Swap(MessageHeader *other) {
49 MessageHeader temp;
50 memcpy(&temp, other, sizeof(temp));
51 memcpy(other, this, sizeof(*other));
52 memcpy(this, &temp, sizeof(*this));
53 }
54};
Brian Silverman5f8c4922014-02-11 21:22:38 -080055static_assert(shm_ok<RawQueue::MessageHeader>::value,
56 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070057
Brian Silverman797e71e2013-09-06 17:29:39 -070058struct RawQueue::ReadData {
59 bool writable_start;
60};
61
Brian Silvermana6d1b562013-09-01 14:39:39 -070062// TODO(brians) maybe do this with atomic integer instructions so it doesn't
63// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070064void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070065 MutexLocker locker(&pool_lock_);
66 MessageHeader *header = MessageHeader::Get(msg);
67 --header->ref_count;
68 assert(header->ref_count >= 0);
69 if (kRefDebug) {
70 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
71 }
72 if (header->ref_count == 0) {
73 DoFreeMessage(msg);
74 }
75}
76
Brian Silverman08661c72013-09-01 17:24:38 -070077RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080078 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070079 const size_t name_size = strlen(name) + 1;
80 char *temp = static_cast<char *>(shm_malloc(name_size));
81 memcpy(temp, name, name_size);
82 name_ = temp;
83 length_ = length;
84 hash_ = hash;
85 queue_length_ = queue_length;
86
87 next_ = NULL;
88 recycle_ = NULL;
89
90 if (kFetchDebug) {
91 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
92 name, length, hash, queue_length);
93 }
94
95 data_length_ = queue_length + 1;
96 if (data_length_ < 2) { // TODO(brians) when could this happen?
97 data_length_ = 2;
98 }
99 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
100 data_start_ = 0;
101 data_end_ = 0;
102 messages_ = 0;
103
104 mem_length_ = queue_length + kExtraMessages;
105 pool_length_ = 0;
106 messages_used_ = 0;
107 msg_length_ = length + sizeof(MessageHeader);
108 pool_ = static_cast<MessageHeader **>(
109 shm_malloc(sizeof(MessageHeader *) * mem_length_));
110
111 if (kFetchDebug) {
112 printf("made queue %s\n", name);
113 }
114}
Brian Silverman08661c72013-09-01 17:24:38 -0700115RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700116 int queue_length) {
117 if (kFetchDebug) {
118 printf("fetching queue %s\n", name);
119 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800120 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700121 return NULL;
122 }
Brian Silverman08661c72013-09-01 17:24:38 -0700123 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800124 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700125 if (current != NULL) {
126 while (true) {
127 // If we found a matching queue.
128 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
129 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800130 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700131 return current;
132 } else {
133 if (kFetchDebug) {
134 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
135 strcmp(current->name_, name), name);
136 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700137 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700138 // If this is the last one.
139 if (current->next_ == NULL) break;
140 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700141 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700142 }
143
Brian Silverman797e71e2013-09-06 17:29:39 -0700144 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
145 RawQueue(name, length, hash, queue_length);
146 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800147 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700148 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700149 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700150 }
151
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800152 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700153 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700154}
Brian Silverman08661c72013-09-01 17:24:38 -0700155RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700156 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700157 int recycle_hash, int recycle_length, RawQueue **recycle) {
158 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700159 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
160 if (r == r->recycle_) {
161 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
162 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700163 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700164 abort();
165 }
166 *recycle = r->recycle_;
167 return r;
168}
169
Brian Silverman08661c72013-09-01 17:24:38 -0700170void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700171 MessageHeader *header = MessageHeader::Get(msg);
172 if (pool_[header->index] != header) { // if something's messed up
173 fprintf(stderr, "queue: something is very very wrong with queue %p."
174 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
175 this, pool_, header->index, header);
176 printf("queue: see stderr\n");
177 abort();
178 }
179 if (kRefDebug) {
180 printf("ref free: %p\n", msg);
181 }
182 --messages_used_;
183
184 if (recycle_ != NULL) {
185 void *const new_msg = recycle_->GetMessage();
186 if (new_msg == NULL) {
187 fprintf(stderr, "queue: couldn't get a message"
188 " for recycle queue %p\n", recycle_);
189 } else {
190 // Take a message from recycle_ and switch its
191 // header with the one being freed, which effectively
192 // switches which queue each message belongs to.
193 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700194 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700195 pool_[header->index] = new_header;
196 {
197 MutexLocker locker(&recycle_->pool_lock_);
198 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700199 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700200 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700201 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700202 }
203 // use the header for new_msg which is now for this pool
204 header = new_header;
205 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
206 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
207 " aborting\n", recycle_, msg);
208 printf("see stderr\n");
209 abort();
210 }
211 msg = new_msg;
212 }
213 }
214
Brian Silverman797e71e2013-09-06 17:29:39 -0700215 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700216 int index = header->index;
217 header->index = -1;
218 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700219 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700220 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700221 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700222 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700223 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700224 header->index = index;
225 }
226}
227
Brian Silverman08661c72013-09-01 17:24:38 -0700228bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700229 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700230 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700231 }
232 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
233 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700234 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700235 global_core->size))) {
236 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
237 msg, this);
238 printf("see stderr\n");
239 abort();
240 }
241 {
242 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700243 bool writable_waited = false;
244
245 int new_end;
246 while (true) {
247 new_end = (data_end_ + 1) % data_length_;
248 // If there is room in the queue right now.
249 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700250 if (options & kNonBlock) {
251 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700252 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700253 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800254 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700255 return false;
256 } else if (options & kOverride) {
257 if (kWriteDebug) {
258 printf("queue: overriding on %p\n", this);
259 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700260 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700261 DecrementMessageReferenceCount(data_[data_start_]);
262 data_start_ = (data_start_ + 1) % data_length_;
263 } else { // kBlock
264 if (kWriteDebug) {
265 printf("queue: going to wait for writable_ of %p\n", this);
266 }
Brian Silverman08661c72013-09-01 17:24:38 -0700267 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700268 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700269 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700270 }
271 data_[data_end_] = msg;
272 ++messages_;
273 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700274
275 if (kWriteDebug) {
276 printf("queue: broadcasting to readable_ of %p\n", this);
277 }
278 readable_.Broadcast();
279
280 // If we got a signal on writable_ here and it's still writable, then we
281 // need to signal the next person in line (if any).
282 if (writable_waited && is_writable()) {
283 if (kWriteDebug) {
284 printf("queue: resignalling writable_ of %p\n", this);
285 }
286 writable_.Signal();
287 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700288 }
289 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700290 printf("queue: write returning true on queue %p\n", this);
291 }
292 return true;
293}
294
Brian Silverman797e71e2013-09-06 17:29:39 -0700295void RawQueue::ReadCommonEnd(ReadData *read_data) {
296 if (is_writable()) {
297 if (kReadDebug) {
298 printf("queue: %ssignalling writable_ of %p\n",
299 read_data->writable_start ? "not " : "", this);
300 }
301 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700302 }
303}
Brian Silverman797e71e2013-09-06 17:29:39 -0700304bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
305 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700306 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
307 if (options & kNonBlock) {
308 if (kReadDebug) {
309 printf("queue: not going to block waiting on %p\n", this);
310 }
311 return false;
312 } else { // kBlock
313 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700314 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700315 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700316 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700317 readable_.Wait();
318 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700319 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700320 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700321 }
322 }
323 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800324 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
325 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700326 }
327 return true;
328}
Brian Silverman08661c72013-09-01 17:24:38 -0700329void *RawQueue::ReadPeek(int options, int start) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700330 void *ret;
331 if (options & kFromEnd) {
332 int pos = data_end_ - 1;
333 if (pos < 0) { // if it needs to wrap
334 pos = data_length_ - 1;
335 }
336 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700337 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700338 }
339 ret = data_[pos];
340 } else {
341 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700342 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700343 }
344 ret = data_[start];
345 }
346 MessageHeader *const header = MessageHeader::Get(ret);
347 ++header->ref_count;
348 if (kRefDebug) {
349 printf("ref inc count: %p\n", ret);
350 }
351 return ret;
352}
Brian Silverman08661c72013-09-01 17:24:38 -0700353const void *RawQueue::ReadMessage(int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700354 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700355 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700356 }
357 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700358
Brian Silvermana6d1b562013-09-01 14:39:39 -0700359 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700360
361 ReadData read_data;
362 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700363 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700364 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700365 }
366 return NULL;
367 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700368
Brian Silvermana6d1b562013-09-01 14:39:39 -0700369 if (options & kPeek) {
370 msg = ReadPeek(options, data_start_);
371 } else {
372 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700373 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700374 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700375 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700376 }
377 // This loop pulls each message out of the buffer.
378 const int pos = data_start_;
379 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700380 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700381 if (data_start_ == data_end_) {
382 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700383 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700384 }
385 msg = data_[pos];
386 break;
387 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700388 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700389 DecrementMessageReferenceCount(data_[pos]);
390 }
391 } else {
392 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700393 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700394 }
395 msg = data_[data_start_];
396 data_start_ = (data_start_ + 1) % data_length_;
397 }
398 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700399 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700400 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700401 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700402 }
403 return msg;
404}
Brian Silverman08661c72013-09-01 17:24:38 -0700405const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700406 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700407 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700408 this, options, index, *index);
409 }
410 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700411
412 MutexLocker locker(&data_lock_);
413
414 ReadData read_data;
415 if (!ReadCommonStart(options, index, &read_data)) {
416 if (kReadDebug) {
417 printf("queue: %p common returned false\n", this);
418 }
419 return NULL;
420 }
421
422 // TODO(parker): Handle integer wrap on the index.
423
Brian Silverman797e71e2013-09-06 17:29:39 -0700424 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800425 int my_start;
426
427 const int unread_messages = messages_ - *index;
Austin Schuh287d98e2014-03-09 00:41:55 -0800428 int current_messages = data_end_ - data_start_;
429 if (current_messages < 0) current_messages += data_length_ - 1;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800430 if (unread_messages > current_messages) { // If we're behind the available messages.
Brian Silverman797e71e2013-09-06 17:29:39 -0700431 // Catch index up to the last available message.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800432 *index = messages_ - current_messages;
Brian Silverman797e71e2013-09-06 17:29:39 -0700433 // And that's the one we're going to read.
434 my_start = data_start_;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800435 } else {
436 // Just start reading at the first available message that we haven't yet
437 // read.
Austin Schuh287d98e2014-03-09 00:41:55 -0800438 my_start = (data_end_ - unread_messages) % data_length_;
439 if (my_start < 0) my_start = data_start_ + unread_messages - 1;
Brian Silverman797e71e2013-09-06 17:29:39 -0700440 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800441
Brian Silverman797e71e2013-09-06 17:29:39 -0700442 if (options & kPeek) {
443 msg = ReadPeek(options, my_start);
444 } else {
445 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700446 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700447 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700448 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700449 int pos = data_end_ - 1;
450 if (pos < 0) { // If it wrapped.
451 pos = data_length_ - 1; // Unwrap it.
452 }
453 if (kReadDebug) {
454 printf("queue: %p reading from c1: %d\n", this, pos);
455 }
456 msg = data_[pos];
457 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700458 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700459 if (kReadDebug) {
460 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700461 }
Austin Schuh287d98e2014-03-09 00:41:55 -0800462#if 0
463 // TODO(brians): Do this check right? (make sure full queue works etc)
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800464 // This assert checks that we're either within both endpoints (duh) or
465 // outside of both of them (if the queue is wrapped around).
466 assert((my_start >= data_start_ && my_start < data_end_) ||
467 (my_start > data_end_ && my_start <= data_start_));
Austin Schuh287d98e2014-03-09 00:41:55 -0800468#endif
Brian Silverman797e71e2013-09-06 17:29:39 -0700469 msg = data_[my_start];
470 ++(*index);
471 }
472 MessageHeader *const header = MessageHeader::Get(msg);
473 ++header->ref_count;
474 if (kRefDebug) {
475 printf("ref_inc_count: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700476 }
477 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700478 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700479 return msg;
480}
481
Brian Silverman08661c72013-09-01 17:24:38 -0700482void *RawQueue::GetMessage() {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700483 MutexLocker locker(&pool_lock_);
484 MessageHeader *header;
485 if (pool_length_ - messages_used_ > 0) {
486 header = pool_[messages_used_];
487 } else {
488 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700489 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700490 }
491 header = pool_[pool_length_] =
492 static_cast<MessageHeader *>(shm_malloc(msg_length_));
493 ++pool_length_;
494 }
495 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
496 header->ref_count = 1;
497 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700498 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700499 }
500 header->index = messages_used_;
501 ++messages_used_;
502 return msg;
503}
504
505} // namespace aos