xref: /freebsd/sys/contrib/openzfs/lib/libtpool/thread_pool.c (revision 643ac419fafba89f5adda0e0ea75b538727453fb)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include <stdlib.h>
28 #include <signal.h>
29 #include <errno.h>
30 #include <assert.h>
31 #include "thread_pool_impl.h"
32 
33 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
34 static tpool_t *thread_pools = NULL;
35 
static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	/* Teardown may only run once every worker has exited. */
	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

	/*
	 * Unlink the pool from the global list of all pools.
	 * The list is circular and doubly linked, so removing the only
	 * element leaves tpool pointing at itself after the first step.
	 */
	(void) pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)
		/* tpool was the sole pool in the list */
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		free(job);
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	free(tpool);
}
67 
/*
 * Worker thread is terminating.
 * Installed via pthread_cleanup_push() in tpool_worker(), so it runs
 * both on normal loop exit and on cancellation, with tp_mutex held.
 * Drops the worker's count; the last worker out either frees an
 * abandoned pool or wakes tpool_destroy() sleeping on tp_busycv.
 */
static void
worker_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			/*
			 * Nobody is waiting on an abandoned pool; the
			 * last worker frees it.  Unlock first --
			 * delete_pool() destroys the whole structure.
			 */
			pthread_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			/* wake tpool_destroy() waiting for tp_current == 0 */
			(void) pthread_cond_broadcast(&tpool->tp_busycv);
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}
88 
89 static void
90 notify_waiters(tpool_t *tpool)
91 {
92 	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
93 		tpool->tp_flags &= ~TP_WAIT;
94 		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
95 	}
96 }
97 
/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 * Runs as a cancellation cleanup handler (popped in tpool_worker()),
 * so it also fires if the job is cancelled mid-flight.  It unlinks the
 * calling thread from the tp_active list and, if waiters are pending,
 * notifies them.  It acquires tp_mutex and deliberately leaves it held:
 * the worker's main loop (and ultimately worker_cleanup()) continue
 * with the lock owned.
 */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	pthread_mutex_lock(&tpool->tp_mutex);
	/*
	 * The calling thread is guaranteed to be on tp_active, so the
	 * loop needs no NULL check.  NOTE(review): tpa_tid is compared
	 * with == rather than pthread_equal() -- assumes pthread_t is a
	 * scalar on all supported platforms.
	 */
	for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}
121 
/*
 * Body of every pool thread.  Loops taking jobs off the FIFO queue and
 * running them; idles when the queue is empty (with a linger timeout
 * for workers above tp_minimum); exits on pool destruction, on
 * abandonment once the queue drains, or when its linger time expires.
 * Holds tp_mutex at all times except while executing a job.
 */
static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;		/* set when the linger timeout expired */
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;	/* this thread's entry on tp_active */

	pthread_mutex_lock(&tpool->tp_mutex);
	/* worker_cleanup() runs on loop exit or cancellation, lock held */
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		/* Sleep until there is runnable work or a shutdown request. */
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				/* mandatory worker: wait indefinitely */
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				/* excess worker: wait at most tp_linger sec */
				struct timespec ts;

				clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += tpool->tp_linger;

				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &ts) != 0) {
					/* timed out (or error): may retire */
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(
				    &tpool->tp_workcv);
			}
			/* abandoned pool: exit once the queue is drained */
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			/* dequeue the job */
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			/* publish this thread as active while running it */
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
			/*
			 * job_cleanup() (run by the pop below, or on
			 * cancellation inside func()) removes us from
			 * tp_active and re-acquires tp_mutex, leaving
			 * it held for the next loop iteration.
			 */
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);

			/* snapshot the signal mask before running the job */
			sigset_t maskset;
			(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the values prior to calling func().
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);	/* worker_cleanup(tpool) */
	return (arg);
}
223 
224 /*
225  * Create a worker thread, with default signals blocked.
226  */
227 static int
228 create_worker(tpool_t *tpool)
229 {
230 	pthread_t thread;
231 	sigset_t oset;
232 	int error;
233 
234 	(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
235 	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
236 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
237 	return (error);
238 }
239 
240 
/*
 * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
 * is NULL initialize the cloned attr using default values.
 * Returns 0 on success, otherwise an errno value; on failure the
 * partially-initialized attr has already been destroyed.
 */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto error;
#endif /* __GLIBC__ */

	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto error;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto error;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto error;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto error;

	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto error;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto error;

	/*
	 * Only propagate a stack the application actually supplied.
	 * Unconditionally calling pthread_attr_setstack() with the
	 * NULL address / zero size reported for a default-initialized
	 * attribute fails with EINVAL (stacksize < PTHREAD_STACK_MIN),
	 * which made cloning any ordinary attr fail.  When only a
	 * stack size was configured (tpool_create() permits that),
	 * copy just the size.
	 */
	void *stackaddr;
	size_t stacksize;
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0 && stackaddr != NULL) {
		error = pthread_attr_setstack(attr, stackaddr, stacksize);
	} else {
		/*
		 * Some libcs (e.g. musl) fail getstack when no stack
		 * was set; fall back to copying the stack size alone.
		 */
		error = pthread_attr_getstacksize(old_attr, &stacksize);
		if (error == 0 && stacksize != 0)
			error = pthread_attr_setstacksize(attr, stacksize);
	}
	if (error)
		goto error;

	return (0);
error:
	pthread_attr_destroy(attr);
	return (error);
}
319 
/*
 * Create a thread pool of between min_threads and max_threads workers.
 * Idle workers above min_threads retire after lingering `linger'
 * seconds.  attr, if non-NULL, supplies attributes for the worker
 * threads; it is deep-copied, so the caller retains ownership.
 * Workers are created lazily by tpool_dispatch(), not here.
 * Returns the new pool, or NULL with errno set on failure.
 */
tpool_t	*
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
    pthread_attr_t *attr)
{
	tpool_t	*tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = PTHREAD_STACK_MIN;
		if (stackaddr != NULL) {
			/* all workers would share the one stack: cap at 1 */
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			/* a size (but no address) was set: just bound it */
			errno = EINVAL;
			return (NULL);
		}
	}

	/* calloc zeroes all counters, flags, queue and list pointers */
	tpool = calloc(1, sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		free(tpool);
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);

	/* insert into the global list of all thread pools */
	pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		/* first pool: forms a one-element circular list */
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		/* append before the current head (i.e. at the tail) */
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	return (tpool);
}
402 
403 /*
404  * Dispatch a work request to the thread pool.
405  * If there are idle workers, awaken one.
406  * Else, if the maximum number of workers has
407  * not been reached, spawn a new worker thread.
408  * Else just return with the job added to the queue.
409  */
410 int
411 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
412 {
413 	tpool_job_t *job;
414 
415 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
416 
417 	if ((job = calloc(1, sizeof (*job))) == NULL)
418 		return (-1);
419 	job->tpj_next = NULL;
420 	job->tpj_func = func;
421 	job->tpj_arg = arg;
422 
423 	pthread_mutex_lock(&tpool->tp_mutex);
424 
425 	if (tpool->tp_head == NULL)
426 		tpool->tp_head = job;
427 	else
428 		tpool->tp_tail->tpj_next = job;
429 	tpool->tp_tail = job;
430 	tpool->tp_njobs++;
431 
432 	if (!(tpool->tp_flags & TP_SUSPEND)) {
433 		if (tpool->tp_idle > 0)
434 			(void) pthread_cond_signal(&tpool->tp_workcv);
435 		else if (tpool->tp_current < tpool->tp_maximum &&
436 		    create_worker(tpool) == 0)
437 			tpool->tp_current++;
438 	}
439 
440 	pthread_mutex_unlock(&tpool->tp_mutex);
441 	return (0);
442 }
443 
444 static void
445 tpool_cleanup(void *arg)
446 {
447 	tpool_t *tpool = (tpool_t *)arg;
448 
449 	pthread_mutex_unlock(&tpool->tp_mutex);
450 }
451 
/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	/* drop tp_mutex if we are cancelled in the waits below */
	pthread_cleanup_push(tpool_cleanup, tpool);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish (job_cleanup signals us) */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
490 
491 /*
492  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
493  * The last worker to terminate will delete the pool.
494  */
495 void
496 tpool_abandon(tpool_t *tpool)
497 {
498 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
499 
500 	pthread_mutex_lock(&tpool->tp_mutex);
501 	if (tpool->tp_current == 0) {
502 		/* no workers, just delete the pool */
503 		pthread_mutex_unlock(&tpool->tp_mutex);
504 		delete_pool(tpool);
505 	} else {
506 		/* wake up all workers, last one will delete the pool */
507 		tpool->tp_flags |= TP_ABANDON;
508 		tpool->tp_flags &= ~TP_SUSPEND;
509 		(void) pthread_cond_broadcast(&tpool->tp_workcv);
510 		pthread_mutex_unlock(&tpool->tp_mutex);
511 	}
512 }
513 
/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	/* drop tp_mutex if we are cancelled while waiting */
	pthread_cleanup_push(tpool_cleanup, tpool);
	/* drained means: no queued jobs and no jobs being executed */
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		/* workers/job_cleanup broadcast tp_waitcv when TP_WAIT set */
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
}
533 
534 void
535 tpool_suspend(tpool_t *tpool)
536 {
537 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
538 
539 	pthread_mutex_lock(&tpool->tp_mutex);
540 	tpool->tp_flags |= TP_SUSPEND;
541 	pthread_mutex_unlock(&tpool->tp_mutex);
542 }
543 
544 int
545 tpool_suspended(tpool_t *tpool)
546 {
547 	int suspended;
548 
549 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
550 
551 	pthread_mutex_lock(&tpool->tp_mutex);
552 	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
553 	pthread_mutex_unlock(&tpool->tp_mutex);
554 
555 	return (suspended);
556 }
557 
558 void
559 tpool_resume(tpool_t *tpool)
560 {
561 	int excess;
562 
563 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
564 
565 	pthread_mutex_lock(&tpool->tp_mutex);
566 	if (!(tpool->tp_flags & TP_SUSPEND)) {
567 		pthread_mutex_unlock(&tpool->tp_mutex);
568 		return;
569 	}
570 	tpool->tp_flags &= ~TP_SUSPEND;
571 	(void) pthread_cond_broadcast(&tpool->tp_workcv);
572 	excess = tpool->tp_njobs - tpool->tp_idle;
573 	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
574 		if (create_worker(tpool) != 0)
575 			break;		/* pthread_create() failed */
576 		tpool->tp_current++;
577 	}
578 	pthread_mutex_unlock(&tpool->tp_mutex);
579 }
580 
581 int
582 tpool_member(tpool_t *tpool)
583 {
584 	pthread_t my_tid = pthread_self();
585 	tpool_active_t *activep;
586 
587 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
588 
589 	pthread_mutex_lock(&tpool->tp_mutex);
590 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
591 		if (activep->tpa_tid == my_tid) {
592 			pthread_mutex_unlock(&tpool->tp_mutex);
593 			return (1);
594 		}
595 	}
596 	pthread_mutex_unlock(&tpool->tp_mutex);
597 	return (0);
598 }
599