xref: /illumos-gate/usr/src/lib/libc/port/rt/sigev_thread.c (revision 1da57d551424de5a9d469760be7c4b4d4f10a755)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include "lint.h"
28 #include "thr_uberdata.h"
29 #include <sys/types.h>
30 #include <pthread.h>
31 #include <unistd.h>
32 #include <stdlib.h>
33 #include <thread.h>
34 #include <pthread.h>
35 #include <synch.h>
36 #include <port.h>
37 #include <signal.h>
38 #include <stdio.h>
39 #include <errno.h>
40 #include <stdarg.h>
41 #include <string.h>
42 #include <sys/aiocb.h>
43 #include <time.h>
44 #include <signal.h>
45 #include <fcntl.h>
46 #include "sigev_thread.h"
47 
/*
 * There is but one spawner for all aio operations.
 * This pointer is NULL until _aio_sigev_thread_init() has set up the
 * AIO handler, and it is reset to NULL by postfork1_child_sigev_aio().
 */
thread_communication_data_t *sigev_aio_tcd = NULL;
52 
/*
 * Set non-zero via _RT_DEBUG to enable debugging printf's.
 * The value is read from the environment by init_sigev_thread()
 * and is tested by dprintf() before anything is written to stderr.
 */
static int _rt_debug = 0;
57 
58 void
init_sigev_thread(void)59 init_sigev_thread(void)
60 {
61 	char *ldebug;
62 
63 	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
64 		_rt_debug = atoi(ldebug);
65 }
66 
67 /*
68  * Routine to print debug messages:
69  * If _rt_debug is set, printf the debug message to stderr
70  * with an appropriate prefix.
71  */
72 /*PRINTFLIKE1*/
73 static void
dprintf(const char * format,...)74 dprintf(const char *format, ...)
75 {
76 	if (_rt_debug) {
77 		va_list alist;
78 
79 		va_start(alist, format);
80 		flockfile(stderr);
81 		pthread_cleanup_push(funlockfile, stderr);
82 		(void) fputs("DEBUG: ", stderr);
83 		(void) vfprintf(stderr, format, alist);
84 		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
85 		va_end(alist);
86 	}
87 }
88 
89 /*
90  * The notify_thread() function can be used as the start function of a new
91  * thread but it is normally called from notifier(), below, in the context
92  * of a thread pool worker thread.  It is used as the start function of a
93  * new thread only when individual pthread attributes differ from those
94  * that are common to all workers.  This only occurs in the AIO case.
95  */
96 static void *
notify_thread(void * arg)97 notify_thread(void *arg)
98 {
99 	sigev_thread_data_t *stdp = arg;
100 	void (*function)(union sigval) = stdp->std_func;
101 	union sigval argument = stdp->std_arg;
102 
103 	lfree(stdp, sizeof (*stdp));
104 	function(argument);
105 	return (NULL);
106 }
107 
/*
 * Thread pool interface to call the user-supplied notification function.
 * This is the job function handed to tpool_dispatch() by sigev_add_work();
 * it forwards arg to notify_thread() and discards the returned pointer.
 */
static void
notifier(void *arg)
{
	(void) notify_thread(arg);
}
116 
117 /*
118  * This routine adds a new work request, described by function
119  * and argument, to the list of outstanding jobs.
120  * It returns 0 indicating success.  A value != 0 indicates an error.
121  */
122 static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)123 sigev_add_work(thread_communication_data_t *tcdp,
124 	void (*function)(union sigval), union sigval argument)
125 {
126 	tpool_t *tpool = tcdp->tcd_poolp;
127 	sigev_thread_data_t *stdp;
128 
129 	if (tpool == NULL)
130 		return (EINVAL);
131 	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
132 		return (errno);
133 	stdp->std_func = function;
134 	stdp->std_arg = argument;
135 	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
136 		lfree(stdp, sizeof (*stdp));
137 		return (errno);
138 	}
139 	return (0);
140 }
141 
142 static void
sigev_destroy_pool(thread_communication_data_t * tcdp)143 sigev_destroy_pool(thread_communication_data_t *tcdp)
144 {
145 	if (tcdp->tcd_poolp != NULL)
146 		tpool_abandon(tcdp->tcd_poolp);
147 	tcdp->tcd_poolp = NULL;
148 
149 	if (tcdp->tcd_subsystem == MQ) {
150 		/*
151 		 * synchronize with del_sigev_mq()
152 		 */
153 		sig_mutex_lock(&tcdp->tcd_lock);
154 		tcdp->tcd_server_id = 0;
155 		if (tcdp->tcd_msg_closing) {
156 			(void) cond_broadcast(&tcdp->tcd_cv);
157 			sig_mutex_unlock(&tcdp->tcd_lock);
158 			return;		/* del_sigev_mq() will free the tcd */
159 		}
160 		sig_mutex_unlock(&tcdp->tcd_lock);
161 	}
162 
163 	/*
164 	 * now delete everything
165 	 */
166 	free_sigev_handler(tcdp);
167 }
168 
/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  If an
 * event requires a worker thread with pthread attributes that differ
 * from those of the pool's workers, a new daemon thread with these
 * attributes is spawned apart from the pool of workers.
 * If the aio spawner fails to add work or fails to create an additional
 * thread because of a lack of resources, it puts the event back into
 * the kernel queue and retries some time later.
 */
183 
184 void *
timer_spawner(void * arg)185 timer_spawner(void *arg)
186 {
187 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
188 	port_event_t port_event;
189 
190 	/* destroy the pool if we are cancelled */
191 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
192 
193 	for (;;) {
194 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
195 			dprintf("port_get on port %d failed with %d <%s>\n",
196 			    tcdp->tcd_port, errno, strerror(errno));
197 			break;
198 		}
199 		switch (port_event.portev_source) {
200 		case PORT_SOURCE_TIMER:
201 			break;
202 		case PORT_SOURCE_ALERT:
203 			if (port_event.portev_events != SIGEV_THREAD_TERM)
204 				errno = EPROTO;
205 			goto out;
206 		default:
207 			dprintf("port_get on port %d returned %u "
208 			    "(not PORT_SOURCE_TIMER)\n",
209 			    tcdp->tcd_port, port_event.portev_source);
210 			errno = EPROTO;
211 			goto out;
212 		}
213 
214 		tcdp->tcd_overruns = port_event.portev_events - 1;
215 		if (sigev_add_work(tcdp,
216 		    tcdp->tcd_notif.sigev_notify_function,
217 		    tcdp->tcd_notif.sigev_value) != 0)
218 			break;
219 		/* wait until job is done before looking for another */
220 		tpool_wait(tcdp->tcd_poolp);
221 	}
222 out:
223 	pthread_cleanup_pop(1);
224 	return (NULL);
225 }
226 
227 void *
mqueue_spawner(void * arg)228 mqueue_spawner(void *arg)
229 {
230 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
231 	int ret = 0;
232 	int ntype;
233 	void (*function)(union sigval);
234 	union sigval argument;
235 
236 	/* destroy the pool if we are cancelled */
237 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
238 
239 	while (ret == 0) {
240 		sig_mutex_lock(&tcdp->tcd_lock);
241 		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
242 		while ((ntype = tcdp->tcd_msg_enabled) == 0)
243 			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
244 		pthread_cleanup_pop(1);
245 
246 		while (sem_wait(tcdp->tcd_msg_avail) == -1)
247 			continue;
248 
249 		sig_mutex_lock(&tcdp->tcd_lock);
250 		tcdp->tcd_msg_enabled = 0;
251 		sig_mutex_unlock(&tcdp->tcd_lock);
252 
253 		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
254 		if (ntype == SIGEV_THREAD) {
255 			function = tcdp->tcd_notif.sigev_notify_function;
256 			argument.sival_ptr = tcdp->tcd_msg_userval;
257 			ret = sigev_add_work(tcdp, function, argument);
258 		} else {	/* ntype == SIGEV_PORT */
259 			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
260 			    0, (uintptr_t)tcdp->tcd_msg_object,
261 			    tcdp->tcd_msg_userval);
262 		}
263 	}
264 	sig_mutex_unlock(&tcdp->tcd_lock);
265 
266 	pthread_cleanup_pop(1);
267 	return (NULL);
268 }
269 
270 void *
aio_spawner(void * arg)271 aio_spawner(void *arg)
272 {
273 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
274 	int error = 0;
275 	void (*function)(union sigval);
276 	union sigval argument;
277 	port_event_t port_event;
278 	struct sigevent *sigevp;
279 	timespec_t delta;
280 	pthread_attr_t *attrp;
281 
282 	/* destroy the pool if we are cancelled */
283 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
284 
285 	while (error == 0) {
286 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
287 			error = errno;
288 			dprintf("port_get on port %d failed with %d <%s>\n",
289 			    tcdp->tcd_port, error, strerror(error));
290 			break;
291 		}
292 		switch (port_event.portev_source) {
293 		case PORT_SOURCE_AIO:
294 			break;
295 		case PORT_SOURCE_ALERT:
296 			if (port_event.portev_events != SIGEV_THREAD_TERM)
297 				errno = EPROTO;
298 			goto out;
299 		default:
300 			dprintf("port_get on port %d returned %u "
301 			    "(not PORT_SOURCE_AIO)\n",
302 			    tcdp->tcd_port, port_event.portev_source);
303 			errno = EPROTO;
304 			goto out;
305 		}
306 		argument.sival_ptr = port_event.portev_user;
307 		switch (port_event.portev_events) {
308 		case AIOLIO:
309 #if !defined(_LP64)
310 		case AIOLIO64:
311 #endif
312 			sigevp = (struct sigevent *)port_event.portev_object;
313 			function = sigevp->sigev_notify_function;
314 			attrp = sigevp->sigev_notify_attributes;
315 			break;
316 		case AIOAREAD:
317 		case AIOAWRITE:
318 		case AIOFSYNC:
319 			{
320 			aiocb_t *aiocbp =
321 			    (aiocb_t *)port_event.portev_object;
322 			function = aiocbp->aio_sigevent.sigev_notify_function;
323 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
324 			break;
325 			}
326 #if !defined(_LP64)
327 		case AIOAREAD64:
328 		case AIOAWRITE64:
329 		case AIOFSYNC64:
330 			{
331 			aiocb64_t *aiocbp =
332 			    (aiocb64_t *)port_event.portev_object;
333 			function = aiocbp->aio_sigevent.sigev_notify_function;
334 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
335 			break;
336 			}
337 #endif
338 		default:
339 			function = NULL;
340 			attrp = NULL;
341 			break;
342 		}
343 
344 		if (function == NULL)
345 			error = EINVAL;
346 		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
347 			error = sigev_add_work(tcdp, function, argument);
348 		else {
349 			/*
350 			 * The attributes don't match.
351 			 * Spawn a thread with the non-matching attributes.
352 			 */
353 			pthread_attr_t local_attr;
354 			sigev_thread_data_t *stdp;
355 
356 			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
357 				error = ENOMEM;
358 			else
359 				error = pthread_attr_clone(&local_attr, attrp);
360 
361 			if (error == 0) {
362 				(void) pthread_attr_setdetachstate(
363 				    &local_attr, PTHREAD_CREATE_DETACHED);
364 				(void) pthread_attr_setdaemonstate_np(
365 				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
366 				stdp->std_func = function;
367 				stdp->std_arg = argument;
368 				error = pthread_create(NULL, &local_attr,
369 				    notify_thread, stdp);
370 				(void) pthread_attr_destroy(&local_attr);
371 			}
372 			if (error && stdp != NULL)
373 				lfree(stdp, sizeof (*stdp));
374 		}
375 
376 		if (error) {
377 			dprintf("Cannot add work, error=%d <%s>.\n",
378 			    error, strerror(error));
379 			if (error == EAGAIN || error == ENOMEM) {
380 				/* (Temporary) no resources are available. */
381 				if (_port_dispatch(tcdp->tcd_port, 0,
382 				    PORT_SOURCE_AIO, port_event.portev_events,
383 				    port_event.portev_object,
384 				    port_event.portev_user) != 0)
385 					break;
386 				error = 0;
387 				delta.tv_sec = 0;
388 				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
389 				(void) nanosleep(&delta, NULL);
390 			}
391 		}
392 	}
393 out:
394 	pthread_cleanup_pop(1);
395 	return (NULL);
396 }
397 
398 /*
399  * Allocate a thread_communication_data_t block.
400  */
401 static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)402 alloc_sigev_handler(subsystem_t caller)
403 {
404 	thread_communication_data_t *tcdp;
405 
406 	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
407 		tcdp->tcd_subsystem = caller;
408 		tcdp->tcd_port = -1;
409 		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
410 		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
411 	}
412 	return (tcdp);
413 }
414 
415 /*
416  * Free a thread_communication_data_t block.
417  */
418 void
free_sigev_handler(thread_communication_data_t * tcdp)419 free_sigev_handler(thread_communication_data_t *tcdp)
420 {
421 	if (tcdp->tcd_attrp) {
422 		(void) pthread_attr_destroy(tcdp->tcd_attrp);
423 		tcdp->tcd_attrp = NULL;
424 	}
425 	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
426 
427 	switch (tcdp->tcd_subsystem) {
428 	case TIMER:
429 	case AIO:
430 		if (tcdp->tcd_port >= 0)
431 			(void) close(tcdp->tcd_port);
432 		break;
433 	case MQ:
434 		tcdp->tcd_msg_avail = NULL;
435 		tcdp->tcd_msg_object = NULL;
436 		tcdp->tcd_msg_userval = NULL;
437 		tcdp->tcd_msg_enabled = 0;
438 		break;
439 	}
440 
441 	lfree(tcdp, sizeof (*tcdp));
442 }
443 
444 /*
445  * Initialize data structure and create the port.
446  */
447 thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)448 setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
449 {
450 	thread_communication_data_t *tcdp;
451 	int error;
452 
453 	if (sigevp == NULL) {
454 		errno = EINVAL;
455 		return (NULL);
456 	}
457 
458 	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
459 		errno = ENOMEM;
460 		return (NULL);
461 	}
462 
463 	if (sigevp->sigev_notify_attributes == NULL)
464 		tcdp->tcd_attrp = NULL;		/* default attributes */
465 	else {
466 		/*
467 		 * We cannot just copy the sigevp->sigev_notify_attributes
468 		 * pointer.  We need to initialize a new pthread_attr_t
469 		 * structure with the values from the user-supplied
470 		 * pthread_attr_t.
471 		 */
472 		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
473 		error = pthread_attr_clone(tcdp->tcd_attrp,
474 		    sigevp->sigev_notify_attributes);
475 		if (error) {
476 			tcdp->tcd_attrp = NULL;
477 			free_sigev_handler(tcdp);
478 			errno = error;
479 			return (NULL);
480 		}
481 	}
482 	tcdp->tcd_notif = *sigevp;
483 	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
484 
485 	if (caller == TIMER || caller == AIO) {
486 		if ((tcdp->tcd_port = port_create()) < 0 ||
487 		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
488 			free_sigev_handler(tcdp);
489 			errno = EBADF;
490 			return (NULL);
491 		}
492 	}
493 	return (tcdp);
494 }
495 
496 /*
497  * Create a thread pool and launch the spawner.
498  */
499 int
launch_spawner(thread_communication_data_t * tcdp)500 launch_spawner(thread_communication_data_t *tcdp)
501 {
502 	int ret;
503 	int maxworkers;
504 	void *(*spawner)(void *);
505 	sigset_t set;
506 	sigset_t oset;
507 
508 	switch (tcdp->tcd_subsystem) {
509 	case TIMER:
510 		spawner = timer_spawner;
511 		maxworkers = 1;
512 		break;
513 	case MQ:
514 		spawner = mqueue_spawner;
515 		maxworkers = 1;
516 		break;
517 	case AIO:
518 		spawner = aio_spawner;
519 		maxworkers = 100;
520 		break;
521 	default:
522 		return (-1);
523 	}
524 	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
525 	    tcdp->tcd_notif.sigev_notify_attributes);
526 	if (tcdp->tcd_poolp == NULL)
527 		return (-1);
528 	/* create the spawner with all signals blocked */
529 	(void) sigfillset(&set);
530 	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
531 	ret = thr_create(NULL, 0, spawner, tcdp,
532 	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
533 	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
534 	if (ret != 0) {
535 		tpool_destroy(tcdp->tcd_poolp);
536 		tcdp->tcd_poolp = NULL;
537 		return (-1);
538 	}
539 	return (0);
540 }
541 
542 /*
543  * Delete the data associated with the sigev_thread timer, if timer is
544  * associated with such a notification option.
545  * Destroy the timer_spawner thread.
546  */
547 int
del_sigev_timer(timer_t timer)548 del_sigev_timer(timer_t timer)
549 {
550 	int rc = 0;
551 	thread_communication_data_t *tcdp;
552 
553 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
554 		sig_mutex_lock(&tcdp->tcd_lock);
555 		if (tcdp->tcd_port >= 0) {
556 			if ((rc = port_alert(tcdp->tcd_port,
557 			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
558 				dprintf("del_sigev_timer(%d) OK.\n", timer);
559 			}
560 		}
561 		timer_tcd[timer] = NULL;
562 		sig_mutex_unlock(&tcdp->tcd_lock);
563 	}
564 	return (rc);
565 }
566 
567 int
sigev_timer_getoverrun(timer_t timer)568 sigev_timer_getoverrun(timer_t timer)
569 {
570 	thread_communication_data_t *tcdp;
571 
572 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
573 		return (tcdp->tcd_overruns);
574 	return (0);
575 }
576 
/*
 * Cleanup handler pushed by del_sigev_mq() around its wait for the
 * spawner to go away.  del_sigev_mq() holds tcd_lock at that point, so
 * the handler releases the lock and then frees the tcd.
 */
static void
del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
{
	sig_mutex_unlock(&tcdp->tcd_lock);
	free_sigev_handler(tcdp);
}
583 
584 /*
585  * Delete the data associated with the sigev_thread message queue,
586  * if the message queue is associated with such a notification option.
587  * Destroy the mqueue_spawner thread.
588  */
589 void
del_sigev_mq(thread_communication_data_t * tcdp)590 del_sigev_mq(thread_communication_data_t *tcdp)
591 {
592 	pthread_t server_id;
593 	int rc;
594 
595 	sig_mutex_lock(&tcdp->tcd_lock);
596 
597 	server_id = tcdp->tcd_server_id;
598 	tcdp->tcd_msg_closing = 1;
599 	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
600 		sig_mutex_unlock(&tcdp->tcd_lock);
601 		dprintf("Fail to cancel %u with error %d <%s>.\n",
602 		    server_id, rc, strerror(rc));
603 		return;
604 	}
605 
606 	/*
607 	 * wait for sigev_destroy_pool() to finish
608 	 */
609 	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
610 	while (tcdp->tcd_server_id == server_id)
611 		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
612 	pthread_cleanup_pop(1);
613 }
614 
615 /*
616  * POSIX aio:
617  * If the notification type is SIGEV_THREAD, set up
618  * the port number for notifications.  Create the
619  * thread pool and launch the spawner if necessary.
620  * If the notification type is not SIGEV_THREAD, do nothing.
621  */
622 int
_aio_sigev_thread_init(struct sigevent * sigevp)623 _aio_sigev_thread_init(struct sigevent *sigevp)
624 {
625 	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
626 	static cond_t sigev_aio_cv = DEFAULTCV;
627 	static int sigev_aio_busy = 0;
628 
629 	thread_communication_data_t *tcdp;
630 	int port;
631 	int cancel_state;
632 	int rc = 0;
633 
634 	if (sigevp == NULL ||
635 	    sigevp->sigev_notify != SIGEV_THREAD ||
636 	    sigevp->sigev_notify_function == NULL)
637 		return (0);
638 
639 	lmutex_lock(&sigev_aio_lock);
640 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
641 	while (sigev_aio_busy)
642 		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
643 	(void) pthread_setcancelstate(cancel_state, NULL);
644 	if ((tcdp = sigev_aio_tcd) != NULL)
645 		port = tcdp->tcd_port;
646 	else {
647 		sigev_aio_busy = 1;
648 		lmutex_unlock(&sigev_aio_lock);
649 
650 		tcdp = setup_sigev_handler(sigevp, AIO);
651 		if (tcdp == NULL) {
652 			port = -1;
653 			rc = -1;
654 		} else if (launch_spawner(tcdp) != 0) {
655 			free_sigev_handler(tcdp);
656 			tcdp = NULL;
657 			port = -1;
658 			rc = -1;
659 		} else {
660 			port = tcdp->tcd_port;
661 		}
662 
663 		lmutex_lock(&sigev_aio_lock);
664 		sigev_aio_tcd = tcdp;
665 		sigev_aio_busy = 0;
666 		(void) cond_broadcast(&sigev_aio_cv);
667 	}
668 	lmutex_unlock(&sigev_aio_lock);
669 	sigevp->sigev_signo = port;
670 	return (rc);
671 }
672 
673 int
_aio_sigev_thread(aiocb_t * aiocbp)674 _aio_sigev_thread(aiocb_t *aiocbp)
675 {
676 	if (aiocbp == NULL)
677 		return (0);
678 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
679 }
680 
#if !defined(_LP64)
/*
 * Same as _aio_sigev_thread(), for an aiocb64.  Only built when _LP64
 * is not defined.
 */
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	return (aiocbp == NULL ? 0 :
	    _aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#endif
690 
691 /*
692  * Cleanup POSIX aio after fork1() in the child process.
693  */
694 void
postfork1_child_sigev_aio(void)695 postfork1_child_sigev_aio(void)
696 {
697 	thread_communication_data_t *tcdp;
698 
699 	if ((tcdp = sigev_aio_tcd) != NULL) {
700 		sigev_aio_tcd = NULL;
701 		tcd_teardown(tcdp);
702 	}
703 }
704 
705 /*
706  * Utility function for the various postfork1_child_sigev_*() functions.
707  * Clean up the tcdp data structure and close the port.
708  */
709 void
tcd_teardown(thread_communication_data_t * tcdp)710 tcd_teardown(thread_communication_data_t *tcdp)
711 {
712 	if (tcdp->tcd_poolp != NULL)
713 		tpool_abandon(tcdp->tcd_poolp);
714 	tcdp->tcd_poolp = NULL;
715 	tcdp->tcd_server_id = 0;
716 	free_sigev_handler(tcdp);
717 }
718