1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "lint.h" 30 #include "thr_uberdata.h" 31 #include <stdlib.h> 32 #include <signal.h> 33 #include <errno.h> 34 #include "thread_pool_impl.h" 35 36 static mutex_t thread_pool_lock = DEFAULTMUTEX; 37 static tpool_t *thread_pools = NULL; 38 39 static void 40 delete_pool(tpool_t *tpool) 41 { 42 tpool_job_t *job; 43 44 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 45 46 /* 47 * Unlink the pool from the global list of all pools. 48 */ 49 lmutex_lock(&thread_pool_lock); 50 if (thread_pools == tpool) 51 thread_pools = tpool->tp_forw; 52 if (thread_pools == tpool) 53 thread_pools = NULL; 54 else { 55 tpool->tp_back->tp_forw = tpool->tp_forw; 56 tpool->tp_forw->tp_back = tpool->tp_back; 57 } 58 lmutex_unlock(&thread_pool_lock); 59 60 /* 61 * There should be no pending jobs, but just in case... 62 */ 63 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 64 tpool->tp_head = job->tpj_next; 65 lfree(job, sizeof (*job)); 66 } 67 (void) pthread_attr_destroy(&tpool->tp_attr); 68 lfree(tpool, sizeof (*tpool)); 69 } 70 71 /* 72 * Worker thread is terminating. 73 */ 74 static void 75 worker_cleanup(tpool_t *tpool) 76 { 77 ASSERT(MUTEX_HELD(&tpool->tp_mutex)); 78 79 if (--tpool->tp_current == 0 && 80 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 81 if (tpool->tp_flags & TP_ABANDON) { 82 sig_mutex_unlock(&tpool->tp_mutex); 83 delete_pool(tpool); 84 return; 85 } 86 if (tpool->tp_flags & TP_DESTROY) 87 (void) cond_broadcast(&tpool->tp_busycv); 88 } 89 sig_mutex_unlock(&tpool->tp_mutex); 90 } 91 92 static void 93 notify_waiters(tpool_t *tpool) 94 { 95 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 96 tpool->tp_flags &= ~TP_WAIT; 97 (void) cond_broadcast(&tpool->tp_waitcv); 98 } 99 } 100 101 /* 102 * Called by a worker thread on return from a tpool_dispatch()d job. 103 */ 104 static void 105 job_cleanup(tpool_t *tpool) 106 { 107 pthread_t my_tid = pthread_self(); 108 tpool_active_t *activep; 109 tpool_active_t **activepp; 110 111 sig_mutex_lock(&tpool->tp_mutex); 112 /* CSTYLED */ 113 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { 114 activep = *activepp; 115 if (activep->tpa_tid == my_tid) { 116 *activepp = activep->tpa_next; 117 break; 118 } 119 } 120 if (tpool->tp_flags & TP_WAIT) 121 notify_waiters(tpool); 122 } 123 124 static void * 125 tpool_worker(void *arg) 126 { 127 tpool_t *tpool = (tpool_t *)arg; 128 int elapsed; 129 tpool_job_t *job; 130 void (*func)(void *); 131 tpool_active_t active; 132 133 sig_mutex_lock(&tpool->tp_mutex); 134 pthread_cleanup_push(worker_cleanup, tpool); 135 136 /* 137 * This is the worker's main loop. 138 * It will only be left if a timeout or an error has occured. 139 */ 140 active.tpa_tid = pthread_self(); 141 for (;;) { 142 elapsed = 0; 143 tpool->tp_idle++; 144 if (tpool->tp_flags & TP_WAIT) 145 notify_waiters(tpool); 146 while ((tpool->tp_head == NULL || 147 (tpool->tp_flags & TP_SUSPEND)) && 148 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 149 if (tpool->tp_current <= tpool->tp_minimum || 150 tpool->tp_linger == 0) { 151 (void) sig_cond_wait(&tpool->tp_workcv, 152 &tpool->tp_mutex); 153 } else { 154 timestruc_t timeout; 155 156 timeout.tv_sec = tpool->tp_linger; 157 timeout.tv_nsec = 0; 158 if (sig_cond_reltimedwait(&tpool->tp_workcv, 159 &tpool->tp_mutex, &timeout) != 0) { 160 elapsed = 1; 161 break; 162 } 163 } 164 } 165 tpool->tp_idle--; 166 if (tpool->tp_flags & TP_DESTROY) 167 break; 168 if (tpool->tp_flags & TP_ABANDON) { 169 /* can't abandon a suspended pool */ 170 if (tpool->tp_flags & TP_SUSPEND) { 171 tpool->tp_flags &= ~TP_SUSPEND; 172 (void) cond_broadcast(&tpool->tp_workcv); 173 } 174 if (tpool->tp_head == NULL) 175 break; 176 } 177 if ((job = tpool->tp_head) != NULL && 178 !(tpool->tp_flags & TP_SUSPEND)) { 179 elapsed = 0; 180 func = job->tpj_func; 181 arg = job->tpj_arg; 182 tpool->tp_head = job->tpj_next; 183 if (job == tpool->tp_tail) 184 tpool->tp_tail = NULL; 185 tpool->tp_njobs--; 186 active.tpa_next = tpool->tp_active; 187 tpool->tp_active = &active; 188 sig_mutex_unlock(&tpool->tp_mutex); 189 pthread_cleanup_push(job_cleanup, tpool); 190 lfree(job, sizeof (*job)); 191 /* 192 * Call the specified function. 193 */ 194 func(arg); 195 /* 196 * We don't know what this thread has been doing, 197 * so we reset its signal mask and cancellation 198 * state back to the initial values. 199 */ 200 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 201 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 202 NULL); 203 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 204 NULL); 205 pthread_cleanup_pop(1); 206 } 207 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 208 /* 209 * We timed out and there is no work to be done 210 * and the number of workers exceeds the minimum. 211 * Exit now to reduce the size of the pool. 212 */ 213 break; 214 } 215 } 216 pthread_cleanup_pop(1); 217 return (arg); 218 } 219 220 /* 221 * Create a worker thread, with all signals blocked. 222 */ 223 static int 224 create_worker(tpool_t *tpool) 225 { 226 sigset_t oset; 227 int error; 228 229 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 230 error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool); 231 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 232 return (error); 233 } 234 235 tpool_t * 236 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 237 pthread_attr_t *attr) 238 { 239 tpool_t *tpool; 240 void *stackaddr; 241 size_t stacksize; 242 size_t minstack; 243 int error; 244 245 if (min_threads > max_threads || max_threads < 1) { 246 errno = EINVAL; 247 return (NULL); 248 } 249 if (attr != NULL) { 250 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 251 errno = EINVAL; 252 return (NULL); 253 } 254 /* 255 * Allow only one thread in the pool with a specified stack. 256 * Require threads to have at least the minimum stack size. 257 */ 258 minstack = thr_min_stack(); 259 if (stackaddr != NULL) { 260 if (stacksize < minstack || max_threads != 1) { 261 errno = EINVAL; 262 return (NULL); 263 } 264 } else if (stacksize != 0 && stacksize < minstack) { 265 errno = EINVAL; 266 return (NULL); 267 } 268 } 269 270 tpool = lmalloc(sizeof (*tpool)); 271 if (tpool == NULL) { 272 errno = ENOMEM; 273 return (NULL); 274 } 275 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL); 276 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL); 277 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL); 278 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL); 279 tpool->tp_minimum = min_threads; 280 tpool->tp_maximum = max_threads; 281 tpool->tp_linger = linger; 282 283 /* 284 * We cannot just copy the attribute pointer. 285 * We need to initialize a new pthread_attr_t structure 286 * with the values from the user-supplied pthread_attr_t. 287 * If the attribute pointer is NULL, we need to initialize 288 * the new pthread_attr_t structure with default values. 289 */ 290 error = pthread_attr_clone(&tpool->tp_attr, attr); 291 if (error) { 292 lfree(tpool, sizeof (*tpool)); 293 errno = error; 294 return (NULL); 295 } 296 297 /* make all pool threads be detached daemon threads */ 298 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 299 PTHREAD_CREATE_DETACHED); 300 (void) pthread_attr_setdaemonstate_np(&tpool->tp_attr, 301 PTHREAD_CREATE_DAEMON_NP); 302 303 /* insert into the global list of all thread pools */ 304 lmutex_lock(&thread_pool_lock); 305 if (thread_pools == NULL) { 306 tpool->tp_forw = tpool; 307 tpool->tp_back = tpool; 308 thread_pools = tpool; 309 } else { 310 thread_pools->tp_back->tp_forw = tpool; 311 tpool->tp_forw = thread_pools; 312 tpool->tp_back = thread_pools->tp_back; 313 thread_pools->tp_back = tpool; 314 } 315 lmutex_unlock(&thread_pool_lock); 316 317 return (tpool); 318 } 319 320 /* 321 * Dispatch a work request to the thread pool. 322 * If there are idle workers, awaken one. 323 * Else, if the maximum number of workers has 324 * not been reached, spawn a new worker thread. 325 * Else just return with the job added to the queue. 326 */ 327 int 328 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 329 { 330 tpool_job_t *job; 331 332 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 333 334 if ((job = lmalloc(sizeof (*job))) == NULL) 335 return (-1); 336 job->tpj_next = NULL; 337 job->tpj_func = func; 338 job->tpj_arg = arg; 339 340 sig_mutex_lock(&tpool->tp_mutex); 341 342 if (tpool->tp_head == NULL) 343 tpool->tp_head = job; 344 else 345 tpool->tp_tail->tpj_next = job; 346 tpool->tp_tail = job; 347 tpool->tp_njobs++; 348 349 if (!(tpool->tp_flags & TP_SUSPEND)) { 350 if (tpool->tp_idle > 0) 351 (void) cond_signal(&tpool->tp_workcv); 352 else if (tpool->tp_current < tpool->tp_maximum && 353 create_worker(tpool) == 0) 354 tpool->tp_current++; 355 } 356 357 sig_mutex_unlock(&tpool->tp_mutex); 358 return (0); 359 } 360 361 /* 362 * Assumes: by the time tpool_destroy() is called no one will use this 363 * thread pool in any way and no one will try to dispatch entries to it. 364 * Calling tpool_destroy() from a job in the pool will cause deadlock. 365 */ 366 void 367 tpool_destroy(tpool_t *tpool) 368 { 369 tpool_active_t *activep; 370 371 ASSERT(!tpool_member(tpool)); 372 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 373 374 sig_mutex_lock(&tpool->tp_mutex); 375 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex); 376 377 /* mark the pool as being destroyed; wakeup idle workers */ 378 tpool->tp_flags |= TP_DESTROY; 379 tpool->tp_flags &= ~TP_SUSPEND; 380 (void) cond_broadcast(&tpool->tp_workcv); 381 382 /* cancel all active workers */ 383 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 384 (void) pthread_cancel(activep->tpa_tid); 385 386 /* wait for all active workers to finish */ 387 while (tpool->tp_active != NULL) { 388 tpool->tp_flags |= TP_WAIT; 389 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 390 } 391 392 /* the last worker to terminate will wake us up */ 393 while (tpool->tp_current != 0) 394 (void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 395 396 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */ 397 delete_pool(tpool); 398 } 399 400 /* 401 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 402 * The last worker to terminate will delete the pool. 403 */ 404 void 405 tpool_abandon(tpool_t *tpool) 406 { 407 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 408 409 sig_mutex_lock(&tpool->tp_mutex); 410 if (tpool->tp_current == 0) { 411 /* no workers, just delete the pool */ 412 sig_mutex_unlock(&tpool->tp_mutex); 413 delete_pool(tpool); 414 } else { 415 /* wake up all workers, last one will delete the pool */ 416 tpool->tp_flags |= TP_ABANDON; 417 tpool->tp_flags &= ~TP_SUSPEND; 418 (void) cond_broadcast(&tpool->tp_workcv); 419 sig_mutex_unlock(&tpool->tp_mutex); 420 } 421 } 422 423 /* 424 * Wait for all jobs to complete. 425 * Calling tpool_wait() from a job in the pool will cause deadlock. 426 */ 427 void 428 tpool_wait(tpool_t *tpool) 429 { 430 ASSERT(!tpool_member(tpool)); 431 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 432 433 sig_mutex_lock(&tpool->tp_mutex); 434 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex); 435 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 436 tpool->tp_flags |= TP_WAIT; 437 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 438 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 439 } 440 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */ 441 } 442 443 void 444 tpool_suspend(tpool_t *tpool) 445 { 446 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 447 448 sig_mutex_lock(&tpool->tp_mutex); 449 tpool->tp_flags |= TP_SUSPEND; 450 sig_mutex_unlock(&tpool->tp_mutex); 451 } 452 453 int 454 tpool_suspended(tpool_t *tpool) 455 { 456 int suspended; 457 458 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 459 460 sig_mutex_lock(&tpool->tp_mutex); 461 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 462 sig_mutex_unlock(&tpool->tp_mutex); 463 464 return (suspended); 465 } 466 467 void 468 tpool_resume(tpool_t *tpool) 469 { 470 int excess; 471 472 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 473 474 sig_mutex_lock(&tpool->tp_mutex); 475 if (!(tpool->tp_flags & TP_SUSPEND)) { 476 sig_mutex_unlock(&tpool->tp_mutex); 477 return; 478 } 479 tpool->tp_flags &= ~TP_SUSPEND; 480 (void) cond_broadcast(&tpool->tp_workcv); 481 excess = tpool->tp_njobs - tpool->tp_idle; 482 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 483 if (create_worker(tpool) != 0) 484 break; /* pthread_create() failed */ 485 tpool->tp_current++; 486 } 487 sig_mutex_unlock(&tpool->tp_mutex); 488 } 489 490 int 491 tpool_member(tpool_t *tpool) 492 { 493 pthread_t my_tid = pthread_self(); 494 tpool_active_t *activep; 495 496 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 497 498 sig_mutex_lock(&tpool->tp_mutex); 499 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 500 if (activep->tpa_tid == my_tid) { 501 sig_mutex_unlock(&tpool->tp_mutex); 502 return (1); 503 } 504 } 505 sig_mutex_unlock(&tpool->tp_mutex); 506 return (0); 507 } 508 509 void 510 postfork1_child_tpool(void) 511 { 512 pthread_t my_tid = pthread_self(); 513 tpool_t *tpool; 514 tpool_job_t *job; 515 516 /* 517 * All of the thread pool workers are gone, except possibly 518 * for the current thread, if it is a thread pool worker thread. 519 * Retain the thread pools, but make them all empty. Whatever 520 * jobs were queued or running belong to the parent process. 521 */ 522 top: 523 if ((tpool = thread_pools) == NULL) 524 return; 525 526 do { 527 tpool_active_t *activep; 528 529 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL); 530 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL); 531 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL); 532 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL); 533 for (job = tpool->tp_head; job; job = tpool->tp_head) { 534 tpool->tp_head = job->tpj_next; 535 lfree(job, sizeof (*job)); 536 } 537 tpool->tp_tail = NULL; 538 tpool->tp_njobs = 0; 539 for (activep = tpool->tp_active; activep; 540 activep = activep->tpa_next) { 541 if (activep->tpa_tid == my_tid) { 542 activep->tpa_next = NULL; 543 break; 544 } 545 } 546 tpool->tp_idle = 0; 547 tpool->tp_current = 0; 548 if ((tpool->tp_active = activep) != NULL) 549 tpool->tp_current = 1; 550 tpool->tp_flags &= ~TP_WAIT; 551 if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) { 552 tpool->tp_flags &= ~TP_DESTROY; 553 tpool->tp_flags |= TP_ABANDON; 554 if (tpool->tp_current == 0) { 555 delete_pool(tpool); 556 goto top; /* start over */ 557 } 558 } 559 } while ((tpool = tpool->tp_forw) != thread_pools); 560 } 561