/**
 * @file main.c  Main polling routine
 *
 * Copyright (C) 2010 Creytiv.com
 */
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/types.h>
#undef _STRICT_ANSI
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef WIN32
#include <winsock.h>
#endif
#ifdef HAVE_SIGNAL
#include <signal.h>
#endif
#ifdef HAVE_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_POLL
#include <poll.h>
#endif
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#undef LIST_INIT
#undef LIST_FOREACH
#endif
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_list.h>
#include <re_tmr.h>
#include <re_main.h>
#include "main.h"
#ifdef HAVE_PTHREAD
#define __USE_GNU 1
#include <stdlib.h>
#include <pthread.h>
#endif


#define DEBUG_MODULE "main"
#define DEBUG_LEVEL 5
#include <re_dbg.h>

/*
  epoll() has been tested successfully on the following kernels:

  - Linux 2.6.16.29-xen (Debian 4.0 etch)
  - Linux 2.6.18-4-amd64 (Debian 4.0 etch)


  TODO clean this up

  - The polling method is selectable both at compile time and at run time
  - The polling method can be changed at run time, which is cool!
  - The maximum number of fds can be set by the application, but only once!
  - Look at how to optimise the main loop
 */

#if !defined (RELEASE) && !defined (MAIN_DEBUG)
#define MAIN_DEBUG 1  /**< Enable main loop debugging */
#endif


/** Main loop values */
enum {
	MAX_BLOCKING = 100,  /**< Maximum time spent in handler in [ms] */
#if defined (FD_SETSIZE)
	DEFAULT_MAXFDS = FD_SETSIZE
#else
	DEFAULT_MAXFDS = 128
#endif
};


/** Polling loop data */
struct re {
	/** File descriptor handler set */
	struct {
		int flags;           /**< Polling flags (Read, Write, etc.) */
		fd_h *fh;            /**< Event handler                     */
		void *arg;           /**< Handler argument                  */
	} *fhs;
	int maxfds;                  /**< Maximum number of polling fds     */
	int nfds;                    /**< Number of active file descriptors */
	enum poll_method method;     /**< The current polling method        */
	bool update;                 /**< File descriptor set needs updating */
	bool polling;                /**< True while inside the polling loop */
	int sig;                     /**< Last caught signal                */
	struct list tmrl;            /**< List of timers                    */

#ifdef HAVE_POLL
	struct pollfd *fds;          /**< Event set for poll()              */
#endif

#ifdef HAVE_EPOLL
	struct epoll_event *events;  /**< Event set for epoll()             */
	int epfd;                    /**< epoll control file descriptor     */
#endif

#ifdef HAVE_KQUEUE
	struct kevent *evlist;       /**< Event list for kqueue()           */
	int kqfd;                    /**< kqueue control file descriptor    */
#endif

#ifdef HAVE_PTHREAD
	pthread_mutex_t mutex;       /**< Mutex for thread synchronization  */
	pthread_mutex_t *mutexp;     /**< Pointer to active mutex           */
#endif
};

static struct re global_re = {
	NULL,
	0,
	0,
	METHOD_NULL,
	false,
	false,
	0,
	LIST_INIT,
#ifdef HAVE_POLL
	NULL,
#endif
#ifdef HAVE_EPOLL
	NULL,
	-1,
#endif
#ifdef HAVE_KQUEUE
	NULL,
	-1,
#endif
#ifdef HAVE_PTHREAD
#if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
	PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP,
#else
	PTHREAD_MUTEX_INITIALIZER,
#endif
	&global_re.mutex,
#endif
};


#ifdef HAVE_PTHREAD

static void poll_close(struct re *re);

static pthread_once_t pt_once = PTHREAD_ONCE_INIT;
static pthread_key_t pt_key;


static void thread_destructor(void *arg)
{
	poll_close(arg);
	free(arg);
}


static void re_once(void)
{
	pthread_key_create(&pt_key, thread_destructor);
}


static struct re *re_get(void)
{
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (!re) {
		re = &global_re;
	}

	return re;
}


static inline void re_lock(struct re *re)
{
	int err;

	err = pthread_mutex_lock(re->mutexp);
	if (err) {
		DEBUG_WARNING("re_lock: %m\n", err);
	}
}


static inline void re_unlock(struct re *re)
{
	int err;

	err = pthread_mutex_unlock(re->mutexp);
	if (err) {
		DEBUG_WARNING("re_unlock: %m\n", err);
	}
}


#else

static struct re *re_get(void)
{
	return &global_re;
}

#define re_lock(x)    /**< Stub */
#define re_unlock(x)  /**< Stub */

#endif


#if MAIN_DEBUG
/**
 * Call the application event handler
 *
 * @param re    Poll state
 * @param fd    File descriptor
 * @param flags Event flags
 */
static void fd_handler(struct re *re, int fd, int flags)
{
	const uint64_t tick = tmr_jiffies();
	uint32_t diff;

	DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fd, flags);

	re->fhs[fd].fh(flags, re->fhs[fd].arg);

	diff = (uint32_t)(tmr_jiffies() - tick);

	if (diff > MAX_BLOCKING) {
		DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n",
			      diff, MAX_BLOCKING,
			      re->fhs[fd].fh, re->fhs[fd].arg);
	}
}
#endif


#ifdef HAVE_POLL
static int set_poll_fds(struct re *re, int fd, int flags)
{
	if (!re->fds)
		return 0;

	if (flags)
		re->fds[fd].fd = fd;
	else
		re->fds[fd].fd = -1;

	re->fds[fd].events = 0;
	if (flags & FD_READ)
		re->fds[fd].events |= POLLIN;
	if (flags & FD_WRITE)
		re->fds[fd].events |= POLLOUT;
	if (flags & FD_EXCEPT)
		re->fds[fd].events |= POLLERR;

	return 0;
}
#endif


#ifdef HAVE_EPOLL
static int set_epoll_fds(struct re *re, int fd, int flags)
{
	struct epoll_event event;
	int err = 0;

	if (re->epfd < 0)
		return EBADFD;

	memset(&event, 0, sizeof(event));

	DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags);

	if (flags) {
		event.data.fd = fd;

		if (flags & FD_READ)
			event.events |= EPOLLIN;
		if (flags & FD_WRITE)
			event.events |= EPOLLOUT;
		if (flags & FD_EXCEPT)
			event.events |= EPOLLERR;

		/* Try to add it first */
		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_ADD, fd, &event)) {

			/* If it already exists then modify it */
			if (EEXIST == errno) {

				if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_MOD,
						    fd, &event)) {
					err = errno;
					DEBUG_WARNING("epoll_ctl:"
						      " EPOLL_CTL_MOD:"
						      " fd=%d (%m)\n",
						      fd, err);
				}
			}
			else {
				err = errno;
				DEBUG_WARNING("epoll_ctl: EPOLL_CTL_ADD:"
					      " fd=%d (%m)\n",
					      fd, err);
			}
		}
	}
	else {
		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_DEL, fd, &event)) {
			err = errno;
			DEBUG_INFO("epoll_ctl: EPOLL_CTL_DEL: fd=%d (%m)\n",
				   fd, err);
		}
	}

	return err;
}
#endif


#ifdef HAVE_KQUEUE
static int set_kqueue_fds(struct re *re, int fd, int flags)
{
	struct kevent kev[2];
	int r, n = 0;

	memset(kev, 0, sizeof(kev));

	/* always delete the events */
	EV_SET(&kev[0], fd, EVFILT_READ,  EV_DELETE, 0, 0, 0);
	EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
	kevent(re->kqfd, kev, 2, NULL, 0, NULL);

	memset(kev, 0, sizeof(kev));

	if (flags & FD_WRITE) {
		EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
		++n;
	}
	if (flags & FD_READ) {
		EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0);
		++n;
	}

	if (n) {
		r = kevent(re->kqfd, kev, n, NULL, 0, NULL);
		if (r < 0) {
			int err = errno;

			DEBUG_WARNING("set: [fd=%d, flags=%x] kevent: %m\n",
				      fd, flags, err);
			return err;
		}
	}

	return 0;
}
#endif


/**
 * Rebuild the file descriptor mapping table. This must be done whenever
 * the polling method is changed.
 */
static int rebuild_fds(struct re *re)
{
	int i, err = 0;

	DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds);

	/* Update fd sets */
	for (i=0; i<re->nfds; i++) {
		if (!re->fhs[i].fh)
			continue;

		switch (re->method) {

#ifdef HAVE_POLL
		case METHOD_POLL:
			err = set_poll_fds(re, i, re->fhs[i].flags);
			break;
#endif
#ifdef HAVE_EPOLL
		case METHOD_EPOLL:
			err = set_epoll_fds(re, i, re->fhs[i].flags);
			break;
#endif

#ifdef HAVE_KQUEUE
		case METHOD_KQUEUE:
			err = set_kqueue_fds(re, i, re->fhs[i].flags);
			break;
#endif

		default:
			break;
		}

		if (err)
			break;
	}

	return err;
}


static int poll_init(struct re *re)
{
	DEBUG_INFO("poll init (maxfds=%d)\n", re->maxfds);

	if (!re->maxfds) {
		DEBUG_WARNING("poll init: maxfds is 0\n");
		return EINVAL;
	}

	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		if (!re->fds) {
			re->fds = mem_zalloc(re->maxfds * sizeof(*re->fds),
					     NULL);
			if (!re->fds)
				return ENOMEM;
		}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (!re->events) {
			DEBUG_INFO("allocate %u bytes for epoll set\n",
				   re->maxfds * sizeof(*re->events));
			re->events = mem_zalloc(re->maxfds*sizeof(*re->events),
						NULL);
			if (!re->events)
				return ENOMEM;
		}

		if (re->epfd < 0
		    && -1 == (re->epfd = epoll_create(re->maxfds))) {

			int err = errno;

			DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n",
				      err, re->maxfds);
			return err;
		}
		DEBUG_INFO("init: epoll_create() epfd=%d\n", re->epfd);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:

		if (!re->evlist) {
			size_t sz = re->maxfds * sizeof(*re->evlist);
			re->evlist = mem_zalloc(sz, NULL);
			if (!re->evlist)
				return ENOMEM;
		}

		if (re->kqfd < 0) {
			re->kqfd = kqueue();
			if (re->kqfd < 0)
				return errno;
			DEBUG_INFO("kqueue: fd=%d\n", re->kqfd);
		}

		break;
#endif

	default:
		break;
	}
	return 0;
}


/** Free all resources */
static void poll_close(struct re *re)
{
	DEBUG_INFO("poll close\n");

	re->fhs = mem_deref(re->fhs);
	re->maxfds = 0;

#ifdef HAVE_POLL
	re->fds = mem_deref(re->fds);
#endif
#ifdef HAVE_EPOLL
	DEBUG_INFO("poll_close: epfd=%d\n", re->epfd);

	if (re->epfd >= 0) {
		(void)close(re->epfd);
		re->epfd = -1;
	}

	re->events = mem_deref(re->events);
#endif

#ifdef HAVE_KQUEUE
	if (re->kqfd >= 0) {
		close(re->kqfd);
		re->kqfd = -1;
	}

	re->evlist = mem_deref(re->evlist);
#endif
}


static int poll_setup(struct re *re)
{
	int err;

	err = fd_setsize(DEFAULT_MAXFDS);
	if (err)
		goto out;

	if (METHOD_NULL == re->method) {
		err = poll_method_set(poll_method_best());
		if (err)
			goto out;

		DEBUG_INFO("poll setup: poll method not set - set to `%s'\n",
			   poll_method_name(re->method));
	}

	err = poll_init(re);

 out:
	if (err)
		poll_close(re);

	return err;
}


/**
 * Listen for events on a file descriptor
 *
 * @param fd    File descriptor
 * @param flags Wanted event flags
 * @param fh    Event handler
 * @param arg   Handler argument
 *
 * @return 0 if success, otherwise errorcode
 */
int fd_listen(int fd, int flags, fd_h *fh, void *arg)
{
	struct re *re = re_get();
	int err = 0;

	DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);

	if (fd < 0) {
		DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd);
		return EBADF;
	}

	if (flags || fh) {
		err = poll_setup(re);
		if (err)
			return err;
	}

	if (fd >= re->maxfds) {
		if (flags) {
			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x"
				      " - Max %d fds\n",
				      fd, flags, re->maxfds);
		}
		return EMFILE;
	}

	/* Update fh set */
	if (re->fhs) {
		re->fhs[fd].flags = flags;
		re->fhs[fd].fh    = fh;
		re->fhs[fd].arg   = arg;
	}

	re->nfds = max(re->nfds, fd+1);

	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		err = set_poll_fds(re, fd, flags);
		break;
#endif

#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (re->epfd < 0)
			return EBADFD;
		err = set_epoll_fds(re, fd, flags);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:
		err = set_kqueue_fds(re, fd, flags);
		break;
#endif

	default:
		break;
	}

	if (err) {
		if (flags && fh) {
			fd_close(fd);
			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n",
				      fd, flags, err);
		}
	}

	return err;
}
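

/* A minimal usage sketch for fd_listen()/fd_close() (illustration only,
 * not part of the build; "my_sock", "read_handler" and "app" are
 * hypothetical application names).  The handler is called from the
 * polling loop whenever the fd becomes readable:
 *
 *	static void read_handler(int flags, void *arg)
 *	{
 *		struct my_app *app = arg;
 *		(void)app;
 *
 *		if (flags & FD_READ) {
 *			// read from my_sock here
 *		}
 *	}
 *
 *	err = fd_listen(my_sock, FD_READ, read_handler, app);
 *	...
 *	fd_close(my_sock);
 */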


/**
 * Stop listening for events on a file descriptor
 *
 * @param fd File descriptor
 */
void fd_close(int fd)
{
	(void)fd_listen(fd, 0, NULL, NULL);
}


/**
 * Polling loop
 *
 * @param re Poll state
 *
 * @return 0 if success, otherwise errorcode
 */
static int fd_poll(struct re *re)
{
	const uint64_t to = tmr_next_timeout(&re->tmrl);
	int i, n;
#ifdef HAVE_SELECT
	fd_set rfds, wfds, efds;
#endif

	DEBUG_INFO("next timer: %llu ms\n", to);

	/* Wait for I/O */
	switch (re->method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		re_unlock(re);
		n = poll(re->fds, re->nfds, to ? (int)to : -1);
		re_lock(re);
		break;
#endif
#ifdef HAVE_SELECT
	case METHOD_SELECT: {
		struct timeval tv;

		/* Clear and update fd sets */
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		FD_ZERO(&efds);

		for (i=0; i<re->nfds; i++) {
			if (!re->fhs[i].fh)
				continue;

			if (re->fhs[i].flags & FD_READ)
				FD_SET(i, &rfds);
			if (re->fhs[i].flags & FD_WRITE)
				FD_SET(i, &wfds);
			if (re->fhs[i].flags & FD_EXCEPT)
				FD_SET(i, &efds);
		}

#ifdef WIN32
		tv.tv_sec  = (long) to / 1000;
#else
		tv.tv_sec  = (time_t) to / 1000;
#endif
		tv.tv_usec = (uint32_t) (to % 1000) * 1000;
		re_unlock(re);
		n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL);
		re_lock(re);
	}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		re_unlock(re);
		n = epoll_wait(re->epfd, re->events, re->maxfds,
			       to ? (int)to : -1);
		re_lock(re);
		break;
#endif

#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE: {
		struct timespec timeout;

		timeout.tv_sec  = (time_t) (to / 1000);
		timeout.tv_nsec = (to % 1000) * 1000000;

		re_unlock(re);
		n = kevent(re->kqfd, NULL, 0, re->evlist, re->maxfds,
			   to ? &timeout : NULL);
		re_lock(re);
	}
		break;
#endif

	default:
		(void)to;
		DEBUG_WARNING("no polling method set\n");
		return EINVAL;
	}

	if (n < 0)
		return errno;

	/* Check for events */
	for (i=0; (n > 0) && (i < re->nfds); i++) {
		int fd, flags = 0;

		switch (re->method) {

#ifdef HAVE_POLL
		case METHOD_POLL:
			fd = i;
			if (re->fds[fd].revents & POLLIN)
				flags |= FD_READ;
			if (re->fds[fd].revents & POLLOUT)
				flags |= FD_WRITE;
			if (re->fds[fd].revents & (POLLERR|POLLHUP|POLLNVAL))
				flags |= FD_EXCEPT;
			if (re->fds[fd].revents & POLLNVAL) {
				DEBUG_WARNING("event: fd=%d POLLNVAL"
					      " (fds.fd=%d,"
					      " fds.events=0x%02x)\n",
					      fd, re->fds[fd].fd,
					      re->fds[fd].events);
			}
			/* Clear events */
			re->fds[fd].revents = 0;
			break;
#endif
#ifdef HAVE_SELECT
		case METHOD_SELECT:
			fd = i;
			if (FD_ISSET(fd, &rfds))
				flags |= FD_READ;
			if (FD_ISSET(fd, &wfds))
				flags |= FD_WRITE;
			if (FD_ISSET(fd, &efds))
				flags |= FD_EXCEPT;
			break;
#endif
#ifdef HAVE_EPOLL
		case METHOD_EPOLL:
			fd = re->events[i].data.fd;

			if (re->events[i].events & EPOLLIN)
				flags |= FD_READ;
			if (re->events[i].events & EPOLLOUT)
				flags |= FD_WRITE;
			if (re->events[i].events & (EPOLLERR|EPOLLHUP))
				flags |= FD_EXCEPT;

			if (!flags) {
				DEBUG_WARNING("epoll: no flags fd=%d\n", fd);
			}

			break;
#endif

#ifdef HAVE_KQUEUE
		case METHOD_KQUEUE: {

			struct kevent *kev = &re->evlist[i];

			fd = (int)kev->ident;

			if (fd >= re->maxfds) {
				DEBUG_WARNING("large fd=%d\n", fd);
				break;
			}

			if (kev->filter == EVFILT_READ)
				flags |= FD_READ;
			else if (kev->filter == EVFILT_WRITE)
				flags |= FD_WRITE;
			else {
				DEBUG_WARNING("kqueue: unhandled filter"
					      " %x\n", kev->filter);
			}

			if (kev->flags & EV_EOF) {
				flags |= FD_EXCEPT;
			}
			if (kev->flags & EV_ERROR) {
				DEBUG_WARNING("kqueue: EV_ERROR on fd %d\n",
					      fd);
			}

			if (!flags) {
				DEBUG_WARNING("kqueue: no flags fd=%d\n", fd);
			}
		}
			break;
#endif

		default:
			return EINVAL;
		}

		if (!flags)
			continue;

		if (re->fhs[fd].fh) {
#if MAIN_DEBUG
			fd_handler(re, fd, flags);
#else
			re->fhs[fd].fh(flags, re->fhs[fd].arg);
#endif
		}

		/* Check if polling method was changed */
		if (re->update) {
			re->update = false;
			return 0;
		}

		--n;
	}

	return 0;
}


/**
 * Set the maximum number of file descriptors
 *
 * @param maxfds Max FDs. 0 to free.
 *
 * @return 0 if success, otherwise errorcode
 */
int fd_setsize(int maxfds)
{
	struct re *re = re_get();

	if (!maxfds) {
		fd_debug();
		poll_close(re);
		return 0;
	}

	if (!re->maxfds)
		re->maxfds = maxfds;

	if (!re->fhs) {
		DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n",
			   re->maxfds, re->maxfds * sizeof(*re->fhs));

		re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL);
		if (!re->fhs)
			return ENOMEM;
	}

	return 0;
}
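

/* Note: the size is only applied on the first call (a later call with a
 * different value is ignored once re->maxfds is set).  A sketch of
 * raising the limit before any fd_listen() call; 1024 is an arbitrary
 * example value:
 *
 *	int err = fd_setsize(1024);
 *	if (err)
 *		return err;
 */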


/**
 * Print all file descriptors in-use
 */
void fd_debug(void)
{
	const struct re *re = re_get();
	int i;

	if (!re->fhs)
		return;

	for (i=0; i<re->nfds; i++) {

		if (!re->fhs[i].flags)
			continue;

		(void)re_fprintf(stderr,
				 "fd %d in use: flags=%x fh=%p arg=%p\n",
				 i, re->fhs[i].flags, re->fhs[i].fh,
				 re->fhs[i].arg);
	}
}


#ifdef HAVE_SIGNAL
/* Thread-safe signal handling */
static void signal_handler(int sig)
{
	(void)signal(sig, signal_handler);
	re_get()->sig = sig;
}
#endif


/**
 * Main polling loop for async I/O events. This function will only return
 * when re_cancel() is called or an error occurs.
 *
 * @param signalh Optional signal handler
 *
 * @return 0 if success, otherwise errorcode
 */
int re_main(re_signal_h *signalh)
{
	struct re *re = re_get();
	int err;

#ifdef HAVE_SIGNAL
	if (signalh) {
		(void)signal(SIGINT, signal_handler);
		(void)signal(SIGALRM, signal_handler);
		(void)signal(SIGTERM, signal_handler);
	}
#endif

	if (re->polling) {
		DEBUG_WARNING("main loop already polling\n");
		return EALREADY;
	}

	err = poll_setup(re);
	if (err)
		goto out;

	DEBUG_INFO("Using async I/O polling method: `%s'\n",
		   poll_method_name(re->method));

	re->polling = true;

	re_lock(re);
	for (;;) {

		if (re->sig) {
			if (signalh)
				signalh(re->sig);

			re->sig = 0;
		}

		if (!re->polling) {
			err = 0;
			break;
		}

		err = fd_poll(re);
		if (err) {
			if (EINTR == err)
				continue;

#ifdef DARWIN
			/* NOTE: workaround for Darwin */
			if (EBADF == err)
				continue;
#endif

			break;
		}

		tmr_poll(&re->tmrl);
	}
	re_unlock(re);

 out:
	re->polling = false;

	return err;
}
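

/* A minimal sketch of driving the loop from an application main()
 * (illustration only; assumes the usual libre_init()/libre_close()
 * entry points and trims error handling).  The signal handler is
 * optional and typically just cancels the loop:
 *
 *	static void app_signal_handler(int sig)
 *	{
 *		(void)sig;
 *		re_cancel();
 *	}
 *
 *	int main(void)
 *	{
 *		int err = libre_init();
 *		if (err)
 *			return err;
 *
 *		// ... set up listeners with fd_listen() and timers ...
 *
 *		err = re_main(app_signal_handler);
 *
 *		libre_close();
 *		return err;
 *	}
 */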


/**
 * Cancel the main polling loop
 */
void re_cancel(void)
{
	struct re *re = re_get();

	re->polling = false;
}


/**
 * Debug the main polling loop
 *
 * @param pf     Print handler where debug output is printed to
 * @param unused Unused parameter
 *
 * @return 0 if success, otherwise errorcode
 */
int re_debug(struct re_printf *pf, void *unused)
{
	struct re *re = re_get();
	int err = 0;

	(void)unused;

	err |= re_hprintf(pf, "re main loop:\n");
	err |= re_hprintf(pf, " maxfds:  %d\n", re->maxfds);
	err |= re_hprintf(pf, " nfds:    %d\n", re->nfds);
	err |= re_hprintf(pf, " method:  %d (%s)\n", re->method,
			  poll_method_name(re->method));

	return err;
}


/**
 * Set async I/O polling method. This function can also be called while the
 * program is running.
 *
 * @param method New polling method
 *
 * @return 0 if success, otherwise errorcode
 */
int poll_method_set(enum poll_method method)
{
	struct re *re = re_get();
	int err;

	err = fd_setsize(DEFAULT_MAXFDS);
	if (err)
		return err;

	switch (method) {

#ifdef HAVE_POLL
	case METHOD_POLL:
		break;
#endif
#ifdef HAVE_SELECT
	case METHOD_SELECT:
		if (re->maxfds > (int)FD_SETSIZE) {
			DEBUG_WARNING("SELECT: maxfds > FD_SETSIZE\n");
			return EMFILE;
		}
		break;
#endif
#ifdef HAVE_EPOLL
	case METHOD_EPOLL:
		if (!epoll_check())
			return EINVAL;
		break;
#endif
#ifdef HAVE_KQUEUE
	case METHOD_KQUEUE:
		break;
#endif
	default:
		DEBUG_WARNING("poll method not supported: '%s'\n",
			      poll_method_name(method));
		return EINVAL;
	}

	re->method = method;
	re->update = true;

	DEBUG_INFO("Setting async I/O polling method to `%s'\n",
		   poll_method_name(re->method));

	err = poll_init(re);
	if (err)
		return err;

	return rebuild_fds(re);
}
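

/* A sketch of forcing a specific method before (or even during)
 * re_main(), falling back to the library default when that method is
 * unavailable on this platform (illustration only):
 *
 *	if (poll_method_set(METHOD_EPOLL))
 *		(void)poll_method_set(poll_method_best());
 */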


/**
 * Add a worker thread for this thread
 *
 * @return 0 if success, otherwise errorcode
 */
int re_thread_init(void)
{
#ifdef HAVE_PTHREAD
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (re) {
		DEBUG_WARNING("thread_init: already added for thread %d\n",
			      pthread_self());
		return EALREADY;
	}

	re = malloc(sizeof(*re));
	if (!re)
		return ENOMEM;

	memset(re, 0, sizeof(*re));
	pthread_mutex_init(&re->mutex, NULL);
	re->mutexp = &re->mutex;

#ifdef HAVE_EPOLL
	re->epfd = -1;
#endif

#ifdef HAVE_KQUEUE
	re->kqfd = -1;
#endif

	pthread_setspecific(pt_key, re);
	return 0;
#else
	return ENOSYS;
#endif
}
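

/* A minimal sketch of running a private polling loop inside a worker
 * thread (illustration only; "worker" is a hypothetical pthread entry
 * point).  Each thread gets its own `struct re' via thread-specific
 * data, so the worker's loop is independent of the main thread's:
 *
 *	static void *worker(void *arg)
 *	{
 *		(void)arg;
 *
 *		if (re_thread_init())
 *			return NULL;
 *
 *		// ... fd_listen()/timers for this thread ...
 *
 *		(void)re_main(NULL);
 *
 *		re_thread_close();
 *		return NULL;
 *	}
 */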


/**
 * Remove the worker thread for this thread
 */
void re_thread_close(void)
{
#ifdef HAVE_PTHREAD
	struct re *re;

	pthread_once(&pt_once, re_once);

	re = pthread_getspecific(pt_key);
	if (re) {
		poll_close(re);
		free(re);
		pthread_setspecific(pt_key, NULL);
	}
#endif
}


/**
 * Enter an 're' thread
 *
 * @note Must only be called from a non-re thread
 */
void re_thread_enter(void)
{
	re_lock(re_get());
}


/**
 * Leave an 're' thread
 *
 * @note Must only be called from a non-re thread
 */
void re_thread_leave(void)
{
	re_unlock(re_get());
}
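

/* A sketch of safely touching re-managed state from a foreign (non-re)
 * thread while re_main() runs elsewhere (illustration only; "my_timer"
 * and "timeout_handler" are hypothetical).  The enter/leave pair holds
 * the loop mutex around the calls:
 *
 *	re_thread_enter();
 *	tmr_start(&my_timer, 1000, timeout_handler, NULL);
 *	re_thread_leave();
 */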


/**
 * Set an external mutex for this thread
 *
 * @param mutexp Pointer to external mutex, NULL to use internal
 */
void re_set_mutex(void *mutexp)
{
#ifdef HAVE_PTHREAD
	struct re *re = re_get();

	re->mutexp = mutexp ? mutexp : &re->mutex;
#else
	(void)mutexp;
#endif
}


/**
 * Get the timer-list for this thread
 *
 * @return Timer list
 *
 * @note only used by tmr module
 */
struct list *tmrl_get(void);
struct list *tmrl_get(void)
{
	return &re_get()->tmrl;
}