1 /* 2 * work_thread.c - threads implementation for blocking worker child. 3 */ 4 #include <config.h> 5 #include "ntp_workimpl.h" 6 7 #ifdef WORK_THREAD 8 9 #include <stdio.h> 10 #include <ctype.h> 11 #include <signal.h> 12 #ifndef SYS_WINNT 13 #include <pthread.h> 14 #endif 15 16 #include "ntp_stdlib.h" 17 #include "ntp_malloc.h" 18 #include "ntp_syslog.h" 19 #include "ntpd.h" 20 #include "ntp_io.h" 21 #include "ntp_assert.h" 22 #include "ntp_unixtime.h" 23 #include "timespecops.h" 24 #include "ntp_worker.h" 25 26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27 #define CHILD_GONE_RESP CHILD_EXIT_REQ 28 /* Queue size increments: 29 * The request queue grows a bit faster than the response queue -- the 30 * daemon can push requests and pull results faster on avarage than the 31 * worker can process requests and push results... If this really pays 32 * off is debatable. 33 */ 34 #define WORKITEMS_ALLOC_INC 16 35 #define RESPONSES_ALLOC_INC 4 36 37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38 * set the maximum to 256kB. If the minimum goes below the 39 * system-defined minimum stack size, we have to adjust accordingly. 40 */ 41 #ifndef THREAD_MINSTACKSIZE 42 # define THREAD_MINSTACKSIZE (64U * 1024) 43 #endif 44 #ifndef __sun 45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46 # undef THREAD_MINSTACKSIZE 47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48 #endif 49 #endif 50 51 #ifndef THREAD_MAXSTACKSIZE 52 # define THREAD_MAXSTACKSIZE (256U * 1024) 53 #endif 54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55 # undef THREAD_MAXSTACKSIZE 56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57 #endif 58 59 60 #ifdef SYS_WINNT 61 62 # define thread_exit(c) _endthreadex(c) 63 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 64 u_int WINAPI blocking_thread(void *); 65 static BOOL same_os_sema(const sem_ref obj, void * osobj); 66 67 #else 68 69 # define thread_exit(c) pthread_exit((void*)(size_t)(c)) 70 # define tickle_sem sem_post 71 void * blocking_thread(void *); 72 static void block_thread_signals(sigset_t *); 73 74 #endif 75 76 #ifdef WORK_PIPE 77 addremove_io_fd_func addremove_io_fd; 78 #else 79 addremove_io_semaphore_func addremove_io_semaphore; 80 #endif 81 82 static void start_blocking_thread(blocking_child *); 83 static void start_blocking_thread_internal(blocking_child *); 84 static void prepare_child_sems(blocking_child *); 85 static int wait_for_sem(sem_ref, struct timespec *); 86 static int ensure_workitems_empty_slot(blocking_child *); 87 static int ensure_workresp_empty_slot(blocking_child *); 88 static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 89 static void cleanup_after_child(blocking_child *); 90 91 static sema_type worker_mmutex; 92 static sem_ref worker_memlock; 93 94 /* -------------------------------------------------------------------- 95 * locking the global worker state table (and other global stuff) 96 */ 97 void 98 worker_global_lock( 99 int inOrOut) 100 { 101 if (worker_memlock) { 102 if (inOrOut) 103 wait_for_sem(worker_memlock, NULL); 104 else 105 tickle_sem(worker_memlock); 106 } 107 } 108 109 /* -------------------------------------------------------------------- 110 * implementation isolation wrapper 111 */ 112 void 113 exit_worker( 114 int exitcode 115 ) 116 { 117 thread_exit(exitcode); /* see #define thread_exit */ 118 } 119 120 /* -------------------------------------------------------------------- 121 * sleep for a given time or until the wakup semaphore is tickled. 122 */ 123 int 124 worker_sleep( 125 blocking_child * c, 126 time_t seconds 127 ) 128 { 129 struct timespec until; 130 int rc; 131 132 # ifdef HAVE_CLOCK_GETTIME 133 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 134 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 135 return -1; 136 } 137 # else 138 if (0 != getclock(TIMEOFDAY, &until)) { 139 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 140 return -1; 141 } 142 # endif 143 until.tv_sec += seconds; 144 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 145 if (0 == rc) 146 return -1; 147 if (-1 == rc && ETIMEDOUT == errno) 148 return 0; 149 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 150 return -1; 151 } 152 153 154 /* -------------------------------------------------------------------- 155 * Wake up a worker that takes a nap. 156 */ 157 void 158 interrupt_worker_sleep(void) 159 { 160 u_int idx; 161 blocking_child * c; 162 163 for (idx = 0; idx < blocking_children_alloc; idx++) { 164 c = blocking_children[idx]; 165 if (NULL == c || NULL == c->wake_scheduled_sleep) 166 continue; 167 tickle_sem(c->wake_scheduled_sleep); 168 } 169 } 170 171 /* -------------------------------------------------------------------- 172 * Make sure there is an empty slot at the head of the request 173 * queue. Tell if the queue is currently empty. 174 */ 175 static int 176 ensure_workitems_empty_slot( 177 blocking_child *c 178 ) 179 { 180 /* 181 ** !!! PRECONDITION: caller holds access lock! 182 ** 183 ** This simply tries to increase the size of the buffer if it 184 ** becomes full. The resize operation does *not* maintain the 185 ** order of requests, but that should be irrelevant since the 186 ** processing is considered asynchronous anyway. 187 ** 188 ** Return if the buffer is currently empty. 189 */ 190 191 static const size_t each = 192 sizeof(blocking_children[0]->workitems[0]); 193 194 size_t new_alloc; 195 size_t slots_used; 196 size_t sidx; 197 198 slots_used = c->head_workitem - c->tail_workitem; 199 if (slots_used >= c->workitems_alloc) { 200 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 201 c->workitems = erealloc(c->workitems, new_alloc * each); 202 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 203 c->workitems[sidx] = NULL; 204 c->tail_workitem = 0; 205 c->head_workitem = c->workitems_alloc; 206 c->workitems_alloc = new_alloc; 207 } 208 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 209 return (0 == slots_used); 210 } 211 212 /* -------------------------------------------------------------------- 213 * Make sure there is an empty slot at the head of the response 214 * queue. Tell if the queue is currently empty. 215 */ 216 static int 217 ensure_workresp_empty_slot( 218 blocking_child *c 219 ) 220 { 221 /* 222 ** !!! PRECONDITION: caller holds access lock! 223 ** 224 ** Works like the companion function above. 225 */ 226 227 static const size_t each = 228 sizeof(blocking_children[0]->responses[0]); 229 230 size_t new_alloc; 231 size_t slots_used; 232 size_t sidx; 233 234 slots_used = c->head_response - c->tail_response; 235 if (slots_used >= c->responses_alloc) { 236 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 237 c->responses = erealloc(c->responses, new_alloc * each); 238 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 239 c->responses[sidx] = NULL; 240 c->tail_response = 0; 241 c->head_response = c->responses_alloc; 242 c->responses_alloc = new_alloc; 243 } 244 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 245 return (0 == slots_used); 246 } 247 248 249 /* -------------------------------------------------------------------- 250 * queue_req_pointer() - append a work item or idle exit request to 251 * blocking_workitems[]. Employ proper locking. 252 */ 253 static int 254 queue_req_pointer( 255 blocking_child * c, 256 blocking_pipe_header * hdr 257 ) 258 { 259 size_t qhead; 260 261 /* >>>> ACCESS LOCKING STARTS >>>> */ 262 wait_for_sem(c->accesslock, NULL); 263 ensure_workitems_empty_slot(c); 264 qhead = c->head_workitem; 265 c->workitems[qhead % c->workitems_alloc] = hdr; 266 c->head_workitem = 1 + qhead; 267 tickle_sem(c->accesslock); 268 /* <<<< ACCESS LOCKING ENDS <<<< */ 269 270 /* queue consumer wake-up notification */ 271 tickle_sem(c->workitems_pending); 272 273 return 0; 274 } 275 276 /* -------------------------------------------------------------------- 277 * API function to make sure a worker is running, a proper private copy 278 * of the data is made, the data eneterd into the queue and the worker 279 * is signalled. 280 */ 281 int 282 send_blocking_req_internal( 283 blocking_child * c, 284 blocking_pipe_header * hdr, 285 void * data 286 ) 287 { 288 blocking_pipe_header * threadcopy; 289 size_t payload_octets; 290 291 REQUIRE(hdr != NULL); 292 REQUIRE(data != NULL); 293 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 294 295 if (hdr->octets <= sizeof(*hdr)) 296 return 1; /* failure */ 297 payload_octets = hdr->octets - sizeof(*hdr); 298 299 if (NULL == c->thread_ref) 300 start_blocking_thread(c); 301 threadcopy = emalloc(hdr->octets); 302 memcpy(threadcopy, hdr, sizeof(*hdr)); 303 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 304 305 return queue_req_pointer(c, threadcopy); 306 } 307 308 /* -------------------------------------------------------------------- 309 * Wait for the 'incoming queue no longer empty' signal, lock the shared 310 * structure and dequeue an item. 311 */ 312 blocking_pipe_header * 313 receive_blocking_req_internal( 314 blocking_child * c 315 ) 316 { 317 blocking_pipe_header * req; 318 size_t qhead, qtail; 319 320 req = NULL; 321 do { 322 /* wait for tickle from the producer side */ 323 wait_for_sem(c->workitems_pending, NULL); 324 325 /* >>>> ACCESS LOCKING STARTS >>>> */ 326 wait_for_sem(c->accesslock, NULL); 327 qhead = c->head_workitem; 328 do { 329 qtail = c->tail_workitem; 330 if (qhead == qtail) 331 break; 332 c->tail_workitem = qtail + 1; 333 qtail %= c->workitems_alloc; 334 req = c->workitems[qtail]; 335 c->workitems[qtail] = NULL; 336 } while (NULL == req); 337 tickle_sem(c->accesslock); 338 /* <<<< ACCESS LOCKING ENDS <<<< */ 339 340 } while (NULL == req); 341 342 INSIST(NULL != req); 343 if (CHILD_EXIT_REQ == req) { /* idled out */ 344 send_blocking_resp_internal(c, CHILD_GONE_RESP); 345 req = NULL; 346 } 347 348 return req; 349 } 350 351 /* -------------------------------------------------------------------- 352 * Push a response into the return queue and eventually tickle the 353 * receiver. 354 */ 355 int 356 send_blocking_resp_internal( 357 blocking_child * c, 358 blocking_pipe_header * resp 359 ) 360 { 361 size_t qhead; 362 int empty; 363 364 /* >>>> ACCESS LOCKING STARTS >>>> */ 365 wait_for_sem(c->accesslock, NULL); 366 empty = ensure_workresp_empty_slot(c); 367 qhead = c->head_response; 368 c->responses[qhead % c->responses_alloc] = resp; 369 c->head_response = 1 + qhead; 370 tickle_sem(c->accesslock); 371 /* <<<< ACCESS LOCKING ENDS <<<< */ 372 373 /* queue consumer wake-up notification */ 374 if (empty) 375 { 376 # ifdef WORK_PIPE 377 write(c->resp_write_pipe, "", 1); 378 # else 379 tickle_sem(c->responses_pending); 380 # endif 381 } 382 return 0; 383 } 384 385 386 #ifndef WORK_PIPE 387 388 /* -------------------------------------------------------------------- 389 * Check if a (Windows-)hanndle to a semaphore is actually the same we 390 * are using inside the sema wrapper. 391 */ 392 static BOOL 393 same_os_sema( 394 const sem_ref obj, 395 void* osh 396 ) 397 { 398 return obj && osh && (obj->shnd == (HANDLE)osh); 399 } 400 401 /* -------------------------------------------------------------------- 402 * Find the shared context that associates to an OS handle and make sure 403 * the data is dequeued and processed. 404 */ 405 void 406 handle_blocking_resp_sem( 407 void * context 408 ) 409 { 410 blocking_child * c; 411 u_int idx; 412 413 c = NULL; 414 for (idx = 0; idx < blocking_children_alloc; idx++) { 415 c = blocking_children[idx]; 416 if (c != NULL && 417 c->thread_ref != NULL && 418 same_os_sema(c->responses_pending, context)) 419 break; 420 } 421 if (idx < blocking_children_alloc) 422 process_blocking_resp(c); 423 } 424 #endif /* !WORK_PIPE */ 425 426 /* -------------------------------------------------------------------- 427 * Fetch the next response from the return queue. In case of signalling 428 * via pipe, make sure the pipe is flushed, too. 429 */ 430 blocking_pipe_header * 431 receive_blocking_resp_internal( 432 blocking_child * c 433 ) 434 { 435 blocking_pipe_header * removed; 436 size_t qhead, qtail, slot; 437 438 #ifdef WORK_PIPE 439 int rc; 440 char scratch[32]; 441 442 do 443 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 444 while (-1 == rc && EINTR == errno); 445 #endif 446 447 /* >>>> ACCESS LOCKING STARTS >>>> */ 448 wait_for_sem(c->accesslock, NULL); 449 qhead = c->head_response; 450 qtail = c->tail_response; 451 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 452 slot = qtail % c->responses_alloc; 453 removed = c->responses[slot]; 454 c->responses[slot] = NULL; 455 } 456 c->tail_response = qtail; 457 tickle_sem(c->accesslock); 458 /* <<<< ACCESS LOCKING ENDS <<<< */ 459 460 if (NULL != removed) { 461 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 462 BLOCKING_RESP_MAGIC == removed->magic_sig); 463 } 464 if (CHILD_GONE_RESP == removed) { 465 cleanup_after_child(c); 466 removed = NULL; 467 } 468 469 return removed; 470 } 471 472 /* -------------------------------------------------------------------- 473 * Light up a new worker. 474 */ 475 static void 476 start_blocking_thread( 477 blocking_child * c 478 ) 479 { 480 481 DEBUG_INSIST(!c->reusable); 482 483 prepare_child_sems(c); 484 start_blocking_thread_internal(c); 485 } 486 487 /* -------------------------------------------------------------------- 488 * Create a worker thread. There are several differences between POSIX 489 * and Windows, of course -- most notably the Windows thread is no 490 * detached thread, and we keep the handle around until we want to get 491 * rid of the thread. The notification scheme also differs: Windows 492 * makes use of semaphores in both directions, POSIX uses a pipe for 493 * integration with 'select()' or alike. 494 */ 495 static void 496 start_blocking_thread_internal( 497 blocking_child * c 498 ) 499 #ifdef SYS_WINNT 500 { 501 BOOL resumed; 502 503 c->thread_ref = NULL; 504 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 505 c->thr_table[0].thnd = 506 (HANDLE)_beginthreadex( 507 NULL, 508 0, 509 &blocking_thread, 510 c, 511 CREATE_SUSPENDED, 512 NULL); 513 514 if (NULL == c->thr_table[0].thnd) { 515 msyslog(LOG_ERR, "start blocking thread failed: %m"); 516 exit(-1); 517 } 518 /* remember the thread priority is only within the process class */ 519 if (!SetThreadPriority(c->thr_table[0].thnd, 520 THREAD_PRIORITY_BELOW_NORMAL)) 521 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 522 523 resumed = ResumeThread(c->thr_table[0].thnd); 524 DEBUG_INSIST(resumed); 525 c->thread_ref = &c->thr_table[0]; 526 } 527 #else /* pthreads start_blocking_thread_internal() follows */ 528 { 529 # ifdef NEED_PTHREAD_INIT 530 static int pthread_init_called; 531 # endif 532 pthread_attr_t thr_attr; 533 int rc; 534 int pipe_ends[2]; /* read then write */ 535 int is_pipe; 536 int flags; 537 size_t ostacksize; 538 size_t nstacksize; 539 sigset_t saved_sig_mask; 540 541 c->thread_ref = NULL; 542 543 # ifdef NEED_PTHREAD_INIT 544 /* 545 * from lib/isc/unix/app.c: 546 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 547 */ 548 if (!pthread_init_called) { 549 pthread_init(); 550 pthread_init_called = TRUE; 551 } 552 # endif 553 554 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 555 if (0 != rc) { 556 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 557 exit(1); 558 } 559 c->resp_read_pipe = move_fd(pipe_ends[0]); 560 c->resp_write_pipe = move_fd(pipe_ends[1]); 561 c->ispipe = is_pipe; 562 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 563 if (-1 == flags) { 564 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 565 exit(1); 566 } 567 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 568 if (-1 == rc) { 569 msyslog(LOG_ERR, 570 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 571 exit(1); 572 } 573 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 574 pthread_attr_init(&thr_attr); 575 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 576 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 577 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 578 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 579 if (0 != rc) { 580 msyslog(LOG_ERR, 581 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 582 strerror(rc)); 583 } else { 584 if (ostacksize < THREAD_MINSTACKSIZE) 585 nstacksize = THREAD_MINSTACKSIZE; 586 else if (ostacksize > THREAD_MAXSTACKSIZE) 587 nstacksize = THREAD_MAXSTACKSIZE; 588 else 589 nstacksize = ostacksize; 590 if (nstacksize != ostacksize) 591 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 592 if (0 != rc) 593 msyslog(LOG_ERR, 594 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 595 (u_long)ostacksize, (u_long)nstacksize, 596 strerror(rc)); 597 } 598 #else 599 UNUSED_ARG(nstacksize); 600 UNUSED_ARG(ostacksize); 601 #endif 602 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 603 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 604 #endif 605 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 606 block_thread_signals(&saved_sig_mask); 607 rc = pthread_create(&c->thr_table[0], &thr_attr, 608 &blocking_thread, c); 609 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 610 pthread_attr_destroy(&thr_attr); 611 if (0 != rc) { 612 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 613 strerror(rc)); 614 exit(1); 615 } 616 c->thread_ref = &c->thr_table[0]; 617 } 618 #endif 619 620 /* -------------------------------------------------------------------- 621 * block_thread_signals() 622 * 623 * Temporarily block signals used by ntpd main thread, so that signal 624 * mask inherited by child threads leaves them blocked. Returns prior 625 * active signal mask via pmask, to be restored by the main thread 626 * after pthread_create(). 627 */ 628 #ifndef SYS_WINNT 629 void 630 block_thread_signals( 631 sigset_t * pmask 632 ) 633 { 634 sigset_t block; 635 636 sigemptyset(&block); 637 # ifdef HAVE_SIGNALED_IO 638 # ifdef SIGIO 639 sigaddset(&block, SIGIO); 640 # endif 641 # ifdef SIGPOLL 642 sigaddset(&block, SIGPOLL); 643 # endif 644 # endif /* HAVE_SIGNALED_IO */ 645 sigaddset(&block, SIGALRM); 646 sigaddset(&block, MOREDEBUGSIG); 647 sigaddset(&block, LESSDEBUGSIG); 648 # ifdef SIGDIE1 649 sigaddset(&block, SIGDIE1); 650 # endif 651 # ifdef SIGDIE2 652 sigaddset(&block, SIGDIE2); 653 # endif 654 # ifdef SIGDIE3 655 sigaddset(&block, SIGDIE3); 656 # endif 657 # ifdef SIGDIE4 658 sigaddset(&block, SIGDIE4); 659 # endif 660 # ifdef SIGBUS 661 sigaddset(&block, SIGBUS); 662 # endif 663 sigemptyset(pmask); 664 pthread_sigmask(SIG_BLOCK, &block, pmask); 665 } 666 #endif /* !SYS_WINNT */ 667 668 669 /* -------------------------------------------------------------------- 670 * Create & destroy semaphores. This is sufficiently different between 671 * POSIX and Windows to warrant wrapper functions and close enough to 672 * use the concept of synchronization via semaphore for all platforms. 673 */ 674 static sem_ref 675 create_sema( 676 sema_type* semptr, 677 u_int inival, 678 u_int maxval) 679 { 680 #ifdef SYS_WINNT 681 682 long svini, svmax; 683 if (NULL != semptr) { 684 svini = (inival < LONG_MAX) 685 ? (long)inival : LONG_MAX; 686 svmax = (maxval < LONG_MAX && maxval > 0) 687 ? (long)maxval : LONG_MAX; 688 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 689 if (NULL == semptr->shnd) 690 semptr = NULL; 691 } 692 693 #else 694 695 (void)maxval; 696 if (semptr && sem_init(semptr, FALSE, inival)) 697 semptr = NULL; 698 699 #endif 700 701 return semptr; 702 } 703 704 /* ------------------------------------------------------------------ */ 705 static sem_ref 706 delete_sema( 707 sem_ref obj) 708 { 709 710 # ifdef SYS_WINNT 711 712 if (obj) { 713 if (obj->shnd) 714 CloseHandle(obj->shnd); 715 obj->shnd = NULL; 716 } 717 718 # else 719 720 if (obj) 721 sem_destroy(obj); 722 723 # endif 724 725 return NULL; 726 } 727 728 /* -------------------------------------------------------------------- 729 * prepare_child_sems() 730 * 731 * create sync & access semaphores 732 * 733 * All semaphores are cleared, only the access semaphore has 1 unit. 734 * Childs wait on 'workitems_pending', then grabs 'sema_access' 735 * and dequeues jobs. When done, 'sema_access' is given one unit back. 736 * 737 * The producer grabs 'sema_access', manages the queue, restores 738 * 'sema_access' and puts one unit into 'workitems_pending'. 739 * 740 * The story goes the same for the response queue. 741 */ 742 static void 743 prepare_child_sems( 744 blocking_child *c 745 ) 746 { 747 if (NULL == worker_memlock) 748 worker_memlock = create_sema(&worker_mmutex, 1, 1); 749 750 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 751 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 752 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 753 # ifndef WORK_PIPE 754 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 755 # endif 756 } 757 758 /* -------------------------------------------------------------------- 759 * wait for semaphore. Where the wait can be interrupted, it will 760 * internally resume -- When this function returns, there is either no 761 * semaphore at all, a timeout occurred, or the caller could 762 * successfully take a token from the semaphore. 763 * 764 * For untimed wait, not checking the result of this function at all is 765 * definitely an option. 766 */ 767 static int 768 wait_for_sem( 769 sem_ref sem, 770 struct timespec * timeout /* wall-clock */ 771 ) 772 #ifdef SYS_WINNT 773 { 774 struct timespec now; 775 struct timespec delta; 776 DWORD msec; 777 DWORD rc; 778 779 if (!(sem && sem->shnd)) { 780 errno = EINVAL; 781 return -1; 782 } 783 784 if (NULL == timeout) { 785 msec = INFINITE; 786 } else { 787 getclock(TIMEOFDAY, &now); 788 delta = sub_tspec(*timeout, now); 789 if (delta.tv_sec < 0) { 790 msec = 0; 791 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 792 msec = INFINITE; 793 } else { 794 msec = 1000 * (DWORD)delta.tv_sec; 795 msec += delta.tv_nsec / (1000 * 1000); 796 } 797 } 798 rc = WaitForSingleObject(sem->shnd, msec); 799 if (WAIT_OBJECT_0 == rc) 800 return 0; 801 if (WAIT_TIMEOUT == rc) { 802 errno = ETIMEDOUT; 803 return -1; 804 } 805 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 806 errno = EFAULT; 807 return -1; 808 } 809 #else /* pthreads wait_for_sem() follows */ 810 { 811 int rc = -1; 812 813 if (sem) do { 814 if (NULL == timeout) 815 rc = sem_wait(sem); 816 else 817 rc = sem_timedwait(sem, timeout); 818 } while (rc == -1 && errno == EINTR); 819 else 820 errno = EINVAL; 821 822 return rc; 823 } 824 #endif 825 826 /* -------------------------------------------------------------------- 827 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 828 * calling conventions under Windows and POSIX-defined signature 829 * otherwise. 830 */ 831 #ifdef SYS_WINNT 832 u_int WINAPI 833 #else 834 void * 835 #endif 836 blocking_thread( 837 void * ThreadArg 838 ) 839 { 840 blocking_child *c; 841 842 c = ThreadArg; 843 exit_worker(blocking_child_common(c)); 844 845 /* NOTREACHED */ 846 return 0; 847 } 848 849 /* -------------------------------------------------------------------- 850 * req_child_exit() runs in the parent. 851 * 852 * This function is called from from the idle timer, too, and possibly 853 * without a thread being there any longer. Since we have folded up our 854 * tent in that case and all the semaphores are already gone, we simply 855 * ignore this request in this case. 856 * 857 * Since the existence of the semaphores is controlled exclusively by 858 * the parent, there's no risk of data race here. 859 */ 860 int 861 req_child_exit( 862 blocking_child *c 863 ) 864 { 865 return (c->accesslock) 866 ? queue_req_pointer(c, CHILD_EXIT_REQ) 867 : 0; 868 } 869 870 /* -------------------------------------------------------------------- 871 * cleanup_after_child() runs in parent. 872 */ 873 static void 874 cleanup_after_child( 875 blocking_child * c 876 ) 877 { 878 DEBUG_INSIST(!c->reusable); 879 880 # ifdef SYS_WINNT 881 /* The thread was not created in detached state, so we better 882 * clean up. 883 */ 884 if (c->thread_ref && c->thread_ref->thnd) { 885 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 886 INSIST(CloseHandle(c->thread_ref->thnd)); 887 c->thread_ref->thnd = NULL; 888 } 889 # endif 890 c->thread_ref = NULL; 891 892 /* remove semaphores and (if signalling vi IO) pipes */ 893 894 c->accesslock = delete_sema(c->accesslock); 895 c->workitems_pending = delete_sema(c->workitems_pending); 896 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 897 898 # ifdef WORK_PIPE 899 DEBUG_INSIST(-1 != c->resp_read_pipe); 900 DEBUG_INSIST(-1 != c->resp_write_pipe); 901 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 902 close(c->resp_write_pipe); 903 close(c->resp_read_pipe); 904 c->resp_write_pipe = -1; 905 c->resp_read_pipe = -1; 906 # else 907 DEBUG_INSIST(NULL != c->responses_pending); 908 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 909 c->responses_pending = delete_sema(c->responses_pending); 910 # endif 911 912 /* Is it necessary to check if there are pending requests and 913 * responses? If so, and if there are, what to do with them? 914 */ 915 916 /* re-init buffer index sequencers */ 917 c->head_workitem = 0; 918 c->tail_workitem = 0; 919 c->head_response = 0; 920 c->tail_response = 0; 921 922 c->reusable = TRUE; 923 } 924 925 926 #else /* !WORK_THREAD follows */ 927 char work_thread_nonempty_compilation_unit; 928 #endif 929