1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or https://opensource.org/licenses/CDDL-1.0.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include <stdlib.h>
28 #include <signal.h>
29 #include <errno.h>
30 #include <assert.h>
31 #include <limits.h>
32 #include "thread_pool_impl.h"
33
34 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
35 static tpool_t *thread_pools = NULL;
36
37 static void
delete_pool(tpool_t * tpool)38 delete_pool(tpool_t *tpool)
39 {
40 tpool_job_t *job;
41
42 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
43
44 /*
45 * Unlink the pool from the global list of all pools.
46 */
47 (void) pthread_mutex_lock(&thread_pool_lock);
48 if (thread_pools == tpool)
49 thread_pools = tpool->tp_forw;
50 if (thread_pools == tpool)
51 thread_pools = NULL;
52 else {
53 tpool->tp_back->tp_forw = tpool->tp_forw;
54 tpool->tp_forw->tp_back = tpool->tp_back;
55 }
56 pthread_mutex_unlock(&thread_pool_lock);
57
58 /*
59 * There should be no pending jobs, but just in case...
60 */
61 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
62 tpool->tp_head = job->tpj_next;
63 free(job);
64 }
65 (void) pthread_attr_destroy(&tpool->tp_attr);
66 free(tpool);
67 }
68
69 /*
70 * Worker thread is terminating.
71 */
72 static void
worker_cleanup(void * arg)73 worker_cleanup(void *arg)
74 {
75 tpool_t *tpool = (tpool_t *)arg;
76
77 if (--tpool->tp_current == 0 &&
78 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
79 if (tpool->tp_flags & TP_ABANDON) {
80 pthread_mutex_unlock(&tpool->tp_mutex);
81 delete_pool(tpool);
82 return;
83 }
84 if (tpool->tp_flags & TP_DESTROY)
85 (void) pthread_cond_broadcast(&tpool->tp_busycv);
86 }
87 pthread_mutex_unlock(&tpool->tp_mutex);
88 }
89
90 static void
notify_waiters(tpool_t * tpool)91 notify_waiters(tpool_t *tpool)
92 {
93 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
94 tpool->tp_flags &= ~TP_WAIT;
95 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
96 }
97 }
98
99 /*
100 * Called by a worker thread on return from a tpool_dispatch()d job.
101 */
102 static void
job_cleanup(void * arg)103 job_cleanup(void *arg)
104 {
105 tpool_t *tpool = (tpool_t *)arg;
106
107 pthread_t my_tid = pthread_self();
108 tpool_active_t *activep;
109 tpool_active_t **activepp;
110
111 pthread_mutex_lock(&tpool->tp_mutex);
112 for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
113 activep = *activepp;
114 if (activep->tpa_tid == my_tid) {
115 *activepp = activep->tpa_next;
116 break;
117 }
118 }
119 if (tpool->tp_flags & TP_WAIT)
120 notify_waiters(tpool);
121 }
122
123 static void *
tpool_worker(void * arg)124 tpool_worker(void *arg)
125 {
126 tpool_t *tpool = (tpool_t *)arg;
127 int elapsed;
128 tpool_job_t *job;
129 void (*func)(void *);
130 tpool_active_t active;
131
132 pthread_mutex_lock(&tpool->tp_mutex);
133 pthread_cleanup_push(worker_cleanup, tpool);
134
135 /*
136 * This is the worker's main loop.
137 * It will only be left if a timeout or an error has occurred.
138 */
139 active.tpa_tid = pthread_self();
140 for (;;) {
141 elapsed = 0;
142 tpool->tp_idle++;
143 if (tpool->tp_flags & TP_WAIT)
144 notify_waiters(tpool);
145 while ((tpool->tp_head == NULL ||
146 (tpool->tp_flags & TP_SUSPEND)) &&
147 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
148 if (tpool->tp_current <= tpool->tp_minimum ||
149 tpool->tp_linger == 0) {
150 (void) pthread_cond_wait(&tpool->tp_workcv,
151 &tpool->tp_mutex);
152 } else {
153 struct timespec ts;
154
155 clock_gettime(CLOCK_REALTIME, &ts);
156 ts.tv_sec += tpool->tp_linger;
157
158 if (pthread_cond_timedwait(&tpool->tp_workcv,
159 &tpool->tp_mutex, &ts) != 0) {
160 elapsed = 1;
161 break;
162 }
163 }
164 }
165 tpool->tp_idle--;
166 if (tpool->tp_flags & TP_DESTROY)
167 break;
168 if (tpool->tp_flags & TP_ABANDON) {
169 /* can't abandon a suspended pool */
170 if (tpool->tp_flags & TP_SUSPEND) {
171 tpool->tp_flags &= ~TP_SUSPEND;
172 (void) pthread_cond_broadcast(
173 &tpool->tp_workcv);
174 }
175 if (tpool->tp_head == NULL)
176 break;
177 }
178 if ((job = tpool->tp_head) != NULL &&
179 !(tpool->tp_flags & TP_SUSPEND)) {
180 elapsed = 0;
181 func = job->tpj_func;
182 arg = job->tpj_arg;
183 tpool->tp_head = job->tpj_next;
184 if (job == tpool->tp_tail)
185 tpool->tp_tail = NULL;
186 tpool->tp_njobs--;
187 active.tpa_next = tpool->tp_active;
188 tpool->tp_active = &active;
189 pthread_mutex_unlock(&tpool->tp_mutex);
190 pthread_cleanup_push(job_cleanup, tpool);
191 free(job);
192
193 sigset_t maskset;
194 (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);
195
196 /*
197 * Call the specified function.
198 */
199 func(arg);
200 /*
201 * We don't know what this thread has been doing,
202 * so we reset its signal mask and cancellation
203 * state back to the values prior to calling func().
204 */
205 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
206 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
207 NULL);
208 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
209 NULL);
210 pthread_cleanup_pop(1);
211 }
212 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
213 /*
214 * We timed out and there is no work to be done
215 * and the number of workers exceeds the minimum.
216 * Exit now to reduce the size of the pool.
217 */
218 break;
219 }
220 }
221 pthread_cleanup_pop(1);
222 return (arg);
223 }
224
225 /*
226 * Create a worker thread, with default signals blocked.
227 */
228 static int
create_worker(tpool_t * tpool)229 create_worker(tpool_t *tpool)
230 {
231 pthread_t thread;
232 sigset_t oset;
233 int error;
234
235 (void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
236 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
237 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
238 return (error);
239 }
240
241
242 /*
243 * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr
244 * is NULL initialize the cloned attr using default values.
245 */
246 static int
pthread_attr_clone(pthread_attr_t * attr,const pthread_attr_t * old_attr)247 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
248 {
249 int error;
250
251 error = pthread_attr_init(attr);
252 if (error || (old_attr == NULL))
253 return (error);
254
255 #ifdef __GLIBC__
256 cpu_set_t cpuset;
257 size_t cpusetsize = sizeof (cpuset);
258 error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
259 if (error == 0)
260 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
261 if (error)
262 goto error;
263 #endif /* __GLIBC__ */
264
265 int detachstate;
266 error = pthread_attr_getdetachstate(old_attr, &detachstate);
267 if (error == 0)
268 error = pthread_attr_setdetachstate(attr, detachstate);
269 if (error)
270 goto error;
271
272 size_t guardsize;
273 error = pthread_attr_getguardsize(old_attr, &guardsize);
274 if (error == 0)
275 error = pthread_attr_setguardsize(attr, guardsize);
276 if (error)
277 goto error;
278
279 int inheritsched;
280 error = pthread_attr_getinheritsched(old_attr, &inheritsched);
281 if (error == 0)
282 error = pthread_attr_setinheritsched(attr, inheritsched);
283 if (error)
284 goto error;
285
286 struct sched_param param;
287 error = pthread_attr_getschedparam(old_attr, ¶m);
288 if (error == 0)
289 error = pthread_attr_setschedparam(attr, ¶m);
290 if (error)
291 goto error;
292
293 int policy;
294 error = pthread_attr_getschedpolicy(old_attr, &policy);
295 if (error == 0)
296 error = pthread_attr_setschedpolicy(attr, policy);
297 if (error)
298 goto error;
299
300 int scope;
301 error = pthread_attr_getscope(old_attr, &scope);
302 if (error == 0)
303 error = pthread_attr_setscope(attr, scope);
304 if (error)
305 goto error;
306
307 void *stackaddr;
308 size_t stacksize;
309 error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
310 if (error == 0)
311 error = pthread_attr_setstack(attr, stackaddr, stacksize);
312 if (error)
313 goto error;
314
315 return (0);
316 error:
317 pthread_attr_destroy(attr);
318 return (error);
319 }
320
321 tpool_t *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)322 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
323 pthread_attr_t *attr)
324 {
325 tpool_t *tpool;
326 void *stackaddr;
327 size_t stacksize;
328 size_t minstack;
329 int error;
330
331 if (min_threads > max_threads || max_threads < 1) {
332 errno = EINVAL;
333 return (NULL);
334 }
335 if (attr != NULL) {
336 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
337 errno = EINVAL;
338 return (NULL);
339 }
340 /*
341 * Allow only one thread in the pool with a specified stack.
342 * Require threads to have at least the minimum stack size.
343 */
344 minstack = PTHREAD_STACK_MIN;
345 if (stackaddr != NULL) {
346 if (stacksize < minstack || max_threads != 1) {
347 errno = EINVAL;
348 return (NULL);
349 }
350 } else if (stacksize != 0 && stacksize < minstack) {
351 errno = EINVAL;
352 return (NULL);
353 }
354 }
355
356 tpool = calloc(1, sizeof (*tpool));
357 if (tpool == NULL) {
358 errno = ENOMEM;
359 return (NULL);
360 }
361 (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
362 (void) pthread_cond_init(&tpool->tp_busycv, NULL);
363 (void) pthread_cond_init(&tpool->tp_workcv, NULL);
364 (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
365 tpool->tp_minimum = min_threads;
366 tpool->tp_maximum = max_threads;
367 tpool->tp_linger = linger;
368
369 /*
370 * We cannot just copy the attribute pointer.
371 * We need to initialize a new pthread_attr_t structure
372 * with the values from the user-supplied pthread_attr_t.
373 * If the attribute pointer is NULL, we need to initialize
374 * the new pthread_attr_t structure with default values.
375 */
376 error = pthread_attr_clone(&tpool->tp_attr, attr);
377 if (error) {
378 free(tpool);
379 errno = error;
380 return (NULL);
381 }
382
383 /* make all pool threads be detached daemon threads */
384 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
385 PTHREAD_CREATE_DETACHED);
386
387 /* insert into the global list of all thread pools */
388 pthread_mutex_lock(&thread_pool_lock);
389 if (thread_pools == NULL) {
390 tpool->tp_forw = tpool;
391 tpool->tp_back = tpool;
392 thread_pools = tpool;
393 } else {
394 thread_pools->tp_back->tp_forw = tpool;
395 tpool->tp_forw = thread_pools;
396 tpool->tp_back = thread_pools->tp_back;
397 thread_pools->tp_back = tpool;
398 }
399 pthread_mutex_unlock(&thread_pool_lock);
400
401 return (tpool);
402 }
403
404 /*
405 * Dispatch a work request to the thread pool.
406 * If there are idle workers, awaken one.
407 * Else, if the maximum number of workers has
408 * not been reached, spawn a new worker thread.
409 * Else just return with the job added to the queue.
410 */
411 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)412 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
413 {
414 tpool_job_t *job;
415
416 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
417
418 if ((job = calloc(1, sizeof (*job))) == NULL)
419 return (-1);
420 job->tpj_next = NULL;
421 job->tpj_func = func;
422 job->tpj_arg = arg;
423
424 pthread_mutex_lock(&tpool->tp_mutex);
425
426 if (!(tpool->tp_flags & TP_SUSPEND)) {
427 if (tpool->tp_idle > 0)
428 (void) pthread_cond_signal(&tpool->tp_workcv);
429 else if (tpool->tp_current >= tpool->tp_maximum) {
430 /* At worker limit. Leave task on queue */
431 } else {
432 if (create_worker(tpool) == 0) {
433 /* Started a new worker thread */
434 tpool->tp_current++;
435 } else if (tpool->tp_current > 0) {
436 /* Leave task on queue */
437 } else {
438 /* Cannot start a single worker! */
439 pthread_mutex_unlock(&tpool->tp_mutex);
440 free(job);
441 return (-1);
442 }
443 }
444 }
445
446 if (tpool->tp_head == NULL)
447 tpool->tp_head = job;
448 else
449 tpool->tp_tail->tpj_next = job;
450 tpool->tp_tail = job;
451 tpool->tp_njobs++;
452
453 pthread_mutex_unlock(&tpool->tp_mutex);
454 return (0);
455 }
456
457 static void
tpool_cleanup(void * arg)458 tpool_cleanup(void *arg)
459 {
460 tpool_t *tpool = (tpool_t *)arg;
461
462 pthread_mutex_unlock(&tpool->tp_mutex);
463 }
464
465 /*
466 * Assumes: by the time tpool_destroy() is called no one will use this
467 * thread pool in any way and no one will try to dispatch entries to it.
468 * Calling tpool_destroy() from a job in the pool will cause deadlock.
469 */
470 void
tpool_destroy(tpool_t * tpool)471 tpool_destroy(tpool_t *tpool)
472 {
473 tpool_active_t *activep;
474
475 ASSERT(!tpool_member(tpool));
476 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
477
478 pthread_mutex_lock(&tpool->tp_mutex);
479 pthread_cleanup_push(tpool_cleanup, tpool);
480
481 /* mark the pool as being destroyed; wakeup idle workers */
482 tpool->tp_flags |= TP_DESTROY;
483 tpool->tp_flags &= ~TP_SUSPEND;
484 (void) pthread_cond_broadcast(&tpool->tp_workcv);
485
486 /* cancel all active workers */
487 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
488 (void) pthread_cancel(activep->tpa_tid);
489
490 /* wait for all active workers to finish */
491 while (tpool->tp_active != NULL) {
492 tpool->tp_flags |= TP_WAIT;
493 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
494 }
495
496 /* the last worker to terminate will wake us up */
497 while (tpool->tp_current != 0)
498 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
499
500 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
501 delete_pool(tpool);
502 }
503
504 /*
505 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
506 * The last worker to terminate will delete the pool.
507 */
508 void
tpool_abandon(tpool_t * tpool)509 tpool_abandon(tpool_t *tpool)
510 {
511 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
512
513 pthread_mutex_lock(&tpool->tp_mutex);
514 if (tpool->tp_current == 0) {
515 /* no workers, just delete the pool */
516 pthread_mutex_unlock(&tpool->tp_mutex);
517 delete_pool(tpool);
518 } else {
519 /* wake up all workers, last one will delete the pool */
520 tpool->tp_flags |= TP_ABANDON;
521 tpool->tp_flags &= ~TP_SUSPEND;
522 (void) pthread_cond_broadcast(&tpool->tp_workcv);
523 pthread_mutex_unlock(&tpool->tp_mutex);
524 }
525 }
526
527 /*
528 * Wait for all jobs to complete.
529 * Calling tpool_wait() from a job in the pool will cause deadlock.
530 */
531 void
tpool_wait(tpool_t * tpool)532 tpool_wait(tpool_t *tpool)
533 {
534 ASSERT(!tpool_member(tpool));
535 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
536
537 pthread_mutex_lock(&tpool->tp_mutex);
538 pthread_cleanup_push(tpool_cleanup, tpool);
539 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
540 tpool->tp_flags |= TP_WAIT;
541 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
542 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
543 }
544 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
545 }
546
547 void
tpool_suspend(tpool_t * tpool)548 tpool_suspend(tpool_t *tpool)
549 {
550 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
551
552 pthread_mutex_lock(&tpool->tp_mutex);
553 tpool->tp_flags |= TP_SUSPEND;
554 pthread_mutex_unlock(&tpool->tp_mutex);
555 }
556
557 int
tpool_suspended(tpool_t * tpool)558 tpool_suspended(tpool_t *tpool)
559 {
560 int suspended;
561
562 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
563
564 pthread_mutex_lock(&tpool->tp_mutex);
565 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
566 pthread_mutex_unlock(&tpool->tp_mutex);
567
568 return (suspended);
569 }
570
571 void
tpool_resume(tpool_t * tpool)572 tpool_resume(tpool_t *tpool)
573 {
574 int excess;
575
576 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
577
578 pthread_mutex_lock(&tpool->tp_mutex);
579 if (!(tpool->tp_flags & TP_SUSPEND)) {
580 pthread_mutex_unlock(&tpool->tp_mutex);
581 return;
582 }
583 tpool->tp_flags &= ~TP_SUSPEND;
584 (void) pthread_cond_broadcast(&tpool->tp_workcv);
585 excess = tpool->tp_njobs - tpool->tp_idle;
586 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
587 if (create_worker(tpool) != 0)
588 break; /* pthread_create() failed */
589 tpool->tp_current++;
590 }
591 pthread_mutex_unlock(&tpool->tp_mutex);
592 }
593
594 int
tpool_member(tpool_t * tpool)595 tpool_member(tpool_t *tpool)
596 {
597 pthread_t my_tid = pthread_self();
598 tpool_active_t *activep;
599
600 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
601
602 pthread_mutex_lock(&tpool->tp_mutex);
603 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
604 if (activep->tpa_tid == my_tid) {
605 pthread_mutex_unlock(&tpool->tp_mutex);
606 return (1);
607 }
608 }
609 pthread_mutex_unlock(&tpool->tp_mutex);
610 return (0);
611 }
612