xref: /freebsd/sys/contrib/openzfs/lib/libtpool/thread_pool.c (revision aca928a50a42f00f344df934005b09dbcb4e2f77)
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <assert.h>
#include <limits.h>
#include "thread_pool_impl.h"

static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static tpool_t *thread_pools = NULL;

static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

	/*
	 * Unlink the pool from the global list of all pools.
	 */
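	/*
	 * The global list is circular and doubly linked (tp_forw/tp_back):
	 * if this pool is the only element, its tp_forw points back to
	 * itself and the list simply becomes empty; otherwise it is
	 * spliced out of the ring.
	 */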
	(void) pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		free(job);
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	free(tpool);
}

/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) pthread_cond_broadcast(&tpool->tp_busycv);
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}

static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
	}
}

/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

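	/*
	 * Re-acquire tp_mutex (the worker dropped it before calling the
	 * job function) and unlink this thread from the active list.  The
	 * mutex is intentionally left held on return: the worker's main
	 * loop continues with it held, and on cancellation worker_cleanup()
	 * releases it.
	 */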
	pthread_mutex_lock(&tpool->tp_mutex);
	for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}

static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				struct timespec ts;

				clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += tpool->tp_linger;

				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &ts) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(
				    &tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
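			/*
			 * job_cleanup() runs whether func() returns or is
			 * cancelled, taking this thread off tp_active and
			 * waking any tpool_wait()ers.
			 */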
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);

			sigset_t maskset;
			(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the values prior to calling func().
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}

/*
 * Create a worker thread, with default signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	pthread_t thread;
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}


/*
 * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
 * is NULL initialize the cloned attr using default values.
 */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto error;
#endif /* __GLIBC__ */

	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto error;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto error;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto error;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto error;

	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto error;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto error;

	void *stackaddr;
	size_t stacksize;
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0)
		error = pthread_attr_setstack(attr, stackaddr, stacksize);
	if (error)
		goto error;

	return (0);
error:
	pthread_attr_destroy(attr);
	return (error);
}

tpool_t	*
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
    pthread_attr_t *attr)
{
	tpool_t	*tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = PTHREAD_STACK_MIN;
		if (stackaddr != NULL) {
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			errno = EINVAL;
			return (NULL);
		}
	}

	tpool = calloc(1, sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		free(tpool);
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);

	/* insert into the global list of all thread pools */
	pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	return (tpool);
}
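
/*
 * Example usage (illustrative sketch only, not compiled as part of this
 * file; grind(), items and nitems are hypothetical caller-side names):
 *
 *	static void
 *	grind(void *arg)
 *	{
 *		... do one unit of work on arg ...
 *	}
 *
 *	tpool_t *tp = tpool_create(4, 16, 30, NULL);
 *	if (tp == NULL)
 *		return;			(errno describes the failure)
 *	for (int i = 0; i < nitems; i++)
 *		(void) tpool_dispatch(tp, grind, &items[i]);
 *	tpool_wait(tp);			(block until all jobs complete)
 *	tpool_destroy(tp);
 */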

/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	if ((job = calloc(1, sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	pthread_mutex_lock(&tpool->tp_mutex);

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) pthread_cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current >= tpool->tp_maximum) {
			/* At worker limit.  Leave task on queue */
		} else {
			if (create_worker(tpool) == 0) {
				/* Started a new worker thread */
				tpool->tp_current++;
			} else if (tpool->tp_current > 0) {
				/* Leave task on queue */
			} else {
				/* Cannot start a single worker! */
				pthread_mutex_unlock(&tpool->tp_mutex);
				free(job);
				return (-1);
			}
		}
	}

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}

static void
tpool_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_mutex_unlock(&tpool->tp_mutex);
}

/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		pthread_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) pthread_cond_broadcast(&tpool->tp_workcv);
		pthread_mutex_unlock(&tpool->tp_mutex);
	}
}

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
}

void
tpool_suspend(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	pthread_mutex_unlock(&tpool->tp_mutex);
}

int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	pthread_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}

void
tpool_resume(tpool_t *tpool)
{
	int excess;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		pthread_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);
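	/*
	 * Queued jobs not already covered by an idle worker may need new
	 * threads; create up to that many, bounded by tp_maximum.
	 */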
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}
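
/*
 * Example (illustrative sketch only; prefill(), items and nitems are
 * hypothetical): tpool_suspend() lets a caller queue a batch of jobs
 * without any of them starting, then release them all at once:
 *
 *	tpool_suspend(tp);
 *	for (int i = 0; i < nitems; i++)
 *		(void) tpool_dispatch(tp, prefill, &items[i]);
 *	tpool_resume(tp);	(workers start draining the queue now)
 */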

int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}