blob: 955adeac11bebb52514c0683ecac24748fd5fb8c [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070025const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070026
27// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070028// to hold (for readers who are slow about freeing them or who leak one when
29// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070030const int kExtraMessages = 20;
31
32} // namespace
33
Brian Silverman08661c72013-09-01 17:24:38 -070034const int RawQueue::kPeek;
35const int RawQueue::kFromEnd;
36const int RawQueue::kNonBlock;
37const int RawQueue::kBlock;
38const int RawQueue::kOverride;
39
Brian Silverman430e7fa2014-03-21 16:58:33 -070040// This is what gets stuck in before each queue message in memory. It is always
41// allocated aligned to 8 bytes and its size has to maintain that alignment for
42// the message that follows immediately.
Brian Silverman08661c72013-09-01 17:24:38 -070043struct RawQueue::MessageHeader {
Brian Silvermanad290d82014-03-19 17:22:05 -070044 // This gets incremented and decremented with atomic instructions without any
45 // locks held.
Brian Silvermana6d1b562013-09-01 14:39:39 -070046 int ref_count;
47 int index; // in pool_
Brian Silverman5f8c4922014-02-11 21:22:38 -080048 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070049 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080050 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
51 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
52 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070053 }
54 void Swap(MessageHeader *other) {
55 MessageHeader temp;
56 memcpy(&temp, other, sizeof(temp));
57 memcpy(other, this, sizeof(*other));
58 memcpy(this, &temp, sizeof(*this));
59 }
60};
Brian Silverman5f8c4922014-02-11 21:22:38 -080061static_assert(shm_ok<RawQueue::MessageHeader>::value,
62 "the whole point is to stick it in shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070063
Brian Silverman797e71e2013-09-06 17:29:39 -070064struct RawQueue::ReadData {
65 bool writable_start;
66};
67
Brian Silverman08661c72013-09-01 17:24:38 -070068void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070069 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070070 __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silvermana6d1b562013-09-01 14:39:39 -070071 if (kRefDebug) {
72 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
73 }
Brian Silvermanad290d82014-03-19 17:22:05 -070074
75 // The only way it should ever be 0 is if we were the last one to decrement,
76 // in which case nobody else should have it around to re-increment it or
77 // anything in the middle, so this is safe to do not atomically with the
78 // decrement.
Brian Silvermana6d1b562013-09-01 14:39:39 -070079 if (header->ref_count == 0) {
80 DoFreeMessage(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070081 } else {
82 assert(header->ref_count > 0);
Brian Silvermana6d1b562013-09-01 14:39:39 -070083 }
84}
85
Brian Silverman430e7fa2014-03-21 16:58:33 -070086void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
87 MessageHeader *const header = MessageHeader::Get(msg);
88 __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
89 if (kRefDebug) {
90 printf("ref inc count: %p\n", msg);
91 }
92}
93
Brian Silverman08661c72013-09-01 17:24:38 -070094RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080095 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070096 const size_t name_size = strlen(name) + 1;
97 char *temp = static_cast<char *>(shm_malloc(name_size));
98 memcpy(temp, name, name_size);
99 name_ = temp;
100 length_ = length;
101 hash_ = hash;
102 queue_length_ = queue_length;
103
104 next_ = NULL;
105 recycle_ = NULL;
106
107 if (kFetchDebug) {
108 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
109 name, length, hash, queue_length);
110 }
111
112 data_length_ = queue_length + 1;
113 if (data_length_ < 2) { // TODO(brians) when could this happen?
114 data_length_ = 2;
115 }
116 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
117 data_start_ = 0;
118 data_end_ = 0;
119 messages_ = 0;
120
121 mem_length_ = queue_length + kExtraMessages;
122 pool_length_ = 0;
123 messages_used_ = 0;
124 msg_length_ = length + sizeof(MessageHeader);
125 pool_ = static_cast<MessageHeader **>(
126 shm_malloc(sizeof(MessageHeader *) * mem_length_));
127
128 if (kFetchDebug) {
129 printf("made queue %s\n", name);
130 }
131}
Brian Silverman08661c72013-09-01 17:24:38 -0700132RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700133 int queue_length) {
134 if (kFetchDebug) {
135 printf("fetching queue %s\n", name);
136 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800137 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700138 return NULL;
139 }
Brian Silverman08661c72013-09-01 17:24:38 -0700140 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800141 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700142 if (current != NULL) {
143 while (true) {
144 // If we found a matching queue.
145 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
146 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800147 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700148 return current;
149 } else {
150 if (kFetchDebug) {
151 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
152 strcmp(current->name_, name), name);
153 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700154 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700155 // If this is the last one.
156 if (current->next_ == NULL) break;
157 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700158 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700159 }
160
Brian Silverman797e71e2013-09-06 17:29:39 -0700161 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
162 RawQueue(name, length, hash, queue_length);
163 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800164 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700165 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700166 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700167 }
168
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800169 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700170 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700171}
Brian Silverman08661c72013-09-01 17:24:38 -0700172RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700173 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700174 int recycle_hash, int recycle_length, RawQueue **recycle) {
175 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700176 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
177 if (r == r->recycle_) {
178 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
179 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700180 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700181 abort();
182 }
183 *recycle = r->recycle_;
184 return r;
185}
186
Brian Silverman08661c72013-09-01 17:24:38 -0700187void RawQueue::DoFreeMessage(const void *msg) {
Brian Silverman430e7fa2014-03-21 16:58:33 -0700188 MutexLocker locker(&pool_lock_);
189
Brian Silvermana6d1b562013-09-01 14:39:39 -0700190 MessageHeader *header = MessageHeader::Get(msg);
191 if (pool_[header->index] != header) { // if something's messed up
192 fprintf(stderr, "queue: something is very very wrong with queue %p."
193 " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
194 this, pool_, header->index, header);
195 printf("queue: see stderr\n");
196 abort();
197 }
198 if (kRefDebug) {
199 printf("ref free: %p\n", msg);
200 }
201 --messages_used_;
202
203 if (recycle_ != NULL) {
204 void *const new_msg = recycle_->GetMessage();
205 if (new_msg == NULL) {
206 fprintf(stderr, "queue: couldn't get a message"
207 " for recycle queue %p\n", recycle_);
208 } else {
209 // Take a message from recycle_ and switch its
210 // header with the one being freed, which effectively
211 // switches which queue each message belongs to.
212 MessageHeader *const new_header = MessageHeader::Get(new_msg);
Brian Silverman797e71e2013-09-06 17:29:39 -0700213 // Also switch the messages between the pools.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700214 pool_[header->index] = new_header;
215 {
216 MutexLocker locker(&recycle_->pool_lock_);
217 recycle_->pool_[new_header->index] = header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700218 // Swap the information in both headers.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700219 header->Swap(new_header);
Brian Silverman797e71e2013-09-06 17:29:39 -0700220 // Don't unlock the other pool until all of its messages are valid.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700221 }
222 // use the header for new_msg which is now for this pool
223 header = new_header;
224 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
225 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
226 " aborting\n", recycle_, msg);
227 printf("see stderr\n");
228 abort();
229 }
230 msg = new_msg;
231 }
232 }
233
Brian Silverman430e7fa2014-03-21 16:58:33 -0700234 // If we're not freeing the one on the end.
235 if (header->index != messages_used_) {
236 // The one that is on the end before we change it.
237 MessageHeader *const other_header = pool_[messages_used_];
Brian Silverman797e71e2013-09-06 17:29:39 -0700238 // Put the last one where the one we're freeing was.
Brian Silverman430e7fa2014-03-21 16:58:33 -0700239 pool_[header->index] = other_header;
Brian Silverman797e71e2013-09-06 17:29:39 -0700240 // Update the former last one's index.
Brian Silverman430e7fa2014-03-21 16:58:33 -0700241 other_header->index = header->index;
242 // Put the one we're freeing at the end.
243 pool_[messages_used_] = header;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700244 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700245 header->index = -1;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700246}
247
Brian Silverman08661c72013-09-01 17:24:38 -0700248bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700249 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700250 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700251 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700252 }
253 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
254 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700255 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700256 global_core->size))) {
257 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
258 msg, this);
259 printf("see stderr\n");
260 abort();
261 }
262 {
263 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700264 bool writable_waited = false;
265
266 int new_end;
267 while (true) {
268 new_end = (data_end_ + 1) % data_length_;
269 // If there is room in the queue right now.
270 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700271 if (options & kNonBlock) {
272 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700273 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700274 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800275 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700276 return false;
277 } else if (options & kOverride) {
278 if (kWriteDebug) {
279 printf("queue: overriding on %p\n", this);
280 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700281 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700282 DecrementMessageReferenceCount(data_[data_start_]);
283 data_start_ = (data_start_ + 1) % data_length_;
284 } else { // kBlock
285 if (kWriteDebug) {
286 printf("queue: going to wait for writable_ of %p\n", this);
287 }
Brian Silverman08661c72013-09-01 17:24:38 -0700288 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700289 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700290 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700291 }
292 data_[data_end_] = msg;
293 ++messages_;
294 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700295
296 if (kWriteDebug) {
297 printf("queue: broadcasting to readable_ of %p\n", this);
298 }
299 readable_.Broadcast();
300
301 // If we got a signal on writable_ here and it's still writable, then we
302 // need to signal the next person in line (if any).
303 if (writable_waited && is_writable()) {
304 if (kWriteDebug) {
305 printf("queue: resignalling writable_ of %p\n", this);
306 }
307 writable_.Signal();
308 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700309 }
310 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700311 printf("queue: write returning true on queue %p\n", this);
312 }
313 return true;
314}
315
Brian Silverman797e71e2013-09-06 17:29:39 -0700316void RawQueue::ReadCommonEnd(ReadData *read_data) {
317 if (is_writable()) {
318 if (kReadDebug) {
319 printf("queue: %ssignalling writable_ of %p\n",
320 read_data->writable_start ? "not " : "", this);
321 }
322 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700323 }
324}
Brian Silverman797e71e2013-09-06 17:29:39 -0700325bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
326 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700327 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
328 if (options & kNonBlock) {
329 if (kReadDebug) {
330 printf("queue: not going to block waiting on %p\n", this);
331 }
332 return false;
333 } else { // kBlock
334 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700335 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700336 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700337 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700338 readable_.Wait();
339 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700340 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700341 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700342 }
343 }
344 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800345 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
346 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700347 }
348 return true;
349}
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700350void *RawQueue::ReadPeek(int options, int start) const {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700351 void *ret;
352 if (options & kFromEnd) {
353 int pos = data_end_ - 1;
354 if (pos < 0) { // if it needs to wrap
355 pos = data_length_ - 1;
356 }
357 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700358 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700359 }
360 ret = data_[pos];
361 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700362 assert(start != -1);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700363 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700364 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700365 }
366 ret = data_[start];
367 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700368 IncrementMessageReferenceCount(ret);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700369 return ret;
370}
Brian Silverman08661c72013-09-01 17:24:38 -0700371const void *RawQueue::ReadMessage(int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700372 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700373 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700374 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700375 }
376 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700377
Brian Silvermana6d1b562013-09-01 14:39:39 -0700378 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700379
380 ReadData read_data;
381 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700382 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700383 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700384 }
385 return NULL;
386 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700387
Brian Silvermana6d1b562013-09-01 14:39:39 -0700388 if (options & kPeek) {
389 msg = ReadPeek(options, data_start_);
390 } else {
391 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700392 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700393 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700394 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700395 }
396 // This loop pulls each message out of the buffer.
397 const int pos = data_start_;
398 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700399 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700400 if (data_start_ == data_end_) {
401 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700402 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700403 }
404 msg = data_[pos];
405 break;
406 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700407 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700408 DecrementMessageReferenceCount(data_[pos]);
409 }
410 } else {
411 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700412 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700413 }
414 msg = data_[data_start_];
415 data_start_ = (data_start_ + 1) % data_length_;
416 }
417 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700418 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700419 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700420 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700421 }
422 return msg;
423}
Brian Silverman08661c72013-09-01 17:24:38 -0700424const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700425 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700426 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700427 this, options, index, *index);
428 }
429 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700430
431 MutexLocker locker(&data_lock_);
432
433 ReadData read_data;
434 if (!ReadCommonStart(options, index, &read_data)) {
435 if (kReadDebug) {
436 printf("queue: %p common returned false\n", this);
437 }
438 return NULL;
439 }
440
441 // TODO(parker): Handle integer wrap on the index.
442
Brian Silverman797e71e2013-09-06 17:29:39 -0700443 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800444 int my_start;
445
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700446 if (options & kFromEnd) {
447 my_start = -1;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800448 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700449 const int unread_messages = messages_ - *index;
450 assert(unread_messages > 0);
451 int current_messages = data_end_ - data_start_;
452 if (current_messages < 0) current_messages += data_length_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700453 if (kReadIndexDebug) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700454 printf("queue: %p start=%d end=%d current=%d\n",
455 this, data_start_, data_end_, current_messages);
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700456 }
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700457 assert(current_messages > 0);
458 // If we're behind the available messages.
459 if (unread_messages > current_messages) {
460 // Catch index up to the last available message.
461 *index = messages_ - current_messages;
462 // And that's the one we're going to read.
463 my_start = data_start_;
464 if (kReadIndexDebug) {
465 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
466 this, *index, messages_, data_start_);
467 }
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700468 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700469 // Just start reading at the first available message that we haven't yet
470 // read.
471 my_start = data_end_ - unread_messages;
472 if (kReadIndexDebug) {
473 printf("queue: %p original read from %d\n", this, my_start);
474 }
475 if (data_start_ < data_end_) {
476 assert(my_start >= data_start_);
477 } else {
478 if (my_start < 0) my_start += data_length_;
479 }
Brian Silverman67e34f52014-03-13 15:52:57 -0700480 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700481 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800482
Brian Silverman797e71e2013-09-06 17:29:39 -0700483 if (options & kPeek) {
484 msg = ReadPeek(options, my_start);
485 } else {
486 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700487 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700488 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700489 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700490 int pos = data_end_ - 1;
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700491 if (kReadIndexDebug) {
492 printf("queue: %p end pos start %d\n", this, pos);
493 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700494 if (pos < 0) { // If it wrapped.
495 pos = data_length_ - 1; // Unwrap it.
496 }
497 if (kReadDebug) {
498 printf("queue: %p reading from c1: %d\n", this, pos);
499 }
500 msg = data_[pos];
501 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700502 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700503 if (kReadDebug) {
504 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700505 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800506 // This assert checks that we're either within both endpoints (duh) or
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700507 // not between them (if the queue is wrapped around).
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800508 assert((my_start >= data_start_ && my_start < data_end_) ||
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700509 ((my_start >= data_start_) == (my_start > data_end_)));
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700510 // More sanity checking.
511 assert((my_start >= 0) && (my_start < data_length_));
Brian Silverman797e71e2013-09-06 17:29:39 -0700512 msg = data_[my_start];
513 ++(*index);
514 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700515 IncrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700516 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700517 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700518 return msg;
519}
520
Brian Silverman08661c72013-09-01 17:24:38 -0700521void *RawQueue::GetMessage() {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700522 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700523 MutexLocker locker(&pool_lock_);
524 MessageHeader *header;
Brian Silverman430e7fa2014-03-21 16:58:33 -0700525 if (pool_length_ > messages_used_) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700526 header = pool_[messages_used_];
Brian Silverman430e7fa2014-03-21 16:58:33 -0700527 //assert(header->index == messages_used_);
528 assert(header->index == -1);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700529 } else {
530 if (pool_length_ >= mem_length_) {
Brian Silverman08661c72013-09-01 17:24:38 -0700531 LOG(FATAL, "overused pool of queue %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700532 }
533 header = pool_[pool_length_] =
534 static_cast<MessageHeader *>(shm_malloc(msg_length_));
535 ++pool_length_;
536 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700537 header->index = messages_used_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700538 void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
539 header->ref_count = 1;
Brian Silvermanad290d82014-03-19 17:22:05 -0700540 static_assert(
541 __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
542 "we access this using not specifically atomic loads and stores");
Brian Silvermana6d1b562013-09-01 14:39:39 -0700543 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700544 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700545 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700546 ++messages_used_;
547 return msg;
548}
549
550} // namespace aos