blob: e02df303dc1dc3360e6428ee34daa689370bc625 [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070025const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070026
27// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070028// to hold (for readers who are slow about freeing them or who leak one when
29// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070030const int kExtraMessages = 20;
31
32} // namespace
33
Brian Silverman08661c72013-09-01 17:24:38 -070034const int RawQueue::kPeek;
35const int RawQueue::kFromEnd;
36const int RawQueue::kNonBlock;
37const int RawQueue::kBlock;
38const int RawQueue::kOverride;
39
40struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070041 int ref_count;
42 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080043 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070044 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080045 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
46 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
47 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070048 }
49 void Swap(MessageHeader *other) {
50 MessageHeader temp;
51 memcpy(&temp, other, sizeof(temp));
52 memcpy(other, this, sizeof(*other));
53 memcpy(this, &temp, sizeof(*this));
54 }
55};
Brian Silverman5f8c4922014-02-11 21:22:38 -080056static_assert(shm_ok<RawQueue::MessageHeader>::value,
57 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070058
Brian Silverman797e71e2013-09-06 17:29:39 -070059struct RawQueue::ReadData {
60 bool writable_start;
61};
62
Brian Silvermana6d1b562013-09-01 14:39:39 -070063// TODO(brians) maybe do this with atomic integer instructions so it doesn't
64// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070065void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070066 MutexLocker locker(&pool_lock_);
67 MessageHeader *header = MessageHeader::Get(msg);
68 --header->ref_count;
69 assert(header->ref_count >= 0);
70 if (kRefDebug) {
71 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
72 }
73 if (header->ref_count == 0) {
74 DoFreeMessage(msg);
75 }
76}
77
Brian Silverman08661c72013-09-01 17:24:38 -070078RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080079 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070080 const size_t name_size = strlen(name) + 1;
81 char *temp = static_cast<char *>(shm_malloc(name_size));
82 memcpy(temp, name, name_size);
83 name_ = temp;
84 length_ = length;
85 hash_ = hash;
86 queue_length_ = queue_length;
87
88 next_ = NULL;
89 recycle_ = NULL;
90
91 if (kFetchDebug) {
92 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
93 name, length, hash, queue_length);
94 }
95
96 data_length_ = queue_length + 1;
97 if (data_length_ < 2) { // TODO(brians) when could this happen?
98 data_length_ = 2;
99 }
100 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
101 data_start_ = 0;
102 data_end_ = 0;
103 messages_ = 0;
104
105 mem_length_ = queue_length + kExtraMessages;
106 pool_length_ = 0;
107 messages_used_ = 0;
108 msg_length_ = length + sizeof(MessageHeader);
109 pool_ = static_cast<MessageHeader **>(
110 shm_malloc(sizeof(MessageHeader *) * mem_length_));
111
112 if (kFetchDebug) {
113 printf("made queue %s\n", name);
114 }
115}
Brian Silverman08661c72013-09-01 17:24:38 -0700116RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700117 int queue_length) {
118 if (kFetchDebug) {
119 printf("fetching queue %s\n", name);
120 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800121 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700122 return NULL;
123 }
Brian Silverman08661c72013-09-01 17:24:38 -0700124 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800125 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700126 if (current != NULL) {
127 while (true) {
128 // If we found a matching queue.
129 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
130 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800131 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700132 return current;
133 } else {
134 if (kFetchDebug) {
135 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
136 strcmp(current->name_, name), name);
137 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700138 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700139 // If this is the last one.
140 if (current->next_ == NULL) break;
141 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700142 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700143 }
144
Brian Silverman797e71e2013-09-06 17:29:39 -0700145 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
146 RawQueue(name, length, hash, queue_length);
147 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800148 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700149 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700150 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700151 }
152
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800153 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700154 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700155}
Brian Silverman08661c72013-09-01 17:24:38 -0700156RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700157 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700158 int recycle_hash, int recycle_length, RawQueue **recycle) {
159 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700160 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
161 if (r == r->recycle_) {
162 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
163 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700164 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700165 abort();
166 }
167 *recycle = r->recycle_;
168 return r;
169}
170
Brian Silverman08661c72013-09-01 17:24:38 -0700171void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700172 MessageHeader *header = MessageHeader::Get(msg);
173 if (pool_[header->index] != header) { // if something's messed up
174 fprintf(stderr, "queue: something is very very wrong with queue %p."
175 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
176 this, pool_, header->index, header);
177 printf("queue: see stderr\n");
178 abort();
179 }
180 if (kRefDebug) {
181 printf("ref free: %p\n", msg);
182 }
183 --messages_used_;
184
185 if (recycle_ != NULL) {
186 void *const new_msg = recycle_->GetMessage();
187 if (new_msg == NULL) {
188 fprintf(stderr, "queue: couldn't get a message"
189 " for recycle queue %p\n", recycle_);
190 } else {
191 // Take a message from recycle_ and switch its
192 // header with the one being freed, which effectively
193 // switches which queue each message belongs to.
194 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700195 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700196 pool_[header->index] = new_header;
197 {
198 MutexLocker locker(&recycle_->pool_lock_);
199 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700200 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700201 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700202 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700203 }
204 // use the header for new_msg which is now for this pool
205 header = new_header;
206 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
207 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
208 " aborting\n", recycle_, msg);
209 printf("see stderr\n");
210 abort();
211 }
212 msg = new_msg;
213 }
214 }
215
Brian Silverman797e71e2013-09-06 17:29:39 -0700216 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700217 int index = header->index;
218 header->index = -1;
219 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700220 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700221 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700222 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700223 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700224 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700225 header->index = index;
226 }
227}
228
Brian Silverman08661c72013-09-01 17:24:38 -0700229bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700230 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700231 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700232 }
233 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
234 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700235 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700236 global_core->size))) {
237 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
238 msg, this);
239 printf("see stderr\n");
240 abort();
241 }
242 {
243 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700244 bool writable_waited = false;
245
246 int new_end;
247 while (true) {
248 new_end = (data_end_ + 1) % data_length_;
249 // If there is room in the queue right now.
250 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700251 if (options & kNonBlock) {
252 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700253 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700254 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800255 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700256 return false;
257 } else if (options & kOverride) {
258 if (kWriteDebug) {
259 printf("queue: overriding on %p\n", this);
260 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700261 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700262 DecrementMessageReferenceCount(data_[data_start_]);
263 data_start_ = (data_start_ + 1) % data_length_;
264 } else { // kBlock
265 if (kWriteDebug) {
266 printf("queue: going to wait for writable_ of %p\n", this);
267 }
Brian Silverman08661c72013-09-01 17:24:38 -0700268 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700269 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700270 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700271 }
272 data_[data_end_] = msg;
273 ++messages_;
274 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700275
276 if (kWriteDebug) {
277 printf("queue: broadcasting to readable_ of %p\n", this);
278 }
279 readable_.Broadcast();
280
281 // If we got a signal on writable_ here and it's still writable, then we
282 // need to signal the next person in line (if any).
283 if (writable_waited && is_writable()) {
284 if (kWriteDebug) {
285 printf("queue: resignalling writable_ of %p\n", this);
286 }
287 writable_.Signal();
288 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700289 }
290 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700291 printf("queue: write returning true on queue %p\n", this);
292 }
293 return true;
294}
295
Brian Silverman797e71e2013-09-06 17:29:39 -0700296void RawQueue::ReadCommonEnd(ReadData *read_data) {
297 if (is_writable()) {
298 if (kReadDebug) {
299 printf("queue: %ssignalling writable_ of %p\n",
300 read_data->writable_start ? "not " : "", this);
301 }
302 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700303 }
304}
Brian Silverman797e71e2013-09-06 17:29:39 -0700305bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
306 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700307 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
308 if (options & kNonBlock) {
309 if (kReadDebug) {
310 printf("queue: not going to block waiting on %p\n", this);
311 }
312 return false;
313 } else { // kBlock
314 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700315 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700316 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700317 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700318 readable_.Wait();
319 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700320 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700321 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700322 }
323 }
324 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800325 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
326 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700327 }
328 return true;
329}
Brian Silverman08661c72013-09-01 17:24:38 -0700330void *RawQueue::ReadPeek(int options, int start) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700331 void *ret;
332 if (options & kFromEnd) {
333 int pos = data_end_ - 1;
334 if (pos < 0) { // if it needs to wrap
335 pos = data_length_ - 1;
336 }
337 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700338 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700339 }
340 ret = data_[pos];
341 } else {
342 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700343 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700344 }
345 ret = data_[start];
346 }
347 MessageHeader *const header = MessageHeader::Get(ret);
348 ++header->ref_count;
349 if (kRefDebug) {
350 printf("ref inc count: %p\n", ret);
351 }
352 return ret;
353}
Brian Silverman08661c72013-09-01 17:24:38 -0700354const void *RawQueue::ReadMessage(int options) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700355 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700356 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700357 }
358 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700359
Brian Silvermana6d1b562013-09-01 14:39:39 -0700360 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700361
362 ReadData read_data;
363 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700364 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700365 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700366 }
367 return NULL;
368 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700369
Brian Silvermana6d1b562013-09-01 14:39:39 -0700370 if (options & kPeek) {
371 msg = ReadPeek(options, data_start_);
372 } else {
373 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700374 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700375 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700376 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700377 }
378 // This loop pulls each message out of the buffer.
379 const int pos = data_start_;
380 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700381 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700382 if (data_start_ == data_end_) {
383 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700384 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700385 }
386 msg = data_[pos];
387 break;
388 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700389 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700390 DecrementMessageReferenceCount(data_[pos]);
391 }
392 } else {
393 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700394 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700395 }
396 msg = data_[data_start_];
397 data_start_ = (data_start_ + 1) % data_length_;
398 }
399 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700400 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700401 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700402 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700403 }
404 return msg;
405}
Brian Silverman08661c72013-09-01 17:24:38 -0700406const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700407 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700408 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700409 this, options, index, *index);
410 }
411 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700412
413 MutexLocker locker(&data_lock_);
414
415 ReadData read_data;
416 if (!ReadCommonStart(options, index, &read_data)) {
417 if (kReadDebug) {
418 printf("queue: %p common returned false\n", this);
419 }
420 return NULL;
421 }
422
423 // TODO(parker): Handle integer wrap on the index.
424
Brian Silverman797e71e2013-09-06 17:29:39 -0700425 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800426 int my_start;
427
428 const int unread_messages = messages_ - *index;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700429 assert(unread_messages > 0);
Austin Schuh287d98e2014-03-09 00:41:55 -0800430 int current_messages = data_end_ - data_start_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700431 if (current_messages < 0) current_messages += data_length_;
432 if (kReadIndexDebug) {
433 printf("queue: %p start=%d end=%d current=%d\n",
434 this, data_start_, data_end_, current_messages);
435 }
436 assert(current_messages > 0);
Brian Silverman67e34f52014-03-13 15:52:57 -0700437 // If we're behind the available messages.
438 if (unread_messages > current_messages) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700439 // Catch index up to the last available message.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800440 *index = messages_ - current_messages;
Brian Silverman797e71e2013-09-06 17:29:39 -0700441 // And that's the one we're going to read.
442 my_start = data_start_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700443 if (kReadIndexDebug) {
444 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
445 this, *index, messages_, data_start_);
446 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800447 } else {
448 // Just start reading at the first available message that we haven't yet
449 // read.
Brian Silverman67e34f52014-03-13 15:52:57 -0700450 my_start = data_end_ - unread_messages;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700451 if (kReadIndexDebug) {
452 printf("queue: %p original read from %d\n", this, my_start);
453 }
454 if (data_start_ < data_end_) {
455 assert(my_start >= data_start_);
456 } else {
457 if (my_start < 0) my_start += data_length_;
Brian Silverman67e34f52014-03-13 15:52:57 -0700458 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700459 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800460
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700461 // TODO(brians): Test kPeek and kFromEnd.
Brian Silverman797e71e2013-09-06 17:29:39 -0700462 if (options & kPeek) {
463 msg = ReadPeek(options, my_start);
464 } else {
465 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700466 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700467 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700468 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700469 int pos = data_end_ - 1;
470 if (pos < 0) { // If it wrapped.
471 pos = data_length_ - 1; // Unwrap it.
472 }
473 if (kReadDebug) {
474 printf("queue: %p reading from c1: %d\n", this, pos);
475 }
476 msg = data_[pos];
477 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700478 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700479 if (kReadDebug) {
480 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700481 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800482 // This assert checks that we're either within both endpoints (duh) or
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700483 // not between them (if the queue is wrapped around).
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800484 assert((my_start >= data_start_ && my_start < data_end_) ||
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700485 ((my_start >= data_start_) == (my_start > data_end_)));
Brian Silverman797e71e2013-09-06 17:29:39 -0700486 msg = data_[my_start];
487 ++(*index);
488 }
489 MessageHeader *const header = MessageHeader::Get(msg);
490 ++header->ref_count;
491 if (kRefDebug) {
492 printf("ref_inc_count: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700493 }
494 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700495 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700496 return msg;
497}
498
Brian Silverman08661c72013-09-01 17:24:38 -0700499void *RawQueue::GetMessage() {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700500 MutexLocker locker(&pool_lock_);
501 MessageHeader *header;
502 if (pool_length_ - messages_used_ > 0) {
503 header = pool_[messages_used_];
504 } else {
505 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700506 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700507 }
508 header = pool_[pool_length_] =
509 static_cast<MessageHeader *>(shm_malloc(msg_length_));
510 ++pool_length_;
511 }
512 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
513 header->ref_count = 1;
514 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700515 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700516 }
517 header->index = messages_used_;
518 ++messages_used_;
519 return msg;
520}
521
522} // namespace aos