xref: /freebsd/contrib/ntp/libntp/work_thread.c (revision 1f4bcc459a76b7aa664f3fd557684cd0ba6da352)
/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4
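/* Both queues are ring buffers indexed by free-running counters: the
 * head/tail members only ever increase, and an index is mapped to a
 * slot with '% alloc'.  A minimal sketch of the invariants used
 * throughout this file (variable names are illustrative only):
 *
 *	size_t used = head - tail;	// correct in size_t arithmetic
 *	size_t slot = head % alloc;	// next slot to fill
 *	// empty: head == tail;  full: used == alloc
 */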

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif
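/* Example of the clamping above, with hypothetical values: if the
 * platform defines PTHREAD_STACK_MIN as 128kB, THREAD_MINSTACKSIZE is
 * raised from 64kB to 128kB while the 256kB maximum still holds; and
 * should a build force the minimum above the maximum, the last clause
 * raises the maximum to match, keeping the min/max pair well-ordered
 * for the stack-size clamp in start_blocking_thread_internal().
 */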


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);


void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
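/* Note the inverted return convention above: 0 means the full timeout
 * elapsed, -1 means the nap was cut short (the semaphore was tickled)
 * or an error occurred.  A minimal caller sketch (hypothetical, not
 * part of this file):
 *
 *	if (0 == worker_sleep(c, idle_timeout))
 *		;	// slept the whole interval -- may idle out
 *	else
 *		;	// woken early -- recheck the work queue
 */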


/* --------------------------------------------------------------------
 * Wake up a worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Report whether the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns nonzero if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
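/* A worked example of the resize above (hypothetical numbers): with
 * workitems_alloc == 16, head_workitem == 37 and tail_workitem == 21,
 * all 16 slots are occupied.  After growing to 32 slots the live
 * entries keep their slot positions, but the counters are rebased to
 * tail == 0, head == 16: only the rotation of the entries -- not their
 * presence -- is lost, which is harmless for asynchronous processing.
 */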

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Report whether the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue and the worker
 * is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
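/* The private copy built above is a single allocation: the header,
 * followed immediately by the payload bytes.  Sketch of the layout:
 *
 *	threadcopy:  [ blocking_pipe_header | payload ............. ]
 *	              <--- sizeof(*hdr) ---> <- octets-sizeof(*hdr) ->
 *
 * Ownership of the copy passes to the worker along with the queued
 * pointer; the caller's hdr and data buffers remain untouched.
 */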

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and wake up the receiver if
 * the queue was empty.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		write(c->resp_write_pipe, "", 1);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
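/* The wake-up above is edge-triggered on purpose: a byte is written to
 * the pipe (or the semaphore is tickled) only when the queue goes from
 * empty to non-empty.  Responses appended while the queue is already
 * non-empty ride along with the pending notification, and the
 * receiving side is expected to keep dequeuing until the queue is
 * empty, so no response is stranded.
 */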


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same we
 * are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context that associates to an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
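/* Note on the pipe drain above: resp_read_pipe is put into O_NONBLOCK
 * mode when the thread is started, so the read() returns immediately
 * with -1/EAGAIN once the notification bytes are consumed; only EINTR
 * makes it retry.  Draining is best-effort -- a wake-up byte left
 * behind merely causes one extra pass that finds the queue empty.
 */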

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not
 * created detached, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked.  Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
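/* The 'accesslock' semaphore (initial value 1, maximum 1) is in effect
 * a mutex built from the same primitive as the counting semaphores, so
 * one wait/tickle pair brackets every queue operation on both
 * platforms:
 *
 *	wait_for_sem(c->accesslock, NULL);	// lock
 *	... manipulate head/tail counters and slots ...
 *	tickle_sem(c->accesslock);		// unlock
 */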

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it resumes
 * internally -- when this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is a defensible option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
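/* A minimal caller sketch for the timed variant (hypothetical values;
 * compare worker_sleep() above, which does exactly this).  The
 * deadline is an absolute wall-clock time, as sem_timedwait() expects:
 *
 *	struct timespec until;
 *	clock_gettime(CLOCK_REALTIME, &until);
 *	until.tv_sec += 5;			// deadline 5s from now
 *	if (-1 == wait_for_sem(sem, &until) && ETIMEDOUT == errno)
 *		;	// deadline passed without getting a token
 */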

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore the request in this case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif