xref: /freebsd/sys/contrib/openzfs/lib/libtpool/thread_pool.c (revision 61145dc2b94f12f6a47344fb9aac702321880e43)
1  // SPDX-License-Identifier: CDDL-1.0
2  /*
3   * CDDL HEADER START
4   *
5   * The contents of this file are subject to the terms of the
6   * Common Development and Distribution License (the "License").
7   * You may not use this file except in compliance with the License.
8   *
9   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10   * or https://opensource.org/licenses/CDDL-1.0.
11   * See the License for the specific language governing permissions
12   * and limitations under the License.
13   *
14   * When distributing Covered Code, include this CDDL HEADER in each
15   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16   * If applicable, add the following below this CDDL HEADER, with the
17   * fields enclosed by brackets "[]" replaced with your own identifying
18   * information: Portions Copyright [yyyy] [name of copyright owner]
19   *
20   * CDDL HEADER END
21   */
22  
23  /*
24   * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
25   * Use is subject to license terms.
26   */
27  
28  #include <stdlib.h>
29  #include <signal.h>
30  #include <errno.h>
31  #include <assert.h>
32  #include <limits.h>
33  #include "thread_pool_impl.h"
34  
35  static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
36  static tpool_t *thread_pools = NULL;
37  
38  static void
delete_pool(tpool_t * tpool)39  delete_pool(tpool_t *tpool)
40  {
41  	tpool_job_t *job;
42  
43  	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
44  
45  	/*
46  	 * Unlink the pool from the global list of all pools.
47  	 */
48  	(void) pthread_mutex_lock(&thread_pool_lock);
49  	if (thread_pools == tpool)
50  		thread_pools = tpool->tp_forw;
51  	if (thread_pools == tpool)
52  		thread_pools = NULL;
53  	else {
54  		tpool->tp_back->tp_forw = tpool->tp_forw;
55  		tpool->tp_forw->tp_back = tpool->tp_back;
56  	}
57  	pthread_mutex_unlock(&thread_pool_lock);
58  
59  	/*
60  	 * There should be no pending jobs, but just in case...
61  	 */
62  	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
63  		tpool->tp_head = job->tpj_next;
64  		free(job);
65  	}
66  	(void) pthread_attr_destroy(&tpool->tp_attr);
67  	free(tpool);
68  }
69  
70  /*
71   * Worker thread is terminating.
72   */
73  static void
worker_cleanup(void * arg)74  worker_cleanup(void *arg)
75  {
76  	tpool_t *tpool = (tpool_t *)arg;
77  
78  	if (--tpool->tp_current == 0 &&
79  	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
80  		if (tpool->tp_flags & TP_ABANDON) {
81  			pthread_mutex_unlock(&tpool->tp_mutex);
82  			delete_pool(tpool);
83  			return;
84  		}
85  		if (tpool->tp_flags & TP_DESTROY)
86  			(void) pthread_cond_broadcast(&tpool->tp_busycv);
87  	}
88  	pthread_mutex_unlock(&tpool->tp_mutex);
89  }
90  
91  static void
notify_waiters(tpool_t * tpool)92  notify_waiters(tpool_t *tpool)
93  {
94  	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
95  		tpool->tp_flags &= ~TP_WAIT;
96  		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
97  	}
98  }
99  
100  /*
101   * Called by a worker thread on return from a tpool_dispatch()d job.
102   */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	pthread_mutex_lock(&tpool->tp_mutex);
	/*
	 * Unlink our tpool_active_t entry, which tpool_worker() pushed
	 * onto tp_active before invoking the job function.  The entry is
	 * guaranteed to be present, so the loop needs no NULL check.
	 * NOTE(review): tpa_tid is compared with '=='; POSIX recommends
	 * pthread_equal() since pthread_t need not be arithmetic — confirm
	 * for the supported platforms.
	 */
	for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
	/*
	 * tp_mutex is intentionally left held: this runs as a pthread
	 * cleanup handler, and tpool_worker() resumes its main loop (or
	 * its own worker_cleanup handler) expecting the mutex locked.
	 */
}
123  
static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	pthread_mutex_lock(&tpool->tp_mutex);
	/* worker_cleanup() runs on every exit path, including cancellation */
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		/* sleep until there is runnable work or the pool is torn down */
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				/* mandatory worker: wait indefinitely */
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				struct timespec ts;

				clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += tpool->tp_linger;

				/* surplus worker: linger, then retire */
				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &ts) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(
				    &tpool->tp_workcv);
			}
			/* abandoned pool: exit once the queue drains */
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			/* dequeue the job and record ourselves as active */
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
			/*
			 * job_cleanup() runs even if func() cancels this
			 * thread; it re-locks tp_mutex and unlinks 'active'.
			 */
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);

			sigset_t maskset;
			(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the values prior to calling func().
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			/* pop runs job_cleanup(); we continue with tp_mutex held */
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	/* pop runs worker_cleanup(), which releases tp_mutex */
	pthread_cleanup_pop(1);
	return (arg);
}
225  
226  /*
227   * Create a worker thread, with default signals blocked.
228   */
229  static int
create_worker(tpool_t * tpool)230  create_worker(tpool_t *tpool)
231  {
232  	pthread_t thread;
233  	sigset_t oset;
234  	int error;
235  
236  	(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
237  	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
238  	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
239  	return (error);
240  }
241  
242  
243  /*
244   * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
245   * is NULL initialize the cloned attr using default values.
246   */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	/* Start from a default-initialized attr; done if no source attr. */
	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	/*
	 * CPU affinity is a GNU extension; copy it only where
	 * pthread_attr_{get,set}affinity_np() are available.
	 */
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto error;
#endif /* __GLIBC__ */

	/* Copy each standard attribute in turn; bail on first failure. */
	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto error;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto error;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto error;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto error;

	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto error;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto error;

	void *stackaddr;
	size_t stacksize;
	/*
	 * NOTE(review): if old_attr carries the default stack (NULL
	 * address, zero size), pthread_attr_setstack() may reject a
	 * stacksize below PTHREAD_STACK_MIN on some libcs — confirm
	 * this path against the supported platforms.
	 */
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0)
		error = pthread_attr_setstack(attr, stackaddr, stacksize);
	if (error)
		goto error;

	return (0);
error:
	/* Undo pthread_attr_init() so the caller sees a clean failure. */
	pthread_attr_destroy(attr);
	return (error);
}
321  
322  tpool_t	*
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)323  tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
324      pthread_attr_t *attr)
325  {
326  	tpool_t	*tpool;
327  	void *stackaddr;
328  	size_t stacksize;
329  	size_t minstack;
330  	int error;
331  
332  	if (min_threads > max_threads || max_threads < 1) {
333  		errno = EINVAL;
334  		return (NULL);
335  	}
336  	if (attr != NULL) {
337  		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
338  			errno = EINVAL;
339  			return (NULL);
340  		}
341  		/*
342  		 * Allow only one thread in the pool with a specified stack.
343  		 * Require threads to have at least the minimum stack size.
344  		 */
345  		minstack = PTHREAD_STACK_MIN;
346  		if (stackaddr != NULL) {
347  			if (stacksize < minstack || max_threads != 1) {
348  				errno = EINVAL;
349  				return (NULL);
350  			}
351  		} else if (stacksize != 0 && stacksize < minstack) {
352  			errno = EINVAL;
353  			return (NULL);
354  		}
355  	}
356  
357  	tpool = calloc(1, sizeof (*tpool));
358  	if (tpool == NULL) {
359  		errno = ENOMEM;
360  		return (NULL);
361  	}
362  	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
363  	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
364  	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
365  	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
366  	tpool->tp_minimum = min_threads;
367  	tpool->tp_maximum = max_threads;
368  	tpool->tp_linger = linger;
369  
370  	/*
371  	 * We cannot just copy the attribute pointer.
372  	 * We need to initialize a new pthread_attr_t structure
373  	 * with the values from the user-supplied pthread_attr_t.
374  	 * If the attribute pointer is NULL, we need to initialize
375  	 * the new pthread_attr_t structure with default values.
376  	 */
377  	error = pthread_attr_clone(&tpool->tp_attr, attr);
378  	if (error) {
379  		free(tpool);
380  		errno = error;
381  		return (NULL);
382  	}
383  
384  	/* make all pool threads be detached daemon threads */
385  	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
386  	    PTHREAD_CREATE_DETACHED);
387  
388  	/* insert into the global list of all thread pools */
389  	pthread_mutex_lock(&thread_pool_lock);
390  	if (thread_pools == NULL) {
391  		tpool->tp_forw = tpool;
392  		tpool->tp_back = tpool;
393  		thread_pools = tpool;
394  	} else {
395  		thread_pools->tp_back->tp_forw = tpool;
396  		tpool->tp_forw = thread_pools;
397  		tpool->tp_back = thread_pools->tp_back;
398  		thread_pools->tp_back = tpool;
399  	}
400  	pthread_mutex_unlock(&thread_pool_lock);
401  
402  	return (tpool);
403  }
404  
405  /*
406   * Dispatch a work request to the thread pool.
407   * If there are idle workers, awaken one.
408   * Else, if the maximum number of workers has
409   * not been reached, spawn a new worker thread.
410   * Else just return with the job added to the queue.
411   */
412  int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)413  tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
414  {
415  	tpool_job_t *job;
416  
417  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
418  
419  	if ((job = calloc(1, sizeof (*job))) == NULL)
420  		return (-1);
421  	job->tpj_next = NULL;
422  	job->tpj_func = func;
423  	job->tpj_arg = arg;
424  
425  	pthread_mutex_lock(&tpool->tp_mutex);
426  
427  	if (!(tpool->tp_flags & TP_SUSPEND)) {
428  		if (tpool->tp_idle > 0)
429  			(void) pthread_cond_signal(&tpool->tp_workcv);
430  		else if (tpool->tp_current >= tpool->tp_maximum) {
431  			/* At worker limit.  Leave task on queue */
432  		} else {
433  			if (create_worker(tpool) == 0) {
434  				/* Started a new worker thread */
435  				tpool->tp_current++;
436  			} else if (tpool->tp_current > 0) {
437  				/* Leave task on queue */
438  			} else {
439  				/* Cannot start a single worker! */
440  				pthread_mutex_unlock(&tpool->tp_mutex);
441  				free(job);
442  				return (-1);
443  			}
444  		}
445  	}
446  
447  	if (tpool->tp_head == NULL)
448  		tpool->tp_head = job;
449  	else
450  		tpool->tp_tail->tpj_next = job;
451  	tpool->tp_tail = job;
452  	tpool->tp_njobs++;
453  
454  	pthread_mutex_unlock(&tpool->tp_mutex);
455  	return (0);
456  }
457  
458  static void
tpool_cleanup(void * arg)459  tpool_cleanup(void *arg)
460  {
461  	tpool_t *tpool = (tpool_t *)arg;
462  
463  	pthread_mutex_unlock(&tpool->tp_mutex);
464  }
465  
466  /*
467   * Assumes: by the time tpool_destroy() is called no one will use this
468   * thread pool in any way and no one will try to dispatch entries to it.
469   * Calling tpool_destroy() from a job in the pool will cause deadlock.
470   */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	/* ensure tp_mutex is released if this thread is cancelled */
	pthread_cleanup_push(tpool_cleanup, tpool);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
504  
505  /*
506   * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
507   * The last worker to terminate will delete the pool.
508   */
509  void
tpool_abandon(tpool_t * tpool)510  tpool_abandon(tpool_t *tpool)
511  {
512  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
513  
514  	pthread_mutex_lock(&tpool->tp_mutex);
515  	if (tpool->tp_current == 0) {
516  		/* no workers, just delete the pool */
517  		pthread_mutex_unlock(&tpool->tp_mutex);
518  		delete_pool(tpool);
519  	} else {
520  		/* wake up all workers, last one will delete the pool */
521  		tpool->tp_flags |= TP_ABANDON;
522  		tpool->tp_flags &= ~TP_SUSPEND;
523  		(void) pthread_cond_broadcast(&tpool->tp_workcv);
524  		pthread_mutex_unlock(&tpool->tp_mutex);
525  	}
526  }
527  
528  /*
529   * Wait for all jobs to complete.
530   * Calling tpool_wait() from a job in the pool will cause deadlock.
531   */
532  void
tpool_wait(tpool_t * tpool)533  tpool_wait(tpool_t *tpool)
534  {
535  	ASSERT(!tpool_member(tpool));
536  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
537  
538  	pthread_mutex_lock(&tpool->tp_mutex);
539  	pthread_cleanup_push(tpool_cleanup, tpool);
540  	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
541  		tpool->tp_flags |= TP_WAIT;
542  		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
543  		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
544  	}
545  	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
546  }
547  
548  void
tpool_suspend(tpool_t * tpool)549  tpool_suspend(tpool_t *tpool)
550  {
551  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
552  
553  	pthread_mutex_lock(&tpool->tp_mutex);
554  	tpool->tp_flags |= TP_SUSPEND;
555  	pthread_mutex_unlock(&tpool->tp_mutex);
556  }
557  
558  int
tpool_suspended(tpool_t * tpool)559  tpool_suspended(tpool_t *tpool)
560  {
561  	int suspended;
562  
563  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
564  
565  	pthread_mutex_lock(&tpool->tp_mutex);
566  	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
567  	pthread_mutex_unlock(&tpool->tp_mutex);
568  
569  	return (suspended);
570  }
571  
572  void
tpool_resume(tpool_t * tpool)573  tpool_resume(tpool_t *tpool)
574  {
575  	int excess;
576  
577  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
578  
579  	pthread_mutex_lock(&tpool->tp_mutex);
580  	if (!(tpool->tp_flags & TP_SUSPEND)) {
581  		pthread_mutex_unlock(&tpool->tp_mutex);
582  		return;
583  	}
584  	tpool->tp_flags &= ~TP_SUSPEND;
585  	(void) pthread_cond_broadcast(&tpool->tp_workcv);
586  	excess = tpool->tp_njobs - tpool->tp_idle;
587  	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
588  		if (create_worker(tpool) != 0)
589  			break;		/* pthread_create() failed */
590  		tpool->tp_current++;
591  	}
592  	pthread_mutex_unlock(&tpool->tp_mutex);
593  }
594  
595  int
tpool_member(tpool_t * tpool)596  tpool_member(tpool_t *tpool)
597  {
598  	pthread_t my_tid = pthread_self();
599  	tpool_active_t *activep;
600  
601  	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
602  
603  	pthread_mutex_lock(&tpool->tp_mutex);
604  	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
605  		if (activep->tpa_tid == my_tid) {
606  			pthread_mutex_unlock(&tpool->tp_mutex);
607  			return (1);
608  		}
609  	}
610  	pthread_mutex_unlock(&tpool->tp_mutex);
611  	return (0);
612  }
613