/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <assert.h>
#include "thread_pool_impl.h"

/* Protects the global list of all thread pools. */
static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
/* Circular doubly-linked list of all pools; NULL when no pools exist. */
static tpool_t *thread_pools = NULL;

/*
 * Tear down a pool: unlink it from the global pool list, free any
 * still-queued jobs, and free the pool itself.  The pool must be
 * quiescent (no worker threads, no active jobs) when this is called.
 */
static void
delete_pool(tpool_t *tpool)
{
        tpool_job_t *job;

        ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

        /*
         * Unlink the pool from the global list of all pools.
         */
        (void) pthread_mutex_lock(&thread_pool_lock);
        if (thread_pools == tpool)
                thread_pools = tpool->tp_forw;
        if (thread_pools == tpool)
                /* tpool was the only pool in the circular list */
                thread_pools = NULL;
        else {
                tpool->tp_back->tp_forw = tpool->tp_forw;
                tpool->tp_forw->tp_back = tpool->tp_back;
        }
        pthread_mutex_unlock(&thread_pool_lock);

        /*
         * There should be no pending jobs, but just in case...
         */
        for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
                tpool->tp_head = job->tpj_next;
                free(job);
        }
        (void) pthread_attr_destroy(&tpool->tp_attr);
        free(tpool);
}

/*
 * Worker thread is terminating.  Runs as a cancellation-cleanup handler
 * with tp_mutex held and drops the lock before returning.  If this is
 * the last worker of an abandoned pool, it deletes the pool; if the
 * pool is being destroyed, it wakes the thread blocked in
 * tpool_destroy() on tp_busycv.
 */
static void
worker_cleanup(void *arg)
{
        tpool_t *tpool = (tpool_t *)arg;

        if (--tpool->tp_current == 0 &&
            (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
                if (tpool->tp_flags & TP_ABANDON) {
                        pthread_mutex_unlock(&tpool->tp_mutex);
                        delete_pool(tpool);
                        return;
                }
                if (tpool->tp_flags & TP_DESTROY)
                        (void) pthread_cond_broadcast(&tpool->tp_busycv);
        }
        pthread_mutex_unlock(&tpool->tp_mutex);
}

/*
 * Wake threads blocked in tpool_wait()/tpool_destroy() once the pool
 * has no queued jobs and no active jobs.  Caller must hold tp_mutex.
 */
static void
notify_waiters(tpool_t *tpool)
{
        if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
                tpool->tp_flags &= ~TP_WAIT;
                (void) pthread_cond_broadcast(&tpool->tp_waitcv);
        }
}

/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 * Runs as a cleanup handler: re-acquires tp_mutex (the worker dropped
 * it before calling the job function), removes this thread from the
 * pool's active list, and deliberately returns with the mutex still
 * held -- the worker loop in tpool_worker() expects to own it again.
 */
static void
job_cleanup(void *arg)
{
        tpool_t *tpool = (tpool_t *)arg;

        pthread_t my_tid = pthread_self();
        tpool_active_t *activep;
        tpool_active_t **activepp;

        pthread_mutex_lock(&tpool->tp_mutex);
        /* CSTYLED */
        for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
                activep = *activepp;
                if (activep->tpa_tid == my_tid) {
                        /* unlink this thread's entry; it must be present */
                        *activepp = activep->tpa_next;
                        break;
                }
        }
        if (tpool->tp_flags & TP_WAIT)
                notify_waiters(tpool);
}

/*
 * Main function of every worker thread.  Runs dispatched jobs until the
 * pool is destroyed or abandoned, or until this (excess) worker has
 * idled for longer than tp_linger seconds.  tp_mutex is held at all
 * times inside the loop except while the job function itself runs.
 */
static void *
tpool_worker(void *arg)
{
        tpool_t *tpool = (tpool_t *)arg;
        int elapsed;
        tpool_job_t *job;
        void (*func)(void *);
        tpool_active_t active;

        pthread_mutex_lock(&tpool->tp_mutex);
        pthread_cleanup_push(worker_cleanup, tpool);

        /*
         * This is the worker's main loop.
         * It will only be left if a timeout or an error has occurred.
         */
        active.tpa_tid = pthread_self();
        for (;;) {
                elapsed = 0;
                tpool->tp_idle++;
                if (tpool->tp_flags & TP_WAIT)
                        notify_waiters(tpool);
                while ((tpool->tp_head == NULL ||
                    (tpool->tp_flags & TP_SUSPEND)) &&
                    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
                        if (tpool->tp_current <= tpool->tp_minimum ||
                            tpool->tp_linger == 0) {
                                /* mandatory worker: wait indefinitely */
                                (void) pthread_cond_wait(&tpool->tp_workcv,
                                    &tpool->tp_mutex);
                        } else {
                                /* excess worker: linger, then time out */
                                struct timespec ts;

                                clock_gettime(CLOCK_REALTIME, &ts);
                                ts.tv_sec += tpool->tp_linger;

                                if (pthread_cond_timedwait(&tpool->tp_workcv,
                                    &tpool->tp_mutex, &ts) != 0) {
                                        elapsed = 1;
                                        break;
                                }
                        }
                }
                tpool->tp_idle--;
                if (tpool->tp_flags & TP_DESTROY)
                        break;
                if (tpool->tp_flags & TP_ABANDON) {
                        /* can't abandon a suspended pool */
                        if (tpool->tp_flags & TP_SUSPEND) {
                                tpool->tp_flags &= ~TP_SUSPEND;
                                (void) pthread_cond_broadcast(
                                    &tpool->tp_workcv);
                        }
                        if (tpool->tp_head == NULL)
                                break;
                }
                if ((job = tpool->tp_head) != NULL &&
                    !(tpool->tp_flags & TP_SUSPEND)) {
                        elapsed = 0;
                        func = job->tpj_func;
                        arg = job->tpj_arg;
                        tpool->tp_head = job->tpj_next;
                        if (job == tpool->tp_tail)
                                tpool->tp_tail = NULL;
                        tpool->tp_njobs--;
                        active.tpa_next = tpool->tp_active;
                        tpool->tp_active = &active;
                        pthread_mutex_unlock(&tpool->tp_mutex);
                        /* job_cleanup() re-locks tp_mutex when popped */
                        pthread_cleanup_push(job_cleanup, tpool);
                        free(job);

                        sigset_t maskset;
                        (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

                        /*
                         * Call the specified function.
                         */
                        func(arg);
                        /*
                         * We don't know what this thread has been doing,
                         * so we reset its signal mask and cancellation
                         * state back to the values prior to calling func().
                         */
                        (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
                        (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
                            NULL);
                        (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
                            NULL);
                        pthread_cleanup_pop(1);
                }
                if (elapsed && tpool->tp_current > tpool->tp_minimum) {
                        /*
                         * We timed out and there is no work to be done
                         * and the number of workers exceeds the minimum.
                         * Exit now to reduce the size of the pool.
                         */
                        break;
                }
        }
        pthread_cleanup_pop(1);
        return (arg);
}

/*
 * Create a worker thread using the pool's thread attributes.  The new
 * thread inherits the caller's current signal mask, which is saved and
 * restored around pthread_create().  Returns 0 or a pthread error.
 * Caller must hold tp_mutex; the caller increments tp_current on
 * success.
 */
static int
create_worker(tpool_t *tpool)
{
        pthread_t thread;
        sigset_t oset;
        int error;

        (void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
        error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
        (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
        return (error);
}


/*
 * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr
 * is NULL initialize the cloned attr using default values.
245 */ 246 static int 247 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) 248 { 249 int error; 250 251 error = pthread_attr_init(attr); 252 if (error || (old_attr == NULL)) 253 return (error); 254 255 #ifdef __GLIBC__ 256 cpu_set_t cpuset; 257 size_t cpusetsize = sizeof (cpuset); 258 error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); 259 if (error == 0) 260 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); 261 if (error) 262 goto error; 263 #endif /* __GLIBC__ */ 264 265 int detachstate; 266 error = pthread_attr_getdetachstate(old_attr, &detachstate); 267 if (error == 0) 268 error = pthread_attr_setdetachstate(attr, detachstate); 269 if (error) 270 goto error; 271 272 size_t guardsize; 273 error = pthread_attr_getguardsize(old_attr, &guardsize); 274 if (error == 0) 275 error = pthread_attr_setguardsize(attr, guardsize); 276 if (error) 277 goto error; 278 279 int inheritsched; 280 error = pthread_attr_getinheritsched(old_attr, &inheritsched); 281 if (error == 0) 282 error = pthread_attr_setinheritsched(attr, inheritsched); 283 if (error) 284 goto error; 285 286 struct sched_param param; 287 error = pthread_attr_getschedparam(old_attr, ¶m); 288 if (error == 0) 289 error = pthread_attr_setschedparam(attr, ¶m); 290 if (error) 291 goto error; 292 293 int policy; 294 error = pthread_attr_getschedpolicy(old_attr, &policy); 295 if (error == 0) 296 error = pthread_attr_setschedpolicy(attr, policy); 297 if (error) 298 goto error; 299 300 int scope; 301 error = pthread_attr_getscope(old_attr, &scope); 302 if (error == 0) 303 error = pthread_attr_setscope(attr, scope); 304 if (error) 305 goto error; 306 307 void *stackaddr; 308 size_t stacksize; 309 error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); 310 if (error == 0) 311 error = pthread_attr_setstack(attr, stackaddr, stacksize); 312 if (error) 313 goto error; 314 315 return (0); 316 error: 317 pthread_attr_destroy(attr); 318 return (error); 319 } 320 321 
tpool_t * 322 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 323 pthread_attr_t *attr) 324 { 325 tpool_t *tpool; 326 void *stackaddr; 327 size_t stacksize; 328 size_t minstack; 329 int error; 330 331 if (min_threads > max_threads || max_threads < 1) { 332 errno = EINVAL; 333 return (NULL); 334 } 335 if (attr != NULL) { 336 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 337 errno = EINVAL; 338 return (NULL); 339 } 340 /* 341 * Allow only one thread in the pool with a specified stack. 342 * Require threads to have at least the minimum stack size. 343 */ 344 minstack = PTHREAD_STACK_MIN; 345 if (stackaddr != NULL) { 346 if (stacksize < minstack || max_threads != 1) { 347 errno = EINVAL; 348 return (NULL); 349 } 350 } else if (stacksize != 0 && stacksize < minstack) { 351 errno = EINVAL; 352 return (NULL); 353 } 354 } 355 356 tpool = calloc(1, sizeof (*tpool)); 357 if (tpool == NULL) { 358 errno = ENOMEM; 359 return (NULL); 360 } 361 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 362 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 363 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 364 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 365 tpool->tp_minimum = min_threads; 366 tpool->tp_maximum = max_threads; 367 tpool->tp_linger = linger; 368 369 /* 370 * We cannot just copy the attribute pointer. 371 * We need to initialize a new pthread_attr_t structure 372 * with the values from the user-supplied pthread_attr_t. 373 * If the attribute pointer is NULL, we need to initialize 374 * the new pthread_attr_t structure with default values. 
375 */ 376 error = pthread_attr_clone(&tpool->tp_attr, attr); 377 if (error) { 378 free(tpool); 379 errno = error; 380 return (NULL); 381 } 382 383 /* make all pool threads be detached daemon threads */ 384 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 385 PTHREAD_CREATE_DETACHED); 386 387 /* insert into the global list of all thread pools */ 388 pthread_mutex_lock(&thread_pool_lock); 389 if (thread_pools == NULL) { 390 tpool->tp_forw = tpool; 391 tpool->tp_back = tpool; 392 thread_pools = tpool; 393 } else { 394 thread_pools->tp_back->tp_forw = tpool; 395 tpool->tp_forw = thread_pools; 396 tpool->tp_back = thread_pools->tp_back; 397 thread_pools->tp_back = tpool; 398 } 399 pthread_mutex_unlock(&thread_pool_lock); 400 401 return (tpool); 402 } 403 404 /* 405 * Dispatch a work request to the thread pool. 406 * If there are idle workers, awaken one. 407 * Else, if the maximum number of workers has 408 * not been reached, spawn a new worker thread. 409 * Else just return with the job added to the queue. 
410 */ 411 int 412 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 413 { 414 tpool_job_t *job; 415 416 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 417 418 if ((job = calloc(1, sizeof (*job))) == NULL) 419 return (-1); 420 job->tpj_next = NULL; 421 job->tpj_func = func; 422 job->tpj_arg = arg; 423 424 pthread_mutex_lock(&tpool->tp_mutex); 425 426 if (tpool->tp_head == NULL) 427 tpool->tp_head = job; 428 else 429 tpool->tp_tail->tpj_next = job; 430 tpool->tp_tail = job; 431 tpool->tp_njobs++; 432 433 if (!(tpool->tp_flags & TP_SUSPEND)) { 434 if (tpool->tp_idle > 0) 435 (void) pthread_cond_signal(&tpool->tp_workcv); 436 else if (tpool->tp_current < tpool->tp_maximum && 437 create_worker(tpool) == 0) 438 tpool->tp_current++; 439 } 440 441 pthread_mutex_unlock(&tpool->tp_mutex); 442 return (0); 443 } 444 445 static void 446 tpool_cleanup(void *arg) 447 { 448 tpool_t *tpool = (tpool_t *)arg; 449 450 pthread_mutex_unlock(&tpool->tp_mutex); 451 } 452 453 /* 454 * Assumes: by the time tpool_destroy() is called no one will use this 455 * thread pool in any way and no one will try to dispatch entries to it. 456 * Calling tpool_destroy() from a job in the pool will cause deadlock. 
 */
void
tpool_destroy(tpool_t *tpool)
{
        tpool_active_t *activep;

        ASSERT(!tpool_member(tpool));
        ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

        pthread_mutex_lock(&tpool->tp_mutex);
        /* ensure tp_mutex is released if we are cancelled while waiting */
        pthread_cleanup_push(tpool_cleanup, tpool);

        /* mark the pool as being destroyed; wakeup idle workers */
        tpool->tp_flags |= TP_DESTROY;
        tpool->tp_flags &= ~TP_SUSPEND;
        (void) pthread_cond_broadcast(&tpool->tp_workcv);

        /* cancel all active workers */
        for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
                (void) pthread_cancel(activep->tpa_tid);

        /* wait for all active workers to finish */
        while (tpool->tp_active != NULL) {
                tpool->tp_flags |= TP_WAIT;
                (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
        }

        /* the last worker to terminate will wake us up */
        while (tpool->tp_current != 0)
                (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

        pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
        delete_pool(tpool);
}

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
        ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

        pthread_mutex_lock(&tpool->tp_mutex);
        if (tpool->tp_current == 0) {
                /* no workers, just delete the pool */
                pthread_mutex_unlock(&tpool->tp_mutex);
                delete_pool(tpool);
        } else {
                /* wake up all workers, last one will delete the pool */
                tpool->tp_flags |= TP_ABANDON;
                tpool->tp_flags &= ~TP_SUSPEND;
                (void) pthread_cond_broadcast(&tpool->tp_workcv);
                pthread_mutex_unlock(&tpool->tp_mutex);
        }
}

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
518 */ 519 void 520 tpool_wait(tpool_t *tpool) 521 { 522 ASSERT(!tpool_member(tpool)); 523 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 524 525 pthread_mutex_lock(&tpool->tp_mutex); 526 pthread_cleanup_push(tpool_cleanup, tpool); 527 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 528 tpool->tp_flags |= TP_WAIT; 529 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 530 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 531 } 532 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 533 } 534 535 void 536 tpool_suspend(tpool_t *tpool) 537 { 538 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 539 540 pthread_mutex_lock(&tpool->tp_mutex); 541 tpool->tp_flags |= TP_SUSPEND; 542 pthread_mutex_unlock(&tpool->tp_mutex); 543 } 544 545 int 546 tpool_suspended(tpool_t *tpool) 547 { 548 int suspended; 549 550 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 551 552 pthread_mutex_lock(&tpool->tp_mutex); 553 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 554 pthread_mutex_unlock(&tpool->tp_mutex); 555 556 return (suspended); 557 } 558 559 void 560 tpool_resume(tpool_t *tpool) 561 { 562 int excess; 563 564 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 565 566 pthread_mutex_lock(&tpool->tp_mutex); 567 if (!(tpool->tp_flags & TP_SUSPEND)) { 568 pthread_mutex_unlock(&tpool->tp_mutex); 569 return; 570 } 571 tpool->tp_flags &= ~TP_SUSPEND; 572 (void) pthread_cond_broadcast(&tpool->tp_workcv); 573 excess = tpool->tp_njobs - tpool->tp_idle; 574 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 575 if (create_worker(tpool) != 0) 576 break; /* pthread_create() failed */ 577 tpool->tp_current++; 578 } 579 pthread_mutex_unlock(&tpool->tp_mutex); 580 } 581 582 int 583 tpool_member(tpool_t *tpool) 584 { 585 pthread_t my_tid = pthread_self(); 586 tpool_active_t *activep; 587 588 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 589 590 pthread_mutex_lock(&tpool->tp_mutex); 591 for (activep 
= tpool->tp_active; activep; activep = activep->tpa_next) { 592 if (activep->tpa_tid == my_tid) { 593 pthread_mutex_unlock(&tpool->tp_mutex); 594 return (1); 595 } 596 } 597 pthread_mutex_unlock(&tpool->tp_mutex); 598 return (0); 599 } 600