xref: /freebsd/contrib/ntp/libntp/work_thread.c (revision f2b7bf8afcfd630e0fbd8417f1ce974de79feaf0)
1 /*
2  * work_thread.c - threads implementation for blocking worker child.
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6 
7 #ifdef WORK_THREAD
8 
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15 
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25 
/* Sentinel values that travel through the queues in place of a real
 * message pointer: the parent queues CHILD_EXIT_REQ to ask the worker
 * to quit, and the worker answers with CHILD_GONE_RESP.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ

/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Bounds for the worker thread stack size. A 64kB minimum seems to
 * work, and the maximum is set to 256kB. If our minimum is below the
 * system-defined minimum stack size, it is raised accordingly.
 *
 * The __sun test is deliberately a separate, enclosing conditional so
 * that the PTHREAD_STACK_MIN comparison is not even parsed there.
 */
#if !defined(THREAD_MINSTACKSIZE)
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#if !defined(__sun)
# if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
#  undef THREAD_MINSTACKSIZE
#  define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
# endif
#endif

#if !defined(THREAD_MAXSTACKSIZE)
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
/* never let the maximum drop below the (possibly raised) minimum */
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif
58 
59 
60 #ifdef SYS_WINNT
61 
62 # define thread_exit(c)	_endthreadex(c)
63 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
64 u_int	WINAPI	blocking_thread(void *);
65 static BOOL	same_os_sema(const sem_ref obj, void * osobj);
66 
67 #else
68 
69 # define thread_exit(c)	pthread_exit((void*)(size_t)(c))
70 # define tickle_sem	sem_post
71 void *		blocking_thread(void *);
72 static	void	block_thread_signals(sigset_t *);
73 
74 #endif
75 
76 #ifdef WORK_PIPE
77 addremove_io_fd_func		addremove_io_fd;
78 #else
79 addremove_io_semaphore_func	addremove_io_semaphore;
80 #endif
81 
82 static	void	start_blocking_thread(blocking_child *);
83 static	void	start_blocking_thread_internal(blocking_child *);
84 static	void	prepare_child_sems(blocking_child *);
85 static	int	wait_for_sem(sem_ref, struct timespec *);
86 static	int	ensure_workitems_empty_slot(blocking_child *);
87 static	int	ensure_workresp_empty_slot(blocking_child *);
88 static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
89 static	void	cleanup_after_child(blocking_child *);
90 
91 static sema_type worker_mmutex;
92 static sem_ref   worker_memlock;
93 
94 /* --------------------------------------------------------------------
95  * locking the global worker state table (and other global stuff)
96  */
97 void
98 worker_global_lock(
99 	int inOrOut)
100 {
101 	if (worker_memlock) {
102 		if (inOrOut)
103 			wait_for_sem(worker_memlock, NULL);
104 		else
105 			tickle_sem(worker_memlock);
106 	}
107 }
108 
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: terminate the calling worker
 * thread with the given exit code.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* thread_exit is a macro; its expansion is platform specific */
	thread_exit(exitcode);
}
119 
120 /* --------------------------------------------------------------------
121  * sleep for a given time or until the wakup semaphore is tickled.
122  */
123 int
124 worker_sleep(
125 	blocking_child *	c,
126 	time_t			seconds
127 	)
128 {
129 	struct timespec	until;
130 	int		rc;
131 
132 # ifdef HAVE_CLOCK_GETTIME
133 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
134 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
135 		return -1;
136 	}
137 # else
138 	if (0 != getclock(TIMEOFDAY, &until)) {
139 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
140 		return -1;
141 	}
142 # endif
143 	until.tv_sec += seconds;
144 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
145 	if (0 == rc)
146 		return -1;
147 	if (-1 == rc && ETIMEDOUT == errno)
148 		return 0;
149 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
150 	return -1;
151 }
152 
153 
154 /* --------------------------------------------------------------------
155  * Wake up a worker that takes a nap.
156  */
157 void
158 interrupt_worker_sleep(void)
159 {
160 	u_int			idx;
161 	blocking_child *	c;
162 
163 	for (idx = 0; idx < blocking_children_alloc; idx++) {
164 		c = blocking_children[idx];
165 		if (NULL == c || NULL == c->wake_scheduled_sleep)
166 			continue;
167 		tickle_sem(c->wake_scheduled_sleep);
168 	}
169 }
170 
171 /* --------------------------------------------------------------------
172  * Make sure there is an empty slot at the head of the request
173  * queue. Tell if the queue is currently empty.
174  */
175 static int
176 ensure_workitems_empty_slot(
177 	blocking_child *c
178 	)
179 {
180 	/*
181 	** !!! PRECONDITION: caller holds access lock!
182 	**
183 	** This simply tries to increase the size of the buffer if it
184 	** becomes full. The resize operation does *not* maintain the
185 	** order of requests, but that should be irrelevant since the
186 	** processing is considered asynchronous anyway.
187 	**
188 	** Return if the buffer is currently empty.
189 	*/
190 
191 	static const size_t each =
192 	    sizeof(blocking_children[0]->workitems[0]);
193 
194 	size_t	new_alloc;
195 	size_t  slots_used;
196 	size_t	sidx;
197 
198 	slots_used = c->head_workitem - c->tail_workitem;
199 	if (slots_used >= c->workitems_alloc) {
200 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
201 		c->workitems = erealloc(c->workitems, new_alloc * each);
202 		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
203 		    c->workitems[sidx] = NULL;
204 		c->tail_workitem   = 0;
205 		c->head_workitem   = c->workitems_alloc;
206 		c->workitems_alloc = new_alloc;
207 	}
208 	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
209 	return (0 == slots_used);
210 }
211 
212 /* --------------------------------------------------------------------
213  * Make sure there is an empty slot at the head of the response
214  * queue. Tell if the queue is currently empty.
215  */
216 static int
217 ensure_workresp_empty_slot(
218 	blocking_child *c
219 	)
220 {
221 	/*
222 	** !!! PRECONDITION: caller holds access lock!
223 	**
224 	** Works like the companion function above.
225 	*/
226 
227 	static const size_t each =
228 	    sizeof(blocking_children[0]->responses[0]);
229 
230 	size_t	new_alloc;
231 	size_t  slots_used;
232 	size_t	sidx;
233 
234 	slots_used = c->head_response - c->tail_response;
235 	if (slots_used >= c->responses_alloc) {
236 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
237 		c->responses = erealloc(c->responses, new_alloc * each);
238 		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
239 		    c->responses[sidx] = NULL;
240 		c->tail_response   = 0;
241 		c->head_response   = c->responses_alloc;
242 		c->responses_alloc = new_alloc;
243 	}
244 	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
245 	return (0 == slots_used);
246 }
247 
248 
249 /* --------------------------------------------------------------------
250  * queue_req_pointer() - append a work item or idle exit request to
251  *			 blocking_workitems[]. Employ proper locking.
252  */
253 static int
254 queue_req_pointer(
255 	blocking_child	*	c,
256 	blocking_pipe_header *	hdr
257 	)
258 {
259 	size_t qhead;
260 
261 	/* >>>> ACCESS LOCKING STARTS >>>> */
262 	wait_for_sem(c->accesslock, NULL);
263 	ensure_workitems_empty_slot(c);
264 	qhead = c->head_workitem;
265 	c->workitems[qhead % c->workitems_alloc] = hdr;
266 	c->head_workitem = 1 + qhead;
267 	tickle_sem(c->accesslock);
268 	/* <<<< ACCESS LOCKING ENDS <<<< */
269 
270 	/* queue consumer wake-up notification */
271 	tickle_sem(c->workitems_pending);
272 
273 	return 0;
274 }
275 
276 /* --------------------------------------------------------------------
277  * API function to make sure a worker is running, a proper private copy
278  * of the data is made, the data eneterd into the queue and the worker
279  * is signalled.
280  */
281 int
282 send_blocking_req_internal(
283 	blocking_child *	c,
284 	blocking_pipe_header *	hdr,
285 	void *			data
286 	)
287 {
288 	blocking_pipe_header *	threadcopy;
289 	size_t			payload_octets;
290 
291 	REQUIRE(hdr != NULL);
292 	REQUIRE(data != NULL);
293 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
294 
295 	if (hdr->octets <= sizeof(*hdr))
296 		return 1;	/* failure */
297 	payload_octets = hdr->octets - sizeof(*hdr);
298 
299 	if (NULL == c->thread_ref)
300 		start_blocking_thread(c);
301 	threadcopy = emalloc(hdr->octets);
302 	memcpy(threadcopy, hdr, sizeof(*hdr));
303 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
304 
305 	return queue_req_pointer(c, threadcopy);
306 }
307 
308 /* --------------------------------------------------------------------
309  * Wait for the 'incoming queue no longer empty' signal, lock the shared
310  * structure and dequeue an item.
311  */
312 blocking_pipe_header *
313 receive_blocking_req_internal(
314 	blocking_child *	c
315 	)
316 {
317 	blocking_pipe_header *	req;
318 	size_t			qhead, qtail;
319 
320 	req = NULL;
321 	do {
322 		/* wait for tickle from the producer side */
323 		wait_for_sem(c->workitems_pending, NULL);
324 
325 		/* >>>> ACCESS LOCKING STARTS >>>> */
326 		wait_for_sem(c->accesslock, NULL);
327 		qhead = c->head_workitem;
328 		do {
329 			qtail = c->tail_workitem;
330 			if (qhead == qtail)
331 				break;
332 			c->tail_workitem = qtail + 1;
333 			qtail %= c->workitems_alloc;
334 			req = c->workitems[qtail];
335 			c->workitems[qtail] = NULL;
336 		} while (NULL == req);
337 		tickle_sem(c->accesslock);
338 		/* <<<< ACCESS LOCKING ENDS <<<< */
339 
340 	} while (NULL == req);
341 
342 	INSIST(NULL != req);
343 	if (CHILD_EXIT_REQ == req) {	/* idled out */
344 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
345 		req = NULL;
346 	}
347 
348 	return req;
349 }
350 
351 /* --------------------------------------------------------------------
352  * Push a response into the return queue and eventually tickle the
353  * receiver.
354  */
355 int
356 send_blocking_resp_internal(
357 	blocking_child *	c,
358 	blocking_pipe_header *	resp
359 	)
360 {
361 	size_t	qhead;
362 	int	empty;
363 
364 	/* >>>> ACCESS LOCKING STARTS >>>> */
365 	wait_for_sem(c->accesslock, NULL);
366 	empty = ensure_workresp_empty_slot(c);
367 	qhead = c->head_response;
368 	c->responses[qhead % c->responses_alloc] = resp;
369 	c->head_response = 1 + qhead;
370 	tickle_sem(c->accesslock);
371 	/* <<<< ACCESS LOCKING ENDS <<<< */
372 
373 	/* queue consumer wake-up notification */
374 	if (empty)
375 	{
376 #	    ifdef WORK_PIPE
377 		write(c->resp_write_pipe, "", 1);
378 #	    else
379 		tickle_sem(c->responses_pending);
380 #	    endif
381 	}
382 	return 0;
383 }
384 
385 
386 #ifndef WORK_PIPE
387 
388 /* --------------------------------------------------------------------
389  * Check if a (Windows-)hanndle to a semaphore is actually the same we
390  * are using inside the sema wrapper.
391  */
392 static BOOL
393 same_os_sema(
394 	const sem_ref	obj,
395 	void*		osh
396 	)
397 {
398 	return obj && osh && (obj->shnd == (HANDLE)osh);
399 }
400 
401 /* --------------------------------------------------------------------
402  * Find the shared context that associates to an OS handle and make sure
403  * the data is dequeued and processed.
404  */
405 void
406 handle_blocking_resp_sem(
407 	void *	context
408 	)
409 {
410 	blocking_child *	c;
411 	u_int			idx;
412 
413 	c = NULL;
414 	for (idx = 0; idx < blocking_children_alloc; idx++) {
415 		c = blocking_children[idx];
416 		if (c != NULL &&
417 			c->thread_ref != NULL &&
418 			same_os_sema(c->responses_pending, context))
419 			break;
420 	}
421 	if (idx < blocking_children_alloc)
422 		process_blocking_resp(c);
423 }
424 #endif	/* !WORK_PIPE */
425 
426 /* --------------------------------------------------------------------
427  * Fetch the next response from the return queue. In case of signalling
428  * via pipe, make sure the pipe is flushed, too.
429  */
430 blocking_pipe_header *
431 receive_blocking_resp_internal(
432 	blocking_child *	c
433 	)
434 {
435 	blocking_pipe_header *	removed;
436 	size_t			qhead, qtail, slot;
437 
438 #ifdef WORK_PIPE
439 	int			rc;
440 	char			scratch[32];
441 
442 	do
443 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
444 	while (-1 == rc && EINTR == errno);
445 #endif
446 
447 	/* >>>> ACCESS LOCKING STARTS >>>> */
448 	wait_for_sem(c->accesslock, NULL);
449 	qhead = c->head_response;
450 	qtail = c->tail_response;
451 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
452 		slot = qtail % c->responses_alloc;
453 		removed = c->responses[slot];
454 		c->responses[slot] = NULL;
455 	}
456 	c->tail_response = qtail;
457 	tickle_sem(c->accesslock);
458 	/* <<<< ACCESS LOCKING ENDS <<<< */
459 
460 	if (NULL != removed) {
461 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
462 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
463 	}
464 	if (CHILD_GONE_RESP == removed) {
465 		cleanup_after_child(c);
466 		removed = NULL;
467 	}
468 
469 	return removed;
470 }
471 
472 /* --------------------------------------------------------------------
473  * Light up a new worker.
474  */
475 static void
476 start_blocking_thread(
477 	blocking_child *	c
478 	)
479 {
480 
481 	DEBUG_INSIST(!c->reusable);
482 
483 	prepare_child_sems(c);
484 	start_blocking_thread_internal(c);
485 }
486 
487 /* --------------------------------------------------------------------
488  * Create a worker thread. There are several differences between POSIX
489  * and Windows, of course -- most notably the Windows thread is no
490  * detached thread, and we keep the handle around until we want to get
491  * rid of the thread. The notification scheme also differs: Windows
492  * makes use of semaphores in both directions, POSIX uses a pipe for
493  * integration with 'select()' or alike.
494  */
495 static void
496 start_blocking_thread_internal(
497 	blocking_child *	c
498 	)
499 #ifdef SYS_WINNT
500 {
501 	BOOL	resumed;
502 
503 	c->thread_ref = NULL;
504 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
505 	c->thr_table[0].thnd =
506 		(HANDLE)_beginthreadex(
507 			NULL,
508 			0,
509 			&blocking_thread,
510 			c,
511 			CREATE_SUSPENDED,
512 			NULL);
513 
514 	if (NULL == c->thr_table[0].thnd) {
515 		msyslog(LOG_ERR, "start blocking thread failed: %m");
516 		exit(-1);
517 	}
518 	/* remember the thread priority is only within the process class */
519 	if (!SetThreadPriority(c->thr_table[0].thnd,
520 			       THREAD_PRIORITY_BELOW_NORMAL))
521 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
522 
523 	resumed = ResumeThread(c->thr_table[0].thnd);
524 	DEBUG_INSIST(resumed);
525 	c->thread_ref = &c->thr_table[0];
526 }
527 #else	/* pthreads start_blocking_thread_internal() follows */
528 {
529 # ifdef NEED_PTHREAD_INIT
530 	static int	pthread_init_called;
531 # endif
532 	pthread_attr_t	thr_attr;
533 	int		rc;
534 	int		pipe_ends[2];	/* read then write */
535 	int		is_pipe;
536 	int		flags;
537 	size_t		ostacksize;
538 	size_t		nstacksize;
539 	sigset_t	saved_sig_mask;
540 
541 	c->thread_ref = NULL;
542 
543 # ifdef NEED_PTHREAD_INIT
544 	/*
545 	 * from lib/isc/unix/app.c:
546 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
547 	 */
548 	if (!pthread_init_called) {
549 		pthread_init();
550 		pthread_init_called = TRUE;
551 	}
552 # endif
553 
554 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
555 	if (0 != rc) {
556 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
557 		exit(1);
558 	}
559 	c->resp_read_pipe = move_fd(pipe_ends[0]);
560 	c->resp_write_pipe = move_fd(pipe_ends[1]);
561 	c->ispipe = is_pipe;
562 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
563 	if (-1 == flags) {
564 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
565 		exit(1);
566 	}
567 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
568 	if (-1 == rc) {
569 		msyslog(LOG_ERR,
570 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
571 		exit(1);
572 	}
573 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
574 	pthread_attr_init(&thr_attr);
575 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
576 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
577     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
578 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
579 	if (0 != rc) {
580 		msyslog(LOG_ERR,
581 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
582 			strerror(rc));
583 	} else {
584 		if (ostacksize < THREAD_MINSTACKSIZE)
585 			nstacksize = THREAD_MINSTACKSIZE;
586 		else if (ostacksize > THREAD_MAXSTACKSIZE)
587 			nstacksize = THREAD_MAXSTACKSIZE;
588 		else
589 			nstacksize = ostacksize;
590 		if (nstacksize != ostacksize)
591 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
592 		if (0 != rc)
593 			msyslog(LOG_ERR,
594 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
595 				(u_long)ostacksize, (u_long)nstacksize,
596 				strerror(rc));
597 	}
598 #else
599 	UNUSED_ARG(nstacksize);
600 	UNUSED_ARG(ostacksize);
601 #endif
602 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
603 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
604 #endif
605 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
606 	block_thread_signals(&saved_sig_mask);
607 	rc = pthread_create(&c->thr_table[0], &thr_attr,
608 			    &blocking_thread, c);
609 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
610 	pthread_attr_destroy(&thr_attr);
611 	if (0 != rc) {
612 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
613 			strerror(rc));
614 		exit(1);
615 	}
616 	c->thread_ref = &c->thr_table[0];
617 }
618 #endif
619 
620 /* --------------------------------------------------------------------
621  * block_thread_signals()
622  *
623  * Temporarily block signals used by ntpd main thread, so that signal
624  * mask inherited by child threads leaves them blocked.  Returns prior
625  * active signal mask via pmask, to be restored by the main thread
626  * after pthread_create().
627  */
628 #ifndef SYS_WINNT
629 void
630 block_thread_signals(
631 	sigset_t *	pmask
632 	)
633 {
634 	sigset_t	block;
635 
636 	sigemptyset(&block);
637 # ifdef HAVE_SIGNALED_IO
638 #  ifdef SIGIO
639 	sigaddset(&block, SIGIO);
640 #  endif
641 #  ifdef SIGPOLL
642 	sigaddset(&block, SIGPOLL);
643 #  endif
644 # endif	/* HAVE_SIGNALED_IO */
645 	sigaddset(&block, SIGALRM);
646 	sigaddset(&block, MOREDEBUGSIG);
647 	sigaddset(&block, LESSDEBUGSIG);
648 # ifdef SIGDIE1
649 	sigaddset(&block, SIGDIE1);
650 # endif
651 # ifdef SIGDIE2
652 	sigaddset(&block, SIGDIE2);
653 # endif
654 # ifdef SIGDIE3
655 	sigaddset(&block, SIGDIE3);
656 # endif
657 # ifdef SIGDIE4
658 	sigaddset(&block, SIGDIE4);
659 # endif
660 # ifdef SIGBUS
661 	sigaddset(&block, SIGBUS);
662 # endif
663 	sigemptyset(pmask);
664 	pthread_sigmask(SIG_BLOCK, &block, pmask);
665 }
666 #endif	/* !SYS_WINNT */
667 
668 
669 /* --------------------------------------------------------------------
670  * Create & destroy semaphores. This is sufficiently different between
671  * POSIX and Windows to warrant wrapper functions and close enough to
672  * use the concept of synchronization via semaphore for all platforms.
673  */
674 static sem_ref
675 create_sema(
676 	sema_type*	semptr,
677 	u_int		inival,
678 	u_int		maxval)
679 {
680 #ifdef SYS_WINNT
681 
682 	long svini, svmax;
683 	if (NULL != semptr) {
684 		svini = (inival < LONG_MAX)
685 		    ? (long)inival : LONG_MAX;
686 		svmax = (maxval < LONG_MAX && maxval > 0)
687 		    ? (long)maxval : LONG_MAX;
688 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
689 		if (NULL == semptr->shnd)
690 			semptr = NULL;
691 	}
692 
693 #else
694 
695 	(void)maxval;
696 	if (semptr && sem_init(semptr, FALSE, inival))
697 		semptr = NULL;
698 
699 #endif
700 
701 	return semptr;
702 }
703 
704 /* ------------------------------------------------------------------ */
705 static sem_ref
706 delete_sema(
707 	sem_ref obj)
708 {
709 
710 #   ifdef SYS_WINNT
711 
712 	if (obj) {
713 		if (obj->shnd)
714 			CloseHandle(obj->shnd);
715 		obj->shnd = NULL;
716 	}
717 
718 #   else
719 
720 	if (obj)
721 		sem_destroy(obj);
722 
723 #   endif
724 
725 	return NULL;
726 }
727 
728 /* --------------------------------------------------------------------
729  * prepare_child_sems()
730  *
731  * create sync & access semaphores
732  *
733  * All semaphores are cleared, only the access semaphore has 1 unit.
734  * Childs wait on 'workitems_pending', then grabs 'sema_access'
735  * and dequeues jobs. When done, 'sema_access' is given one unit back.
736  *
737  * The producer grabs 'sema_access', manages the queue, restores
738  * 'sema_access' and puts one unit into 'workitems_pending'.
739  *
740  * The story goes the same for the response queue.
741  */
742 static void
743 prepare_child_sems(
744 	blocking_child *c
745 	)
746 {
747 	if (NULL == worker_memlock)
748 		worker_memlock = create_sema(&worker_mmutex, 1, 1);
749 
750 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
751 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
752 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
753 #   ifndef WORK_PIPE
754 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
755 #   endif
756 }
757 
758 /* --------------------------------------------------------------------
759  * wait for semaphore. Where the wait can be interrupted, it will
760  * internally resume -- When this function returns, there is either no
761  * semaphore at all, a timeout occurred, or the caller could
762  * successfully take a token from the semaphore.
763  *
764  * For untimed wait, not checking the result of this function at all is
765  * definitely an option.
766  */
767 static int
768 wait_for_sem(
769 	sem_ref			sem,
770 	struct timespec *	timeout		/* wall-clock */
771 	)
772 #ifdef SYS_WINNT
773 {
774 	struct timespec now;
775 	struct timespec delta;
776 	DWORD		msec;
777 	DWORD		rc;
778 
779 	if (!(sem && sem->shnd)) {
780 		errno = EINVAL;
781 		return -1;
782 	}
783 
784 	if (NULL == timeout) {
785 		msec = INFINITE;
786 	} else {
787 		getclock(TIMEOFDAY, &now);
788 		delta = sub_tspec(*timeout, now);
789 		if (delta.tv_sec < 0) {
790 			msec = 0;
791 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
792 			msec = INFINITE;
793 		} else {
794 			msec = 1000 * (DWORD)delta.tv_sec;
795 			msec += delta.tv_nsec / (1000 * 1000);
796 		}
797 	}
798 	rc = WaitForSingleObject(sem->shnd, msec);
799 	if (WAIT_OBJECT_0 == rc)
800 		return 0;
801 	if (WAIT_TIMEOUT == rc) {
802 		errno = ETIMEDOUT;
803 		return -1;
804 	}
805 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
806 	errno = EFAULT;
807 	return -1;
808 }
809 #else	/* pthreads wait_for_sem() follows */
810 {
811 	int rc = -1;
812 
813 	if (sem) do {
814 			if (NULL == timeout)
815 				rc = sem_wait(sem);
816 			else
817 				rc = sem_timedwait(sem, timeout);
818 		} while (rc == -1 && errno == EINTR);
819 	else
820 		errno = EINVAL;
821 
822 	return rc;
823 }
824 #endif
825 
826 /* --------------------------------------------------------------------
827  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
828  * calling conventions under Windows and POSIX-defined signature
829  * otherwise.
830  */
831 #ifdef SYS_WINNT
832 u_int WINAPI
833 #else
834 void *
835 #endif
836 blocking_thread(
837 	void *	ThreadArg
838 	)
839 {
840 	blocking_child *c;
841 
842 	c = ThreadArg;
843 	exit_worker(blocking_child_common(c));
844 
845 	/* NOTREACHED */
846 	return 0;
847 }
848 
849 /* --------------------------------------------------------------------
850  * req_child_exit() runs in the parent.
851  *
852  * This function is called from from the idle timer, too, and possibly
853  * without a thread being there any longer. Since we have folded up our
854  * tent in that case and all the semaphores are already gone, we simply
855  * ignore this request in this case.
856  *
857  * Since the existence of the semaphores is controlled exclusively by
858  * the parent, there's no risk of data race here.
859  */
860 int
861 req_child_exit(
862 	blocking_child *c
863 	)
864 {
865 	return (c->accesslock)
866 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
867 	    : 0;
868 }
869 
870 /* --------------------------------------------------------------------
871  * cleanup_after_child() runs in parent.
872  */
873 static void
874 cleanup_after_child(
875 	blocking_child *	c
876 	)
877 {
878 	DEBUG_INSIST(!c->reusable);
879 
880 #   ifdef SYS_WINNT
881 	/* The thread was not created in detached state, so we better
882 	 * clean up.
883 	 */
884 	if (c->thread_ref && c->thread_ref->thnd) {
885 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
886 		INSIST(CloseHandle(c->thread_ref->thnd));
887 		c->thread_ref->thnd = NULL;
888 	}
889 #   endif
890 	c->thread_ref = NULL;
891 
892 	/* remove semaphores and (if signalling vi IO) pipes */
893 
894 	c->accesslock           = delete_sema(c->accesslock);
895 	c->workitems_pending    = delete_sema(c->workitems_pending);
896 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
897 
898 #   ifdef WORK_PIPE
899 	DEBUG_INSIST(-1 != c->resp_read_pipe);
900 	DEBUG_INSIST(-1 != c->resp_write_pipe);
901 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
902 	close(c->resp_write_pipe);
903 	close(c->resp_read_pipe);
904 	c->resp_write_pipe = -1;
905 	c->resp_read_pipe = -1;
906 #   else
907 	DEBUG_INSIST(NULL != c->responses_pending);
908 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
909 	c->responses_pending = delete_sema(c->responses_pending);
910 #   endif
911 
912 	/* Is it necessary to check if there are pending requests and
913 	 * responses? If so, and if there are, what to do with them?
914 	 */
915 
916 	/* re-init buffer index sequencers */
917 	c->head_workitem = 0;
918 	c->tail_workitem = 0;
919 	c->head_response = 0;
920 	c->tail_response = 0;
921 
922 	c->reusable = TRUE;
923 }
924 
925 
926 #else	/* !WORK_THREAD follows */
927 char work_thread_nonempty_compilation_unit;
928 #endif
929