1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include "lint.h"
28 #include "thr_uberdata.h"
29 #include <stdlib.h>
30 #include <signal.h>
31 #include <errno.h>
32 #include "thread_pool_impl.h"
33
/* Held while a pool is linked onto or unlinked from the thread_pools list. */
static mutex_t thread_pool_lock = DEFAULTMUTEX;
/* Head of the circular, doubly-linked (tp_forw/tp_back) list of all pools. */
static tpool_t *thread_pools = NULL;
36
37 static void
delete_pool(tpool_t * tpool)38 delete_pool(tpool_t *tpool)
39 {
40 tpool_job_t *job;
41
42 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
43
44 /*
45 * Unlink the pool from the global list of all pools.
46 */
47 lmutex_lock(&thread_pool_lock);
48 if (thread_pools == tpool)
49 thread_pools = tpool->tp_forw;
50 if (thread_pools == tpool)
51 thread_pools = NULL;
52 else {
53 tpool->tp_back->tp_forw = tpool->tp_forw;
54 tpool->tp_forw->tp_back = tpool->tp_back;
55 }
56 lmutex_unlock(&thread_pool_lock);
57
58 /*
59 * There should be no pending jobs, but just in case...
60 */
61 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
62 tpool->tp_head = job->tpj_next;
63 lfree(job, sizeof (*job));
64 }
65 (void) pthread_attr_destroy(&tpool->tp_attr);
66 lfree(tpool, sizeof (*tpool));
67 }
68
69 /*
70 * Worker thread is terminating.
71 */
72 static void
worker_cleanup(tpool_t * tpool)73 worker_cleanup(tpool_t *tpool)
74 {
75 ASSERT(MUTEX_HELD(&tpool->tp_mutex));
76
77 if (--tpool->tp_current == 0 &&
78 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
79 if (tpool->tp_flags & TP_ABANDON) {
80 sig_mutex_unlock(&tpool->tp_mutex);
81 delete_pool(tpool);
82 return;
83 }
84 if (tpool->tp_flags & TP_DESTROY)
85 (void) cond_broadcast(&tpool->tp_busycv);
86 }
87 sig_mutex_unlock(&tpool->tp_mutex);
88 }
89
90 static void
notify_waiters(tpool_t * tpool)91 notify_waiters(tpool_t *tpool)
92 {
93 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
94 tpool->tp_flags &= ~TP_WAIT;
95 (void) cond_broadcast(&tpool->tp_waitcv);
96 }
97 }
98
99 /*
100 * Called by a worker thread on return from a tpool_dispatch()d job.
101 */
102 static void
job_cleanup(tpool_t * tpool)103 job_cleanup(tpool_t *tpool)
104 {
105 pthread_t my_tid = pthread_self();
106 tpool_active_t *activep;
107 tpool_active_t **activepp;
108
109 sig_mutex_lock(&tpool->tp_mutex);
110 /* CSTYLED */
111 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
112 activep = *activepp;
113 if (activep->tpa_tid == my_tid) {
114 *activepp = activep->tpa_next;
115 break;
116 }
117 }
118 if (tpool->tp_flags & TP_WAIT)
119 notify_waiters(tpool);
120 }
121
122 static void *
tpool_worker(void * arg)123 tpool_worker(void *arg)
124 {
125 tpool_t *tpool = (tpool_t *)arg;
126 int elapsed;
127 tpool_job_t *job;
128 void (*func)(void *);
129 tpool_active_t active;
130
131 sig_mutex_lock(&tpool->tp_mutex);
132 pthread_cleanup_push(worker_cleanup, tpool);
133
134 /*
135 * This is the worker's main loop.
136 * It will only be left if a timeout or an error has occured.
137 */
138 active.tpa_tid = pthread_self();
139 for (;;) {
140 elapsed = 0;
141 tpool->tp_idle++;
142 if (tpool->tp_flags & TP_WAIT)
143 notify_waiters(tpool);
144 while ((tpool->tp_head == NULL ||
145 (tpool->tp_flags & TP_SUSPEND)) &&
146 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
147 if (tpool->tp_current <= tpool->tp_minimum ||
148 tpool->tp_linger == 0) {
149 (void) sig_cond_wait(&tpool->tp_workcv,
150 &tpool->tp_mutex);
151 } else {
152 timestruc_t timeout;
153
154 timeout.tv_sec = tpool->tp_linger;
155 timeout.tv_nsec = 0;
156 if (sig_cond_reltimedwait(&tpool->tp_workcv,
157 &tpool->tp_mutex, &timeout) != 0) {
158 elapsed = 1;
159 break;
160 }
161 }
162 }
163 tpool->tp_idle--;
164 if (tpool->tp_flags & TP_DESTROY)
165 break;
166 if (tpool->tp_flags & TP_ABANDON) {
167 /* can't abandon a suspended pool */
168 if (tpool->tp_flags & TP_SUSPEND) {
169 tpool->tp_flags &= ~TP_SUSPEND;
170 (void) cond_broadcast(&tpool->tp_workcv);
171 }
172 if (tpool->tp_head == NULL)
173 break;
174 }
175 if ((job = tpool->tp_head) != NULL &&
176 !(tpool->tp_flags & TP_SUSPEND)) {
177 elapsed = 0;
178 func = job->tpj_func;
179 arg = job->tpj_arg;
180 tpool->tp_head = job->tpj_next;
181 if (job == tpool->tp_tail)
182 tpool->tp_tail = NULL;
183 tpool->tp_njobs--;
184 active.tpa_next = tpool->tp_active;
185 tpool->tp_active = &active;
186 sig_mutex_unlock(&tpool->tp_mutex);
187 pthread_cleanup_push(job_cleanup, tpool);
188 lfree(job, sizeof (*job));
189 /*
190 * Call the specified function.
191 */
192 func(arg);
193 /*
194 * We don't know what this thread has been doing,
195 * so we reset its signal mask and cancellation
196 * state back to the initial values.
197 */
198 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
199 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
200 NULL);
201 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
202 NULL);
203 pthread_cleanup_pop(1);
204 }
205 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
206 /*
207 * We timed out and there is no work to be done
208 * and the number of workers exceeds the minimum.
209 * Exit now to reduce the size of the pool.
210 */
211 break;
212 }
213 }
214 pthread_cleanup_pop(1);
215 return (arg);
216 }
217
218 /*
219 * Create a worker thread, with all signals blocked.
220 */
221 static int
create_worker(tpool_t * tpool)222 create_worker(tpool_t *tpool)
223 {
224 sigset_t oset;
225 int error;
226
227 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
228 error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
229 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
230 return (error);
231 }
232
233 tpool_t *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)234 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
235 pthread_attr_t *attr)
236 {
237 tpool_t *tpool;
238 void *stackaddr;
239 size_t stacksize;
240 size_t minstack;
241 int error;
242
243 if (min_threads > max_threads || max_threads < 1) {
244 errno = EINVAL;
245 return (NULL);
246 }
247 if (attr != NULL) {
248 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
249 errno = EINVAL;
250 return (NULL);
251 }
252 /*
253 * Allow only one thread in the pool with a specified stack.
254 * Require threads to have at least the minimum stack size.
255 */
256 minstack = thr_min_stack();
257 if (stackaddr != NULL) {
258 if (stacksize < minstack || max_threads != 1) {
259 errno = EINVAL;
260 return (NULL);
261 }
262 } else if (stacksize != 0 && stacksize < minstack) {
263 errno = EINVAL;
264 return (NULL);
265 }
266 }
267
268 tpool = lmalloc(sizeof (*tpool));
269 if (tpool == NULL) {
270 errno = ENOMEM;
271 return (NULL);
272 }
273 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
274 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
275 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
276 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
277 tpool->tp_minimum = min_threads;
278 tpool->tp_maximum = max_threads;
279 tpool->tp_linger = linger;
280
281 /*
282 * We cannot just copy the attribute pointer.
283 * We need to initialize a new pthread_attr_t structure
284 * with the values from the user-supplied pthread_attr_t.
285 * If the attribute pointer is NULL, we need to initialize
286 * the new pthread_attr_t structure with default values.
287 */
288 error = pthread_attr_clone(&tpool->tp_attr, attr);
289 if (error) {
290 lfree(tpool, sizeof (*tpool));
291 errno = error;
292 return (NULL);
293 }
294
295 /* make all pool threads be detached daemon threads */
296 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
297 PTHREAD_CREATE_DETACHED);
298 (void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
299 PTHREAD_CREATE_DAEMON_NP);
300
301 /* insert into the global list of all thread pools */
302 lmutex_lock(&thread_pool_lock);
303 if (thread_pools == NULL) {
304 tpool->tp_forw = tpool;
305 tpool->tp_back = tpool;
306 thread_pools = tpool;
307 } else {
308 thread_pools->tp_back->tp_forw = tpool;
309 tpool->tp_forw = thread_pools;
310 tpool->tp_back = thread_pools->tp_back;
311 thread_pools->tp_back = tpool;
312 }
313 lmutex_unlock(&thread_pool_lock);
314
315 return (tpool);
316 }
317
318 /*
319 * Dispatch a work request to the thread pool.
320 * If there are idle workers, awaken one.
321 * Else, if the maximum number of workers has
322 * not been reached, spawn a new worker thread.
323 * Else just return with the job added to the queue.
324 */
325 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)326 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
327 {
328 tpool_job_t *job;
329
330 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
331
332 if ((job = lmalloc(sizeof (*job))) == NULL)
333 return (-1);
334 job->tpj_next = NULL;
335 job->tpj_func = func;
336 job->tpj_arg = arg;
337
338 sig_mutex_lock(&tpool->tp_mutex);
339
340 if (tpool->tp_head == NULL)
341 tpool->tp_head = job;
342 else
343 tpool->tp_tail->tpj_next = job;
344 tpool->tp_tail = job;
345 tpool->tp_njobs++;
346
347 if (!(tpool->tp_flags & TP_SUSPEND)) {
348 if (tpool->tp_idle > 0)
349 (void) cond_signal(&tpool->tp_workcv);
350 else if (tpool->tp_current < tpool->tp_maximum &&
351 create_worker(tpool) == 0)
352 tpool->tp_current++;
353 }
354
355 sig_mutex_unlock(&tpool->tp_mutex);
356 return (0);
357 }
358
359 /*
360 * Assumes: by the time tpool_destroy() is called no one will use this
361 * thread pool in any way and no one will try to dispatch entries to it.
362 * Calling tpool_destroy() from a job in the pool will cause deadlock.
363 */
364 void
tpool_destroy(tpool_t * tpool)365 tpool_destroy(tpool_t *tpool)
366 {
367 tpool_active_t *activep;
368
369 ASSERT(!tpool_member(tpool));
370 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
371
372 sig_mutex_lock(&tpool->tp_mutex);
373 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
374
375 /* mark the pool as being destroyed; wakeup idle workers */
376 tpool->tp_flags |= TP_DESTROY;
377 tpool->tp_flags &= ~TP_SUSPEND;
378 (void) cond_broadcast(&tpool->tp_workcv);
379
380 /* cancel all active workers */
381 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
382 (void) pthread_cancel(activep->tpa_tid);
383
384 /* wait for all active workers to finish */
385 while (tpool->tp_active != NULL) {
386 tpool->tp_flags |= TP_WAIT;
387 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
388 }
389
390 /* the last worker to terminate will wake us up */
391 while (tpool->tp_current != 0)
392 (void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
393
394 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
395 delete_pool(tpool);
396 }
397
398 /*
399 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
400 * The last worker to terminate will delete the pool.
401 */
402 void
tpool_abandon(tpool_t * tpool)403 tpool_abandon(tpool_t *tpool)
404 {
405 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
406
407 sig_mutex_lock(&tpool->tp_mutex);
408 if (tpool->tp_current == 0) {
409 /* no workers, just delete the pool */
410 sig_mutex_unlock(&tpool->tp_mutex);
411 delete_pool(tpool);
412 } else {
413 /* wake up all workers, last one will delete the pool */
414 tpool->tp_flags |= TP_ABANDON;
415 tpool->tp_flags &= ~TP_SUSPEND;
416 (void) cond_broadcast(&tpool->tp_workcv);
417 sig_mutex_unlock(&tpool->tp_mutex);
418 }
419 }
420
421 /*
422 * Wait for all jobs to complete.
423 * Calling tpool_wait() from a job in the pool will cause deadlock.
424 */
425 void
tpool_wait(tpool_t * tpool)426 tpool_wait(tpool_t *tpool)
427 {
428 ASSERT(!tpool_member(tpool));
429 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
430
431 sig_mutex_lock(&tpool->tp_mutex);
432 pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
433 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
434 tpool->tp_flags |= TP_WAIT;
435 (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
436 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
437 }
438 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
439 }
440
441 void
tpool_suspend(tpool_t * tpool)442 tpool_suspend(tpool_t *tpool)
443 {
444 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
445
446 sig_mutex_lock(&tpool->tp_mutex);
447 tpool->tp_flags |= TP_SUSPEND;
448 sig_mutex_unlock(&tpool->tp_mutex);
449 }
450
451 int
tpool_suspended(tpool_t * tpool)452 tpool_suspended(tpool_t *tpool)
453 {
454 int suspended;
455
456 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
457
458 sig_mutex_lock(&tpool->tp_mutex);
459 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
460 sig_mutex_unlock(&tpool->tp_mutex);
461
462 return (suspended);
463 }
464
465 void
tpool_resume(tpool_t * tpool)466 tpool_resume(tpool_t *tpool)
467 {
468 int excess;
469
470 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
471
472 sig_mutex_lock(&tpool->tp_mutex);
473 if (!(tpool->tp_flags & TP_SUSPEND)) {
474 sig_mutex_unlock(&tpool->tp_mutex);
475 return;
476 }
477 tpool->tp_flags &= ~TP_SUSPEND;
478 (void) cond_broadcast(&tpool->tp_workcv);
479 excess = tpool->tp_njobs - tpool->tp_idle;
480 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
481 if (create_worker(tpool) != 0)
482 break; /* pthread_create() failed */
483 tpool->tp_current++;
484 }
485 sig_mutex_unlock(&tpool->tp_mutex);
486 }
487
488 int
tpool_member(tpool_t * tpool)489 tpool_member(tpool_t *tpool)
490 {
491 pthread_t my_tid = pthread_self();
492 tpool_active_t *activep;
493
494 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
495
496 sig_mutex_lock(&tpool->tp_mutex);
497 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
498 if (activep->tpa_tid == my_tid) {
499 sig_mutex_unlock(&tpool->tp_mutex);
500 return (1);
501 }
502 }
503 sig_mutex_unlock(&tpool->tp_mutex);
504 return (0);
505 }
506
507 void
postfork1_child_tpool(void)508 postfork1_child_tpool(void)
509 {
510 pthread_t my_tid = pthread_self();
511 tpool_t *tpool;
512 tpool_job_t *job;
513
514 /*
515 * All of the thread pool workers are gone, except possibly
516 * for the current thread, if it is a thread pool worker thread.
517 * Retain the thread pools, but make them all empty. Whatever
518 * jobs were queued or running belong to the parent process.
519 */
520 top:
521 if ((tpool = thread_pools) == NULL)
522 return;
523
524 do {
525 tpool_active_t *activep;
526
527 (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
528 (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
529 (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
530 (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
531 for (job = tpool->tp_head; job; job = tpool->tp_head) {
532 tpool->tp_head = job->tpj_next;
533 lfree(job, sizeof (*job));
534 }
535 tpool->tp_tail = NULL;
536 tpool->tp_njobs = 0;
537 for (activep = tpool->tp_active; activep;
538 activep = activep->tpa_next) {
539 if (activep->tpa_tid == my_tid) {
540 activep->tpa_next = NULL;
541 break;
542 }
543 }
544 tpool->tp_idle = 0;
545 tpool->tp_current = 0;
546 if ((tpool->tp_active = activep) != NULL)
547 tpool->tp_current = 1;
548 tpool->tp_flags &= ~TP_WAIT;
549 if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
550 tpool->tp_flags &= ~TP_DESTROY;
551 tpool->tp_flags |= TP_ABANDON;
552 if (tpool->tp_current == 0) {
553 delete_pool(tpool);
554 goto top; /* start over */
555 }
556 }
557 } while ((tpool = tpool->tp_forw) != thread_pools);
558 }
559