blob: 2e45326c5b2d1738abf60aaeb3589596aace096c [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#include "aos/atom_code/ipc_lib/queue.h"
2#include "aos/atom_code/ipc_lib/queue_internal.h"
3
4#include <stdio.h>
5#include <string.h>
6#include <errno.h>
7#include <assert.h>
8
Brian Silvermanf665d692013-02-17 22:11:39 -08009#include "aos/common/logging/logging.h"
10
brians343bc112013-02-10 01:53:46 +000011#define READ_DEBUG 0
12#define WRITE_DEBUG 0
13#define REF_DEBUG 0
14
15static inline aos_msg_header *get_header(void *msg) {
16 return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
17}
18static inline aos_queue *aos_core_alloc_queue() {
19 return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
20}
21static inline void *aos_alloc_msg(aos_msg_pool *pool) {
22 return shm_malloc(pool->msg_length);
23}
24
25// actually free the given message
26static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
27#if REF_DEBUG
28 if (pool->pool_lock == 0) {
29 //LOG(WARNING, "unprotected\n");
30 }
31#endif
32 aos_msg_header *header = get_header(msg);
33 if (pool->pool[header->index] != header) { // if something's messed up
34 fprintf(stderr, "queue: something is very very wrong with queue %p."
35 " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
36 queue, pool->pool, header->index, header);
37 printf("queue: see stderr\n");
38 abort();
39 }
40#if REF_DEBUG
Brian Silvermanf665d692013-02-17 22:11:39 -080041 printf("ref free: %p\n", msg);
brians343bc112013-02-10 01:53:46 +000042#endif
43 --pool->used;
44
45 if (queue->recycle != NULL) {
46 void *const new_msg = aos_queue_get_msg(queue->recycle);
47 if (new_msg == NULL) {
48 fprintf(stderr, "queue: couldn't get a message"
49 " for recycle queue %p\n", queue->recycle);
50 } else {
51 // Take a message from recycle_queue and switch its
52 // header with the one being freed, which effectively
53 // switches which queue each message belongs to.
54 aos_msg_header *const new_header = get_header(new_msg);
55 // also switch the messages between the pools
56 pool->pool[header->index] = new_header;
57 if (mutex_lock(&queue->recycle->pool.pool_lock)) {
58 return -1;
59 }
60 queue->recycle->pool.pool[new_header->index] = header;
61 // swap the information in both headers
62 header_swap(header, new_header);
63 // don't unlock the other pool until all of its messages are valid
64 mutex_unlock(&queue->recycle->pool.pool_lock);
65 // use the header for new_msg which is now for this pool
66 header = new_header;
67 if (aos_queue_write_msg_free(queue->recycle,
68 (void *)msg, OVERRIDE) != 0) {
69 printf("queue: warning aos_queue_write_msg("
70 "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
71 " failed\n",
72 queue->recycle, queue, msg);
73 }
74 msg = new_msg;
75 }
76 }
77
78 // where the one we're freeing was
79 int index = header->index;
80 header->index = -1;
81 if (index != pool->used) { // if we're not freeing the one on the end
82 // put the last one where the one we're freeing was
83 header = pool->pool[index] = pool->pool[pool->used];
84 // put the one we're freeing at the end
85 pool->pool[pool->used] = get_header(msg);
86 // update the former last one's index
87 header->index = index;
88 }
89 return 0;
90}
91// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
92static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
93 if (msg == NULL) {
94 return 0;
95 }
96
97 int rv = 0;
98 if (mutex_lock(&pool->pool_lock)) {
99 return -1;
100 }
101 aos_msg_header *const header = get_header(msg);
102 header->ref_count --;
103 assert(header->ref_count >= 0);
104#if REF_DEBUG
105 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
106#endif
107 if (header->ref_count == 0) {
108 rv = aos_free_msg(pool, msg, queue);
109 }
110 mutex_unlock(&pool->pool_lock);
111 return rv;
112}
113
114static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
115 if (sig1->length != sig2->length) {
116 //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
117 return 0;
118 }
119 if (sig1->queue_length != sig2->queue_length) {
120 //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
121 return 0;
122 }
123 if (sig1->hash != sig2->hash) {
124 //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
125 return 0;
126 }
127 //LOG(DEBUG, "signature match\n");
128 return 1;
129}
130static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
131 aos_queue *const queue = aos_core_alloc_queue();
132 aos_msg_pool *const pool = &queue->pool;
133 pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
134 pool->length = 0;
135 pool->used = 0;
136 pool->msg_length = sig->length + sizeof(aos_msg_header);
137 pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
138 aos_ring_buf *const buf = &queue->buf;
139 buf->length = sig->queue_length + 1;
140 if (buf->length < 2) { // TODO(brians) when could this happen?
141 buf->length = 2;
142 }
143 buf->data = shm_malloc(buf->length * sizeof(void *));
144 buf->start = 0;
145 buf->end = 0;
146 buf->msgs = 0;
147 buf->writable = 1;
148 buf->readable = 0;
149 buf->buff_lock = 0;
150 pool->pool_lock = 0;
151 queue->recycle = NULL;
152 return queue;
153}
154aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
155 //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
156 mutex_grab(&global_core->mem_struct->queues.alloc_lock);
157 aos_queue_list *list = global_core->mem_struct->queues.queue_list;
158 aos_queue_list *last = NULL;
159 while (list != NULL) {
160 // if we found a matching queue
161 if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
162 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
163 return list->queue;
164 } else {
165 //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
166 }
167 last = list;
168 list = list->next;
169 }
170 list = shm_malloc(sizeof(aos_queue_list));
171 if (last == NULL) {
172 global_core->mem_struct->queues.queue_list = list;
173 } else {
174 last->next = list;
175 }
176 list->sig = *sig;
177 const size_t name_size = strlen(name) + 1;
178 list->name = shm_malloc(name_size);
179 memcpy(list->name, name, name_size);
180 //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
181 list->queue = aos_create_queue(sig);
182 //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
183 list->next = NULL;
184 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
185 return list->queue;
186}
187aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
188 const aos_type_sig *recycle_sig, aos_queue **recycle) {
189 if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
190 *recycle = NULL;
191 return NULL;
192 }
193 aos_queue *const r = aos_fetch_queue(name, sig);
194 r->recycle = aos_fetch_queue(name, recycle_sig);
195 if (r == r->recycle) {
196 fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
197 printf("see stderr\n");
198 abort();
199 }
200 *recycle = r->recycle;
201 return r;
202}
203
204int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
205#if WRITE_DEBUG
206 printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
207#endif
208 int rv = 0;
209 if (msg == NULL || msg < (void *)global_core->mem_struct ||
210 msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
211 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
212 msg, queue);
213 printf("see stderr\n");
214 abort();
215 }
216 aos_ring_buf *const buf = &queue->buf;
217 if (mutex_lock(&buf->buff_lock)) {
218#if WRITE_DEBUG
219 printf("queue: locking buff_lock of %p failed\n", buf);
220#endif
221 return -1;
222 }
223 int new_end = (buf->end + 1) % buf->length;
224 while (new_end == buf->start) {
225 if (opts & NON_BLOCK) {
226#if WRITE_DEBUG
227 printf("queue: not blocking on %p. returning -1\n", queue);
228#endif
229 mutex_unlock(&buf->buff_lock);
230 return -1;
231 } else if (opts & OVERRIDE) {
232#if WRITE_DEBUG
233 printf("queue: overriding on %p\n", queue);
234#endif
235 // avoid leaking the message that we're going to overwrite
236 msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
237 buf->start = (buf->start + 1) % buf->length;
238 } else { // BLOCK
239 mutex_unlock(&buf->buff_lock);
240#if WRITE_DEBUG
241 printf("queue: going to wait for writable(=%p) of %p\n",
242 &buf->writable, queue);
243#endif
244 if (condition_wait(&buf->writable)) {
245#if WRITE_DEBUG
246 printf("queue: waiting for writable(=%p) of %p failed\n",
247 &buf->writable, queue);
248#endif
249 return -1;
250 }
251#if WRITE_DEBUG
252 printf("queue: going to re-lock buff_lock of %p to write\n", queue);
253#endif
254 if (mutex_lock(&buf->buff_lock)) {
255#if WRITE_DEBUG
256 printf("queue: error locking buff_lock of %p\n", queue);
257#endif
258 return -1;
259 }
260 }
261 new_end = (buf->end + 1) % buf->length;
262 }
263 buf->data[buf->end] = msg;
264 ++buf->msgs;
265 buf->end = new_end;
266 mutex_unlock(&buf->buff_lock);
267#if WRITE_DEBUG
268 printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
269#endif
270 condition_set(&buf->readable);
271 if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
272 condition_unset(&buf->writable);
273 }
274#if WRITE_DEBUG
275 printf("queue: write returning %d on queue %p\n", rv, queue);
276#endif
277 return rv;
278}
279
280int aos_queue_free_msg(aos_queue *queue, const void *msg) {
281 // TODO(brians) get rid of this
282 void *msg_temp;
283 memcpy(&msg_temp, &msg, sizeof(msg_temp));
284 return msg_ref_dec(msg_temp, &queue->pool, queue);
285}
286// Deals with setting/unsetting readable and writable.
287// Should be called after buff_lock has been unlocked.
288// read is whether or not this read call read one off the queue
289static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
290 if (read) {
291 condition_set(&buf->writable);
292 if (buf->start == buf->end) {
293 condition_unset(&buf->readable);
294 }
295 }
296}
297// Returns with buff_lock locked and a readable message in buf.
298// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
299static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
300 aos_queue *const queue, int *index) {
301#if !READ_DEBUG
302 (void)queue;
303#endif
304 if (mutex_lock(&buf->buff_lock)) {
305#if READ_DEBUG
306 printf("queue: couldn't lock buff_lock of %p\n", queue);
307#endif
308 return -1;
309 }
310 while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
311 mutex_unlock(&buf->buff_lock);
312 if (opts & NON_BLOCK) {
313#if READ_DEBUG
314 printf("queue: not going to block waiting on %p\n", queue);
315#endif
316 return -1;
317 } else { // BLOCK
318#if READ_DEBUG
319 printf("queue: going to wait for readable(=%p) of %p\n",
320 &buf->readable, queue);
321#endif
322 // wait for a message to become readable
323 if ((index == NULL) ? condition_wait(&buf->readable) :
324 condition_wait_force(&buf->readable)) {
325#if READ_DEBUG
326 printf("queue: waiting for readable(=%p) of %p failed\n",
327 &buf->readable, queue);
328#endif
329 return -1;
330 }
331 }
332#if READ_DEBUG
333 printf("queue: going to re-lock buff_lock of %p to read\n", queue);
334#endif
335 if (mutex_lock(&buf->buff_lock)) {
336#if READ_DEBUG
337 printf("couldn't re-lock buff_lock of %p\n", queue);
338#endif
339 return -1;
340 }
341 }
342#if READ_DEBUG
343 printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
344#endif
345 return 0;
346}
347// handles reading with PEEK
348static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
349 void *ret;
350 if (opts & FROM_END) {
351 int pos = buf->end - 1;
352 if (pos < 0) { // if it needs to wrap
353 pos = buf->length - 1;
354 }
355#if READ_DEBUG
356 printf("queue: reading from line %d: %d\n", __LINE__, pos);
357#endif
358 ret = buf->data[pos];
359 } else {
360#if READ_DEBUG
361 printf("queue: reading from line %d: %d\n", __LINE__, start);
362#endif
363 ret = buf->data[start];
364 }
365 aos_msg_header *const header = get_header(ret);
366 header->ref_count ++;
367#if REF_DEBUG
368 printf("ref inc count: %p\n", ret);
369#endif
370 return ret;
371}
372const void *aos_queue_read_msg(aos_queue *queue, int opts) {
373#if READ_DEBUG
374 printf("queue: read_msg(%p, %d)\n", queue, opts);
375#endif
376 void *msg = NULL;
377 aos_ring_buf *const buf = &queue->buf;
378 if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
379#if READ_DEBUG
380 printf("queue: common returned -1 for %p\n", queue);
381#endif
382 return NULL;
383 }
384 if (opts & PEEK) {
385 msg = read_msg_peek(buf, opts, buf->start);
386 } else {
387 if (opts & FROM_END) {
388 while (1) {
389#if READ_DEBUG
390 printf("queue: start of c2 of %p\n", queue);
391#endif
392 // This loop pulls each message out of the buffer.
393 const int pos = buf->start;
394 buf->start = (buf->start + 1) % buf->length;
395 // if this is the last one
396 if (buf->start == buf->end) {
397#if READ_DEBUG
398 printf("queue: reading from c2: %d\n", pos);
399#endif
400 msg = buf->data[pos];
401 break;
402 }
403 // it's not going to be in the queue any more
404 msg_ref_dec(buf->data[pos], &queue->pool, queue);
405 }
406 } else {
407#if READ_DEBUG
408 printf("queue: reading from d2: %d\n", buf->start);
409#endif
410 msg = buf->data[buf->start];
411 buf->start = (buf->start + 1) % buf->length;
412 }
413 }
414 mutex_unlock(&buf->buff_lock);
415 aos_read_msg_common_end(buf, !(opts & PEEK));
416#if READ_DEBUG
417 printf("queue: read returning %p\n", msg);
418#endif
419 return msg;
420}
421const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
422#if READ_DEBUG
423 printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
424#endif
425 void *msg = NULL;
426 aos_ring_buf *const buf = &queue->buf;
427 if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
428#if READ_DEBUG
429 printf("queue: common returned -1\n");
430#endif
431 return NULL;
432 }
433 // TODO(parker): Handle integer wrap on the index.
434 const int offset = buf->msgs - *index;
435 int my_start = buf->end - offset;
436 if (offset >= buf->length) { // if we're behind the available messages
437 // catch index up to the last available message
438 *index += buf->start - my_start;
439 // and that's the one we're going to read
440 my_start = buf->start;
441 }
442 if (my_start < 0) { // if we want to read off the end of the buffer
443 // unwrap where we're going to read from
444 my_start += buf->length;
445 }
446 if (opts & PEEK) {
447 msg = read_msg_peek(buf, opts, my_start);
448 } else {
449 if (opts & FROM_END) {
450#if READ_DEBUG
451 printf("queue: start of c1 of %p\n", queue);
452#endif
453 int pos = buf->end - 1;
454 if (pos < 0) { // if it wrapped
455 pos = buf->length - 1; // unwrap it
456 }
457#if READ_DEBUG
458 printf("queue: reading from c1: %d\n", pos);
459#endif
460 msg = buf->data[pos];
461 *index = buf->msgs;
462 } else {
463#if READ_DEBUG
464 printf("queue: reading from d1: %d\n", my_start);
465#endif
466 msg = buf->data[my_start];
467 ++(*index);
468 }
469 aos_msg_header *const header = get_header(msg);
470 ++header->ref_count;
471#if REF_DEBUG
472 printf("ref_inc_count: %p\n", msg);
473#endif
474 }
475 mutex_unlock(&buf->buff_lock);
476 // this function never consumes one off the queue
477 aos_read_msg_common_end(buf, 0);
478 return msg;
479}
480static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
481 if (mutex_lock(&pool->pool_lock)) {
482 return NULL;
483 }
484 void *msg;
485 if (pool->length - pool->used > 0) {
486 msg = pool->pool[pool->used];
487 } else {
488 if (pool->length >= pool->mem_length) {
Brian Silvermanf665d692013-02-17 22:11:39 -0800489 LOG(FATAL, "overused pool %p\n", pool);
brians343bc112013-02-10 01:53:46 +0000490 }
491 msg = pool->pool[pool->length] = aos_alloc_msg(pool);
492 ++pool->length;
493 }
494 aos_msg_header *const header = msg;
495 msg = (uint8_t *)msg + sizeof(aos_msg_header);
496 header->ref_count = 1;
497#if REF_DEBUG
498 printf("ref alloc: %p\n", msg);
499#endif
500 header->index = pool->used;
501 ++pool->used;
brians343bc112013-02-10 01:53:46 +0000502 mutex_unlock(&pool->pool_lock);
503 return msg;
504}
505void *aos_queue_get_msg(aos_queue *queue) {
506 return aos_pool_get_msg(&queue->pool);
507}
508