blob: 6c1ce71cab0d73e7ce014c1296d9ee620fc512d2 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#include "aos/atom_code/ipc_lib/queue.h"
2#include "aos/atom_code/ipc_lib/queue_internal.h"
3
4#include <stdio.h>
5#include <string.h>
6#include <errno.h>
7#include <assert.h>
8
Brian Silvermanf665d692013-02-17 22:11:39 -08009#include "aos/common/logging/logging.h"
10
brians343bc112013-02-10 01:53:46 +000011#define READ_DEBUG 0
12#define WRITE_DEBUG 0
13#define REF_DEBUG 0
14
15static inline aos_msg_header *get_header(void *msg) {
16 return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
17}
18static inline aos_queue *aos_core_alloc_queue() {
19 return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
20}
21static inline void *aos_alloc_msg(aos_msg_pool *pool) {
22 return shm_malloc(pool->msg_length);
23}
24
25// actually free the given message
26static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
27#if REF_DEBUG
28 if (pool->pool_lock == 0) {
29 //LOG(WARNING, "unprotected\n");
30 }
31#endif
32 aos_msg_header *header = get_header(msg);
33 if (pool->pool[header->index] != header) { // if something's messed up
34 fprintf(stderr, "queue: something is very very wrong with queue %p."
35 " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
36 queue, pool->pool, header->index, header);
37 printf("queue: see stderr\n");
38 abort();
39 }
40#if REF_DEBUG
Brian Silvermanf665d692013-02-17 22:11:39 -080041 printf("ref free: %p\n", msg);
brians343bc112013-02-10 01:53:46 +000042#endif
43 --pool->used;
44
45 if (queue->recycle != NULL) {
46 void *const new_msg = aos_queue_get_msg(queue->recycle);
47 if (new_msg == NULL) {
48 fprintf(stderr, "queue: couldn't get a message"
49 " for recycle queue %p\n", queue->recycle);
50 } else {
51 // Take a message from recycle_queue and switch its
52 // header with the one being freed, which effectively
53 // switches which queue each message belongs to.
54 aos_msg_header *const new_header = get_header(new_msg);
55 // also switch the messages between the pools
56 pool->pool[header->index] = new_header;
57 if (mutex_lock(&queue->recycle->pool.pool_lock)) {
58 return -1;
59 }
60 queue->recycle->pool.pool[new_header->index] = header;
61 // swap the information in both headers
62 header_swap(header, new_header);
63 // don't unlock the other pool until all of its messages are valid
64 mutex_unlock(&queue->recycle->pool.pool_lock);
65 // use the header for new_msg which is now for this pool
66 header = new_header;
67 if (aos_queue_write_msg_free(queue->recycle,
68 (void *)msg, OVERRIDE) != 0) {
69 printf("queue: warning aos_queue_write_msg("
70 "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
71 " failed\n",
72 queue->recycle, queue, msg);
73 }
74 msg = new_msg;
75 }
76 }
77
78 // where the one we're freeing was
79 int index = header->index;
80 header->index = -1;
81 if (index != pool->used) { // if we're not freeing the one on the end
82 // put the last one where the one we're freeing was
83 header = pool->pool[index] = pool->pool[pool->used];
84 // put the one we're freeing at the end
85 pool->pool[pool->used] = get_header(msg);
86 // update the former last one's index
87 header->index = index;
88 }
89 return 0;
90}
91// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
92static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
93 if (msg == NULL) {
94 return 0;
95 }
96
97 int rv = 0;
98 if (mutex_lock(&pool->pool_lock)) {
99 return -1;
100 }
101 aos_msg_header *const header = get_header(msg);
102 header->ref_count --;
103 assert(header->ref_count >= 0);
104#if REF_DEBUG
105 printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
106#endif
107 if (header->ref_count == 0) {
108 rv = aos_free_msg(pool, msg, queue);
109 }
110 mutex_unlock(&pool->pool_lock);
111 return rv;
112}
113
114static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
115 if (sig1->length != sig2->length) {
116 //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
117 return 0;
118 }
119 if (sig1->queue_length != sig2->queue_length) {
120 //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
121 return 0;
122 }
123 if (sig1->hash != sig2->hash) {
124 //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
125 return 0;
126 }
127 //LOG(DEBUG, "signature match\n");
128 return 1;
129}
130static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
131 aos_queue *const queue = aos_core_alloc_queue();
132 aos_msg_pool *const pool = &queue->pool;
133 pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
134 pool->length = 0;
135 pool->used = 0;
136 pool->msg_length = sig->length + sizeof(aos_msg_header);
137 pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
138 aos_ring_buf *const buf = &queue->buf;
139 buf->length = sig->queue_length + 1;
140 if (buf->length < 2) { // TODO(brians) when could this happen?
141 buf->length = 2;
142 }
143 buf->data = shm_malloc(buf->length * sizeof(void *));
144 buf->start = 0;
145 buf->end = 0;
146 buf->msgs = 0;
Brian Silvermanaf221b82013-09-01 13:57:50 -0700147 memset(&buf->writable, 0, sizeof(buf->writable));
148 memset(&buf->readable, 0, sizeof(buf->readable));
brians343bc112013-02-10 01:53:46 +0000149 buf->buff_lock = 0;
150 pool->pool_lock = 0;
151 queue->recycle = NULL;
152 return queue;
153}
154aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
155 //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
156 mutex_grab(&global_core->mem_struct->queues.alloc_lock);
157 aos_queue_list *list = global_core->mem_struct->queues.queue_list;
158 aos_queue_list *last = NULL;
159 while (list != NULL) {
160 // if we found a matching queue
161 if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
162 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
163 return list->queue;
164 } else {
165 //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
166 }
167 last = list;
168 list = list->next;
169 }
170 list = shm_malloc(sizeof(aos_queue_list));
171 if (last == NULL) {
172 global_core->mem_struct->queues.queue_list = list;
173 } else {
174 last->next = list;
175 }
176 list->sig = *sig;
177 const size_t name_size = strlen(name) + 1;
178 list->name = shm_malloc(name_size);
179 memcpy(list->name, name, name_size);
180 //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
181 list->queue = aos_create_queue(sig);
182 //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
183 list->next = NULL;
184 mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
185 return list->queue;
186}
187aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
188 const aos_type_sig *recycle_sig, aos_queue **recycle) {
189 if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
190 *recycle = NULL;
191 return NULL;
192 }
193 aos_queue *const r = aos_fetch_queue(name, sig);
194 r->recycle = aos_fetch_queue(name, recycle_sig);
195 if (r == r->recycle) {
196 fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
197 printf("see stderr\n");
198 abort();
199 }
200 *recycle = r->recycle;
201 return r;
202}
203
204int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
205#if WRITE_DEBUG
206 printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
207#endif
208 int rv = 0;
209 if (msg == NULL || msg < (void *)global_core->mem_struct ||
210 msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
211 fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
212 msg, queue);
213 printf("see stderr\n");
214 abort();
215 }
216 aos_ring_buf *const buf = &queue->buf;
217 if (mutex_lock(&buf->buff_lock)) {
218#if WRITE_DEBUG
219 printf("queue: locking buff_lock of %p failed\n", buf);
220#endif
221 return -1;
222 }
223 int new_end = (buf->end + 1) % buf->length;
224 while (new_end == buf->start) {
225 if (opts & NON_BLOCK) {
226#if WRITE_DEBUG
227 printf("queue: not blocking on %p. returning -1\n", queue);
228#endif
229 mutex_unlock(&buf->buff_lock);
230 return -1;
231 } else if (opts & OVERRIDE) {
232#if WRITE_DEBUG
233 printf("queue: overriding on %p\n", queue);
234#endif
235 // avoid leaking the message that we're going to overwrite
236 msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
237 buf->start = (buf->start + 1) % buf->length;
238 } else { // BLOCK
brians343bc112013-02-10 01:53:46 +0000239#if WRITE_DEBUG
240 printf("queue: going to wait for writable(=%p) of %p\n",
241 &buf->writable, queue);
242#endif
Brian Silvermanaf221b82013-09-01 13:57:50 -0700243 condition_wait(&buf->writable, &buf->buff_lock);
brians343bc112013-02-10 01:53:46 +0000244#if WRITE_DEBUG
Brian Silvermanaf221b82013-09-01 13:57:50 -0700245 printf("queue: done waiting for writable(=%p) of %p\n",
246 &buf->writable, queue);
brians343bc112013-02-10 01:53:46 +0000247#endif
Brian Silvermanaf221b82013-09-01 13:57:50 -0700248 }
249 new_end = (buf->end + 1) % buf->length;
250 }
brians343bc112013-02-10 01:53:46 +0000251 buf->data[buf->end] = msg;
252 ++buf->msgs;
253 buf->end = new_end;
254 mutex_unlock(&buf->buff_lock);
255#if WRITE_DEBUG
256 printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
257#endif
Brian Silvermanaf221b82013-09-01 13:57:50 -0700258 condition_signal(&buf->readable);
brians343bc112013-02-10 01:53:46 +0000259#if WRITE_DEBUG
260 printf("queue: write returning %d on queue %p\n", rv, queue);
261#endif
262 return rv;
263}
264
265int aos_queue_free_msg(aos_queue *queue, const void *msg) {
266 // TODO(brians) get rid of this
267 void *msg_temp;
268 memcpy(&msg_temp, &msg, sizeof(msg_temp));
269 return msg_ref_dec(msg_temp, &queue->pool, queue);
270}
271// Deals with setting/unsetting readable and writable.
272// Should be called after buff_lock has been unlocked.
273// read is whether or not this read call read one off the queue
274static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
275 if (read) {
Brian Silvermanaf221b82013-09-01 13:57:50 -0700276 condition_signal(&buf->writable);
brians343bc112013-02-10 01:53:46 +0000277 }
278}
279// Returns with buff_lock locked and a readable message in buf.
280// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
281static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
282 aos_queue *const queue, int *index) {
283#if !READ_DEBUG
284 (void)queue;
285#endif
286 if (mutex_lock(&buf->buff_lock)) {
287#if READ_DEBUG
288 printf("queue: couldn't lock buff_lock of %p\n", queue);
289#endif
290 return -1;
291 }
292 while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
brians343bc112013-02-10 01:53:46 +0000293 if (opts & NON_BLOCK) {
Brian Silvermanaf221b82013-09-01 13:57:50 -0700294 mutex_unlock(&buf->buff_lock);
brians343bc112013-02-10 01:53:46 +0000295#if READ_DEBUG
296 printf("queue: not going to block waiting on %p\n", queue);
297#endif
298 return -1;
299 } else { // BLOCK
300#if READ_DEBUG
301 printf("queue: going to wait for readable(=%p) of %p\n",
302 &buf->readable, queue);
303#endif
304 // wait for a message to become readable
Brian Silvermanaf221b82013-09-01 13:57:50 -0700305 condition_wait(&buf->readable, &buf->buff_lock);
brians343bc112013-02-10 01:53:46 +0000306#if READ_DEBUG
Brian Silvermanaf221b82013-09-01 13:57:50 -0700307 printf("queue: done waiting for readable(=%p) of %p\n",
308 &buf->readable, queue);
brians343bc112013-02-10 01:53:46 +0000309#endif
brians343bc112013-02-10 01:53:46 +0000310 }
311 }
312#if READ_DEBUG
313 printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
314#endif
315 return 0;
316}
317// handles reading with PEEK
318static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
319 void *ret;
320 if (opts & FROM_END) {
321 int pos = buf->end - 1;
322 if (pos < 0) { // if it needs to wrap
323 pos = buf->length - 1;
324 }
325#if READ_DEBUG
326 printf("queue: reading from line %d: %d\n", __LINE__, pos);
327#endif
328 ret = buf->data[pos];
329 } else {
330#if READ_DEBUG
331 printf("queue: reading from line %d: %d\n", __LINE__, start);
332#endif
333 ret = buf->data[start];
334 }
335 aos_msg_header *const header = get_header(ret);
336 header->ref_count ++;
337#if REF_DEBUG
338 printf("ref inc count: %p\n", ret);
339#endif
340 return ret;
341}
342const void *aos_queue_read_msg(aos_queue *queue, int opts) {
343#if READ_DEBUG
344 printf("queue: read_msg(%p, %d)\n", queue, opts);
345#endif
346 void *msg = NULL;
347 aos_ring_buf *const buf = &queue->buf;
348 if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
349#if READ_DEBUG
350 printf("queue: common returned -1 for %p\n", queue);
351#endif
352 return NULL;
353 }
354 if (opts & PEEK) {
355 msg = read_msg_peek(buf, opts, buf->start);
356 } else {
357 if (opts & FROM_END) {
358 while (1) {
359#if READ_DEBUG
360 printf("queue: start of c2 of %p\n", queue);
361#endif
362 // This loop pulls each message out of the buffer.
363 const int pos = buf->start;
364 buf->start = (buf->start + 1) % buf->length;
365 // if this is the last one
366 if (buf->start == buf->end) {
367#if READ_DEBUG
368 printf("queue: reading from c2: %d\n", pos);
369#endif
370 msg = buf->data[pos];
371 break;
372 }
373 // it's not going to be in the queue any more
374 msg_ref_dec(buf->data[pos], &queue->pool, queue);
375 }
376 } else {
377#if READ_DEBUG
378 printf("queue: reading from d2: %d\n", buf->start);
379#endif
380 msg = buf->data[buf->start];
381 buf->start = (buf->start + 1) % buf->length;
382 }
383 }
384 mutex_unlock(&buf->buff_lock);
385 aos_read_msg_common_end(buf, !(opts & PEEK));
386#if READ_DEBUG
387 printf("queue: read returning %p\n", msg);
388#endif
389 return msg;
390}
391const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
392#if READ_DEBUG
393 printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
394#endif
395 void *msg = NULL;
396 aos_ring_buf *const buf = &queue->buf;
397 if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
398#if READ_DEBUG
399 printf("queue: common returned -1\n");
400#endif
401 return NULL;
402 }
403 // TODO(parker): Handle integer wrap on the index.
404 const int offset = buf->msgs - *index;
405 int my_start = buf->end - offset;
406 if (offset >= buf->length) { // if we're behind the available messages
407 // catch index up to the last available message
408 *index += buf->start - my_start;
409 // and that's the one we're going to read
410 my_start = buf->start;
411 }
412 if (my_start < 0) { // if we want to read off the end of the buffer
413 // unwrap where we're going to read from
414 my_start += buf->length;
415 }
416 if (opts & PEEK) {
417 msg = read_msg_peek(buf, opts, my_start);
418 } else {
419 if (opts & FROM_END) {
420#if READ_DEBUG
421 printf("queue: start of c1 of %p\n", queue);
422#endif
423 int pos = buf->end - 1;
424 if (pos < 0) { // if it wrapped
425 pos = buf->length - 1; // unwrap it
426 }
427#if READ_DEBUG
428 printf("queue: reading from c1: %d\n", pos);
429#endif
430 msg = buf->data[pos];
431 *index = buf->msgs;
432 } else {
433#if READ_DEBUG
434 printf("queue: reading from d1: %d\n", my_start);
435#endif
436 msg = buf->data[my_start];
437 ++(*index);
438 }
439 aos_msg_header *const header = get_header(msg);
440 ++header->ref_count;
441#if REF_DEBUG
442 printf("ref_inc_count: %p\n", msg);
443#endif
444 }
445 mutex_unlock(&buf->buff_lock);
446 // this function never consumes one off the queue
447 aos_read_msg_common_end(buf, 0);
448 return msg;
449}
450static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
451 if (mutex_lock(&pool->pool_lock)) {
452 return NULL;
453 }
454 void *msg;
455 if (pool->length - pool->used > 0) {
456 msg = pool->pool[pool->used];
457 } else {
458 if (pool->length >= pool->mem_length) {
Brian Silvermanf665d692013-02-17 22:11:39 -0800459 LOG(FATAL, "overused pool %p\n", pool);
brians343bc112013-02-10 01:53:46 +0000460 }
461 msg = pool->pool[pool->length] = aos_alloc_msg(pool);
462 ++pool->length;
463 }
464 aos_msg_header *const header = msg;
465 msg = (uint8_t *)msg + sizeof(aos_msg_header);
466 header->ref_count = 1;
467#if REF_DEBUG
468 printf("ref alloc: %p\n", msg);
469#endif
470 header->index = pool->used;
471 ++pool->used;
brians343bc112013-02-10 01:53:46 +0000472 mutex_unlock(&pool->pool_lock);
473 return msg;
474}
475void *aos_queue_get_msg(aos_queue *queue) {
476 return aos_pool_get_msg(&queue->pool);
477}
478