1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * The contents of this file are subject to the terms of the
6 * Common Development and Distribution License (the "License").
7 * You may not use this file except in compliance with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or https://opensource.org/licenses/CDDL-1.0.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22
23 /*
24 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
25 * Use is subject to license terms.
26 */
27
28 #include <stdlib.h>
29 #include <signal.h>
30 #include <errno.h>
31 #include <assert.h>
32 #include <limits.h>
33 #include "thread_pool_impl.h"
34
35 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
36 static tpool_t *thread_pools = NULL;
37
38 static void
delete_pool(tpool_t * tpool)39 delete_pool(tpool_t *tpool)
40 {
41 tpool_job_t *job;
42
43 ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
44
45 /*
46 * Unlink the pool from the global list of all pools.
47 */
48 (void) pthread_mutex_lock(&thread_pool_lock);
49 if (thread_pools == tpool)
50 thread_pools = tpool->tp_forw;
51 if (thread_pools == tpool)
52 thread_pools = NULL;
53 else {
54 tpool->tp_back->tp_forw = tpool->tp_forw;
55 tpool->tp_forw->tp_back = tpool->tp_back;
56 }
57 pthread_mutex_unlock(&thread_pool_lock);
58
59 /*
60 * There should be no pending jobs, but just in case...
61 */
62 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
63 tpool->tp_head = job->tpj_next;
64 free(job);
65 }
66 (void) pthread_attr_destroy(&tpool->tp_attr);
67 free(tpool);
68 }
69
70 /*
71 * Worker thread is terminating.
72 */
73 static void
worker_cleanup(void * arg)74 worker_cleanup(void *arg)
75 {
76 tpool_t *tpool = (tpool_t *)arg;
77
78 if (--tpool->tp_current == 0 &&
79 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
80 if (tpool->tp_flags & TP_ABANDON) {
81 pthread_mutex_unlock(&tpool->tp_mutex);
82 delete_pool(tpool);
83 return;
84 }
85 if (tpool->tp_flags & TP_DESTROY)
86 (void) pthread_cond_broadcast(&tpool->tp_busycv);
87 }
88 pthread_mutex_unlock(&tpool->tp_mutex);
89 }
90
91 static void
notify_waiters(tpool_t * tpool)92 notify_waiters(tpool_t *tpool)
93 {
94 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
95 tpool->tp_flags &= ~TP_WAIT;
96 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
97 }
98 }
99
100 /*
101 * Called by a worker thread on return from a tpool_dispatch()d job.
102 */
103 static void
job_cleanup(void * arg)104 job_cleanup(void *arg)
105 {
106 tpool_t *tpool = (tpool_t *)arg;
107
108 pthread_t my_tid = pthread_self();
109 tpool_active_t *activep;
110 tpool_active_t **activepp;
111
112 pthread_mutex_lock(&tpool->tp_mutex);
113 for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
114 activep = *activepp;
115 if (activep->tpa_tid == my_tid) {
116 *activepp = activep->tpa_next;
117 break;
118 }
119 }
120 if (tpool->tp_flags & TP_WAIT)
121 notify_waiters(tpool);
122 }
123
124 static void *
tpool_worker(void * arg)125 tpool_worker(void *arg)
126 {
127 tpool_t *tpool = (tpool_t *)arg;
128 int elapsed;
129 tpool_job_t *job;
130 void (*func)(void *);
131 tpool_active_t active;
132
133 pthread_mutex_lock(&tpool->tp_mutex);
134 pthread_cleanup_push(worker_cleanup, tpool);
135
136 /*
137 * This is the worker's main loop.
138 * It will only be left if a timeout or an error has occurred.
139 */
140 active.tpa_tid = pthread_self();
141 for (;;) {
142 elapsed = 0;
143 tpool->tp_idle++;
144 if (tpool->tp_flags & TP_WAIT)
145 notify_waiters(tpool);
146 while ((tpool->tp_head == NULL ||
147 (tpool->tp_flags & TP_SUSPEND)) &&
148 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
149 if (tpool->tp_current <= tpool->tp_minimum ||
150 tpool->tp_linger == 0) {
151 (void) pthread_cond_wait(&tpool->tp_workcv,
152 &tpool->tp_mutex);
153 } else {
154 struct timespec ts;
155
156 clock_gettime(CLOCK_REALTIME, &ts);
157 ts.tv_sec += tpool->tp_linger;
158
159 if (pthread_cond_timedwait(&tpool->tp_workcv,
160 &tpool->tp_mutex, &ts) != 0) {
161 elapsed = 1;
162 break;
163 }
164 }
165 }
166 tpool->tp_idle--;
167 if (tpool->tp_flags & TP_DESTROY)
168 break;
169 if (tpool->tp_flags & TP_ABANDON) {
170 /* can't abandon a suspended pool */
171 if (tpool->tp_flags & TP_SUSPEND) {
172 tpool->tp_flags &= ~TP_SUSPEND;
173 (void) pthread_cond_broadcast(
174 &tpool->tp_workcv);
175 }
176 if (tpool->tp_head == NULL)
177 break;
178 }
179 if ((job = tpool->tp_head) != NULL &&
180 !(tpool->tp_flags & TP_SUSPEND)) {
181 elapsed = 0;
182 func = job->tpj_func;
183 arg = job->tpj_arg;
184 tpool->tp_head = job->tpj_next;
185 if (job == tpool->tp_tail)
186 tpool->tp_tail = NULL;
187 tpool->tp_njobs--;
188 active.tpa_next = tpool->tp_active;
189 tpool->tp_active = &active;
190 pthread_mutex_unlock(&tpool->tp_mutex);
191 pthread_cleanup_push(job_cleanup, tpool);
192 free(job);
193
194 sigset_t maskset;
195 (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);
196
197 /*
198 * Call the specified function.
199 */
200 func(arg);
201 /*
202 * We don't know what this thread has been doing,
203 * so we reset its signal mask and cancellation
204 * state back to the values prior to calling func().
205 */
206 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
207 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
208 NULL);
209 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
210 NULL);
211 pthread_cleanup_pop(1);
212 }
213 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
214 /*
215 * We timed out and there is no work to be done
216 * and the number of workers exceeds the minimum.
217 * Exit now to reduce the size of the pool.
218 */
219 break;
220 }
221 }
222 pthread_cleanup_pop(1);
223 return (arg);
224 }
225
226 /*
227 * Create a worker thread, with default signals blocked.
228 */
229 static int
create_worker(tpool_t * tpool)230 create_worker(tpool_t *tpool)
231 {
232 pthread_t thread;
233 sigset_t oset;
234 int error;
235
236 (void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
237 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
238 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
239 return (error);
240 }
241
242
/*
 * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
 * is NULL initialize the cloned attr using default values.
 *
 * Copies (CPU affinity on glibc,) detach state, guard size, scheduler
 * inheritance, scheduling policy and parameters, contention scope and
 * the stack settings.
 *
 * Returns 0 on success.  On failure returns the error number of the call
 * that failed; attr has then been destroyed again, except when
 * pthread_attr_init() itself was the call that failed.
 */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto fail;
#endif /* __GLIBC__ */

	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto fail;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto fail;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto fail;

	/*
	 * Copy the policy before the parameters: an implementation may
	 * check the priority against the policy already stored in attr,
	 * which would otherwise still be the default one.
	 */
	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto fail;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto fail;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto fail;

	/*
	 * pthread_attr_setstack() rejects sizes below PTHREAD_STACK_MIN,
	 * so it can only be used when the source really carries a stack.
	 * With no stack address, copy just a non-zero size; with neither,
	 * keep the defaults that pthread_attr_init() set up.
	 */
	void *stackaddr;
	size_t stacksize;
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0) {
		if (stackaddr != NULL) {
			error = pthread_attr_setstack(attr, stackaddr,
			    stacksize);
		} else if (stacksize != 0) {
			error = pthread_attr_setstacksize(attr, stacksize);
		}
	}
	if (error)
		goto fail;

	return (0);
fail:
	pthread_attr_destroy(attr);
	return (error);
}
321
322 tpool_t *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)323 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
324 pthread_attr_t *attr)
325 {
326 tpool_t *tpool;
327 void *stackaddr;
328 size_t stacksize;
329 size_t minstack;
330 int error;
331
332 if (min_threads > max_threads || max_threads < 1) {
333 errno = EINVAL;
334 return (NULL);
335 }
336 if (attr != NULL) {
337 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
338 errno = EINVAL;
339 return (NULL);
340 }
341 /*
342 * Allow only one thread in the pool with a specified stack.
343 * Require threads to have at least the minimum stack size.
344 */
345 minstack = PTHREAD_STACK_MIN;
346 if (stackaddr != NULL) {
347 if (stacksize < minstack || max_threads != 1) {
348 errno = EINVAL;
349 return (NULL);
350 }
351 } else if (stacksize != 0 && stacksize < minstack) {
352 errno = EINVAL;
353 return (NULL);
354 }
355 }
356
357 tpool = calloc(1, sizeof (*tpool));
358 if (tpool == NULL) {
359 errno = ENOMEM;
360 return (NULL);
361 }
362 (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
363 (void) pthread_cond_init(&tpool->tp_busycv, NULL);
364 (void) pthread_cond_init(&tpool->tp_workcv, NULL);
365 (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
366 tpool->tp_minimum = min_threads;
367 tpool->tp_maximum = max_threads;
368 tpool->tp_linger = linger;
369
370 /*
371 * We cannot just copy the attribute pointer.
372 * We need to initialize a new pthread_attr_t structure
373 * with the values from the user-supplied pthread_attr_t.
374 * If the attribute pointer is NULL, we need to initialize
375 * the new pthread_attr_t structure with default values.
376 */
377 error = pthread_attr_clone(&tpool->tp_attr, attr);
378 if (error) {
379 free(tpool);
380 errno = error;
381 return (NULL);
382 }
383
384 /* make all pool threads be detached daemon threads */
385 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
386 PTHREAD_CREATE_DETACHED);
387
388 /* insert into the global list of all thread pools */
389 pthread_mutex_lock(&thread_pool_lock);
390 if (thread_pools == NULL) {
391 tpool->tp_forw = tpool;
392 tpool->tp_back = tpool;
393 thread_pools = tpool;
394 } else {
395 thread_pools->tp_back->tp_forw = tpool;
396 tpool->tp_forw = thread_pools;
397 tpool->tp_back = thread_pools->tp_back;
398 thread_pools->tp_back = tpool;
399 }
400 pthread_mutex_unlock(&thread_pool_lock);
401
402 return (tpool);
403 }
404
405 /*
406 * Dispatch a work request to the thread pool.
407 * If there are idle workers, awaken one.
408 * Else, if the maximum number of workers has
409 * not been reached, spawn a new worker thread.
410 * Else just return with the job added to the queue.
411 */
412 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)413 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
414 {
415 tpool_job_t *job;
416
417 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
418
419 if ((job = calloc(1, sizeof (*job))) == NULL)
420 return (-1);
421 job->tpj_next = NULL;
422 job->tpj_func = func;
423 job->tpj_arg = arg;
424
425 pthread_mutex_lock(&tpool->tp_mutex);
426
427 if (!(tpool->tp_flags & TP_SUSPEND)) {
428 if (tpool->tp_idle > 0)
429 (void) pthread_cond_signal(&tpool->tp_workcv);
430 else if (tpool->tp_current >= tpool->tp_maximum) {
431 /* At worker limit. Leave task on queue */
432 } else {
433 if (create_worker(tpool) == 0) {
434 /* Started a new worker thread */
435 tpool->tp_current++;
436 } else if (tpool->tp_current > 0) {
437 /* Leave task on queue */
438 } else {
439 /* Cannot start a single worker! */
440 pthread_mutex_unlock(&tpool->tp_mutex);
441 free(job);
442 return (-1);
443 }
444 }
445 }
446
447 if (tpool->tp_head == NULL)
448 tpool->tp_head = job;
449 else
450 tpool->tp_tail->tpj_next = job;
451 tpool->tp_tail = job;
452 tpool->tp_njobs++;
453
454 pthread_mutex_unlock(&tpool->tp_mutex);
455 return (0);
456 }
457
458 static void
tpool_cleanup(void * arg)459 tpool_cleanup(void *arg)
460 {
461 tpool_t *tpool = (tpool_t *)arg;
462
463 pthread_mutex_unlock(&tpool->tp_mutex);
464 }
465
466 /*
467 * Assumes: by the time tpool_destroy() is called no one will use this
468 * thread pool in any way and no one will try to dispatch entries to it.
469 * Calling tpool_destroy() from a job in the pool will cause deadlock.
470 */
471 void
tpool_destroy(tpool_t * tpool)472 tpool_destroy(tpool_t *tpool)
473 {
474 tpool_active_t *activep;
475
476 ASSERT(!tpool_member(tpool));
477 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
478
479 pthread_mutex_lock(&tpool->tp_mutex);
480 pthread_cleanup_push(tpool_cleanup, tpool);
481
482 /* mark the pool as being destroyed; wakeup idle workers */
483 tpool->tp_flags |= TP_DESTROY;
484 tpool->tp_flags &= ~TP_SUSPEND;
485 (void) pthread_cond_broadcast(&tpool->tp_workcv);
486
487 /* cancel all active workers */
488 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
489 (void) pthread_cancel(activep->tpa_tid);
490
491 /* wait for all active workers to finish */
492 while (tpool->tp_active != NULL) {
493 tpool->tp_flags |= TP_WAIT;
494 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
495 }
496
497 /* the last worker to terminate will wake us up */
498 while (tpool->tp_current != 0)
499 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
500
501 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
502 delete_pool(tpool);
503 }
504
505 /*
506 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
507 * The last worker to terminate will delete the pool.
508 */
509 void
tpool_abandon(tpool_t * tpool)510 tpool_abandon(tpool_t *tpool)
511 {
512 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
513
514 pthread_mutex_lock(&tpool->tp_mutex);
515 if (tpool->tp_current == 0) {
516 /* no workers, just delete the pool */
517 pthread_mutex_unlock(&tpool->tp_mutex);
518 delete_pool(tpool);
519 } else {
520 /* wake up all workers, last one will delete the pool */
521 tpool->tp_flags |= TP_ABANDON;
522 tpool->tp_flags &= ~TP_SUSPEND;
523 (void) pthread_cond_broadcast(&tpool->tp_workcv);
524 pthread_mutex_unlock(&tpool->tp_mutex);
525 }
526 }
527
528 /*
529 * Wait for all jobs to complete.
530 * Calling tpool_wait() from a job in the pool will cause deadlock.
531 */
532 void
tpool_wait(tpool_t * tpool)533 tpool_wait(tpool_t *tpool)
534 {
535 ASSERT(!tpool_member(tpool));
536 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
537
538 pthread_mutex_lock(&tpool->tp_mutex);
539 pthread_cleanup_push(tpool_cleanup, tpool);
540 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
541 tpool->tp_flags |= TP_WAIT;
542 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
543 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
544 }
545 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
546 }
547
548 void
tpool_suspend(tpool_t * tpool)549 tpool_suspend(tpool_t *tpool)
550 {
551 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
552
553 pthread_mutex_lock(&tpool->tp_mutex);
554 tpool->tp_flags |= TP_SUSPEND;
555 pthread_mutex_unlock(&tpool->tp_mutex);
556 }
557
558 int
tpool_suspended(tpool_t * tpool)559 tpool_suspended(tpool_t *tpool)
560 {
561 int suspended;
562
563 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
564
565 pthread_mutex_lock(&tpool->tp_mutex);
566 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
567 pthread_mutex_unlock(&tpool->tp_mutex);
568
569 return (suspended);
570 }
571
572 void
tpool_resume(tpool_t * tpool)573 tpool_resume(tpool_t *tpool)
574 {
575 int excess;
576
577 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
578
579 pthread_mutex_lock(&tpool->tp_mutex);
580 if (!(tpool->tp_flags & TP_SUSPEND)) {
581 pthread_mutex_unlock(&tpool->tp_mutex);
582 return;
583 }
584 tpool->tp_flags &= ~TP_SUSPEND;
585 (void) pthread_cond_broadcast(&tpool->tp_workcv);
586 excess = tpool->tp_njobs - tpool->tp_idle;
587 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
588 if (create_worker(tpool) != 0)
589 break; /* pthread_create() failed */
590 tpool->tp_current++;
591 }
592 pthread_mutex_unlock(&tpool->tp_mutex);
593 }
594
595 int
tpool_member(tpool_t * tpool)596 tpool_member(tpool_t *tpool)
597 {
598 pthread_t my_tid = pthread_self();
599 tpool_active_t *activep;
600
601 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
602
603 pthread_mutex_lock(&tpool->tp_mutex);
604 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
605 if (activep->tpa_tid == my_tid) {
606 pthread_mutex_unlock(&tpool->tp_mutex);
607 return (1);
608 }
609 }
610 pthread_mutex_unlock(&tpool->tp_mutex);
611 return (0);
612 }
613