blob: 2b25b93a369d4f02ff75e838289b0d26830a4c58 [file] [log] [blame]
Brian Silverman14fd0fb2014-01-14 21:42:01 -08001#include "aos/linux_code/ipc_lib/queue.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -07002
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <assert.h>
7
8#include <memory>
Brian Silvermanc39e2bd2014-02-21 09:17:35 -08009#include <algorithm>
Brian Silvermana6d1b562013-09-01 14:39:39 -070010
11#include "aos/common/logging/logging.h"
12#include "aos/common/type_traits.h"
Brian Silverman14fd0fb2014-01-14 21:42:01 -080013#include "aos/linux_code/ipc_lib/core_lib.h"
Brian Silvermana6d1b562013-09-01 14:39:39 -070014
Brian Silverman227ad482014-03-23 11:21:32 -070015#undef assert
16#define assert(...)
17
Brian Silvermana6d1b562013-09-01 14:39:39 -070018namespace aos {
Brian Silvermana6d1b562013-09-01 14:39:39 -070019namespace {
20
Brian Silverman08661c72013-09-01 17:24:38 -070021static_assert(shm_ok<RawQueue>::value,
22 "RawQueue instances go into shared memory");
Brian Silvermana6d1b562013-09-01 14:39:39 -070023
24const bool kReadDebug = false;
25const bool kWriteDebug = false;
26const bool kRefDebug = false;
27const bool kFetchDebug = false;
Brian Silvermancd2d84c2014-03-13 23:30:58 -070028const bool kReadIndexDebug = false;
Brian Silvermana6d1b562013-09-01 14:39:39 -070029
30// The number of extra messages the pool associated with each queue will be able
Brian Silverman08661c72013-09-01 17:24:38 -070031// to hold (for readers who are slow about freeing them or who leak one when
32// they get killed).
Brian Silvermana6d1b562013-09-01 14:39:39 -070033const int kExtraMessages = 20;
34
35} // namespace
36
Brian Silverman08661c72013-09-01 17:24:38 -070037const int RawQueue::kPeek;
38const int RawQueue::kFromEnd;
39const int RawQueue::kNonBlock;
40const int RawQueue::kBlock;
41const int RawQueue::kOverride;
42
Brian Silverman430e7fa2014-03-21 16:58:33 -070043// This is what gets stuck in before each queue message in memory. It is always
44// allocated aligned to 8 bytes and its size has to maintain that alignment for
45// the message that follows immediately.
Brian Silverman08661c72013-09-01 17:24:38 -070046struct RawQueue::MessageHeader {
Brian Silvermanad290d82014-03-19 17:22:05 -070047 // This gets incremented and decremented with atomic instructions without any
48 // locks held.
Brian Silverman227ad482014-03-23 11:21:32 -070049 int32_t ref_count;
Brian Silvermanc2e04222014-03-22 12:43:44 -070050 MessageHeader *next;
Brian Silverman5f8c4922014-02-11 21:22:38 -080051 // Gets the message header immediately preceding msg.
Brian Silvermana6d1b562013-09-01 14:39:39 -070052 static MessageHeader *Get(const void *msg) {
Brian Silverman63cf2412013-11-17 05:44:36 -080053 return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
54 static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
55 alignof(MessageHeader)));
Brian Silvermana6d1b562013-09-01 14:39:39 -070056 }
Brian Silverman227ad482014-03-23 11:21:32 -070057 // Padding to make the total size 8 bytes if we have 4-byte pointers or bump
58 // it to 16 if a pointer is 8 bytes by itself.
Brian Silvermanc2e04222014-03-22 12:43:44 -070059#if __SIZEOF_POINTER__ == 8
Brian Silverman227ad482014-03-23 11:21:32 -070060 char padding[4];
Brian Silvermanc2e04222014-03-22 12:43:44 -070061#elif __SIZEOF_POINTER__ == 4
62 // No padding needed to get 8 byte total size.
63#else
64#error Unknown pointer size.
65#endif
Brian Silvermana6d1b562013-09-01 14:39:39 -070066};
Brian Silvermana6d1b562013-09-01 14:39:39 -070067
Brian Silverman797e71e2013-09-06 17:29:39 -070068struct RawQueue::ReadData {
69 bool writable_start;
70};
71
Brian Silverman08661c72013-09-01 17:24:38 -070072void RawQueue::DecrementMessageReferenceCount(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -070073 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070074 __atomic_sub_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
Brian Silvermana6d1b562013-09-01 14:39:39 -070075 if (kRefDebug) {
Brian Silvermanc2e04222014-03-22 12:43:44 -070076 printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count);
Brian Silvermana6d1b562013-09-01 14:39:39 -070077 }
Brian Silvermanad290d82014-03-19 17:22:05 -070078
79 // The only way it should ever be 0 is if we were the last one to decrement,
80 // in which case nobody else should have it around to re-increment it or
81 // anything in the middle, so this is safe to do not atomically with the
82 // decrement.
Brian Silvermana6d1b562013-09-01 14:39:39 -070083 if (header->ref_count == 0) {
84 DoFreeMessage(msg);
Brian Silvermanad290d82014-03-19 17:22:05 -070085 } else {
86 assert(header->ref_count > 0);
Brian Silvermana6d1b562013-09-01 14:39:39 -070087 }
88}
89
Brian Silverman227ad482014-03-23 11:21:32 -070090inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
Brian Silverman430e7fa2014-03-21 16:58:33 -070091 MessageHeader *const header = MessageHeader::Get(msg);
92 __atomic_add_fetch(&header->ref_count, 1, __ATOMIC_RELAXED);
93 if (kRefDebug) {
Brian Silvermanc2e04222014-03-22 12:43:44 -070094 printf("%p ref inc count: %p\n", this, msg);
Brian Silverman430e7fa2014-03-21 16:58:33 -070095 }
96}
97
Brian Silverman08661c72013-09-01 17:24:38 -070098RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
Brian Silverman5f8c4922014-02-11 21:22:38 -080099 : readable_(&data_lock_), writable_(&data_lock_) {
Brian Silvermanc2e04222014-03-22 12:43:44 -0700100 static_assert(shm_ok<RawQueue::MessageHeader>::value,
101 "the whole point is to stick it in shared memory");
102 static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
103 "need to revalidate size/alignent assumptions");
104
Brian Silverman227ad482014-03-23 11:21:32 -0700105 if (queue_length < 1) {
106 LOG(FATAL, "queue length %d needs to be at least 1\n", queue_length);
107 }
108
Brian Silvermana6d1b562013-09-01 14:39:39 -0700109 const size_t name_size = strlen(name) + 1;
110 char *temp = static_cast<char *>(shm_malloc(name_size));
111 memcpy(temp, name, name_size);
112 name_ = temp;
113 length_ = length;
114 hash_ = hash;
115 queue_length_ = queue_length;
116
117 next_ = NULL;
118 recycle_ = NULL;
119
120 if (kFetchDebug) {
121 printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
122 name, length, hash, queue_length);
123 }
124
125 data_length_ = queue_length + 1;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700126 data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
127 data_start_ = 0;
128 data_end_ = 0;
129 messages_ = 0;
130
Brian Silvermana6d1b562013-09-01 14:39:39 -0700131 msg_length_ = length + sizeof(MessageHeader);
Brian Silvermanc2e04222014-03-22 12:43:44 -0700132
Brian Silverman227ad482014-03-23 11:21:32 -0700133 // Create all of the messages for the free list and stick them on.
134 {
135 MessageHeader *previous = nullptr;
136 for (int i = 0; i < queue_length + kExtraMessages; ++i) {
137 MessageHeader *const message =
138 static_cast<MessageHeader *>(shm_malloc(msg_length_));
139 free_messages_ = message;
140 message->next = previous;
141 previous = message;
142 }
Brian Silverman60eff202014-03-21 17:10:02 -0700143 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700144
145 if (kFetchDebug) {
146 printf("made queue %s\n", name);
147 }
148}
Brian Silverman08661c72013-09-01 17:24:38 -0700149RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700150 int queue_length) {
151 if (kFetchDebug) {
152 printf("fetching queue %s\n", name);
153 }
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800154 if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
Brian Silverman227ad482014-03-23 11:21:32 -0700155 LOG(FATAL, "mutex_lock(%p) failed\n",
156 &global_core->mem_struct->queues.lock);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700157 }
Brian Silverman08661c72013-09-01 17:24:38 -0700158 RawQueue *current = static_cast<RawQueue *>(
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800159 global_core->mem_struct->queues.pointer);
Brian Silverman797e71e2013-09-06 17:29:39 -0700160 if (current != NULL) {
161 while (true) {
162 // If we found a matching queue.
163 if (strcmp(current->name_, name) == 0 && current->length_ == length &&
164 current->hash_ == hash && current->queue_length_ == queue_length) {
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800165 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700166 return current;
167 } else {
168 if (kFetchDebug) {
169 printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
170 strcmp(current->name_, name), name);
171 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700172 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700173 // If this is the last one.
174 if (current->next_ == NULL) break;
175 current = current->next_;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700176 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700177 }
178
Brian Silverman797e71e2013-09-06 17:29:39 -0700179 RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
180 RawQueue(name, length, hash, queue_length);
181 if (current == NULL) { // if we don't already have one
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800182 global_core->mem_struct->queues.pointer = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700183 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700184 current->next_ = r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700185 }
186
Brian Silverman4aeac5f2014-02-11 22:17:07 -0800187 mutex_unlock(&global_core->mem_struct->queues.lock);
Brian Silverman797e71e2013-09-06 17:29:39 -0700188 return r;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700189}
Brian Silverman08661c72013-09-01 17:24:38 -0700190RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
Brian Silvermana6d1b562013-09-01 14:39:39 -0700191 int queue_length,
Brian Silverman08661c72013-09-01 17:24:38 -0700192 int recycle_hash, int recycle_length, RawQueue **recycle) {
193 RawQueue *r = Fetch(name, length, hash, queue_length);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700194 r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
195 if (r == r->recycle_) {
196 fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
197 printf("see stderr\n");
Brian Silverman797e71e2013-09-06 17:29:39 -0700198 r->recycle_ = NULL;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700199 abort();
200 }
201 *recycle = r->recycle_;
202 return r;
203}
204
Brian Silverman227ad482014-03-23 11:21:32 -0700205inline void RawQueue::DoFreeMessage(const void *msg) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700206 MessageHeader *header = MessageHeader::Get(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700207 if (kRefDebug) {
Brian Silverman227ad482014-03-23 11:21:32 -0700208 printf("%p ref free to %p: %p\n", this, recycle_, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700209 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700210
Brian Silverman227ad482014-03-23 11:21:32 -0700211 if (__builtin_expect(recycle_ != nullptr, 0)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700212 void *const new_msg = recycle_->GetMessage();
Brian Silverman227ad482014-03-23 11:21:32 -0700213 if (new_msg == nullptr) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700214 fprintf(stderr, "queue: couldn't get a message"
215 " for recycle queue %p\n", recycle_);
216 } else {
Brian Silverman227ad482014-03-23 11:21:32 -0700217 // Nobody else has a reference to the message at this point, so no need to
218 // be fancy about it.
219 ++header->ref_count;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700220 if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
221 fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
222 " aborting\n", recycle_, msg);
223 printf("see stderr\n");
224 abort();
225 }
226 msg = new_msg;
Brian Silvermanc2e04222014-03-22 12:43:44 -0700227 header = MessageHeader::Get(new_msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700228 }
229 }
230
Brian Silverman227ad482014-03-23 11:21:32 -0700231 // This works around GCC bug 60272 (fixed in 4.8.3).
232 // new_next should just get replaced with header->next (and the body of the
233 // loop should become empty).
234 // The bug is that the store to new_next after the compare/exchange is
235 // unconditional but it should only be if it fails, which could mean
236 // overwriting what somebody else who preempted us right then changed it to.
237 // TODO(brians): Get rid of this workaround once we get a new enough GCC.
238 MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
239 do {
240 header->next = new_next;
241 } while (__builtin_expect(
242 !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
243 __ATOMIC_RELEASE, __ATOMIC_RELAXED),
244 0));
Brian Silvermana6d1b562013-09-01 14:39:39 -0700245}
246
Brian Silverman08661c72013-09-01 17:24:38 -0700247bool RawQueue::WriteMessage(void *msg, int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700248 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700249 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700250 printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700251 }
252 if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
253 msg > static_cast<void *>((
Brian Silverman08661c72013-09-01 17:24:38 -0700254 reinterpret_cast<char *>(global_core->mem_struct) +
Brian Silvermana6d1b562013-09-01 14:39:39 -0700255 global_core->size))) {
256 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
257 msg, this);
258 printf("see stderr\n");
259 abort();
260 }
261 {
262 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700263 bool writable_waited = false;
264
265 int new_end;
266 while (true) {
267 new_end = (data_end_ + 1) % data_length_;
268 // If there is room in the queue right now.
269 if (new_end != data_start_) break;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700270 if (options & kNonBlock) {
271 if (kWriteDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700272 printf("queue: not blocking on %p. returning false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700273 }
Brian Silverman358c49f2014-03-05 16:56:34 -0800274 DecrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700275 return false;
276 } else if (options & kOverride) {
277 if (kWriteDebug) {
278 printf("queue: overriding on %p\n", this);
279 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700280 // Avoid leaking the message that we're going to overwrite.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700281 DecrementMessageReferenceCount(data_[data_start_]);
282 data_start_ = (data_start_ + 1) % data_length_;
283 } else { // kBlock
284 if (kWriteDebug) {
285 printf("queue: going to wait for writable_ of %p\n", this);
286 }
Brian Silverman08661c72013-09-01 17:24:38 -0700287 writable_.Wait();
Brian Silverman797e71e2013-09-06 17:29:39 -0700288 writable_waited = true;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700289 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700290 }
291 data_[data_end_] = msg;
292 ++messages_;
293 data_end_ = new_end;
Brian Silverman797e71e2013-09-06 17:29:39 -0700294
295 if (kWriteDebug) {
296 printf("queue: broadcasting to readable_ of %p\n", this);
297 }
298 readable_.Broadcast();
299
300 // If we got a signal on writable_ here and it's still writable, then we
301 // need to signal the next person in line (if any).
302 if (writable_waited && is_writable()) {
303 if (kWriteDebug) {
304 printf("queue: resignalling writable_ of %p\n", this);
305 }
306 writable_.Signal();
307 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700308 }
309 if (kWriteDebug) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700310 printf("queue: write returning true on queue %p\n", this);
311 }
312 return true;
313}
314
Brian Silverman227ad482014-03-23 11:21:32 -0700315inline void RawQueue::ReadCommonEnd(ReadData *read_data) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700316 if (is_writable()) {
317 if (kReadDebug) {
318 printf("queue: %ssignalling writable_ of %p\n",
319 read_data->writable_start ? "not " : "", this);
320 }
321 if (!read_data->writable_start) writable_.Signal();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700322 }
323}
Brian Silverman227ad482014-03-23 11:21:32 -0700324
Brian Silverman797e71e2013-09-06 17:29:39 -0700325bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
326 read_data->writable_start = is_writable();
Brian Silvermana6d1b562013-09-01 14:39:39 -0700327 while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
328 if (options & kNonBlock) {
329 if (kReadDebug) {
330 printf("queue: not going to block waiting on %p\n", this);
331 }
332 return false;
333 } else { // kBlock
334 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700335 printf("queue: going to wait for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700336 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700337 // Wait for a message to become readable.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700338 readable_.Wait();
339 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700340 printf("queue: done waiting for readable_ of %p\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700341 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700342 }
343 }
344 if (kReadDebug) {
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800345 printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
346 data_end_);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700347 }
348 return true;
349}
Brian Silverman227ad482014-03-23 11:21:32 -0700350
351inline int RawQueue::LastMessageIndex() const {
352 int pos = data_end_ - 1;
353 if (pos < 0) { // If it wrapped around.
354 pos = data_length_ - 1;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700355 }
Brian Silverman227ad482014-03-23 11:21:32 -0700356 return pos;
Brian Silvermana6d1b562013-09-01 14:39:39 -0700357}
Brian Silverman227ad482014-03-23 11:21:32 -0700358
Brian Silverman08661c72013-09-01 17:24:38 -0700359const void *RawQueue::ReadMessage(int options) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700360 // TODO(brians): Test this function.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700361 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700362 printf("queue: %p->ReadMessage(%x)\n", this, options);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700363 }
364 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700365
Brian Silvermana6d1b562013-09-01 14:39:39 -0700366 MutexLocker locker(&data_lock_);
Brian Silverman797e71e2013-09-06 17:29:39 -0700367
368 ReadData read_data;
369 if (!ReadCommonStart(options, NULL, &read_data)) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700370 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700371 printf("queue: %p common returned false\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700372 }
373 return NULL;
374 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700375
Brian Silverman227ad482014-03-23 11:21:32 -0700376 if (options & kFromEnd) {
377 if (options & kPeek) {
378 if (kReadDebug) {
379 printf("queue: %p shortcutting c2: %d\n", this, LastMessageIndex());
380 }
381 msg = data_[LastMessageIndex()];
382 IncrementMessageReferenceCount(msg);
383 } else {
Brian Silverman797e71e2013-09-06 17:29:39 -0700384 while (true) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700385 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700386 printf("queue: %p start of c2\n", this);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700387 }
388 // This loop pulls each message out of the buffer.
389 const int pos = data_start_;
390 data_start_ = (data_start_ + 1) % data_length_;
Brian Silverman797e71e2013-09-06 17:29:39 -0700391 // If this is the last one.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700392 if (data_start_ == data_end_) {
393 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700394 printf("queue: %p reading from c2: %d\n", this, pos);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700395 }
396 msg = data_[pos];
397 break;
398 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700399 // This message is not going to be in the queue any more.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700400 DecrementMessageReferenceCount(data_[pos]);
401 }
Brian Silverman227ad482014-03-23 11:21:32 -0700402 }
403 } else {
404 if (kReadDebug) {
405 printf("queue: %p reading from d2: %d\n", this, data_start_);
406 }
407 msg = data_[data_start_];
408 if (options & kPeek) {
409 IncrementMessageReferenceCount(msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700410 } else {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700411 data_start_ = (data_start_ + 1) % data_length_;
412 }
413 }
Brian Silverman797e71e2013-09-06 17:29:39 -0700414 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700415 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700416 printf("queue: %p read returning %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700417 }
418 return msg;
419}
Brian Silverman227ad482014-03-23 11:21:32 -0700420
Brian Silverman08661c72013-09-01 17:24:38 -0700421const void *RawQueue::ReadMessageIndex(int options, int *index) {
Brian Silvermana6d1b562013-09-01 14:39:39 -0700422 if (kReadDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700423 printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
Brian Silvermana6d1b562013-09-01 14:39:39 -0700424 this, options, index, *index);
425 }
426 void *msg = NULL;
Brian Silverman797e71e2013-09-06 17:29:39 -0700427
428 MutexLocker locker(&data_lock_);
429
430 ReadData read_data;
431 if (!ReadCommonStart(options, index, &read_data)) {
432 if (kReadDebug) {
433 printf("queue: %p common returned false\n", this);
434 }
435 return NULL;
436 }
437
438 // TODO(parker): Handle integer wrap on the index.
439
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700440 if (options & kFromEnd) {
Brian Silverman227ad482014-03-23 11:21:32 -0700441 if (kReadDebug) {
442 printf("queue: %p start of c1\n", this);
443 }
444 if (kReadDebug) {
445 printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
446 }
447 msg = data_[LastMessageIndex()];
448 if (!(options & kPeek)) *index = messages_;
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800449 } else {
Brian Silverman227ad482014-03-23 11:21:32 -0700450 // Where we're going to start reading.
451 int my_start;
452
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700453 const int unread_messages = messages_ - *index;
454 assert(unread_messages > 0);
455 int current_messages = data_end_ - data_start_;
456 if (current_messages < 0) current_messages += data_length_;
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700457 if (kReadIndexDebug) {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700458 printf("queue: %p start=%d end=%d current=%d\n",
459 this, data_start_, data_end_, current_messages);
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700460 }
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700461 assert(current_messages > 0);
462 // If we're behind the available messages.
463 if (unread_messages > current_messages) {
464 // Catch index up to the last available message.
465 *index = messages_ - current_messages;
466 // And that's the one we're going to read.
467 my_start = data_start_;
468 if (kReadIndexDebug) {
469 printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
470 this, *index, messages_, data_start_);
471 }
Brian Silvermancd2d84c2014-03-13 23:30:58 -0700472 } else {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700473 // Just start reading at the first available message that we haven't yet
474 // read.
475 my_start = data_end_ - unread_messages;
476 if (kReadIndexDebug) {
477 printf("queue: %p original read from %d\n", this, my_start);
478 }
479 if (data_start_ < data_end_) {
480 assert(my_start >= data_start_);
481 } else {
482 if (my_start < 0) my_start += data_length_;
483 }
Brian Silverman67e34f52014-03-13 15:52:57 -0700484 }
Brian Silvermanc39e2bd2014-02-21 09:17:35 -0800485
Brian Silverman227ad482014-03-23 11:21:32 -0700486 if (kReadDebug) {
487 printf("queue: %p reading from d1: %d\n", this, my_start);
Brian Silverman797e71e2013-09-06 17:29:39 -0700488 }
Brian Silverman227ad482014-03-23 11:21:32 -0700489 // We have to be either after the start or before the end, even if the queue
490 // is wrapped around.
491 assert((my_start >= data_start_) || (my_start < data_end_));
492 // More sanity checking.
493 assert((my_start >= 0) && (my_start < data_length_));
494 msg = data_[my_start];
495 if (!(options & kPeek)) ++(*index);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700496 }
Brian Silverman227ad482014-03-23 11:21:32 -0700497 IncrementMessageReferenceCount(msg);
498
Brian Silverman797e71e2013-09-06 17:29:39 -0700499 ReadCommonEnd(&read_data);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700500 return msg;
501}
502
Brian Silverman08661c72013-09-01 17:24:38 -0700503void *RawQueue::GetMessage() {
Brian Silvermaneb51cbb2014-03-14 22:57:08 -0700504 // TODO(brians): Test this function.
Brian Silvermanc2e04222014-03-22 12:43:44 -0700505
506 MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
507 do {
508 if (__builtin_expect(header == nullptr, 0)) {
509 LOG(FATAL, "overused pool of queue %p\n", this);
510 }
Brian Silverman227ad482014-03-23 11:21:32 -0700511 } while (__builtin_expect(
Brian Silvermanc2e04222014-03-22 12:43:44 -0700512 !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
Brian Silverman227ad482014-03-23 11:21:32 -0700513 __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
514 0));
Brian Silvermanc2e04222014-03-22 12:43:44 -0700515 void *msg = reinterpret_cast<uint8_t *>(header + 1);
516 // It might be uninitialized, 0 from a previous use, or 1 from previously
517 // getting recycled.
Brian Silvermana6d1b562013-09-01 14:39:39 -0700518 header->ref_count = 1;
Brian Silvermanad290d82014-03-19 17:22:05 -0700519 static_assert(
520 __atomic_always_lock_free(sizeof(header->ref_count), &header->ref_count),
521 "we access this using not specifically atomic loads and stores");
Brian Silvermana6d1b562013-09-01 14:39:39 -0700522 if (kRefDebug) {
Brian Silverman797e71e2013-09-06 17:29:39 -0700523 printf("%p ref alloc: %p\n", this, msg);
Brian Silvermana6d1b562013-09-01 14:39:39 -0700524 }
Brian Silvermana6d1b562013-09-01 14:39:39 -0700525 return msg;
526}
527
Brian Silvermanc2e04222014-03-22 12:43:44 -0700528int RawQueue::FreeMessages() const {
529 int r = 0;
530 MessageHeader *header = free_messages_;
531 while (header != nullptr) {
532 ++r;
533 header = header->next;
534 }
535 return r;
536}
537
Brian Silvermana6d1b562013-09-01 14:39:39 -0700538} // namespace aos