1 // SPDX-License-Identifier: CDDL-1.0 2 /* 3 * CDDL HEADER START 4 * 5 * The contents of this file are subject to the terms of the 6 * Common Development and Distribution License (the "License"). 7 * You may not use this file except in compliance with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or https://opensource.org/licenses/CDDL-1.0. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 23 /* 24 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 25 * Use is subject to license terms. 26 */ 27 28 #include <stdlib.h> 29 #include <signal.h> 30 #include <errno.h> 31 #include <assert.h> 32 #include <limits.h> 33 #include "thread_pool_impl.h" 34 35 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; 36 static tpool_t *thread_pools = NULL; 37 38 static void delete_pool(tpool_t * tpool)39 delete_pool(tpool_t *tpool) 40 { 41 tpool_job_t *job; 42 43 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 44 45 /* 46 * Unlink the pool from the global list of all pools. 47 */ 48 (void) pthread_mutex_lock(&thread_pool_lock); 49 if (thread_pools == tpool) 50 thread_pools = tpool->tp_forw; 51 if (thread_pools == tpool) 52 thread_pools = NULL; 53 else { 54 tpool->tp_back->tp_forw = tpool->tp_forw; 55 tpool->tp_forw->tp_back = tpool->tp_back; 56 } 57 pthread_mutex_unlock(&thread_pool_lock); 58 59 /* 60 * There should be no pending jobs, but just in case... 61 */ 62 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 63 tpool->tp_head = job->tpj_next; 64 free(job); 65 } 66 (void) pthread_attr_destroy(&tpool->tp_attr); 67 free(tpool); 68 } 69 70 /* 71 * Worker thread is terminating. 72 */ 73 static void worker_cleanup(void * arg)74 worker_cleanup(void *arg) 75 { 76 tpool_t *tpool = (tpool_t *)arg; 77 78 if (--tpool->tp_current == 0 && 79 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 80 if (tpool->tp_flags & TP_ABANDON) { 81 pthread_mutex_unlock(&tpool->tp_mutex); 82 delete_pool(tpool); 83 return; 84 } 85 if (tpool->tp_flags & TP_DESTROY) 86 (void) pthread_cond_broadcast(&tpool->tp_busycv); 87 } 88 pthread_mutex_unlock(&tpool->tp_mutex); 89 } 90 91 static void notify_waiters(tpool_t * tpool)92 notify_waiters(tpool_t *tpool) 93 { 94 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 95 tpool->tp_flags &= ~TP_WAIT; 96 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 97 } 98 } 99 100 /* 101 * Called by a worker thread on return from a tpool_dispatch()d job. 102 */ 103 static void job_cleanup(void * arg)104 job_cleanup(void *arg) 105 { 106 tpool_t *tpool = (tpool_t *)arg; 107 108 pthread_t my_tid = pthread_self(); 109 tpool_active_t *activep; 110 tpool_active_t **activepp; 111 112 pthread_mutex_lock(&tpool->tp_mutex); 113 for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { 114 activep = *activepp; 115 if (activep->tpa_tid == my_tid) { 116 *activepp = activep->tpa_next; 117 break; 118 } 119 } 120 if (tpool->tp_flags & TP_WAIT) 121 notify_waiters(tpool); 122 } 123 124 static void * tpool_worker(void * arg)125 tpool_worker(void *arg) 126 { 127 tpool_t *tpool = (tpool_t *)arg; 128 int elapsed; 129 tpool_job_t *job; 130 void (*func)(void *); 131 tpool_active_t active; 132 133 pthread_mutex_lock(&tpool->tp_mutex); 134 pthread_cleanup_push(worker_cleanup, tpool); 135 136 /* 137 * This is the worker's main loop. 138 * It will only be left if a timeout or an error has occurred. 139 */ 140 active.tpa_tid = pthread_self(); 141 for (;;) { 142 elapsed = 0; 143 tpool->tp_idle++; 144 if (tpool->tp_flags & TP_WAIT) 145 notify_waiters(tpool); 146 while ((tpool->tp_head == NULL || 147 (tpool->tp_flags & TP_SUSPEND)) && 148 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 149 if (tpool->tp_current <= tpool->tp_minimum || 150 tpool->tp_linger == 0) { 151 (void) pthread_cond_wait(&tpool->tp_workcv, 152 &tpool->tp_mutex); 153 } else { 154 struct timespec ts; 155 156 clock_gettime(CLOCK_REALTIME, &ts); 157 ts.tv_sec += tpool->tp_linger; 158 159 if (pthread_cond_timedwait(&tpool->tp_workcv, 160 &tpool->tp_mutex, &ts) != 0) { 161 elapsed = 1; 162 break; 163 } 164 } 165 } 166 tpool->tp_idle--; 167 if (tpool->tp_flags & TP_DESTROY) 168 break; 169 if (tpool->tp_flags & TP_ABANDON) { 170 /* can't abandon a suspended pool */ 171 if (tpool->tp_flags & TP_SUSPEND) { 172 tpool->tp_flags &= ~TP_SUSPEND; 173 (void) pthread_cond_broadcast( 174 &tpool->tp_workcv); 175 } 176 if (tpool->tp_head == NULL) 177 break; 178 } 179 if ((job = tpool->tp_head) != NULL && 180 !(tpool->tp_flags & TP_SUSPEND)) { 181 elapsed = 0; 182 func = job->tpj_func; 183 arg = job->tpj_arg; 184 tpool->tp_head = job->tpj_next; 185 if (job == tpool->tp_tail) 186 tpool->tp_tail = NULL; 187 tpool->tp_njobs--; 188 active.tpa_next = tpool->tp_active; 189 tpool->tp_active = &active; 190 pthread_mutex_unlock(&tpool->tp_mutex); 191 pthread_cleanup_push(job_cleanup, tpool); 192 free(job); 193 194 sigset_t maskset; 195 (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); 196 197 /* 198 * Call the specified function. 199 */ 200 func(arg); 201 /* 202 * We don't know what this thread has been doing, 203 * so we reset its signal mask and cancellation 204 * state back to the values prior to calling func(). 205 */ 206 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 207 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 208 NULL); 209 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 210 NULL); 211 pthread_cleanup_pop(1); 212 } 213 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 214 /* 215 * We timed out and there is no work to be done 216 * and the number of workers exceeds the minimum. 217 * Exit now to reduce the size of the pool. 218 */ 219 break; 220 } 221 } 222 pthread_cleanup_pop(1); 223 return (arg); 224 } 225 226 /* 227 * Create a worker thread, with default signals blocked. 228 */ 229 static int create_worker(tpool_t * tpool)230 create_worker(tpool_t *tpool) 231 { 232 pthread_t thread; 233 sigset_t oset; 234 int error; 235 236 (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); 237 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 238 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 239 return (error); 240 } 241 242 243 /* 244 * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr 245 * is NULL initialize the cloned attr using default values. 246 */ 247 static int pthread_attr_clone(pthread_attr_t * attr,const pthread_attr_t * old_attr)248 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) 249 { 250 int error; 251 252 error = pthread_attr_init(attr); 253 if (error || (old_attr == NULL)) 254 return (error); 255 256 #ifdef __GLIBC__ 257 cpu_set_t cpuset; 258 size_t cpusetsize = sizeof (cpuset); 259 error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); 260 if (error == 0) 261 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); 262 if (error) 263 goto error; 264 #endif /* __GLIBC__ */ 265 266 int detachstate; 267 error = pthread_attr_getdetachstate(old_attr, &detachstate); 268 if (error == 0) 269 error = pthread_attr_setdetachstate(attr, detachstate); 270 if (error) 271 goto error; 272 273 size_t guardsize; 274 error = pthread_attr_getguardsize(old_attr, &guardsize); 275 if (error == 0) 276 error = pthread_attr_setguardsize(attr, guardsize); 277 if (error) 278 goto error; 279 280 int inheritsched; 281 error = pthread_attr_getinheritsched(old_attr, &inheritsched); 282 if (error == 0) 283 error = pthread_attr_setinheritsched(attr, inheritsched); 284 if (error) 285 goto error; 286 287 struct sched_param param; 288 error = pthread_attr_getschedparam(old_attr, ¶m); 289 if (error == 0) 290 error = pthread_attr_setschedparam(attr, ¶m); 291 if (error) 292 goto error; 293 294 int policy; 295 error = pthread_attr_getschedpolicy(old_attr, &policy); 296 if (error == 0) 297 error = pthread_attr_setschedpolicy(attr, policy); 298 if (error) 299 goto error; 300 301 int scope; 302 error = pthread_attr_getscope(old_attr, &scope); 303 if (error == 0) 304 error = pthread_attr_setscope(attr, scope); 305 if (error) 306 goto error; 307 308 void *stackaddr; 309 size_t stacksize; 310 error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); 311 if (error == 0) 312 error = pthread_attr_setstack(attr, stackaddr, stacksize); 313 if (error) 314 goto error; 315 316 return (0); 317 error: 318 pthread_attr_destroy(attr); 319 return (error); 320 } 321 322 tpool_t * tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)323 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 324 pthread_attr_t *attr) 325 { 326 tpool_t *tpool; 327 void *stackaddr; 328 size_t stacksize; 329 size_t minstack; 330 int error; 331 332 if (min_threads > max_threads || max_threads < 1) { 333 errno = EINVAL; 334 return (NULL); 335 } 336 if (attr != NULL) { 337 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 338 errno = EINVAL; 339 return (NULL); 340 } 341 /* 342 * Allow only one thread in the pool with a specified stack. 343 * Require threads to have at least the minimum stack size. 344 */ 345 minstack = PTHREAD_STACK_MIN; 346 if (stackaddr != NULL) { 347 if (stacksize < minstack || max_threads != 1) { 348 errno = EINVAL; 349 return (NULL); 350 } 351 } else if (stacksize != 0 && stacksize < minstack) { 352 errno = EINVAL; 353 return (NULL); 354 } 355 } 356 357 tpool = calloc(1, sizeof (*tpool)); 358 if (tpool == NULL) { 359 errno = ENOMEM; 360 return (NULL); 361 } 362 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 363 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 364 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 365 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 366 tpool->tp_minimum = min_threads; 367 tpool->tp_maximum = max_threads; 368 tpool->tp_linger = linger; 369 370 /* 371 * We cannot just copy the attribute pointer. 372 * We need to initialize a new pthread_attr_t structure 373 * with the values from the user-supplied pthread_attr_t. 374 * If the attribute pointer is NULL, we need to initialize 375 * the new pthread_attr_t structure with default values. 376 */ 377 error = pthread_attr_clone(&tpool->tp_attr, attr); 378 if (error) { 379 free(tpool); 380 errno = error; 381 return (NULL); 382 } 383 384 /* make all pool threads be detached daemon threads */ 385 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 386 PTHREAD_CREATE_DETACHED); 387 388 /* insert into the global list of all thread pools */ 389 pthread_mutex_lock(&thread_pool_lock); 390 if (thread_pools == NULL) { 391 tpool->tp_forw = tpool; 392 tpool->tp_back = tpool; 393 thread_pools = tpool; 394 } else { 395 thread_pools->tp_back->tp_forw = tpool; 396 tpool->tp_forw = thread_pools; 397 tpool->tp_back = thread_pools->tp_back; 398 thread_pools->tp_back = tpool; 399 } 400 pthread_mutex_unlock(&thread_pool_lock); 401 402 return (tpool); 403 } 404 405 /* 406 * Dispatch a work request to the thread pool. 407 * If there are idle workers, awaken one. 408 * Else, if the maximum number of workers has 409 * not been reached, spawn a new worker thread. 410 * Else just return with the job added to the queue. 411 */ 412 int tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)413 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 414 { 415 tpool_job_t *job; 416 417 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 418 419 if ((job = calloc(1, sizeof (*job))) == NULL) 420 return (-1); 421 job->tpj_next = NULL; 422 job->tpj_func = func; 423 job->tpj_arg = arg; 424 425 pthread_mutex_lock(&tpool->tp_mutex); 426 427 if (!(tpool->tp_flags & TP_SUSPEND)) { 428 if (tpool->tp_idle > 0) 429 (void) pthread_cond_signal(&tpool->tp_workcv); 430 else if (tpool->tp_current >= tpool->tp_maximum) { 431 /* At worker limit. Leave task on queue */ 432 } else { 433 if (create_worker(tpool) == 0) { 434 /* Started a new worker thread */ 435 tpool->tp_current++; 436 } else if (tpool->tp_current > 0) { 437 /* Leave task on queue */ 438 } else { 439 /* Cannot start a single worker! */ 440 pthread_mutex_unlock(&tpool->tp_mutex); 441 free(job); 442 return (-1); 443 } 444 } 445 } 446 447 if (tpool->tp_head == NULL) 448 tpool->tp_head = job; 449 else 450 tpool->tp_tail->tpj_next = job; 451 tpool->tp_tail = job; 452 tpool->tp_njobs++; 453 454 pthread_mutex_unlock(&tpool->tp_mutex); 455 return (0); 456 } 457 458 static void tpool_cleanup(void * arg)459 tpool_cleanup(void *arg) 460 { 461 tpool_t *tpool = (tpool_t *)arg; 462 463 pthread_mutex_unlock(&tpool->tp_mutex); 464 } 465 466 /* 467 * Assumes: by the time tpool_destroy() is called no one will use this 468 * thread pool in any way and no one will try to dispatch entries to it. 469 * Calling tpool_destroy() from a job in the pool will cause deadlock. 470 */ 471 void tpool_destroy(tpool_t * tpool)472 tpool_destroy(tpool_t *tpool) 473 { 474 tpool_active_t *activep; 475 476 ASSERT(!tpool_member(tpool)); 477 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 478 479 pthread_mutex_lock(&tpool->tp_mutex); 480 pthread_cleanup_push(tpool_cleanup, tpool); 481 482 /* mark the pool as being destroyed; wakeup idle workers */ 483 tpool->tp_flags |= TP_DESTROY; 484 tpool->tp_flags &= ~TP_SUSPEND; 485 (void) pthread_cond_broadcast(&tpool->tp_workcv); 486 487 /* cancel all active workers */ 488 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 489 (void) pthread_cancel(activep->tpa_tid); 490 491 /* wait for all active workers to finish */ 492 while (tpool->tp_active != NULL) { 493 tpool->tp_flags |= TP_WAIT; 494 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 495 } 496 497 /* the last worker to terminate will wake us up */ 498 while (tpool->tp_current != 0) 499 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 500 501 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 502 delete_pool(tpool); 503 } 504 505 /* 506 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 507 * The last worker to terminate will delete the pool. 508 */ 509 void tpool_abandon(tpool_t * tpool)510 tpool_abandon(tpool_t *tpool) 511 { 512 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 513 514 pthread_mutex_lock(&tpool->tp_mutex); 515 if (tpool->tp_current == 0) { 516 /* no workers, just delete the pool */ 517 pthread_mutex_unlock(&tpool->tp_mutex); 518 delete_pool(tpool); 519 } else { 520 /* wake up all workers, last one will delete the pool */ 521 tpool->tp_flags |= TP_ABANDON; 522 tpool->tp_flags &= ~TP_SUSPEND; 523 (void) pthread_cond_broadcast(&tpool->tp_workcv); 524 pthread_mutex_unlock(&tpool->tp_mutex); 525 } 526 } 527 528 /* 529 * Wait for all jobs to complete. 530 * Calling tpool_wait() from a job in the pool will cause deadlock. 531 */ 532 void tpool_wait(tpool_t * tpool)533 tpool_wait(tpool_t *tpool) 534 { 535 ASSERT(!tpool_member(tpool)); 536 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 537 538 pthread_mutex_lock(&tpool->tp_mutex); 539 pthread_cleanup_push(tpool_cleanup, tpool); 540 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 541 tpool->tp_flags |= TP_WAIT; 542 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 543 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 544 } 545 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 546 } 547 548 void tpool_suspend(tpool_t * tpool)549 tpool_suspend(tpool_t *tpool) 550 { 551 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 552 553 pthread_mutex_lock(&tpool->tp_mutex); 554 tpool->tp_flags |= TP_SUSPEND; 555 pthread_mutex_unlock(&tpool->tp_mutex); 556 } 557 558 int tpool_suspended(tpool_t * tpool)559 tpool_suspended(tpool_t *tpool) 560 { 561 int suspended; 562 563 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 564 565 pthread_mutex_lock(&tpool->tp_mutex); 566 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 567 pthread_mutex_unlock(&tpool->tp_mutex); 568 569 return (suspended); 570 } 571 572 void tpool_resume(tpool_t * tpool)573 tpool_resume(tpool_t *tpool) 574 { 575 int excess; 576 577 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 578 579 pthread_mutex_lock(&tpool->tp_mutex); 580 if (!(tpool->tp_flags & TP_SUSPEND)) { 581 pthread_mutex_unlock(&tpool->tp_mutex); 582 return; 583 } 584 tpool->tp_flags &= ~TP_SUSPEND; 585 (void) pthread_cond_broadcast(&tpool->tp_workcv); 586 excess = tpool->tp_njobs - tpool->tp_idle; 587 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 588 if (create_worker(tpool) != 0) 589 break; /* pthread_create() failed */ 590 tpool->tp_current++; 591 } 592 pthread_mutex_unlock(&tpool->tp_mutex); 593 } 594 595 int tpool_member(tpool_t * tpool)596 tpool_member(tpool_t *tpool) 597 { 598 pthread_t my_tid = pthread_self(); 599 tpool_active_t *activep; 600 601 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 602 603 pthread_mutex_lock(&tpool->tp_mutex); 604 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 605 if (activep->tpa_tid == my_tid) { 606 pthread_mutex_unlock(&tpool->tp_mutex); 607 return (1); 608 } 609 } 610 pthread_mutex_unlock(&tpool->tp_mutex); 611 return (0); 612 } 613