blob: 8103b6e2bc54d6e769ee6d24c856b81358fc6cb9 [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070025const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070026
27// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070028// to hold (for readers who are slow about freeing them or who leak one when
29// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070030const int kExtraMessages = 20;
31
32} // namespace
33
Brian Silverman08661c72013-09-01 17:24:38 -070034const int RawQueue::kPeek;
35const int RawQueue::kFromEnd;
36const int RawQueue::kNonBlock;
37const int RawQueue::kBlock;
38const int RawQueue::kOverride;
39
40struct RawQueue::MessageHeader {
Brian Silvermana6d1b562013-09-01 14:39:39 -070041 int ref_count;
42 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080043 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070044 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080045 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
46 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
47 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070048 }
49 void Swap(MessageHeader *other) {
50 MessageHeader temp;
51 memcpy(&temp, other, sizeof(temp));
52 memcpy(other, this, sizeof(*other));
53 memcpy(this, &temp, sizeof(*this));
54 }
55};
Brian Silverman5f8c4922014-02-11 21:22:38 -080056static_assert(shm_ok<RawQueue::MessageHeader>::value,
57 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070058
Brian Silverman797e71e2013-09-06 17:29:39 -070059struct RawQueue::ReadData {
60 bool writable_start;
61};
62
Brian Silvermana6d1b562013-09-01 14:39:39 -070063// TODO(brians) maybe do this with atomic integer instructions so it doesn't
64// have to lock/unlock pool_lock_
Brian Silverman08661c72013-09-01 17:24:38 -070065void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -070066 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -070067 MutexLocker locker(&pool_lock_);
68 MessageHeader *header = MessageHeader::Get(msg);
69 --header->ref_count;
70 assert(header->ref_count >= 0);
71 if (kRefDebug) {
72 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
73 }
74 if (header->ref_count == 0) {
75 DoFreeMessage(msg);
76 }
77}
78
Brian Silverman08661c72013-09-01 17:24:38 -070079RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080080 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070081 const size_t name_size = strlen(name) + 1;
82 char *temp = static_cast<char *>(shm_malloc(name_size));
83 memcpy(temp, name, name_size);
84 name_ = temp;
85 length_ = length;
86 hash_ = hash;
87 queue_length_ = queue_length;
88
89 next_ = NULL;
90 recycle_ = NULL;
91
92 if (kFetchDebug) {
93 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
94 name, length, hash, queue_length);
95 }
96
97 data_length_ = queue_length + 1;
98 if (data_length_ < 2) { // TODO(brians) when could this happen?
99 data_length_ = 2;
100 }
101 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
102 data_start_ = 0;
103 data_end_ = 0;
104 messages_ = 0;
105
106 mem_length_ = queue_length + kExtraMessages;
107 pool_length_ = 0;
108 messages_used_ = 0;
109 msg_length_ = length + sizeof(MessageHeader);
110 pool_ = static_cast<MessageHeader **>(
111 shm_malloc(sizeof(MessageHeader *) * mem_length_));
112
113 if (kFetchDebug) {
114 printf("made queue %s\n", name);
115 }
116}
Brian Silverman08661c72013-09-01 17:24:38 -0700117RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700118 int queue_length) {
119 if (kFetchDebug) {
120 printf("fetching queue %s\n", name);
121 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800122 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700123 return NULL;
124 }
Brian Silverman08661c72013-09-01 17:24:38 -0700125 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800126 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700127 if (current != NULL) {
128 while (true) {
129 // If we found a matching queue.
130 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
131 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800132 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700133 return current;
134 } else {
135 if (kFetchDebug) {
136 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
137 strcmp(current->name_, name), name);
138 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700139 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700140 // If this is the last one.
141 if (current->next_ == NULL) break;
142 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700143 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700144 }
145
Brian Silverman797e71e2013-09-06 17:29:39 -0700146 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
147 RawQueue(name, length, hash, queue_length);
148 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800149 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700150 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700151 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700152 }
153
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800154 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700155 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700156}
Brian Silverman08661c72013-09-01 17:24:38 -0700157RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700158 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700159 int recycle_hash, int recycle_length, RawQueue **recycle) {
160 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700161 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
162 if (r == r->recycle_) {
163 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
164 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700165 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700166 abort();
167 }
168 *recycle = r->recycle_;
169 return r;
170}
171
Brian Silverman08661c72013-09-01 17:24:38 -0700172void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700173 MessageHeader *header = MessageHeader::Get(msg);
174 if (pool_[header->index] != header) { // if something's messed up
175 fprintf(stderr, "queue: something is very very wrong with queue %p."
176 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
177 this, pool_, header->index, header);
178 printf("queue: see stderr\n");
179 abort();
180 }
181 if (kRefDebug) {
182 printf("ref free: %p\n", msg);
183 }
184 --messages_used_;
185
186 if (recycle_ != NULL) {
187 void *const new_msg = recycle_->GetMessage();
188 if (new_msg == NULL) {
189 fprintf(stderr, "queue: couldn't get a message"
190 " for recycle queue %p\n", recycle_);
191 } else {
192 // Take a message from recycle_ and switch its
193 // header with the one being freed, which effectively
194 // switches which queue each message belongs to.
195 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700196 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700197 pool_[header->index] = new_header;
198 {
199 MutexLocker locker(&recycle_->pool_lock_);
200 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700201 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700202 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700203 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700204 }
205 // use the header for new_msg which is now for this pool
206 header = new_header;
207 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
208 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
209 " aborting\n", recycle_, msg);
210 printf("see stderr\n");
211 abort();
212 }
213 msg = new_msg;
214 }
215 }
216
Brian Silverman797e71e2013-09-06 17:29:39 -0700217 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700218 int index = header->index;
219 header->index = -1;
220 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700221 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700222 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700223 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700224 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700225 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700226 header->index = index;
227 }
228}
229
Brian Silverman08661c72013-09-01 17:24:38 -0700230bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700231 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700232 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700233 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700234 }
235 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
236 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700237 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700238 global_core->size))) {
239 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
240 msg, this);
241 printf("see stderr\n");
242 abort();
243 }
244 {
245 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700246 bool writable_waited = false;
247
248 int new_end;
249 while (true) {
250 new_end = (data_end_ + 1) % data_length_;
251 // If there is room in the queue right now.
252 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700253 if (options & kNonBlock) {
254 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700255 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700256 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800257 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700258 return false;
259 } else if (options & kOverride) {
260 if (kWriteDebug) {
261 printf("queue: overriding on %p\n", this);
262 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700263 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700264 DecrementMessageReferenceCount(data_[data_start_]);
265 data_start_ = (data_start_ + 1) % data_length_;
266 } else { // kBlock
267 if (kWriteDebug) {
268 printf("queue: going to wait for writable_ of %p\n", this);
269 }
Brian Silverman08661c72013-09-01 17:24:38 -0700270 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700271 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700272 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700273 }
274 data_[data_end_] = msg;
275 ++messages_;
276 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700277
278 if (kWriteDebug) {
279 printf("queue: broadcasting to readable_ of %p\n", this);
280 }
281 readable_.Broadcast();
282
283 // If we got a signal on writable_ here and it's still writable, then we
284 // need to signal the next person in line (if any).
285 if (writable_waited && is_writable()) {
286 if (kWriteDebug) {
287 printf("queue: resignalling writable_ of %p\n", this);
288 }
289 writable_.Signal();
290 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700291 }
292 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700293 printf("queue: write returning true on queue %p\n", this);
294 }
295 return true;
296}
297
Brian Silverman797e71e2013-09-06 17:29:39 -0700298void RawQueue::ReadCommonEnd(ReadData *read_data) {
299 if (is_writable()) {
300 if (kReadDebug) {
301 printf("queue: %ssignalling writable_ of %p\n",
302 read_data->writable_start ? "not " : "", this);
303 }
304 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700305 }
306}
Brian Silverman797e71e2013-09-06 17:29:39 -0700307bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
308 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700309 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
310 if (options & kNonBlock) {
311 if (kReadDebug) {
312 printf("queue: not going to block waiting on %p\n", this);
313 }
314 return false;
315 } else { // kBlock
316 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700317 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700318 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700319 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700320 readable_.Wait();
321 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700322 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700323 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700324 }
325 }
326 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800327 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
328 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700329 }
330 return true;
331}
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700332void *RawQueue::ReadPeek(int options, int start) const {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700333 void *ret;
334 if (options & kFromEnd) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700335 // TODO(brians): Test this block with ReadMessageIndex.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700336 int pos = data_end_ - 1;
337 if (pos < 0) { // if it needs to wrap
338 pos = data_length_ - 1;
339 }
340 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700341 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700342 }
343 ret = data_[pos];
344 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700345 assert(start != -1);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700346 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700347 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700348 }
349 ret = data_[start];
350 }
351 MessageHeader *const header = MessageHeader::Get(ret);
352 ++header->ref_count;
353 if (kRefDebug) {
354 printf("ref inc count: %p\n", ret);
355 }
356 return ret;
357}
Brian Silverman08661c72013-09-01 17:24:38 -0700358const void *RawQueue::ReadMessage(int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700359 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700360 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700361 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700362 }
363 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700364
Brian Silvermana6d1b562013-09-01 14:39:39 -0700365 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700366
367 ReadData read_data;
368 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700369 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700370 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700371 }
372 return NULL;
373 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700374
Brian Silvermana6d1b562013-09-01 14:39:39 -0700375 if (options & kPeek) {
376 msg = ReadPeek(options, data_start_);
377 } else {
378 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700379 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700380 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700381 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700382 }
383 // This loop pulls each message out of the buffer.
384 const int pos = data_start_;
385 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700386 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700387 if (data_start_ == data_end_) {
388 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700389 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700390 }
391 msg = data_[pos];
392 break;
393 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700394 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700395 DecrementMessageReferenceCount(data_[pos]);
396 }
397 } else {
398 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700399 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700400 }
401 msg = data_[data_start_];
402 data_start_ = (data_start_ + 1) % data_length_;
403 }
404 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700405 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700406 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700407 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700408 }
409 return msg;
410}
Brian Silverman08661c72013-09-01 17:24:38 -0700411const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700412 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700413 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700414 this, options, index, *index);
415 }
416 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700417
418 MutexLocker locker(&data_lock_);
419
420 ReadData read_data;
421 if (!ReadCommonStart(options, index, &read_data)) {
422 if (kReadDebug) {
423 printf("queue: %p common returned false\n", this);
424 }
425 return NULL;
426 }
427
428 // TODO(parker): Handle integer wrap on the index.
429
Brian Silverman797e71e2013-09-06 17:29:39 -0700430 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800431 int my_start;
432
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700433 if (options & kFromEnd) {
434 my_start = -1;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800435 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700436 const int unread_messages = messages_ - *index;
437 assert(unread_messages > 0);
438 int current_messages = data_end_ - data_start_;
439 if (current_messages < 0) current_messages += data_length_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700440 if (kReadIndexDebug) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700441 printf("queue: %p start=%d end=%d current=%d\n",
442 this, data_start_, data_end_, current_messages);
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700443 }
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700444 assert(current_messages > 0);
445 // If we're behind the available messages.
446 if (unread_messages > current_messages) {
447 // Catch index up to the last available message.
448 *index = messages_ - current_messages;
449 // And that's the one we're going to read.
450 my_start = data_start_;
451 if (kReadIndexDebug) {
452 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
453 this, *index, messages_, data_start_);
454 }
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700455 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700456 // Just start reading at the first available message that we haven't yet
457 // read.
458 my_start = data_end_ - unread_messages;
459 if (kReadIndexDebug) {
460 printf("queue: %p original read from %d\n", this, my_start);
461 }
462 if (data_start_ < data_end_) {
463 assert(my_start >= data_start_);
464 } else {
465 if (my_start < 0) my_start += data_length_;
466 }
Brian Silverman67e34f52014-03-13 15:52:57 -0700467 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700468 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800469
Brian Silverman797e71e2013-09-06 17:29:39 -0700470 if (options & kPeek) {
471 msg = ReadPeek(options, my_start);
472 } else {
473 if (options & kFromEnd) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700474 // TODO(brians): Test this block.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700475 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700476 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700477 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700478 int pos = data_end_ - 1;
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700479 if (kReadIndexDebug) {
480 printf("queue: %p end pos start %d\n", this, pos);
481 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700482 if (pos < 0) { // If it wrapped.
483 pos = data_length_ - 1; // Unwrap it.
484 }
485 if (kReadDebug) {
486 printf("queue: %p reading from c1: %d\n", this, pos);
487 }
488 msg = data_[pos];
489 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700490 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700491 if (kReadDebug) {
492 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700493 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800494 // This assert checks that we're either within both endpoints (duh) or
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700495 // not between them (if the queue is wrapped around).
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800496 assert((my_start >= data_start_ && my_start < data_end_) ||
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700497 ((my_start >= data_start_) == (my_start > data_end_)));
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700498 // More sanity checking.
499 assert((my_start >= 0) && (my_start < data_length_));
Brian Silverman797e71e2013-09-06 17:29:39 -0700500 msg = data_[my_start];
501 ++(*index);
502 }
503 MessageHeader *const header = MessageHeader::Get(msg);
504 ++header->ref_count;
505 if (kRefDebug) {
506 printf("ref_inc_count: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700507 }
508 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700509 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700510 return msg;
511}
512
Brian Silverman08661c72013-09-01 17:24:38 -0700513void *RawQueue::GetMessage() {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700514 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700515 MutexLocker locker(&pool_lock_);
516 MessageHeader *header;
517 if (pool_length_ - messages_used_ > 0) {
518 header = pool_[messages_used_];
519 } else {
520 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700521 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700522 }
523 header = pool_[pool_length_] =
524 static_cast<MessageHeader *>(shm_malloc(msg_length_));
525 ++pool_length_;
526 }
527 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
528 header->ref_count = 1;
529 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700530 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700531 }
532 header->index = messages_used_;
533 ++messages_used_;
534 return msg;
535}
536
537} // namespace aos