/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* need a good integer to store a pointer... */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh)	ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static	BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *	blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
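/* Usage sketch (added; follows the convention visible in
 * worker_global_lock() above -- a nonzero argument takes the lock, zero
 * releases it):
 *
 *	worker_global_lock(TRUE);
 *	...touch global worker state...
 *	worker_global_lock(FALSE);
 */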
/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}


/* --------------------------------------------------------------------
 * Wake up a worker that takes a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
			c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
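/* Added illustration of the ring-buffer indexing used above: head and
 * tail advance monotonically and are reduced modulo the allocation size
 * only on access, so 'head - tail' is always the number of occupied
 * slots. E.g. with workitems_alloc == 16, head == 19 and tail == 3 the
 * queue is full (19 - 3 == 16) and the next enqueue grows the buffer by
 * WORKITEMS_ALLOC_INC.
 */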
/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
			c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 * blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child *	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data is entered into the queue, and the
 * worker is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
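/* Hypothetical caller sketch (added): only 'octets' and 'magic_sig' are
 * inspected in this file; any further header setup lives with the
 * callers in ntp_worker.c and is not shown here.
 *
 *	blocking_pipe_header hdr;
 *	memset(&hdr, 0, sizeof(hdr));
 *	hdr.octets = sizeof(hdr) + payload_octets;
 *	hdr.magic_sig = BLOCKING_REQ_MAGIC;
 *	rc = send_blocking_req_internal(c, &hdr, payload);
 */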
/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty,
 * tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
# ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: %s",
				"failed to notify main thread!");
# else
		tickle_sem(c->responses_pending);
# endif
	}
	return 0;
}


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same we
 * are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void *		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context that associates to an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
		    c->thread_ref != NULL &&
		    same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */
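/* Added note: the single byte written to resp_write_pipe above is a
 * pure wakeup token for the main thread's I/O loop -- the receiver
 * drains and discards it in receive_blocking_resp_internal(); the
 * payload itself always travels through the shared c->responses[] ring.
 */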
/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
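/* Added ordering note: prepare_child_sems() must run to completion
 * before the thread is launched, since (assuming the usual flow in
 * ntp_worker.c) the child enters blocking_child_common() and promptly
 * starts waiting on c->workitems_pending.
 */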
/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is no
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or alike.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		nstacksize = ostacksize;
		/* order is important here: first clamp on upper limit,
		 * and the PTHREAD min stack size is ultimate override!
		 */
		if (nstacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
# ifdef PTHREAD_STACK_MAX
		if (nstacksize > PTHREAD_STACK_MAX)
			nstacksize = PTHREAD_STACK_MAX;
# endif

		/* now clamp on lower stack limit. */
		if (nstacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
# ifdef PTHREAD_STACK_MIN
		if (nstacksize < PTHREAD_STACK_MIN)
			nstacksize = PTHREAD_STACK_MIN;
# endif

		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif
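/* Added worked example of the stack clamping above: with the default
 * THREAD_MINSTACKSIZE (64 kB), THREAD_MAXSTACKSIZE (256 kB) and a
 * typical system default of 8 MB, the result is 256 kB; a
 * PTHREAD_STACK_MIN larger than that would win as the final override.
 */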
/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked. Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

# ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

# else

	if (obj)
		sem_destroy(obj);

# endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access' and
 * dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
# ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
# endif
}
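/* Added summary of the per-child semaphores created above (initial
 * value / maximum; a maximum of 0 is treated as 'unlimited' by
 * create_sema on Windows and ignored on POSIX):
 *
 *	accesslock            1 / 1   mutex guarding both queues
 *	workitems_pending     0 / 0   counts queued requests
 *	wake_scheduled_sleep  0 / 1   interrupts worker_sleep()
 *	responses_pending     0 / 0   counts queued responses (!WORK_PIPE)
 */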
/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * internally resume -- when this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully took
 * a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec	now;
	struct timespec	delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore this request in this case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}
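/* Added summary of the shutdown handshake: the parent queues
 * CHILD_EXIT_REQ via req_child_exit(); the worker sees it in
 * receive_blocking_req_internal(), answers with CHILD_GONE_RESP and
 * returns NULL to its caller; the parent finally runs
 * cleanup_after_child() when it dequeues that marker in
 * receive_blocking_resp_internal().
 */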
/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

# ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
# endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

# ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
# else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
# endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif