xref: /titanic_41/usr/src/lib/libc/port/rt/sigev_thread.c (revision 7257d1b4d25bfac0c802847390e98a464fd787ac)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include <sys/types.h>
32 #include <pthread.h>
33 #include <unistd.h>
34 #include <stdlib.h>
35 #include <thread.h>
36 #include <pthread.h>
37 #include <synch.h>
38 #include <port.h>
39 #include <signal.h>
40 #include <stdio.h>
41 #include <errno.h>
42 #include <stdarg.h>
43 #include <string.h>
44 #include <sys/aiocb.h>
45 #include <time.h>
46 #include <signal.h>
47 #include <fcntl.h>
48 #include "sigev_thread.h"
49 
50 /*
51  * There is but one spawner for all aio operations.
52  */
53 thread_communication_data_t *sigev_aio_tcd = NULL;
54 
55 /*
56  * Set non-zero via _RT_DEBUG to enable debugging printf's.
57  */
58 static int _rt_debug = 0;
59 
60 void
init_sigev_thread(void)61 init_sigev_thread(void)
62 {
63 	char *ldebug;
64 
65 	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
66 		_rt_debug = atoi(ldebug);
67 }
68 
69 /*
70  * Routine to print debug messages:
71  * If _rt_debug is set, printf the debug message to stderr
72  * with an appropriate prefix.
73  */
74 /*PRINTFLIKE1*/
75 static void
dprintf(const char * format,...)76 dprintf(const char *format, ...)
77 {
78 	if (_rt_debug) {
79 		va_list alist;
80 
81 		va_start(alist, format);
82 		flockfile(stderr);
83 		pthread_cleanup_push(funlockfile, stderr);
84 		(void) fputs("DEBUG: ", stderr);
85 		(void) vfprintf(stderr, format, alist);
86 		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
87 		va_end(alist);
88 	}
89 }
90 
91 /*
92  * The notify_thread() function can be used as the start function of a new
93  * thread but it is normally called from notifier(), below, in the context
94  * of a thread pool worker thread.  It is used as the start function of a
95  * new thread only when individual pthread attributes differ from those
96  * that are common to all workers.  This only occurs in the AIO case.
97  */
98 static void *
notify_thread(void * arg)99 notify_thread(void *arg)
100 {
101 	sigev_thread_data_t *stdp = arg;
102 	void (*function)(union sigval) = stdp->std_func;
103 	union sigval argument = stdp->std_arg;
104 
105 	lfree(stdp, sizeof (*stdp));
106 	function(argument);
107 	return (NULL);
108 }
109 
110 /*
111  * Thread pool interface to call the user-supplied notification function.
112  */
113 static void
notifier(void * arg)114 notifier(void *arg)
115 {
116 	(void) notify_thread(arg);
117 }
118 
119 /*
120  * This routine adds a new work request, described by function
121  * and argument, to the list of outstanding jobs.
122  * It returns 0 indicating success.  A value != 0 indicates an error.
123  */
124 static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)125 sigev_add_work(thread_communication_data_t *tcdp,
126 	void (*function)(union sigval), union sigval argument)
127 {
128 	tpool_t *tpool = tcdp->tcd_poolp;
129 	sigev_thread_data_t *stdp;
130 
131 	if (tpool == NULL)
132 		return (EINVAL);
133 	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
134 		return (errno);
135 	stdp->std_func = function;
136 	stdp->std_arg = argument;
137 	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
138 		lfree(stdp, sizeof (*stdp));
139 		return (errno);
140 	}
141 	return (0);
142 }
143 
144 static void
sigev_destroy_pool(thread_communication_data_t * tcdp)145 sigev_destroy_pool(thread_communication_data_t *tcdp)
146 {
147 	if (tcdp->tcd_poolp != NULL)
148 		tpool_abandon(tcdp->tcd_poolp);
149 	tcdp->tcd_poolp = NULL;
150 
151 	if (tcdp->tcd_subsystem == MQ) {
152 		/*
153 		 * synchronize with del_sigev_mq()
154 		 */
155 		sig_mutex_lock(&tcdp->tcd_lock);
156 		tcdp->tcd_server_id = 0;
157 		if (tcdp->tcd_msg_closing) {
158 			(void) cond_broadcast(&tcdp->tcd_cv);
159 			sig_mutex_unlock(&tcdp->tcd_lock);
160 			return;		/* del_sigev_mq() will free the tcd */
161 		}
162 		sig_mutex_unlock(&tcdp->tcd_lock);
163 	}
164 
165 	/*
166 	 * now delete everything
167 	 */
168 	free_sigev_handler(tcdp);
169 }
170 
171 /*
172  * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
173  * functions for the daemon threads that get the event(s) for the
174  * respective SIGEV_THREAD subsystems.  There is one timer spawner for
175  * each timer_create(), one mqueue spawner for every mq_open(), and
176  * exactly one aio spawner for all aio requests.  These spawners add
177  * work requests to be done by a pool of daemon worker threads.  In case
178  * the event requires creation of a worker thread with different pthread
179  * attributes than those from the pool of workers, a new daemon thread
180  * with these attributes is spawned apart from the pool of workers.
181  * If the spawner fails to add work or fails to create an additional
182  * thread because of lacking resources, it puts the event back into
183  * the kernel queue and re-tries some time later.
184  */
185 
186 void *
timer_spawner(void * arg)187 timer_spawner(void *arg)
188 {
189 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
190 	port_event_t port_event;
191 
192 	/* destroy the pool if we are cancelled */
193 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
194 
195 	for (;;) {
196 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
197 			dprintf("port_get on port %d failed with %d <%s>\n",
198 			    tcdp->tcd_port, errno, strerror(errno));
199 			break;
200 		}
201 		switch (port_event.portev_source) {
202 		case PORT_SOURCE_TIMER:
203 			break;
204 		case PORT_SOURCE_ALERT:
205 			if (port_event.portev_events != SIGEV_THREAD_TERM)
206 				errno = EPROTO;
207 			goto out;
208 		default:
209 			dprintf("port_get on port %d returned %u "
210 			    "(not PORT_SOURCE_TIMER)\n",
211 			    tcdp->tcd_port, port_event.portev_source);
212 			errno = EPROTO;
213 			goto out;
214 		}
215 
216 		tcdp->tcd_overruns = port_event.portev_events - 1;
217 		if (sigev_add_work(tcdp,
218 		    tcdp->tcd_notif.sigev_notify_function,
219 		    tcdp->tcd_notif.sigev_value) != 0)
220 			break;
221 		/* wait until job is done before looking for another */
222 		tpool_wait(tcdp->tcd_poolp);
223 	}
224 out:
225 	pthread_cleanup_pop(1);
226 	return (NULL);
227 }
228 
229 void *
mqueue_spawner(void * arg)230 mqueue_spawner(void *arg)
231 {
232 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
233 	int ret = 0;
234 	int ntype;
235 	void (*function)(union sigval);
236 	union sigval argument;
237 
238 	/* destroy the pool if we are cancelled */
239 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
240 
241 	while (ret == 0) {
242 		sig_mutex_lock(&tcdp->tcd_lock);
243 		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
244 		while ((ntype = tcdp->tcd_msg_enabled) == 0)
245 			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
246 		pthread_cleanup_pop(1);
247 
248 		while (sem_wait(tcdp->tcd_msg_avail) == -1)
249 			continue;
250 
251 		sig_mutex_lock(&tcdp->tcd_lock);
252 		tcdp->tcd_msg_enabled = 0;
253 		sig_mutex_unlock(&tcdp->tcd_lock);
254 
255 		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
256 		if (ntype == SIGEV_THREAD) {
257 			function = tcdp->tcd_notif.sigev_notify_function;
258 			argument.sival_ptr = tcdp->tcd_msg_userval;
259 			ret = sigev_add_work(tcdp, function, argument);
260 		} else {	/* ntype == SIGEV_PORT */
261 			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
262 			    0, (uintptr_t)tcdp->tcd_msg_object,
263 			    tcdp->tcd_msg_userval);
264 		}
265 	}
266 	sig_mutex_unlock(&tcdp->tcd_lock);
267 
268 	pthread_cleanup_pop(1);
269 	return (NULL);
270 }
271 
272 void *
aio_spawner(void * arg)273 aio_spawner(void *arg)
274 {
275 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
276 	int error = 0;
277 	void (*function)(union sigval);
278 	union sigval argument;
279 	port_event_t port_event;
280 	struct sigevent *sigevp;
281 	timespec_t delta;
282 	pthread_attr_t *attrp;
283 
284 	/* destroy the pool if we are cancelled */
285 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
286 
287 	while (error == 0) {
288 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
289 			error = errno;
290 			dprintf("port_get on port %d failed with %d <%s>\n",
291 			    tcdp->tcd_port, error, strerror(error));
292 			break;
293 		}
294 		switch (port_event.portev_source) {
295 		case PORT_SOURCE_AIO:
296 			break;
297 		case PORT_SOURCE_ALERT:
298 			if (port_event.portev_events != SIGEV_THREAD_TERM)
299 				errno = EPROTO;
300 			goto out;
301 		default:
302 			dprintf("port_get on port %d returned %u "
303 			    "(not PORT_SOURCE_AIO)\n",
304 			    tcdp->tcd_port, port_event.portev_source);
305 			errno = EPROTO;
306 			goto out;
307 		}
308 		argument.sival_ptr = port_event.portev_user;
309 		switch (port_event.portev_events) {
310 		case AIOLIO:
311 #if !defined(_LP64)
312 		case AIOLIO64:
313 #endif
314 			sigevp = (struct sigevent *)port_event.portev_object;
315 			function = sigevp->sigev_notify_function;
316 			attrp = sigevp->sigev_notify_attributes;
317 			break;
318 		case AIOAREAD:
319 		case AIOAWRITE:
320 		case AIOFSYNC:
321 			{
322 			aiocb_t *aiocbp =
323 			    (aiocb_t *)port_event.portev_object;
324 			function = aiocbp->aio_sigevent.sigev_notify_function;
325 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
326 			break;
327 			}
328 #if !defined(_LP64)
329 		case AIOAREAD64:
330 		case AIOAWRITE64:
331 		case AIOFSYNC64:
332 			{
333 			aiocb64_t *aiocbp =
334 			    (aiocb64_t *)port_event.portev_object;
335 			function = aiocbp->aio_sigevent.sigev_notify_function;
336 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
337 			break;
338 			}
339 #endif
340 		default:
341 			function = NULL;
342 			attrp = NULL;
343 			break;
344 		}
345 
346 		if (function == NULL)
347 			error = EINVAL;
348 		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
349 			error = sigev_add_work(tcdp, function, argument);
350 		else {
351 			/*
352 			 * The attributes don't match.
353 			 * Spawn a thread with the non-matching attributes.
354 			 */
355 			pthread_attr_t local_attr;
356 			sigev_thread_data_t *stdp;
357 
358 			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
359 				error = ENOMEM;
360 			else
361 				error = pthread_attr_clone(&local_attr, attrp);
362 
363 			if (error == 0) {
364 				(void) pthread_attr_setdetachstate(
365 				    &local_attr, PTHREAD_CREATE_DETACHED);
366 				(void) pthread_attr_setdaemonstate_np(
367 				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
368 				stdp->std_func = function;
369 				stdp->std_arg = argument;
370 				error = pthread_create(NULL, &local_attr,
371 				    notify_thread, stdp);
372 				(void) pthread_attr_destroy(&local_attr);
373 			}
374 			if (error && stdp != NULL)
375 				lfree(stdp, sizeof (*stdp));
376 		}
377 
378 		if (error) {
379 			dprintf("Cannot add work, error=%d <%s>.\n",
380 			    error, strerror(error));
381 			if (error == EAGAIN || error == ENOMEM) {
382 				/* (Temporary) no resources are available. */
383 				if (_port_dispatch(tcdp->tcd_port, 0,
384 				    PORT_SOURCE_AIO, port_event.portev_events,
385 				    port_event.portev_object,
386 				    port_event.portev_user) != 0)
387 					break;
388 				error = 0;
389 				delta.tv_sec = 0;
390 				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
391 				(void) nanosleep(&delta, NULL);
392 			}
393 		}
394 	}
395 out:
396 	pthread_cleanup_pop(1);
397 	return (NULL);
398 }
399 
400 /*
401  * Allocate a thread_communication_data_t block.
402  */
403 static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)404 alloc_sigev_handler(subsystem_t caller)
405 {
406 	thread_communication_data_t *tcdp;
407 
408 	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
409 		tcdp->tcd_subsystem = caller;
410 		tcdp->tcd_port = -1;
411 		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
412 		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
413 	}
414 	return (tcdp);
415 }
416 
417 /*
418  * Free a thread_communication_data_t block.
419  */
420 void
free_sigev_handler(thread_communication_data_t * tcdp)421 free_sigev_handler(thread_communication_data_t *tcdp)
422 {
423 	if (tcdp->tcd_attrp) {
424 		(void) pthread_attr_destroy(tcdp->tcd_attrp);
425 		tcdp->tcd_attrp = NULL;
426 	}
427 	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
428 
429 	switch (tcdp->tcd_subsystem) {
430 	case TIMER:
431 	case AIO:
432 		if (tcdp->tcd_port >= 0)
433 			(void) close(tcdp->tcd_port);
434 		break;
435 	case MQ:
436 		tcdp->tcd_msg_avail = NULL;
437 		tcdp->tcd_msg_object = NULL;
438 		tcdp->tcd_msg_userval = NULL;
439 		tcdp->tcd_msg_enabled = 0;
440 		break;
441 	}
442 
443 	lfree(tcdp, sizeof (*tcdp));
444 }
445 
446 /*
447  * Initialize data structure and create the port.
448  */
449 thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)450 setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
451 {
452 	thread_communication_data_t *tcdp;
453 	int error;
454 
455 	if (sigevp == NULL) {
456 		errno = EINVAL;
457 		return (NULL);
458 	}
459 
460 	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
461 		errno = ENOMEM;
462 		return (NULL);
463 	}
464 
465 	if (sigevp->sigev_notify_attributes == NULL)
466 		tcdp->tcd_attrp = NULL;		/* default attributes */
467 	else {
468 		/*
469 		 * We cannot just copy the sigevp->sigev_notify_attributes
470 		 * pointer.  We need to initialize a new pthread_attr_t
471 		 * structure with the values from the user-supplied
472 		 * pthread_attr_t.
473 		 */
474 		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
475 		error = pthread_attr_clone(tcdp->tcd_attrp,
476 		    sigevp->sigev_notify_attributes);
477 		if (error) {
478 			tcdp->tcd_attrp = NULL;
479 			free_sigev_handler(tcdp);
480 			errno = error;
481 			return (NULL);
482 		}
483 	}
484 	tcdp->tcd_notif = *sigevp;
485 	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
486 
487 	if (caller == TIMER || caller == AIO) {
488 		if ((tcdp->tcd_port = port_create()) < 0 ||
489 		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
490 			free_sigev_handler(tcdp);
491 			errno = EBADF;
492 			return (NULL);
493 		}
494 	}
495 	return (tcdp);
496 }
497 
498 /*
499  * Create a thread pool and launch the spawner.
500  */
501 int
launch_spawner(thread_communication_data_t * tcdp)502 launch_spawner(thread_communication_data_t *tcdp)
503 {
504 	int ret;
505 	int maxworkers;
506 	void *(*spawner)(void *);
507 	sigset_t set;
508 	sigset_t oset;
509 
510 	switch (tcdp->tcd_subsystem) {
511 	case TIMER:
512 		spawner = timer_spawner;
513 		maxworkers = 1;
514 		break;
515 	case MQ:
516 		spawner = mqueue_spawner;
517 		maxworkers = 1;
518 		break;
519 	case AIO:
520 		spawner = aio_spawner;
521 		maxworkers = 100;
522 		break;
523 	default:
524 		return (-1);
525 	}
526 	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
527 	    tcdp->tcd_notif.sigev_notify_attributes);
528 	if (tcdp->tcd_poolp == NULL)
529 		return (-1);
530 	/* create the spawner with all signals blocked */
531 	(void) sigfillset(&set);
532 	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
533 	ret = thr_create(NULL, 0, spawner, tcdp,
534 	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
535 	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
536 	if (ret != 0) {
537 		tpool_destroy(tcdp->tcd_poolp);
538 		tcdp->tcd_poolp = NULL;
539 		return (-1);
540 	}
541 	return (0);
542 }
543 
544 /*
545  * Delete the data associated with the sigev_thread timer, if timer is
546  * associated with such a notification option.
547  * Destroy the timer_spawner thread.
548  */
549 int
del_sigev_timer(timer_t timer)550 del_sigev_timer(timer_t timer)
551 {
552 	int rc = 0;
553 	thread_communication_data_t *tcdp;
554 
555 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
556 		sig_mutex_lock(&tcdp->tcd_lock);
557 		if (tcdp->tcd_port >= 0) {
558 			if ((rc = port_alert(tcdp->tcd_port,
559 			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
560 				dprintf("del_sigev_timer(%d) OK.\n", timer);
561 			}
562 		}
563 		timer_tcd[timer] = NULL;
564 		sig_mutex_unlock(&tcdp->tcd_lock);
565 	}
566 	return (rc);
567 }
568 
569 int
sigev_timer_getoverrun(timer_t timer)570 sigev_timer_getoverrun(timer_t timer)
571 {
572 	thread_communication_data_t *tcdp;
573 
574 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
575 		return (tcdp->tcd_overruns);
576 	return (0);
577 }
578 
579 static void
del_sigev_mq_cleanup(thread_communication_data_t * tcdp)580 del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
581 {
582 	sig_mutex_unlock(&tcdp->tcd_lock);
583 	free_sigev_handler(tcdp);
584 }
585 
586 /*
587  * Delete the data associated with the sigev_thread message queue,
588  * if the message queue is associated with such a notification option.
589  * Destroy the mqueue_spawner thread.
590  */
591 void
del_sigev_mq(thread_communication_data_t * tcdp)592 del_sigev_mq(thread_communication_data_t *tcdp)
593 {
594 	pthread_t server_id;
595 	int rc;
596 
597 	sig_mutex_lock(&tcdp->tcd_lock);
598 
599 	server_id = tcdp->tcd_server_id;
600 	tcdp->tcd_msg_closing = 1;
601 	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
602 		sig_mutex_unlock(&tcdp->tcd_lock);
603 		dprintf("Fail to cancel %u with error %d <%s>.\n",
604 		    server_id, rc, strerror(rc));
605 		return;
606 	}
607 
608 	/*
609 	 * wait for sigev_destroy_pool() to finish
610 	 */
611 	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
612 	while (tcdp->tcd_server_id == server_id)
613 		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
614 	pthread_cleanup_pop(1);
615 }
616 
617 /*
618  * POSIX aio:
619  * If the notification type is SIGEV_THREAD, set up
620  * the port number for notifications.  Create the
621  * thread pool and launch the spawner if necessary.
622  * If the notification type is not SIGEV_THREAD, do nothing.
623  */
624 int
_aio_sigev_thread_init(struct sigevent * sigevp)625 _aio_sigev_thread_init(struct sigevent *sigevp)
626 {
627 	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
628 	static cond_t sigev_aio_cv = DEFAULTCV;
629 	static int sigev_aio_busy = 0;
630 
631 	thread_communication_data_t *tcdp;
632 	int port;
633 	int cancel_state;
634 	int rc = 0;
635 
636 	if (sigevp == NULL ||
637 	    sigevp->sigev_notify != SIGEV_THREAD ||
638 	    sigevp->sigev_notify_function == NULL)
639 		return (0);
640 
641 	lmutex_lock(&sigev_aio_lock);
642 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
643 	while (sigev_aio_busy)
644 		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
645 	(void) pthread_setcancelstate(cancel_state, NULL);
646 	if ((tcdp = sigev_aio_tcd) != NULL)
647 		port = tcdp->tcd_port;
648 	else {
649 		sigev_aio_busy = 1;
650 		lmutex_unlock(&sigev_aio_lock);
651 
652 		tcdp = setup_sigev_handler(sigevp, AIO);
653 		if (tcdp == NULL) {
654 			port = -1;
655 			rc = -1;
656 		} else if (launch_spawner(tcdp) != 0) {
657 			free_sigev_handler(tcdp);
658 			tcdp = NULL;
659 			port = -1;
660 			rc = -1;
661 		} else {
662 			port = tcdp->tcd_port;
663 		}
664 
665 		lmutex_lock(&sigev_aio_lock);
666 		sigev_aio_tcd = tcdp;
667 		sigev_aio_busy = 0;
668 		(void) cond_broadcast(&sigev_aio_cv);
669 	}
670 	lmutex_unlock(&sigev_aio_lock);
671 	sigevp->sigev_signo = port;
672 	return (rc);
673 }
674 
675 int
_aio_sigev_thread(aiocb_t * aiocbp)676 _aio_sigev_thread(aiocb_t *aiocbp)
677 {
678 	if (aiocbp == NULL)
679 		return (0);
680 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
681 }
682 
683 #if !defined(_LP64)
684 int
_aio_sigev_thread64(aiocb64_t * aiocbp)685 _aio_sigev_thread64(aiocb64_t *aiocbp)
686 {
687 	if (aiocbp == NULL)
688 		return (0);
689 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
690 }
691 #endif
692 
693 /*
694  * Cleanup POSIX aio after fork1() in the child process.
695  */
696 void
postfork1_child_sigev_aio(void)697 postfork1_child_sigev_aio(void)
698 {
699 	thread_communication_data_t *tcdp;
700 
701 	if ((tcdp = sigev_aio_tcd) != NULL) {
702 		sigev_aio_tcd = NULL;
703 		tcd_teardown(tcdp);
704 	}
705 }
706 
707 /*
708  * Utility function for the various postfork1_child_sigev_*() functions.
709  * Clean up the tcdp data structure and close the port.
710  */
711 void
tcd_teardown(thread_communication_data_t * tcdp)712 tcd_teardown(thread_communication_data_t *tcdp)
713 {
714 	if (tcdp->tcd_poolp != NULL)
715 		tpool_abandon(tcdp->tcd_poolp);
716 	tcdp->tcd_poolp = NULL;
717 	tcdp->tcd_server_id = 0;
718 	free_sigev_handler(tcdp);
719 }
720