/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);


void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
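
/* --------------------------------------------------------------------
 * Illustrative sketch (an editorial addition, not part of the build):
 * worker_sleep() works because sem_timedwait() takes an *absolute*
 * CLOCK_REALTIME deadline, so the relative nap length is added to
 * "now" first.  A minimal standalone version of the same pattern,
 * assuming plain POSIX semaphores and clock_gettime(), might look
 * like this:
 */
#if 0
static int
sleep_or_tickle(sem_t *sem, time_t seconds)
{
	struct timespec until;

	if (0 != clock_gettime(CLOCK_REALTIME, &until))
		return -1;
	until.tv_sec += seconds;		/* absolute deadline */
	/* 0: tickled awake; -1 with ETIMEDOUT: slept the full time */
	return sem_timedwait(sem, &until);
}
#endif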

/* --------------------------------------------------------------------
 * Wake up any worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue, and report whether the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns true if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	return (0 == slots_used);
}
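
/* --------------------------------------------------------------------
 * Illustrative sketch (an editorial addition, with hypothetical helper
 * names): the queues use monotonically increasing head/tail counters
 * that are reduced modulo the allocation size only when indexing, so
 * the difference of the two counters is always the number of occupied
 * slots, even after the counters wrap:
 */
#if 0
static size_t
ring_used(size_t head, size_t tail)
{
	return head - tail;	/* e.g. head 37, tail 21 -> 16 used */
}

static size_t
ring_slot(size_t idx, size_t alloc)
{
	return idx % alloc;	/* physical slot of a logical index */
}
#endif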

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue, and report whether the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}
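
/* --------------------------------------------------------------------
 * Illustrative sketch (an editorial addition, using plain POSIX
 * names): the bracket above is the classic pairing of a binary
 * semaphore used as a mutex ('accesslock') with a counting semaphore
 * used as a wake-up counter ('workitems_pending'), one token posted
 * per queued item:
 */
#if 0
static void
produce(sem_t *mutex, sem_t *pending)
{
	sem_wait(mutex);	/* >>>> enter critical section */
	/* ... append the item to the shared queue ... */
	sem_post(mutex);	/* <<<< leave critical section */
	sem_post(pending);	/* one token per queued item   */
}
#endif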

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, make a proper private
 * copy of the data, enter the data into the queue, and signal the
 * worker.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
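
/* --------------------------------------------------------------------
 * Illustrative caller sketch (an editorial addition, inferred only
 * from the checks above, with any further header fields omitted): a
 * request header declares its total size in 'octets' (header plus
 * payload) and carries BLOCKING_REQ_MAGIC, while the payload is passed
 * separately and copied in behind the header copy:
 */
#if 0
static int
submit_example(blocking_child *c, const void *payload, size_t paylen)
{
	blocking_pipe_header hdr;

	memset(&hdr, 0, sizeof(hdr));
	hdr.octets    = sizeof(hdr) + paylen;	/* must exceed sizeof(hdr) */
	hdr.magic_sig = BLOCKING_REQ_MAGIC;
	return send_blocking_req_internal(c, &hdr, (void *)payload);
}
#endif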

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty
 * before, tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		write(c->resp_write_pipe, "", 1);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
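
/* --------------------------------------------------------------------
 * Illustrative sketch (an editorial addition): the pipe carries no
 * payload, only wake-ups.  Writing a single byte only on the
 * empty->non-empty transition keeps the pipe from filling while the
 * queue is busy; the consumer swallows the accumulated byte(s) before
 * dequeueing, as in receive_blocking_resp_internal() below:
 */
#if 0
static void
consume_wakeup(int resp_read_pipe)
{
	char	scratch[32];
	int	rc;

	do	/* restart if a signal interrupts the read */
		rc = read(resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
	/* ... then lock the queue and remove the response(s) ... */
}
#endif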


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		saved_errno;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		stacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	/* pthread_attr_*() returns an error number, not -1 with errno */
	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
	if (0 != rc) {
		errno = rc;
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize %m");
	} else if (stacksize < THREAD_MINSTACKSIZE) {
		rc = pthread_attr_setstacksize(&thr_attr,
					       THREAD_MINSTACKSIZE);
		if (0 != rc) {
			errno = rc;
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
				(u_long)stacksize,
				(u_long)THREAD_MINSTACKSIZE);
		}
	}
#else
	UNUSED_ARG(stacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	saved_errno = errno;
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		errno = saved_errno;
		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by the ntpd main thread, so that the
 * signal mask inherited by child threads leaves them blocked.  Returns
 * the prior active signal mask via pmask, to be restored by the main
 * thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores start cleared; only the access semaphore has one
 * unit.  Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs.  When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
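
/* --------------------------------------------------------------------
 * Illustrative sketch (an editorial addition): the initial values
 * above encode the protocol -- 'accesslock' starts with one unit (a
 * free mutex), while the pending/wake-up semaphores start empty.  The
 * consumer side in receive_blocking_req_internal() then mirrors the
 * producer bracket in queue_req_pointer():
 */
#if 0
	wait_for_sem(c->workitems_pending, NULL);	/* block for work */
	wait_for_sem(c->accesslock, NULL);		/* lock the queue */
	/* ... dequeue one request ... */
	tickle_sem(c->accesslock);			/* unlock         */
#endif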

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * resume internally -- when this function returns, either there is no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is also called from the idle timer, possibly without a
 * thread being there any longer. Since in that case we have folded up
 * our tent and all the semaphores are already gone, we simply ignore
 * the request.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif