1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include <stdlib.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include "thread_pool_impl.h"
35
36 static mutex_t thread_pool_lock = DEFAULTMUTEX;
37 static tpool_t *thread_pools = NULL;
38
39 static void
delete_pool(tpool_t * tpool)40 delete_pool(tpool_t *tpool)
41 {
42 tpool_job_t *job;
43
44 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
45
46 /*
47 * Unlink the pool from the global list of all pools.
48 */
49 lmutex_lock(&thread_pool_lock);
50 if (thread_pools == tpool)
51 thread_pools = tpool->tp_forw;
52 if (thread_pools == tpool)
53 thread_pools = NULL;
54 else {
55 tpool->tp_back->tp_forw = tpool->tp_forw;
56 tpool->tp_forw->tp_back = tpool->tp_back;
57 }
58 lmutex_unlock(&thread_pool_lock);
59
60 /*
61 * There should be no pending jobs, but just in case...
62 */
63 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
64 tpool->tp_head = job->tpj_next;
65 lfree(job, sizeof (*job));
66 }
67 (void) pthread_attr_destroy(&tpool->tp_attr);
68 lfree(tpool, sizeof (*tpool));
69 }
70
71 /*
72 * Worker thread is terminating.
73 */
74 static void
worker_cleanup(tpool_t * tpool)75 worker_cleanup(tpool_t *tpool)
76 {
77 ASSERT(MUTEX_HELD(&tpool->tp_mutex));
78
79 if (--tpool->tp_current == 0 &&
80 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
81 if (tpool->tp_flags & TP_ABANDON) {
82 sig_mutex_unlock(&tpool->tp_mutex);
83 delete_pool(tpool);
84 return;
85 }
86 if (tpool->tp_flags & TP_DESTROY)
87 (void) cond_broadcast(&tpool->tp_busycv);
88 }
89 sig_mutex_unlock(&tpool->tp_mutex);
90 }
91
92 static void
notify_waiters(tpool_t * tpool)93 notify_waiters(tpool_t *tpool)
94 {
95 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
96 tpool->tp_flags &= ~TP_WAIT;
97 (void) cond_broadcast(&tpool->tp_waitcv);
98 }
99 }
100
101 /*
102 * Called by a worker thread on return from a tpool_dispatch()d job.
103 */
104 static void
job_cleanup(tpool_t * tpool)105 job_cleanup(tpool_t *tpool)
106 {
107 pthread_t my_tid = pthread_self();
108 tpool_active_t *activep;
109 tpool_active_t **activepp;
110
111 sig_mutex_lock(&tpool->tp_mutex);
112 /* CSTYLED */
113 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
114 activep = *activepp;
115 if (activep->tpa_tid == my_tid) {
116 *activepp = activep->tpa_next;
117 break;
118 }
119 }
120 if (tpool->tp_flags & TP_WAIT)
121 notify_waiters(tpool);
122 }
123
124 static void *
tpool_worker(void * arg)125 tpool_worker(void *arg)
126 {
127 tpool_t *tpool = (tpool_t *)arg;
128 int elapsed;
129 tpool_job_t *job;
130 void (*func)(void *);
131 tpool_active_t active;
132
133 sig_mutex_lock(&tpool->tp_mutex);
134 pthread_cleanup_push(worker_cleanup, tpool);
135
136 /*
137 * This is the worker's main loop.
138 * It will only be left if a timeout or an error has occured.
139 */
140 active.tpa_tid = pthread_self();
141 for (;;) {
142 elapsed = 0;
143 tpool->tp_idle++;
144 if (tpool->tp_flags & TP_WAIT)
145 notify_waiters(tpool);
146 while ((tpool->tp_head == NULL ||
147 (tpool->tp_flags & TP_SUSPEND)) &&
148 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
149 if (tpool->tp_current <= tpool->tp_minimum ||
150 tpool->tp_linger == 0) {
151 (void) sig_cond_wait(&tpool->tp_workcv,
152 &tpool->tp_mutex);
153 } else {
154 timestruc_t timeout;
155
156 timeout.tv_sec = tpool->tp_linger;
157 timeout.tv_nsec = 0;
158 if (sig_cond_reltimedwait(&tpool->tp_workcv,
159 &tpool->tp_mutex, &timeout) != 0) {
160 elapsed = 1;
161 break;
162 }
163 }
164 }
165 tpool->tp_idle--;
166 if (tpool->tp_flags & TP_DESTROY)
167 break;
168 if (tpool->tp_flags & TP_ABANDON) {
169 /* can't abandon a suspended pool */
170 if (tpool->tp_flags & TP_SUSPEND) {
171 tpool->tp_flags &= ~TP_SUSPEND;
172 (void) cond_broadcast(&tpool->tp_workcv);
173 }
174 if (tpool->tp_head == NULL)
175 break;
176 }
177 if ((job = tpool->tp_head) != NULL &&
178 !(tpool->tp_flags & TP_SUSPEND)) {
179 elapsed = 0;
180 func = job->tpj_func;
181 arg = job->tpj_arg;
182 tpool->tp_head = job->tpj_next;
183 if (job == tpool->tp_tail)
184 tpool->tp_tail = NULL;
185 tpool->tp_njobs--;
186 active.tpa_next = tpool->tp_active;
187 tpool->tp_active = &active;
188 sig_mutex_unlock(&tpool->tp_mutex);
189 pthread_cleanup_push(job_cleanup, tpool);
190 lfree(job, sizeof (*job));
191 /*
192 * Call the specified function.
193 */
194 func(arg);
195 /*
196 * We don't know what this thread has been doing,
197 * so we reset its signal mask and cancellation
198 * state back to the initial values.
199 */
200 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
201 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
202 NULL);
203 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
204 NULL);
205 pthread_cleanup_pop(1);
206 }
207 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
208 /*
209 * We timed out and there is no work to be done
210 * and the number of workers exceeds the minimum.
211 * Exit now to reduce the size of the pool.
212 */
213 break;
214 }
215 }
216 pthread_cleanup_pop(1);
217 return (arg);
218 }
219
220 /*
221 * Create a worker thread, with all signals blocked.
222 */
223 static int
create_worker(tpool_t * tpool)224 create_worker(tpool_t *tpool)
225 {
226 sigset_t oset;
227 int error;
228
229 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
230 error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
231 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
232 return (error);
233 }
234
235 tpool_t *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)236 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
237 pthread_attr_t *attr)
238 {
239 tpool_t *tpool;
240 void *stackaddr;
241 size_t stacksize;
242 size_t minstack;
243 int error;
244
245 if (min_threads > max_threads || max_threads < 1) {
246 errno = EINVAL;
247 return (NULL);
248 }
249 if (attr != NULL) {
250 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
251 errno = EINVAL;
252 return (NULL);
253 }
254 /*
255 * Allow only one thread in the pool with a specified stack.
256 * Require threads to have at least the minimum stack size.
257 */
258 minstack = thr_min_stack();
259 if (stackaddr != NULL) {
260 if (stacksize < minstack || max_threads != 1) {
261 errno = EINVAL;
262 return (NULL);
263 }
264 } else if (stacksize != 0 && stacksize < minstack) {
265 errno = EINVAL;
266 return (NULL);
267 }
268 }
269
270 tpool = lmalloc(sizeof (*tpool));
271 if (tpool == NULL) {
272 errno = ENOMEM;
273 return (NULL);
274 }
275 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
276 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
277 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
278 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
279 tpool->tp_minimum = min_threads;
280 tpool->tp_maximum = max_threads;
281 tpool->tp_linger = linger;
282
283 /*
284 * We cannot just copy the attribute pointer.
285 * We need to initialize a new pthread_attr_t structure
286 * with the values from the user-supplied pthread_attr_t.
287 * If the attribute pointer is NULL, we need to initialize
288 * the new pthread_attr_t structure with default values.
289 */
290 error = pthread_attr_clone(&tpool->tp_attr, attr);
291 if (error) {
292 lfree(tpool, sizeof (*tpool));
293 errno = error;
294 return (NULL);
295 }
296
297 /* make all pool threads be detached daemon threads */
298 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
299 PTHREAD_CREATE_DETACHED);
300 (void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
301 PTHREAD_CREATE_DAEMON_NP);
302
303 /* insert into the global list of all thread pools */
304 lmutex_lock(&thread_pool_lock);
305 if (thread_pools == NULL) {
306 tpool->tp_forw = tpool;
307 tpool->tp_back = tpool;
308 thread_pools = tpool;
309 } else {
310 thread_pools->tp_back->tp_forw = tpool;
311 tpool->tp_forw = thread_pools;
312 tpool->tp_back = thread_pools->tp_back;
313 thread_pools->tp_back = tpool;
314 }
315 lmutex_unlock(&thread_pool_lock);
316
317 return (tpool);
318 }
319
320 /*
321 * Dispatch a work request to the thread pool.
322 * If there are idle workers, awaken one.
323 * Else, if the maximum number of workers has
324 * not been reached, spawn a new worker thread.
325 * Else just return with the job added to the queue.
326 */
327 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)328 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
329 {
330 tpool_job_t *job;
331
332 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
333
334 if ((job = lmalloc(sizeof (*job))) == NULL)
335 return (-1);
336 job->tpj_next = NULL;
337 job->tpj_func = func;
338 job->tpj_arg = arg;
339
340 sig_mutex_lock(&tpool->tp_mutex);
341
342 if (tpool->tp_head == NULL)
343 tpool->tp_head = job;
344 else
345 tpool->tp_tail->tpj_next = job;
346 tpool->tp_tail = job;
347 tpool->tp_njobs++;
348
349 if (!(tpool->tp_flags & TP_SUSPEND)) {
350 if (tpool->tp_idle > 0)
351 (void) cond_signal(&tpool->tp_workcv);
352 else if (tpool->tp_current < tpool->tp_maximum &&
353 create_worker(tpool) == 0)
354 tpool->tp_current++;
355 }
356
357 sig_mutex_unlock(&tpool->tp_mutex);
358 return (0);
359 }
360
361 /*
362 * Assumes: by the time tpool_destroy() is called no one will use this
363 * thread pool in any way and no one will try to dispatch entries to it.
364 * Calling tpool_destroy() from a job in the pool will cause deadlock.
365 */
366 void
tpool_destroy(tpool_t * tpool)367 tpool_destroy(tpool_t *tpool)
368 {
369 tpool_active_t *activep;
370
371 ASSERT(!tpool_member(tpool));
372 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
373
374 sig_mutex_lock(&tpool->tp_mutex);
375 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
376
377 /* mark the pool as being destroyed; wakeup idle workers */
378 tpool->tp_flags |= TP_DESTROY;
379 tpool->tp_flags &= ~TP_SUSPEND;
380 (void) cond_broadcast(&tpool->tp_workcv);
381
382 /* cancel all active workers */
383 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
384 (void) pthread_cancel(activep->tpa_tid);
385
386 /* wait for all active workers to finish */
387 while (tpool->tp_active != NULL) {
388 tpool->tp_flags |= TP_WAIT;
389 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
390 }
391
392 /* the last worker to terminate will wake us up */
393 while (tpool->tp_current != 0)
394 (void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
395
396 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
397 delete_pool(tpool);
398 }
399
400 /*
401 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
402 * The last worker to terminate will delete the pool.
403 */
404 void
tpool_abandon(tpool_t * tpool)405 tpool_abandon(tpool_t *tpool)
406 {
407 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
408
409 sig_mutex_lock(&tpool->tp_mutex);
410 if (tpool->tp_current == 0) {
411 /* no workers, just delete the pool */
412 sig_mutex_unlock(&tpool->tp_mutex);
413 delete_pool(tpool);
414 } else {
415 /* wake up all workers, last one will delete the pool */
416 tpool->tp_flags |= TP_ABANDON;
417 tpool->tp_flags &= ~TP_SUSPEND;
418 (void) cond_broadcast(&tpool->tp_workcv);
419 sig_mutex_unlock(&tpool->tp_mutex);
420 }
421 }
422
423 /*
424 * Wait for all jobs to complete.
425 * Calling tpool_wait() from a job in the pool will cause deadlock.
426 */
427 void
tpool_wait(tpool_t * tpool)428 tpool_wait(tpool_t *tpool)
429 {
430 ASSERT(!tpool_member(tpool));
431 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
432
433 sig_mutex_lock(&tpool->tp_mutex);
434 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
435 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
436 tpool->tp_flags |= TP_WAIT;
437 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
438 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
439 }
440 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
441 }
442
443 void
tpool_suspend(tpool_t * tpool)444 tpool_suspend(tpool_t *tpool)
445 {
446 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
447
448 sig_mutex_lock(&tpool->tp_mutex);
449 tpool->tp_flags |= TP_SUSPEND;
450 sig_mutex_unlock(&tpool->tp_mutex);
451 }
452
453 int
tpool_suspended(tpool_t * tpool)454 tpool_suspended(tpool_t *tpool)
455 {
456 int suspended;
457
458 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
459
460 sig_mutex_lock(&tpool->tp_mutex);
461 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
462 sig_mutex_unlock(&tpool->tp_mutex);
463
464 return (suspended);
465 }
466
467 void
tpool_resume(tpool_t * tpool)468 tpool_resume(tpool_t *tpool)
469 {
470 int excess;
471
472 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
473
474 sig_mutex_lock(&tpool->tp_mutex);
475 if (!(tpool->tp_flags & TP_SUSPEND)) {
476 sig_mutex_unlock(&tpool->tp_mutex);
477 return;
478 }
479 tpool->tp_flags &= ~TP_SUSPEND;
480 (void) cond_broadcast(&tpool->tp_workcv);
481 excess = tpool->tp_njobs - tpool->tp_idle;
482 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
483 if (create_worker(tpool) != 0)
484 break; /* pthread_create() failed */
485 tpool->tp_current++;
486 }
487 sig_mutex_unlock(&tpool->tp_mutex);
488 }
489
490 int
tpool_member(tpool_t * tpool)491 tpool_member(tpool_t *tpool)
492 {
493 pthread_t my_tid = pthread_self();
494 tpool_active_t *activep;
495
496 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
497
498 sig_mutex_lock(&tpool->tp_mutex);
499 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
500 if (activep->tpa_tid == my_tid) {
501 sig_mutex_unlock(&tpool->tp_mutex);
502 return (1);
503 }
504 }
505 sig_mutex_unlock(&tpool->tp_mutex);
506 return (0);
507 }
508
509 void
postfork1_child_tpool(void)510 postfork1_child_tpool(void)
511 {
512 pthread_t my_tid = pthread_self();
513 tpool_t *tpool;
514 tpool_job_t *job;
515
516 /*
517 * All of the thread pool workers are gone, except possibly
518 * for the current thread, if it is a thread pool worker thread.
519 * Retain the thread pools, but make them all empty. Whatever
520 * jobs were queued or running belong to the parent process.
521 */
522 top:
523 if ((tpool = thread_pools) == NULL)
524 return;
525
526 do {
527 tpool_active_t *activep;
528
529 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
530 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
531 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
532 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
533 for (job = tpool->tp_head; job; job = tpool->tp_head) {
534 tpool->tp_head = job->tpj_next;
535 lfree(job, sizeof (*job));
536 }
537 tpool->tp_tail = NULL;
538 tpool->tp_njobs = 0;
539 for (activep = tpool->tp_active; activep;
540 activep = activep->tpa_next) {
541 if (activep->tpa_tid == my_tid) {
542 activep->tpa_next = NULL;
543 break;
544 }
545 }
546 tpool->tp_idle = 0;
547 tpool->tp_current = 0;
548 if ((tpool->tp_active = activep) != NULL)
549 tpool->tp_current = 1;
550 tpool->tp_flags &= ~TP_WAIT;
551 if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
552 tpool->tp_flags &= ~TP_DESTROY;
553 tpool->tp_flags |= TP_ABANDON;
554 if (tpool->tp_current == 0) {
555 delete_pool(tpool);
556 goto top; /* start over */
557 }
558 }
559 } while ((tpool = tpool->tp_forw) != thread_pools);
560 }
561