// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <assert.h>
#include <limits.h>
#include "thread_pool_impl.h"

static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static tpool_t *thread_pools = NULL;

static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

	/*
	 * Unlink the pool from the global list of all pools.
	 */
	(void) pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		free(job);
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	free(tpool);
}

/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) pthread_cond_broadcast(&tpool->tp_busycv);
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}

static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
	}
}

/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	pthread_mutex_lock(&tpool->tp_mutex);
	for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}

static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				struct timespec ts;

				clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += tpool->tp_linger;

				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &ts) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(
				    &tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);

			sigset_t maskset;
			(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the values prior to calling func().
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}

/*
 * Create a worker thread, with default signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	pthread_t thread;
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}


/*
 * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
 * is NULL initialize the cloned attr using default values.
 */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto error;
#endif /* __GLIBC__ */

	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto error;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto error;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto error;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto error;

	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto error;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto error;

	void *stackaddr;
	size_t stacksize;
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0)
		error = pthread_attr_setstack(attr, stackaddr, stacksize);
	if (error)
		goto error;

	return (0);
error:
	pthread_attr_destroy(attr);
	return (error);
}

tpool_t	*
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
    pthread_attr_t *attr)
{
	tpool_t	*tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = PTHREAD_STACK_MIN;
		if (stackaddr != NULL) {
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			errno = EINVAL;
			return (NULL);
		}
	}

	tpool = calloc(1, sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		free(tpool);
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);

	/* insert into the global list of all thread pools */
	pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	return (tpool);
}
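
/*
 * Illustrative sketch (not part of the upstream file): a typical caller
 * passes a NULL attribute pointer and lets the pool grow on demand.  The
 * values below ask for at least 1 and at most 8 workers and let idle
 * workers above that minimum linger for 10 seconds before exiting; they
 * are arbitrary example numbers.  errno is set if creation fails.
 *
 *	tpool_t *tp = tpool_create(1, 8, 10, NULL);
 *	if (tp == NULL)
 *		return (errno);
 */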

/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	if ((job = calloc(1, sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	pthread_mutex_lock(&tpool->tp_mutex);

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) pthread_cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current >= tpool->tp_maximum) {
			/* At worker limit.  Leave task on queue */
		} else {
			if (create_worker(tpool) == 0) {
				/* Started a new worker thread */
				tpool->tp_current++;
			} else if (tpool->tp_current > 0) {
				/* Leave task on queue */
			} else {
				/* Cannot start a single worker! */
				pthread_mutex_unlock(&tpool->tp_mutex);
				free(job);
				return (-1);
			}
		}
	}

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
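
/*
 * Illustrative sketch (not part of the upstream file): tpool_dispatch()
 * fails only when it cannot allocate the job or start the very first
 * worker, so a caller may simply run the job inline on failure.  The
 * example_job() function and the items[] array are hypothetical.
 *
 *	for (int i = 0; i < nitems; i++) {
 *		if (tpool_dispatch(tp, example_job, &items[i]) != 0)
 *			example_job(&items[i]);
 *	}
 */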

static void
tpool_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_mutex_unlock(&tpool->tp_mutex);
}

/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
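
/*
 * Illustrative sketch (not part of the upstream file): tpool_destroy()
 * cancels workers that are still running jobs, so a caller that needs
 * every dispatched job to complete calls tpool_wait() first and only
 * then destroys the pool.
 *
 *	tpool_wait(tp);
 *	tpool_destroy(tp);
 *	tp = NULL;
 */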

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		pthread_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) pthread_cond_broadcast(&tpool->tp_workcv);
		pthread_mutex_unlock(&tpool->tp_mutex);
	}
}
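
/*
 * Illustrative sketch (not part of the upstream file): tpool_abandon()
 * suits a caller that no longer needs the pool handle but does not want
 * to block or cancel in-flight work; queued jobs are still processed and
 * the last worker to exit frees the pool.
 *
 *	tpool_abandon(tp);
 *	tp = NULL;
 */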

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
}
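
/*
 * Illustrative sketch (not part of the upstream file): tpool_wait() acts
 * as a barrier, so one pool can be reused for successive batches of work.
 * The phase1_job()/phase2_job() functions and counts are hypothetical.
 *
 *	for (int i = 0; i < n; i++)
 *		(void) tpool_dispatch(tp, phase1_job, &items[i]);
 *	tpool_wait(tp);
 *	for (int i = 0; i < n; i++)
 *		(void) tpool_dispatch(tp, phase2_job, &items[i]);
 *	tpool_wait(tp);
 */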

void
tpool_suspend(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	pthread_mutex_unlock(&tpool->tp_mutex);
}

int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	pthread_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}

void
tpool_resume(tpool_t *tpool)
{
	int excess;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		pthread_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}
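
/*
 * Illustrative sketch (not part of the upstream file): while a pool is
 * suspended, tpool_dispatch() only queues jobs; tpool_resume() then wakes
 * the idle workers and spawns additional ones, up to the maximum, for the
 * backlog.  example_job() and the counts are hypothetical.
 *
 *	tpool_suspend(tp);
 *	for (int i = 0; i < n; i++)
 *		(void) tpool_dispatch(tp, example_job, &items[i]);
 *	tpool_resume(tp);
 */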

int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
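
/*
 * Illustrative sketch (not part of the upstream file): code that may run
 * either from a worker or from an outside thread can use tpool_member()
 * to avoid the tpool_wait()-from-a-job deadlock described above.
 *
 *	if (!tpool_member(tp))
 *		tpool_wait(tp);
 */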
613