/**
 * @file main.c  Main polling routine
 *
 * Copyright (C) 2010 Creytiv.com
 */
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/types.h>
#undef _STRICT_ANSI
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef WIN32
#include <winsock.h>
#endif
#ifdef HAVE_SIGNAL
#include <signal.h>
#endif
#ifdef HAVE_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_POLL
#include <poll.h>
#endif
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#undef LIST_INIT
#undef LIST_FOREACH
#endif
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_list.h>
#include <re_tmr.h>
#include <re_main.h>
#include "main.h"
#ifdef HAVE_PTHREAD
#define __USE_GNU 1
#include <stdlib.h>
#include <pthread.h>
#endif


#define DEBUG_MODULE "main"
#define DEBUG_LEVEL 5
#include <re_dbg.h>

/*
  epoll() has been tested successfully on the following kernels:

  - Linux 2.6.16.29-xen (Debian 4.0 etch)
  - Linux 2.6.18-4-amd64 (Debian 4.0 etch)


  TODO clean this up

  - The polling method is selectable both at compile-time and run-time
  - The polling method can be changed at run-time. This is cool!
  - The maximum number of fds can be set by the application, but only once!
  - Look at how to optimise the main loop
 */

#if !defined (RELEASE) && !defined (MAIN_DEBUG)
#define MAIN_DEBUG 1  /**< Enable main loop debugging */
#endif


/** Main loop values */
enum {
	MAX_BLOCKING = 100,    /**< Maximum time spent in handler in [ms] */
#if defined (FD_SETSIZE)
	DEFAULT_MAXFDS = FD_SETSIZE
#else
	DEFAULT_MAXFDS = 128
#endif
};


/** Polling loop data */
struct re {
	/** File descriptor handler set */
	struct {
		int flags;           /**< Polling flags (Read, Write, etc.) */
		fd_h *fh;            /**< Event handler                     */
		void *arg;           /**< Handler argument                  */
	} *fhs;
	int maxfds;                  /**< Maximum number of polling fds     */
	int nfds;                    /**< Number of active file descriptors */
	enum poll_method method;     /**< The current polling method        */
	bool update;                 /**< File descriptor set needs updating*/
	bool polling;                /**< True if the main loop is polling  */
	int sig;                     /**< Last caught signal                */
	struct list tmrl;            /**< List of timers                    */

#ifdef HAVE_POLL
	struct pollfd *fds;          /**< Event set for poll()              */
#endif

#ifdef HAVE_EPOLL
	struct epoll_event *events;  /**< Event set for epoll()             */
	int epfd;                    /**< epoll control file descriptor     */
#endif

#ifdef HAVE_KQUEUE
	struct kevent *evlist;
	int kqfd;
#endif

#ifdef HAVE_PTHREAD
	pthread_mutex_t mutex;       /**< Mutex for thread synchronization  */
	pthread_mutex_t *mutexp;     /**< Pointer to active mutex           */
#endif
};

static struct re global_re = {
	NULL,
	0,
	0,
	METHOD_NULL,
	false,
	false,
	0,
	LIST_INIT,
#ifdef HAVE_POLL
	NULL,
#endif
#ifdef HAVE_EPOLL
	NULL,
	-1,
#endif
#ifdef HAVE_KQUEUE
	NULL,
	-1,
#endif
#ifdef HAVE_PTHREAD
#if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
	PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP,
#else
	PTHREAD_MUTEX_INITIALIZER,
#endif
	&global_re.mutex,
#endif
};


#ifdef HAVE_PTHREAD

static void poll_close(struct re *re);

static pthread_once_t pt_once = PTHREAD_ONCE_INIT;
static pthread_key_t pt_key;


static void thread_destructor(void *arg)
{
	poll_close(arg);
	free(arg);
}


static void re_once(void)
{
	pthread_key_create(&pt_key, thread_destructor);
}


static struct re *re_get(void)
{
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (!re) {
		re = &global_re;
	}

	return re;
}


static inline void re_lock(struct re *re)
{
	int err;

	err = pthread_mutex_lock(re->mutexp);
	if (err) {
		DEBUG_WARNING("re_lock: %m\n", err);
	}
}


static inline void re_unlock(struct re *re)
{
	int err;

	err = pthread_mutex_unlock(re->mutexp);
	if (err) {
		DEBUG_WARNING("re_unlock: %m\n", err);
	}
}


#else

static struct re *re_get(void)
{
	return &global_re;
}

#define re_lock(x)    /**< Stub */
#define re_unlock(x)  /**< Stub */

#endif


#if MAIN_DEBUG
/**
 * Call the application event handler
 *
 * @param re     Poll state
 * @param fd     File descriptor
 * @param flags  Event flags
 */
static void fd_handler(struct re *re, int fd, int flags)
{
	const uint64_t tick = tmr_jiffies();
	uint32_t diff;

	DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fd, flags);

	re->fhs[fd].fh(flags, re->fhs[fd].arg);

	diff = (uint32_t)(tmr_jiffies() - tick);

	if (diff > MAX_BLOCKING) {
		DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n",
			      diff, MAX_BLOCKING,
			      re->fhs[fd].fh, re->fhs[fd].arg);
	}
}
#endif


#ifdef HAVE_POLL
static int set_poll_fds(struct re *re, int fd, int flags)
{
	if (!re->fds)
		return 0;

	if (flags)
		re->fds[fd].fd = fd;
	else
		re->fds[fd].fd = -1;

	re->fds[fd].events = 0;
	if (flags & FD_READ)
		re->fds[fd].events |= POLLIN;
	if (flags & FD_WRITE)
		re->fds[fd].events |= POLLOUT;
	if (flags & FD_EXCEPT)
		re->fds[fd].events |= POLLERR;

	return 0;
}
#endif


#ifdef HAVE_EPOLL
static int set_epoll_fds(struct re *re, int fd, int flags)
{
	struct epoll_event event;
	int err = 0;

	if (re->epfd < 0)
		return EBADFD;

	memset(&event, 0, sizeof(event));

	DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags);

	if (flags) {
		event.data.fd = fd;

		if (flags & FD_READ)
			event.events |= EPOLLIN;
		if (flags & FD_WRITE)
			event.events |= EPOLLOUT;
		if (flags & FD_EXCEPT)
			event.events |= EPOLLERR;

		/* Try to add it first */
		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_ADD, fd, &event)) {

			/* If it already exists, then modify it */
			if (EEXIST == errno) {

				if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_MOD,
						    fd, &event)) {
					err = errno;
					DEBUG_WARNING("epoll_ctl:"
						      " EPOLL_CTL_MOD:"
						      " fd=%d (%m)\n",
						      fd, err);
				}
			}
			else {
				err = errno;
				DEBUG_WARNING("epoll_ctl: EPOLL_CTL_ADD:"
					      " fd=%d (%m)\n",
					      fd, err);
			}
		}
	}
	else {
		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_DEL, fd, &event)) {
			err = errno;
			DEBUG_INFO("epoll_ctl: EPOLL_CTL_DEL: fd=%d (%m)\n",
				   fd, err);
		}
	}

	return err;
}
#endif


#ifdef HAVE_KQUEUE
static int set_kqueue_fds(struct re *re, int fd, int flags)
{
	struct kevent kev[2];
	int r, n = 0;

	memset(kev, 0, sizeof(kev));

	/* always delete the events */
	EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
	EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
	kevent(re->kqfd, kev, 2, NULL, 0, NULL);

	memset(kev, 0, sizeof(kev));

	if (flags & FD_WRITE) {
		EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
		++n;
	}
	if (flags & FD_READ) {
		EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0);
		++n;
	}

	if (n) {
		r = kevent(re->kqfd, kev, n, NULL, 0, NULL);
		if (r < 0) {
			int err = errno;

			DEBUG_WARNING("set: [fd=%d, flags=%x] kevent: %m\n",
				      fd, flags, err);
			return err;
		}
	}

	return 0;
}
#endif


/**
 * Rebuild the file descriptor mapping table. This must be done whenever
 * the polling method is changed.
 */
static int rebuild_fds(struct re *re)
{
	int i, err = 0;

	DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds);

	/* Update fd sets */
	for (i=0; i<re->nfds; i++) {
		if (!re->fhs[i].fh)
			continue;

		switch (re->method) {

#ifdef HAVE_POLL
		case METHOD_POLL:
			err = set_poll_fds(re, i, re->fhs[i].flags);
			break;
#endif
#ifdef HAVE_EPOLL
		case METHOD_EPOLL:
			err = set_epoll_fds(re, i, re->fhs[i].flags);
			break;
#endif

#ifdef HAVE_KQUEUE
		case METHOD_KQUEUE:
			err = set_kqueue_fds(re, i, re->fhs[i].flags);
			break;
#endif

		default:
			break;
		}

		if (err)
			break;
	}

	return err;
}


static int poll_init(struct re *re)
{
	DEBUG_INFO("poll init (maxfds=%d)\n", re->maxfds);

	if (!re->maxfds) {
		DEBUG_WARNING("poll init: maxfds is 0\n");
		return EINVAL;
	}

	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		if (!re->fds) {
			re->fds = mem_zalloc(re->maxfds * sizeof(*re->fds),
					     NULL);
			if (!re->fds)
				return ENOMEM;
		}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (!re->events) {
			DEBUG_INFO("allocate %u bytes for epoll set\n",
				   re->maxfds * sizeof(*re->events));
			re->events = mem_zalloc(re->maxfds*sizeof(*re->events),
						NULL);
			if (!re->events)
				return ENOMEM;
		}

		if (re->epfd < 0
		    && -1 == (re->epfd = epoll_create(re->maxfds))) {

			int err = errno;

			DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n",
				      err, re->maxfds);
			return err;
		}
		DEBUG_INFO("init: epoll_create() epfd=%d\n", re->epfd);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:

		if (!re->evlist) {
			size_t sz = re->maxfds * sizeof(*re->evlist);
			re->evlist = mem_zalloc(sz, NULL);
			if (!re->evlist)
				return ENOMEM;
		}

		if (re->kqfd < 0) {
			re->kqfd = kqueue();
			if (re->kqfd < 0)
				return errno;
			DEBUG_INFO("kqueue: fd=%d\n", re->kqfd);
		}

		break;
#endif

	default:
		break;
	}
	return 0;
}


/** Free all resources */
static void poll_close(struct re *re)
{
	DEBUG_INFO("poll close\n");

	re->fhs = mem_deref(re->fhs);
	re->maxfds = 0;

#ifdef HAVE_POLL
	re->fds = mem_deref(re->fds);
#endif
#ifdef HAVE_EPOLL
	DEBUG_INFO("poll_close: epfd=%d\n", re->epfd);

	if (re->epfd >= 0) {
		(void)close(re->epfd);
		re->epfd = -1;
	}

	re->events = mem_deref(re->events);
#endif

#ifdef HAVE_KQUEUE
	if (re->kqfd >= 0) {
		close(re->kqfd);
		re->kqfd = -1;
	}

	re->evlist = mem_deref(re->evlist);
#endif
}


static int poll_setup(struct re *re)
{
	int err;

	err = fd_setsize(DEFAULT_MAXFDS);
	if (err)
		goto out;

	if (METHOD_NULL == re->method) {
		err = poll_method_set(poll_method_best());
		if (err)
			goto out;

		DEBUG_INFO("poll setup: poll method not set - set to `%s'\n",
			   poll_method_name(re->method));
	}

	err = poll_init(re);

 out:
	if (err)
		poll_close(re);

	return err;
}


/**
 * Listen for events on a file descriptor
 *
 * @param fd     File descriptor
 * @param flags  Wanted event flags
 * @param fh     Event handler
 * @param arg    Handler argument
 *
 * @return 0 if success, otherwise errorcode
 */
int fd_listen(int fd, int flags, fd_h *fh, void *arg)
{
	struct re *re = re_get();
	int err = 0;

	DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);

	if (fd < 0) {
		DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd);
		return EBADF;
	}

	if (flags || fh) {
		err = poll_setup(re);
		if (err)
			return err;
	}

	if (fd >= re->maxfds) {
		if (flags) {
			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x"
				      " - Max %d fds\n",
				      fd, flags, re->maxfds);
		}
		return EMFILE;
	}

	/* Update fh set */
	if (re->fhs) {
		re->fhs[fd].flags = flags;
		re->fhs[fd].fh    = fh;
		re->fhs[fd].arg   = arg;
	}

	re->nfds = max(re->nfds, fd+1);

	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		err = set_poll_fds(re, fd, flags);
		break;
#endif

#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (re->epfd < 0)
			return EBADFD;
		err = set_epoll_fds(re, fd, flags);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:
		err = set_kqueue_fds(re, fd, flags);
		break;
#endif

	default:
		break;
	}

	if (err) {
		if (flags && fh) {
			fd_close(fd);
			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n",
				      fd, flags, err);
		}
	}

	return err;
}
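

/*
 * Illustrative usage sketch (application code, not part of this module):
 * listening for read events on a UDP socket.  The names `sock_fd' and
 * `udp_read_handler' are hypothetical.
 *
 *	static void udp_read_handler(int flags, void *arg)
 *	{
 *		int fd = *(int *)arg;
 *		char buf[512];
 *
 *		if (flags & FD_READ)
 *			(void)recv(fd, buf, sizeof(buf), 0);
 *	}
 *
 *	...
 *	err = fd_listen(sock_fd, FD_READ, udp_read_handler, &sock_fd);
 *	if (err)
 *		goto out;
 *	...
 *	fd_close(sock_fd);   -- stop listening when done
 */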


/**
 * Stop listening for events on a file descriptor
 *
 * @param fd File descriptor
 */
void fd_close(int fd)
{
	(void)fd_listen(fd, 0, NULL, NULL);
}


/**
 * Polling loop
 *
 * @param re Poll state.
 *
 * @return 0 if success, otherwise errorcode
 */
static int fd_poll(struct re *re)
{
	const uint64_t to = tmr_next_timeout(&re->tmrl);
	int i, n;
#ifdef HAVE_SELECT
	fd_set rfds, wfds, efds;
#endif

	DEBUG_INFO("next timer: %llu ms\n", to);

	/* Wait for I/O */
	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		re_unlock(re);
		n = poll(re->fds, re->nfds, to ? (int)to : -1);
		re_lock(re);
		break;
#endif
#ifdef HAVE_SELECT
	case METHOD_SELECT: {
		struct timeval tv;

		/* Clear and update fd sets */
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		FD_ZERO(&efds);

		for (i=0; i<re->nfds; i++) {
			if (!re->fhs[i].fh)
				continue;

			if (re->fhs[i].flags & FD_READ)
				FD_SET(i, &rfds);
			if (re->fhs[i].flags & FD_WRITE)
				FD_SET(i, &wfds);
			if (re->fhs[i].flags & FD_EXCEPT)
				FD_SET(i, &efds);
		}

#ifdef WIN32
		tv.tv_sec = (long) to / 1000;
#else
		tv.tv_sec = (time_t) to / 1000;
#endif
		tv.tv_usec = (uint32_t) (to % 1000) * 1000;
		re_unlock(re);
		n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL);
		re_lock(re);
	}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		re_unlock(re);
		n = epoll_wait(re->epfd, re->events, re->maxfds,
			       to ? (int)to : -1);
		re_lock(re);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE: {
		struct timespec timeout;

		timeout.tv_sec  = (time_t) (to / 1000);
		timeout.tv_nsec = (to % 1000) * 1000000;

		re_unlock(re);
		n = kevent(re->kqfd, NULL, 0, re->evlist, re->maxfds,
			   to ? &timeout : NULL);
		re_lock(re);
	}
		break;
#endif

	default:
		(void)to;
		DEBUG_WARNING("no polling method set\n");
		return EINVAL;
	}

	if (n < 0)
		return errno;

	/* Check for events */
	for (i=0; (n > 0) && (i < re->nfds); i++) {
		int fd, flags = 0;

		switch (re->method) {

#ifdef HAVE_POLL
		case METHOD_POLL:
			fd = i;
			if (re->fds[fd].revents & POLLIN)
				flags |= FD_READ;
			if (re->fds[fd].revents & POLLOUT)
				flags |= FD_WRITE;
			if (re->fds[fd].revents & (POLLERR|POLLHUP|POLLNVAL))
				flags |= FD_EXCEPT;
			if (re->fds[fd].revents & POLLNVAL) {
				DEBUG_WARNING("event: fd=%d POLLNVAL"
					      " (fds.fd=%d,"
					      " fds.events=0x%02x)\n",
					      fd, re->fds[fd].fd,
					      re->fds[fd].events);
			}
			/* Clear events */
			re->fds[fd].revents = 0;
			break;
#endif
#ifdef HAVE_SELECT
		case METHOD_SELECT:
			fd = i;
			if (FD_ISSET(fd, &rfds))
				flags |= FD_READ;
			if (FD_ISSET(fd, &wfds))
				flags |= FD_WRITE;
			if (FD_ISSET(fd, &efds))
				flags |= FD_EXCEPT;
			break;
#endif
#ifdef HAVE_EPOLL
		case METHOD_EPOLL:
			fd = re->events[i].data.fd;

			if (re->events[i].events & EPOLLIN)
				flags |= FD_READ;
			if (re->events[i].events & EPOLLOUT)
				flags |= FD_WRITE;
			if (re->events[i].events & (EPOLLERR|EPOLLHUP))
				flags |= FD_EXCEPT;

			if (!flags) {
				DEBUG_WARNING("epoll: no flags fd=%d\n", fd);
			}

			break;
#endif

#ifdef HAVE_KQUEUE
		case METHOD_KQUEUE: {

			struct kevent *kev = &re->evlist[i];

			fd = (int)kev->ident;

			if (fd >= re->maxfds) {
				DEBUG_WARNING("large fd=%d\n", fd);
				break;
			}

			if (kev->filter == EVFILT_READ)
				flags |= FD_READ;
			else if (kev->filter == EVFILT_WRITE)
				flags |= FD_WRITE;
			else {
				DEBUG_WARNING("kqueue: unhandled "
					      "filter %x\n",
					      kev->filter);
			}

			if (kev->flags & EV_EOF) {
				flags |= FD_EXCEPT;
			}
			if (kev->flags & EV_ERROR) {
				DEBUG_WARNING("kqueue: EV_ERROR on fd %d\n",
					      fd);
			}

			if (!flags) {
				DEBUG_WARNING("kqueue: no flags fd=%d\n", fd);
			}
		}
			break;
#endif

		default:
			return EINVAL;
		}

		if (!flags)
			continue;

		if (re->fhs[fd].fh) {
#if MAIN_DEBUG
			fd_handler(re, fd, flags);
#else
			re->fhs[fd].fh(flags, re->fhs[fd].arg);
#endif
		}

		/* Check if polling method was changed */
		if (re->update) {
			re->update = false;
			return 0;
		}

		--n;
	}

	return 0;
}


/**
 * Set the maximum number of file descriptors
 *
 * @param maxfds  Max FDs. 0 to free.
 *
 * @return 0 if success, otherwise errorcode
 */
int fd_setsize(int maxfds)
{
	struct re *re = re_get();

	if (!maxfds) {
		fd_debug();
		poll_close(re);
		return 0;
	}

	if (!re->maxfds)
		re->maxfds = maxfds;

	if (!re->fhs) {
		DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n",
			   re->maxfds, re->maxfds * sizeof(*re->fhs));

		re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL);
		if (!re->fhs)
			return ENOMEM;
	}

	return 0;
}
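

/*
 * Illustrative sketch (application code, not part of this module): the
 * fd-table size can be raised above DEFAULT_MAXFDS, but only before the
 * first fd_listen()/re_main() call, since the size is only set once.
 *
 *	err = fd_setsize(4096);
 *	if (err)
 *		return err;
 */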


/**
 * Print all file descriptors in use
 */
void fd_debug(void)
{
	const struct re *re = re_get();
	int i;

	if (!re->fhs)
		return;

	for (i=0; i<re->nfds; i++) {

		if (!re->fhs[i].flags)
			continue;

		(void)re_fprintf(stderr,
				 "fd %d in use: flags=%x fh=%p arg=%p\n",
				 i, re->fhs[i].flags, re->fhs[i].fh,
				 re->fhs[i].arg);
	}
}


#ifdef HAVE_SIGNAL
/* Thread-safe signal handling */
static void signal_handler(int sig)
{
	(void)signal(sig, signal_handler);
	re_get()->sig = sig;
}
#endif


/**
 * Main polling loop for async I/O events. This function will only return
 * when re_cancel() is called or an error occurred.
 *
 * @param signalh  Optional signal handler
 *
 * @return 0 if success, otherwise errorcode
 */
int re_main(re_signal_h *signalh)
{
	struct re *re = re_get();
	int err;

#ifdef HAVE_SIGNAL
	if (signalh) {
		(void)signal(SIGINT, signal_handler);
		(void)signal(SIGALRM, signal_handler);
		(void)signal(SIGTERM, signal_handler);
	}
#endif

	if (re->polling) {
		DEBUG_WARNING("main loop already polling\n");
		return EALREADY;
	}

	err = poll_setup(re);
	if (err)
		goto out;

	DEBUG_INFO("Using async I/O polling method: `%s'\n",
		   poll_method_name(re->method));

	re->polling = true;

	re_lock(re);
	for (;;) {

		if (re->sig) {
			if (signalh)
				signalh(re->sig);

			re->sig = 0;
		}

		if (!re->polling) {
			err = 0;
			break;
		}

		err = fd_poll(re);
		if (err) {
			if (EINTR == err)
				continue;

#ifdef DARWIN
			/* NOTE: workaround for Darwin */
			if (EBADF == err)
				continue;
#endif

			break;
		}

		tmr_poll(&re->tmrl);
	}
	re_unlock(re);

 out:
	re->polling = false;

	return err;
}
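

/*
 * Illustrative sketch (application code, not part of this module): a
 * typical program runs re_main() on the main thread and stops it from
 * the signal handler via re_cancel().  `app_signal_handler' is a
 * hypothetical name; libre_init()/libre_close() are from <re.h>.
 *
 *	static void app_signal_handler(int sig)
 *	{
 *		(void)sig;
 *		re_cancel();
 *	}
 *
 *	int main(void)
 *	{
 *		int err = libre_init();
 *		if (err)
 *			return err;
 *
 *		// ... set up timers and fd_listen() handlers here ...
 *
 *		err = re_main(app_signal_handler);
 *
 *		libre_close();
 *		return err;
 *	}
 */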


/**
 * Cancel the main polling loop
 */
void re_cancel(void)
{
	struct re *re = re_get();

	re->polling = false;
}


/**
 * Debug the main polling loop
 *
 * @param pf     Print handler where debug output is printed to
 * @param unused Unused parameter
 *
 * @return 0 if success, otherwise errorcode
 */
int re_debug(struct re_printf *pf, void *unused)
{
	struct re *re = re_get();
	int err = 0;

	(void)unused;

	err |= re_hprintf(pf, "re main loop:\n");
	err |= re_hprintf(pf, "  maxfds: %d\n", re->maxfds);
	err |= re_hprintf(pf, "  nfds:   %d\n", re->nfds);
	err |= re_hprintf(pf, "  method: %d (%s)\n", re->method,
			  poll_method_name(re->method));

	return err;
}


/**
 * Set async I/O polling method. This function can also be called while the
 * program is running.
 *
 * @param method New polling method
 *
 * @return 0 if success, otherwise errorcode
 */
int poll_method_set(enum poll_method method)
{
	struct re *re = re_get();
	int err;

	err = fd_setsize(DEFAULT_MAXFDS);
	if (err)
		return err;

	switch (method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		break;
#endif
#ifdef HAVE_SELECT
	case METHOD_SELECT:
		if (re->maxfds > (int)FD_SETSIZE) {
			DEBUG_WARNING("SELECT: maxfds > FD_SETSIZE\n");
			return EMFILE;
		}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (!epoll_check())
			return EINVAL;
		break;
#endif
#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:
		break;
#endif
	default:
		DEBUG_WARNING("poll method not supported: '%s'\n",
			      poll_method_name(method));
		return EINVAL;
	}

	re->method = method;
	re->update = true;

	DEBUG_INFO("Setting async I/O polling method to `%s'\n",
		   poll_method_name(re->method));

	err = poll_init(re);
	if (err)
		return err;

	return rebuild_fds(re);
}
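

/*
 * Illustrative sketch (application code, not part of this module):
 * forcing the select() backend at startup instead of relying on the
 * automatic poll_method_best() choice.
 *
 *	err = poll_method_set(METHOD_SELECT);
 *	if (err)
 *		DEBUG_WARNING("cannot use select: %m\n", err);
 */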


/**
 * Add a worker thread for this thread
 *
 * @return 0 if success, otherwise errorcode
 */
int re_thread_init(void)
{
#ifdef HAVE_PTHREAD
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (re) {
		DEBUG_WARNING("thread_init: already added for thread %d\n",
			      pthread_self());
		return EALREADY;
	}

	re = malloc(sizeof(*re));
	if (!re)
		return ENOMEM;

	memset(re, 0, sizeof(*re));
	pthread_mutex_init(&re->mutex, NULL);
	re->mutexp = &re->mutex;

#ifdef HAVE_EPOLL
	re->epfd = -1;
#endif

#ifdef HAVE_KQUEUE
	re->kqfd = -1;
#endif

	pthread_setspecific(pt_key, re);
	return 0;
#else
	return ENOSYS;
#endif
}


/**
 * Remove the worker thread for this thread
 */
void re_thread_close(void)
{
#ifdef HAVE_PTHREAD
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (re) {
		poll_close(re);
		free(re);
		pthread_setspecific(pt_key, NULL);
	}
#endif
}
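

/*
 * Illustrative sketch (application code, not part of this module): a
 * worker thread gets its own polling state, runs its own re_main()
 * loop and cleans up on exit.  `worker' is a hypothetical name.
 *
 *	static void *worker(void *arg)
 *	{
 *		(void)arg;
 *
 *		if (0 == re_thread_init()) {
 *			// fd_listen()/tmr_start() now use this
 *			// thread's private state
 *			(void)re_main(NULL);
 *			re_thread_close();
 *		}
 *		return NULL;
 *	}
 */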


/**
 * Enter an 're' thread
 *
 * @note Must only be called from a non-re thread
 */
void re_thread_enter(void)
{
	re_lock(re_get());
}


/**
 * Leave an 're' thread
 *
 * @note Must only be called from a non-re thread
 */
void re_thread_leave(void)
{
	re_unlock(re_get());
}
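

/*
 * Illustrative sketch (application code, not part of this module): a
 * foreign thread must bracket any call into 're' objects with
 * re_thread_enter()/re_thread_leave() so it holds the loop mutex.
 * `my_tmr' and `my_tmr_handler' are hypothetical names.
 *
 *	re_thread_enter();
 *	tmr_start(&my_tmr, 100, my_tmr_handler, NULL);
 *	re_thread_leave();
 */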


/**
 * Set an external mutex for this thread
 *
 * @param mutexp Pointer to external mutex, NULL to use internal
 */
void re_set_mutex(void *mutexp)
{
#ifdef HAVE_PTHREAD
	struct re *re = re_get();

	re->mutexp = mutexp ? mutexp : &re->mutex;
#else
	(void)mutexp;
#endif
}


/**
 * Get the timer-list for this thread
 *
 * @return Timer list
 *
 * @note only used by tmr module
 */
struct list *tmrl_get(void);
struct list *tmrl_get(void)
{
	return &re_get()->tmrl;
}