/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <thread.h>
#include <synch.h>
#include <port.h>
#include <signal.h>
#include <stdio.h>
#include <errno.h>
#include <stdarg.h>
#include <string.h>
#include <sys/aiocb.h>
#include <time.h>
#include <fcntl.h>
#include "sigev_thread.h"

/*
 * There is but one spawner for all aio operations.
 */
thread_communication_data_t *sigev_aio_tcd = NULL;

/*
 * Set non-zero via the environment variable _RT_DEBUG to enable
 * debugging printf's.
 */
static int _rt_debug = 0;

void
init_sigev_thread(void)
{
	char *ldebug;

	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
		_rt_debug = atoi(ldebug);
}

/*
 * Routine to print debug messages:
 * If _rt_debug is set, printf the debug message to stderr
 * with an appropriate prefix.
 */
/*PRINTFLIKE1*/
static void
dprintf(const char *format, ...)
{
	if (_rt_debug) {
		va_list alist;

		va_start(alist, format);
		flockfile(stderr);
		(void) fputs("DEBUG: ", stderr);
		(void) vfprintf(stderr, format, alist);
		funlockfile(stderr);
		va_end(alist);
	}
}

/*
 * The notify_thread() function can be used as the start function of a new
 * thread but it is normally called from notifier(), below, in the context
 * of a thread pool worker thread.  It is used as the start function of a
 * new thread only when individual pthread attributes differ from those
 * that are common to all workers.  This only occurs in the AIO case.
 */
static void *
notify_thread(void *arg)
{
	sigev_thread_data_t *stdp = arg;
	void (*function)(union sigval) = stdp->std_func;
	union sigval argument = stdp->std_arg;

	lfree(stdp, sizeof (*stdp));
	function(argument);
	return (NULL);
}

/*
 * Thread pool interface to call the user-supplied notification function.
 */
static void
notifier(void *arg)
{
	(void) notify_thread(arg);
}
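/*
 * For reference, the notification function invoked above has the
 * signature that POSIX requires of a SIGEV_THREAD callback.  A minimal,
 * illustrative application-side callback (not part of this file; the
 * names are hypothetical) would look like this:
 *
 *	void
 *	my_notify_func(union sigval sv)
 *	{
 *		int *countp = sv.sival_ptr;
 *
 *		(*countp)++;
 *	}
 */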
/*
 * This routine adds a new work request, described by function
 * and argument, to the list of outstanding jobs.
 * It returns 0 on success; a nonzero return value is an error number.
 */
static int
sigev_add_work(thread_communication_data_t *tcdp,
	void (*function)(union sigval), union sigval argument)
{
	tpool_t *tpool = tcdp->tcd_poolp;
	sigev_thread_data_t *stdp;

	if (tpool == NULL)
		return (EINVAL);
	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
		return (errno);
	stdp->std_func = function;
	stdp->std_arg = argument;
	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
		lfree(stdp, sizeof (*stdp));
		return (errno);
	}
	return (0);
}

static void
sigev_destroy_pool(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;

	if (tcdp->tcd_subsystem == MQ) {
		/*
		 * synchronize with del_sigev_mq()
		 */
		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_server_id = 0;
		if (tcdp->tcd_msg_closing) {
			(void) cond_broadcast(&tcdp->tcd_cv);
			sig_mutex_unlock(&tcdp->tcd_lock);
			return;		/* del_sigev_mq() will free the tcd */
		}
		sig_mutex_unlock(&tcdp->tcd_lock);
	}

	/*
	 * now delete everything
	 */
	free_sigev_handler(tcdp);
}

/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that receive the events for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  If an
 * event requires a worker thread with pthread attributes that differ
 * from those common to the pool of workers, a new daemon thread with
 * those attributes is spawned apart from the pool.  If the spawner
 * fails to add work or to create an additional thread for lack of
 * resources, it puts the event back into the kernel queue and
 * retries later.
 */
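/*
 * For context, an application reaches these spawners only through the
 * standard POSIX interfaces.  A minimal sketch of a SIGEV_THREAD timer
 * setup (application code, not part of this library; my_notify_func
 * and my_counter are hypothetical) would be:
 *
 *	struct sigevent sev;
 *	timer_t timerid;
 *
 *	(void) memset(&sev, 0, sizeof (sev));
 *	sev.sigev_notify = SIGEV_THREAD;
 *	sev.sigev_notify_function = my_notify_func;
 *	sev.sigev_notify_attributes = NULL;
 *	sev.sigev_value.sival_ptr = &my_counter;
 *	if (timer_create(CLOCK_REALTIME, &sev, &timerid) != 0)
 *		perror("timer_create");
 */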
void *
timer_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	port_event_t port_event;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	for (;;) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, errno, strerror(errno));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_TIMER:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_TIMER)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}

		tcdp->tcd_overruns = port_event.portev_events - 1;
		if (sigev_add_work(tcdp,
		    tcdp->tcd_notif.sigev_notify_function,
		    tcdp->tcd_notif.sigev_value) != 0)
			break;
		/* wait until job is done before looking for another */
		tpool_wait(tcdp->tcd_poolp);
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}

void *
mqueue_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int ret = 0;
	int ntype;
	void (*function)(union sigval);
	union sigval argument;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (ret == 0) {
		sig_mutex_lock(&tcdp->tcd_lock);
		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
		while ((ntype = tcdp->tcd_msg_enabled) == 0)
			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
		pthread_cleanup_pop(1);

		while (sem_wait(tcdp->tcd_msg_avail) == -1)
			continue;

		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_msg_enabled = 0;
		sig_mutex_unlock(&tcdp->tcd_lock);

		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
		if (ntype == SIGEV_THREAD) {
			function = tcdp->tcd_notif.sigev_notify_function;
			argument.sival_ptr = tcdp->tcd_msg_userval;
			ret = sigev_add_work(tcdp, function, argument);
		} else {	/* ntype == SIGEV_PORT */
			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
			    0, (uintptr_t)tcdp->tcd_msg_object,
			    tcdp->tcd_msg_userval);
		}
	}

	pthread_cleanup_pop(1);
	return (NULL);
}
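/*
 * For context, the message queue path above is driven by mq_notify().
 * A minimal sketch of the application-side registration (not part of
 * this library; mqd, my_notify_func, and my_state are hypothetical):
 *
 *	struct sigevent sev;
 *
 *	(void) memset(&sev, 0, sizeof (sev));
 *	sev.sigev_notify = SIGEV_THREAD;
 *	sev.sigev_notify_function = my_notify_func;
 *	sev.sigev_value.sival_ptr = &my_state;
 *	if (mq_notify(mqd, &sev) != 0)
 *		perror("mq_notify");
 *
 * Per POSIX, such a registration fires once, when a message arrives at
 * an empty queue, and is then removed; the application must register
 * again to receive further notifications.
 */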
void *
aio_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int error = 0;
	void (*function)(union sigval);
	union sigval argument;
	port_event_t port_event;
	struct sigevent *sigevp;
	timespec_t delta;
	pthread_attr_t *attrp;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (error == 0) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			error = errno;
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, error, strerror(error));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_AIO:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_AIO)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}
		argument.sival_ptr = port_event.portev_user;
		switch (port_event.portev_events) {
		case AIOLIO:
#if !defined(_LP64)
		case AIOLIO64:
#endif
			sigevp = (struct sigevent *)port_event.portev_object;
			function = sigevp->sigev_notify_function;
			attrp = sigevp->sigev_notify_attributes;
			break;
		case AIOAREAD:
		case AIOAWRITE:
		case AIOFSYNC:
			{
			aiocb_t *aiocbp =
			    (aiocb_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
		case AIOFSYNC64:
			{
			aiocb64_t *aiocbp =
			    (aiocb64_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#endif
		default:
			function = NULL;
			attrp = NULL;
			break;
		}

		if (function == NULL)
			error = EINVAL;
		else if (_pthread_attr_equal(attrp, tcdp->tcd_attrp))
			error = sigev_add_work(tcdp, function, argument);
		else {
			/*
			 * The attributes don't match.
			 * Spawn a thread with the non-matching attributes.
			 */
			pthread_attr_t local_attr;
			sigev_thread_data_t *stdp;

			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
				error = ENOMEM;
			else
				error = _pthread_attr_clone(&local_attr, attrp);

			if (error == 0) {
				(void) pthread_attr_setdetachstate(
				    &local_attr, PTHREAD_CREATE_DETACHED);
				(void) _pthread_attr_setdaemonstate_np(
				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
				stdp->std_func = function;
				stdp->std_arg = argument;
				error = pthread_create(NULL, &local_attr,
				    notify_thread, stdp);
				(void) pthread_attr_destroy(&local_attr);
			}
			if (error && stdp != NULL)
				lfree(stdp, sizeof (*stdp));
		}

		if (error) {
			dprintf("Cannot add work, error=%d <%s>.\n",
			    error, strerror(error));
			if (error == EAGAIN || error == ENOMEM) {
				/* Resources are (temporarily) unavailable. */
				if (_port_dispatch(tcdp->tcd_port, 0,
				    PORT_SOURCE_AIO, port_event.portev_events,
				    port_event.portev_object,
				    port_event.portev_user) != 0)
					break;
				error = 0;
				delta.tv_sec = 0;
				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
				(void) nanosleep(&delta, NULL);
			}
		}
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}
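/*
 * For context, the attribute comparison above comes into play when an
 * application supplies its own notification attributes with an aio
 * request.  A minimal sketch (application code, not part of this
 * library; fd, buf, and my_notify_func are hypothetical):
 *
 *	static aiocb_t cb;
 *	static char buf[4096];
 *	pthread_attr_t attr;
 *
 *	(void) pthread_attr_init(&attr);
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof (buf);
 *	cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
 *	cb.aio_sigevent.sigev_notify_function = my_notify_func;
 *	cb.aio_sigevent.sigev_notify_attributes = &attr;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &cb;
 *	if (aio_read(&cb) != 0)
 *		perror("aio_read");
 */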
/*
 * Allocate a thread_communication_data_t block.
 */
static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
		tcdp->tcd_subsystem = caller;
		tcdp->tcd_port = -1;
		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
	}
	return (tcdp);
}

/*
 * Free a thread_communication_data_t block.
 */
void
free_sigev_handler(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_attrp) {
		(void) pthread_attr_destroy(tcdp->tcd_attrp);
		tcdp->tcd_attrp = NULL;
	}
	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));

	switch (tcdp->tcd_subsystem) {
	case TIMER:
	case AIO:
		if (tcdp->tcd_port >= 0)
			(void) close(tcdp->tcd_port);
		break;
	case MQ:
		tcdp->tcd_msg_avail = NULL;
		tcdp->tcd_msg_object = NULL;
		tcdp->tcd_msg_userval = NULL;
		tcdp->tcd_msg_enabled = 0;
		break;
	}

	lfree(tcdp, sizeof (*tcdp));
}

/*
 * Initialize the data structure and create the port.
 */
thread_communication_data_t *
setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
{
	thread_communication_data_t *tcdp;
	int error;

	if (sigevp == NULL) {
		errno = EINVAL;
		return (NULL);
	}

	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
		errno = ENOMEM;
		return (NULL);
	}

	if (sigevp->sigev_notify_attributes == NULL)
		tcdp->tcd_attrp = NULL;		/* default attributes */
	else {
		/*
		 * We cannot just copy the sigevp->sigev_notify_attributes
		 * pointer.  We need to initialize a new pthread_attr_t
		 * structure with the values from the user-supplied
		 * pthread_attr_t.
		 */
		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
		error = _pthread_attr_clone(tcdp->tcd_attrp,
		    sigevp->sigev_notify_attributes);
		if (error) {
			tcdp->tcd_attrp = NULL;
			free_sigev_handler(tcdp);
			errno = error;
			return (NULL);
		}
	}
	tcdp->tcd_notif = *sigevp;
	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;

	if (caller == TIMER || caller == AIO) {
		if ((tcdp->tcd_port = port_create()) < 0 ||
		    fcntl(tcdp->tcd_port, F_SETFD, FD_CLOEXEC) == -1) {
			free_sigev_handler(tcdp);
			errno = EBADF;
			return (NULL);
		}
	}
	return (tcdp);
}
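/*
 * The clone above is what lets a caller treat its attribute object as
 * transient.  A sketch of application code that is valid only because
 * the library keeps its own copy (the names are hypothetical):
 *
 *	pthread_attr_t attr;
 *
 *	(void) pthread_attr_init(&attr);
 *	sev.sigev_notify = SIGEV_THREAD;
 *	sev.sigev_notify_function = my_notify_func;
 *	sev.sigev_notify_attributes = &attr;
 *	(void) timer_create(CLOCK_REALTIME, &sev, &timerid);
 *	(void) pthread_attr_destroy(&attr);
 *
 * Destroying attr immediately after timer_create() is safe because
 * setup_sigev_handler() cloned it into tcd_user_attr.
 */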
/*
 * Create a thread pool and launch the spawner.
 */
int
launch_spawner(thread_communication_data_t *tcdp)
{
	int ret;
	int maxworkers;
	void *(*spawner)(void *);
	sigset_t set;
	sigset_t oset;

	switch (tcdp->tcd_subsystem) {
	case TIMER:
		spawner = timer_spawner;
		maxworkers = 1;
		break;
	case MQ:
		spawner = mqueue_spawner;
		maxworkers = 1;
		break;
	case AIO:
		spawner = aio_spawner;
		maxworkers = 100;
		break;
	default:
		return (-1);
	}
	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
	    tcdp->tcd_notif.sigev_notify_attributes);
	if (tcdp->tcd_poolp == NULL)
		return (-1);
	/* create the spawner with all signals blocked */
	(void) sigfillset(&set);
	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
	ret = thr_create(NULL, 0, spawner, tcdp,
	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
	if (ret != 0) {
		tpool_destroy(tcdp->tcd_poolp);
		tcdp->tcd_poolp = NULL;
		return (-1);
	}
	return (0);
}

/*
 * Delete the data associated with the sigev_thread timer, if the timer
 * is associated with such a notification option.
 * Destroy the timer_spawner thread.
 */
int
del_sigev_timer(timer_t timer)
{
	int rc = 0;
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
		sig_mutex_lock(&tcdp->tcd_lock);
		if (tcdp->tcd_port >= 0) {
			if ((rc = port_alert(tcdp->tcd_port,
			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
				dprintf("del_sigev_timer(%d) OK.\n", timer);
			}
		}
		timer_tcd[timer] = NULL;
		sig_mutex_unlock(&tcdp->tcd_lock);
	}
	return (rc);
}

int
sigev_timer_getoverrun(timer_t timer)
{
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
		return (tcdp->tcd_overruns);
	return (0);
}
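/*
 * The count returned above is, presumably, what an application sees
 * through the standard interface for a SIGEV_THREAD timer (timerid is
 * hypothetical):
 *
 *	int overruns = timer_getoverrun(timerid);
 *
 * timer_spawner() records portev_events - 1 because the delivered
 * event itself accounts for one expiration; only the excess counts
 * as overruns.
 */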
static void
del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
{
	sig_mutex_unlock(&tcdp->tcd_lock);
	free_sigev_handler(tcdp);
}

/*
 * Delete the data associated with the sigev_thread message queue,
 * if the message queue is associated with such a notification option.
 * Destroy the mqueue_spawner thread.
 */
void
del_sigev_mq(thread_communication_data_t *tcdp)
{
	pthread_t server_id;
	int rc;

	sig_mutex_lock(&tcdp->tcd_lock);

	server_id = tcdp->tcd_server_id;
	tcdp->tcd_msg_closing = 1;
	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
		sig_mutex_unlock(&tcdp->tcd_lock);
		dprintf("Failed to cancel %u with error %d <%s>.\n",
		    server_id, rc, strerror(rc));
		return;
	}

	/*
	 * wait for sigev_destroy_pool() to finish
	 */
	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
	while (tcdp->tcd_server_id == server_id)
		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
	pthread_cleanup_pop(1);
}

/*
 * POSIX aio:
 * If the notification type is SIGEV_THREAD, set up
 * the port number for notifications.  Create the
 * thread pool and launch the spawner if necessary.
 * If the notification type is not SIGEV_THREAD, do nothing.
 */
int
_aio_sigev_thread_init(struct sigevent *sigevp)
{
	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
	static cond_t sigev_aio_cv = DEFAULTCV;
	static int sigev_aio_busy = 0;

	thread_communication_data_t *tcdp;
	int port;
	int rc = 0;

	if (sigevp == NULL ||
	    sigevp->sigev_notify != SIGEV_THREAD ||
	    sigevp->sigev_notify_function == NULL)
		return (0);

	lmutex_lock(&sigev_aio_lock);
	while (sigev_aio_busy)
		(void) _cond_wait(&sigev_aio_cv, &sigev_aio_lock);
	if ((tcdp = sigev_aio_tcd) != NULL)
		port = tcdp->tcd_port;
	else {
		sigev_aio_busy = 1;
		lmutex_unlock(&sigev_aio_lock);

		tcdp = setup_sigev_handler(sigevp, AIO);
		if (tcdp == NULL) {
			port = -1;
			rc = -1;
		} else if (launch_spawner(tcdp) != 0) {
			free_sigev_handler(tcdp);
			tcdp = NULL;
			port = -1;
			rc = -1;
		} else {
			port = tcdp->tcd_port;
		}

		lmutex_lock(&sigev_aio_lock);
		sigev_aio_tcd = tcdp;
		sigev_aio_busy = 0;
		(void) cond_broadcast(&sigev_aio_cv);
	}
	lmutex_unlock(&sigev_aio_lock);
	sigevp->sigev_signo = port;
	return (rc);
}

int
_aio_sigev_thread(aiocb_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}

#if !defined(_LP64)
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#endif

/*
 * Clean up POSIX aio after fork1() in the child process.
 */
void
postfork1_child_sigev_aio(void)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = sigev_aio_tcd) != NULL) {
		sigev_aio_tcd = NULL;
		tcd_teardown(tcdp);
	}
}

/*
 * Utility function for the various postfork1_child_sigev_*() functions.
 * Clean up the tcdp data structure and close the port.
 */
void
tcd_teardown(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;
	tcdp->tcd_server_id = 0;
	free_sigev_handler(tcdp);
}