1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or https://opensource.org/licenses/CDDL-1.0. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include <stdlib.h> 28 #include <signal.h> 29 #include <errno.h> 30 #include <assert.h> 31 #include <limits.h> 32 #include "thread_pool_impl.h" 33 34 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; 35 static tpool_t *thread_pools = NULL; 36 37 static void 38 delete_pool(tpool_t *tpool) 39 { 40 tpool_job_t *job; 41 42 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 43 44 /* 45 * Unlink the pool from the global list of all pools. 46 */ 47 (void) pthread_mutex_lock(&thread_pool_lock); 48 if (thread_pools == tpool) 49 thread_pools = tpool->tp_forw; 50 if (thread_pools == tpool) 51 thread_pools = NULL; 52 else { 53 tpool->tp_back->tp_forw = tpool->tp_forw; 54 tpool->tp_forw->tp_back = tpool->tp_back; 55 } 56 pthread_mutex_unlock(&thread_pool_lock); 57 58 /* 59 * There should be no pending jobs, but just in case... 60 */ 61 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 62 tpool->tp_head = job->tpj_next; 63 free(job); 64 } 65 (void) pthread_attr_destroy(&tpool->tp_attr); 66 free(tpool); 67 } 68 69 /* 70 * Worker thread is terminating. 71 */ 72 static void 73 worker_cleanup(void *arg) 74 { 75 tpool_t *tpool = (tpool_t *)arg; 76 77 if (--tpool->tp_current == 0 && 78 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 79 if (tpool->tp_flags & TP_ABANDON) { 80 pthread_mutex_unlock(&tpool->tp_mutex); 81 delete_pool(tpool); 82 return; 83 } 84 if (tpool->tp_flags & TP_DESTROY) 85 (void) pthread_cond_broadcast(&tpool->tp_busycv); 86 } 87 pthread_mutex_unlock(&tpool->tp_mutex); 88 } 89 90 static void 91 notify_waiters(tpool_t *tpool) 92 { 93 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 94 tpool->tp_flags &= ~TP_WAIT; 95 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 96 } 97 } 98 99 /* 100 * Called by a worker thread on return from a tpool_dispatch()d job. 101 */ 102 static void 103 job_cleanup(void *arg) 104 { 105 tpool_t *tpool = (tpool_t *)arg; 106 107 pthread_t my_tid = pthread_self(); 108 tpool_active_t *activep; 109 tpool_active_t **activepp; 110 111 pthread_mutex_lock(&tpool->tp_mutex); 112 for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { 113 activep = *activepp; 114 if (activep->tpa_tid == my_tid) { 115 *activepp = activep->tpa_next; 116 break; 117 } 118 } 119 if (tpool->tp_flags & TP_WAIT) 120 notify_waiters(tpool); 121 } 122 123 static void * 124 tpool_worker(void *arg) 125 { 126 tpool_t *tpool = (tpool_t *)arg; 127 int elapsed; 128 tpool_job_t *job; 129 void (*func)(void *); 130 tpool_active_t active; 131 132 pthread_mutex_lock(&tpool->tp_mutex); 133 pthread_cleanup_push(worker_cleanup, tpool); 134 135 /* 136 * This is the worker's main loop. 137 * It will only be left if a timeout or an error has occurred. 138 */ 139 active.tpa_tid = pthread_self(); 140 for (;;) { 141 elapsed = 0; 142 tpool->tp_idle++; 143 if (tpool->tp_flags & TP_WAIT) 144 notify_waiters(tpool); 145 while ((tpool->tp_head == NULL || 146 (tpool->tp_flags & TP_SUSPEND)) && 147 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 148 if (tpool->tp_current <= tpool->tp_minimum || 149 tpool->tp_linger == 0) { 150 (void) pthread_cond_wait(&tpool->tp_workcv, 151 &tpool->tp_mutex); 152 } else { 153 struct timespec ts; 154 155 clock_gettime(CLOCK_REALTIME, &ts); 156 ts.tv_sec += tpool->tp_linger; 157 158 if (pthread_cond_timedwait(&tpool->tp_workcv, 159 &tpool->tp_mutex, &ts) != 0) { 160 elapsed = 1; 161 break; 162 } 163 } 164 } 165 tpool->tp_idle--; 166 if (tpool->tp_flags & TP_DESTROY) 167 break; 168 if (tpool->tp_flags & TP_ABANDON) { 169 /* can't abandon a suspended pool */ 170 if (tpool->tp_flags & TP_SUSPEND) { 171 tpool->tp_flags &= ~TP_SUSPEND; 172 (void) pthread_cond_broadcast( 173 &tpool->tp_workcv); 174 } 175 if (tpool->tp_head == NULL) 176 break; 177 } 178 if ((job = tpool->tp_head) != NULL && 179 !(tpool->tp_flags & TP_SUSPEND)) { 180 elapsed = 0; 181 func = job->tpj_func; 182 arg = job->tpj_arg; 183 tpool->tp_head = job->tpj_next; 184 if (job == tpool->tp_tail) 185 tpool->tp_tail = NULL; 186 tpool->tp_njobs--; 187 active.tpa_next = tpool->tp_active; 188 tpool->tp_active = &active; 189 pthread_mutex_unlock(&tpool->tp_mutex); 190 pthread_cleanup_push(job_cleanup, tpool); 191 free(job); 192 193 sigset_t maskset; 194 (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); 195 196 /* 197 * Call the specified function. 198 */ 199 func(arg); 200 /* 201 * We don't know what this thread has been doing, 202 * so we reset its signal mask and cancellation 203 * state back to the values prior to calling func(). 204 */ 205 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 206 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 207 NULL); 208 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 209 NULL); 210 pthread_cleanup_pop(1); 211 } 212 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 213 /* 214 * We timed out and there is no work to be done 215 * and the number of workers exceeds the minimum. 216 * Exit now to reduce the size of the pool. 217 */ 218 break; 219 } 220 } 221 pthread_cleanup_pop(1); 222 return (arg); 223 } 224 225 /* 226 * Create a worker thread, with default signals blocked. 227 */ 228 static int 229 create_worker(tpool_t *tpool) 230 { 231 pthread_t thread; 232 sigset_t oset; 233 int error; 234 235 (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); 236 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 237 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 238 return (error); 239 } 240 241 242 /* 243 * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr 244 * is NULL initialize the cloned attr using default values. 245 */ 246 static int 247 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) 248 { 249 int error; 250 251 error = pthread_attr_init(attr); 252 if (error || (old_attr == NULL)) 253 return (error); 254 255 #ifdef __GLIBC__ 256 cpu_set_t cpuset; 257 size_t cpusetsize = sizeof (cpuset); 258 error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); 259 if (error == 0) 260 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); 261 if (error) 262 goto error; 263 #endif /* __GLIBC__ */ 264 265 int detachstate; 266 error = pthread_attr_getdetachstate(old_attr, &detachstate); 267 if (error == 0) 268 error = pthread_attr_setdetachstate(attr, detachstate); 269 if (error) 270 goto error; 271 272 size_t guardsize; 273 error = pthread_attr_getguardsize(old_attr, &guardsize); 274 if (error == 0) 275 error = pthread_attr_setguardsize(attr, guardsize); 276 if (error) 277 goto error; 278 279 int inheritsched; 280 error = pthread_attr_getinheritsched(old_attr, &inheritsched); 281 if (error == 0) 282 error = pthread_attr_setinheritsched(attr, inheritsched); 283 if (error) 284 goto error; 285 286 struct sched_param param; 287 error = pthread_attr_getschedparam(old_attr, ¶m); 288 if (error == 0) 289 error = pthread_attr_setschedparam(attr, ¶m); 290 if (error) 291 goto error; 292 293 int policy; 294 error = pthread_attr_getschedpolicy(old_attr, &policy); 295 if (error == 0) 296 error = pthread_attr_setschedpolicy(attr, policy); 297 if (error) 298 goto error; 299 300 int scope; 301 error = pthread_attr_getscope(old_attr, &scope); 302 if (error == 0) 303 error = pthread_attr_setscope(attr, scope); 304 if (error) 305 goto error; 306 307 void *stackaddr; 308 size_t stacksize; 309 error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); 310 if (error == 0) 311 error = pthread_attr_setstack(attr, stackaddr, stacksize); 312 if (error) 313 goto error; 314 315 return (0); 316 error: 317 pthread_attr_destroy(attr); 318 return (error); 319 } 320 321 tpool_t * 322 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 323 pthread_attr_t *attr) 324 { 325 tpool_t *tpool; 326 void *stackaddr; 327 size_t stacksize; 328 size_t minstack; 329 int error; 330 331 if (min_threads > max_threads || max_threads < 1) { 332 errno = EINVAL; 333 return (NULL); 334 } 335 if (attr != NULL) { 336 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 337 errno = EINVAL; 338 return (NULL); 339 } 340 /* 341 * Allow only one thread in the pool with a specified stack. 342 * Require threads to have at least the minimum stack size. 343 */ 344 minstack = PTHREAD_STACK_MIN; 345 if (stackaddr != NULL) { 346 if (stacksize < minstack || max_threads != 1) { 347 errno = EINVAL; 348 return (NULL); 349 } 350 } else if (stacksize != 0 && stacksize < minstack) { 351 errno = EINVAL; 352 return (NULL); 353 } 354 } 355 356 tpool = calloc(1, sizeof (*tpool)); 357 if (tpool == NULL) { 358 errno = ENOMEM; 359 return (NULL); 360 } 361 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 362 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 363 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 364 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 365 tpool->tp_minimum = min_threads; 366 tpool->tp_maximum = max_threads; 367 tpool->tp_linger = linger; 368 369 /* 370 * We cannot just copy the attribute pointer. 371 * We need to initialize a new pthread_attr_t structure 372 * with the values from the user-supplied pthread_attr_t. 373 * If the attribute pointer is NULL, we need to initialize 374 * the new pthread_attr_t structure with default values. 375 */ 376 error = pthread_attr_clone(&tpool->tp_attr, attr); 377 if (error) { 378 free(tpool); 379 errno = error; 380 return (NULL); 381 } 382 383 /* make all pool threads be detached daemon threads */ 384 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 385 PTHREAD_CREATE_DETACHED); 386 387 /* insert into the global list of all thread pools */ 388 pthread_mutex_lock(&thread_pool_lock); 389 if (thread_pools == NULL) { 390 tpool->tp_forw = tpool; 391 tpool->tp_back = tpool; 392 thread_pools = tpool; 393 } else { 394 thread_pools->tp_back->tp_forw = tpool; 395 tpool->tp_forw = thread_pools; 396 tpool->tp_back = thread_pools->tp_back; 397 thread_pools->tp_back = tpool; 398 } 399 pthread_mutex_unlock(&thread_pool_lock); 400 401 return (tpool); 402 } 403 404 /* 405 * Dispatch a work request to the thread pool. 406 * If there are idle workers, awaken one. 407 * Else, if the maximum number of workers has 408 * not been reached, spawn a new worker thread. 409 * Else just return with the job added to the queue. 410 */ 411 int 412 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 413 { 414 tpool_job_t *job; 415 416 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 417 418 if ((job = calloc(1, sizeof (*job))) == NULL) 419 return (-1); 420 job->tpj_next = NULL; 421 job->tpj_func = func; 422 job->tpj_arg = arg; 423 424 pthread_mutex_lock(&tpool->tp_mutex); 425 426 if (!(tpool->tp_flags & TP_SUSPEND)) { 427 if (tpool->tp_idle > 0) 428 (void) pthread_cond_signal(&tpool->tp_workcv); 429 else if (tpool->tp_current >= tpool->tp_maximum) { 430 /* At worker limit. Leave task on queue */ 431 } else { 432 if (create_worker(tpool) == 0) { 433 /* Started a new worker thread */ 434 tpool->tp_current++; 435 } else if (tpool->tp_current > 0) { 436 /* Leave task on queue */ 437 } else { 438 /* Cannot start a single worker! */ 439 pthread_mutex_unlock(&tpool->tp_mutex); 440 free(job); 441 return (-1); 442 } 443 } 444 } 445 446 if (tpool->tp_head == NULL) 447 tpool->tp_head = job; 448 else 449 tpool->tp_tail->tpj_next = job; 450 tpool->tp_tail = job; 451 tpool->tp_njobs++; 452 453 pthread_mutex_unlock(&tpool->tp_mutex); 454 return (0); 455 } 456 457 static void 458 tpool_cleanup(void *arg) 459 { 460 tpool_t *tpool = (tpool_t *)arg; 461 462 pthread_mutex_unlock(&tpool->tp_mutex); 463 } 464 465 /* 466 * Assumes: by the time tpool_destroy() is called no one will use this 467 * thread pool in any way and no one will try to dispatch entries to it. 468 * Calling tpool_destroy() from a job in the pool will cause deadlock. 469 */ 470 void 471 tpool_destroy(tpool_t *tpool) 472 { 473 tpool_active_t *activep; 474 475 ASSERT(!tpool_member(tpool)); 476 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 477 478 pthread_mutex_lock(&tpool->tp_mutex); 479 pthread_cleanup_push(tpool_cleanup, tpool); 480 481 /* mark the pool as being destroyed; wakeup idle workers */ 482 tpool->tp_flags |= TP_DESTROY; 483 tpool->tp_flags &= ~TP_SUSPEND; 484 (void) pthread_cond_broadcast(&tpool->tp_workcv); 485 486 /* cancel all active workers */ 487 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 488 (void) pthread_cancel(activep->tpa_tid); 489 490 /* wait for all active workers to finish */ 491 while (tpool->tp_active != NULL) { 492 tpool->tp_flags |= TP_WAIT; 493 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 494 } 495 496 /* the last worker to terminate will wake us up */ 497 while (tpool->tp_current != 0) 498 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 499 500 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 501 delete_pool(tpool); 502 } 503 504 /* 505 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 506 * The last worker to terminate will delete the pool. 507 */ 508 void 509 tpool_abandon(tpool_t *tpool) 510 { 511 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 512 513 pthread_mutex_lock(&tpool->tp_mutex); 514 if (tpool->tp_current == 0) { 515 /* no workers, just delete the pool */ 516 pthread_mutex_unlock(&tpool->tp_mutex); 517 delete_pool(tpool); 518 } else { 519 /* wake up all workers, last one will delete the pool */ 520 tpool->tp_flags |= TP_ABANDON; 521 tpool->tp_flags &= ~TP_SUSPEND; 522 (void) pthread_cond_broadcast(&tpool->tp_workcv); 523 pthread_mutex_unlock(&tpool->tp_mutex); 524 } 525 } 526 527 /* 528 * Wait for all jobs to complete. 529 * Calling tpool_wait() from a job in the pool will cause deadlock. 530 */ 531 void 532 tpool_wait(tpool_t *tpool) 533 { 534 ASSERT(!tpool_member(tpool)); 535 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 536 537 pthread_mutex_lock(&tpool->tp_mutex); 538 pthread_cleanup_push(tpool_cleanup, tpool); 539 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 540 tpool->tp_flags |= TP_WAIT; 541 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 542 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 543 } 544 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 545 } 546 547 void 548 tpool_suspend(tpool_t *tpool) 549 { 550 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 551 552 pthread_mutex_lock(&tpool->tp_mutex); 553 tpool->tp_flags |= TP_SUSPEND; 554 pthread_mutex_unlock(&tpool->tp_mutex); 555 } 556 557 int 558 tpool_suspended(tpool_t *tpool) 559 { 560 int suspended; 561 562 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 563 564 pthread_mutex_lock(&tpool->tp_mutex); 565 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 566 pthread_mutex_unlock(&tpool->tp_mutex); 567 568 return (suspended); 569 } 570 571 void 572 tpool_resume(tpool_t *tpool) 573 { 574 int excess; 575 576 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 577 578 pthread_mutex_lock(&tpool->tp_mutex); 579 if (!(tpool->tp_flags & TP_SUSPEND)) { 580 pthread_mutex_unlock(&tpool->tp_mutex); 581 return; 582 } 583 tpool->tp_flags &= ~TP_SUSPEND; 584 (void) pthread_cond_broadcast(&tpool->tp_workcv); 585 excess = tpool->tp_njobs - tpool->tp_idle; 586 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 587 if (create_worker(tpool) != 0) 588 break; /* pthread_create() failed */ 589 tpool->tp_current++; 590 } 591 pthread_mutex_unlock(&tpool->tp_mutex); 592 } 593 594 int 595 tpool_member(tpool_t *tpool) 596 { 597 pthread_t my_tid = pthread_self(); 598 tpool_active_t *activep; 599 600 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 601 602 pthread_mutex_lock(&tpool->tp_mutex); 603 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 604 if (activep->tpa_tid == my_tid) { 605 pthread_mutex_unlock(&tpool->tp_mutex); 606 return (1); 607 } 608 } 609 pthread_mutex_unlock(&tpool->tp_mutex); 610 return (0); 611 } 612