blob: 34dbc40046c057da2603bc2d63eb64938027295e [file] [log] [blame]
/**
* @file mqueue.c Thread Safe Message Queue
*
* Copyright (C) 2010 Creytiv.com
*/
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_main.h>
#include <re_net.h>
#include <re_mqueue.h>
#include "mqueue.h"
#define MAGIC 0x14553399
#ifdef WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#define close closesocket
#endif
/**
* Defines a Thread-safe Message Queue
*
* The Message Queue can be used to communicate between two threads. The
* receiving thread must run the re_main() loop which will be woken up on
* incoming messages from other threads. The sender thread can be any thread.
*/
struct mqueue {
int pfd[2];
mqueue_h *h;
void *arg;
};
struct msg {
void *data;
uint32_t magic;
int id;
};
static void destructor(void *arg)
{
struct mqueue *q = arg;
if (q->pfd[0] >= 0) {
fd_close(q->pfd[0]);
(void)close(q->pfd[0]);
}
if (q->pfd[1] >= 0)
(void)close(q->pfd[1]);
}
static void event_handler(int flags, void *arg)
{
struct mqueue *mq = arg;
struct msg msg;
ssize_t n;
if (!(flags & FD_READ))
return;
n = pipe_read(mq->pfd[0], &msg, sizeof(msg));
if (n < 0)
return;
if (n != sizeof(msg)) {
(void)re_fprintf(stderr, "mqueue: short read of %d bytes\n",
n);
return;
}
if (msg.magic != MAGIC) {
(void)re_fprintf(stderr, "mqueue: bad magic on read (%08x)\n",
msg.magic);
return;
}
mq->h(msg.id, msg.data, mq->arg);
}
/**
* Allocate a new Message Queue
*
* @param mqp Pointer to allocated Message Queue
* @param h Message handler
* @param arg Handler argument
*
* @return 0 if success, otherwise errorcode
*/
int mqueue_alloc(struct mqueue **mqp, mqueue_h *h, void *arg)
{
struct mqueue *mq;
int err = 0;
if (!mqp || !h)
return EINVAL;
mq = mem_zalloc(sizeof(*mq), destructor);
if (!mq)
return ENOMEM;
mq->h = h;
mq->arg = arg;
mq->pfd[0] = mq->pfd[1] = -1;
if (pipe(mq->pfd) < 0) {
err = errno;
goto out;
}
err = net_sockopt_blocking_set(mq->pfd[0], false);
if (err)
goto out;
err = net_sockopt_blocking_set(mq->pfd[1], false);
if (err)
goto out;
err = fd_listen(mq->pfd[0], FD_READ, event_handler, mq);
if (err)
goto out;
out:
if (err)
mem_deref(mq);
else
*mqp = mq;
return err;
}
/**
* Push a new message onto the Message Queue
*
* @param mq Message Queue
* @param id General purpose Identifier
* @param data Application data
*
* @return 0 if success, otherwise errorcode
*/
int mqueue_push(struct mqueue *mq, int id, void *data)
{
struct msg msg;
ssize_t n;
if (!mq)
return EINVAL;
msg.id = id;
msg.data = data;
msg.magic = MAGIC;
n = pipe_write(mq->pfd[1], &msg, sizeof(msg));
if (n < 0)
return errno;
return (n != sizeof(msg)) ? EPIPE : 0;
}