xref: /freebsd/contrib/ntp/libntp/work_thread.c (revision 7ebc7d1ab76b9d06be9400d6c9fc74fcc43603a1)
1 /*
2  * work_thread.c - threads implementation for blocking worker child.
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6 
7 #ifdef WORK_THREAD
8 
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15 
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25 
26 #define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP	CHILD_EXIT_REQ
28 /* Queue size increments:
29  * The request queue grows a bit faster than the response queue -- the
30  * daemon can push requests and pull results faster on avarage than the
31  * worker can process requests and push results...  If this really pays
32  * off is debatable.
33  */
34 #define WORKITEMS_ALLOC_INC	16
35 #define RESPONSES_ALLOC_INC	4
36 
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38  * set the maximum to 256kB. If the minimum goes below the
39  * system-defined minimum stack size, we have to adjust accordingly.
40  */
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE	(64U * 1024)
43 #endif
44 
45 #ifndef THREAD_MAXSTACKSIZE
46 # define THREAD_MAXSTACKSIZE	(256U * 1024)
47 #endif
48 
49 /* need a good integer to store a pointer... */
50 #ifndef UINTPTR_T
51 # if defined(UINTPTR_MAX)
52 #  define UINTPTR_T uintptr_t
53 # elif defined(UINT_PTR)
54 #  define UINTPTR_T UINT_PTR
55 # else
56 #  define UINTPTR_T size_t
57 # endif
58 #endif
59 
60 
61 #ifdef SYS_WINNT
62 
63 # define thread_exit(c)	_endthreadex(c)
64 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
65 u_int	WINAPI	blocking_thread(void *);
66 static BOOL	same_os_sema(const sem_ref obj, void * osobj);
67 
68 #else
69 
70 # define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
71 # define tickle_sem	sem_post
72 void *		blocking_thread(void *);
73 static	void	block_thread_signals(sigset_t *);
74 
75 #endif
76 
77 #ifdef WORK_PIPE
78 addremove_io_fd_func		addremove_io_fd;
79 #else
80 addremove_io_semaphore_func	addremove_io_semaphore;
81 #endif
82 
83 static	void	start_blocking_thread(blocking_child *);
84 static	void	start_blocking_thread_internal(blocking_child *);
85 static	void	prepare_child_sems(blocking_child *);
86 static	int	wait_for_sem(sem_ref, struct timespec *);
87 static	int	ensure_workitems_empty_slot(blocking_child *);
88 static	int	ensure_workresp_empty_slot(blocking_child *);
89 static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
90 static	void	cleanup_after_child(blocking_child *);
91 
92 static sema_type worker_mmutex;
93 static sem_ref   worker_memlock;
94 
95 /* --------------------------------------------------------------------
96  * locking the global worker state table (and other global stuff)
97  */
98 void
99 worker_global_lock(
100 	int inOrOut)
101 {
102 	if (worker_memlock) {
103 		if (inOrOut)
104 			wait_for_sem(worker_memlock, NULL);
105 		else
106 			tickle_sem(worker_memlock);
107 	}
108 }
109 
110 /* --------------------------------------------------------------------
111  * implementation isolation wrapper
112  */
113 void
114 exit_worker(
115 	int	exitcode
116 	)
117 {
118 	thread_exit(exitcode);	/* see #define thread_exit */
119 }
120 
121 /* --------------------------------------------------------------------
122  * sleep for a given time or until the wakup semaphore is tickled.
123  */
124 int
125 worker_sleep(
126 	blocking_child *	c,
127 	time_t			seconds
128 	)
129 {
130 	struct timespec	until;
131 	int		rc;
132 
133 # ifdef HAVE_CLOCK_GETTIME
134 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
135 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
136 		return -1;
137 	}
138 # else
139 	if (0 != getclock(TIMEOFDAY, &until)) {
140 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
141 		return -1;
142 	}
143 # endif
144 	until.tv_sec += seconds;
145 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
146 	if (0 == rc)
147 		return -1;
148 	if (-1 == rc && ETIMEDOUT == errno)
149 		return 0;
150 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
151 	return -1;
152 }
153 
154 
155 /* --------------------------------------------------------------------
156  * Wake up a worker that takes a nap.
157  */
158 void
159 interrupt_worker_sleep(void)
160 {
161 	u_int			idx;
162 	blocking_child *	c;
163 
164 	for (idx = 0; idx < blocking_children_alloc; idx++) {
165 		c = blocking_children[idx];
166 		if (NULL == c || NULL == c->wake_scheduled_sleep)
167 			continue;
168 		tickle_sem(c->wake_scheduled_sleep);
169 	}
170 }
171 
172 /* --------------------------------------------------------------------
173  * Make sure there is an empty slot at the head of the request
174  * queue. Tell if the queue is currently empty.
175  */
176 static int
177 ensure_workitems_empty_slot(
178 	blocking_child *c
179 	)
180 {
181 	/*
182 	** !!! PRECONDITION: caller holds access lock!
183 	**
184 	** This simply tries to increase the size of the buffer if it
185 	** becomes full. The resize operation does *not* maintain the
186 	** order of requests, but that should be irrelevant since the
187 	** processing is considered asynchronous anyway.
188 	**
189 	** Return if the buffer is currently empty.
190 	*/
191 
192 	static const size_t each =
193 	    sizeof(blocking_children[0]->workitems[0]);
194 
195 	size_t	new_alloc;
196 	size_t  slots_used;
197 	size_t	sidx;
198 
199 	slots_used = c->head_workitem - c->tail_workitem;
200 	if (slots_used >= c->workitems_alloc) {
201 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
202 		c->workitems = erealloc(c->workitems, new_alloc * each);
203 		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
204 		    c->workitems[sidx] = NULL;
205 		c->tail_workitem   = 0;
206 		c->head_workitem   = c->workitems_alloc;
207 		c->workitems_alloc = new_alloc;
208 	}
209 	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
210 	return (0 == slots_used);
211 }
212 
213 /* --------------------------------------------------------------------
214  * Make sure there is an empty slot at the head of the response
215  * queue. Tell if the queue is currently empty.
216  */
217 static int
218 ensure_workresp_empty_slot(
219 	blocking_child *c
220 	)
221 {
222 	/*
223 	** !!! PRECONDITION: caller holds access lock!
224 	**
225 	** Works like the companion function above.
226 	*/
227 
228 	static const size_t each =
229 	    sizeof(blocking_children[0]->responses[0]);
230 
231 	size_t	new_alloc;
232 	size_t  slots_used;
233 	size_t	sidx;
234 
235 	slots_used = c->head_response - c->tail_response;
236 	if (slots_used >= c->responses_alloc) {
237 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
238 		c->responses = erealloc(c->responses, new_alloc * each);
239 		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
240 		    c->responses[sidx] = NULL;
241 		c->tail_response   = 0;
242 		c->head_response   = c->responses_alloc;
243 		c->responses_alloc = new_alloc;
244 	}
245 	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
246 	return (0 == slots_used);
247 }
248 
249 
250 /* --------------------------------------------------------------------
251  * queue_req_pointer() - append a work item or idle exit request to
252  *			 blocking_workitems[]. Employ proper locking.
253  */
254 static int
255 queue_req_pointer(
256 	blocking_child	*	c,
257 	blocking_pipe_header *	hdr
258 	)
259 {
260 	size_t qhead;
261 
262 	/* >>>> ACCESS LOCKING STARTS >>>> */
263 	wait_for_sem(c->accesslock, NULL);
264 	ensure_workitems_empty_slot(c);
265 	qhead = c->head_workitem;
266 	c->workitems[qhead % c->workitems_alloc] = hdr;
267 	c->head_workitem = 1 + qhead;
268 	tickle_sem(c->accesslock);
269 	/* <<<< ACCESS LOCKING ENDS <<<< */
270 
271 	/* queue consumer wake-up notification */
272 	tickle_sem(c->workitems_pending);
273 
274 	return 0;
275 }
276 
277 /* --------------------------------------------------------------------
278  * API function to make sure a worker is running, a proper private copy
279  * of the data is made, the data eneterd into the queue and the worker
280  * is signalled.
281  */
282 int
283 send_blocking_req_internal(
284 	blocking_child *	c,
285 	blocking_pipe_header *	hdr,
286 	void *			data
287 	)
288 {
289 	blocking_pipe_header *	threadcopy;
290 	size_t			payload_octets;
291 
292 	REQUIRE(hdr != NULL);
293 	REQUIRE(data != NULL);
294 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
295 
296 	if (hdr->octets <= sizeof(*hdr))
297 		return 1;	/* failure */
298 	payload_octets = hdr->octets - sizeof(*hdr);
299 
300 	if (NULL == c->thread_ref)
301 		start_blocking_thread(c);
302 	threadcopy = emalloc(hdr->octets);
303 	memcpy(threadcopy, hdr, sizeof(*hdr));
304 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
305 
306 	return queue_req_pointer(c, threadcopy);
307 }
308 
309 /* --------------------------------------------------------------------
310  * Wait for the 'incoming queue no longer empty' signal, lock the shared
311  * structure and dequeue an item.
312  */
313 blocking_pipe_header *
314 receive_blocking_req_internal(
315 	blocking_child *	c
316 	)
317 {
318 	blocking_pipe_header *	req;
319 	size_t			qhead, qtail;
320 
321 	req = NULL;
322 	do {
323 		/* wait for tickle from the producer side */
324 		wait_for_sem(c->workitems_pending, NULL);
325 
326 		/* >>>> ACCESS LOCKING STARTS >>>> */
327 		wait_for_sem(c->accesslock, NULL);
328 		qhead = c->head_workitem;
329 		do {
330 			qtail = c->tail_workitem;
331 			if (qhead == qtail)
332 				break;
333 			c->tail_workitem = qtail + 1;
334 			qtail %= c->workitems_alloc;
335 			req = c->workitems[qtail];
336 			c->workitems[qtail] = NULL;
337 		} while (NULL == req);
338 		tickle_sem(c->accesslock);
339 		/* <<<< ACCESS LOCKING ENDS <<<< */
340 
341 	} while (NULL == req);
342 
343 	INSIST(NULL != req);
344 	if (CHILD_EXIT_REQ == req) {	/* idled out */
345 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
346 		req = NULL;
347 	}
348 
349 	return req;
350 }
351 
352 /* --------------------------------------------------------------------
353  * Push a response into the return queue and eventually tickle the
354  * receiver.
355  */
356 int
357 send_blocking_resp_internal(
358 	blocking_child *	c,
359 	blocking_pipe_header *	resp
360 	)
361 {
362 	size_t	qhead;
363 	int	empty;
364 
365 	/* >>>> ACCESS LOCKING STARTS >>>> */
366 	wait_for_sem(c->accesslock, NULL);
367 	empty = ensure_workresp_empty_slot(c);
368 	qhead = c->head_response;
369 	c->responses[qhead % c->responses_alloc] = resp;
370 	c->head_response = 1 + qhead;
371 	tickle_sem(c->accesslock);
372 	/* <<<< ACCESS LOCKING ENDS <<<< */
373 
374 	/* queue consumer wake-up notification */
375 	if (empty)
376 	{
377 #	    ifdef WORK_PIPE
378 		if (1 != write(c->resp_write_pipe, "", 1))
379 			msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
380 				" failed to notify main thread!",
381 				(BLOCKING_GETNAMEINFO == resp->rtype)
382 				    ? "name"
383 				    : "addr"
384 				);
385 #	    else
386 		tickle_sem(c->responses_pending);
387 #	    endif
388 	}
389 	return 0;
390 }
391 
392 
393 #ifndef WORK_PIPE
394 
395 /* --------------------------------------------------------------------
396  * Check if a (Windows-)handle to a semaphore is actually the same we
397  * are using inside the sema wrapper.
398  */
399 static BOOL
400 same_os_sema(
401 	const sem_ref	obj,
402 	void*		osh
403 	)
404 {
405 	return obj && osh && (obj->shnd == (HANDLE)osh);
406 }
407 
408 /* --------------------------------------------------------------------
409  * Find the shared context that associates to an OS handle and make sure
410  * the data is dequeued and processed.
411  */
412 void
413 handle_blocking_resp_sem(
414 	void *	context
415 	)
416 {
417 	blocking_child *	c;
418 	u_int			idx;
419 
420 	c = NULL;
421 	for (idx = 0; idx < blocking_children_alloc; idx++) {
422 		c = blocking_children[idx];
423 		if (c != NULL &&
424 			c->thread_ref != NULL &&
425 			same_os_sema(c->responses_pending, context))
426 			break;
427 	}
428 	if (idx < blocking_children_alloc)
429 		process_blocking_resp(c);
430 }
431 #endif	/* !WORK_PIPE */
432 
433 /* --------------------------------------------------------------------
434  * Fetch the next response from the return queue. In case of signalling
435  * via pipe, make sure the pipe is flushed, too.
436  */
437 blocking_pipe_header *
438 receive_blocking_resp_internal(
439 	blocking_child *	c
440 	)
441 {
442 	blocking_pipe_header *	removed;
443 	size_t			qhead, qtail, slot;
444 
445 #ifdef WORK_PIPE
446 	int			rc;
447 	char			scratch[32];
448 
449 	do
450 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
451 	while (-1 == rc && EINTR == errno);
452 #endif
453 
454 	/* >>>> ACCESS LOCKING STARTS >>>> */
455 	wait_for_sem(c->accesslock, NULL);
456 	qhead = c->head_response;
457 	qtail = c->tail_response;
458 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
459 		slot = qtail % c->responses_alloc;
460 		removed = c->responses[slot];
461 		c->responses[slot] = NULL;
462 	}
463 	c->tail_response = qtail;
464 	tickle_sem(c->accesslock);
465 	/* <<<< ACCESS LOCKING ENDS <<<< */
466 
467 	if (NULL != removed) {
468 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
469 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
470 	}
471 	if (CHILD_GONE_RESP == removed) {
472 		cleanup_after_child(c);
473 		removed = NULL;
474 	}
475 
476 	return removed;
477 }
478 
479 /* --------------------------------------------------------------------
480  * Light up a new worker.
481  */
482 static void
483 start_blocking_thread(
484 	blocking_child *	c
485 	)
486 {
487 
488 	DEBUG_INSIST(!c->reusable);
489 
490 	prepare_child_sems(c);
491 	start_blocking_thread_internal(c);
492 }
493 
494 /* --------------------------------------------------------------------
495  * Create a worker thread. There are several differences between POSIX
496  * and Windows, of course -- most notably the Windows thread is a
497  * detached thread, and we keep the handle around until we want to get
498  * rid of the thread. The notification scheme also differs: Windows
499  * makes use of semaphores in both directions, POSIX uses a pipe for
500  * integration with 'select()' or alike.
501  */
502 static void
503 start_blocking_thread_internal(
504 	blocking_child *	c
505 	)
506 #ifdef SYS_WINNT
507 {
508 	BOOL	resumed;
509 
510 	c->thread_ref = NULL;
511 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
512 	c->thr_table[0].thnd =
513 		(HANDLE)_beginthreadex(
514 			NULL,
515 			0,
516 			&blocking_thread,
517 			c,
518 			CREATE_SUSPENDED,
519 			NULL);
520 
521 	if (NULL == c->thr_table[0].thnd) {
522 		msyslog(LOG_ERR, "start blocking thread failed: %m");
523 		exit(-1);
524 	}
525 	/* remember the thread priority is only within the process class */
526 	if (!SetThreadPriority(c->thr_table[0].thnd,
527 			       THREAD_PRIORITY_BELOW_NORMAL)) {
528 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529 	}
530 	if (NULL != pSetThreadDescription) {
531 		(*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532 	}
533 	resumed = ResumeThread(c->thr_table[0].thnd);
534 	DEBUG_INSIST(resumed);
535 	c->thread_ref = &c->thr_table[0];
536 }
537 #else	/* pthreads start_blocking_thread_internal() follows */
538 {
539 # ifdef NEED_PTHREAD_INIT
540 	static int	pthread_init_called;
541 # endif
542 	pthread_attr_t	thr_attr;
543 	int		rc;
544 	int		pipe_ends[2];	/* read then write */
545 	int		is_pipe;
546 	int		flags;
547 	size_t		ostacksize;
548 	size_t		nstacksize;
549 	sigset_t	saved_sig_mask;
550 
551 	c->thread_ref = NULL;
552 
553 # ifdef NEED_PTHREAD_INIT
554 	/*
555 	 * from lib/isc/unix/app.c:
556 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
557 	 */
558 	if (!pthread_init_called) {
559 		pthread_init();
560 		pthread_init_called = TRUE;
561 	}
562 # endif
563 
564 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
565 	if (0 != rc) {
566 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
567 		exit(1);
568 	}
569 	c->resp_read_pipe = move_fd(pipe_ends[0]);
570 	c->resp_write_pipe = move_fd(pipe_ends[1]);
571 	c->ispipe = is_pipe;
572 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
573 	if (-1 == flags) {
574 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
575 		exit(1);
576 	}
577 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
578 	if (-1 == rc) {
579 		msyslog(LOG_ERR,
580 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
581 		exit(1);
582 	}
583 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
584 	pthread_attr_init(&thr_attr);
585 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
586 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
587     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
588 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
589 	if (0 != rc) {
590 		msyslog(LOG_ERR,
591 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
592 			strerror(rc));
593 	} else {
594 		nstacksize = ostacksize;
595 		/* order is important here: first clamp on upper limit,
596 		 * and the PTHREAD min stack size is ultimate override!
597 		 */
598 		if (nstacksize > THREAD_MAXSTACKSIZE)
599 			nstacksize = THREAD_MAXSTACKSIZE;
600 #            ifdef PTHREAD_STACK_MAX
601 		if (nstacksize > PTHREAD_STACK_MAX)
602 			nstacksize = PTHREAD_STACK_MAX;
603 #            endif
604 
605 		/* now clamp on lower stack limit. */
606 		if (nstacksize < THREAD_MINSTACKSIZE)
607 			nstacksize = THREAD_MINSTACKSIZE;
608 #            ifdef PTHREAD_STACK_MIN
609 		if (nstacksize < PTHREAD_STACK_MIN)
610 			nstacksize = PTHREAD_STACK_MIN;
611 #            endif
612 
613 		if (nstacksize != ostacksize)
614 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
615 		if (0 != rc)
616 			msyslog(LOG_ERR,
617 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
618 				(u_long)ostacksize, (u_long)nstacksize,
619 				strerror(rc));
620 	}
621 #else
622 	UNUSED_ARG(nstacksize);
623 	UNUSED_ARG(ostacksize);
624 #endif
625 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
626 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
627 #endif
628 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
629 	block_thread_signals(&saved_sig_mask);
630 	rc = pthread_create(&c->thr_table[0], &thr_attr,
631 			    &blocking_thread, c);
632 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
633 	pthread_attr_destroy(&thr_attr);
634 	if (0 != rc) {
635 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
636 			strerror(rc));
637 		exit(1);
638 	}
639 	c->thread_ref = &c->thr_table[0];
640 }
641 #endif
642 
643 /* --------------------------------------------------------------------
644  * block_thread_signals()
645  *
646  * Temporarily block signals used by ntpd main thread, so that signal
647  * mask inherited by child threads leaves them blocked.  Returns prior
648  * active signal mask via pmask, to be restored by the main thread
649  * after pthread_create().
650  */
651 #ifndef SYS_WINNT
652 void
653 block_thread_signals(
654 	sigset_t *	pmask
655 	)
656 {
657 	sigset_t	block;
658 
659 	sigemptyset(&block);
660 # ifdef HAVE_SIGNALED_IO
661 #  ifdef SIGIO
662 	sigaddset(&block, SIGIO);
663 #  endif
664 #  ifdef SIGPOLL
665 	sigaddset(&block, SIGPOLL);
666 #  endif
667 # endif	/* HAVE_SIGNALED_IO */
668 	sigaddset(&block, SIGALRM);
669 	sigaddset(&block, MOREDEBUGSIG);
670 	sigaddset(&block, LESSDEBUGSIG);
671 # ifdef SIGDIE1
672 	sigaddset(&block, SIGDIE1);
673 # endif
674 # ifdef SIGDIE2
675 	sigaddset(&block, SIGDIE2);
676 # endif
677 # ifdef SIGDIE3
678 	sigaddset(&block, SIGDIE3);
679 # endif
680 # ifdef SIGDIE4
681 	sigaddset(&block, SIGDIE4);
682 # endif
683 # ifdef SIGBUS
684 	sigaddset(&block, SIGBUS);
685 # endif
686 	sigemptyset(pmask);
687 	pthread_sigmask(SIG_BLOCK, &block, pmask);
688 }
689 #endif	/* !SYS_WINNT */
690 
691 
692 /* --------------------------------------------------------------------
693  * Create & destroy semaphores. This is sufficiently different between
694  * POSIX and Windows to warrant wrapper functions and close enough to
695  * use the concept of synchronization via semaphore for all platforms.
696  */
697 static sem_ref
698 create_sema(
699 	sema_type*	semptr,
700 	u_int		inival,
701 	u_int		maxval)
702 {
703 #ifdef SYS_WINNT
704 
705 	long svini, svmax;
706 	if (NULL != semptr) {
707 		svini = (inival < LONG_MAX)
708 		    ? (long)inival : LONG_MAX;
709 		svmax = (maxval < LONG_MAX && maxval > 0)
710 		    ? (long)maxval : LONG_MAX;
711 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
712 		if (NULL == semptr->shnd)
713 			semptr = NULL;
714 	}
715 
716 #else
717 
718 	(void)maxval;
719 	if (semptr && sem_init(semptr, FALSE, inival))
720 		semptr = NULL;
721 
722 #endif
723 
724 	return semptr;
725 }
726 
727 /* ------------------------------------------------------------------ */
728 static sem_ref
729 delete_sema(
730 	sem_ref obj)
731 {
732 
733 #   ifdef SYS_WINNT
734 
735 	if (obj) {
736 		if (obj->shnd)
737 			CloseHandle(obj->shnd);
738 		obj->shnd = NULL;
739 	}
740 
741 #   else
742 
743 	if (obj)
744 		sem_destroy(obj);
745 
746 #   endif
747 
748 	return NULL;
749 }
750 
751 /* --------------------------------------------------------------------
752  * prepare_child_sems()
753  *
754  * create sync & access semaphores
755  *
756  * All semaphores are cleared, only the access semaphore has 1 unit.
757  * Childs wait on 'workitems_pending', then grabs 'sema_access'
758  * and dequeues jobs. When done, 'sema_access' is given one unit back.
759  *
760  * The producer grabs 'sema_access', manages the queue, restores
761  * 'sema_access' and puts one unit into 'workitems_pending'.
762  *
763  * The story goes the same for the response queue.
764  */
765 static void
766 prepare_child_sems(
767 	blocking_child *c
768 	)
769 {
770 	if (NULL == worker_memlock)
771 		worker_memlock = create_sema(&worker_mmutex, 1, 1);
772 
773 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
774 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
775 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
776 #   ifndef WORK_PIPE
777 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
778 #   endif
779 }
780 
781 /* --------------------------------------------------------------------
782  * wait for semaphore. Where the wait can be interrupted, it will
783  * internally resume -- When this function returns, there is either no
784  * semaphore at all, a timeout occurred, or the caller could
785  * successfully take a token from the semaphore.
786  *
787  * For untimed wait, not checking the result of this function at all is
788  * definitely an option.
789  */
790 static int
791 wait_for_sem(
792 	sem_ref			sem,
793 	struct timespec *	timeout		/* wall-clock */
794 	)
795 #ifdef SYS_WINNT
796 {
797 	struct timespec now;
798 	struct timespec delta;
799 	DWORD		msec;
800 	DWORD		rc;
801 
802 	if (!(sem && sem->shnd)) {
803 		errno = EINVAL;
804 		return -1;
805 	}
806 
807 	if (NULL == timeout) {
808 		msec = INFINITE;
809 	} else {
810 		getclock(TIMEOFDAY, &now);
811 		delta = sub_tspec(*timeout, now);
812 		if (delta.tv_sec < 0) {
813 			msec = 0;
814 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
815 			msec = INFINITE;
816 		} else {
817 			msec = 1000 * (DWORD)delta.tv_sec;
818 			msec += delta.tv_nsec / (1000 * 1000);
819 		}
820 	}
821 	rc = WaitForSingleObject(sem->shnd, msec);
822 	if (WAIT_OBJECT_0 == rc)
823 		return 0;
824 	if (WAIT_TIMEOUT == rc) {
825 		errno = ETIMEDOUT;
826 		return -1;
827 	}
828 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
829 	errno = EFAULT;
830 	return -1;
831 }
832 #else	/* pthreads wait_for_sem() follows */
833 {
834 	int rc = -1;
835 
836 	if (sem) do {
837 			if (NULL == timeout)
838 				rc = sem_wait(sem);
839 			else
840 				rc = sem_timedwait(sem, timeout);
841 		} while (rc == -1 && errno == EINTR);
842 	else
843 		errno = EINVAL;
844 
845 	return rc;
846 }
847 #endif
848 
849 /* --------------------------------------------------------------------
850  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
851  * calling conventions under Windows and POSIX-defined signature
852  * otherwise.
853  */
854 #ifdef SYS_WINNT
855 u_int WINAPI
856 #else
857 void *
858 #endif
859 blocking_thread(
860 	void *	ThreadArg
861 	)
862 {
863 	blocking_child *c;
864 
865 	c = ThreadArg;
866 	exit_worker(blocking_child_common(c));
867 
868 	/* NOTREACHED */
869 	return 0;
870 }
871 
872 /* --------------------------------------------------------------------
873  * req_child_exit() runs in the parent.
874  *
875  * This function is called from from the idle timer, too, and possibly
876  * without a thread being there any longer. Since we have folded up our
877  * tent in that case and all the semaphores are already gone, we simply
878  * ignore this request in this case.
879  *
880  * Since the existence of the semaphores is controlled exclusively by
881  * the parent, there's no risk of data race here.
882  */
883 int
884 req_child_exit(
885 	blocking_child *c
886 	)
887 {
888 	return (c->accesslock)
889 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
890 	    : 0;
891 }
892 
893 /* --------------------------------------------------------------------
894  * cleanup_after_child() runs in parent.
895  */
896 static void
897 cleanup_after_child(
898 	blocking_child *	c
899 	)
900 {
901 	DEBUG_INSIST(!c->reusable);
902 
903 #   ifdef SYS_WINNT
904 	/* The thread was not created in detached state, so we better
905 	 * clean up.
906 	 */
907 	if (c->thread_ref && c->thread_ref->thnd) {
908 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
909 		INSIST(CloseHandle(c->thread_ref->thnd));
910 		c->thread_ref->thnd = NULL;
911 	}
912 #   endif
913 	c->thread_ref = NULL;
914 
915 	/* remove semaphores and (if signalling vi IO) pipes */
916 
917 	c->accesslock           = delete_sema(c->accesslock);
918 	c->workitems_pending    = delete_sema(c->workitems_pending);
919 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
920 
921 #   ifdef WORK_PIPE
922 	DEBUG_INSIST(-1 != c->resp_read_pipe);
923 	DEBUG_INSIST(-1 != c->resp_write_pipe);
924 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
925 	close(c->resp_write_pipe);
926 	close(c->resp_read_pipe);
927 	c->resp_write_pipe = -1;
928 	c->resp_read_pipe = -1;
929 #   else
930 	DEBUG_INSIST(NULL != c->responses_pending);
931 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
932 	c->responses_pending = delete_sema(c->responses_pending);
933 #   endif
934 
935 	/* Is it necessary to check if there are pending requests and
936 	 * responses? If so, and if there are, what to do with them?
937 	 */
938 
939 	/* re-init buffer index sequencers */
940 	c->head_workitem = 0;
941 	c->tail_workitem = 0;
942 	c->head_response = 0;
943 	c->tail_response = 0;
944 
945 	c->reusable = TRUE;
946 }
947 
948 
949 #else	/* !WORK_THREAD follows */
950 char work_thread_nonempty_compilation_unit;
951 #endif
952