/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include "lint.h"
#include "thr_uberdata.h"
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <thread.h>
#include <pthread.h>
#include <synch.h>
#include <port.h>
#include <signal.h>
#include <stdio.h>
#include <errno.h>
#include <stdarg.h>
#include <string.h>
#include <sys/aiocb.h>
#include <time.h>
#include <signal.h>
#include <fcntl.h>
#include "sigev_thread.h"

/*
 * There is but one spawner for all aio operations.
 */
thread_communication_data_t *sigev_aio_tcd = NULL;

/*
 * Set non-zero via _RT_DEBUG to enable debugging printf's.
 */
static int _rt_debug = 0;

void
init_sigev_thread(void)
{
	char *ldebug;

	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
		_rt_debug = atoi(ldebug);
}

/*
 * Routine to print debug messages:
 * If _rt_debug is set, printf the debug message to stderr
 * with an appropriate prefix.
 */
/*PRINTFLIKE1*/
static void
dprintf(const char *format, ...)
{
	if (_rt_debug) {
		va_list alist;

		va_start(alist, format);
		flockfile(stderr);
		pthread_cleanup_push(funlockfile, stderr);
		(void) fputs("DEBUG: ", stderr);
		(void) vfprintf(stderr, format, alist);
		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
		va_end(alist);
	}
}

/*
 * The notify_thread() function can be used as the start function of a new
 * thread but it is normally called from notifier(), below, in the context
 * of a thread pool worker thread.  It is used as the start function of a
 * new thread only when individual pthread attributes differ from those
 * that are common to all workers.  This only occurs in the AIO case.
 */
static void *
notify_thread(void *arg)
{
	sigev_thread_data_t *stdp = arg;
	void (*function)(union sigval) = stdp->std_func;
	union sigval argument = stdp->std_arg;

	lfree(stdp, sizeof (*stdp));
	function(argument);
	return (NULL);
}

/*
 * Thread pool interface to call the user-supplied notification function.
 */
static void
notifier(void *arg)
{
	(void) notify_thread(arg);
}
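/*
 * For reference, an illustrative (purely hypothetical) user notification
 * function of the kind that notify_thread() and notifier() end up
 * invoking.  Per POSIX it receives the union sigval supplied in the
 * sigevent and returns nothing; my_notify and my_state are assumed names:
 *
 *	static void
 *	my_notify(union sigval sv)
 *	{
 *		struct my_state *sp = sv.sival_ptr;
 *
 *		... handle the notification ...
 *	}
 */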
/*
 * This routine adds a new work request, described by function
 * and argument, to the list of outstanding jobs.
 * It returns 0 on success; a non-zero value indicates an error.
 */
static int
sigev_add_work(thread_communication_data_t *tcdp,
    void (*function)(union sigval), union sigval argument)
{
	tpool_t *tpool = tcdp->tcd_poolp;
	sigev_thread_data_t *stdp;

	if (tpool == NULL)
		return (EINVAL);
	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
		return (errno);
	stdp->std_func = function;
	stdp->std_arg = argument;
	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
		lfree(stdp, sizeof (*stdp));
		return (errno);
	}
	return (0);
}

static void
sigev_destroy_pool(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;

	if (tcdp->tcd_subsystem == MQ) {
		/*
		 * synchronize with del_sigev_mq()
		 */
		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_server_id = 0;
		if (tcdp->tcd_msg_closing) {
			(void) cond_broadcast(&tcdp->tcd_cv);
			sig_mutex_unlock(&tcdp->tcd_lock);
			return;		/* del_sigev_mq() will free the tcd */
		}
		sig_mutex_unlock(&tcdp->tcd_lock);
	}

	/*
	 * now delete everything
	 */
	free_sigev_handler(tcdp);
}

/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  In case
 * the event requires creation of a worker thread with different pthread
 * attributes than those from the pool of workers, a new daemon thread
 * with these attributes is spawned apart from the pool of workers.
 * If the spawner fails to add work or fails to create an additional
 * thread for lack of resources, it puts the event back into the
 * kernel queue and retries a short time later.
 */
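/*
 * For illustration only (hypothetical application code, not part of this
 * file): a timer whose expirations are serviced by timer_spawner() below
 * would typically be created with a SIGEV_THREAD sigevent such as the
 * following; my_notify and my_state are assumed names.
 *
 *	struct sigevent sev;
 *	timer_t tid;
 *
 *	(void) memset(&sev, 0, sizeof (sev));
 *	sev.sigev_notify = SIGEV_THREAD;
 *	sev.sigev_notify_function = my_notify;
 *	sev.sigev_value.sival_ptr = my_state;
 *	sev.sigev_notify_attributes = NULL;	(default attributes)
 *	(void) timer_create(CLOCK_REALTIME, &sev, &tid);
 */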
void *
timer_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	port_event_t port_event;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	for (;;) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, errno, strerror(errno));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_TIMER:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_TIMER)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}

		tcdp->tcd_overruns = port_event.portev_events - 1;
		if (sigev_add_work(tcdp,
		    tcdp->tcd_notif.sigev_notify_function,
		    tcdp->tcd_notif.sigev_value) != 0)
			break;
		/* wait until job is done before looking for another */
		tpool_wait(tcdp->tcd_poolp);
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}

void *
mqueue_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int ret = 0;
	int ntype;
	void (*function)(union sigval);
	union sigval argument;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (ret == 0) {
		sig_mutex_lock(&tcdp->tcd_lock);
		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
		while ((ntype = tcdp->tcd_msg_enabled) == 0)
			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
		pthread_cleanup_pop(1);

		while (sem_wait(tcdp->tcd_msg_avail) == -1)
			continue;

		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_msg_enabled = 0;
		sig_mutex_unlock(&tcdp->tcd_lock);

		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
		if (ntype == SIGEV_THREAD) {
			function = tcdp->tcd_notif.sigev_notify_function;
			argument.sival_ptr = tcdp->tcd_msg_userval;
			ret = sigev_add_work(tcdp, function, argument);
		} else {	/* ntype == SIGEV_PORT */
			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
			    0, (uintptr_t)tcdp->tcd_msg_object,
			    tcdp->tcd_msg_userval);
		}
	}
	sig_mutex_unlock(&tcdp->tcd_lock);

	pthread_cleanup_pop(1);
	return (NULL);
}
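/*
 * For illustration only (hypothetical application code): a message queue
 * notification serviced by mqueue_spawner() above is registered by the
 * application with mq_notify() and a SIGEV_THREAD sigevent; my_notify,
 * my_state and mqd are assumed names.
 *
 *	struct sigevent sev;
 *
 *	(void) memset(&sev, 0, sizeof (sev));
 *	sev.sigev_notify = SIGEV_THREAD;
 *	sev.sigev_notify_function = my_notify;
 *	sev.sigev_value.sival_ptr = my_state;
 *	sev.sigev_notify_attributes = NULL;
 *	(void) mq_notify(mqd, &sev);
 */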
void *
aio_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int error = 0;
	void (*function)(union sigval);
	union sigval argument;
	port_event_t port_event;
	struct sigevent *sigevp;
	timespec_t delta;
	pthread_attr_t *attrp;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (error == 0) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			error = errno;
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, error, strerror(error));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_AIO:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_AIO)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}
		argument.sival_ptr = port_event.portev_user;
		switch (port_event.portev_events) {
		case AIOLIO:
#if !defined(_LP64)
		case AIOLIO64:
#endif
			sigevp = (struct sigevent *)port_event.portev_object;
			function = sigevp->sigev_notify_function;
			attrp = sigevp->sigev_notify_attributes;
			break;
		case AIOAREAD:
		case AIOAWRITE:
		case AIOFSYNC:
			{
			aiocb_t *aiocbp =
			    (aiocb_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
		case AIOFSYNC64:
			{
			aiocb64_t *aiocbp =
			    (aiocb64_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#endif
		default:
			function = NULL;
			attrp = NULL;
			break;
		}

		if (function == NULL)
			error = EINVAL;
		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
			error = sigev_add_work(tcdp, function, argument);
		else {
			/*
			 * The attributes don't match.
			 * Spawn a thread with the non-matching attributes.
			 */
			pthread_attr_t local_attr;
			sigev_thread_data_t *stdp;

			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
				error = ENOMEM;
			else
				error = pthread_attr_clone(&local_attr, attrp);

			if (error == 0) {
				(void) pthread_attr_setdetachstate(
				    &local_attr, PTHREAD_CREATE_DETACHED);
				(void) pthread_attr_setdaemonstate_np(
				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
				stdp->std_func = function;
				stdp->std_arg = argument;
				error = pthread_create(NULL, &local_attr,
				    notify_thread, stdp);
				(void) pthread_attr_destroy(&local_attr);
			}
			if (error && stdp != NULL)
				lfree(stdp, sizeof (*stdp));
		}

		if (error) {
			dprintf("Cannot add work, error=%d <%s>.\n",
			    error, strerror(error));
			if (error == EAGAIN || error == ENOMEM) {
				/* (Temporary) no resources are available. */
				if (_port_dispatch(tcdp->tcd_port, 0,
				    PORT_SOURCE_AIO, port_event.portev_events,
				    port_event.portev_object,
				    port_event.portev_user) != 0)
					break;
				error = 0;
				delta.tv_sec = 0;
				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
				(void) nanosleep(&delta, NULL);
			}
		}
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}
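/*
 * For illustration only (hypothetical application code): an asynchronous
 * read whose completion is delivered through aio_spawner() above carries
 * its SIGEV_THREAD request in the aiocb itself; my_notify, buf and fd are
 * assumed names.
 *
 *	static aiocb_t cb;
 *
 *	(void) memset(&cb, 0, sizeof (cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof (buf);
 *	cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
 *	cb.aio_sigevent.sigev_notify_function = my_notify;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &cb;
 *	cb.aio_sigevent.sigev_notify_attributes = NULL;
 *	(void) aio_read(&cb);
 */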
/*
 * Allocate a thread_communication_data_t block.
 */
static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
		tcdp->tcd_subsystem = caller;
		tcdp->tcd_port = -1;
		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
	}
	return (tcdp);
}

/*
 * Free a thread_communication_data_t block.
 */
void
free_sigev_handler(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_attrp) {
		(void) pthread_attr_destroy(tcdp->tcd_attrp);
		tcdp->tcd_attrp = NULL;
	}
	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));

	switch (tcdp->tcd_subsystem) {
	case TIMER:
	case AIO:
		if (tcdp->tcd_port >= 0)
			(void) close(tcdp->tcd_port);
		break;
	case MQ:
		tcdp->tcd_msg_avail = NULL;
		tcdp->tcd_msg_object = NULL;
		tcdp->tcd_msg_userval = NULL;
		tcdp->tcd_msg_enabled = 0;
		break;
	}

	lfree(tcdp, sizeof (*tcdp));
}

/*
 * Initialize data structure and create the port.
 */
thread_communication_data_t *
setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
{
	thread_communication_data_t *tcdp;
	int error;

	if (sigevp == NULL) {
		errno = EINVAL;
		return (NULL);
	}

	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
		errno = ENOMEM;
		return (NULL);
	}

	if (sigevp->sigev_notify_attributes == NULL)
		tcdp->tcd_attrp = NULL;		/* default attributes */
	else {
		/*
		 * We cannot just copy the sigevp->sigev_notify_attributes
		 * pointer.  We need to initialize a new pthread_attr_t
		 * structure with the values from the user-supplied
		 * pthread_attr_t.
		 */
		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
		error = pthread_attr_clone(tcdp->tcd_attrp,
		    sigevp->sigev_notify_attributes);
		if (error) {
			tcdp->tcd_attrp = NULL;
			free_sigev_handler(tcdp);
			errno = error;
			return (NULL);
		}
	}
	tcdp->tcd_notif = *sigevp;
	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;

	if (caller == TIMER || caller == AIO) {
		/* mark the port close-on-exec so it does not leak */
		if ((tcdp->tcd_port = port_create()) < 0 ||
		    fcntl(tcdp->tcd_port, F_SETFD, FD_CLOEXEC) == -1) {
			free_sigev_handler(tcdp);
			errno = EBADF;
			return (NULL);
		}
	}
	return (tcdp);
}

/*
 * Create a thread pool and launch the spawner.
 */
int
launch_spawner(thread_communication_data_t *tcdp)
{
	int ret;
	int maxworkers;
	void *(*spawner)(void *);
	sigset_t set;
	sigset_t oset;

	switch (tcdp->tcd_subsystem) {
	case TIMER:
		spawner = timer_spawner;
		maxworkers = 1;
		break;
	case MQ:
		spawner = mqueue_spawner;
		maxworkers = 1;
		break;
	case AIO:
		spawner = aio_spawner;
		maxworkers = 100;
		break;
	default:
		return (-1);
	}
	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
	    tcdp->tcd_notif.sigev_notify_attributes);
	if (tcdp->tcd_poolp == NULL)
		return (-1);
	/* create the spawner with all signals blocked */
	(void) sigfillset(&set);
	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
	ret = thr_create(NULL, 0, spawner, tcdp,
	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
	if (ret != 0) {
		tpool_destroy(tcdp->tcd_poolp);
		tcdp->tcd_poolp = NULL;
		return (-1);
	}
	return (0);
}
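/*
 * Sketch of the typical call sequence for the two routines above, as
 * used later in this file by _aio_sigev_thread_init():
 *
 *	tcdp = setup_sigev_handler(sigevp, AIO);
 *	if (tcdp != NULL && launch_spawner(tcdp) != 0) {
 *		free_sigev_handler(tcdp);
 *		tcdp = NULL;
 *	}
 */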
/*
 * Delete the data associated with the sigev_thread timer, if the timer
 * is associated with such a notification option.
 * Destroy the timer_spawner thread.
 */
int
del_sigev_timer(timer_t timer)
{
	int rc = 0;
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
		sig_mutex_lock(&tcdp->tcd_lock);
		if (tcdp->tcd_port >= 0) {
			if ((rc = port_alert(tcdp->tcd_port,
			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
				dprintf("del_sigev_timer(%d) OK.\n", timer);
			}
		}
		timer_tcd[timer] = NULL;
		sig_mutex_unlock(&tcdp->tcd_lock);
	}
	return (rc);
}

int
sigev_timer_getoverrun(timer_t timer)
{
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
		return (tcdp->tcd_overruns);
	return (0);
}

static void
del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
{
	sig_mutex_unlock(&tcdp->tcd_lock);
	free_sigev_handler(tcdp);
}

/*
 * Delete the data associated with the sigev_thread message queue,
 * if the message queue is associated with such a notification option.
 * Destroy the mqueue_spawner thread.
 */
void
del_sigev_mq(thread_communication_data_t *tcdp)
{
	pthread_t server_id;
	int rc;

	sig_mutex_lock(&tcdp->tcd_lock);

	server_id = tcdp->tcd_server_id;
	tcdp->tcd_msg_closing = 1;
	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
		sig_mutex_unlock(&tcdp->tcd_lock);
		dprintf("Failed to cancel %u with error %d <%s>.\n",
		    server_id, rc, strerror(rc));
		return;
	}

	/*
	 * wait for sigev_destroy_pool() to finish
	 */
	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
	while (tcdp->tcd_server_id == server_id)
		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
	pthread_cleanup_pop(1);
}
/*
 * POSIX aio:
 * If the notification type is SIGEV_THREAD, set up
 * the port number for notifications.  Create the
 * thread pool and launch the spawner if necessary.
 * If the notification type is not SIGEV_THREAD, do nothing.
 */
int
_aio_sigev_thread_init(struct sigevent *sigevp)
{
	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
	static cond_t sigev_aio_cv = DEFAULTCV;
	static int sigev_aio_busy = 0;

	thread_communication_data_t *tcdp;
	int port;
	int cancel_state;
	int rc = 0;

	if (sigevp == NULL ||
	    sigevp->sigev_notify != SIGEV_THREAD ||
	    sigevp->sigev_notify_function == NULL)
		return (0);

	lmutex_lock(&sigev_aio_lock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (sigev_aio_busy)
		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if ((tcdp = sigev_aio_tcd) != NULL)
		port = tcdp->tcd_port;
	else {
		sigev_aio_busy = 1;
		lmutex_unlock(&sigev_aio_lock);

		tcdp = setup_sigev_handler(sigevp, AIO);
		if (tcdp == NULL) {
			port = -1;
			rc = -1;
		} else if (launch_spawner(tcdp) != 0) {
			free_sigev_handler(tcdp);
			tcdp = NULL;
			port = -1;
			rc = -1;
		} else {
			port = tcdp->tcd_port;
		}

		lmutex_lock(&sigev_aio_lock);
		sigev_aio_tcd = tcdp;
		sigev_aio_busy = 0;
		(void) cond_broadcast(&sigev_aio_cv);
	}
	lmutex_unlock(&sigev_aio_lock);
	sigevp->sigev_signo = port;
	return (rc);
}

int
_aio_sigev_thread(aiocb_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}

#if !defined(_LP64)
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#endif

/*
 * Cleanup POSIX aio after fork1() in the child process.
 */
void
postfork1_child_sigev_aio(void)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = sigev_aio_tcd) != NULL) {
		sigev_aio_tcd = NULL;
		tcd_teardown(tcdp);
	}
}

/*
 * Utility function for the various postfork1_child_sigev_*() functions.
 * Clean up the tcdp data structure and close the port.
 */
void
tcd_teardown(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;
	tcdp->tcd_server_id = 0;
	free_sigev_handler(tcdp);
}