1 /* 2 * work_thread.c - threads implementation for blocking worker child. 3 */ 4 #include <config.h> 5 #include "ntp_workimpl.h" 6 7 #ifdef WORK_THREAD 8 9 #include <stdio.h> 10 #include <ctype.h> 11 #include <signal.h> 12 #ifndef SYS_WINNT 13 #include <pthread.h> 14 #endif 15 16 #include "ntp_stdlib.h" 17 #include "ntp_malloc.h" 18 #include "ntp_syslog.h" 19 #include "ntpd.h" 20 #include "ntp_io.h" 21 #include "ntp_assert.h" 22 #include "ntp_unixtime.h" 23 #include "timespecops.h" 24 #include "ntp_worker.h" 25 26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27 #define CHILD_GONE_RESP CHILD_EXIT_REQ 28 #define WORKITEMS_ALLOC_INC 16 29 #define RESPONSES_ALLOC_INC 4 30 31 #ifndef THREAD_MINSTACKSIZE 32 #define THREAD_MINSTACKSIZE (64U * 1024) 33 #endif 34 35 #ifdef SYS_WINNT 36 37 # define thread_exit(c) _endthreadex(c) 38 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 39 u_int WINAPI blocking_thread(void *); 40 static BOOL same_os_sema(const sem_ref obj, void * osobj); 41 42 #else 43 44 # define thread_exit(c) pthread_exit((void*)(size_t)(c)) 45 # define tickle_sem sem_post 46 void * blocking_thread(void *); 47 static void block_thread_signals(sigset_t *); 48 49 #endif 50 51 #ifdef WORK_PIPE 52 addremove_io_fd_func addremove_io_fd; 53 #else 54 addremove_io_semaphore_func addremove_io_semaphore; 55 #endif 56 57 static void start_blocking_thread(blocking_child *); 58 static void start_blocking_thread_internal(blocking_child *); 59 static void prepare_child_sems(blocking_child *); 60 static int wait_for_sem(sem_ref, struct timespec *); 61 static int ensure_workitems_empty_slot(blocking_child *); 62 static int ensure_workresp_empty_slot(blocking_child *); 63 static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 64 static void cleanup_after_child(blocking_child *); 65 66 67 void 68 exit_worker( 69 int exitcode 70 ) 71 { 72 thread_exit(exitcode); /* see #define thread_exit */ 73 } 74 75 /* -------------------------------------------------------------------- 76 * sleep for a given time or until the wakup semaphore is tickled. 77 */ 78 int 79 worker_sleep( 80 blocking_child * c, 81 time_t seconds 82 ) 83 { 84 struct timespec until; 85 int rc; 86 87 # ifdef HAVE_CLOCK_GETTIME 88 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 89 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 90 return -1; 91 } 92 # else 93 if (0 != getclock(TIMEOFDAY, &until)) { 94 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 95 return -1; 96 } 97 # endif 98 until.tv_sec += seconds; 99 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 100 if (0 == rc) 101 return -1; 102 if (-1 == rc && ETIMEDOUT == errno) 103 return 0; 104 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 105 return -1; 106 } 107 108 109 /* -------------------------------------------------------------------- 110 * Wake up a worker that takes a nap. 111 */ 112 void 113 interrupt_worker_sleep(void) 114 { 115 u_int idx; 116 blocking_child * c; 117 118 for (idx = 0; idx < blocking_children_alloc; idx++) { 119 c = blocking_children[idx]; 120 if (NULL == c || NULL == c->wake_scheduled_sleep) 121 continue; 122 tickle_sem(c->wake_scheduled_sleep); 123 } 124 } 125 126 /* -------------------------------------------------------------------- 127 * Make sure there is an empty slot at the head of the request 128 * queue. Tell if the queue is currently empty. 129 */ 130 static int 131 ensure_workitems_empty_slot( 132 blocking_child *c 133 ) 134 { 135 /* 136 ** !!! PRECONDITION: caller holds access lock! 137 ** 138 ** This simply tries to increase the size of the buffer if it 139 ** becomes full. The resize operation does *not* maintain the 140 ** order of requests, but that should be irrelevant since the 141 ** processing is considered asynchronous anyway. 142 ** 143 ** Return if the buffer is currently empty. 144 */ 145 146 static const size_t each = 147 sizeof(blocking_children[0]->workitems[0]); 148 149 size_t new_alloc; 150 size_t slots_used; 151 152 slots_used = c->head_workitem - c->tail_workitem; 153 if (slots_used >= c->workitems_alloc) { 154 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 155 c->workitems = erealloc(c->workitems, new_alloc * each); 156 c->tail_workitem = 0; 157 c->head_workitem = c->workitems_alloc; 158 c->workitems_alloc = new_alloc; 159 } 160 return (0 == slots_used); 161 } 162 163 /* -------------------------------------------------------------------- 164 * Make sure there is an empty slot at the head of the response 165 * queue. Tell if the queue is currently empty. 166 */ 167 static int 168 ensure_workresp_empty_slot( 169 blocking_child *c 170 ) 171 { 172 /* 173 ** !!! PRECONDITION: caller holds access lock! 174 ** 175 ** Works like the companion function above. 176 */ 177 178 static const size_t each = 179 sizeof(blocking_children[0]->responses[0]); 180 181 size_t new_alloc; 182 size_t slots_used; 183 184 slots_used = c->head_response - c->tail_response; 185 if (slots_used >= c->responses_alloc) { 186 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 187 c->responses = erealloc(c->responses, new_alloc * each); 188 c->tail_response = 0; 189 c->head_response = c->responses_alloc; 190 c->responses_alloc = new_alloc; 191 } 192 return (0 == slots_used); 193 } 194 195 196 /* -------------------------------------------------------------------- 197 * queue_req_pointer() - append a work item or idle exit request to 198 * blocking_workitems[]. Employ proper locking. 199 */ 200 static int 201 queue_req_pointer( 202 blocking_child * c, 203 blocking_pipe_header * hdr 204 ) 205 { 206 size_t qhead; 207 208 /* >>>> ACCESS LOCKING STARTS >>>> */ 209 wait_for_sem(c->accesslock, NULL); 210 ensure_workitems_empty_slot(c); 211 qhead = c->head_workitem; 212 c->workitems[qhead % c->workitems_alloc] = hdr; 213 c->head_workitem = 1 + qhead; 214 tickle_sem(c->accesslock); 215 /* <<<< ACCESS LOCKING ENDS <<<< */ 216 217 /* queue consumer wake-up notification */ 218 tickle_sem(c->workitems_pending); 219 220 return 0; 221 } 222 223 /* -------------------------------------------------------------------- 224 * API function to make sure a worker is running, a proper private copy 225 * of the data is made, the data eneterd into the queue and the worker 226 * is signalled. 227 */ 228 int 229 send_blocking_req_internal( 230 blocking_child * c, 231 blocking_pipe_header * hdr, 232 void * data 233 ) 234 { 235 blocking_pipe_header * threadcopy; 236 size_t payload_octets; 237 238 REQUIRE(hdr != NULL); 239 REQUIRE(data != NULL); 240 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 241 242 if (hdr->octets <= sizeof(*hdr)) 243 return 1; /* failure */ 244 payload_octets = hdr->octets - sizeof(*hdr); 245 246 if (NULL == c->thread_ref) 247 start_blocking_thread(c); 248 threadcopy = emalloc(hdr->octets); 249 memcpy(threadcopy, hdr, sizeof(*hdr)); 250 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 251 252 return queue_req_pointer(c, threadcopy); 253 } 254 255 /* -------------------------------------------------------------------- 256 * Wait for the 'incoming queue no longer empty' signal, lock the shared 257 * structure and dequeue an item. 258 */ 259 blocking_pipe_header * 260 receive_blocking_req_internal( 261 blocking_child * c 262 ) 263 { 264 blocking_pipe_header * req; 265 size_t qhead, qtail; 266 267 req = NULL; 268 do { 269 /* wait for tickle from the producer side */ 270 wait_for_sem(c->workitems_pending, NULL); 271 272 /* >>>> ACCESS LOCKING STARTS >>>> */ 273 wait_for_sem(c->accesslock, NULL); 274 qhead = c->head_workitem; 275 do { 276 qtail = c->tail_workitem; 277 if (qhead == qtail) 278 break; 279 c->tail_workitem = qtail + 1; 280 qtail %= c->workitems_alloc; 281 req = c->workitems[qtail]; 282 c->workitems[qtail] = NULL; 283 } while (NULL == req); 284 tickle_sem(c->accesslock); 285 /* <<<< ACCESS LOCKING ENDS <<<< */ 286 287 } while (NULL == req); 288 289 INSIST(NULL != req); 290 if (CHILD_EXIT_REQ == req) { /* idled out */ 291 send_blocking_resp_internal(c, CHILD_GONE_RESP); 292 req = NULL; 293 } 294 295 return req; 296 } 297 298 /* -------------------------------------------------------------------- 299 * Push a response into the return queue and eventually tickle the 300 * receiver. 301 */ 302 int 303 send_blocking_resp_internal( 304 blocking_child * c, 305 blocking_pipe_header * resp 306 ) 307 { 308 size_t qhead; 309 int empty; 310 311 /* >>>> ACCESS LOCKING STARTS >>>> */ 312 wait_for_sem(c->accesslock, NULL); 313 empty = ensure_workresp_empty_slot(c); 314 qhead = c->head_response; 315 c->responses[qhead % c->responses_alloc] = resp; 316 c->head_response = 1 + qhead; 317 tickle_sem(c->accesslock); 318 /* <<<< ACCESS LOCKING ENDS <<<< */ 319 320 /* queue consumer wake-up notification */ 321 if (empty) 322 { 323 # ifdef WORK_PIPE 324 write(c->resp_write_pipe, "", 1); 325 # else 326 tickle_sem(c->responses_pending); 327 # endif 328 } 329 return 0; 330 } 331 332 333 #ifndef WORK_PIPE 334 335 /* -------------------------------------------------------------------- 336 * Check if a (Windows-)hanndle to a semaphore is actually the same we 337 * are using inside the sema wrapper. 338 */ 339 static BOOL 340 same_os_sema( 341 const sem_ref obj, 342 void* osh 343 ) 344 { 345 return obj && osh && (obj->shnd == (HANDLE)osh); 346 } 347 348 /* -------------------------------------------------------------------- 349 * Find the shared context that associates to an OS handle and make sure 350 * the data is dequeued and processed. 351 */ 352 void 353 handle_blocking_resp_sem( 354 void * context 355 ) 356 { 357 blocking_child * c; 358 u_int idx; 359 360 c = NULL; 361 for (idx = 0; idx < blocking_children_alloc; idx++) { 362 c = blocking_children[idx]; 363 if (c != NULL && 364 c->thread_ref != NULL && 365 same_os_sema(c->responses_pending, context)) 366 break; 367 } 368 if (idx < blocking_children_alloc) 369 process_blocking_resp(c); 370 } 371 #endif /* !WORK_PIPE */ 372 373 /* -------------------------------------------------------------------- 374 * Fetch the next response from the return queue. In case of signalling 375 * via pipe, make sure the pipe is flushed, too. 376 */ 377 blocking_pipe_header * 378 receive_blocking_resp_internal( 379 blocking_child * c 380 ) 381 { 382 blocking_pipe_header * removed; 383 size_t qhead, qtail, slot; 384 385 #ifdef WORK_PIPE 386 int rc; 387 char scratch[32]; 388 389 do 390 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 391 while (-1 == rc && EINTR == errno); 392 #endif 393 394 /* >>>> ACCESS LOCKING STARTS >>>> */ 395 wait_for_sem(c->accesslock, NULL); 396 qhead = c->head_response; 397 qtail = c->tail_response; 398 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 399 slot = qtail % c->responses_alloc; 400 removed = c->responses[slot]; 401 c->responses[slot] = NULL; 402 } 403 c->tail_response = qtail; 404 tickle_sem(c->accesslock); 405 /* <<<< ACCESS LOCKING ENDS <<<< */ 406 407 if (NULL != removed) { 408 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 409 BLOCKING_RESP_MAGIC == removed->magic_sig); 410 } 411 if (CHILD_GONE_RESP == removed) { 412 cleanup_after_child(c); 413 removed = NULL; 414 } 415 416 return removed; 417 } 418 419 /* -------------------------------------------------------------------- 420 * Light up a new worker. 421 */ 422 static void 423 start_blocking_thread( 424 blocking_child * c 425 ) 426 { 427 428 DEBUG_INSIST(!c->reusable); 429 430 prepare_child_sems(c); 431 start_blocking_thread_internal(c); 432 } 433 434 /* -------------------------------------------------------------------- 435 * Create a worker thread. There are several differences between POSIX 436 * and Windows, of course -- most notably the Windows thread is no 437 * detached thread, and we keep the handle around until we want to get 438 * rid of the thread. The notification scheme also differs: Windows 439 * makes use of semaphores in both directions, POSIX uses a pipe for 440 * integration with 'select()' or alike. 441 */ 442 static void 443 start_blocking_thread_internal( 444 blocking_child * c 445 ) 446 #ifdef SYS_WINNT 447 { 448 BOOL resumed; 449 450 c->thread_ref = NULL; 451 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 452 c->thr_table[0].thnd = 453 (HANDLE)_beginthreadex( 454 NULL, 455 0, 456 &blocking_thread, 457 c, 458 CREATE_SUSPENDED, 459 NULL); 460 461 if (NULL == c->thr_table[0].thnd) { 462 msyslog(LOG_ERR, "start blocking thread failed: %m"); 463 exit(-1); 464 } 465 /* remember the thread priority is only within the process class */ 466 if (!SetThreadPriority(c->thr_table[0].thnd, 467 THREAD_PRIORITY_BELOW_NORMAL)) 468 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 469 470 resumed = ResumeThread(c->thr_table[0].thnd); 471 DEBUG_INSIST(resumed); 472 c->thread_ref = &c->thr_table[0]; 473 } 474 #else /* pthreads start_blocking_thread_internal() follows */ 475 { 476 # ifdef NEED_PTHREAD_INIT 477 static int pthread_init_called; 478 # endif 479 pthread_attr_t thr_attr; 480 int rc; 481 int saved_errno; 482 int pipe_ends[2]; /* read then write */ 483 int is_pipe; 484 int flags; 485 size_t stacksize; 486 sigset_t saved_sig_mask; 487 488 c->thread_ref = NULL; 489 490 # ifdef NEED_PTHREAD_INIT 491 /* 492 * from lib/isc/unix/app.c: 493 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 494 */ 495 if (!pthread_init_called) { 496 pthread_init(); 497 pthread_init_called = TRUE; 498 } 499 # endif 500 501 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 502 if (0 != rc) { 503 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 504 exit(1); 505 } 506 c->resp_read_pipe = move_fd(pipe_ends[0]); 507 c->resp_write_pipe = move_fd(pipe_ends[1]); 508 c->ispipe = is_pipe; 509 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 510 if (-1 == flags) { 511 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 512 exit(1); 513 } 514 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 515 if (-1 == rc) { 516 msyslog(LOG_ERR, 517 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 518 exit(1); 519 } 520 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 521 pthread_attr_init(&thr_attr); 522 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 523 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 524 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 525 rc = pthread_attr_getstacksize(&thr_attr, &stacksize); 526 if (-1 == rc) { 527 msyslog(LOG_ERR, 528 "start_blocking_thread: pthread_attr_getstacksize %m"); 529 } else if (stacksize < THREAD_MINSTACKSIZE) { 530 rc = pthread_attr_setstacksize(&thr_attr, 531 THREAD_MINSTACKSIZE); 532 if (-1 == rc) 533 msyslog(LOG_ERR, 534 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m", 535 (u_long)stacksize, 536 (u_long)THREAD_MINSTACKSIZE); 537 } 538 #else 539 UNUSED_ARG(stacksize); 540 #endif 541 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 542 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 543 #endif 544 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 545 block_thread_signals(&saved_sig_mask); 546 rc = pthread_create(&c->thr_table[0], &thr_attr, 547 &blocking_thread, c); 548 saved_errno = errno; 549 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 550 pthread_attr_destroy(&thr_attr); 551 if (0 != rc) { 552 errno = saved_errno; 553 msyslog(LOG_ERR, "pthread_create() blocking child: %m"); 554 exit(1); 555 } 556 c->thread_ref = &c->thr_table[0]; 557 } 558 #endif 559 560 /* -------------------------------------------------------------------- 561 * block_thread_signals() 562 * 563 * Temporarily block signals used by ntpd main thread, so that signal 564 * mask inherited by child threads leaves them blocked. Returns prior 565 * active signal mask via pmask, to be restored by the main thread 566 * after pthread_create(). 567 */ 568 #ifndef SYS_WINNT 569 void 570 block_thread_signals( 571 sigset_t * pmask 572 ) 573 { 574 sigset_t block; 575 576 sigemptyset(&block); 577 # ifdef HAVE_SIGNALED_IO 578 # ifdef SIGIO 579 sigaddset(&block, SIGIO); 580 # endif 581 # ifdef SIGPOLL 582 sigaddset(&block, SIGPOLL); 583 # endif 584 # endif /* HAVE_SIGNALED_IO */ 585 sigaddset(&block, SIGALRM); 586 sigaddset(&block, MOREDEBUGSIG); 587 sigaddset(&block, LESSDEBUGSIG); 588 # ifdef SIGDIE1 589 sigaddset(&block, SIGDIE1); 590 # endif 591 # ifdef SIGDIE2 592 sigaddset(&block, SIGDIE2); 593 # endif 594 # ifdef SIGDIE3 595 sigaddset(&block, SIGDIE3); 596 # endif 597 # ifdef SIGDIE4 598 sigaddset(&block, SIGDIE4); 599 # endif 600 # ifdef SIGBUS 601 sigaddset(&block, SIGBUS); 602 # endif 603 sigemptyset(pmask); 604 pthread_sigmask(SIG_BLOCK, &block, pmask); 605 } 606 #endif /* !SYS_WINNT */ 607 608 609 /* -------------------------------------------------------------------- 610 * Create & destroy semaphores. This is sufficiently different between 611 * POSIX and Windows to warrant wrapper functions and close enough to 612 * use the concept of synchronization via semaphore for all platforms. 613 */ 614 static sem_ref 615 create_sema( 616 sema_type* semptr, 617 u_int inival, 618 u_int maxval) 619 { 620 #ifdef SYS_WINNT 621 622 long svini, svmax; 623 if (NULL != semptr) { 624 svini = (inival < LONG_MAX) 625 ? (long)inival : LONG_MAX; 626 svmax = (maxval < LONG_MAX && maxval > 0) 627 ? (long)maxval : LONG_MAX; 628 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 629 if (NULL == semptr->shnd) 630 semptr = NULL; 631 } 632 633 #else 634 635 (void)maxval; 636 if (semptr && sem_init(semptr, FALSE, inival)) 637 semptr = NULL; 638 639 #endif 640 641 return semptr; 642 } 643 644 /* ------------------------------------------------------------------ */ 645 static sem_ref 646 delete_sema( 647 sem_ref obj) 648 { 649 650 # ifdef SYS_WINNT 651 652 if (obj) { 653 if (obj->shnd) 654 CloseHandle(obj->shnd); 655 obj->shnd = NULL; 656 } 657 658 # else 659 660 if (obj) 661 sem_destroy(obj); 662 663 # endif 664 665 return NULL; 666 } 667 668 /* -------------------------------------------------------------------- 669 * prepare_child_sems() 670 * 671 * create sync & access semaphores 672 * 673 * All semaphores are cleared, only the access semaphore has 1 unit. 674 * Childs wait on 'workitems_pending', then grabs 'sema_access' 675 * and dequeues jobs. When done, 'sema_access' is given one unit back. 676 * 677 * The producer grabs 'sema_access', manages the queue, restores 678 * 'sema_access' and puts one unit into 'workitems_pending'. 679 * 680 * The story goes the same for the response queue. 681 */ 682 static void 683 prepare_child_sems( 684 blocking_child *c 685 ) 686 { 687 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 688 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 689 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 690 # ifndef WORK_PIPE 691 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 692 # endif 693 } 694 695 /* -------------------------------------------------------------------- 696 * wait for semaphore. Where the wait can be interrupted, it will 697 * internally resume -- When this function returns, there is either no 698 * semaphore at all, a timeout occurred, or the caller could 699 * successfully take a token from the semaphore. 700 * 701 * For untimed wait, not checking the result of this function at all is 702 * definitely an option. 703 */ 704 static int 705 wait_for_sem( 706 sem_ref sem, 707 struct timespec * timeout /* wall-clock */ 708 ) 709 #ifdef SYS_WINNT 710 { 711 struct timespec now; 712 struct timespec delta; 713 DWORD msec; 714 DWORD rc; 715 716 if (!(sem && sem->shnd)) { 717 errno = EINVAL; 718 return -1; 719 } 720 721 if (NULL == timeout) { 722 msec = INFINITE; 723 } else { 724 getclock(TIMEOFDAY, &now); 725 delta = sub_tspec(*timeout, now); 726 if (delta.tv_sec < 0) { 727 msec = 0; 728 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 729 msec = INFINITE; 730 } else { 731 msec = 1000 * (DWORD)delta.tv_sec; 732 msec += delta.tv_nsec / (1000 * 1000); 733 } 734 } 735 rc = WaitForSingleObject(sem->shnd, msec); 736 if (WAIT_OBJECT_0 == rc) 737 return 0; 738 if (WAIT_TIMEOUT == rc) { 739 errno = ETIMEDOUT; 740 return -1; 741 } 742 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 743 errno = EFAULT; 744 return -1; 745 } 746 #else /* pthreads wait_for_sem() follows */ 747 { 748 int rc = -1; 749 750 if (sem) do { 751 if (NULL == timeout) 752 rc = sem_wait(sem); 753 else 754 rc = sem_timedwait(sem, timeout); 755 } while (rc == -1 && errno == EINTR); 756 else 757 errno = EINVAL; 758 759 return rc; 760 } 761 #endif 762 763 /* -------------------------------------------------------------------- 764 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 765 * calling conventions under Windows and POSIX-defined signature 766 * otherwise. 767 */ 768 #ifdef SYS_WINNT 769 u_int WINAPI 770 #else 771 void * 772 #endif 773 blocking_thread( 774 void * ThreadArg 775 ) 776 { 777 blocking_child *c; 778 779 c = ThreadArg; 780 exit_worker(blocking_child_common(c)); 781 782 /* NOTREACHED */ 783 return 0; 784 } 785 786 /* -------------------------------------------------------------------- 787 * req_child_exit() runs in the parent. 788 * 789 * This function is called from from the idle timer, too, and possibly 790 * without a thread being there any longer. Since we have folded up our 791 * tent in that case and all the semaphores are already gone, we simply 792 * ignore this request in this case. 793 * 794 * Since the existence of the semaphores is controlled exclusively by 795 * the parent, there's no risk of data race here. 796 */ 797 int 798 req_child_exit( 799 blocking_child *c 800 ) 801 { 802 return (c->accesslock) 803 ? queue_req_pointer(c, CHILD_EXIT_REQ) 804 : 0; 805 } 806 807 /* -------------------------------------------------------------------- 808 * cleanup_after_child() runs in parent. 809 */ 810 static void 811 cleanup_after_child( 812 blocking_child * c 813 ) 814 { 815 DEBUG_INSIST(!c->reusable); 816 817 # ifdef SYS_WINNT 818 /* The thread was not created in detached state, so we better 819 * clean up. 820 */ 821 if (c->thread_ref && c->thread_ref->thnd) { 822 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 823 INSIST(CloseHandle(c->thread_ref->thnd)); 824 c->thread_ref->thnd = NULL; 825 } 826 # endif 827 c->thread_ref = NULL; 828 829 /* remove semaphores and (if signalling vi IO) pipes */ 830 831 c->accesslock = delete_sema(c->accesslock); 832 c->workitems_pending = delete_sema(c->workitems_pending); 833 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 834 835 # ifdef WORK_PIPE 836 DEBUG_INSIST(-1 != c->resp_read_pipe); 837 DEBUG_INSIST(-1 != c->resp_write_pipe); 838 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 839 close(c->resp_write_pipe); 840 close(c->resp_read_pipe); 841 c->resp_write_pipe = -1; 842 c->resp_read_pipe = -1; 843 # else 844 DEBUG_INSIST(NULL != c->responses_pending); 845 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 846 c->responses_pending = delete_sema(c->responses_pending); 847 # endif 848 849 /* Is it necessary to check if there are pending requests and 850 * responses? If so, and if there are, what to do with them? 851 */ 852 853 /* re-init buffer index sequencers */ 854 c->head_workitem = 0; 855 c->tail_workitem = 0; 856 c->head_response = 0; 857 c->tail_response = 0; 858 859 c->reusable = TRUE; 860 } 861 862 863 #else /* !WORK_THREAD follows */ 864 char work_thread_nonempty_compilation_unit; 865 #endif 866