Squashed 'third_party/rawrtc/re/' content from commit f3163ce8b

Change-Id: I6a235e6ac0f03269d951026f9d195da05c40fdab
git-subtree-dir: third_party/rawrtc/re
git-subtree-split: f3163ce8b526a13b35ef71ce4dd6f43585064d8a
diff --git a/src/mqueue/mqueue.c b/src/mqueue/mqueue.c
new file mode 100644
index 0000000..34dbc40
--- /dev/null
+++ b/src/mqueue/mqueue.c
@@ -0,0 +1,168 @@
+/**
+ * @file mqueue.c Thread Safe Message Queue
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_main.h>
+#include <re_net.h>
+#include <re_mqueue.h>
+#include "mqueue.h"
+
+
+#define MAGIC 0x14553399
+
+
+#ifdef WIN32
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#define close closesocket
+#endif
+
+
+/**
+ * Defines a Thread-safe Message Queue
+ *
+ * The Message Queue can be used to communicate between two threads. The
+ * receiving thread must run the re_main() loop which will be woken up on
+ * incoming messages from other threads. The sender thread can be any thread.
+ */
+struct mqueue {
+	int pfd[2];
+	mqueue_h *h;
+	void *arg;
+};
+
+struct msg {
+	void *data;
+	uint32_t magic;
+	int id;
+};
+
+
+static void destructor(void *arg)
+{
+	struct mqueue *q = arg;
+
+	if (q->pfd[0] >= 0) {
+		fd_close(q->pfd[0]);
+		(void)close(q->pfd[0]);
+	}
+	if (q->pfd[1] >= 0)
+		(void)close(q->pfd[1]);
+}
+
+
+static void event_handler(int flags, void *arg)
+{
+	struct mqueue *mq = arg;
+	struct msg msg;
+	ssize_t n;
+
+	if (!(flags & FD_READ))
+		return;
+
+	n = pipe_read(mq->pfd[0], &msg, sizeof(msg));
+	if (n < 0)
+		return;
+
+	if (n != sizeof(msg)) {
+		(void)re_fprintf(stderr, "mqueue: short read of %d bytes\n",
+				 n);
+		return;
+	}
+
+	if (msg.magic != MAGIC) {
+		(void)re_fprintf(stderr, "mqueue: bad magic on read (%08x)\n",
+				 msg.magic);
+		return;
+	}
+
+	mq->h(msg.id, msg.data, mq->arg);
+}
+
+
+/**
+ * Allocate a new Message Queue
+ *
+ * @param mqp Pointer to allocated Message Queue
+ * @param h   Message handler
+ * @param arg Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int mqueue_alloc(struct mqueue **mqp, mqueue_h *h, void *arg)
+{
+	struct mqueue *mq;
+	int err = 0;
+
+	if (!mqp || !h)
+		return EINVAL;
+
+	mq = mem_zalloc(sizeof(*mq), destructor);
+	if (!mq)
+		return ENOMEM;
+
+	mq->h   = h;
+	mq->arg = arg;
+
+	mq->pfd[0] = mq->pfd[1] = -1;
+	if (pipe(mq->pfd) < 0) {
+		err = errno;
+		goto out;
+	}
+
+	err = net_sockopt_blocking_set(mq->pfd[0], false);
+	if (err)
+		goto out;
+
+	err = net_sockopt_blocking_set(mq->pfd[1], false);
+	if (err)
+		goto out;
+
+	err = fd_listen(mq->pfd[0], FD_READ, event_handler, mq);
+	if (err)
+		goto out;
+
+ out:
+	if (err)
+		mem_deref(mq);
+	else
+		*mqp = mq;
+
+	return err;
+}
+
+
+/**
+ * Push a new message onto the Message Queue
+ *
+ * @param mq   Message Queue
+ * @param id   General purpose Identifier
+ * @param data Application data
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int mqueue_push(struct mqueue *mq, int id, void *data)
+{
+	struct msg msg;
+	ssize_t n;
+
+	if (!mq)
+		return EINVAL;
+
+	msg.id    = id;
+	msg.data  = data;
+	msg.magic = MAGIC;
+
+	n = pipe_write(mq->pfd[1], &msg, sizeof(msg));
+	if (n < 0)
+		return errno;
+
+	return (n != sizeof(msg)) ? EPIPE : 0;
+}