1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "lint.h" 28 #include "thr_uberdata.h" 29 #include <stdlib.h> 30 #include <signal.h> 31 #include <errno.h> 32 #include "thread_pool_impl.h" 33 34 static mutex_t thread_pool_lock = DEFAULTMUTEX; 35 static tpool_t *thread_pools = NULL; 36 37 static void 38 delete_pool(tpool_t *tpool) 39 { 40 tpool_job_t *job; 41 42 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 43 44 /* 45 * Unlink the pool from the global list of all pools. 46 */ 47 lmutex_lock(&thread_pool_lock); 48 if (thread_pools == tpool) 49 thread_pools = tpool->tp_forw; 50 if (thread_pools == tpool) 51 thread_pools = NULL; 52 else { 53 tpool->tp_back->tp_forw = tpool->tp_forw; 54 tpool->tp_forw->tp_back = tpool->tp_back; 55 } 56 lmutex_unlock(&thread_pool_lock); 57 58 /* 59 * There should be no pending jobs, but just in case... 60 */ 61 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 62 tpool->tp_head = job->tpj_next; 63 lfree(job, sizeof (*job)); 64 } 65 (void) pthread_attr_destroy(&tpool->tp_attr); 66 lfree(tpool, sizeof (*tpool)); 67 } 68 69 /* 70 * Worker thread is terminating. 71 */ 72 static void 73 worker_cleanup(tpool_t *tpool) 74 { 75 ASSERT(MUTEX_HELD(&tpool->tp_mutex)); 76 77 if (--tpool->tp_current == 0 && 78 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 79 if (tpool->tp_flags & TP_ABANDON) { 80 sig_mutex_unlock(&tpool->tp_mutex); 81 delete_pool(tpool); 82 return; 83 } 84 if (tpool->tp_flags & TP_DESTROY) 85 (void) cond_broadcast(&tpool->tp_busycv); 86 } 87 sig_mutex_unlock(&tpool->tp_mutex); 88 } 89 90 static void 91 notify_waiters(tpool_t *tpool) 92 { 93 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 94 tpool->tp_flags &= ~TP_WAIT; 95 (void) cond_broadcast(&tpool->tp_waitcv); 96 } 97 } 98 99 /* 100 * Called by a worker thread on return from a tpool_dispatch()d job. 101 */ 102 static void 103 job_cleanup(tpool_t *tpool) 104 { 105 pthread_t my_tid = pthread_self(); 106 tpool_active_t *activep; 107 tpool_active_t **activepp; 108 109 sig_mutex_lock(&tpool->tp_mutex); 110 /* CSTYLED */ 111 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { 112 activep = *activepp; 113 if (activep->tpa_tid == my_tid) { 114 *activepp = activep->tpa_next; 115 break; 116 } 117 } 118 if (tpool->tp_flags & TP_WAIT) 119 notify_waiters(tpool); 120 } 121 122 static void * 123 tpool_worker(void *arg) 124 { 125 tpool_t *tpool = (tpool_t *)arg; 126 int elapsed; 127 tpool_job_t *job; 128 void (*func)(void *); 129 tpool_active_t active; 130 131 sig_mutex_lock(&tpool->tp_mutex); 132 pthread_cleanup_push(worker_cleanup, tpool); 133 134 /* 135 * This is the worker's main loop. 136 * It will only be left if a timeout or an error has occured. 137 */ 138 active.tpa_tid = pthread_self(); 139 for (;;) { 140 elapsed = 0; 141 tpool->tp_idle++; 142 if (tpool->tp_flags & TP_WAIT) 143 notify_waiters(tpool); 144 while ((tpool->tp_head == NULL || 145 (tpool->tp_flags & TP_SUSPEND)) && 146 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 147 if (tpool->tp_current <= tpool->tp_minimum || 148 tpool->tp_linger == 0) { 149 (void) sig_cond_wait(&tpool->tp_workcv, 150 &tpool->tp_mutex); 151 } else { 152 timestruc_t timeout; 153 154 timeout.tv_sec = tpool->tp_linger; 155 timeout.tv_nsec = 0; 156 if (sig_cond_reltimedwait(&tpool->tp_workcv, 157 &tpool->tp_mutex, &timeout) != 0) { 158 elapsed = 1; 159 break; 160 } 161 } 162 } 163 tpool->tp_idle--; 164 if (tpool->tp_flags & TP_DESTROY) 165 break; 166 if (tpool->tp_flags & TP_ABANDON) { 167 /* can't abandon a suspended pool */ 168 if (tpool->tp_flags & TP_SUSPEND) { 169 tpool->tp_flags &= ~TP_SUSPEND; 170 (void) cond_broadcast(&tpool->tp_workcv); 171 } 172 if (tpool->tp_head == NULL) 173 break; 174 } 175 if ((job = tpool->tp_head) != NULL && 176 !(tpool->tp_flags & TP_SUSPEND)) { 177 elapsed = 0; 178 func = job->tpj_func; 179 arg = job->tpj_arg; 180 tpool->tp_head = job->tpj_next; 181 if (job == tpool->tp_tail) 182 tpool->tp_tail = NULL; 183 tpool->tp_njobs--; 184 active.tpa_next = tpool->tp_active; 185 tpool->tp_active = &active; 186 sig_mutex_unlock(&tpool->tp_mutex); 187 pthread_cleanup_push(job_cleanup, tpool); 188 lfree(job, sizeof (*job)); 189 /* 190 * Call the specified function. 191 */ 192 func(arg); 193 /* 194 * We don't know what this thread has been doing, 195 * so we reset its signal mask and cancellation 196 * state back to the initial values. 197 */ 198 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 199 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 200 NULL); 201 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 202 NULL); 203 pthread_cleanup_pop(1); 204 } 205 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 206 /* 207 * We timed out and there is no work to be done 208 * and the number of workers exceeds the minimum. 209 * Exit now to reduce the size of the pool. 210 */ 211 break; 212 } 213 } 214 pthread_cleanup_pop(1); 215 return (arg); 216 } 217 218 /* 219 * Create a worker thread, with all signals blocked. 220 */ 221 static int 222 create_worker(tpool_t *tpool) 223 { 224 sigset_t oset; 225 int error; 226 227 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 228 error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool); 229 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 230 return (error); 231 } 232 233 tpool_t * 234 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 235 pthread_attr_t *attr) 236 { 237 tpool_t *tpool; 238 void *stackaddr; 239 size_t stacksize; 240 size_t minstack; 241 int error; 242 243 if (min_threads > max_threads || max_threads < 1) { 244 errno = EINVAL; 245 return (NULL); 246 } 247 if (attr != NULL) { 248 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 249 errno = EINVAL; 250 return (NULL); 251 } 252 /* 253 * Allow only one thread in the pool with a specified stack. 254 * Require threads to have at least the minimum stack size. 255 */ 256 minstack = thr_min_stack(); 257 if (stackaddr != NULL) { 258 if (stacksize < minstack || max_threads != 1) { 259 errno = EINVAL; 260 return (NULL); 261 } 262 } else if (stacksize != 0 && stacksize < minstack) { 263 errno = EINVAL; 264 return (NULL); 265 } 266 } 267 268 tpool = lmalloc(sizeof (*tpool)); 269 if (tpool == NULL) { 270 errno = ENOMEM; 271 return (NULL); 272 } 273 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL); 274 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL); 275 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL); 276 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL); 277 tpool->tp_minimum = min_threads; 278 tpool->tp_maximum = max_threads; 279 tpool->tp_linger = linger; 280 281 /* 282 * We cannot just copy the attribute pointer. 283 * We need to initialize a new pthread_attr_t structure 284 * with the values from the user-supplied pthread_attr_t. 285 * If the attribute pointer is NULL, we need to initialize 286 * the new pthread_attr_t structure with default values. 287 */ 288 error = pthread_attr_clone(&tpool->tp_attr, attr); 289 if (error) { 290 lfree(tpool, sizeof (*tpool)); 291 errno = error; 292 return (NULL); 293 } 294 295 /* make all pool threads be detached daemon threads */ 296 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 297 PTHREAD_CREATE_DETACHED); 298 (void) pthread_attr_setdaemonstate_np(&tpool->tp_attr, 299 PTHREAD_CREATE_DAEMON_NP); 300 301 /* insert into the global list of all thread pools */ 302 lmutex_lock(&thread_pool_lock); 303 if (thread_pools == NULL) { 304 tpool->tp_forw = tpool; 305 tpool->tp_back = tpool; 306 thread_pools = tpool; 307 } else { 308 thread_pools->tp_back->tp_forw = tpool; 309 tpool->tp_forw = thread_pools; 310 tpool->tp_back = thread_pools->tp_back; 311 thread_pools->tp_back = tpool; 312 } 313 lmutex_unlock(&thread_pool_lock); 314 315 return (tpool); 316 } 317 318 /* 319 * Dispatch a work request to the thread pool. 320 * If there are idle workers, awaken one. 321 * Else, if the maximum number of workers has 322 * not been reached, spawn a new worker thread. 323 * Else just return with the job added to the queue. 324 */ 325 int 326 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 327 { 328 tpool_job_t *job; 329 330 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 331 332 if ((job = lmalloc(sizeof (*job))) == NULL) 333 return (-1); 334 job->tpj_next = NULL; 335 job->tpj_func = func; 336 job->tpj_arg = arg; 337 338 sig_mutex_lock(&tpool->tp_mutex); 339 340 if (tpool->tp_head == NULL) 341 tpool->tp_head = job; 342 else 343 tpool->tp_tail->tpj_next = job; 344 tpool->tp_tail = job; 345 tpool->tp_njobs++; 346 347 if (!(tpool->tp_flags & TP_SUSPEND)) { 348 if (tpool->tp_idle > 0) 349 (void) cond_signal(&tpool->tp_workcv); 350 else if (tpool->tp_current < tpool->tp_maximum && 351 create_worker(tpool) == 0) 352 tpool->tp_current++; 353 } 354 355 sig_mutex_unlock(&tpool->tp_mutex); 356 return (0); 357 } 358 359 /* 360 * Assumes: by the time tpool_destroy() is called no one will use this 361 * thread pool in any way and no one will try to dispatch entries to it. 362 * Calling tpool_destroy() from a job in the pool will cause deadlock. 363 */ 364 void 365 tpool_destroy(tpool_t *tpool) 366 { 367 tpool_active_t *activep; 368 369 ASSERT(!tpool_member(tpool)); 370 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 371 372 sig_mutex_lock(&tpool->tp_mutex); 373 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex); 374 375 /* mark the pool as being destroyed; wakeup idle workers */ 376 tpool->tp_flags |= TP_DESTROY; 377 tpool->tp_flags &= ~TP_SUSPEND; 378 (void) cond_broadcast(&tpool->tp_workcv); 379 380 /* cancel all active workers */ 381 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 382 (void) pthread_cancel(activep->tpa_tid); 383 384 /* wait for all active workers to finish */ 385 while (tpool->tp_active != NULL) { 386 tpool->tp_flags |= TP_WAIT; 387 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 388 } 389 390 /* the last worker to terminate will wake us up */ 391 while (tpool->tp_current != 0) 392 (void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 393 394 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */ 395 delete_pool(tpool); 396 } 397 398 /* 399 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 400 * The last worker to terminate will delete the pool. 401 */ 402 void 403 tpool_abandon(tpool_t *tpool) 404 { 405 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 406 407 sig_mutex_lock(&tpool->tp_mutex); 408 if (tpool->tp_current == 0) { 409 /* no workers, just delete the pool */ 410 sig_mutex_unlock(&tpool->tp_mutex); 411 delete_pool(tpool); 412 } else { 413 /* wake up all workers, last one will delete the pool */ 414 tpool->tp_flags |= TP_ABANDON; 415 tpool->tp_flags &= ~TP_SUSPEND; 416 (void) cond_broadcast(&tpool->tp_workcv); 417 sig_mutex_unlock(&tpool->tp_mutex); 418 } 419 } 420 421 /* 422 * Wait for all jobs to complete. 423 * Calling tpool_wait() from a job in the pool will cause deadlock. 424 */ 425 void 426 tpool_wait(tpool_t *tpool) 427 { 428 ASSERT(!tpool_member(tpool)); 429 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 430 431 sig_mutex_lock(&tpool->tp_mutex); 432 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex); 433 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 434 tpool->tp_flags |= TP_WAIT; 435 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 436 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 437 } 438 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */ 439 } 440 441 void 442 tpool_suspend(tpool_t *tpool) 443 { 444 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 445 446 sig_mutex_lock(&tpool->tp_mutex); 447 tpool->tp_flags |= TP_SUSPEND; 448 sig_mutex_unlock(&tpool->tp_mutex); 449 } 450 451 int 452 tpool_suspended(tpool_t *tpool) 453 { 454 int suspended; 455 456 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 457 458 sig_mutex_lock(&tpool->tp_mutex); 459 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 460 sig_mutex_unlock(&tpool->tp_mutex); 461 462 return (suspended); 463 } 464 465 void 466 tpool_resume(tpool_t *tpool) 467 { 468 int excess; 469 470 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 471 472 sig_mutex_lock(&tpool->tp_mutex); 473 if (!(tpool->tp_flags & TP_SUSPEND)) { 474 sig_mutex_unlock(&tpool->tp_mutex); 475 return; 476 } 477 tpool->tp_flags &= ~TP_SUSPEND; 478 (void) cond_broadcast(&tpool->tp_workcv); 479 excess = tpool->tp_njobs - tpool->tp_idle; 480 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 481 if (create_worker(tpool) != 0) 482 break; /* pthread_create() failed */ 483 tpool->tp_current++; 484 } 485 sig_mutex_unlock(&tpool->tp_mutex); 486 } 487 488 int 489 tpool_member(tpool_t *tpool) 490 { 491 pthread_t my_tid = pthread_self(); 492 tpool_active_t *activep; 493 494 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 495 496 sig_mutex_lock(&tpool->tp_mutex); 497 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 498 if (activep->tpa_tid == my_tid) { 499 sig_mutex_unlock(&tpool->tp_mutex); 500 return (1); 501 } 502 } 503 sig_mutex_unlock(&tpool->tp_mutex); 504 return (0); 505 } 506 507 void 508 postfork1_child_tpool(void) 509 { 510 pthread_t my_tid = pthread_self(); 511 tpool_t *tpool; 512 tpool_job_t *job; 513 514 /* 515 * All of the thread pool workers are gone, except possibly 516 * for the current thread, if it is a thread pool worker thread. 517 * Retain the thread pools, but make them all empty. Whatever 518 * jobs were queued or running belong to the parent process. 519 */ 520 top: 521 if ((tpool = thread_pools) == NULL) 522 return; 523 524 do { 525 tpool_active_t *activep; 526 527 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL); 528 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL); 529 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL); 530 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL); 531 for (job = tpool->tp_head; job; job = tpool->tp_head) { 532 tpool->tp_head = job->tpj_next; 533 lfree(job, sizeof (*job)); 534 } 535 tpool->tp_tail = NULL; 536 tpool->tp_njobs = 0; 537 for (activep = tpool->tp_active; activep; 538 activep = activep->tpa_next) { 539 if (activep->tpa_tid == my_tid) { 540 activep->tpa_next = NULL; 541 break; 542 } 543 } 544 tpool->tp_idle = 0; 545 tpool->tp_current = 0; 546 if ((tpool->tp_active = activep) != NULL) 547 tpool->tp_current = 1; 548 tpool->tp_flags &= ~TP_WAIT; 549 if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) { 550 tpool->tp_flags &= ~TP_DESTROY; 551 tpool->tp_flags |= TP_ABANDON; 552 if (tpool->tp_current == 0) { 553 delete_pool(tpool); 554 goto top; /* start over */ 555 } 556 } 557 } while ((tpool = tpool->tp_forw) != thread_pools); 558 } 559