/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef DEVOLATILE
#define DEVOLATILE(type, var)	((type)(uintptr_t)(volatile void *)(var))
#endif

#ifdef SYS_WINNT
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem	SetEvent
#else
# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static void	start_blocking_thread(blocking_child *);
static void	start_blocking_thread_internal(blocking_child *);
static void	prepare_child_sems(blocking_child *);
static int	wait_for_sem(sem_ref, struct timespec *);
static void	ensure_workitems_empty_slot(blocking_child *);
static void	ensure_workresp_empty_slot(blocking_child *);
static int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static void	cleanup_after_child(blocking_child *);
#ifdef SYS_WINNT
u_int WINAPI	blocking_thread(void *);
#else
void *		blocking_thread(void *);
#endif
#ifndef SYS_WINNT
static void	block_thread_signals(sigset_t *);
#endif


void
exit_worker(
        int exitcode
        )
{
        thread_exit(exitcode);  /* see #define thread_exit */
}


int
worker_sleep(
        blocking_child * c,
        time_t seconds
        )
{
        struct timespec until;
        int rc;

# ifdef HAVE_CLOCK_GETTIME
        if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
                msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
                return -1;
        }
# else
        if (0 != getclock(TIMEOFDAY, &until)) {
                msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
                return -1;
        }
# endif
        until.tv_sec += seconds;
        do {
                rc = wait_for_sem(c->wake_scheduled_sleep, &until);
        } while (-1 == rc && EINTR == errno);
        if (0 == rc)
                return -1;
        if (-1 == rc && ETIMEDOUT == errno)
                return 0;
        msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
        return -1;
}


void
interrupt_worker_sleep(void)
{
        u_int idx;
        blocking_child * c;

        for (idx = 0; idx < blocking_children_alloc; idx++) {
                c = blocking_children[idx];
                if (NULL == c || NULL == c->wake_scheduled_sleep)
                        continue;
                tickle_sem(c->wake_scheduled_sleep);
        }
}


static void
ensure_workitems_empty_slot(
        blocking_child *c
        )
{
        const size_t each = sizeof(blocking_children[0]->workitems[0]);
        size_t new_alloc;
        size_t old_octets;
        size_t new_octets;
        void * nonvol_workitems;

        if (c->workitems != NULL &&
            NULL == c->workitems[c->next_workitem])
                return;

        new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
        old_octets = c->workitems_alloc * each;
        new_octets = new_alloc * each;
        nonvol_workitems = DEVOLATILE(void *, c->workitems);
        c->workitems = erealloc_zero(nonvol_workitems, new_octets,
                                     old_octets);
        if (0 == c->next_workitem)
                c->next_workitem = c->workitems_alloc;
        c->workitems_alloc = new_alloc;
}
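

/*
 * Note on the ring buffers: workitems[] and responses[] are circular
 * arrays of pointers in which a NULL entry marks a free slot.  The
 * ensure_*_empty_slot() helpers grow an array by a fixed increment
 * when the producer's next slot is still occupied; erealloc_zero()
 * zeroes the newly-added tail, and a producer index which has wrapped
 * back to 0 is pointed at the first of the new slots so it refers to
 * free space again.  A minimal sketch of the enqueue that relies on
 * this invariant (compare queue_req_pointer() below):
 *
 *	ensure_workitems_empty_slot(c);
 *	c->workitems[c->next_workitem] = hdr;
 *	c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
 */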


static void
ensure_workresp_empty_slot(
        blocking_child *c
        )
{
        const size_t each = sizeof(blocking_children[0]->responses[0]);
        size_t new_alloc;
        size_t old_octets;
        size_t new_octets;
        void * nonvol_responses;

        if (c->responses != NULL &&
            NULL == c->responses[c->next_response])
                return;

        new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
        old_octets = c->responses_alloc * each;
        new_octets = new_alloc * each;
        nonvol_responses = DEVOLATILE(void *, c->responses);
        c->responses = erealloc_zero(nonvol_responses, new_octets,
                                     old_octets);
        if (0 == c->next_response)
                c->next_response = c->responses_alloc;
        c->responses_alloc = new_alloc;
}


/*
 * queue_req_pointer() - append a work item or idle exit request to
 *			 the child's workitems[] ring.
 */
static int
queue_req_pointer(
        blocking_child * c,
        blocking_pipe_header * hdr
        )
{
        c->workitems[c->next_workitem] = hdr;
        c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;

        /*
         * We only want to signal the wakeup event if the child is
         * blocking on it, which is indicated by setting the blocking
         * event.  Wait with zero timeout to test.
         */
        /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
        tickle_sem(c->blocking_req_ready);

        return 0;
}


int
send_blocking_req_internal(
        blocking_child * c,
        blocking_pipe_header * hdr,
        void * data
        )
{
        blocking_pipe_header * threadcopy;
        size_t payload_octets;

        REQUIRE(hdr != NULL);
        REQUIRE(data != NULL);
        DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

        if (hdr->octets <= sizeof(*hdr))
                return 1;       /* failure */
        payload_octets = hdr->octets - sizeof(*hdr);

        ensure_workitems_empty_slot(c);
        if (NULL == c->thread_ref) {
                ensure_workresp_empty_slot(c);
                start_blocking_thread(c);
        }

        threadcopy = emalloc(hdr->octets);
        memcpy(threadcopy, hdr, sizeof(*hdr));
        memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

        return queue_req_pointer(c, threadcopy);
}


blocking_pipe_header *
receive_blocking_req_internal(
        blocking_child * c
        )
{
        blocking_pipe_header * req;
        int rc;

        /*
         * Child blocks here when idle.  POSIX semaphores maintain a
         * count; sem_wait() blocks while the count is zero and returns
         * after decrementing it.  Windows auto-reset events are
         * simpler, and multiple SetEvent calls before any thread waits
         * result in a single wakeup.  On Windows the child therefore
         * drains all workitems each wakeup, while with semaphores
         * wait_for_sem() is used before each item.
         */
#ifdef SYS_WINNT
        while (NULL == c->workitems[c->next_workeritem]) {
                /* !!!! SetEvent(c->child_is_blocking); */
                rc = wait_for_sem(c->blocking_req_ready, NULL);
                INSIST(0 == rc);
                /* !!!! ResetEvent(c->child_is_blocking); */
        }
#else
        do {
                rc = wait_for_sem(c->blocking_req_ready, NULL);
        } while (-1 == rc && EINTR == errno);
        INSIST(0 == rc);
#endif

        req = c->workitems[c->next_workeritem];
        INSIST(NULL != req);
        c->workitems[c->next_workeritem] = NULL;
        c->next_workeritem = (1 + c->next_workeritem) %
                             c->workitems_alloc;

        if (CHILD_EXIT_REQ == req) {    /* idled out */
                send_blocking_resp_internal(c, CHILD_GONE_RESP);
                req = NULL;
        }

        return req;
}
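

/*
 * Request lifecycle as implemented above: the parent copies the
 * request header and its payload into one heap block and queues the
 * pointer for the child (send_blocking_req_internal()); the child
 * sleeps on blocking_req_ready, takes ownership of the pointer, and
 * either returns it to its caller or, for the CHILD_EXIT_REQ sentinel,
 * answers with CHILD_GONE_RESP and returns NULL
 * (receive_blocking_req_internal()).  A minimal sketch of the child
 * side, assuming blocking_child_common() (defined elsewhere) is the
 * consumer loop:
 *
 *	while (NULL != (req = receive_blocking_req_internal(c))) {
 *		... perform the blocking work described by req ...
 *		send_blocking_resp_internal(c, resp);
 *	}
 */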


int
send_blocking_resp_internal(
        blocking_child * c,
        blocking_pipe_header * resp
        )
{
        ensure_workresp_empty_slot(c);

        c->responses[c->next_response] = resp;
        c->next_response = (1 + c->next_response) % c->responses_alloc;

#ifdef WORK_PIPE
        write(c->resp_write_pipe, "", 1);
#else
        tickle_sem(c->blocking_response_ready);
#endif

        return 0;
}


#ifndef WORK_PIPE
void
handle_blocking_resp_sem(
        void * context
        )
{
        HANDLE ready;
        blocking_child * c;
        u_int idx;

        ready = (HANDLE)context;
        c = NULL;
        for (idx = 0; idx < blocking_children_alloc; idx++) {
                c = blocking_children[idx];
                if (c != NULL && c->thread_ref != NULL &&
                    ready == c->blocking_response_ready)
                        break;
        }
        if (idx < blocking_children_alloc)
                process_blocking_resp(c);
}
#endif  /* !WORK_PIPE */


blocking_pipe_header *
receive_blocking_resp_internal(
        blocking_child * c
        )
{
        blocking_pipe_header * removed;
#ifdef WORK_PIPE
        int rc;
        char scratch[32];

        do {
                rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
        } while (-1 == rc && EINTR == errno);
#endif
        removed = c->responses[c->next_workresp];
        if (NULL != removed) {
                c->responses[c->next_workresp] = NULL;
                c->next_workresp = (1 + c->next_workresp) %
                                   c->responses_alloc;
                DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
                             BLOCKING_RESP_MAGIC == removed->magic_sig);
        }
        if (CHILD_GONE_RESP == removed) {
                cleanup_after_child(c);
                removed = NULL;
        }

        return removed;
}


static void
start_blocking_thread(
        blocking_child * c
        )
{

        DEBUG_INSIST(!c->reusable);

        prepare_child_sems(c);
        start_blocking_thread_internal(c);
}
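

/*
 * Response notification takes one of two build-time forms: with
 * WORK_PIPE the child writes a single byte to resp_write_pipe and the
 * parent's I/O code, to which resp_read_pipe was handed via
 * (*addremove_io_fd)(), is expected to notice the readable descriptor
 * and eventually call receive_blocking_resp_internal(); without
 * WORK_PIPE (Windows) the child sets blocking_response_ready and
 * handle_blocking_resp_sem() maps the signalled handle back to its
 * blocking_child.  Either way the response pointer itself travels
 * through responses[]; the byte or event is only a wakeup.
 */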


static void
start_blocking_thread_internal(
        blocking_child * c
        )
#ifdef SYS_WINNT
{
        thr_ref blocking_child_thread;
        u_int blocking_thread_id;
        BOOL resumed;

        (*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
        blocking_child_thread =
                (HANDLE)_beginthreadex(
                        NULL,
                        0,
                        &blocking_thread,
                        c,
                        CREATE_SUSPENDED,
                        &blocking_thread_id);

        if (NULL == blocking_child_thread) {
                msyslog(LOG_ERR, "start blocking thread failed: %m");
                exit(-1);
        }
        c->thread_id = blocking_thread_id;
        c->thread_ref = blocking_child_thread;
        /* remember the thread priority is only within the process class */
        if (!SetThreadPriority(blocking_child_thread,
                               THREAD_PRIORITY_BELOW_NORMAL))
                msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

        resumed = ResumeThread(blocking_child_thread);
        DEBUG_INSIST(resumed);
}
#else   /* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
        static int pthread_init_called;
# endif
        pthread_attr_t thr_attr;
        int rc;
        int saved_errno;
        int pipe_ends[2];       /* read then write */
        int is_pipe;
        int flags;
        size_t stacksize;
        sigset_t saved_sig_mask;

# ifdef NEED_PTHREAD_INIT
        /*
         * from lib/isc/unix/app.c:
         * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
         */
        if (!pthread_init_called) {
                pthread_init();
                pthread_init_called = TRUE;
        }
# endif

        rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
        if (0 != rc) {
                msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
                exit(1);
        }
        c->resp_read_pipe = move_fd(pipe_ends[0]);
        c->resp_write_pipe = move_fd(pipe_ends[1]);
        c->ispipe = is_pipe;
        flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
        if (-1 == flags) {
                msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
                exit(1);
        }
        rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
        if (-1 == rc) {
                msyslog(LOG_ERR,
                        "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
                exit(1);
        }
        (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
        pthread_attr_init(&thr_attr);
        pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
        rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
        if (-1 == rc) {
                msyslog(LOG_ERR,
                        "start_blocking_thread: pthread_attr_getstacksize %m");
        } else if (stacksize < THREAD_MINSTACKSIZE) {
                rc = pthread_attr_setstacksize(&thr_attr,
                                               THREAD_MINSTACKSIZE);
                if (-1 == rc)
                        msyslog(LOG_ERR,
                                "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
                                (u_long)stacksize,
                                (u_long)THREAD_MINSTACKSIZE);
        }
#else
        UNUSED_ARG(stacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
        pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
        c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
        block_thread_signals(&saved_sig_mask);
        rc = pthread_create(c->thread_ref, &thr_attr,
                            &blocking_thread, c);
        saved_errno = errno;
        pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
        pthread_attr_destroy(&thr_attr);
        if (0 != rc) {
                errno = saved_errno;
                msyslog(LOG_ERR, "pthread_create() blocking child: %m");
                exit(1);
        }
}
#endif
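

/*
 * A few notes on the setup above: the read end of the response pipe is
 * registered with the I/O machinery through (*addremove_io_fd)() and
 * switched to O_NONBLOCK so receive_blocking_resp_internal() can drain
 * pending wakeup bytes without stalling the parent.  The pthreads
 * child is created detached with a stack of at least
 * THREAD_MINSTACKSIZE, while the Windows child is created suspended so
 * its priority can be lowered before it first runs.
 */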


/*
 * block_thread_signals()
 *
 * Temporarily block signals used by the ntpd main thread, so that the
 * signal mask inherited by child threads leaves them blocked.  Returns
 * the prior active signal mask via pmask, to be restored by the main
 * thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
        sigset_t * pmask
        )
{
        sigset_t block;

        sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
        sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
        sigaddset(&block, SIGPOLL);
#  endif
# endif /* HAVE_SIGNALED_IO */
        sigaddset(&block, SIGALRM);
        sigaddset(&block, MOREDEBUGSIG);
        sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
        sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
        sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
        sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
        sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
        sigaddset(&block, SIGBUS);
# endif
        sigemptyset(pmask);
        pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif  /* !SYS_WINNT */


/*
 * prepare_child_sems()
 *
 * create sync events (semaphores)
 * child_is_blocking initially unset
 * blocking_req_ready initially unset
 *
 * Child waits for blocking_req_ready to be set after
 * setting child_is_blocking.  blocking_req_ready and
 * blocking_response_ready are auto-reset, so they wake one
 * waiter and become unset (unsignalled) in one operation.
 */
static void
prepare_child_sems(
        blocking_child *c
        )
#ifdef SYS_WINNT
{
        if (NULL == c->blocking_req_ready) {
                /* manual reset using ResetEvent() */
                /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
                /* auto reset - one thread released from wait each set */
                c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
                c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
                c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
        } else {
                /* !!!! ResetEvent(c->child_is_blocking); */
                /* ResetEvent(c->blocking_req_ready); */
                /* ResetEvent(c->blocking_response_ready); */
                /* ResetEvent(c->wake_scheduled_sleep); */
        }
}
#else   /* pthreads prepare_child_sems() follows */
{
        size_t octets;

        if (NULL == c->blocking_req_ready) {
                octets = sizeof(*c->blocking_req_ready);
                octets += sizeof(*c->wake_scheduled_sleep);
                /* !!!! octets += sizeof(*c->child_is_blocking); */
                c->blocking_req_ready = emalloc_zero(octets);
                c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
                /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
        } else {
                sem_destroy(c->blocking_req_ready);
                sem_destroy(c->wake_scheduled_sleep);
                /* !!!! sem_destroy(c->child_is_blocking); */
        }
        sem_init(c->blocking_req_ready, FALSE, 0);
        sem_init(c->wake_scheduled_sleep, FALSE, 0);
        /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
}
#endif
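

/*
 * wait_for_sem() - wait for a semaphore (or Windows event) to be
 * tickled, or until the given absolute wall-clock deadline passes.
 * Returns 0 on success and -1 with errno set otherwise; callers in
 * this file look for EINTR and ETIMEDOUT.  A minimal sketch of the
 * intended use, assuming a deadline five seconds out (compare
 * worker_sleep() above):
 *
 *	struct timespec until;
 *
 *	clock_gettime(CLOCK_REALTIME, &until);
 *	until.tv_sec += 5;
 *	if (-1 == wait_for_sem(sem, &until) && ETIMEDOUT == errno)
 *		... the full five seconds elapsed ...
 */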


static int
wait_for_sem(
        sem_ref sem,
        struct timespec * timeout       /* wall-clock */
        )
#ifdef SYS_WINNT
{
        struct timespec now;
        struct timespec delta;
        DWORD msec;
        DWORD rc;

        if (NULL == timeout) {
                msec = INFINITE;
        } else {
                getclock(TIMEOFDAY, &now);
                delta = sub_tspec(*timeout, now);
                if (delta.tv_sec < 0) {
                        msec = 0;
                } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
                        msec = INFINITE;
                } else {
                        msec = 1000 * (DWORD)delta.tv_sec;
                        msec += delta.tv_nsec / (1000 * 1000);
                }
        }
        rc = WaitForSingleObject(sem, msec);
        if (WAIT_OBJECT_0 == rc)
                return 0;
        if (WAIT_TIMEOUT == rc) {
                errno = ETIMEDOUT;
                return -1;
        }
        msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
        errno = EFAULT;
        return -1;
}
#else   /* pthreads wait_for_sem() follows */
{
        int rc;

        if (NULL == timeout)
                rc = sem_wait(sem);
        else
                rc = sem_timedwait(sem, timeout);

        return rc;
}
#endif


/*
 * blocking_thread - thread functions have WINAPI calling convention
 */
#ifdef SYS_WINNT
u_int
WINAPI
#else
void *
#endif
blocking_thread(
        void * ThreadArg
        )
{
        blocking_child *c;

        c = ThreadArg;
        exit_worker(blocking_child_common(c));

        /* NOTREACHED */
        return 0;
}


/*
 * req_child_exit() runs in the parent.
 */
int
req_child_exit(
        blocking_child *c
        )
{
        return queue_req_pointer(c, CHILD_EXIT_REQ);
}


/*
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
        blocking_child * c
        )
{
        u_int idx;

        DEBUG_INSIST(!c->reusable);
#ifdef SYS_WINNT
        INSIST(CloseHandle(c->thread_ref));
#else
        free(c->thread_ref);
#endif
        c->thread_ref = NULL;
        c->thread_id = 0;
#ifdef WORK_PIPE
        DEBUG_INSIST(-1 != c->resp_read_pipe);
        DEBUG_INSIST(-1 != c->resp_write_pipe);
        (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
        close(c->resp_write_pipe);
        close(c->resp_read_pipe);
        c->resp_write_pipe = -1;
        c->resp_read_pipe = -1;
#else
        DEBUG_INSIST(NULL != c->blocking_response_ready);
        (*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
#endif
        for (idx = 0; idx < c->workitems_alloc; idx++)
                c->workitems[idx] = NULL;
        c->next_workitem = 0;
        c->next_workeritem = 0;
        for (idx = 0; idx < c->responses_alloc; idx++)
                c->responses[idx] = NULL;
        c->next_response = 0;
        c->next_workresp = 0;
        c->reusable = TRUE;
}


#else   /* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif