1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include <stdlib.h> 28 #include <signal.h> 29 #include <errno.h> 30 #include <assert.h> 31 #include "thread_pool_impl.h" 32 33 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; 34 static tpool_t *thread_pools = NULL; 35 36 static void 37 delete_pool(tpool_t *tpool) 38 { 39 tpool_job_t *job; 40 41 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 42 43 /* 44 * Unlink the pool from the global list of all pools. 45 */ 46 (void) pthread_mutex_lock(&thread_pool_lock); 47 if (thread_pools == tpool) 48 thread_pools = tpool->tp_forw; 49 if (thread_pools == tpool) 50 thread_pools = NULL; 51 else { 52 tpool->tp_back->tp_forw = tpool->tp_forw; 53 tpool->tp_forw->tp_back = tpool->tp_back; 54 } 55 pthread_mutex_unlock(&thread_pool_lock); 56 57 /* 58 * There should be no pending jobs, but just in case... 59 */ 60 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 61 tpool->tp_head = job->tpj_next; 62 free(job); 63 } 64 (void) pthread_attr_destroy(&tpool->tp_attr); 65 free(tpool); 66 } 67 68 /* 69 * Worker thread is terminating. 70 */ 71 static void 72 worker_cleanup(void *arg) 73 { 74 tpool_t *tpool = (tpool_t *)arg; 75 76 if (--tpool->tp_current == 0 && 77 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 78 if (tpool->tp_flags & TP_ABANDON) { 79 pthread_mutex_unlock(&tpool->tp_mutex); 80 delete_pool(tpool); 81 return; 82 } 83 if (tpool->tp_flags & TP_DESTROY) 84 (void) pthread_cond_broadcast(&tpool->tp_busycv); 85 } 86 pthread_mutex_unlock(&tpool->tp_mutex); 87 } 88 89 static void 90 notify_waiters(tpool_t *tpool) 91 { 92 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 93 tpool->tp_flags &= ~TP_WAIT; 94 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 95 } 96 } 97 98 /* 99 * Called by a worker thread on return from a tpool_dispatch()d job. 100 */ 101 static void 102 job_cleanup(void *arg) 103 { 104 tpool_t *tpool = (tpool_t *)arg; 105 106 pthread_t my_tid = pthread_self(); 107 tpool_active_t *activep; 108 tpool_active_t **activepp; 109 110 pthread_mutex_lock(&tpool->tp_mutex); 111 for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { 112 activep = *activepp; 113 if (activep->tpa_tid == my_tid) { 114 *activepp = activep->tpa_next; 115 break; 116 } 117 } 118 if (tpool->tp_flags & TP_WAIT) 119 notify_waiters(tpool); 120 } 121 122 static void * 123 tpool_worker(void *arg) 124 { 125 tpool_t *tpool = (tpool_t *)arg; 126 int elapsed; 127 tpool_job_t *job; 128 void (*func)(void *); 129 tpool_active_t active; 130 131 pthread_mutex_lock(&tpool->tp_mutex); 132 pthread_cleanup_push(worker_cleanup, tpool); 133 134 /* 135 * This is the worker's main loop. 136 * It will only be left if a timeout or an error has occurred. 137 */ 138 active.tpa_tid = pthread_self(); 139 for (;;) { 140 elapsed = 0; 141 tpool->tp_idle++; 142 if (tpool->tp_flags & TP_WAIT) 143 notify_waiters(tpool); 144 while ((tpool->tp_head == NULL || 145 (tpool->tp_flags & TP_SUSPEND)) && 146 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 147 if (tpool->tp_current <= tpool->tp_minimum || 148 tpool->tp_linger == 0) { 149 (void) pthread_cond_wait(&tpool->tp_workcv, 150 &tpool->tp_mutex); 151 } else { 152 struct timespec ts; 153 154 clock_gettime(CLOCK_REALTIME, &ts); 155 ts.tv_sec += tpool->tp_linger; 156 157 if (pthread_cond_timedwait(&tpool->tp_workcv, 158 &tpool->tp_mutex, &ts) != 0) { 159 elapsed = 1; 160 break; 161 } 162 } 163 } 164 tpool->tp_idle--; 165 if (tpool->tp_flags & TP_DESTROY) 166 break; 167 if (tpool->tp_flags & TP_ABANDON) { 168 /* can't abandon a suspended pool */ 169 if (tpool->tp_flags & TP_SUSPEND) { 170 tpool->tp_flags &= ~TP_SUSPEND; 171 (void) pthread_cond_broadcast( 172 &tpool->tp_workcv); 173 } 174 if (tpool->tp_head == NULL) 175 break; 176 } 177 if ((job = tpool->tp_head) != NULL && 178 !(tpool->tp_flags & TP_SUSPEND)) { 179 elapsed = 0; 180 func = job->tpj_func; 181 arg = job->tpj_arg; 182 tpool->tp_head = job->tpj_next; 183 if (job == tpool->tp_tail) 184 tpool->tp_tail = NULL; 185 tpool->tp_njobs--; 186 active.tpa_next = tpool->tp_active; 187 tpool->tp_active = &active; 188 pthread_mutex_unlock(&tpool->tp_mutex); 189 pthread_cleanup_push(job_cleanup, tpool); 190 free(job); 191 192 sigset_t maskset; 193 (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); 194 195 /* 196 * Call the specified function. 197 */ 198 func(arg); 199 /* 200 * We don't know what this thread has been doing, 201 * so we reset its signal mask and cancellation 202 * state back to the values prior to calling func(). 203 */ 204 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 205 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 206 NULL); 207 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 208 NULL); 209 pthread_cleanup_pop(1); 210 } 211 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 212 /* 213 * We timed out and there is no work to be done 214 * and the number of workers exceeds the minimum. 215 * Exit now to reduce the size of the pool. 216 */ 217 break; 218 } 219 } 220 pthread_cleanup_pop(1); 221 return (arg); 222 } 223 224 /* 225 * Create a worker thread, with default signals blocked. 226 */ 227 static int 228 create_worker(tpool_t *tpool) 229 { 230 pthread_t thread; 231 sigset_t oset; 232 int error; 233 234 (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); 235 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 236 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 237 return (error); 238 } 239 240 241 /* 242 * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr 243 * is NULL initialize the cloned attr using default values. 244 */ 245 static int 246 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) 247 { 248 int error; 249 250 error = pthread_attr_init(attr); 251 if (error || (old_attr == NULL)) 252 return (error); 253 254 #ifdef __GLIBC__ 255 cpu_set_t cpuset; 256 size_t cpusetsize = sizeof (cpuset); 257 error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); 258 if (error == 0) 259 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); 260 if (error) 261 goto error; 262 #endif /* __GLIBC__ */ 263 264 int detachstate; 265 error = pthread_attr_getdetachstate(old_attr, &detachstate); 266 if (error == 0) 267 error = pthread_attr_setdetachstate(attr, detachstate); 268 if (error) 269 goto error; 270 271 size_t guardsize; 272 error = pthread_attr_getguardsize(old_attr, &guardsize); 273 if (error == 0) 274 error = pthread_attr_setguardsize(attr, guardsize); 275 if (error) 276 goto error; 277 278 int inheritsched; 279 error = pthread_attr_getinheritsched(old_attr, &inheritsched); 280 if (error == 0) 281 error = pthread_attr_setinheritsched(attr, inheritsched); 282 if (error) 283 goto error; 284 285 struct sched_param param; 286 error = pthread_attr_getschedparam(old_attr, ¶m); 287 if (error == 0) 288 error = pthread_attr_setschedparam(attr, ¶m); 289 if (error) 290 goto error; 291 292 int policy; 293 error = pthread_attr_getschedpolicy(old_attr, &policy); 294 if (error == 0) 295 error = pthread_attr_setschedpolicy(attr, policy); 296 if (error) 297 goto error; 298 299 int scope; 300 error = pthread_attr_getscope(old_attr, &scope); 301 if (error == 0) 302 error = pthread_attr_setscope(attr, scope); 303 if (error) 304 goto error; 305 306 void *stackaddr; 307 size_t stacksize; 308 error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); 309 if (error == 0) 310 error = pthread_attr_setstack(attr, stackaddr, stacksize); 311 if (error) 312 goto error; 313 314 return (0); 315 error: 316 pthread_attr_destroy(attr); 317 return (error); 318 } 319 320 tpool_t * 321 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 322 pthread_attr_t *attr) 323 { 324 tpool_t *tpool; 325 void *stackaddr; 326 size_t stacksize; 327 size_t minstack; 328 int error; 329 330 if (min_threads > max_threads || max_threads < 1) { 331 errno = EINVAL; 332 return (NULL); 333 } 334 if (attr != NULL) { 335 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 336 errno = EINVAL; 337 return (NULL); 338 } 339 /* 340 * Allow only one thread in the pool with a specified stack. 341 * Require threads to have at least the minimum stack size. 342 */ 343 minstack = PTHREAD_STACK_MIN; 344 if (stackaddr != NULL) { 345 if (stacksize < minstack || max_threads != 1) { 346 errno = EINVAL; 347 return (NULL); 348 } 349 } else if (stacksize != 0 && stacksize < minstack) { 350 errno = EINVAL; 351 return (NULL); 352 } 353 } 354 355 tpool = calloc(1, sizeof (*tpool)); 356 if (tpool == NULL) { 357 errno = ENOMEM; 358 return (NULL); 359 } 360 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 361 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 362 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 363 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 364 tpool->tp_minimum = min_threads; 365 tpool->tp_maximum = max_threads; 366 tpool->tp_linger = linger; 367 368 /* 369 * We cannot just copy the attribute pointer. 370 * We need to initialize a new pthread_attr_t structure 371 * with the values from the user-supplied pthread_attr_t. 372 * If the attribute pointer is NULL, we need to initialize 373 * the new pthread_attr_t structure with default values. 374 */ 375 error = pthread_attr_clone(&tpool->tp_attr, attr); 376 if (error) { 377 free(tpool); 378 errno = error; 379 return (NULL); 380 } 381 382 /* make all pool threads be detached daemon threads */ 383 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 384 PTHREAD_CREATE_DETACHED); 385 386 /* insert into the global list of all thread pools */ 387 pthread_mutex_lock(&thread_pool_lock); 388 if (thread_pools == NULL) { 389 tpool->tp_forw = tpool; 390 tpool->tp_back = tpool; 391 thread_pools = tpool; 392 } else { 393 thread_pools->tp_back->tp_forw = tpool; 394 tpool->tp_forw = thread_pools; 395 tpool->tp_back = thread_pools->tp_back; 396 thread_pools->tp_back = tpool; 397 } 398 pthread_mutex_unlock(&thread_pool_lock); 399 400 return (tpool); 401 } 402 403 /* 404 * Dispatch a work request to the thread pool. 405 * If there are idle workers, awaken one. 406 * Else, if the maximum number of workers has 407 * not been reached, spawn a new worker thread. 408 * Else just return with the job added to the queue. 409 */ 410 int 411 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 412 { 413 tpool_job_t *job; 414 415 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 416 417 if ((job = calloc(1, sizeof (*job))) == NULL) 418 return (-1); 419 job->tpj_next = NULL; 420 job->tpj_func = func; 421 job->tpj_arg = arg; 422 423 pthread_mutex_lock(&tpool->tp_mutex); 424 425 if (tpool->tp_head == NULL) 426 tpool->tp_head = job; 427 else 428 tpool->tp_tail->tpj_next = job; 429 tpool->tp_tail = job; 430 tpool->tp_njobs++; 431 432 if (!(tpool->tp_flags & TP_SUSPEND)) { 433 if (tpool->tp_idle > 0) 434 (void) pthread_cond_signal(&tpool->tp_workcv); 435 else if (tpool->tp_current < tpool->tp_maximum && 436 create_worker(tpool) == 0) 437 tpool->tp_current++; 438 } 439 440 pthread_mutex_unlock(&tpool->tp_mutex); 441 return (0); 442 } 443 444 static void 445 tpool_cleanup(void *arg) 446 { 447 tpool_t *tpool = (tpool_t *)arg; 448 449 pthread_mutex_unlock(&tpool->tp_mutex); 450 } 451 452 /* 453 * Assumes: by the time tpool_destroy() is called no one will use this 454 * thread pool in any way and no one will try to dispatch entries to it. 455 * Calling tpool_destroy() from a job in the pool will cause deadlock. 456 */ 457 void 458 tpool_destroy(tpool_t *tpool) 459 { 460 tpool_active_t *activep; 461 462 ASSERT(!tpool_member(tpool)); 463 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 464 465 pthread_mutex_lock(&tpool->tp_mutex); 466 pthread_cleanup_push(tpool_cleanup, tpool); 467 468 /* mark the pool as being destroyed; wakeup idle workers */ 469 tpool->tp_flags |= TP_DESTROY; 470 tpool->tp_flags &= ~TP_SUSPEND; 471 (void) pthread_cond_broadcast(&tpool->tp_workcv); 472 473 /* cancel all active workers */ 474 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 475 (void) pthread_cancel(activep->tpa_tid); 476 477 /* wait for all active workers to finish */ 478 while (tpool->tp_active != NULL) { 479 tpool->tp_flags |= TP_WAIT; 480 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 481 } 482 483 /* the last worker to terminate will wake us up */ 484 while (tpool->tp_current != 0) 485 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 486 487 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 488 delete_pool(tpool); 489 } 490 491 /* 492 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 493 * The last worker to terminate will delete the pool. 494 */ 495 void 496 tpool_abandon(tpool_t *tpool) 497 { 498 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 499 500 pthread_mutex_lock(&tpool->tp_mutex); 501 if (tpool->tp_current == 0) { 502 /* no workers, just delete the pool */ 503 pthread_mutex_unlock(&tpool->tp_mutex); 504 delete_pool(tpool); 505 } else { 506 /* wake up all workers, last one will delete the pool */ 507 tpool->tp_flags |= TP_ABANDON; 508 tpool->tp_flags &= ~TP_SUSPEND; 509 (void) pthread_cond_broadcast(&tpool->tp_workcv); 510 pthread_mutex_unlock(&tpool->tp_mutex); 511 } 512 } 513 514 /* 515 * Wait for all jobs to complete. 516 * Calling tpool_wait() from a job in the pool will cause deadlock. 517 */ 518 void 519 tpool_wait(tpool_t *tpool) 520 { 521 ASSERT(!tpool_member(tpool)); 522 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 523 524 pthread_mutex_lock(&tpool->tp_mutex); 525 pthread_cleanup_push(tpool_cleanup, tpool); 526 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 527 tpool->tp_flags |= TP_WAIT; 528 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 529 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 530 } 531 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 532 } 533 534 void 535 tpool_suspend(tpool_t *tpool) 536 { 537 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 538 539 pthread_mutex_lock(&tpool->tp_mutex); 540 tpool->tp_flags |= TP_SUSPEND; 541 pthread_mutex_unlock(&tpool->tp_mutex); 542 } 543 544 int 545 tpool_suspended(tpool_t *tpool) 546 { 547 int suspended; 548 549 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 550 551 pthread_mutex_lock(&tpool->tp_mutex); 552 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 553 pthread_mutex_unlock(&tpool->tp_mutex); 554 555 return (suspended); 556 } 557 558 void 559 tpool_resume(tpool_t *tpool) 560 { 561 int excess; 562 563 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 564 565 pthread_mutex_lock(&tpool->tp_mutex); 566 if (!(tpool->tp_flags & TP_SUSPEND)) { 567 pthread_mutex_unlock(&tpool->tp_mutex); 568 return; 569 } 570 tpool->tp_flags &= ~TP_SUSPEND; 571 (void) pthread_cond_broadcast(&tpool->tp_workcv); 572 excess = tpool->tp_njobs - tpool->tp_idle; 573 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 574 if (create_worker(tpool) != 0) 575 break; /* pthread_create() failed */ 576 tpool->tp_current++; 577 } 578 pthread_mutex_unlock(&tpool->tp_mutex); 579 } 580 581 int 582 tpool_member(tpool_t *tpool) 583 { 584 pthread_t my_tid = pthread_self(); 585 tpool_active_t *activep; 586 587 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 588 589 pthread_mutex_lock(&tpool->tp_mutex); 590 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 591 if (activep->tpa_tid == my_tid) { 592 pthread_mutex_unlock(&tpool->tp_mutex); 593 return (1); 594 } 595 } 596 pthread_mutex_unlock(&tpool->tp_mutex); 597 return (0); 598 } 599