/**
 * @file mqueue.c  Thread Safe Message Queue
 *
 * Copyright (C) 2010 Creytiv.com
 */
6#ifdef HAVE_UNISTD_H
7#include <unistd.h>
8#endif
9#include <re_types.h>
10#include <re_fmt.h>
11#include <re_mem.h>
12#include <re_main.h>
13#include <re_net.h>
14#include <re_mqueue.h>
15#include "mqueue.h"
16
17
/** Marker stored in every struct msg by the sender and verified on read */
#define MAGIC 0x14553399
19
20
21#ifdef WIN32
22#include <winsock2.h>
23#include <ws2tcpip.h>
24#define close closesocket
25#endif
26
27
28/**
29 * Defines a Thread-safe Message Queue
30 *
31 * The Message Queue can be used to communicate between two threads. The
32 * receiving thread must run the re_main() loop which will be woken up on
33 * incoming messages from other threads. The sender thread can be any thread.
34 */
35struct mqueue {
36 int pfd[2];
37 mqueue_h *h;
38 void *arg;
39};
40
/** Fixed-size record sent through the pipe for each pushed message */
struct msg {
	void *data;      /**< Application data pointer                */
	uint32_t magic;  /**< Sanity marker, checked by the receiver  */
	int id;          /**< General purpose identifier              */
};
46
47
48static void destructor(void *arg)
49{
50 struct mqueue *q = arg;
51
52 if (q->pfd[0] >= 0) {
53 fd_close(q->pfd[0]);
54 (void)close(q->pfd[0]);
55 }
56 if (q->pfd[1] >= 0)
57 (void)close(q->pfd[1]);
58}
59
60
61static void event_handler(int flags, void *arg)
62{
63 struct mqueue *mq = arg;
64 struct msg msg;
65 ssize_t n;
66
67 if (!(flags & FD_READ))
68 return;
69
70 n = pipe_read(mq->pfd[0], &msg, sizeof(msg));
71 if (n < 0)
72 return;
73
74 if (n != sizeof(msg)) {
75 (void)re_fprintf(stderr, "mqueue: short read of %d bytes\n",
76 n);
77 return;
78 }
79
80 if (msg.magic != MAGIC) {
81 (void)re_fprintf(stderr, "mqueue: bad magic on read (%08x)\n",
82 msg.magic);
83 return;
84 }
85
86 mq->h(msg.id, msg.data, mq->arg);
87}
88
89
90/**
91 * Allocate a new Message Queue
92 *
93 * @param mqp Pointer to allocated Message Queue
94 * @param h Message handler
95 * @param arg Handler argument
96 *
97 * @return 0 if success, otherwise errorcode
98 */
99int mqueue_alloc(struct mqueue **mqp, mqueue_h *h, void *arg)
100{
101 struct mqueue *mq;
102 int err = 0;
103
104 if (!mqp || !h)
105 return EINVAL;
106
107 mq = mem_zalloc(sizeof(*mq), destructor);
108 if (!mq)
109 return ENOMEM;
110
111 mq->h = h;
112 mq->arg = arg;
113
114 mq->pfd[0] = mq->pfd[1] = -1;
115 if (pipe(mq->pfd) < 0) {
116 err = errno;
117 goto out;
118 }
119
120 err = net_sockopt_blocking_set(mq->pfd[0], false);
121 if (err)
122 goto out;
123
124 err = net_sockopt_blocking_set(mq->pfd[1], false);
125 if (err)
126 goto out;
127
128 err = fd_listen(mq->pfd[0], FD_READ, event_handler, mq);
129 if (err)
130 goto out;
131
132 out:
133 if (err)
134 mem_deref(mq);
135 else
136 *mqp = mq;
137
138 return err;
139}
140
141
142/**
143 * Push a new message onto the Message Queue
144 *
145 * @param mq Message Queue
146 * @param id General purpose Identifier
147 * @param data Application data
148 *
149 * @return 0 if success, otherwise errorcode
150 */
151int mqueue_push(struct mqueue *mq, int id, void *data)
152{
153 struct msg msg;
154 ssize_t n;
155
156 if (!mq)
157 return EINVAL;
158
159 msg.id = id;
160 msg.data = data;
161 msg.magic = MAGIC;
162
163 n = pipe_write(mq->pfd[1], &msg, sizeof(msg));
164 if (n < 0)
165 return errno;
166
167 return (n != sizeof(msg)) ? EPIPE : 0;
168}