12b15cb3dSCy Schubert /* 22b15cb3dSCy Schubert * work_thread.c - threads implementation for blocking worker child. 32b15cb3dSCy Schubert */ 42b15cb3dSCy Schubert #include <config.h> 52b15cb3dSCy Schubert #include "ntp_workimpl.h" 62b15cb3dSCy Schubert 72b15cb3dSCy Schubert #ifdef WORK_THREAD 82b15cb3dSCy Schubert 92b15cb3dSCy Schubert #include <stdio.h> 102b15cb3dSCy Schubert #include <ctype.h> 112b15cb3dSCy Schubert #include <signal.h> 122b15cb3dSCy Schubert #ifndef SYS_WINNT 132b15cb3dSCy Schubert #include <pthread.h> 142b15cb3dSCy Schubert #endif 152b15cb3dSCy Schubert 162b15cb3dSCy Schubert #include "ntp_stdlib.h" 172b15cb3dSCy Schubert #include "ntp_malloc.h" 182b15cb3dSCy Schubert #include "ntp_syslog.h" 192b15cb3dSCy Schubert #include "ntpd.h" 202b15cb3dSCy Schubert #include "ntp_io.h" 212b15cb3dSCy Schubert #include "ntp_assert.h" 222b15cb3dSCy Schubert #include "ntp_unixtime.h" 232b15cb3dSCy Schubert #include "timespecops.h" 242b15cb3dSCy Schubert #include "ntp_worker.h" 252b15cb3dSCy Schubert 262b15cb3dSCy Schubert #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 272b15cb3dSCy Schubert #define CHILD_GONE_RESP CHILD_EXIT_REQ 2868ba7e87SXin LI /* Queue size increments: 2968ba7e87SXin LI * The request queue grows a bit faster than the response queue -- the 3009100258SXin LI * daemon can push requests and pull results faster on avarage than the 3168ba7e87SXin LI * worker can process requests and push results... If this really pays 3268ba7e87SXin LI * off is debatable. 3368ba7e87SXin LI */ 342b15cb3dSCy Schubert #define WORKITEMS_ALLOC_INC 16 352b15cb3dSCy Schubert #define RESPONSES_ALLOC_INC 4 362b15cb3dSCy Schubert 3768ba7e87SXin LI /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 3868ba7e87SXin LI * set the maximum to 256kB. If the minimum goes below the 3968ba7e87SXin LI * system-defined minimum stack size, we have to adjust accordingly. 4068ba7e87SXin LI */ 412b15cb3dSCy Schubert #ifndef THREAD_MINSTACKSIZE 422b15cb3dSCy Schubert # define THREAD_MINSTACKSIZE (64U * 1024) 432b15cb3dSCy Schubert #endif 4468ba7e87SXin LI 4568ba7e87SXin LI #ifndef THREAD_MAXSTACKSIZE 4668ba7e87SXin LI # define THREAD_MAXSTACKSIZE (256U * 1024) 4768ba7e87SXin LI #endif 4868ba7e87SXin LI 494e1ef62aSXin LI /* need a good integer to store a pointer... */ 504e1ef62aSXin LI #ifndef UINTPTR_T 514e1ef62aSXin LI # if defined(UINTPTR_MAX) 524e1ef62aSXin LI # define UINTPTR_T uintptr_t 534e1ef62aSXin LI # elif defined(UINT_PTR) 544e1ef62aSXin LI # define UINTPTR_T UINT_PTR 554e1ef62aSXin LI # else 564e1ef62aSXin LI # define UINTPTR_T size_t 574e1ef62aSXin LI # endif 584e1ef62aSXin LI #endif 594e1ef62aSXin LI 602b15cb3dSCy Schubert 612b15cb3dSCy Schubert #ifdef SYS_WINNT 623311ff84SXin LI 632b15cb3dSCy Schubert # define thread_exit(c) _endthreadex(c) 643311ff84SXin LI # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 653311ff84SXin LI u_int WINAPI blocking_thread(void *); 663311ff84SXin LI static BOOL same_os_sema(const sem_ref obj, void * osobj); 673311ff84SXin LI 682b15cb3dSCy Schubert #else 693311ff84SXin LI 704e1ef62aSXin LI # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 712b15cb3dSCy Schubert # define tickle_sem sem_post 723311ff84SXin LI void * blocking_thread(void *); 733311ff84SXin LI static void block_thread_signals(sigset_t *); 743311ff84SXin LI 752b15cb3dSCy Schubert #endif 762b15cb3dSCy Schubert 772b15cb3dSCy Schubert #ifdef WORK_PIPE 782b15cb3dSCy Schubert addremove_io_fd_func addremove_io_fd; 792b15cb3dSCy Schubert #else 802b15cb3dSCy Schubert addremove_io_semaphore_func addremove_io_semaphore; 812b15cb3dSCy Schubert #endif 822b15cb3dSCy Schubert 832b15cb3dSCy Schubert static void start_blocking_thread(blocking_child *); 842b15cb3dSCy Schubert static void start_blocking_thread_internal(blocking_child *); 852b15cb3dSCy Schubert static void prepare_child_sems(blocking_child *); 862b15cb3dSCy Schubert static int wait_for_sem(sem_ref, struct timespec *); 873311ff84SXin LI static int ensure_workitems_empty_slot(blocking_child *); 883311ff84SXin LI static int ensure_workresp_empty_slot(blocking_child *); 892b15cb3dSCy Schubert static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 902b15cb3dSCy Schubert static void cleanup_after_child(blocking_child *); 912b15cb3dSCy Schubert 924990d495SXin LI static sema_type worker_mmutex; 934990d495SXin LI static sem_ref worker_memlock; 942b15cb3dSCy Schubert 954990d495SXin LI /* -------------------------------------------------------------------- 964990d495SXin LI * locking the global worker state table (and other global stuff) 974990d495SXin LI */ 984990d495SXin LI void 994990d495SXin LI worker_global_lock( 1004990d495SXin LI int inOrOut) 1014990d495SXin LI { 1024990d495SXin LI if (worker_memlock) { 1034990d495SXin LI if (inOrOut) 1044990d495SXin LI wait_for_sem(worker_memlock, NULL); 1054990d495SXin LI else 1064990d495SXin LI tickle_sem(worker_memlock); 1074990d495SXin LI } 1084990d495SXin LI } 1094990d495SXin LI 1104990d495SXin LI /* -------------------------------------------------------------------- 1114990d495SXin LI * implementation isolation wrapper 1124990d495SXin LI */ 1132b15cb3dSCy Schubert void 1142b15cb3dSCy Schubert exit_worker( 1152b15cb3dSCy Schubert int exitcode 1162b15cb3dSCy Schubert ) 1172b15cb3dSCy Schubert { 1182b15cb3dSCy Schubert thread_exit(exitcode); /* see #define thread_exit */ 1192b15cb3dSCy Schubert } 1202b15cb3dSCy Schubert 1213311ff84SXin LI /* -------------------------------------------------------------------- 1223311ff84SXin LI * sleep for a given time or until the wakup semaphore is tickled. 1233311ff84SXin LI */ 1242b15cb3dSCy Schubert int 1252b15cb3dSCy Schubert worker_sleep( 1262b15cb3dSCy Schubert blocking_child * c, 1272b15cb3dSCy Schubert time_t seconds 1282b15cb3dSCy Schubert ) 1292b15cb3dSCy Schubert { 1302b15cb3dSCy Schubert struct timespec until; 1312b15cb3dSCy Schubert int rc; 1322b15cb3dSCy Schubert 1332b15cb3dSCy Schubert # ifdef HAVE_CLOCK_GETTIME 1342b15cb3dSCy Schubert if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 1352b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 1362b15cb3dSCy Schubert return -1; 1372b15cb3dSCy Schubert } 1382b15cb3dSCy Schubert # else 1392b15cb3dSCy Schubert if (0 != getclock(TIMEOFDAY, &until)) { 1402b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 1412b15cb3dSCy Schubert return -1; 1422b15cb3dSCy Schubert } 1432b15cb3dSCy Schubert # endif 1442b15cb3dSCy Schubert until.tv_sec += seconds; 1452b15cb3dSCy Schubert rc = wait_for_sem(c->wake_scheduled_sleep, &until); 1462b15cb3dSCy Schubert if (0 == rc) 1472b15cb3dSCy Schubert return -1; 1482b15cb3dSCy Schubert if (-1 == rc && ETIMEDOUT == errno) 1492b15cb3dSCy Schubert return 0; 1502b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 1512b15cb3dSCy Schubert return -1; 1522b15cb3dSCy Schubert } 1532b15cb3dSCy Schubert 1542b15cb3dSCy Schubert 1553311ff84SXin LI /* -------------------------------------------------------------------- 1563311ff84SXin LI * Wake up a worker that takes a nap. 1573311ff84SXin LI */ 1582b15cb3dSCy Schubert void 1592b15cb3dSCy Schubert interrupt_worker_sleep(void) 1602b15cb3dSCy Schubert { 1612b15cb3dSCy Schubert u_int idx; 1622b15cb3dSCy Schubert blocking_child * c; 1632b15cb3dSCy Schubert 1642b15cb3dSCy Schubert for (idx = 0; idx < blocking_children_alloc; idx++) { 1652b15cb3dSCy Schubert c = blocking_children[idx]; 1662b15cb3dSCy Schubert if (NULL == c || NULL == c->wake_scheduled_sleep) 1672b15cb3dSCy Schubert continue; 1682b15cb3dSCy Schubert tickle_sem(c->wake_scheduled_sleep); 1692b15cb3dSCy Schubert } 1702b15cb3dSCy Schubert } 1712b15cb3dSCy Schubert 1723311ff84SXin LI /* -------------------------------------------------------------------- 1733311ff84SXin LI * Make sure there is an empty slot at the head of the request 1743311ff84SXin LI * queue. Tell if the queue is currently empty. 1753311ff84SXin LI */ 1763311ff84SXin LI static int 1772b15cb3dSCy Schubert ensure_workitems_empty_slot( 1782b15cb3dSCy Schubert blocking_child *c 1792b15cb3dSCy Schubert ) 1802b15cb3dSCy Schubert { 1813311ff84SXin LI /* 1823311ff84SXin LI ** !!! PRECONDITION: caller holds access lock! 1833311ff84SXin LI ** 1843311ff84SXin LI ** This simply tries to increase the size of the buffer if it 1853311ff84SXin LI ** becomes full. The resize operation does *not* maintain the 1863311ff84SXin LI ** order of requests, but that should be irrelevant since the 1873311ff84SXin LI ** processing is considered asynchronous anyway. 1883311ff84SXin LI ** 1893311ff84SXin LI ** Return if the buffer is currently empty. 1903311ff84SXin LI */ 1913311ff84SXin LI 1923311ff84SXin LI static const size_t each = 1933311ff84SXin LI sizeof(blocking_children[0]->workitems[0]); 1943311ff84SXin LI 1952b15cb3dSCy Schubert size_t new_alloc; 1963311ff84SXin LI size_t slots_used; 19768ba7e87SXin LI size_t sidx; 1982b15cb3dSCy Schubert 1993311ff84SXin LI slots_used = c->head_workitem - c->tail_workitem; 2003311ff84SXin LI if (slots_used >= c->workitems_alloc) { 2012b15cb3dSCy Schubert new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 2023311ff84SXin LI c->workitems = erealloc(c->workitems, new_alloc * each); 20368ba7e87SXin LI for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 20468ba7e87SXin LI c->workitems[sidx] = NULL; 2053311ff84SXin LI c->tail_workitem = 0; 2063311ff84SXin LI c->head_workitem = c->workitems_alloc; 2072b15cb3dSCy Schubert c->workitems_alloc = new_alloc; 2082b15cb3dSCy Schubert } 20968ba7e87SXin LI INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 2103311ff84SXin LI return (0 == slots_used); 2113311ff84SXin LI } 2122b15cb3dSCy Schubert 2133311ff84SXin LI /* -------------------------------------------------------------------- 2143311ff84SXin LI * Make sure there is an empty slot at the head of the response 2153311ff84SXin LI * queue. Tell if the queue is currently empty. 2163311ff84SXin LI */ 2173311ff84SXin LI static int 2182b15cb3dSCy Schubert ensure_workresp_empty_slot( 2192b15cb3dSCy Schubert blocking_child *c 2202b15cb3dSCy Schubert ) 2212b15cb3dSCy Schubert { 2223311ff84SXin LI /* 2233311ff84SXin LI ** !!! PRECONDITION: caller holds access lock! 2243311ff84SXin LI ** 2253311ff84SXin LI ** Works like the companion function above. 2263311ff84SXin LI */ 2273311ff84SXin LI 2283311ff84SXin LI static const size_t each = 2293311ff84SXin LI sizeof(blocking_children[0]->responses[0]); 2303311ff84SXin LI 2312b15cb3dSCy Schubert size_t new_alloc; 2323311ff84SXin LI size_t slots_used; 23368ba7e87SXin LI size_t sidx; 2342b15cb3dSCy Schubert 2353311ff84SXin LI slots_used = c->head_response - c->tail_response; 2363311ff84SXin LI if (slots_used >= c->responses_alloc) { 2372b15cb3dSCy Schubert new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 2383311ff84SXin LI c->responses = erealloc(c->responses, new_alloc * each); 23968ba7e87SXin LI for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 24068ba7e87SXin LI c->responses[sidx] = NULL; 2413311ff84SXin LI c->tail_response = 0; 2423311ff84SXin LI c->head_response = c->responses_alloc; 2432b15cb3dSCy Schubert c->responses_alloc = new_alloc; 2442b15cb3dSCy Schubert } 24568ba7e87SXin LI INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 2463311ff84SXin LI return (0 == slots_used); 2473311ff84SXin LI } 2482b15cb3dSCy Schubert 2492b15cb3dSCy Schubert 2503311ff84SXin LI /* -------------------------------------------------------------------- 2512b15cb3dSCy Schubert * queue_req_pointer() - append a work item or idle exit request to 2523311ff84SXin LI * blocking_workitems[]. Employ proper locking. 2532b15cb3dSCy Schubert */ 2542b15cb3dSCy Schubert static int 2552b15cb3dSCy Schubert queue_req_pointer( 2562b15cb3dSCy Schubert blocking_child * c, 2572b15cb3dSCy Schubert blocking_pipe_header * hdr 2582b15cb3dSCy Schubert ) 2592b15cb3dSCy Schubert { 2603311ff84SXin LI size_t qhead; 2612b15cb3dSCy Schubert 2623311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */ 2633311ff84SXin LI wait_for_sem(c->accesslock, NULL); 2643311ff84SXin LI ensure_workitems_empty_slot(c); 2653311ff84SXin LI qhead = c->head_workitem; 2663311ff84SXin LI c->workitems[qhead % c->workitems_alloc] = hdr; 2673311ff84SXin LI c->head_workitem = 1 + qhead; 2683311ff84SXin LI tickle_sem(c->accesslock); 2693311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */ 2703311ff84SXin LI 2713311ff84SXin LI /* queue consumer wake-up notification */ 2723311ff84SXin LI tickle_sem(c->workitems_pending); 2732b15cb3dSCy Schubert 2742b15cb3dSCy Schubert return 0; 2752b15cb3dSCy Schubert } 2762b15cb3dSCy Schubert 2773311ff84SXin LI /* -------------------------------------------------------------------- 2783311ff84SXin LI * API function to make sure a worker is running, a proper private copy 2793311ff84SXin LI * of the data is made, the data eneterd into the queue and the worker 2803311ff84SXin LI * is signalled. 2813311ff84SXin LI */ 2822b15cb3dSCy Schubert int 2832b15cb3dSCy Schubert send_blocking_req_internal( 2842b15cb3dSCy Schubert blocking_child * c, 2852b15cb3dSCy Schubert blocking_pipe_header * hdr, 2862b15cb3dSCy Schubert void * data 2872b15cb3dSCy Schubert ) 2882b15cb3dSCy Schubert { 2892b15cb3dSCy Schubert blocking_pipe_header * threadcopy; 2902b15cb3dSCy Schubert size_t payload_octets; 2912b15cb3dSCy Schubert 2922b15cb3dSCy Schubert REQUIRE(hdr != NULL); 2932b15cb3dSCy Schubert REQUIRE(data != NULL); 2942b15cb3dSCy Schubert DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 2952b15cb3dSCy Schubert 2962b15cb3dSCy Schubert if (hdr->octets <= sizeof(*hdr)) 2972b15cb3dSCy Schubert return 1; /* failure */ 2982b15cb3dSCy Schubert payload_octets = hdr->octets - sizeof(*hdr); 2992b15cb3dSCy Schubert 3003311ff84SXin LI if (NULL == c->thread_ref) 3012b15cb3dSCy Schubert start_blocking_thread(c); 3022b15cb3dSCy Schubert threadcopy = emalloc(hdr->octets); 3032b15cb3dSCy Schubert memcpy(threadcopy, hdr, sizeof(*hdr)); 3042b15cb3dSCy Schubert memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 3052b15cb3dSCy Schubert 3062b15cb3dSCy Schubert return queue_req_pointer(c, threadcopy); 3072b15cb3dSCy Schubert } 3082b15cb3dSCy Schubert 3093311ff84SXin LI /* -------------------------------------------------------------------- 3103311ff84SXin LI * Wait for the 'incoming queue no longer empty' signal, lock the shared 3113311ff84SXin LI * structure and dequeue an item. 3123311ff84SXin LI */ 3132b15cb3dSCy Schubert blocking_pipe_header * 3142b15cb3dSCy Schubert receive_blocking_req_internal( 3152b15cb3dSCy Schubert blocking_child * c 3162b15cb3dSCy Schubert ) 3172b15cb3dSCy Schubert { 3182b15cb3dSCy Schubert blocking_pipe_header * req; 3193311ff84SXin LI size_t qhead, qtail; 3202b15cb3dSCy Schubert 3213311ff84SXin LI req = NULL; 3222b15cb3dSCy Schubert do { 3233311ff84SXin LI /* wait for tickle from the producer side */ 3243311ff84SXin LI wait_for_sem(c->workitems_pending, NULL); 3252b15cb3dSCy Schubert 3263311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */ 3273311ff84SXin LI wait_for_sem(c->accesslock, NULL); 3283311ff84SXin LI qhead = c->head_workitem; 3293311ff84SXin LI do { 3303311ff84SXin LI qtail = c->tail_workitem; 3313311ff84SXin LI if (qhead == qtail) 3323311ff84SXin LI break; 3333311ff84SXin LI c->tail_workitem = qtail + 1; 3343311ff84SXin LI qtail %= c->workitems_alloc; 3353311ff84SXin LI req = c->workitems[qtail]; 3363311ff84SXin LI c->workitems[qtail] = NULL; 3373311ff84SXin LI } while (NULL == req); 3383311ff84SXin LI tickle_sem(c->accesslock); 3393311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */ 3403311ff84SXin LI 3413311ff84SXin LI } while (NULL == req); 3423311ff84SXin LI 3432b15cb3dSCy Schubert INSIST(NULL != req); 3442b15cb3dSCy Schubert if (CHILD_EXIT_REQ == req) { /* idled out */ 3452b15cb3dSCy Schubert send_blocking_resp_internal(c, CHILD_GONE_RESP); 3462b15cb3dSCy Schubert req = NULL; 3472b15cb3dSCy Schubert } 3482b15cb3dSCy Schubert 3492b15cb3dSCy Schubert return req; 3502b15cb3dSCy Schubert } 3512b15cb3dSCy Schubert 3523311ff84SXin LI /* -------------------------------------------------------------------- 3533311ff84SXin LI * Push a response into the return queue and eventually tickle the 3543311ff84SXin LI * receiver. 3553311ff84SXin LI */ 3562b15cb3dSCy Schubert int 3572b15cb3dSCy Schubert send_blocking_resp_internal( 3582b15cb3dSCy Schubert blocking_child * c, 3592b15cb3dSCy Schubert blocking_pipe_header * resp 3602b15cb3dSCy Schubert ) 3612b15cb3dSCy Schubert { 3623311ff84SXin LI size_t qhead; 3633311ff84SXin LI int empty; 3642b15cb3dSCy Schubert 3653311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */ 3663311ff84SXin LI wait_for_sem(c->accesslock, NULL); 3673311ff84SXin LI empty = ensure_workresp_empty_slot(c); 3683311ff84SXin LI qhead = c->head_response; 3693311ff84SXin LI c->responses[qhead % c->responses_alloc] = resp; 3703311ff84SXin LI c->head_response = 1 + qhead; 3713311ff84SXin LI tickle_sem(c->accesslock); 3723311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */ 3732b15cb3dSCy Schubert 3743311ff84SXin LI /* queue consumer wake-up notification */ 3753311ff84SXin LI if (empty) 3763311ff84SXin LI { 3772b15cb3dSCy Schubert # ifdef WORK_PIPE 3784e1ef62aSXin LI if (1 != write(c->resp_write_pipe, "", 1)) 379*f5f40dd6SCy Schubert msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo" 380*f5f40dd6SCy Schubert " failed to notify main thread!", 381*f5f40dd6SCy Schubert (BLOCKING_GETNAMEINFO == resp->rtype) 382*f5f40dd6SCy Schubert ? "name" 383*f5f40dd6SCy Schubert : "addr" 384*f5f40dd6SCy Schubert ); 3852b15cb3dSCy Schubert # else 3863311ff84SXin LI tickle_sem(c->responses_pending); 3872b15cb3dSCy Schubert # endif 3883311ff84SXin LI } 3892b15cb3dSCy Schubert return 0; 3902b15cb3dSCy Schubert } 3912b15cb3dSCy Schubert 3922b15cb3dSCy Schubert 3932b15cb3dSCy Schubert #ifndef WORK_PIPE 3943311ff84SXin LI 3953311ff84SXin LI /* -------------------------------------------------------------------- 396a466cc55SCy Schubert * Check if a (Windows-)handle to a semaphore is actually the same we 3973311ff84SXin LI * are using inside the sema wrapper. 3983311ff84SXin LI */ 3993311ff84SXin LI static BOOL 4003311ff84SXin LI same_os_sema( 4013311ff84SXin LI const sem_ref obj, 4023311ff84SXin LI void* osh 4033311ff84SXin LI ) 4043311ff84SXin LI { 4053311ff84SXin LI return obj && osh && (obj->shnd == (HANDLE)osh); 4063311ff84SXin LI } 4073311ff84SXin LI 4083311ff84SXin LI /* -------------------------------------------------------------------- 4093311ff84SXin LI * Find the shared context that associates to an OS handle and make sure 4103311ff84SXin LI * the data is dequeued and processed. 4113311ff84SXin LI */ 4122b15cb3dSCy Schubert void 4132b15cb3dSCy Schubert handle_blocking_resp_sem( 4142b15cb3dSCy Schubert void * context 4152b15cb3dSCy Schubert ) 4162b15cb3dSCy Schubert { 4172b15cb3dSCy Schubert blocking_child * c; 4182b15cb3dSCy Schubert u_int idx; 4192b15cb3dSCy Schubert 4202b15cb3dSCy Schubert c = NULL; 4212b15cb3dSCy Schubert for (idx = 0; idx < blocking_children_alloc; idx++) { 4222b15cb3dSCy Schubert c = blocking_children[idx]; 4233311ff84SXin LI if (c != NULL && 4243311ff84SXin LI c->thread_ref != NULL && 4253311ff84SXin LI same_os_sema(c->responses_pending, context)) 4262b15cb3dSCy Schubert break; 4272b15cb3dSCy Schubert } 4282b15cb3dSCy Schubert if (idx < blocking_children_alloc) 4292b15cb3dSCy Schubert process_blocking_resp(c); 4302b15cb3dSCy Schubert } 4312b15cb3dSCy Schubert #endif /* !WORK_PIPE */ 4322b15cb3dSCy Schubert 4333311ff84SXin LI /* -------------------------------------------------------------------- 4343311ff84SXin LI * Fetch the next response from the return queue. In case of signalling 4353311ff84SXin LI * via pipe, make sure the pipe is flushed, too. 4363311ff84SXin LI */ 4372b15cb3dSCy Schubert blocking_pipe_header * 4382b15cb3dSCy Schubert receive_blocking_resp_internal( 4392b15cb3dSCy Schubert blocking_child * c 4402b15cb3dSCy Schubert ) 4412b15cb3dSCy Schubert { 4422b15cb3dSCy Schubert blocking_pipe_header * removed; 4433311ff84SXin LI size_t qhead, qtail, slot; 4443311ff84SXin LI 4452b15cb3dSCy Schubert #ifdef WORK_PIPE 4462b15cb3dSCy Schubert int rc; 4472b15cb3dSCy Schubert char scratch[32]; 4482b15cb3dSCy Schubert 4493311ff84SXin LI do 4502b15cb3dSCy Schubert rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 4513311ff84SXin LI while (-1 == rc && EINTR == errno); 4522b15cb3dSCy Schubert #endif 4533311ff84SXin LI 4543311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */ 4553311ff84SXin LI wait_for_sem(c->accesslock, NULL); 4563311ff84SXin LI qhead = c->head_response; 4573311ff84SXin LI qtail = c->tail_response; 4583311ff84SXin LI for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 4593311ff84SXin LI slot = qtail % c->responses_alloc; 4603311ff84SXin LI removed = c->responses[slot]; 4613311ff84SXin LI c->responses[slot] = NULL; 4623311ff84SXin LI } 4633311ff84SXin LI c->tail_response = qtail; 4643311ff84SXin LI tickle_sem(c->accesslock); 4653311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */ 4663311ff84SXin LI 4672b15cb3dSCy Schubert if (NULL != removed) { 4682b15cb3dSCy Schubert DEBUG_ENSURE(CHILD_GONE_RESP == removed || 4692b15cb3dSCy Schubert BLOCKING_RESP_MAGIC == removed->magic_sig); 4702b15cb3dSCy Schubert } 4712b15cb3dSCy Schubert if (CHILD_GONE_RESP == removed) { 4722b15cb3dSCy Schubert cleanup_after_child(c); 4732b15cb3dSCy Schubert removed = NULL; 4742b15cb3dSCy Schubert } 4752b15cb3dSCy Schubert 4762b15cb3dSCy Schubert return removed; 4772b15cb3dSCy Schubert } 4782b15cb3dSCy Schubert 4793311ff84SXin LI /* -------------------------------------------------------------------- 4803311ff84SXin LI * Light up a new worker. 4813311ff84SXin LI */ 4822b15cb3dSCy Schubert static void 4832b15cb3dSCy Schubert start_blocking_thread( 4842b15cb3dSCy Schubert blocking_child * c 4852b15cb3dSCy Schubert ) 4862b15cb3dSCy Schubert { 4872b15cb3dSCy Schubert 4882b15cb3dSCy Schubert DEBUG_INSIST(!c->reusable); 4892b15cb3dSCy Schubert 4902b15cb3dSCy Schubert prepare_child_sems(c); 4912b15cb3dSCy Schubert start_blocking_thread_internal(c); 4922b15cb3dSCy Schubert } 4932b15cb3dSCy Schubert 4943311ff84SXin LI /* -------------------------------------------------------------------- 4953311ff84SXin LI * Create a worker thread. There are several differences between POSIX 496*f5f40dd6SCy Schubert * and Windows, of course -- most notably the Windows thread is a 4973311ff84SXin LI * detached thread, and we keep the handle around until we want to get 4983311ff84SXin LI * rid of the thread. The notification scheme also differs: Windows 4993311ff84SXin LI * makes use of semaphores in both directions, POSIX uses a pipe for 5003311ff84SXin LI * integration with 'select()' or alike. 5013311ff84SXin LI */ 5022b15cb3dSCy Schubert static void 5032b15cb3dSCy Schubert start_blocking_thread_internal( 5042b15cb3dSCy Schubert blocking_child * c 5052b15cb3dSCy Schubert ) 5062b15cb3dSCy Schubert #ifdef SYS_WINNT 5072b15cb3dSCy Schubert { 5082b15cb3dSCy Schubert BOOL resumed; 5092b15cb3dSCy Schubert 5103311ff84SXin LI c->thread_ref = NULL; 5113311ff84SXin LI (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 5123311ff84SXin LI c->thr_table[0].thnd = 5132b15cb3dSCy Schubert (HANDLE)_beginthreadex( 5142b15cb3dSCy Schubert NULL, 5152b15cb3dSCy Schubert 0, 5162b15cb3dSCy Schubert &blocking_thread, 5172b15cb3dSCy Schubert c, 5182b15cb3dSCy Schubert CREATE_SUSPENDED, 5193311ff84SXin LI NULL); 5202b15cb3dSCy Schubert 5213311ff84SXin LI if (NULL == c->thr_table[0].thnd) { 5222b15cb3dSCy Schubert msyslog(LOG_ERR, "start blocking thread failed: %m"); 5232b15cb3dSCy Schubert exit(-1); 5242b15cb3dSCy Schubert } 5252b15cb3dSCy Schubert /* remember the thread priority is only within the process class */ 5263311ff84SXin LI if (!SetThreadPriority(c->thr_table[0].thnd, 527*f5f40dd6SCy Schubert THREAD_PRIORITY_BELOW_NORMAL)) { 5282b15cb3dSCy Schubert msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 529*f5f40dd6SCy Schubert } 530*f5f40dd6SCy Schubert if (NULL != pSetThreadDescription) { 531*f5f40dd6SCy Schubert (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker"); 532*f5f40dd6SCy Schubert } 5333311ff84SXin LI resumed = ResumeThread(c->thr_table[0].thnd); 5342b15cb3dSCy Schubert DEBUG_INSIST(resumed); 5353311ff84SXin LI c->thread_ref = &c->thr_table[0]; 5362b15cb3dSCy Schubert } 5372b15cb3dSCy Schubert #else /* pthreads start_blocking_thread_internal() follows */ 5382b15cb3dSCy Schubert { 5392b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT 5402b15cb3dSCy Schubert static int pthread_init_called; 5412b15cb3dSCy Schubert # endif 5422b15cb3dSCy Schubert pthread_attr_t thr_attr; 5432b15cb3dSCy Schubert int rc; 5442b15cb3dSCy Schubert int pipe_ends[2]; /* read then write */ 5452b15cb3dSCy Schubert int is_pipe; 5462b15cb3dSCy Schubert int flags; 54768ba7e87SXin LI size_t ostacksize; 54868ba7e87SXin LI size_t nstacksize; 5492b15cb3dSCy Schubert sigset_t saved_sig_mask; 5502b15cb3dSCy Schubert 5513311ff84SXin LI c->thread_ref = NULL; 5523311ff84SXin LI 5532b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT 5542b15cb3dSCy Schubert /* 5552b15cb3dSCy Schubert * from lib/isc/unix/app.c: 5562b15cb3dSCy Schubert * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 5572b15cb3dSCy Schubert */ 5582b15cb3dSCy Schubert if (!pthread_init_called) { 5592b15cb3dSCy Schubert pthread_init(); 5602b15cb3dSCy Schubert pthread_init_called = TRUE; 5612b15cb3dSCy Schubert } 5622b15cb3dSCy Schubert # endif 5632b15cb3dSCy Schubert 5642b15cb3dSCy Schubert rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 5652b15cb3dSCy Schubert if (0 != rc) { 5662b15cb3dSCy Schubert msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 5672b15cb3dSCy Schubert exit(1); 5682b15cb3dSCy Schubert } 5692b15cb3dSCy Schubert c->resp_read_pipe = move_fd(pipe_ends[0]); 5702b15cb3dSCy Schubert c->resp_write_pipe = move_fd(pipe_ends[1]); 5712b15cb3dSCy Schubert c->ispipe = is_pipe; 5722b15cb3dSCy Schubert flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 5732b15cb3dSCy Schubert if (-1 == flags) { 5742b15cb3dSCy Schubert msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 5752b15cb3dSCy Schubert exit(1); 5762b15cb3dSCy Schubert } 5772b15cb3dSCy Schubert rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 5782b15cb3dSCy Schubert if (-1 == rc) { 5792b15cb3dSCy Schubert msyslog(LOG_ERR, 5802b15cb3dSCy Schubert "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 5812b15cb3dSCy Schubert exit(1); 5822b15cb3dSCy Schubert } 5832b15cb3dSCy Schubert (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 5842b15cb3dSCy Schubert pthread_attr_init(&thr_attr); 5852b15cb3dSCy Schubert pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 5862b15cb3dSCy Schubert #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 5872b15cb3dSCy Schubert defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 58868ba7e87SXin LI rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 58968ba7e87SXin LI if (0 != rc) { 5902b15cb3dSCy Schubert msyslog(LOG_ERR, 59168ba7e87SXin LI "start_blocking_thread: pthread_attr_getstacksize() -> %s", 59268ba7e87SXin LI strerror(rc)); 59368ba7e87SXin LI } else { 59468ba7e87SXin LI nstacksize = ostacksize; 595a466cc55SCy Schubert /* order is important here: first clamp on upper limit, 596a466cc55SCy Schubert * and the PTHREAD min stack size is ultimate override! 597a466cc55SCy Schubert */ 598a466cc55SCy Schubert if (nstacksize > THREAD_MAXSTACKSIZE) 599a466cc55SCy Schubert nstacksize = THREAD_MAXSTACKSIZE; 600a466cc55SCy Schubert # ifdef PTHREAD_STACK_MAX 601a466cc55SCy Schubert if (nstacksize > PTHREAD_STACK_MAX) 602a466cc55SCy Schubert nstacksize = PTHREAD_STACK_MAX; 603a466cc55SCy Schubert # endif 604a466cc55SCy Schubert 605a466cc55SCy Schubert /* now clamp on lower stack limit. */ 606a466cc55SCy Schubert if (nstacksize < THREAD_MINSTACKSIZE) 607a466cc55SCy Schubert nstacksize = THREAD_MINSTACKSIZE; 608a466cc55SCy Schubert # ifdef PTHREAD_STACK_MIN 609a466cc55SCy Schubert if (nstacksize < PTHREAD_STACK_MIN) 610a466cc55SCy Schubert nstacksize = PTHREAD_STACK_MIN; 611a466cc55SCy Schubert # endif 612a466cc55SCy Schubert 61368ba7e87SXin LI if (nstacksize != ostacksize) 61468ba7e87SXin LI rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 61568ba7e87SXin LI if (0 != rc) 6162b15cb3dSCy Schubert msyslog(LOG_ERR, 61768ba7e87SXin LI "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 61868ba7e87SXin LI (u_long)ostacksize, (u_long)nstacksize, 61968ba7e87SXin LI strerror(rc)); 6202b15cb3dSCy Schubert } 6212b15cb3dSCy Schubert #else 62268ba7e87SXin LI UNUSED_ARG(nstacksize); 62368ba7e87SXin LI UNUSED_ARG(ostacksize); 6242b15cb3dSCy Schubert #endif 6252b15cb3dSCy Schubert #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 6262b15cb3dSCy Schubert pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 6272b15cb3dSCy Schubert #endif 6282b15cb3dSCy Schubert c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 6292b15cb3dSCy Schubert block_thread_signals(&saved_sig_mask); 6303311ff84SXin LI rc = pthread_create(&c->thr_table[0], &thr_attr, 6312b15cb3dSCy Schubert &blocking_thread, c); 6322b15cb3dSCy Schubert pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 6332b15cb3dSCy Schubert pthread_attr_destroy(&thr_attr); 6342b15cb3dSCy Schubert if (0 != rc) { 63568ba7e87SXin LI msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 63668ba7e87SXin LI strerror(rc)); 6372b15cb3dSCy Schubert exit(1); 6382b15cb3dSCy Schubert } 6393311ff84SXin LI c->thread_ref = &c->thr_table[0]; 6402b15cb3dSCy Schubert } 6412b15cb3dSCy Schubert #endif 6422b15cb3dSCy Schubert 6433311ff84SXin LI /* -------------------------------------------------------------------- 6442b15cb3dSCy Schubert * block_thread_signals() 6452b15cb3dSCy Schubert * 6462b15cb3dSCy Schubert * Temporarily block signals used by ntpd main thread, so that signal 6472b15cb3dSCy Schubert * mask inherited by child threads leaves them blocked. Returns prior 6482b15cb3dSCy Schubert * active signal mask via pmask, to be restored by the main thread 6492b15cb3dSCy Schubert * after pthread_create(). 6502b15cb3dSCy Schubert */ 6512b15cb3dSCy Schubert #ifndef SYS_WINNT 6522b15cb3dSCy Schubert void 6532b15cb3dSCy Schubert block_thread_signals( 6542b15cb3dSCy Schubert sigset_t * pmask 6552b15cb3dSCy Schubert ) 6562b15cb3dSCy Schubert { 6572b15cb3dSCy Schubert sigset_t block; 6582b15cb3dSCy Schubert 6592b15cb3dSCy Schubert sigemptyset(&block); 6602b15cb3dSCy Schubert # ifdef HAVE_SIGNALED_IO 6612b15cb3dSCy Schubert # ifdef SIGIO 6622b15cb3dSCy Schubert sigaddset(&block, SIGIO); 6632b15cb3dSCy Schubert # endif 6642b15cb3dSCy Schubert # ifdef SIGPOLL 6652b15cb3dSCy Schubert sigaddset(&block, SIGPOLL); 6662b15cb3dSCy Schubert # endif 6672b15cb3dSCy Schubert # endif /* HAVE_SIGNALED_IO */ 6682b15cb3dSCy Schubert sigaddset(&block, SIGALRM); 6692b15cb3dSCy Schubert sigaddset(&block, MOREDEBUGSIG); 6702b15cb3dSCy Schubert sigaddset(&block, LESSDEBUGSIG); 6712b15cb3dSCy Schubert # ifdef SIGDIE1 6722b15cb3dSCy Schubert sigaddset(&block, SIGDIE1); 6732b15cb3dSCy Schubert # endif 6742b15cb3dSCy Schubert # ifdef SIGDIE2 6752b15cb3dSCy Schubert sigaddset(&block, SIGDIE2); 6762b15cb3dSCy Schubert # endif 6772b15cb3dSCy Schubert # ifdef SIGDIE3 6782b15cb3dSCy Schubert sigaddset(&block, SIGDIE3); 6792b15cb3dSCy Schubert # endif 6802b15cb3dSCy Schubert # ifdef SIGDIE4 6812b15cb3dSCy Schubert sigaddset(&block, SIGDIE4); 6822b15cb3dSCy Schubert # endif 6832b15cb3dSCy Schubert # ifdef SIGBUS 6842b15cb3dSCy Schubert sigaddset(&block, SIGBUS); 6852b15cb3dSCy Schubert # endif 6862b15cb3dSCy Schubert sigemptyset(pmask); 6872b15cb3dSCy Schubert pthread_sigmask(SIG_BLOCK, &block, pmask); 6882b15cb3dSCy Schubert } 6892b15cb3dSCy Schubert #endif /* !SYS_WINNT */ 6902b15cb3dSCy Schubert 6912b15cb3dSCy Schubert 6923311ff84SXin LI /* -------------------------------------------------------------------- 6933311ff84SXin LI * Create & destroy semaphores. This is sufficiently different between 6943311ff84SXin LI * POSIX and Windows to warrant wrapper functions and close enough to 6953311ff84SXin LI * use the concept of synchronization via semaphore for all platforms. 6963311ff84SXin LI */ 6973311ff84SXin LI static sem_ref 6983311ff84SXin LI create_sema( 6993311ff84SXin LI sema_type* semptr, 7003311ff84SXin LI u_int inival, 7013311ff84SXin LI u_int maxval) 7023311ff84SXin LI { 7033311ff84SXin LI #ifdef SYS_WINNT 7043311ff84SXin LI 7053311ff84SXin LI long svini, svmax; 7063311ff84SXin LI if (NULL != semptr) { 7073311ff84SXin LI svini = (inival < LONG_MAX) 7083311ff84SXin LI ? (long)inival : LONG_MAX; 7093311ff84SXin LI svmax = (maxval < LONG_MAX && maxval > 0) 7103311ff84SXin LI ? (long)maxval : LONG_MAX; 7113311ff84SXin LI semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 7123311ff84SXin LI if (NULL == semptr->shnd) 7133311ff84SXin LI semptr = NULL; 7143311ff84SXin LI } 7153311ff84SXin LI 7163311ff84SXin LI #else 7173311ff84SXin LI 7183311ff84SXin LI (void)maxval; 7193311ff84SXin LI if (semptr && sem_init(semptr, FALSE, inival)) 7203311ff84SXin LI semptr = NULL; 7213311ff84SXin LI 7223311ff84SXin LI #endif 7233311ff84SXin LI 7243311ff84SXin LI return semptr; 7253311ff84SXin LI } 7263311ff84SXin LI 7273311ff84SXin LI /* ------------------------------------------------------------------ */ 7283311ff84SXin LI static sem_ref 7293311ff84SXin LI delete_sema( 7303311ff84SXin LI sem_ref obj) 7313311ff84SXin LI { 7323311ff84SXin LI 7333311ff84SXin LI # ifdef SYS_WINNT 7343311ff84SXin LI 7353311ff84SXin LI if (obj) { 7363311ff84SXin LI if (obj->shnd) 7373311ff84SXin LI CloseHandle(obj->shnd); 7383311ff84SXin LI obj->shnd = NULL; 7393311ff84SXin LI } 7403311ff84SXin LI 7413311ff84SXin LI # else 7423311ff84SXin LI 7433311ff84SXin LI if (obj) 7443311ff84SXin LI sem_destroy(obj); 7453311ff84SXin LI 7463311ff84SXin LI # endif 7473311ff84SXin LI 7483311ff84SXin LI return NULL; 7493311ff84SXin LI } 7503311ff84SXin LI 7513311ff84SXin LI /* -------------------------------------------------------------------- 7522b15cb3dSCy Schubert * prepare_child_sems() 7532b15cb3dSCy Schubert * 7543311ff84SXin LI * create sync & access semaphores 7552b15cb3dSCy Schubert * 7563311ff84SXin LI * All semaphores are cleared, only the access semaphore has 1 unit. 7573311ff84SXin LI * Childs wait on 'workitems_pending', then grabs 'sema_access' 7583311ff84SXin LI * and dequeues jobs. When done, 'sema_access' is given one unit back. 7593311ff84SXin LI * 7603311ff84SXin LI * The producer grabs 'sema_access', manages the queue, restores 7613311ff84SXin LI * 'sema_access' and puts one unit into 'workitems_pending'. 7623311ff84SXin LI * 7633311ff84SXin LI * The story goes the same for the response queue. 7642b15cb3dSCy Schubert */ 7652b15cb3dSCy Schubert static void 7662b15cb3dSCy Schubert prepare_child_sems( 7672b15cb3dSCy Schubert blocking_child *c 7682b15cb3dSCy Schubert ) 7692b15cb3dSCy Schubert { 7704990d495SXin LI if (NULL == worker_memlock) 7714990d495SXin LI worker_memlock = create_sema(&worker_mmutex, 1, 1); 7724990d495SXin LI 7733311ff84SXin LI c->accesslock = create_sema(&c->sem_table[0], 1, 1); 7743311ff84SXin LI c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 7753311ff84SXin LI c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 7763311ff84SXin LI # ifndef WORK_PIPE 7773311ff84SXin LI c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 7782b15cb3dSCy Schubert # endif 7793311ff84SXin LI } 7802b15cb3dSCy Schubert 7813311ff84SXin LI /* -------------------------------------------------------------------- 7823311ff84SXin LI * wait for semaphore. Where the wait can be interrupted, it will 7833311ff84SXin LI * internally resume -- When this function returns, there is either no 7843311ff84SXin LI * semaphore at all, a timeout occurred, or the caller could 7853311ff84SXin LI * successfully take a token from the semaphore. 7863311ff84SXin LI * 7873311ff84SXin LI * For untimed wait, not checking the result of this function at all is 7883311ff84SXin LI * definitely an option. 7893311ff84SXin LI */ 7902b15cb3dSCy Schubert static int 7912b15cb3dSCy Schubert wait_for_sem( 7922b15cb3dSCy Schubert sem_ref sem, 7932b15cb3dSCy Schubert struct timespec * timeout /* wall-clock */ 7942b15cb3dSCy Schubert ) 7952b15cb3dSCy Schubert #ifdef SYS_WINNT 7962b15cb3dSCy Schubert { 7972b15cb3dSCy Schubert struct timespec now; 7982b15cb3dSCy Schubert struct timespec delta; 7992b15cb3dSCy Schubert DWORD msec; 8002b15cb3dSCy Schubert DWORD rc; 8012b15cb3dSCy Schubert 8023311ff84SXin LI if (!(sem && sem->shnd)) { 8033311ff84SXin LI errno = EINVAL; 8043311ff84SXin LI return -1; 8053311ff84SXin LI } 8063311ff84SXin LI 8072b15cb3dSCy Schubert if (NULL == timeout) { 8082b15cb3dSCy Schubert msec = INFINITE; 8092b15cb3dSCy Schubert } else { 8102b15cb3dSCy Schubert getclock(TIMEOFDAY, &now); 8112b15cb3dSCy Schubert delta = sub_tspec(*timeout, now); 8122b15cb3dSCy Schubert if (delta.tv_sec < 0) { 8132b15cb3dSCy Schubert msec = 0; 8142b15cb3dSCy Schubert } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 8152b15cb3dSCy Schubert msec = INFINITE; 8162b15cb3dSCy Schubert } else { 8172b15cb3dSCy Schubert msec = 1000 * (DWORD)delta.tv_sec; 8182b15cb3dSCy Schubert msec += delta.tv_nsec / (1000 * 1000); 8192b15cb3dSCy Schubert } 8202b15cb3dSCy Schubert } 8213311ff84SXin LI rc = WaitForSingleObject(sem->shnd, msec); 8222b15cb3dSCy Schubert if (WAIT_OBJECT_0 == rc) 8232b15cb3dSCy Schubert return 0; 8242b15cb3dSCy Schubert if (WAIT_TIMEOUT == rc) { 8252b15cb3dSCy Schubert errno = ETIMEDOUT; 8262b15cb3dSCy Schubert return -1; 8272b15cb3dSCy Schubert } 8282b15cb3dSCy Schubert msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 8292b15cb3dSCy Schubert errno = EFAULT; 8302b15cb3dSCy Schubert return -1; 8312b15cb3dSCy Schubert } 8322b15cb3dSCy Schubert #else /* pthreads wait_for_sem() follows */ 8332b15cb3dSCy Schubert { 8343311ff84SXin LI int rc = -1; 8352b15cb3dSCy Schubert 8363311ff84SXin LI if (sem) do { 8372b15cb3dSCy Schubert if (NULL == timeout) 8382b15cb3dSCy Schubert rc = sem_wait(sem); 8392b15cb3dSCy Schubert else 8402b15cb3dSCy Schubert rc = sem_timedwait(sem, timeout); 8413311ff84SXin LI } while (rc == -1 && errno == EINTR); 8423311ff84SXin LI else 8433311ff84SXin LI errno = EINVAL; 8442b15cb3dSCy Schubert 8452b15cb3dSCy Schubert return rc; 8462b15cb3dSCy Schubert } 8472b15cb3dSCy Schubert #endif 8482b15cb3dSCy Schubert 8493311ff84SXin LI /* -------------------------------------------------------------------- 8503311ff84SXin LI * blocking_thread - thread functions have WINAPI (aka 'stdcall') 8513311ff84SXin LI * calling conventions under Windows and POSIX-defined signature 8523311ff84SXin LI * otherwise. 8532b15cb3dSCy Schubert */ 8542b15cb3dSCy Schubert #ifdef SYS_WINNT 8553311ff84SXin LI u_int WINAPI 8562b15cb3dSCy Schubert #else 8572b15cb3dSCy Schubert void * 8582b15cb3dSCy Schubert #endif 8592b15cb3dSCy Schubert blocking_thread( 8602b15cb3dSCy Schubert void * ThreadArg 8612b15cb3dSCy Schubert ) 8622b15cb3dSCy Schubert { 8632b15cb3dSCy Schubert blocking_child *c; 8642b15cb3dSCy Schubert 8652b15cb3dSCy Schubert c = ThreadArg; 8662b15cb3dSCy Schubert exit_worker(blocking_child_common(c)); 8672b15cb3dSCy Schubert 8682b15cb3dSCy Schubert /* NOTREACHED */ 8692b15cb3dSCy Schubert return 0; 8702b15cb3dSCy Schubert } 8712b15cb3dSCy Schubert 8723311ff84SXin LI /* -------------------------------------------------------------------- 8732b15cb3dSCy Schubert * req_child_exit() runs in the parent. 8743311ff84SXin LI * 8753311ff84SXin LI * This function is called from from the idle timer, too, and possibly 8763311ff84SXin LI * without a thread being there any longer. Since we have folded up our 8773311ff84SXin LI * tent in that case and all the semaphores are already gone, we simply 8783311ff84SXin LI * ignore this request in this case. 8793311ff84SXin LI * 8803311ff84SXin LI * Since the existence of the semaphores is controlled exclusively by 8813311ff84SXin LI * the parent, there's no risk of data race here. 8822b15cb3dSCy Schubert */ 8832b15cb3dSCy Schubert int 8842b15cb3dSCy Schubert req_child_exit( 8852b15cb3dSCy Schubert blocking_child *c 8862b15cb3dSCy Schubert ) 8872b15cb3dSCy Schubert { 8883311ff84SXin LI return (c->accesslock) 8893311ff84SXin LI ? queue_req_pointer(c, CHILD_EXIT_REQ) 8903311ff84SXin LI : 0; 8912b15cb3dSCy Schubert } 8922b15cb3dSCy Schubert 8933311ff84SXin LI /* -------------------------------------------------------------------- 8942b15cb3dSCy Schubert * cleanup_after_child() runs in parent. 8952b15cb3dSCy Schubert */ 8962b15cb3dSCy Schubert static void 8972b15cb3dSCy Schubert cleanup_after_child( 8982b15cb3dSCy Schubert blocking_child * c 8992b15cb3dSCy Schubert ) 9002b15cb3dSCy Schubert { 9012b15cb3dSCy Schubert DEBUG_INSIST(!c->reusable); 9023311ff84SXin LI 9032b15cb3dSCy Schubert # ifdef SYS_WINNT 9043311ff84SXin LI /* The thread was not created in detached state, so we better 9053311ff84SXin LI * clean up. 9063311ff84SXin LI */ 9073311ff84SXin LI if (c->thread_ref && c->thread_ref->thnd) { 9083311ff84SXin LI WaitForSingleObject(c->thread_ref->thnd, INFINITE); 9093311ff84SXin LI INSIST(CloseHandle(c->thread_ref->thnd)); 9103311ff84SXin LI c->thread_ref->thnd = NULL; 9113311ff84SXin LI } 9122b15cb3dSCy Schubert # endif 9132b15cb3dSCy Schubert c->thread_ref = NULL; 9143311ff84SXin LI 9153311ff84SXin LI /* remove semaphores and (if signalling vi IO) pipes */ 9163311ff84SXin LI 9173311ff84SXin LI c->accesslock = delete_sema(c->accesslock); 9183311ff84SXin LI c->workitems_pending = delete_sema(c->workitems_pending); 9193311ff84SXin LI c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 9203311ff84SXin LI 9212b15cb3dSCy Schubert # ifdef WORK_PIPE 9222b15cb3dSCy Schubert DEBUG_INSIST(-1 != c->resp_read_pipe); 9232b15cb3dSCy Schubert DEBUG_INSIST(-1 != c->resp_write_pipe); 9242b15cb3dSCy Schubert (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 9252b15cb3dSCy Schubert close(c->resp_write_pipe); 9262b15cb3dSCy Schubert close(c->resp_read_pipe); 9272b15cb3dSCy Schubert c->resp_write_pipe = -1; 9282b15cb3dSCy Schubert c->resp_read_pipe = -1; 9292b15cb3dSCy Schubert # else 9303311ff84SXin LI DEBUG_INSIST(NULL != c->responses_pending); 9313311ff84SXin LI (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 9323311ff84SXin LI c->responses_pending = delete_sema(c->responses_pending); 9332b15cb3dSCy Schubert # endif 9343311ff84SXin LI 9353311ff84SXin LI /* Is it necessary to check if there are pending requests and 9363311ff84SXin LI * responses? If so, and if there are, what to do with them? 9373311ff84SXin LI */ 9383311ff84SXin LI 9393311ff84SXin LI /* re-init buffer index sequencers */ 9403311ff84SXin LI c->head_workitem = 0; 9413311ff84SXin LI c->tail_workitem = 0; 9423311ff84SXin LI c->head_response = 0; 9433311ff84SXin LI c->tail_response = 0; 9443311ff84SXin LI 9452b15cb3dSCy Schubert c->reusable = TRUE; 9462b15cb3dSCy Schubert } 9472b15cb3dSCy Schubert 9482b15cb3dSCy Schubert 9492b15cb3dSCy Schubert #else /* !WORK_THREAD follows */ 9502b15cb3dSCy Schubert char work_thread_nonempty_compilation_unit; 9512b15cb3dSCy Schubert #endif 952