/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include "lint.h"
#include "thr_uberdata.h"
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include "thread_pool_impl.h"

static mutex_t thread_pool_lock = DEFAULTMUTEX;
static tpool_t *thread_pools = NULL;

static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

	/*
	 * Unlink the pool from the global list of all pools.
	 */
	lmutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)	/* it was the only pool */
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	lmutex_unlock(&thread_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		lfree(job, sizeof (*job));
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	lfree(tpool, sizeof (*tpool));
}
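
/*
 * Note (added commentary): delete_pool() runs only once the last
 * worker is gone.  It is reached from tpool_destroy(), from the last
 * exiting worker of an abandoned pool via worker_cleanup(), or from
 * postfork1_child_tpool() in a forked child.
 */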

/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(tpool_t *tpool)
{
	ASSERT(MUTEX_HELD(&tpool->tp_mutex));

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			sig_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) cond_broadcast(&tpool->tp_busycv);
	}
	sig_mutex_unlock(&tpool->tp_mutex);
}

static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) cond_broadcast(&tpool->tp_waitcv);
	}
}

/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 * Acquires tp_mutex and returns with it still held; the caller
 * (tpool_worker(), or worker_cleanup() if the job was cancelled)
 * expects to own the lock.
 */
static void
job_cleanup(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	sig_mutex_lock(&tpool->tp_mutex);
	/* CSTYLED */
	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}

static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It is left only when a timeout or an error has occurred,
	 * or when the pool is being destroyed or abandoned.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) sig_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				timestruc_t timeout;

				timeout.tv_sec = tpool->tp_linger;
				timeout.tv_nsec = 0;
				if (sig_cond_reltimedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &timeout) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) cond_broadcast(&tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			sig_mutex_unlock(&tpool->tp_mutex);
			pthread_cleanup_push(job_cleanup, tpool);
			lfree(job, sizeof (*job));
			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the initial values.
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}

/*
 * Create a worker thread, with all signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}
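
/*
 * Note (added commentary): blocking all signals around the call to
 * pthread_create() above means every worker starts life with a full
 * signal mask.  A job may alter the mask while it runs, but
 * tpool_worker() resets it from maskset after each job returns.
 */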

tpool_t	*
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
	pthread_attr_t *attr)
{
	tpool_t	*tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = thr_min_stack();
		if (stackaddr != NULL) {
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			errno = EINVAL;
			return (NULL);
		}
	}

	tpool = lmalloc(sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		lfree(tpool, sizeof (*tpool));
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);
	(void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
	    PTHREAD_CREATE_DAEMON_NP);

	/* insert into the global list of all thread pools */
	lmutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	lmutex_unlock(&thread_pool_lock);

	return (tpool);
}
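
/*
 * Usage sketch (added commentary, not from the original source):
 * a typical consumer creates a pool, dispatches jobs, waits for them
 * all to finish, and destroys the pool.  my_job() and data[] are
 * hypothetical names.
 *
 *	static void
 *	my_job(void *arg)
 *	{
 *		*(int *)arg *= 2;
 *	}
 *
 *	tpool_t *tp;
 *	int i, data[16];
 *
 *	if ((tp = tpool_create(2, 8, 10, NULL)) == NULL)
 *		return;
 *	for (i = 0; i < 16; i++)
 *		(void) tpool_dispatch(tp, my_job, &data[i]);
 *	tpool_wait(tp);
 *	tpool_destroy(tp);
 *
 * With linger set to 10, idle workers in excess of the minimum exit
 * after ten seconds without work.
 */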

/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	if ((job = lmalloc(sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	sig_mutex_lock(&tpool->tp_mutex);

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current < tpool->tp_maximum &&
		    create_worker(tpool) == 0)
			tpool->tp_current++;
	}

	sig_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
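
/*
 * Error-handling sketch (added commentary): tpool_dispatch() fails
 * only when the job structure cannot be allocated.  A caller that
 * must not lose work can fall back to running the job synchronously;
 * tp, my_job and arg are hypothetical names.
 *
 *	if (tpool_dispatch(tp, my_job, arg) != 0)
 *		my_job(arg);
 */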

/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
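
/*
 * Note (added commentary): tpool_destroy() cancels workers that are
 * still running jobs, and each job begins with cancellation enabled
 * and deferred (see tpool_worker()).  A caller that wants every
 * queued job to run to completion can drain the pool first:
 *
 *	tpool_wait(tp);
 *	tpool_destroy(tp);
 */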

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		sig_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) cond_broadcast(&tpool->tp_workcv);
		sig_mutex_unlock(&tpool->tp_mutex);
	}
}
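
/*
 * Note (added commentary): tpool_abandon() suits callers that cannot
 * block: it returns immediately and leaves the last exiting worker
 * to call delete_pool().  postfork1_child_tpool() below relies on
 * the same TP_ABANDON mechanism when cleaning up in a forked child.
 */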

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
}
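
/*
 * Usage sketch (added commentary): because tpool_wait() returns only
 * when both the job queue and the active list are empty, it can act
 * as a barrier between batches of jobs.  The names are hypothetical.
 *
 *	for (i = 0; i < nitems; i++)
 *		(void) tpool_dispatch(tp, phase1, &items[i]);
 *	tpool_wait(tp);
 *	for (i = 0; i < nitems; i++)
 *		(void) tpool_dispatch(tp, phase2, &items[i]);
 *	tpool_wait(tp);
 */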

void
tpool_suspend(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	sig_mutex_unlock(&tpool->tp_mutex);
}

int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	sig_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}

void
tpool_resume(tpool_t *tpool)
{
	int excess;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		sig_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) cond_broadcast(&tpool->tp_workcv);
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	sig_mutex_unlock(&tpool->tp_mutex);
}
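
/*
 * Usage sketch (added commentary): suspending a pool lets a caller
 * queue a batch of jobs and release them all at once; tpool_resume()
 * creates extra workers, up to the pool maximum, to absorb the
 * backlog.  The names are hypothetical.
 *
 *	tpool_suspend(tp);
 *	for (i = 0; i < nitems; i++)
 *		(void) tpool_dispatch(tp, my_job, &items[i]);
 *	tpool_resume(tp);
 */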

int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			sig_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	sig_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
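
/*
 * Usage sketch (added commentary): since calling tpool_wait() or
 * tpool_destroy() from a job running in the pool deadlocks, code
 * that may execute either inside or outside the pool can test its
 * membership first:
 *
 *	if (!tpool_member(tp))
 *		tpool_wait(tp);
 */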

void
postfork1_child_tpool(void)
{
	pthread_t my_tid = pthread_self();
	tpool_t *tpool;
	tpool_job_t *job;

	/*
	 * All of the thread pool workers are gone, except possibly
	 * for the current thread, if it is a thread pool worker thread.
	 * Retain the thread pools, but make them all empty.  Whatever
	 * jobs were queued or running belong to the parent process.
	 * This runs in the child of fork() as part of libc's post-fork
	 * handling, where locks inherited from the parent cannot be
	 * trusted, which is why the synchronization objects are
	 * reinitialized here rather than acquired.
	 */
top:
	if ((tpool = thread_pools) == NULL)
		return;

	do {
		tpool_active_t *activep;

		(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
		for (job = tpool->tp_head; job; job = tpool->tp_head) {
			tpool->tp_head = job->tpj_next;
			lfree(job, sizeof (*job));
		}
		tpool->tp_tail = NULL;
		tpool->tp_njobs = 0;
		for (activep = tpool->tp_active; activep;
		    activep = activep->tpa_next) {
			if (activep->tpa_tid == my_tid) {
				activep->tpa_next = NULL;
				break;
			}
		}
		tpool->tp_idle = 0;
		tpool->tp_current = 0;
		if ((tpool->tp_active = activep) != NULL)
			tpool->tp_current = 1;
		tpool->tp_flags &= ~TP_WAIT;
		if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
			tpool->tp_flags &= ~TP_DESTROY;
			tpool->tp_flags |= TP_ABANDON;
			if (tpool->tp_current == 0) {
				delete_pool(tpool);
				goto top;	/* start over */
			}
		}
	} while ((tpool = tpool->tp_forw) != thread_pools);
}
559