blob: 5cfd2ac6c5a1b41f2d01756b62bd3e81244281da [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#include "aos/atom_code/ipc_lib/queue.h"
2#include "aos/atom_code/ipc_lib/queue_internal.h"
3
4#include <stdio.h>
5#include <string.h>
6#include <errno.h>
7#include <assert.h>
8
// Compile-time switches for the printf tracing in this file. Each one defaults
// to 0 (tracing compiled out) and can be overridden from the build command,
// e.g. -DREAD_DEBUG=1, without editing the source.
//   READ_DEBUG:  tracing in the read paths.
//   WRITE_DEBUG: tracing in aos_queue_write_msg.
//   REF_DEBUG:   tracing of message reference-count changes.
#ifndef READ_DEBUG
#define READ_DEBUG 0
#endif
#ifndef WRITE_DEBUG
#define WRITE_DEBUG 0
#endif
#ifndef REF_DEBUG
#define REF_DEBUG 0
#endif
12
13static inline aos_msg_header *get_header(void *msg) {
14 return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
15}
16static inline aos_queue *aos_core_alloc_queue() {
17 return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
18}
19static inline void *aos_alloc_msg(aos_msg_pool *pool) {
20 return shm_malloc(pool->msg_length);
21}
22
23// actually free the given message
24static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
25#if REF_DEBUG
26 if (pool->pool_lock == 0) {
27 //LOG(WARNING, "unprotected\n");
28 }
29#endif
30 aos_msg_header *header = get_header(msg);
31 if (pool->pool[header->index] != header) { // if something's messed up
32 fprintf(stderr, "queue: something is very very wrong with queue %p."
33 " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
34 queue, pool->pool, header->index, header);
35 printf("queue: see stderr\n");
36 abort();
37 }
38#if REF_DEBUG
39 printf("ref_free_count: %p\n", msg);
40#endif
41 --pool->used;
42
43 if (queue->recycle != NULL) {
44 void *const new_msg = aos_queue_get_msg(queue->recycle);
45 if (new_msg == NULL) {
46 fprintf(stderr, "queue: couldn't get a message"
47 " for recycle queue %p\n", queue->recycle);
48 } else {
49 // Take a message from recycle_queue and switch its
50 // header with the one being freed, which effectively
51 // switches which queue each message belongs to.
52 aos_msg_header *const new_header = get_header(new_msg);
53 // also switch the messages between the pools
54 pool->pool[header->index] = new_header;
55 if (mutex_lock(&queue->recycle->pool.pool_lock)) {
56 return -1;
57 }
58 queue->recycle->pool.pool[new_header->index] = header;
59 // swap the information in both headers
60 header_swap(header, new_header);
61 // don't unlock the other pool until all of its messages are valid
62 mutex_unlock(&queue->recycle->pool.pool_lock);
63 // use the header for new_msg which is now for this pool
64 header = new_header;
65 if (aos_queue_write_msg_free(queue->recycle,
66 (void *)msg, OVERRIDE) != 0) {
67 printf("queue: warning aos_queue_write_msg("
68 "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
69 " failed\n",
70 queue->recycle, queue, msg);
71 }
72 msg = new_msg;
73 }
74 }
75
76 // where the one we're freeing was
77 int index = header->index;
78 header->index = -1;
79 if (index != pool->used) { // if we're not freeing the one on the end
80 // put the last one where the one we're freeing was
81 header = pool->pool[index] = pool->pool[pool->used];
82 // put the one we're freeing at the end
83 pool->pool[pool->used] = get_header(msg);
84 // update the former last one's index
85 header->index = index;
86 }
87 return 0;
88}
89// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
90static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
91 if (msg == NULL) {
92 return 0;
93 }
94
95 int rv = 0;
96 if (mutex_lock(&pool->pool_lock)) {
97 return -1;
98 }
99 aos_msg_header *const header = get_header(msg);
100 header->ref_count --;
101 assert(header->ref_count >= 0);
102#if REF_DEBUG
103 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
104#endif
105 if (header->ref_count == 0) {
106 rv = aos_free_msg(pool, msg, queue);
107 }
108 mutex_unlock(&pool->pool_lock);
109 return rv;
110}
111
112static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
113 if (sig1->length != sig2->length) {
114 //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
115 return 0;
116 }
117 if (sig1->queue_length != sig2->queue_length) {
118 //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
119 return 0;
120 }
121 if (sig1->hash != sig2->hash) {
122 //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
123 return 0;
124 }
125 //LOG(DEBUG, "signature match\n");
126 return 1;
127}
128static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
129 aos_queue *const queue = aos_core_alloc_queue();
130 aos_msg_pool *const pool = &queue->pool;
131 pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
132 pool->length = 0;
133 pool->used = 0;
134 pool->msg_length = sig->length + sizeof(aos_msg_header);
135 pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
136 aos_ring_buf *const buf = &queue->buf;
137 buf->length = sig->queue_length + 1;
138 if (buf->length < 2) { // TODO(brians) when could this happen?
139 buf->length = 2;
140 }
141 buf->data = shm_malloc(buf->length * sizeof(void *));
142 buf->start = 0;
143 buf->end = 0;
144 buf->msgs = 0;
145 buf->writable = 1;
146 buf->readable = 0;
147 buf->buff_lock = 0;
148 pool->pool_lock = 0;
149 queue->recycle = NULL;
150 return queue;
151}
152aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
153 //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
154 mutex_grab(&global_core->mem_struct->queues.alloc_lock);
155 aos_queue_list *list = global_core->mem_struct->queues.queue_list;
156 aos_queue_list *last = NULL;
157 while (list != NULL) {
158 // if we found a matching queue
159 if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
160 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
161 return list->queue;
162 } else {
163 //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
164 }
165 last = list;
166 list = list->next;
167 }
168 list = shm_malloc(sizeof(aos_queue_list));
169 if (last == NULL) {
170 global_core->mem_struct->queues.queue_list = list;
171 } else {
172 last->next = list;
173 }
174 list->sig = *sig;
175 const size_t name_size = strlen(name) + 1;
176 list->name = shm_malloc(name_size);
177 memcpy(list->name, name, name_size);
178 //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
179 list->queue = aos_create_queue(sig);
180 //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
181 list->next = NULL;
182 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
183 return list->queue;
184}
185aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
186 const aos_type_sig *recycle_sig, aos_queue **recycle) {
187 if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
188 *recycle = NULL;
189 return NULL;
190 }
191 aos_queue *const r = aos_fetch_queue(name, sig);
192 r->recycle = aos_fetch_queue(name, recycle_sig);
193 if (r == r->recycle) {
194 fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
195 printf("see stderr\n");
196 abort();
197 }
198 *recycle = r->recycle;
199 return r;
200}
201
202int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
203#if WRITE_DEBUG
204 printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
205#endif
206 int rv = 0;
207 if (msg == NULL || msg < (void *)global_core->mem_struct ||
208 msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
209 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
210 msg, queue);
211 printf("see stderr\n");
212 abort();
213 }
214 aos_ring_buf *const buf = &queue->buf;
215 if (mutex_lock(&buf->buff_lock)) {
216#if WRITE_DEBUG
217 printf("queue: locking buff_lock of %p failed\n", buf);
218#endif
219 return -1;
220 }
221 int new_end = (buf->end + 1) % buf->length;
222 while (new_end == buf->start) {
223 if (opts & NON_BLOCK) {
224#if WRITE_DEBUG
225 printf("queue: not blocking on %p. returning -1\n", queue);
226#endif
227 mutex_unlock(&buf->buff_lock);
228 return -1;
229 } else if (opts & OVERRIDE) {
230#if WRITE_DEBUG
231 printf("queue: overriding on %p\n", queue);
232#endif
233 // avoid leaking the message that we're going to overwrite
234 msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
235 buf->start = (buf->start + 1) % buf->length;
236 } else { // BLOCK
237 mutex_unlock(&buf->buff_lock);
238#if WRITE_DEBUG
239 printf("queue: going to wait for writable(=%p) of %p\n",
240 &buf->writable, queue);
241#endif
242 if (condition_wait(&buf->writable)) {
243#if WRITE_DEBUG
244 printf("queue: waiting for writable(=%p) of %p failed\n",
245 &buf->writable, queue);
246#endif
247 return -1;
248 }
249#if WRITE_DEBUG
250 printf("queue: going to re-lock buff_lock of %p to write\n", queue);
251#endif
252 if (mutex_lock(&buf->buff_lock)) {
253#if WRITE_DEBUG
254 printf("queue: error locking buff_lock of %p\n", queue);
255#endif
256 return -1;
257 }
258 }
259 new_end = (buf->end + 1) % buf->length;
260 }
261 buf->data[buf->end] = msg;
262 ++buf->msgs;
263 buf->end = new_end;
264 mutex_unlock(&buf->buff_lock);
265#if WRITE_DEBUG
266 printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
267#endif
268 condition_set(&buf->readable);
269 if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
270 condition_unset(&buf->writable);
271 }
272#if WRITE_DEBUG
273 printf("queue: write returning %d on queue %p\n", rv, queue);
274#endif
275 return rv;
276}
277
278int aos_queue_free_msg(aos_queue *queue, const void *msg) {
279 // TODO(brians) get rid of this
280 void *msg_temp;
281 memcpy(&msg_temp, &msg, sizeof(msg_temp));
282 return msg_ref_dec(msg_temp, &queue->pool, queue);
283}
284// Deals with setting/unsetting readable and writable.
285// Should be called after buff_lock has been unlocked.
286// read is whether or not this read call read one off the queue
287static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
288 if (read) {
289 condition_set(&buf->writable);
290 if (buf->start == buf->end) {
291 condition_unset(&buf->readable);
292 }
293 }
294}
295// Returns with buff_lock locked and a readable message in buf.
296// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
297static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
298 aos_queue *const queue, int *index) {
299#if !READ_DEBUG
300 (void)queue;
301#endif
302 if (mutex_lock(&buf->buff_lock)) {
303#if READ_DEBUG
304 printf("queue: couldn't lock buff_lock of %p\n", queue);
305#endif
306 return -1;
307 }
308 while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
309 mutex_unlock(&buf->buff_lock);
310 if (opts & NON_BLOCK) {
311#if READ_DEBUG
312 printf("queue: not going to block waiting on %p\n", queue);
313#endif
314 return -1;
315 } else { // BLOCK
316#if READ_DEBUG
317 printf("queue: going to wait for readable(=%p) of %p\n",
318 &buf->readable, queue);
319#endif
320 // wait for a message to become readable
321 if ((index == NULL) ? condition_wait(&buf->readable) :
322 condition_wait_force(&buf->readable)) {
323#if READ_DEBUG
324 printf("queue: waiting for readable(=%p) of %p failed\n",
325 &buf->readable, queue);
326#endif
327 return -1;
328 }
329 }
330#if READ_DEBUG
331 printf("queue: going to re-lock buff_lock of %p to read\n", queue);
332#endif
333 if (mutex_lock(&buf->buff_lock)) {
334#if READ_DEBUG
335 printf("couldn't re-lock buff_lock of %p\n", queue);
336#endif
337 return -1;
338 }
339 }
340#if READ_DEBUG
341 printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
342#endif
343 return 0;
344}
345// handles reading with PEEK
346static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
347 void *ret;
348 if (opts & FROM_END) {
349 int pos = buf->end - 1;
350 if (pos < 0) { // if it needs to wrap
351 pos = buf->length - 1;
352 }
353#if READ_DEBUG
354 printf("queue: reading from line %d: %d\n", __LINE__, pos);
355#endif
356 ret = buf->data[pos];
357 } else {
358#if READ_DEBUG
359 printf("queue: reading from line %d: %d\n", __LINE__, start);
360#endif
361 ret = buf->data[start];
362 }
363 aos_msg_header *const header = get_header(ret);
364 header->ref_count ++;
365#if REF_DEBUG
366 printf("ref inc count: %p\n", ret);
367#endif
368 return ret;
369}
370const void *aos_queue_read_msg(aos_queue *queue, int opts) {
371#if READ_DEBUG
372 printf("queue: read_msg(%p, %d)\n", queue, opts);
373#endif
374 void *msg = NULL;
375 aos_ring_buf *const buf = &queue->buf;
376 if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
377#if READ_DEBUG
378 printf("queue: common returned -1 for %p\n", queue);
379#endif
380 return NULL;
381 }
382 if (opts & PEEK) {
383 msg = read_msg_peek(buf, opts, buf->start);
384 } else {
385 if (opts & FROM_END) {
386 while (1) {
387#if READ_DEBUG
388 printf("queue: start of c2 of %p\n", queue);
389#endif
390 // This loop pulls each message out of the buffer.
391 const int pos = buf->start;
392 buf->start = (buf->start + 1) % buf->length;
393 // if this is the last one
394 if (buf->start == buf->end) {
395#if READ_DEBUG
396 printf("queue: reading from c2: %d\n", pos);
397#endif
398 msg = buf->data[pos];
399 break;
400 }
401 // it's not going to be in the queue any more
402 msg_ref_dec(buf->data[pos], &queue->pool, queue);
403 }
404 } else {
405#if READ_DEBUG
406 printf("queue: reading from d2: %d\n", buf->start);
407#endif
408 msg = buf->data[buf->start];
409 buf->start = (buf->start + 1) % buf->length;
410 }
411 }
412 mutex_unlock(&buf->buff_lock);
413 aos_read_msg_common_end(buf, !(opts & PEEK));
414#if READ_DEBUG
415 printf("queue: read returning %p\n", msg);
416#endif
417 return msg;
418}
419const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
420#if READ_DEBUG
421 printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
422#endif
423 void *msg = NULL;
424 aos_ring_buf *const buf = &queue->buf;
425 if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
426#if READ_DEBUG
427 printf("queue: common returned -1\n");
428#endif
429 return NULL;
430 }
431 // TODO(parker): Handle integer wrap on the index.
432 const int offset = buf->msgs - *index;
433 int my_start = buf->end - offset;
434 if (offset >= buf->length) { // if we're behind the available messages
435 // catch index up to the last available message
436 *index += buf->start - my_start;
437 // and that's the one we're going to read
438 my_start = buf->start;
439 }
440 if (my_start < 0) { // if we want to read off the end of the buffer
441 // unwrap where we're going to read from
442 my_start += buf->length;
443 }
444 if (opts & PEEK) {
445 msg = read_msg_peek(buf, opts, my_start);
446 } else {
447 if (opts & FROM_END) {
448#if READ_DEBUG
449 printf("queue: start of c1 of %p\n", queue);
450#endif
451 int pos = buf->end - 1;
452 if (pos < 0) { // if it wrapped
453 pos = buf->length - 1; // unwrap it
454 }
455#if READ_DEBUG
456 printf("queue: reading from c1: %d\n", pos);
457#endif
458 msg = buf->data[pos];
459 *index = buf->msgs;
460 } else {
461#if READ_DEBUG
462 printf("queue: reading from d1: %d\n", my_start);
463#endif
464 msg = buf->data[my_start];
465 ++(*index);
466 }
467 aos_msg_header *const header = get_header(msg);
468 ++header->ref_count;
469#if REF_DEBUG
470 printf("ref_inc_count: %p\n", msg);
471#endif
472 }
473 mutex_unlock(&buf->buff_lock);
474 // this function never consumes one off the queue
475 aos_read_msg_common_end(buf, 0);
476 return msg;
477}
478static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
479 if (mutex_lock(&pool->pool_lock)) {
480 return NULL;
481 }
482 void *msg;
483 if (pool->length - pool->used > 0) {
484 msg = pool->pool[pool->used];
485 } else {
486 if (pool->length >= pool->mem_length) {
487 //TODO(brians) log this if it isn't the log queue
488 fprintf(stderr, "queue: overused_pool\n");
489 msg = NULL;
490 goto exit;
491 }
492 msg = pool->pool[pool->length] = aos_alloc_msg(pool);
493 ++pool->length;
494 }
495 aos_msg_header *const header = msg;
496 msg = (uint8_t *)msg + sizeof(aos_msg_header);
497 header->ref_count = 1;
498#if REF_DEBUG
499 printf("ref alloc: %p\n", msg);
500#endif
501 header->index = pool->used;
502 ++pool->used;
503exit:
504 mutex_unlock(&pool->pool_lock);
505 return msg;
506}
507void *aos_queue_get_msg(aos_queue *queue) {
508 return aos_pool_get_msg(&queue->pool);
509}
510