blob: 211882f748a7276a9f1ff4ad20128b7d083c3ad0 [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
15namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070016namespace {
17
Brian Silverman08661c72013-09-01 17:24:38 -070018static_assert(shm_ok<RawQueue>::value,
19 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070020
21const bool kReadDebug = false;
22const bool kWriteDebug = false;
23const bool kRefDebug = false;
24const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070025const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070026
27// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070028// to hold (for readers who are slow about freeing them or who leak one when
29// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070030const int kExtraMessages = 20;
31
32} // namespace
33
Brian Silverman08661c72013-09-01 17:24:38 -070034const int RawQueue::kPeek;
35const int RawQueue::kFromEnd;
36const int RawQueue::kNonBlock;
37const int RawQueue::kBlock;
38const int RawQueue::kOverride;
39
Brian Silverman430e7fa2014-03-21 16:58:33 -070040// This is what gets stuck in before each queue message in memory. It is always
41// allocated aligned to 8 bytes and its size has to maintain that alignment for
42// the message that follows immediately.
Brian Silverman08661c72013-09-01 17:24:38 -070043struct RawQueue::MessageHeader {
Brian Silvermanad290d82014-03-19 17:22:05 -070044 // This gets incremented and decremented with atomic instructions without any
45 // locks held.
Brian Silvermana6d1b562013-09-01 14:39:39 -070046 int ref_count;
Brian Silvermanc2e04222014-03-22 12:43:44 -070047 MessageHeader *next;
Brian Silverman5f8c4922014-02-11 21:22:38 -080048 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070049 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080050 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
51 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
52 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070053 }
54 void Swap(MessageHeader *other) {
55 MessageHeader temp;
56 memcpy(&temp, other, sizeof(temp));
57 memcpy(other, this, sizeof(*other));
58 memcpy(this, &temp, sizeof(*this));
59 }
Brian Silvermanc2e04222014-03-22 12:43:44 -070060#if __SIZEOF_POINTER__ == 8
61 char padding[16 - sizeof(next) - sizeof(ref_count)];
62#elif __SIZEOF_POINTER__ == 4
63 // No padding needed to get 8 byte total size.
64#else
65#error Unknown pointer size.
66#endif
Brian Silvermana6d1b562013-09-01 14:39:39 -070067};
Brian Silvermana6d1b562013-09-01 14:39:39 -070068
Brian Silverman797e71e2013-09-06 17:29:39 -070069struct RawQueue::ReadData {
70 bool writable_start;
71};
72
Brian Silverman08661c72013-09-01 17:24:38 -070073void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070074 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070075 __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silvermana6d1b562013-09-01 14:39:39 -070076 if (kRefDebug) {
Brian Silvermanc2e04222014-03-22 12:43:44 -070077 printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count);
Brian Silvermana6d1b562013-09-01 14:39:39 -070078 }
Brian Silvermanad290d82014-03-19 17:22:05 -070079
80 // The only way it should ever be 0 is if we were the last one to decrement,
81 // in which case nobody else should have it around to re-increment it or
82 // anything in the middle, so this is safe to do not atomically with the
83 // decrement.
Brian Silvermana6d1b562013-09-01 14:39:39 -070084 if (header->ref_count == 0) {
85 DoFreeMessage(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070086 } else {
87 assert(header->ref_count > 0);
Brian Silvermana6d1b562013-09-01 14:39:39 -070088 }
89}
90
Brian Silverman430e7fa2014-03-21 16:58:33 -070091void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
92 MessageHeader *const header = MessageHeader::Get(msg);
93 __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
94 if (kRefDebug) {
Brian Silvermanc2e04222014-03-22 12:43:44 -070095 printf("%p ref inc count: %p\n", this, msg);
Brian Silverman430e7fa2014-03-21 16:58:33 -070096 }
97}
98
Brian Silverman08661c72013-09-01 17:24:38 -070099RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -0800100 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermanc2e04222014-03-22 12:43:44 -0700101 static_assert(shm_ok<RawQueue::MessageHeader>::value,
102 "the whole point is to stick it in shared memory");
103 static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
104 "need to revalidate size/alignent assumptions");
105
Brian Silvermana6d1b562013-09-01 14:39:39 -0700106 const size_t name_size = strlen(name) + 1;
107 char *temp = static_cast<char *>(shm_malloc(name_size));
108 memcpy(temp, name, name_size);
109 name_ = temp;
110 length_ = length;
111 hash_ = hash;
112 queue_length_ = queue_length;
113
114 next_ = NULL;
115 recycle_ = NULL;
116
117 if (kFetchDebug) {
118 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
119 name, length, hash, queue_length);
120 }
121
122 data_length_ = queue_length + 1;
123 if (data_length_ < 2) { // TODO(brians) when could this happen?
124 data_length_ = 2;
125 }
126 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
127 data_start_ = 0;
128 data_end_ = 0;
129 messages_ = 0;
130
Brian Silvermana6d1b562013-09-01 14:39:39 -0700131 msg_length_ = length + sizeof(MessageHeader);
Brian Silvermanc2e04222014-03-22 12:43:44 -0700132
133 MessageHeader *previous = nullptr;
134 for (int i = 0; i < queue_length + kExtraMessages; ++i) {
135 MessageHeader *const message =
Brian Silverman60eff202014-03-21 17:10:02 -0700136 static_cast<MessageHeader *>(shm_malloc(msg_length_));
Brian Silvermanc2e04222014-03-22 12:43:44 -0700137 free_messages_ = message;
138 message->next = previous;
139 previous = message;
Brian Silverman60eff202014-03-21 17:10:02 -0700140 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700141
142 if (kFetchDebug) {
143 printf("made queue %s\n", name);
144 }
145}
Brian Silverman08661c72013-09-01 17:24:38 -0700146RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700147 int queue_length) {
148 if (kFetchDebug) {
149 printf("fetching queue %s\n", name);
150 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800151 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700152 return NULL;
153 }
Brian Silverman08661c72013-09-01 17:24:38 -0700154 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800155 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700156 if (current != NULL) {
157 while (true) {
158 // If we found a matching queue.
159 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
160 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800161 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700162 return current;
163 } else {
164 if (kFetchDebug) {
165 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
166 strcmp(current->name_, name), name);
167 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700168 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700169 // If this is the last one.
170 if (current->next_ == NULL) break;
171 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700172 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700173 }
174
Brian Silverman797e71e2013-09-06 17:29:39 -0700175 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
176 RawQueue(name, length, hash, queue_length);
177 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800178 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700179 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700180 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700181 }
182
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800183 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700184 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700185}
Brian Silverman08661c72013-09-01 17:24:38 -0700186RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700187 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700188 int recycle_hash, int recycle_length, RawQueue **recycle) {
189 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700190 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
191 if (r == r->recycle_) {
192 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
193 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700194 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700195 abort();
196 }
197 *recycle = r->recycle_;
198 return r;
199}
200
Brian Silverman08661c72013-09-01 17:24:38 -0700201void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700202 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700203 if (kRefDebug) {
Brian Silvermanc2e04222014-03-22 12:43:44 -0700204 printf("%p ref free->%p: %p\n", this, recycle_, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700205 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700206
207 if (recycle_ != NULL) {
208 void *const new_msg = recycle_->GetMessage();
209 if (new_msg == NULL) {
210 fprintf(stderr, "queue: couldn't get a message"
211 " for recycle queue %p\n", recycle_);
212 } else {
Brian Silvermanc2e04222014-03-22 12:43:44 -0700213 IncrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700214 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
215 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
216 " aborting\n", recycle_, msg);
217 printf("see stderr\n");
218 abort();
219 }
220 msg = new_msg;
Brian Silvermanc2e04222014-03-22 12:43:44 -0700221 header = MessageHeader::Get(new_msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700222 }
223 }
224
Brian Silvermanc2e04222014-03-22 12:43:44 -0700225 header->next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
226 while (
227 !__atomic_compare_exchange_n(&free_messages_, &header->next, header,
228 true, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700229 }
230}
231
Brian Silverman08661c72013-09-01 17:24:38 -0700232bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700233 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700234 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700235 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700236 }
237 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
238 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700239 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700240 global_core->size))) {
241 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
242 msg, this);
243 printf("see stderr\n");
244 abort();
245 }
246 {
247 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700248 bool writable_waited = false;
249
250 int new_end;
251 while (true) {
252 new_end = (data_end_ + 1) % data_length_;
253 // If there is room in the queue right now.
254 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700255 if (options & kNonBlock) {
256 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700257 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700258 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800259 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700260 return false;
261 } else if (options & kOverride) {
262 if (kWriteDebug) {
263 printf("queue: overriding on %p\n", this);
264 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700265 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700266 DecrementMessageReferenceCount(data_[data_start_]);
267 data_start_ = (data_start_ + 1) % data_length_;
268 } else { // kBlock
269 if (kWriteDebug) {
270 printf("queue: going to wait for writable_ of %p\n", this);
271 }
Brian Silverman08661c72013-09-01 17:24:38 -0700272 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700273 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700274 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700275 }
276 data_[data_end_] = msg;
277 ++messages_;
278 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700279
280 if (kWriteDebug) {
281 printf("queue: broadcasting to readable_ of %p\n", this);
282 }
283 readable_.Broadcast();
284
285 // If we got a signal on writable_ here and it's still writable, then we
286 // need to signal the next person in line (if any).
287 if (writable_waited && is_writable()) {
288 if (kWriteDebug) {
289 printf("queue: resignalling writable_ of %p\n", this);
290 }
291 writable_.Signal();
292 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700293 }
294 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700295 printf("queue: write returning true on queue %p\n", this);
296 }
297 return true;
298}
299
Brian Silverman797e71e2013-09-06 17:29:39 -0700300void RawQueue::ReadCommonEnd(ReadData *read_data) {
301 if (is_writable()) {
302 if (kReadDebug) {
303 printf("queue: %ssignalling writable_ of %p\n",
304 read_data->writable_start ? "not " : "", this);
305 }
306 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700307 }
308}
Brian Silverman797e71e2013-09-06 17:29:39 -0700309bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
310 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700311 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
312 if (options & kNonBlock) {
313 if (kReadDebug) {
314 printf("queue: not going to block waiting on %p\n", this);
315 }
316 return false;
317 } else { // kBlock
318 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700319 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700320 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700321 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700322 readable_.Wait();
323 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700324 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700325 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700326 }
327 }
328 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800329 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
330 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700331 }
332 return true;
333}
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700334void *RawQueue::ReadPeek(int options, int start) const {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700335 void *ret;
336 if (options & kFromEnd) {
337 int pos = data_end_ - 1;
338 if (pos < 0) { // if it needs to wrap
339 pos = data_length_ - 1;
340 }
341 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700342 printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700343 }
344 ret = data_[pos];
345 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700346 assert(start != -1);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700347 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700348 printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700349 }
350 ret = data_[start];
351 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700352 IncrementMessageReferenceCount(ret);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700353 return ret;
354}
Brian Silverman08661c72013-09-01 17:24:38 -0700355const void *RawQueue::ReadMessage(int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700356 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700357 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700358 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700359 }
360 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700361
Brian Silvermana6d1b562013-09-01 14:39:39 -0700362 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700363
364 ReadData read_data;
365 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700366 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700367 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700368 }
369 return NULL;
370 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700371
Brian Silvermana6d1b562013-09-01 14:39:39 -0700372 if (options & kPeek) {
373 msg = ReadPeek(options, data_start_);
374 } else {
375 if (options & kFromEnd) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700376 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700377 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700378 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700379 }
380 // This loop pulls each message out of the buffer.
381 const int pos = data_start_;
382 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700383 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700384 if (data_start_ == data_end_) {
385 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700386 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700387 }
388 msg = data_[pos];
389 break;
390 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700391 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700392 DecrementMessageReferenceCount(data_[pos]);
393 }
394 } else {
395 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700396 printf("queue: %p reading from d2: %d\n", this, data_start_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700397 }
398 msg = data_[data_start_];
399 data_start_ = (data_start_ + 1) % data_length_;
400 }
401 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700402 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700403 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700404 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700405 }
406 return msg;
407}
Brian Silverman08661c72013-09-01 17:24:38 -0700408const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700409 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700410 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700411 this, options, index, *index);
412 }
413 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700414
415 MutexLocker locker(&data_lock_);
416
417 ReadData read_data;
418 if (!ReadCommonStart(options, index, &read_data)) {
419 if (kReadDebug) {
420 printf("queue: %p common returned false\n", this);
421 }
422 return NULL;
423 }
424
425 // TODO(parker): Handle integer wrap on the index.
426
Brian Silverman797e71e2013-09-06 17:29:39 -0700427 // Where we're going to start reading.
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800428 int my_start;
429
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700430 if (options & kFromEnd) {
431 my_start = -1;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800432 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700433 const int unread_messages = messages_ - *index;
434 assert(unread_messages > 0);
435 int current_messages = data_end_ - data_start_;
436 if (current_messages < 0) current_messages += data_length_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700437 if (kReadIndexDebug) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700438 printf("queue: %p start=%d end=%d current=%d\n",
439 this, data_start_, data_end_, current_messages);
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700440 }
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700441 assert(current_messages > 0);
442 // If we're behind the available messages.
443 if (unread_messages > current_messages) {
444 // Catch index up to the last available message.
445 *index = messages_ - current_messages;
446 // And that's the one we're going to read.
447 my_start = data_start_;
448 if (kReadIndexDebug) {
449 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
450 this, *index, messages_, data_start_);
451 }
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700452 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700453 // Just start reading at the first available message that we haven't yet
454 // read.
455 my_start = data_end_ - unread_messages;
456 if (kReadIndexDebug) {
457 printf("queue: %p original read from %d\n", this, my_start);
458 }
459 if (data_start_ < data_end_) {
460 assert(my_start >= data_start_);
461 } else {
462 if (my_start < 0) my_start += data_length_;
463 }
Brian Silverman67e34f52014-03-13 15:52:57 -0700464 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700465 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800466
Brian Silverman797e71e2013-09-06 17:29:39 -0700467 if (options & kPeek) {
468 msg = ReadPeek(options, my_start);
469 } else {
470 if (options & kFromEnd) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700471 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700472 printf("queue: %p start of c1\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700473 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700474 int pos = data_end_ - 1;
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700475 if (kReadIndexDebug) {
476 printf("queue: %p end pos start %d\n", this, pos);
477 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700478 if (pos < 0) { // If it wrapped.
479 pos = data_length_ - 1; // Unwrap it.
480 }
481 if (kReadDebug) {
482 printf("queue: %p reading from c1: %d\n", this, pos);
483 }
484 msg = data_[pos];
485 *index = messages_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700486 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700487 if (kReadDebug) {
488 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700489 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800490 // This assert checks that we're either within both endpoints (duh) or
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700491 // not between them (if the queue is wrapped around).
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800492 assert((my_start >= data_start_ && my_start < data_end_) ||
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700493 ((my_start >= data_start_) == (my_start > data_end_)));
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700494 // More sanity checking.
495 assert((my_start >= 0) && (my_start < data_length_));
Brian Silverman797e71e2013-09-06 17:29:39 -0700496 msg = data_[my_start];
497 ++(*index);
498 }
Brian Silverman430e7fa2014-03-21 16:58:33 -0700499 IncrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700500 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700501 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700502 return msg;
503}
504
Brian Silverman08661c72013-09-01 17:24:38 -0700505void *RawQueue::GetMessage() {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700506 // TODO(brians): Test this function.
Brian Silvermanc2e04222014-03-22 12:43:44 -0700507
508 MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
509 do {
510 if (__builtin_expect(header == nullptr, 0)) {
511 LOG(FATAL, "overused pool of queue %p\n", this);
512 }
513 } while (
514 !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
515 __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
516 void *msg = reinterpret_cast<uint8_t *>(header + 1);
517 // It might be uninitialized, 0 from a previous use, or 1 from previously
518 // getting recycled.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700519 header->ref_count = 1;
Brian Silvermanad290d82014-03-19 17:22:05 -0700520 static_assert(
521 __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
522 "we access this using not specifically atomic loads and stores");
Brian Silvermana6d1b562013-09-01 14:39:39 -0700523 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700524 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700525 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700526 return msg;
527}
528
Brian Silvermanc2e04222014-03-22 12:43:44 -0700529int RawQueue::FreeMessages() const {
530 int r = 0;
531 MessageHeader *header = free_messages_;
532 while (header != nullptr) {
533 ++r;
534 header = header->next;
535 }
536 return r;
537}
538
Brian Silvermana6d1b562013-09-01 14:39:39 -0700539} // namespace aos