1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "lint.h" 30 #include "thr_uberdata.h" 31 #include <sys/types.h> 32 #include <pthread.h> 33 #include <unistd.h> 34 #include <stdlib.h> 35 #include <thread.h> 36 #include <pthread.h> 37 #include <synch.h> 38 #include <port.h> 39 #include <signal.h> 40 #include <stdio.h> 41 #include <errno.h> 42 #include <stdarg.h> 43 #include <string.h> 44 #include <sys/aiocb.h> 45 #include <time.h> 46 #include <signal.h> 47 #include <fcntl.h> 48 #include "sigev_thread.h" 49 50 /* 51 * There is but one spawner for all aio operations. 52 */ 53 thread_communication_data_t *sigev_aio_tcd = NULL; 54 55 /* 56 * Set non-zero via _RT_DEBUG to enable debugging printf's. 57 */ 58 static int _rt_debug = 0; 59 60 void 61 init_sigev_thread(void) 62 { 63 char *ldebug; 64 65 if ((ldebug = getenv("_RT_DEBUG")) != NULL) 66 _rt_debug = atoi(ldebug); 67 } 68 69 /* 70 * Routine to print debug messages: 71 * If _rt_debug is set, printf the debug message to stderr 72 * with an appropriate prefix. 73 */ 74 /*PRINTFLIKE1*/ 75 static void 76 dprintf(const char *format, ...) 77 { 78 if (_rt_debug) { 79 va_list alist; 80 81 va_start(alist, format); 82 flockfile(stderr); 83 pthread_cleanup_push(funlockfile, stderr); 84 (void) fputs("DEBUG: ", stderr); 85 (void) vfprintf(stderr, format, alist); 86 pthread_cleanup_pop(1); /* funlockfile(stderr) */ 87 va_end(alist); 88 } 89 } 90 91 /* 92 * The notify_thread() function can be used as the start function of a new 93 * thread but it is normally called from notifier(), below, in the context 94 * of a thread pool worker thread. It is used as the start function of a 95 * new thread only when individual pthread attributes differ from those 96 * that are common to all workers. This only occurs in the AIO case. 97 */ 98 static void * 99 notify_thread(void *arg) 100 { 101 sigev_thread_data_t *stdp = arg; 102 void (*function)(union sigval) = stdp->std_func; 103 union sigval argument = stdp->std_arg; 104 105 lfree(stdp, sizeof (*stdp)); 106 function(argument); 107 return (NULL); 108 } 109 110 /* 111 * Thread pool interface to call the user-supplied notification function. 112 */ 113 static void 114 notifier(void *arg) 115 { 116 (void) notify_thread(arg); 117 } 118 119 /* 120 * This routine adds a new work request, described by function 121 * and argument, to the list of outstanding jobs. 122 * It returns 0 indicating success. A value != 0 indicates an error. 123 */ 124 static int 125 sigev_add_work(thread_communication_data_t *tcdp, 126 void (*function)(union sigval), union sigval argument) 127 { 128 tpool_t *tpool = tcdp->tcd_poolp; 129 sigev_thread_data_t *stdp; 130 131 if (tpool == NULL) 132 return (EINVAL); 133 if ((stdp = lmalloc(sizeof (*stdp))) == NULL) 134 return (errno); 135 stdp->std_func = function; 136 stdp->std_arg = argument; 137 if (tpool_dispatch(tpool, notifier, stdp) != 0) { 138 lfree(stdp, sizeof (*stdp)); 139 return (errno); 140 } 141 return (0); 142 } 143 144 static void 145 sigev_destroy_pool(thread_communication_data_t *tcdp) 146 { 147 if (tcdp->tcd_poolp != NULL) 148 tpool_abandon(tcdp->tcd_poolp); 149 tcdp->tcd_poolp = NULL; 150 151 if (tcdp->tcd_subsystem == MQ) { 152 /* 153 * synchronize with del_sigev_mq() 154 */ 155 sig_mutex_lock(&tcdp->tcd_lock); 156 tcdp->tcd_server_id = 0; 157 if (tcdp->tcd_msg_closing) { 158 (void) cond_broadcast(&tcdp->tcd_cv); 159 sig_mutex_unlock(&tcdp->tcd_lock); 160 return; /* del_sigev_mq() will free the tcd */ 161 } 162 sig_mutex_unlock(&tcdp->tcd_lock); 163 } 164 165 /* 166 * now delete everything 167 */ 168 free_sigev_handler(tcdp); 169 } 170 171 /* 172 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main 173 * functions for the daemon threads that get the event(s) for the 174 * respective SIGEV_THREAD subsystems. There is one timer spawner for 175 * each timer_create(), one mqueue spawner for every mq_open(), and 176 * exactly one aio spawner for all aio requests. These spawners add 177 * work requests to be done by a pool of daemon worker threads. In case 178 * the event requires creation of a worker thread with different pthread 179 * attributes than those from the pool of workers, a new daemon thread 180 * with these attributes is spawned apart from the pool of workers. 181 * If the spawner fails to add work or fails to create an additional 182 * thread because of lacking resources, it puts the event back into 183 * the kernel queue and re-tries some time later. 184 */ 185 186 void * 187 timer_spawner(void *arg) 188 { 189 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; 190 port_event_t port_event; 191 192 /* destroy the pool if we are cancelled */ 193 pthread_cleanup_push(sigev_destroy_pool, tcdp); 194 195 for (;;) { 196 if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) { 197 dprintf("port_get on port %d failed with %d <%s>\n", 198 tcdp->tcd_port, errno, strerror(errno)); 199 break; 200 } 201 switch (port_event.portev_source) { 202 case PORT_SOURCE_TIMER: 203 break; 204 case PORT_SOURCE_ALERT: 205 if (port_event.portev_events != SIGEV_THREAD_TERM) 206 errno = EPROTO; 207 goto out; 208 default: 209 dprintf("port_get on port %d returned %u " 210 "(not PORT_SOURCE_TIMER)\n", 211 tcdp->tcd_port, port_event.portev_source); 212 errno = EPROTO; 213 goto out; 214 } 215 216 tcdp->tcd_overruns = port_event.portev_events - 1; 217 if (sigev_add_work(tcdp, 218 tcdp->tcd_notif.sigev_notify_function, 219 tcdp->tcd_notif.sigev_value) != 0) 220 break; 221 /* wait until job is done before looking for another */ 222 tpool_wait(tcdp->tcd_poolp); 223 } 224 out: 225 pthread_cleanup_pop(1); 226 return (NULL); 227 } 228 229 void * 230 mqueue_spawner(void *arg) 231 { 232 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; 233 int ret = 0; 234 int ntype; 235 void (*function)(union sigval); 236 union sigval argument; 237 238 /* destroy the pool if we are cancelled */ 239 pthread_cleanup_push(sigev_destroy_pool, tcdp); 240 241 while (ret == 0) { 242 sig_mutex_lock(&tcdp->tcd_lock); 243 pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock); 244 while ((ntype = tcdp->tcd_msg_enabled) == 0) 245 (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock); 246 pthread_cleanup_pop(1); 247 248 while (sem_wait(tcdp->tcd_msg_avail) == -1) 249 continue; 250 251 sig_mutex_lock(&tcdp->tcd_lock); 252 tcdp->tcd_msg_enabled = 0; 253 sig_mutex_unlock(&tcdp->tcd_lock); 254 255 /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */ 256 if (ntype == SIGEV_THREAD) { 257 function = tcdp->tcd_notif.sigev_notify_function; 258 argument.sival_ptr = tcdp->tcd_msg_userval; 259 ret = sigev_add_work(tcdp, function, argument); 260 } else { /* ntype == SIGEV_PORT */ 261 ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ, 262 0, (uintptr_t)tcdp->tcd_msg_object, 263 tcdp->tcd_msg_userval); 264 } 265 } 266 sig_mutex_unlock(&tcdp->tcd_lock); 267 268 pthread_cleanup_pop(1); 269 return (NULL); 270 } 271 272 void * 273 aio_spawner(void *arg) 274 { 275 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; 276 int error = 0; 277 void (*function)(union sigval); 278 union sigval argument; 279 port_event_t port_event; 280 struct sigevent *sigevp; 281 timespec_t delta; 282 pthread_attr_t *attrp; 283 284 /* destroy the pool if we are cancelled */ 285 pthread_cleanup_push(sigev_destroy_pool, tcdp); 286 287 while (error == 0) { 288 if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) { 289 error = errno; 290 dprintf("port_get on port %d failed with %d <%s>\n", 291 tcdp->tcd_port, error, strerror(error)); 292 break; 293 } 294 switch (port_event.portev_source) { 295 case PORT_SOURCE_AIO: 296 break; 297 case PORT_SOURCE_ALERT: 298 if (port_event.portev_events != SIGEV_THREAD_TERM) 299 errno = EPROTO; 300 goto out; 301 default: 302 dprintf("port_get on port %d returned %u " 303 "(not PORT_SOURCE_AIO)\n", 304 tcdp->tcd_port, port_event.portev_source); 305 errno = EPROTO; 306 goto out; 307 } 308 argument.sival_ptr = port_event.portev_user; 309 switch (port_event.portev_events) { 310 case AIOLIO: 311 #if !defined(_LP64) 312 case AIOLIO64: 313 #endif 314 sigevp = (struct sigevent *)port_event.portev_object; 315 function = sigevp->sigev_notify_function; 316 attrp = sigevp->sigev_notify_attributes; 317 break; 318 case AIOAREAD: 319 case AIOAWRITE: 320 case AIOFSYNC: 321 { 322 aiocb_t *aiocbp = 323 (aiocb_t *)port_event.portev_object; 324 function = aiocbp->aio_sigevent.sigev_notify_function; 325 attrp = aiocbp->aio_sigevent.sigev_notify_attributes; 326 break; 327 } 328 #if !defined(_LP64) 329 case AIOAREAD64: 330 case AIOAWRITE64: 331 case AIOFSYNC64: 332 { 333 aiocb64_t *aiocbp = 334 (aiocb64_t *)port_event.portev_object; 335 function = aiocbp->aio_sigevent.sigev_notify_function; 336 attrp = aiocbp->aio_sigevent.sigev_notify_attributes; 337 break; 338 } 339 #endif 340 default: 341 function = NULL; 342 attrp = NULL; 343 break; 344 } 345 346 if (function == NULL) 347 error = EINVAL; 348 else if (pthread_attr_equal(attrp, tcdp->tcd_attrp)) 349 error = sigev_add_work(tcdp, function, argument); 350 else { 351 /* 352 * The attributes don't match. 353 * Spawn a thread with the non-matching attributes. 354 */ 355 pthread_attr_t local_attr; 356 sigev_thread_data_t *stdp; 357 358 if ((stdp = lmalloc(sizeof (*stdp))) == NULL) 359 error = ENOMEM; 360 else 361 error = pthread_attr_clone(&local_attr, attrp); 362 363 if (error == 0) { 364 (void) pthread_attr_setdetachstate( 365 &local_attr, PTHREAD_CREATE_DETACHED); 366 (void) pthread_attr_setdaemonstate_np( 367 &local_attr, PTHREAD_CREATE_DAEMON_NP); 368 stdp->std_func = function; 369 stdp->std_arg = argument; 370 error = pthread_create(NULL, &local_attr, 371 notify_thread, stdp); 372 (void) pthread_attr_destroy(&local_attr); 373 } 374 if (error && stdp != NULL) 375 lfree(stdp, sizeof (*stdp)); 376 } 377 378 if (error) { 379 dprintf("Cannot add work, error=%d <%s>.\n", 380 error, strerror(error)); 381 if (error == EAGAIN || error == ENOMEM) { 382 /* (Temporary) no resources are available. */ 383 if (_port_dispatch(tcdp->tcd_port, 0, 384 PORT_SOURCE_AIO, port_event.portev_events, 385 port_event.portev_object, 386 port_event.portev_user) != 0) 387 break; 388 error = 0; 389 delta.tv_sec = 0; 390 delta.tv_nsec = NANOSEC / 20; /* 50 msec */ 391 (void) nanosleep(&delta, NULL); 392 } 393 } 394 } 395 out: 396 pthread_cleanup_pop(1); 397 return (NULL); 398 } 399 400 /* 401 * Allocate a thread_communication_data_t block. 402 */ 403 static thread_communication_data_t * 404 alloc_sigev_handler(subsystem_t caller) 405 { 406 thread_communication_data_t *tcdp; 407 408 if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) { 409 tcdp->tcd_subsystem = caller; 410 tcdp->tcd_port = -1; 411 (void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL); 412 (void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL); 413 } 414 return (tcdp); 415 } 416 417 /* 418 * Free a thread_communication_data_t block. 419 */ 420 void 421 free_sigev_handler(thread_communication_data_t *tcdp) 422 { 423 if (tcdp->tcd_attrp) { 424 (void) pthread_attr_destroy(tcdp->tcd_attrp); 425 tcdp->tcd_attrp = NULL; 426 } 427 (void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif)); 428 429 switch (tcdp->tcd_subsystem) { 430 case TIMER: 431 case AIO: 432 if (tcdp->tcd_port >= 0) 433 (void) close(tcdp->tcd_port); 434 break; 435 case MQ: 436 tcdp->tcd_msg_avail = NULL; 437 tcdp->tcd_msg_object = NULL; 438 tcdp->tcd_msg_userval = NULL; 439 tcdp->tcd_msg_enabled = 0; 440 break; 441 } 442 443 lfree(tcdp, sizeof (*tcdp)); 444 } 445 446 /* 447 * Initialize data structure and create the port. 448 */ 449 thread_communication_data_t * 450 setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller) 451 { 452 thread_communication_data_t *tcdp; 453 int error; 454 455 if (sigevp == NULL) { 456 errno = EINVAL; 457 return (NULL); 458 } 459 460 if ((tcdp = alloc_sigev_handler(caller)) == NULL) { 461 errno = ENOMEM; 462 return (NULL); 463 } 464 465 if (sigevp->sigev_notify_attributes == NULL) 466 tcdp->tcd_attrp = NULL; /* default attributes */ 467 else { 468 /* 469 * We cannot just copy the sigevp->sigev_notify_attributes 470 * pointer. We need to initialize a new pthread_attr_t 471 * structure with the values from the user-supplied 472 * pthread_attr_t. 473 */ 474 tcdp->tcd_attrp = &tcdp->tcd_user_attr; 475 error = pthread_attr_clone(tcdp->tcd_attrp, 476 sigevp->sigev_notify_attributes); 477 if (error) { 478 tcdp->tcd_attrp = NULL; 479 free_sigev_handler(tcdp); 480 errno = error; 481 return (NULL); 482 } 483 } 484 tcdp->tcd_notif = *sigevp; 485 tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp; 486 487 if (caller == TIMER || caller == AIO) { 488 if ((tcdp->tcd_port = port_create()) < 0 || 489 fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) { 490 free_sigev_handler(tcdp); 491 errno = EBADF; 492 return (NULL); 493 } 494 } 495 return (tcdp); 496 } 497 498 /* 499 * Create a thread pool and launch the spawner. 500 */ 501 int 502 launch_spawner(thread_communication_data_t *tcdp) 503 { 504 int ret; 505 int maxworkers; 506 void *(*spawner)(void *); 507 sigset_t set; 508 sigset_t oset; 509 510 switch (tcdp->tcd_subsystem) { 511 case TIMER: 512 spawner = timer_spawner; 513 maxworkers = 1; 514 break; 515 case MQ: 516 spawner = mqueue_spawner; 517 maxworkers = 1; 518 break; 519 case AIO: 520 spawner = aio_spawner; 521 maxworkers = 100; 522 break; 523 default: 524 return (-1); 525 } 526 tcdp->tcd_poolp = tpool_create(1, maxworkers, 20, 527 tcdp->tcd_notif.sigev_notify_attributes); 528 if (tcdp->tcd_poolp == NULL) 529 return (-1); 530 /* create the spawner with all signals blocked */ 531 (void) sigfillset(&set); 532 (void) thr_sigsetmask(SIG_SETMASK, &set, &oset); 533 ret = thr_create(NULL, 0, spawner, tcdp, 534 THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id); 535 (void) thr_sigsetmask(SIG_SETMASK, &oset, NULL); 536 if (ret != 0) { 537 tpool_destroy(tcdp->tcd_poolp); 538 tcdp->tcd_poolp = NULL; 539 return (-1); 540 } 541 return (0); 542 } 543 544 /* 545 * Delete the data associated with the sigev_thread timer, if timer is 546 * associated with such a notification option. 547 * Destroy the timer_spawner thread. 548 */ 549 int 550 del_sigev_timer(timer_t timer) 551 { 552 int rc = 0; 553 thread_communication_data_t *tcdp; 554 555 if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) { 556 sig_mutex_lock(&tcdp->tcd_lock); 557 if (tcdp->tcd_port >= 0) { 558 if ((rc = port_alert(tcdp->tcd_port, 559 PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) { 560 dprintf("del_sigev_timer(%d) OK.\n", timer); 561 } 562 } 563 timer_tcd[timer] = NULL; 564 sig_mutex_unlock(&tcdp->tcd_lock); 565 } 566 return (rc); 567 } 568 569 int 570 sigev_timer_getoverrun(timer_t timer) 571 { 572 thread_communication_data_t *tcdp; 573 574 if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) 575 return (tcdp->tcd_overruns); 576 return (0); 577 } 578 579 static void 580 del_sigev_mq_cleanup(thread_communication_data_t *tcdp) 581 { 582 sig_mutex_unlock(&tcdp->tcd_lock); 583 free_sigev_handler(tcdp); 584 } 585 586 /* 587 * Delete the data associated with the sigev_thread message queue, 588 * if the message queue is associated with such a notification option. 589 * Destroy the mqueue_spawner thread. 590 */ 591 void 592 del_sigev_mq(thread_communication_data_t *tcdp) 593 { 594 pthread_t server_id; 595 int rc; 596 597 sig_mutex_lock(&tcdp->tcd_lock); 598 599 server_id = tcdp->tcd_server_id; 600 tcdp->tcd_msg_closing = 1; 601 if ((rc = pthread_cancel(server_id)) != 0) { /* "can't happen" */ 602 sig_mutex_unlock(&tcdp->tcd_lock); 603 dprintf("Fail to cancel %u with error %d <%s>.\n", 604 server_id, rc, strerror(rc)); 605 return; 606 } 607 608 /* 609 * wait for sigev_destroy_pool() to finish 610 */ 611 pthread_cleanup_push(del_sigev_mq_cleanup, tcdp); 612 while (tcdp->tcd_server_id == server_id) 613 (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock); 614 pthread_cleanup_pop(1); 615 } 616 617 /* 618 * POSIX aio: 619 * If the notification type is SIGEV_THREAD, set up 620 * the port number for notifications. Create the 621 * thread pool and launch the spawner if necessary. 622 * If the notification type is not SIGEV_THREAD, do nothing. 623 */ 624 int 625 _aio_sigev_thread_init(struct sigevent *sigevp) 626 { 627 static mutex_t sigev_aio_lock = DEFAULTMUTEX; 628 static cond_t sigev_aio_cv = DEFAULTCV; 629 static int sigev_aio_busy = 0; 630 631 thread_communication_data_t *tcdp; 632 int port; 633 int cancel_state; 634 int rc = 0; 635 636 if (sigevp == NULL || 637 sigevp->sigev_notify != SIGEV_THREAD || 638 sigevp->sigev_notify_function == NULL) 639 return (0); 640 641 lmutex_lock(&sigev_aio_lock); 642 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 643 while (sigev_aio_busy) 644 (void) cond_wait(&sigev_aio_cv, &sigev_aio_lock); 645 (void) pthread_setcancelstate(cancel_state, NULL); 646 if ((tcdp = sigev_aio_tcd) != NULL) 647 port = tcdp->tcd_port; 648 else { 649 sigev_aio_busy = 1; 650 lmutex_unlock(&sigev_aio_lock); 651 652 tcdp = setup_sigev_handler(sigevp, AIO); 653 if (tcdp == NULL) { 654 port = -1; 655 rc = -1; 656 } else if (launch_spawner(tcdp) != 0) { 657 free_sigev_handler(tcdp); 658 tcdp = NULL; 659 port = -1; 660 rc = -1; 661 } else { 662 port = tcdp->tcd_port; 663 } 664 665 lmutex_lock(&sigev_aio_lock); 666 sigev_aio_tcd = tcdp; 667 sigev_aio_busy = 0; 668 (void) cond_broadcast(&sigev_aio_cv); 669 } 670 lmutex_unlock(&sigev_aio_lock); 671 sigevp->sigev_signo = port; 672 return (rc); 673 } 674 675 int 676 _aio_sigev_thread(aiocb_t *aiocbp) 677 { 678 if (aiocbp == NULL) 679 return (0); 680 return (_aio_sigev_thread_init(&aiocbp->aio_sigevent)); 681 } 682 683 #if !defined(_LP64) 684 int 685 _aio_sigev_thread64(aiocb64_t *aiocbp) 686 { 687 if (aiocbp == NULL) 688 return (0); 689 return (_aio_sigev_thread_init(&aiocbp->aio_sigevent)); 690 } 691 #endif 692 693 /* 694 * Cleanup POSIX aio after fork1() in the child process. 695 */ 696 void 697 postfork1_child_sigev_aio(void) 698 { 699 thread_communication_data_t *tcdp; 700 701 if ((tcdp = sigev_aio_tcd) != NULL) { 702 sigev_aio_tcd = NULL; 703 tcd_teardown(tcdp); 704 } 705 } 706 707 /* 708 * Utility function for the various postfork1_child_sigev_*() functions. 709 * Clean up the tcdp data structure and close the port. 710 */ 711 void 712 tcd_teardown(thread_communication_data_t *tcdp) 713 { 714 if (tcdp->tcd_poolp != NULL) 715 tpool_abandon(tcdp->tcd_poolp); 716 tcdp->tcd_poolp = NULL; 717 tcdp->tcd_server_id = 0; 718 free_sigev_handler(tcdp); 719 } 720