1 /* 2 * work_thread.c - threads implementation for blocking worker child. 3 */ 4 #include <config.h> 5 #include "ntp_workimpl.h" 6 7 #ifdef WORK_THREAD 8 9 #include <stdio.h> 10 #include <ctype.h> 11 #include <signal.h> 12 #ifndef SYS_WINNT 13 #include <pthread.h> 14 #endif 15 16 #include "ntp_stdlib.h" 17 #include "ntp_malloc.h" 18 #include "ntp_syslog.h" 19 #include "ntpd.h" 20 #include "ntp_io.h" 21 #include "ntp_assert.h" 22 #include "ntp_unixtime.h" 23 #include "timespecops.h" 24 #include "ntp_worker.h" 25 26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27 #define CHILD_GONE_RESP CHILD_EXIT_REQ 28 /* Queue size increments: 29 * The request queue grows a bit faster than the response queue -- the 30 * daemon can push requests and pull results faster on avarage than the 31 * worker can process requests and push results... If this really pays 32 * off is debatable. 33 */ 34 #define WORKITEMS_ALLOC_INC 16 35 #define RESPONSES_ALLOC_INC 4 36 37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38 * set the maximum to 256kB. If the minimum goes below the 39 * system-defined minimum stack size, we have to adjust accordingly. 40 */ 41 #ifndef THREAD_MINSTACKSIZE 42 # define THREAD_MINSTACKSIZE (64U * 1024) 43 #endif 44 #ifndef __sun 45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46 # undef THREAD_MINSTACKSIZE 47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48 #endif 49 #endif 50 51 #ifndef THREAD_MAXSTACKSIZE 52 # define THREAD_MAXSTACKSIZE (256U * 1024) 53 #endif 54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55 # undef THREAD_MAXSTACKSIZE 56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57 #endif 58 59 /* need a good integer to store a pointer... */ 60 #ifndef UINTPTR_T 61 # if defined(UINTPTR_MAX) 62 # define UINTPTR_T uintptr_t 63 # elif defined(UINT_PTR) 64 # define UINTPTR_T UINT_PTR 65 # else 66 # define UINTPTR_T size_t 67 # endif 68 #endif 69 70 71 #ifdef SYS_WINNT 72 73 # define thread_exit(c) _endthreadex(c) 74 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 75 u_int WINAPI blocking_thread(void *); 76 static BOOL same_os_sema(const sem_ref obj, void * osobj); 77 78 #else 79 80 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 81 # define tickle_sem sem_post 82 void * blocking_thread(void *); 83 static void block_thread_signals(sigset_t *); 84 85 #endif 86 87 #ifdef WORK_PIPE 88 addremove_io_fd_func addremove_io_fd; 89 #else 90 addremove_io_semaphore_func addremove_io_semaphore; 91 #endif 92 93 static void start_blocking_thread(blocking_child *); 94 static void start_blocking_thread_internal(blocking_child *); 95 static void prepare_child_sems(blocking_child *); 96 static int wait_for_sem(sem_ref, struct timespec *); 97 static int ensure_workitems_empty_slot(blocking_child *); 98 static int ensure_workresp_empty_slot(blocking_child *); 99 static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 100 static void cleanup_after_child(blocking_child *); 101 102 static sema_type worker_mmutex; 103 static sem_ref worker_memlock; 104 105 /* -------------------------------------------------------------------- 106 * locking the global worker state table (and other global stuff) 107 */ 108 void 109 worker_global_lock( 110 int inOrOut) 111 { 112 if (worker_memlock) { 113 if (inOrOut) 114 wait_for_sem(worker_memlock, NULL); 115 else 116 tickle_sem(worker_memlock); 117 } 118 } 119 120 /* -------------------------------------------------------------------- 121 * implementation isolation wrapper 122 */ 123 void 124 exit_worker( 125 int exitcode 126 ) 127 { 128 thread_exit(exitcode); /* see #define thread_exit */ 129 } 130 131 /* -------------------------------------------------------------------- 132 * sleep for a given time or until the wakup semaphore is tickled. 133 */ 134 int 135 worker_sleep( 136 blocking_child * c, 137 time_t seconds 138 ) 139 { 140 struct timespec until; 141 int rc; 142 143 # ifdef HAVE_CLOCK_GETTIME 144 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 145 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 146 return -1; 147 } 148 # else 149 if (0 != getclock(TIMEOFDAY, &until)) { 150 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 151 return -1; 152 } 153 # endif 154 until.tv_sec += seconds; 155 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 156 if (0 == rc) 157 return -1; 158 if (-1 == rc && ETIMEDOUT == errno) 159 return 0; 160 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 161 return -1; 162 } 163 164 165 /* -------------------------------------------------------------------- 166 * Wake up a worker that takes a nap. 167 */ 168 void 169 interrupt_worker_sleep(void) 170 { 171 u_int idx; 172 blocking_child * c; 173 174 for (idx = 0; idx < blocking_children_alloc; idx++) { 175 c = blocking_children[idx]; 176 if (NULL == c || NULL == c->wake_scheduled_sleep) 177 continue; 178 tickle_sem(c->wake_scheduled_sleep); 179 } 180 } 181 182 /* -------------------------------------------------------------------- 183 * Make sure there is an empty slot at the head of the request 184 * queue. Tell if the queue is currently empty. 185 */ 186 static int 187 ensure_workitems_empty_slot( 188 blocking_child *c 189 ) 190 { 191 /* 192 ** !!! PRECONDITION: caller holds access lock! 193 ** 194 ** This simply tries to increase the size of the buffer if it 195 ** becomes full. The resize operation does *not* maintain the 196 ** order of requests, but that should be irrelevant since the 197 ** processing is considered asynchronous anyway. 198 ** 199 ** Return if the buffer is currently empty. 200 */ 201 202 static const size_t each = 203 sizeof(blocking_children[0]->workitems[0]); 204 205 size_t new_alloc; 206 size_t slots_used; 207 size_t sidx; 208 209 slots_used = c->head_workitem - c->tail_workitem; 210 if (slots_used >= c->workitems_alloc) { 211 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 212 c->workitems = erealloc(c->workitems, new_alloc * each); 213 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 214 c->workitems[sidx] = NULL; 215 c->tail_workitem = 0; 216 c->head_workitem = c->workitems_alloc; 217 c->workitems_alloc = new_alloc; 218 } 219 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 220 return (0 == slots_used); 221 } 222 223 /* -------------------------------------------------------------------- 224 * Make sure there is an empty slot at the head of the response 225 * queue. Tell if the queue is currently empty. 226 */ 227 static int 228 ensure_workresp_empty_slot( 229 blocking_child *c 230 ) 231 { 232 /* 233 ** !!! PRECONDITION: caller holds access lock! 234 ** 235 ** Works like the companion function above. 236 */ 237 238 static const size_t each = 239 sizeof(blocking_children[0]->responses[0]); 240 241 size_t new_alloc; 242 size_t slots_used; 243 size_t sidx; 244 245 slots_used = c->head_response - c->tail_response; 246 if (slots_used >= c->responses_alloc) { 247 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 248 c->responses = erealloc(c->responses, new_alloc * each); 249 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 250 c->responses[sidx] = NULL; 251 c->tail_response = 0; 252 c->head_response = c->responses_alloc; 253 c->responses_alloc = new_alloc; 254 } 255 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 256 return (0 == slots_used); 257 } 258 259 260 /* -------------------------------------------------------------------- 261 * queue_req_pointer() - append a work item or idle exit request to 262 * blocking_workitems[]. Employ proper locking. 263 */ 264 static int 265 queue_req_pointer( 266 blocking_child * c, 267 blocking_pipe_header * hdr 268 ) 269 { 270 size_t qhead; 271 272 /* >>>> ACCESS LOCKING STARTS >>>> */ 273 wait_for_sem(c->accesslock, NULL); 274 ensure_workitems_empty_slot(c); 275 qhead = c->head_workitem; 276 c->workitems[qhead % c->workitems_alloc] = hdr; 277 c->head_workitem = 1 + qhead; 278 tickle_sem(c->accesslock); 279 /* <<<< ACCESS LOCKING ENDS <<<< */ 280 281 /* queue consumer wake-up notification */ 282 tickle_sem(c->workitems_pending); 283 284 return 0; 285 } 286 287 /* -------------------------------------------------------------------- 288 * API function to make sure a worker is running, a proper private copy 289 * of the data is made, the data eneterd into the queue and the worker 290 * is signalled. 291 */ 292 int 293 send_blocking_req_internal( 294 blocking_child * c, 295 blocking_pipe_header * hdr, 296 void * data 297 ) 298 { 299 blocking_pipe_header * threadcopy; 300 size_t payload_octets; 301 302 REQUIRE(hdr != NULL); 303 REQUIRE(data != NULL); 304 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 305 306 if (hdr->octets <= sizeof(*hdr)) 307 return 1; /* failure */ 308 payload_octets = hdr->octets - sizeof(*hdr); 309 310 if (NULL == c->thread_ref) 311 start_blocking_thread(c); 312 threadcopy = emalloc(hdr->octets); 313 memcpy(threadcopy, hdr, sizeof(*hdr)); 314 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 315 316 return queue_req_pointer(c, threadcopy); 317 } 318 319 /* -------------------------------------------------------------------- 320 * Wait for the 'incoming queue no longer empty' signal, lock the shared 321 * structure and dequeue an item. 322 */ 323 blocking_pipe_header * 324 receive_blocking_req_internal( 325 blocking_child * c 326 ) 327 { 328 blocking_pipe_header * req; 329 size_t qhead, qtail; 330 331 req = NULL; 332 do { 333 /* wait for tickle from the producer side */ 334 wait_for_sem(c->workitems_pending, NULL); 335 336 /* >>>> ACCESS LOCKING STARTS >>>> */ 337 wait_for_sem(c->accesslock, NULL); 338 qhead = c->head_workitem; 339 do { 340 qtail = c->tail_workitem; 341 if (qhead == qtail) 342 break; 343 c->tail_workitem = qtail + 1; 344 qtail %= c->workitems_alloc; 345 req = c->workitems[qtail]; 346 c->workitems[qtail] = NULL; 347 } while (NULL == req); 348 tickle_sem(c->accesslock); 349 /* <<<< ACCESS LOCKING ENDS <<<< */ 350 351 } while (NULL == req); 352 353 INSIST(NULL != req); 354 if (CHILD_EXIT_REQ == req) { /* idled out */ 355 send_blocking_resp_internal(c, CHILD_GONE_RESP); 356 req = NULL; 357 } 358 359 return req; 360 } 361 362 /* -------------------------------------------------------------------- 363 * Push a response into the return queue and eventually tickle the 364 * receiver. 365 */ 366 int 367 send_blocking_resp_internal( 368 blocking_child * c, 369 blocking_pipe_header * resp 370 ) 371 { 372 size_t qhead; 373 int empty; 374 375 /* >>>> ACCESS LOCKING STARTS >>>> */ 376 wait_for_sem(c->accesslock, NULL); 377 empty = ensure_workresp_empty_slot(c); 378 qhead = c->head_response; 379 c->responses[qhead % c->responses_alloc] = resp; 380 c->head_response = 1 + qhead; 381 tickle_sem(c->accesslock); 382 /* <<<< ACCESS LOCKING ENDS <<<< */ 383 384 /* queue consumer wake-up notification */ 385 if (empty) 386 { 387 # ifdef WORK_PIPE 388 if (1 != write(c->resp_write_pipe, "", 1)) 389 msyslog(LOG_WARNING, "async resolver: %s", 390 "failed to notify main thread!"); 391 # else 392 tickle_sem(c->responses_pending); 393 # endif 394 } 395 return 0; 396 } 397 398 399 #ifndef WORK_PIPE 400 401 /* -------------------------------------------------------------------- 402 * Check if a (Windows-)hanndle to a semaphore is actually the same we 403 * are using inside the sema wrapper. 404 */ 405 static BOOL 406 same_os_sema( 407 const sem_ref obj, 408 void* osh 409 ) 410 { 411 return obj && osh && (obj->shnd == (HANDLE)osh); 412 } 413 414 /* -------------------------------------------------------------------- 415 * Find the shared context that associates to an OS handle and make sure 416 * the data is dequeued and processed. 417 */ 418 void 419 handle_blocking_resp_sem( 420 void * context 421 ) 422 { 423 blocking_child * c; 424 u_int idx; 425 426 c = NULL; 427 for (idx = 0; idx < blocking_children_alloc; idx++) { 428 c = blocking_children[idx]; 429 if (c != NULL && 430 c->thread_ref != NULL && 431 same_os_sema(c->responses_pending, context)) 432 break; 433 } 434 if (idx < blocking_children_alloc) 435 process_blocking_resp(c); 436 } 437 #endif /* !WORK_PIPE */ 438 439 /* -------------------------------------------------------------------- 440 * Fetch the next response from the return queue. In case of signalling 441 * via pipe, make sure the pipe is flushed, too. 442 */ 443 blocking_pipe_header * 444 receive_blocking_resp_internal( 445 blocking_child * c 446 ) 447 { 448 blocking_pipe_header * removed; 449 size_t qhead, qtail, slot; 450 451 #ifdef WORK_PIPE 452 int rc; 453 char scratch[32]; 454 455 do 456 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 457 while (-1 == rc && EINTR == errno); 458 #endif 459 460 /* >>>> ACCESS LOCKING STARTS >>>> */ 461 wait_for_sem(c->accesslock, NULL); 462 qhead = c->head_response; 463 qtail = c->tail_response; 464 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 465 slot = qtail % c->responses_alloc; 466 removed = c->responses[slot]; 467 c->responses[slot] = NULL; 468 } 469 c->tail_response = qtail; 470 tickle_sem(c->accesslock); 471 /* <<<< ACCESS LOCKING ENDS <<<< */ 472 473 if (NULL != removed) { 474 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 475 BLOCKING_RESP_MAGIC == removed->magic_sig); 476 } 477 if (CHILD_GONE_RESP == removed) { 478 cleanup_after_child(c); 479 removed = NULL; 480 } 481 482 return removed; 483 } 484 485 /* -------------------------------------------------------------------- 486 * Light up a new worker. 487 */ 488 static void 489 start_blocking_thread( 490 blocking_child * c 491 ) 492 { 493 494 DEBUG_INSIST(!c->reusable); 495 496 prepare_child_sems(c); 497 start_blocking_thread_internal(c); 498 } 499 500 /* -------------------------------------------------------------------- 501 * Create a worker thread. There are several differences between POSIX 502 * and Windows, of course -- most notably the Windows thread is no 503 * detached thread, and we keep the handle around until we want to get 504 * rid of the thread. The notification scheme also differs: Windows 505 * makes use of semaphores in both directions, POSIX uses a pipe for 506 * integration with 'select()' or alike. 507 */ 508 static void 509 start_blocking_thread_internal( 510 blocking_child * c 511 ) 512 #ifdef SYS_WINNT 513 { 514 BOOL resumed; 515 516 c->thread_ref = NULL; 517 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 518 c->thr_table[0].thnd = 519 (HANDLE)_beginthreadex( 520 NULL, 521 0, 522 &blocking_thread, 523 c, 524 CREATE_SUSPENDED, 525 NULL); 526 527 if (NULL == c->thr_table[0].thnd) { 528 msyslog(LOG_ERR, "start blocking thread failed: %m"); 529 exit(-1); 530 } 531 /* remember the thread priority is only within the process class */ 532 if (!SetThreadPriority(c->thr_table[0].thnd, 533 THREAD_PRIORITY_BELOW_NORMAL)) 534 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 535 536 resumed = ResumeThread(c->thr_table[0].thnd); 537 DEBUG_INSIST(resumed); 538 c->thread_ref = &c->thr_table[0]; 539 } 540 #else /* pthreads start_blocking_thread_internal() follows */ 541 { 542 # ifdef NEED_PTHREAD_INIT 543 static int pthread_init_called; 544 # endif 545 pthread_attr_t thr_attr; 546 int rc; 547 int pipe_ends[2]; /* read then write */ 548 int is_pipe; 549 int flags; 550 size_t ostacksize; 551 size_t nstacksize; 552 sigset_t saved_sig_mask; 553 554 c->thread_ref = NULL; 555 556 # ifdef NEED_PTHREAD_INIT 557 /* 558 * from lib/isc/unix/app.c: 559 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 560 */ 561 if (!pthread_init_called) { 562 pthread_init(); 563 pthread_init_called = TRUE; 564 } 565 # endif 566 567 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 568 if (0 != rc) { 569 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 570 exit(1); 571 } 572 c->resp_read_pipe = move_fd(pipe_ends[0]); 573 c->resp_write_pipe = move_fd(pipe_ends[1]); 574 c->ispipe = is_pipe; 575 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 576 if (-1 == flags) { 577 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 578 exit(1); 579 } 580 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 581 if (-1 == rc) { 582 msyslog(LOG_ERR, 583 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 584 exit(1); 585 } 586 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 587 pthread_attr_init(&thr_attr); 588 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 589 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 590 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 591 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 592 if (0 != rc) { 593 msyslog(LOG_ERR, 594 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 595 strerror(rc)); 596 } else { 597 if (ostacksize < THREAD_MINSTACKSIZE) 598 nstacksize = THREAD_MINSTACKSIZE; 599 else if (ostacksize > THREAD_MAXSTACKSIZE) 600 nstacksize = THREAD_MAXSTACKSIZE; 601 else 602 nstacksize = ostacksize; 603 if (nstacksize != ostacksize) 604 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 605 if (0 != rc) 606 msyslog(LOG_ERR, 607 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 608 (u_long)ostacksize, (u_long)nstacksize, 609 strerror(rc)); 610 } 611 #else 612 UNUSED_ARG(nstacksize); 613 UNUSED_ARG(ostacksize); 614 #endif 615 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 616 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 617 #endif 618 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 619 block_thread_signals(&saved_sig_mask); 620 rc = pthread_create(&c->thr_table[0], &thr_attr, 621 &blocking_thread, c); 622 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 623 pthread_attr_destroy(&thr_attr); 624 if (0 != rc) { 625 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 626 strerror(rc)); 627 exit(1); 628 } 629 c->thread_ref = &c->thr_table[0]; 630 } 631 #endif 632 633 /* -------------------------------------------------------------------- 634 * block_thread_signals() 635 * 636 * Temporarily block signals used by ntpd main thread, so that signal 637 * mask inherited by child threads leaves them blocked. Returns prior 638 * active signal mask via pmask, to be restored by the main thread 639 * after pthread_create(). 640 */ 641 #ifndef SYS_WINNT 642 void 643 block_thread_signals( 644 sigset_t * pmask 645 ) 646 { 647 sigset_t block; 648 649 sigemptyset(&block); 650 # ifdef HAVE_SIGNALED_IO 651 # ifdef SIGIO 652 sigaddset(&block, SIGIO); 653 # endif 654 # ifdef SIGPOLL 655 sigaddset(&block, SIGPOLL); 656 # endif 657 # endif /* HAVE_SIGNALED_IO */ 658 sigaddset(&block, SIGALRM); 659 sigaddset(&block, MOREDEBUGSIG); 660 sigaddset(&block, LESSDEBUGSIG); 661 # ifdef SIGDIE1 662 sigaddset(&block, SIGDIE1); 663 # endif 664 # ifdef SIGDIE2 665 sigaddset(&block, SIGDIE2); 666 # endif 667 # ifdef SIGDIE3 668 sigaddset(&block, SIGDIE3); 669 # endif 670 # ifdef SIGDIE4 671 sigaddset(&block, SIGDIE4); 672 # endif 673 # ifdef SIGBUS 674 sigaddset(&block, SIGBUS); 675 # endif 676 sigemptyset(pmask); 677 pthread_sigmask(SIG_BLOCK, &block, pmask); 678 } 679 #endif /* !SYS_WINNT */ 680 681 682 /* -------------------------------------------------------------------- 683 * Create & destroy semaphores. This is sufficiently different between 684 * POSIX and Windows to warrant wrapper functions and close enough to 685 * use the concept of synchronization via semaphore for all platforms. 686 */ 687 static sem_ref 688 create_sema( 689 sema_type* semptr, 690 u_int inival, 691 u_int maxval) 692 { 693 #ifdef SYS_WINNT 694 695 long svini, svmax; 696 if (NULL != semptr) { 697 svini = (inival < LONG_MAX) 698 ? (long)inival : LONG_MAX; 699 svmax = (maxval < LONG_MAX && maxval > 0) 700 ? (long)maxval : LONG_MAX; 701 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 702 if (NULL == semptr->shnd) 703 semptr = NULL; 704 } 705 706 #else 707 708 (void)maxval; 709 if (semptr && sem_init(semptr, FALSE, inival)) 710 semptr = NULL; 711 712 #endif 713 714 return semptr; 715 } 716 717 /* ------------------------------------------------------------------ */ 718 static sem_ref 719 delete_sema( 720 sem_ref obj) 721 { 722 723 # ifdef SYS_WINNT 724 725 if (obj) { 726 if (obj->shnd) 727 CloseHandle(obj->shnd); 728 obj->shnd = NULL; 729 } 730 731 # else 732 733 if (obj) 734 sem_destroy(obj); 735 736 # endif 737 738 return NULL; 739 } 740 741 /* -------------------------------------------------------------------- 742 * prepare_child_sems() 743 * 744 * create sync & access semaphores 745 * 746 * All semaphores are cleared, only the access semaphore has 1 unit. 747 * Childs wait on 'workitems_pending', then grabs 'sema_access' 748 * and dequeues jobs. When done, 'sema_access' is given one unit back. 749 * 750 * The producer grabs 'sema_access', manages the queue, restores 751 * 'sema_access' and puts one unit into 'workitems_pending'. 752 * 753 * The story goes the same for the response queue. 754 */ 755 static void 756 prepare_child_sems( 757 blocking_child *c 758 ) 759 { 760 if (NULL == worker_memlock) 761 worker_memlock = create_sema(&worker_mmutex, 1, 1); 762 763 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 764 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 765 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 766 # ifndef WORK_PIPE 767 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 768 # endif 769 } 770 771 /* -------------------------------------------------------------------- 772 * wait for semaphore. Where the wait can be interrupted, it will 773 * internally resume -- When this function returns, there is either no 774 * semaphore at all, a timeout occurred, or the caller could 775 * successfully take a token from the semaphore. 776 * 777 * For untimed wait, not checking the result of this function at all is 778 * definitely an option. 779 */ 780 static int 781 wait_for_sem( 782 sem_ref sem, 783 struct timespec * timeout /* wall-clock */ 784 ) 785 #ifdef SYS_WINNT 786 { 787 struct timespec now; 788 struct timespec delta; 789 DWORD msec; 790 DWORD rc; 791 792 if (!(sem && sem->shnd)) { 793 errno = EINVAL; 794 return -1; 795 } 796 797 if (NULL == timeout) { 798 msec = INFINITE; 799 } else { 800 getclock(TIMEOFDAY, &now); 801 delta = sub_tspec(*timeout, now); 802 if (delta.tv_sec < 0) { 803 msec = 0; 804 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 805 msec = INFINITE; 806 } else { 807 msec = 1000 * (DWORD)delta.tv_sec; 808 msec += delta.tv_nsec / (1000 * 1000); 809 } 810 } 811 rc = WaitForSingleObject(sem->shnd, msec); 812 if (WAIT_OBJECT_0 == rc) 813 return 0; 814 if (WAIT_TIMEOUT == rc) { 815 errno = ETIMEDOUT; 816 return -1; 817 } 818 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 819 errno = EFAULT; 820 return -1; 821 } 822 #else /* pthreads wait_for_sem() follows */ 823 { 824 int rc = -1; 825 826 if (sem) do { 827 if (NULL == timeout) 828 rc = sem_wait(sem); 829 else 830 rc = sem_timedwait(sem, timeout); 831 } while (rc == -1 && errno == EINTR); 832 else 833 errno = EINVAL; 834 835 return rc; 836 } 837 #endif 838 839 /* -------------------------------------------------------------------- 840 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 841 * calling conventions under Windows and POSIX-defined signature 842 * otherwise. 843 */ 844 #ifdef SYS_WINNT 845 u_int WINAPI 846 #else 847 void * 848 #endif 849 blocking_thread( 850 void * ThreadArg 851 ) 852 { 853 blocking_child *c; 854 855 c = ThreadArg; 856 exit_worker(blocking_child_common(c)); 857 858 /* NOTREACHED */ 859 return 0; 860 } 861 862 /* -------------------------------------------------------------------- 863 * req_child_exit() runs in the parent. 864 * 865 * This function is called from from the idle timer, too, and possibly 866 * without a thread being there any longer. Since we have folded up our 867 * tent in that case and all the semaphores are already gone, we simply 868 * ignore this request in this case. 869 * 870 * Since the existence of the semaphores is controlled exclusively by 871 * the parent, there's no risk of data race here. 872 */ 873 int 874 req_child_exit( 875 blocking_child *c 876 ) 877 { 878 return (c->accesslock) 879 ? queue_req_pointer(c, CHILD_EXIT_REQ) 880 : 0; 881 } 882 883 /* -------------------------------------------------------------------- 884 * cleanup_after_child() runs in parent. 885 */ 886 static void 887 cleanup_after_child( 888 blocking_child * c 889 ) 890 { 891 DEBUG_INSIST(!c->reusable); 892 893 # ifdef SYS_WINNT 894 /* The thread was not created in detached state, so we better 895 * clean up. 896 */ 897 if (c->thread_ref && c->thread_ref->thnd) { 898 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 899 INSIST(CloseHandle(c->thread_ref->thnd)); 900 c->thread_ref->thnd = NULL; 901 } 902 # endif 903 c->thread_ref = NULL; 904 905 /* remove semaphores and (if signalling vi IO) pipes */ 906 907 c->accesslock = delete_sema(c->accesslock); 908 c->workitems_pending = delete_sema(c->workitems_pending); 909 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 910 911 # ifdef WORK_PIPE 912 DEBUG_INSIST(-1 != c->resp_read_pipe); 913 DEBUG_INSIST(-1 != c->resp_write_pipe); 914 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 915 close(c->resp_write_pipe); 916 close(c->resp_read_pipe); 917 c->resp_write_pipe = -1; 918 c->resp_read_pipe = -1; 919 # else 920 DEBUG_INSIST(NULL != c->responses_pending); 921 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 922 c->responses_pending = delete_sema(c->responses_pending); 923 # endif 924 925 /* Is it necessary to check if there are pending requests and 926 * responses? If so, and if there are, what to do with them? 927 */ 928 929 /* re-init buffer index sequencers */ 930 c->head_workitem = 0; 931 c->tail_workitem = 0; 932 c->head_response = 0; 933 c->tail_response = 0; 934 935 c->reusable = TRUE; 936 } 937 938 939 #else /* !WORK_THREAD follows */ 940 char work_thread_nonempty_compilation_unit; 941 #endif 942