blob: 39fd5580fe0a914239a309bdf80c26b6fac6c162 [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070025const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070026
27// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070028// to hold (for readers who are slow about freeing them or who leak one when
29// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070030const int kExtraMessages = 20;
31
32} // namespace
33
Brian Silverman08661c72013-09-01 17:24:38 -070034const int RawQueue::kPeek;
35const int RawQueue::kFromEnd;
36const int RawQueue::kNonBlock;
37const int RawQueue::kBlock;
38const int RawQueue::kOverride;
39
40struct RawQueue::MessageHeader {
Brian Silvermanad290d82014-03-19 17:22:05 -070041 // This gets incremented and decremented with atomic instructions without any
42 // locks held.
Brian Silvermana6d1b562013-09-01 14:39:39 -070043 int ref_count;
44 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080045 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070046 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080047 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
48 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
49 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070050 }
51 void Swap(MessageHeader *other) {
52 MessageHeader temp;
53 memcpy(&temp, other, sizeof(temp));
54 memcpy(other, this, sizeof(*other));
55 memcpy(this, &temp, sizeof(*this));
56 }
57};
Brian Silverman5f8c4922014-02-11 21:22:38 -080058static_assert(shm_ok<RawQueue::MessageHeader>::value,
59 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070060
Brian Silverman797e71e2013-09-06 17:29:39 -070061struct RawQueue::ReadData {
62 bool writable_start;
63};
64
Brian Silverman08661c72013-09-01 17:24:38 -070065void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070066 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070067 __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silvermana6d1b562013-09-01 14:39:39 -070068 if (kRefDebug) {
69 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
70 }
Brian Silvermanad290d82014-03-19 17:22:05 -070071
72 // The only way it should ever be 0 is if we were the last one to decrement,
73 // in which case nobody else should have it around to re-increment it or
74 // anything in the middle, so this is safe to do not atomically with the
75 // decrement.
Brian Silvermana6d1b562013-09-01 14:39:39 -070076 if (header->ref_count == 0) {
Brian Silvermanad290d82014-03-19 17:22:05 -070077 MutexLocker locker(&pool_lock_);
Brian Silvermana6d1b562013-09-01 14:39:39 -070078 DoFreeMessage(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070079 } else {
80 assert(header->ref_count > 0);
Brian Silvermana6d1b562013-09-01 14:39:39 -070081 }
82}
83
Brian Silverman08661c72013-09-01 17:24:38 -070084RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080085 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070086 const size_t name_size = strlen(name) + 1;
87 char *temp = static_cast<char *>(shm_malloc(name_size));
88 memcpy(temp, name, name_size);
89 name_ = temp;
90 length_ = length;
91 hash_ = hash;
92 queue_length_ = queue_length;
93
94 next_ = NULL;
95 recycle_ = NULL;
96
97 if (kFetchDebug) {
98 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
99 name, length, hash, queue_length);
100 }
101
102 data_length_ = queue_length + 1;
103 if (data_length_ < 2) { // TODO(brians) when could this happen?
104 data_length_ = 2;
105 }
106 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
107 data_start_ = 0;
108 data_end_ = 0;
109 messages_ = 0;
110
111 mem_length_ = queue_length + kExtraMessages;
112 pool_length_ = 0;
113 messages_used_ = 0;
114 msg_length_ = length + sizeof(MessageHeader);
115 pool_ = static_cast<MessageHeader **>(
116 shm_malloc(sizeof(MessageHeader *) * mem_length_));
117
118 if (kFetchDebug) {
119 printf("made queue %s\n", name);
120 }
121}
Brian Silverman08661c72013-09-01 17:24:38 -0700122RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700123 int queue_length) {
124 if (kFetchDebug) {
125 printf("fetching queue %s\n", name);
126 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800127 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700128 return NULL;
129 }
Brian Silverman08661c72013-09-01 17:24:38 -0700130 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800131 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700132 if (current != NULL) {
133 while (true) {
134 // If we found a matching queue.
135 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
136 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800137 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700138 return current;
139 } else {
140 if (kFetchDebug) {
141 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
142 strcmp(current->name_, name), name);
143 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700144 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700145 // If this is the last one.
146 if (current->next_ == NULL) break;
147 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700148 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700149 }
150
Brian Silverman797e71e2013-09-06 17:29:39 -0700151 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
152 RawQueue(name, length, hash, queue_length);
153 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800154 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700155 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700156 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700157 }
158
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800159 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700160 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700161}
Brian Silverman08661c72013-09-01 17:24:38 -0700162RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700163 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700164 int recycle_hash, int recycle_length, RawQueue **recycle) {
165 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700166 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
167 if (r == r->recycle_) {
168 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
169 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700170 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700171 abort();
172 }
173 *recycle = r->recycle_;
174 return r;
175}
176
Brian Silverman08661c72013-09-01 17:24:38 -0700177void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700178 MessageHeader *header = MessageHeader::Get(msg);
179 if (pool_[header->index] != header) { // if something's messed up
180 fprintf(stderr, "queue: something is very very wrong with queue %p."
181 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
182 this, pool_, header->index, header);
183 printf("queue: see stderr\n");
184 abort();
185 }
186 if (kRefDebug) {
187 printf("ref free: %p\n", msg);
188 }
189 --messages_used_;
190
191 if (recycle_ != NULL) {
192 void *const new_msg = recycle_->GetMessage();
193 if (new_msg == NULL) {
194 fprintf(stderr, "queue: couldn't get a message"
195 " for recycle queue %p\n", recycle_);
196 } else {
197 // Take a message from recycle_ and switch its
198 // header with the one being freed, which effectively
199 // switches which queue each message belongs to.
200 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700201 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700202 pool_[header->index] = new_header;
203 {
204 MutexLocker locker(&recycle_->pool_lock_);
205 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700206 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700207 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700208 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700209 }
210 // use the header for new_msg which is now for this pool
211 header = new_header;
212 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
213 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
214 " aborting\n", recycle_, msg);
215 printf("see stderr\n");
216 abort();
217 }
218 msg = new_msg;
219 }
220 }
221
Brian Silverman797e71e2013-09-06 17:29:39 -0700222 // Where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700223 int index = header->index;
224 header->index = -1;
225 if (index != messages_used_) { // if we're not freeing the one on the end
Brian Silverman797e71e2013-09-06 17:29:39 -0700226 // Put the last one where the one we're freeing was.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700227 header = pool_[index] = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700228 // Put the one we're freeing at the end.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700229 pool_[messages_used_] = MessageHeader::Get(msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700230 // Update the former last one's index.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700231 header->index = index;
232 }
233}
234
Brian Silverman08661c72013-09-01 17:24:38 -0700235bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700236 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700237 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700238 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700239 }
240 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
241 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700242 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700243 global_core->size))) {
244 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
245 msg, this);
246 printf("see stderr\n");
247 abort();
248 }
249 {
250 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700251 bool writable_waited = false;
252
253 int new_end;
254 while (true) {
255 new_end = (data_end_ + 1) % data_length_;
256 // If there is room in the queue right now.
257 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700258 if (options & kNonBlock) {
259 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700260 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700261 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800262 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700263 return false;
264 } else if (options & kOverride) {
265 if (kWriteDebug) {
266 printf("queue: overriding on %p\n", this);
267 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700268 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700269 DecrementMessageReferenceCount(data_[data_start_]);
270 data_start_ = (data_start_ + 1) % data_length_;
271 } else { // kBlock
272 if (kWriteDebug) {
273 printf("queue: going to wait for writable_ of %p\n", this);
274 }
Brian Silverman08661c72013-09-01 17:24:38 -0700275 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700276 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700277 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700278 }
279 data_[data_end_] = msg;
280 ++messages_;
281 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700282
283 if (kWriteDebug) {
284 printf("queue: broadcasting to readable_ of %p\n", this);
285 }
286 readable_.Broadcast();
287
288 // If we got a signal on writable_ here and it's still writable, then we
289 // need to signal the next person in line (if any).
290 if (writable_waited && is_writable()) {
291 if (kWriteDebug) {
292 printf("queue: resignalling writable_ of %p\n", this);
293 }
294 writable_.Signal();
295 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700296 }
297 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700298 printf("queue: write returning true on queue %p\n", this);
299 }
300 return true;
301}
302
Brian Silverman797e71e2013-09-06 17:29:39 -0700303void RawQueue::ReadCommonEnd(ReadData *read_data) {
304 if (is_writable()) {
305 if (kReadDebug) {
306 printf("queue: %ssignalling writable_ of %p\n",
307 read_data->writable_start ? "not " : "", this);
308 }
309 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700310 }
311}
Brian Silverman797e71e2013-09-06 17:29:39 -0700312bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
313 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700314 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
315 if (options & kNonBlock) {
316 if (kReadDebug) {
317 printf("queue: not going to block waiting on %p\n", this);
318 }
319 return false;
320 } else { // kBlock
321 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700322 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700323 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700324 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700325 readable_.Wait();
326 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700327 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700328 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700329 }
330 }
331 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800332 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
333 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700334 }
335 return true;
336}
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700337void *RawQueue::ReadPeek(int options, int start) const {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700338 void *ret;
339 if (options & kFromEnd) {
340 int pos = data_end_ - 1;
341 if (pos < 0) { // if it needs to wrap
342 pos = data_length_ - 1;
343 }
344 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700345 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700346 }
347 ret = data_[pos];
348 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700349 assert(start != -1);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700350 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700351 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700352 }
353 ret = data_[start];
354 }
355 MessageHeader *const header = MessageHeader::Get(ret);
Brian Silvermanad290d82014-03-19 17:22:05 -0700356 __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700357 if (kRefDebug) {
Brian Silvermanad290d82014-03-19 17:22:05 -0700358 printf("ref inc count1: %p\n", ret);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700359 }
360 return ret;
361}
Brian Silverman08661c72013-09-01 17:24:38 -0700362const void *RawQueue::ReadMessage(int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700363 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700364 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700365 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700366 }
367 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700368
Brian Silvermana6d1b562013-09-01 14:39:39 -0700369 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700370
371 ReadData read_data;
372 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700373 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700374 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700375 }
376 return NULL;
377 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700378
Brian Silvermana6d1b562013-09-01 14:39:39 -0700379 if (options & kPeek) {
380 msg = ReadPeek(options, data_start_);
381 } else {
382 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700383 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700384 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700385 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700386 }
387 // This loop pulls each message out of the buffer.
388 const int pos = data_start_;
389 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700390 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700391 if (data_start_ == data_end_) {
392 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700393 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700394 }
395 msg = data_[pos];
396 break;
397 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700398 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700399 DecrementMessageReferenceCount(data_[pos]);
400 }
401 } else {
402 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700403 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700404 }
405 msg = data_[data_start_];
406 data_start_ = (data_start_ + 1) % data_length_;
407 }
408 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700409 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700410 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700411 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700412 }
413 return msg;
414}
Brian Silverman08661c72013-09-01 17:24:38 -0700415const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700416 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700417 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700418 this, options, index, *index);
419 }
420 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700421
422 MutexLocker locker(&data_lock_);
423
424 ReadData read_data;
425 if (!ReadCommonStart(options, index, &read_data)) {
426 if (kReadDebug) {
427 printf("queue: %p common returned false\n", this);
428 }
429 return NULL;
430 }
431
432 // TODO(parker): Handle integer wrap on the index.
433
Brian Silverman797e71e2013-09-06 17:29:39 -0700434 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800435 int my_start;
436
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700437 if (options & kFromEnd) {
438 my_start = -1;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800439 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700440 const int unread_messages = messages_ - *index;
441 assert(unread_messages > 0);
442 int current_messages = data_end_ - data_start_;
443 if (current_messages < 0) current_messages += data_length_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700444 if (kReadIndexDebug) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700445 printf("queue: %p start=%d end=%d current=%d\n",
446 this, data_start_, data_end_, current_messages);
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700447 }
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700448 assert(current_messages > 0);
449 // If we're behind the available messages.
450 if (unread_messages > current_messages) {
451 // Catch index up to the last available message.
452 *index = messages_ - current_messages;
453 // And that's the one we're going to read.
454 my_start = data_start_;
455 if (kReadIndexDebug) {
456 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
457 this, *index, messages_, data_start_);
458 }
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700459 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700460 // Just start reading at the first available message that we haven't yet
461 // read.
462 my_start = data_end_ - unread_messages;
463 if (kReadIndexDebug) {
464 printf("queue: %p original read from %d\n", this, my_start);
465 }
466 if (data_start_ < data_end_) {
467 assert(my_start >= data_start_);
468 } else {
469 if (my_start < 0) my_start += data_length_;
470 }
Brian Silverman67e34f52014-03-13 15:52:57 -0700471 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700472 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800473
Brian Silverman797e71e2013-09-06 17:29:39 -0700474 if (options & kPeek) {
475 msg = ReadPeek(options, my_start);
476 } else {
477 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700478 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700479 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700480 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700481 int pos = data_end_ - 1;
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700482 if (kReadIndexDebug) {
483 printf("queue: %p end pos start %d\n", this, pos);
484 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700485 if (pos < 0) { // If it wrapped.
486 pos = data_length_ - 1; // Unwrap it.
487 }
488 if (kReadDebug) {
489 printf("queue: %p reading from c1: %d\n", this, pos);
490 }
491 msg = data_[pos];
492 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700493 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700494 if (kReadDebug) {
495 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700496 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800497 // This assert checks that we're either within both endpoints (duh) or
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700498 // not between them (if the queue is wrapped around).
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800499 assert((my_start >= data_start_ && my_start < data_end_) ||
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700500 ((my_start >= data_start_) == (my_start > data_end_)));
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700501 // More sanity checking.
502 assert((my_start >= 0) && (my_start < data_length_));
Brian Silverman797e71e2013-09-06 17:29:39 -0700503 msg = data_[my_start];
504 ++(*index);
505 }
506 MessageHeader *const header = MessageHeader::Get(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -0700507 __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silverman797e71e2013-09-06 17:29:39 -0700508 if (kRefDebug) {
Brian Silvermanad290d82014-03-19 17:22:05 -0700509 printf("ref_inc_count2: %p\n", msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700510 }
511 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700512 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700513 return msg;
514}
515
Brian Silverman08661c72013-09-01 17:24:38 -0700516void *RawQueue::GetMessage() {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700517 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700518 MutexLocker locker(&pool_lock_);
519 MessageHeader *header;
520 if (pool_length_ - messages_used_ > 0) {
521 header = pool_[messages_used_];
522 } else {
523 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700524 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700525 }
526 header = pool_[pool_length_] =
527 static_cast<MessageHeader *>(shm_malloc(msg_length_));
528 ++pool_length_;
529 }
530 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
531 header->ref_count = 1;
Brian Silvermanad290d82014-03-19 17:22:05 -0700532 static_assert(
533 __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
534 "we access this using not specifically atomic loads and stores");
Brian Silvermana6d1b562013-09-01 14:39:39 -0700535 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700536 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700537 }
538 header->index = messages_used_;
539 ++messages_used_;
540 return msg;
541}
542
543} // namespace aos