xref: /freebsd/contrib/ntp/libntp/work_thread.c (revision c1cdf6a42f0d951ba720688dfc6ce07608b02f6e)
1 /*
2  * work_thread.c - threads implementation for blocking worker child.
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6 
7 #ifdef WORK_THREAD
8 
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15 
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25 
/* Sentinel pointer values that travel through the queues in place of a
 * real message: the parent queues CHILD_EXIT_REQ to ask the worker to
 * exit, and the worker answers with CHILD_GONE_RESP (the very same
 * value) before it goes away.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
34 #define WORKITEMS_ALLOC_INC	16
35 #define RESPONSES_ALLOC_INC	4
36 
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38  * set the maximum to 256kB. If the minimum goes below the
39  * system-defined minimum stack size, we have to adjust accordingly.
40  */
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE	(64U * 1024)
43 #endif
44 #ifndef __sun
45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46 # undef THREAD_MINSTACKSIZE
47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48 #endif
49 #endif
50 
51 #ifndef THREAD_MAXSTACKSIZE
52 # define THREAD_MAXSTACKSIZE	(256U * 1024)
53 #endif
54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55 # undef  THREAD_MAXSTACKSIZE
56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57 #endif
58 
59 /* need a good integer to store a pointer... */
60 #ifndef UINTPTR_T
61 # if defined(UINTPTR_MAX)
62 #  define UINTPTR_T uintptr_t
63 # elif defined(UINT_PTR)
64 #  define UINTPTR_T UINT_PTR
65 # else
66 #  define UINTPTR_T size_t
67 # endif
68 #endif
69 
70 
/* Platform glue: thread_exit() ends the calling thread and tickle_sem()
 * puts one unit into a semaphore. The thread entry point has a
 * platform-specific signature.
 */
#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

/* Hook used to add/remove the response notification object (pipe fd or
 * semaphore handle) to/from the I/O handling; called when a worker is
 * started and when it is cleaned up.
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

/* forward declarations of file-local helpers */
static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

/* global lock: storage and reference (NULL until first created) */
static sema_type worker_mmutex;
static sem_ref   worker_memlock;
104 
105 /* --------------------------------------------------------------------
106  * locking the global worker state table (and other global stuff)
107  */
108 void
109 worker_global_lock(
110 	int inOrOut)
111 {
112 	if (worker_memlock) {
113 		if (inOrOut)
114 			wait_for_sem(worker_memlock, NULL);
115 		else
116 			tickle_sem(worker_memlock);
117 	}
118 }
119 
/* --------------------------------------------------------------------
 * implementation isolation wrapper
 *
 * Ends the calling thread through the platform's thread_exit() macro,
 * handing 'exitcode' over as the thread's exit value.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
130 
131 /* --------------------------------------------------------------------
132  * sleep for a given time or until the wakup semaphore is tickled.
133  */
134 int
135 worker_sleep(
136 	blocking_child *	c,
137 	time_t			seconds
138 	)
139 {
140 	struct timespec	until;
141 	int		rc;
142 
143 # ifdef HAVE_CLOCK_GETTIME
144 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
145 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
146 		return -1;
147 	}
148 # else
149 	if (0 != getclock(TIMEOFDAY, &until)) {
150 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
151 		return -1;
152 	}
153 # endif
154 	until.tv_sec += seconds;
155 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
156 	if (0 == rc)
157 		return -1;
158 	if (-1 == rc && ETIMEDOUT == errno)
159 		return 0;
160 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
161 	return -1;
162 }
163 
164 
165 /* --------------------------------------------------------------------
166  * Wake up a worker that takes a nap.
167  */
168 void
169 interrupt_worker_sleep(void)
170 {
171 	u_int			idx;
172 	blocking_child *	c;
173 
174 	for (idx = 0; idx < blocking_children_alloc; idx++) {
175 		c = blocking_children[idx];
176 		if (NULL == c || NULL == c->wake_scheduled_sleep)
177 			continue;
178 		tickle_sem(c->wake_scheduled_sleep);
179 	}
180 }
181 
182 /* --------------------------------------------------------------------
183  * Make sure there is an empty slot at the head of the request
184  * queue. Tell if the queue is currently empty.
185  */
186 static int
187 ensure_workitems_empty_slot(
188 	blocking_child *c
189 	)
190 {
191 	/*
192 	** !!! PRECONDITION: caller holds access lock!
193 	**
194 	** This simply tries to increase the size of the buffer if it
195 	** becomes full. The resize operation does *not* maintain the
196 	** order of requests, but that should be irrelevant since the
197 	** processing is considered asynchronous anyway.
198 	**
199 	** Return if the buffer is currently empty.
200 	*/
201 
202 	static const size_t each =
203 	    sizeof(blocking_children[0]->workitems[0]);
204 
205 	size_t	new_alloc;
206 	size_t  slots_used;
207 	size_t	sidx;
208 
209 	slots_used = c->head_workitem - c->tail_workitem;
210 	if (slots_used >= c->workitems_alloc) {
211 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
212 		c->workitems = erealloc(c->workitems, new_alloc * each);
213 		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
214 		    c->workitems[sidx] = NULL;
215 		c->tail_workitem   = 0;
216 		c->head_workitem   = c->workitems_alloc;
217 		c->workitems_alloc = new_alloc;
218 	}
219 	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
220 	return (0 == slots_used);
221 }
222 
223 /* --------------------------------------------------------------------
224  * Make sure there is an empty slot at the head of the response
225  * queue. Tell if the queue is currently empty.
226  */
227 static int
228 ensure_workresp_empty_slot(
229 	blocking_child *c
230 	)
231 {
232 	/*
233 	** !!! PRECONDITION: caller holds access lock!
234 	**
235 	** Works like the companion function above.
236 	*/
237 
238 	static const size_t each =
239 	    sizeof(blocking_children[0]->responses[0]);
240 
241 	size_t	new_alloc;
242 	size_t  slots_used;
243 	size_t	sidx;
244 
245 	slots_used = c->head_response - c->tail_response;
246 	if (slots_used >= c->responses_alloc) {
247 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
248 		c->responses = erealloc(c->responses, new_alloc * each);
249 		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
250 		    c->responses[sidx] = NULL;
251 		c->tail_response   = 0;
252 		c->head_response   = c->responses_alloc;
253 		c->responses_alloc = new_alloc;
254 	}
255 	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
256 	return (0 == slots_used);
257 }
258 
259 
260 /* --------------------------------------------------------------------
261  * queue_req_pointer() - append a work item or idle exit request to
262  *			 blocking_workitems[]. Employ proper locking.
263  */
264 static int
265 queue_req_pointer(
266 	blocking_child	*	c,
267 	blocking_pipe_header *	hdr
268 	)
269 {
270 	size_t qhead;
271 
272 	/* >>>> ACCESS LOCKING STARTS >>>> */
273 	wait_for_sem(c->accesslock, NULL);
274 	ensure_workitems_empty_slot(c);
275 	qhead = c->head_workitem;
276 	c->workitems[qhead % c->workitems_alloc] = hdr;
277 	c->head_workitem = 1 + qhead;
278 	tickle_sem(c->accesslock);
279 	/* <<<< ACCESS LOCKING ENDS <<<< */
280 
281 	/* queue consumer wake-up notification */
282 	tickle_sem(c->workitems_pending);
283 
284 	return 0;
285 }
286 
287 /* --------------------------------------------------------------------
288  * API function to make sure a worker is running, a proper private copy
289  * of the data is made, the data eneterd into the queue and the worker
290  * is signalled.
291  */
292 int
293 send_blocking_req_internal(
294 	blocking_child *	c,
295 	blocking_pipe_header *	hdr,
296 	void *			data
297 	)
298 {
299 	blocking_pipe_header *	threadcopy;
300 	size_t			payload_octets;
301 
302 	REQUIRE(hdr != NULL);
303 	REQUIRE(data != NULL);
304 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
305 
306 	if (hdr->octets <= sizeof(*hdr))
307 		return 1;	/* failure */
308 	payload_octets = hdr->octets - sizeof(*hdr);
309 
310 	if (NULL == c->thread_ref)
311 		start_blocking_thread(c);
312 	threadcopy = emalloc(hdr->octets);
313 	memcpy(threadcopy, hdr, sizeof(*hdr));
314 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
315 
316 	return queue_req_pointer(c, threadcopy);
317 }
318 
319 /* --------------------------------------------------------------------
320  * Wait for the 'incoming queue no longer empty' signal, lock the shared
321  * structure and dequeue an item.
322  */
323 blocking_pipe_header *
324 receive_blocking_req_internal(
325 	blocking_child *	c
326 	)
327 {
328 	blocking_pipe_header *	req;
329 	size_t			qhead, qtail;
330 
331 	req = NULL;
332 	do {
333 		/* wait for tickle from the producer side */
334 		wait_for_sem(c->workitems_pending, NULL);
335 
336 		/* >>>> ACCESS LOCKING STARTS >>>> */
337 		wait_for_sem(c->accesslock, NULL);
338 		qhead = c->head_workitem;
339 		do {
340 			qtail = c->tail_workitem;
341 			if (qhead == qtail)
342 				break;
343 			c->tail_workitem = qtail + 1;
344 			qtail %= c->workitems_alloc;
345 			req = c->workitems[qtail];
346 			c->workitems[qtail] = NULL;
347 		} while (NULL == req);
348 		tickle_sem(c->accesslock);
349 		/* <<<< ACCESS LOCKING ENDS <<<< */
350 
351 	} while (NULL == req);
352 
353 	INSIST(NULL != req);
354 	if (CHILD_EXIT_REQ == req) {	/* idled out */
355 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
356 		req = NULL;
357 	}
358 
359 	return req;
360 }
361 
362 /* --------------------------------------------------------------------
363  * Push a response into the return queue and eventually tickle the
364  * receiver.
365  */
366 int
367 send_blocking_resp_internal(
368 	blocking_child *	c,
369 	blocking_pipe_header *	resp
370 	)
371 {
372 	size_t	qhead;
373 	int	empty;
374 
375 	/* >>>> ACCESS LOCKING STARTS >>>> */
376 	wait_for_sem(c->accesslock, NULL);
377 	empty = ensure_workresp_empty_slot(c);
378 	qhead = c->head_response;
379 	c->responses[qhead % c->responses_alloc] = resp;
380 	c->head_response = 1 + qhead;
381 	tickle_sem(c->accesslock);
382 	/* <<<< ACCESS LOCKING ENDS <<<< */
383 
384 	/* queue consumer wake-up notification */
385 	if (empty)
386 	{
387 #	    ifdef WORK_PIPE
388 		if (1 != write(c->resp_write_pipe, "", 1))
389 			msyslog(LOG_WARNING, "async resolver: %s",
390 				"failed to notify main thread!");
391 #	    else
392 		tickle_sem(c->responses_pending);
393 #	    endif
394 	}
395 	return 0;
396 }
397 
398 
399 #ifndef WORK_PIPE
400 
401 /* --------------------------------------------------------------------
402  * Check if a (Windows-)hanndle to a semaphore is actually the same we
403  * are using inside the sema wrapper.
404  */
405 static BOOL
406 same_os_sema(
407 	const sem_ref	obj,
408 	void*		osh
409 	)
410 {
411 	return obj && osh && (obj->shnd == (HANDLE)osh);
412 }
413 
414 /* --------------------------------------------------------------------
415  * Find the shared context that associates to an OS handle and make sure
416  * the data is dequeued and processed.
417  */
418 void
419 handle_blocking_resp_sem(
420 	void *	context
421 	)
422 {
423 	blocking_child *	c;
424 	u_int			idx;
425 
426 	c = NULL;
427 	for (idx = 0; idx < blocking_children_alloc; idx++) {
428 		c = blocking_children[idx];
429 		if (c != NULL &&
430 			c->thread_ref != NULL &&
431 			same_os_sema(c->responses_pending, context))
432 			break;
433 	}
434 	if (idx < blocking_children_alloc)
435 		process_blocking_resp(c);
436 }
437 #endif	/* !WORK_PIPE */
438 
439 /* --------------------------------------------------------------------
440  * Fetch the next response from the return queue. In case of signalling
441  * via pipe, make sure the pipe is flushed, too.
442  */
443 blocking_pipe_header *
444 receive_blocking_resp_internal(
445 	blocking_child *	c
446 	)
447 {
448 	blocking_pipe_header *	removed;
449 	size_t			qhead, qtail, slot;
450 
451 #ifdef WORK_PIPE
452 	int			rc;
453 	char			scratch[32];
454 
455 	do
456 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
457 	while (-1 == rc && EINTR == errno);
458 #endif
459 
460 	/* >>>> ACCESS LOCKING STARTS >>>> */
461 	wait_for_sem(c->accesslock, NULL);
462 	qhead = c->head_response;
463 	qtail = c->tail_response;
464 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
465 		slot = qtail % c->responses_alloc;
466 		removed = c->responses[slot];
467 		c->responses[slot] = NULL;
468 	}
469 	c->tail_response = qtail;
470 	tickle_sem(c->accesslock);
471 	/* <<<< ACCESS LOCKING ENDS <<<< */
472 
473 	if (NULL != removed) {
474 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
475 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
476 	}
477 	if (CHILD_GONE_RESP == removed) {
478 		cleanup_after_child(c);
479 		removed = NULL;
480 	}
481 
482 	return removed;
483 }
484 
/* --------------------------------------------------------------------
 * Light up a new worker.
 *
 * Creates the semaphores of the child context first and then the
 * worker thread itself. The context must not be flagged as reusable
 * at this point (asserted via DEBUG_INSIST).
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	/* semaphores must exist before the thread can use them */
	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
499 
500 /* --------------------------------------------------------------------
501  * Create a worker thread. There are several differences between POSIX
502  * and Windows, of course -- most notably the Windows thread is no
503  * detached thread, and we keep the handle around until we want to get
504  * rid of the thread. The notification scheme also differs: Windows
505  * makes use of semaphores in both directions, POSIX uses a pipe for
506  * integration with 'select()' or alike.
507  */
508 static void
509 start_blocking_thread_internal(
510 	blocking_child *	c
511 	)
512 #ifdef SYS_WINNT
513 {
514 	BOOL	resumed;
515 
516 	c->thread_ref = NULL;
517 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
518 	c->thr_table[0].thnd =
519 		(HANDLE)_beginthreadex(
520 			NULL,
521 			0,
522 			&blocking_thread,
523 			c,
524 			CREATE_SUSPENDED,
525 			NULL);
526 
527 	if (NULL == c->thr_table[0].thnd) {
528 		msyslog(LOG_ERR, "start blocking thread failed: %m");
529 		exit(-1);
530 	}
531 	/* remember the thread priority is only within the process class */
532 	if (!SetThreadPriority(c->thr_table[0].thnd,
533 			       THREAD_PRIORITY_BELOW_NORMAL))
534 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
535 
536 	resumed = ResumeThread(c->thr_table[0].thnd);
537 	DEBUG_INSIST(resumed);
538 	c->thread_ref = &c->thr_table[0];
539 }
540 #else	/* pthreads start_blocking_thread_internal() follows */
541 {
542 # ifdef NEED_PTHREAD_INIT
543 	static int	pthread_init_called;
544 # endif
545 	pthread_attr_t	thr_attr;
546 	int		rc;
547 	int		pipe_ends[2];	/* read then write */
548 	int		is_pipe;
549 	int		flags;
550 	size_t		ostacksize;
551 	size_t		nstacksize;
552 	sigset_t	saved_sig_mask;
553 
554 	c->thread_ref = NULL;
555 
556 # ifdef NEED_PTHREAD_INIT
557 	/*
558 	 * from lib/isc/unix/app.c:
559 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
560 	 */
561 	if (!pthread_init_called) {
562 		pthread_init();
563 		pthread_init_called = TRUE;
564 	}
565 # endif
566 
567 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
568 	if (0 != rc) {
569 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
570 		exit(1);
571 	}
572 	c->resp_read_pipe = move_fd(pipe_ends[0]);
573 	c->resp_write_pipe = move_fd(pipe_ends[1]);
574 	c->ispipe = is_pipe;
575 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
576 	if (-1 == flags) {
577 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
578 		exit(1);
579 	}
580 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
581 	if (-1 == rc) {
582 		msyslog(LOG_ERR,
583 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
584 		exit(1);
585 	}
586 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
587 	pthread_attr_init(&thr_attr);
588 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
589 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
590     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
591 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
592 	if (0 != rc) {
593 		msyslog(LOG_ERR,
594 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
595 			strerror(rc));
596 	} else {
597 		if (ostacksize < THREAD_MINSTACKSIZE)
598 			nstacksize = THREAD_MINSTACKSIZE;
599 		else if (ostacksize > THREAD_MAXSTACKSIZE)
600 			nstacksize = THREAD_MAXSTACKSIZE;
601 		else
602 			nstacksize = ostacksize;
603 		if (nstacksize != ostacksize)
604 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
605 		if (0 != rc)
606 			msyslog(LOG_ERR,
607 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
608 				(u_long)ostacksize, (u_long)nstacksize,
609 				strerror(rc));
610 	}
611 #else
612 	UNUSED_ARG(nstacksize);
613 	UNUSED_ARG(ostacksize);
614 #endif
615 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
616 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
617 #endif
618 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
619 	block_thread_signals(&saved_sig_mask);
620 	rc = pthread_create(&c->thr_table[0], &thr_attr,
621 			    &blocking_thread, c);
622 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
623 	pthread_attr_destroy(&thr_attr);
624 	if (0 != rc) {
625 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
626 			strerror(rc));
627 		exit(1);
628 	}
629 	c->thread_ref = &c->thr_table[0];
630 }
631 #endif
632 
633 /* --------------------------------------------------------------------
634  * block_thread_signals()
635  *
636  * Temporarily block signals used by ntpd main thread, so that signal
637  * mask inherited by child threads leaves them blocked.  Returns prior
638  * active signal mask via pmask, to be restored by the main thread
639  * after pthread_create().
640  */
641 #ifndef SYS_WINNT
642 void
643 block_thread_signals(
644 	sigset_t *	pmask
645 	)
646 {
647 	sigset_t	block;
648 
649 	sigemptyset(&block);
650 # ifdef HAVE_SIGNALED_IO
651 #  ifdef SIGIO
652 	sigaddset(&block, SIGIO);
653 #  endif
654 #  ifdef SIGPOLL
655 	sigaddset(&block, SIGPOLL);
656 #  endif
657 # endif	/* HAVE_SIGNALED_IO */
658 	sigaddset(&block, SIGALRM);
659 	sigaddset(&block, MOREDEBUGSIG);
660 	sigaddset(&block, LESSDEBUGSIG);
661 # ifdef SIGDIE1
662 	sigaddset(&block, SIGDIE1);
663 # endif
664 # ifdef SIGDIE2
665 	sigaddset(&block, SIGDIE2);
666 # endif
667 # ifdef SIGDIE3
668 	sigaddset(&block, SIGDIE3);
669 # endif
670 # ifdef SIGDIE4
671 	sigaddset(&block, SIGDIE4);
672 # endif
673 # ifdef SIGBUS
674 	sigaddset(&block, SIGBUS);
675 # endif
676 	sigemptyset(pmask);
677 	pthread_sigmask(SIG_BLOCK, &block, pmask);
678 }
679 #endif	/* !SYS_WINNT */
680 
681 
682 /* --------------------------------------------------------------------
683  * Create & destroy semaphores. This is sufficiently different between
684  * POSIX and Windows to warrant wrapper functions and close enough to
685  * use the concept of synchronization via semaphore for all platforms.
686  */
687 static sem_ref
688 create_sema(
689 	sema_type*	semptr,
690 	u_int		inival,
691 	u_int		maxval)
692 {
693 #ifdef SYS_WINNT
694 
695 	long svini, svmax;
696 	if (NULL != semptr) {
697 		svini = (inival < LONG_MAX)
698 		    ? (long)inival : LONG_MAX;
699 		svmax = (maxval < LONG_MAX && maxval > 0)
700 		    ? (long)maxval : LONG_MAX;
701 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
702 		if (NULL == semptr->shnd)
703 			semptr = NULL;
704 	}
705 
706 #else
707 
708 	(void)maxval;
709 	if (semptr && sem_init(semptr, FALSE, inival))
710 		semptr = NULL;
711 
712 #endif
713 
714 	return semptr;
715 }
716 
717 /* ------------------------------------------------------------------ */
718 static sem_ref
719 delete_sema(
720 	sem_ref obj)
721 {
722 
723 #   ifdef SYS_WINNT
724 
725 	if (obj) {
726 		if (obj->shnd)
727 			CloseHandle(obj->shnd);
728 		obj->shnd = NULL;
729 	}
730 
731 #   else
732 
733 	if (obj)
734 		sem_destroy(obj);
735 
736 #   endif
737 
738 	return NULL;
739 }
740 
741 /* --------------------------------------------------------------------
742  * prepare_child_sems()
743  *
744  * create sync & access semaphores
745  *
746  * All semaphores are cleared, only the access semaphore has 1 unit.
747  * Childs wait on 'workitems_pending', then grabs 'sema_access'
748  * and dequeues jobs. When done, 'sema_access' is given one unit back.
749  *
750  * The producer grabs 'sema_access', manages the queue, restores
751  * 'sema_access' and puts one unit into 'workitems_pending'.
752  *
753  * The story goes the same for the response queue.
754  */
755 static void
756 prepare_child_sems(
757 	blocking_child *c
758 	)
759 {
760 	if (NULL == worker_memlock)
761 		worker_memlock = create_sema(&worker_mmutex, 1, 1);
762 
763 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
764 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
765 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
766 #   ifndef WORK_PIPE
767 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
768 #   endif
769 }
770 
771 /* --------------------------------------------------------------------
772  * wait for semaphore. Where the wait can be interrupted, it will
773  * internally resume -- When this function returns, there is either no
774  * semaphore at all, a timeout occurred, or the caller could
775  * successfully take a token from the semaphore.
776  *
777  * For untimed wait, not checking the result of this function at all is
778  * definitely an option.
779  */
780 static int
781 wait_for_sem(
782 	sem_ref			sem,
783 	struct timespec *	timeout		/* wall-clock */
784 	)
785 #ifdef SYS_WINNT
786 {
787 	struct timespec now;
788 	struct timespec delta;
789 	DWORD		msec;
790 	DWORD		rc;
791 
792 	if (!(sem && sem->shnd)) {
793 		errno = EINVAL;
794 		return -1;
795 	}
796 
797 	if (NULL == timeout) {
798 		msec = INFINITE;
799 	} else {
800 		getclock(TIMEOFDAY, &now);
801 		delta = sub_tspec(*timeout, now);
802 		if (delta.tv_sec < 0) {
803 			msec = 0;
804 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
805 			msec = INFINITE;
806 		} else {
807 			msec = 1000 * (DWORD)delta.tv_sec;
808 			msec += delta.tv_nsec / (1000 * 1000);
809 		}
810 	}
811 	rc = WaitForSingleObject(sem->shnd, msec);
812 	if (WAIT_OBJECT_0 == rc)
813 		return 0;
814 	if (WAIT_TIMEOUT == rc) {
815 		errno = ETIMEDOUT;
816 		return -1;
817 	}
818 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
819 	errno = EFAULT;
820 	return -1;
821 }
822 #else	/* pthreads wait_for_sem() follows */
823 {
824 	int rc = -1;
825 
826 	if (sem) do {
827 			if (NULL == timeout)
828 				rc = sem_wait(sem);
829 			else
830 				rc = sem_timedwait(sem, timeout);
831 		} while (rc == -1 && errno == EINTR);
832 	else
833 		errno = EINVAL;
834 
835 	return rc;
836 }
837 #endif
838 
839 /* --------------------------------------------------------------------
840  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
841  * calling conventions under Windows and POSIX-defined signature
842  * otherwise.
843  */
844 #ifdef SYS_WINNT
845 u_int WINAPI
846 #else
847 void *
848 #endif
849 blocking_thread(
850 	void *	ThreadArg
851 	)
852 {
853 	blocking_child *c;
854 
855 	c = ThreadArg;
856 	exit_worker(blocking_child_common(c));
857 
858 	/* NOTREACHED */
859 	return 0;
860 }
861 
862 /* --------------------------------------------------------------------
863  * req_child_exit() runs in the parent.
864  *
865  * This function is called from from the idle timer, too, and possibly
866  * without a thread being there any longer. Since we have folded up our
867  * tent in that case and all the semaphores are already gone, we simply
868  * ignore this request in this case.
869  *
870  * Since the existence of the semaphores is controlled exclusively by
871  * the parent, there's no risk of data race here.
872  */
873 int
874 req_child_exit(
875 	blocking_child *c
876 	)
877 {
878 	return (c->accesslock)
879 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
880 	    : 0;
881 }
882 
883 /* --------------------------------------------------------------------
884  * cleanup_after_child() runs in parent.
885  */
886 static void
887 cleanup_after_child(
888 	blocking_child *	c
889 	)
890 {
891 	DEBUG_INSIST(!c->reusable);
892 
893 #   ifdef SYS_WINNT
894 	/* The thread was not created in detached state, so we better
895 	 * clean up.
896 	 */
897 	if (c->thread_ref && c->thread_ref->thnd) {
898 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
899 		INSIST(CloseHandle(c->thread_ref->thnd));
900 		c->thread_ref->thnd = NULL;
901 	}
902 #   endif
903 	c->thread_ref = NULL;
904 
905 	/* remove semaphores and (if signalling vi IO) pipes */
906 
907 	c->accesslock           = delete_sema(c->accesslock);
908 	c->workitems_pending    = delete_sema(c->workitems_pending);
909 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
910 
911 #   ifdef WORK_PIPE
912 	DEBUG_INSIST(-1 != c->resp_read_pipe);
913 	DEBUG_INSIST(-1 != c->resp_write_pipe);
914 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
915 	close(c->resp_write_pipe);
916 	close(c->resp_read_pipe);
917 	c->resp_write_pipe = -1;
918 	c->resp_read_pipe = -1;
919 #   else
920 	DEBUG_INSIST(NULL != c->responses_pending);
921 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
922 	c->responses_pending = delete_sema(c->responses_pending);
923 #   endif
924 
925 	/* Is it necessary to check if there are pending requests and
926 	 * responses? If so, and if there are, what to do with them?
927 	 */
928 
929 	/* re-init buffer index sequencers */
930 	c->head_workitem = 0;
931 	c->tail_workitem = 0;
932 	c->head_response = 0;
933 	c->tail_response = 0;
934 
935 	c->reusable = TRUE;
936 }
937 
938 
939 #else	/* !WORK_THREAD follows */
940 char work_thread_nonempty_compilation_unit;
941 #endif
942