xref: /titanic_44/usr/src/lib/libc/port/tpool/thread_pool.c (revision 7257d1b4d25bfac0c802847390e98a464fd787ac)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include <stdlib.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include "thread_pool_impl.h"
35 
36 static mutex_t thread_pool_lock = DEFAULTMUTEX;
37 static tpool_t *thread_pools = NULL;
38 
39 static void
delete_pool(tpool_t * tpool)40 delete_pool(tpool_t *tpool)
41 {
42 	tpool_job_t *job;
43 
44 	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
45 
46 	/*
47 	 * Unlink the pool from the global list of all pools.
48 	 */
49 	lmutex_lock(&thread_pool_lock);
50 	if (thread_pools == tpool)
51 		thread_pools = tpool->tp_forw;
52 	if (thread_pools == tpool)
53 		thread_pools = NULL;
54 	else {
55 		tpool->tp_back->tp_forw = tpool->tp_forw;
56 		tpool->tp_forw->tp_back = tpool->tp_back;
57 	}
58 	lmutex_unlock(&thread_pool_lock);
59 
60 	/*
61 	 * There should be no pending jobs, but just in case...
62 	 */
63 	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
64 		tpool->tp_head = job->tpj_next;
65 		lfree(job, sizeof (*job));
66 	}
67 	(void) pthread_attr_destroy(&tpool->tp_attr);
68 	lfree(tpool, sizeof (*tpool));
69 }
70 
71 /*
72  * Worker thread is terminating.
73  */
74 static void
worker_cleanup(tpool_t * tpool)75 worker_cleanup(tpool_t *tpool)
76 {
77 	ASSERT(MUTEX_HELD(&tpool->tp_mutex));
78 
79 	if (--tpool->tp_current == 0 &&
80 	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
81 		if (tpool->tp_flags & TP_ABANDON) {
82 			sig_mutex_unlock(&tpool->tp_mutex);
83 			delete_pool(tpool);
84 			return;
85 		}
86 		if (tpool->tp_flags & TP_DESTROY)
87 			(void) cond_broadcast(&tpool->tp_busycv);
88 	}
89 	sig_mutex_unlock(&tpool->tp_mutex);
90 }
91 
92 static void
notify_waiters(tpool_t * tpool)93 notify_waiters(tpool_t *tpool)
94 {
95 	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
96 		tpool->tp_flags &= ~TP_WAIT;
97 		(void) cond_broadcast(&tpool->tp_waitcv);
98 	}
99 }
100 
101 /*
102  * Called by a worker thread on return from a tpool_dispatch()d job.
103  */
104 static void
job_cleanup(tpool_t * tpool)105 job_cleanup(tpool_t *tpool)
106 {
107 	pthread_t my_tid = pthread_self();
108 	tpool_active_t *activep;
109 	tpool_active_t **activepp;
110 
111 	sig_mutex_lock(&tpool->tp_mutex);
112 	/* CSTYLED */
113 	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
114 		activep = *activepp;
115 		if (activep->tpa_tid == my_tid) {
116 			*activepp = activep->tpa_next;
117 			break;
118 		}
119 	}
120 	if (tpool->tp_flags & TP_WAIT)
121 		notify_waiters(tpool);
122 }
123 
124 static void *
tpool_worker(void * arg)125 tpool_worker(void *arg)
126 {
127 	tpool_t *tpool = (tpool_t *)arg;
128 	int elapsed;
129 	tpool_job_t *job;
130 	void (*func)(void *);
131 	tpool_active_t active;
132 
133 	sig_mutex_lock(&tpool->tp_mutex);
134 	pthread_cleanup_push(worker_cleanup, tpool);
135 
136 	/*
137 	 * This is the worker's main loop.
138 	 * It will only be left if a timeout or an error has occured.
139 	 */
140 	active.tpa_tid = pthread_self();
141 	for (;;) {
142 		elapsed = 0;
143 		tpool->tp_idle++;
144 		if (tpool->tp_flags & TP_WAIT)
145 			notify_waiters(tpool);
146 		while ((tpool->tp_head == NULL ||
147 		    (tpool->tp_flags & TP_SUSPEND)) &&
148 		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
149 			if (tpool->tp_current <= tpool->tp_minimum ||
150 			    tpool->tp_linger == 0) {
151 				(void) sig_cond_wait(&tpool->tp_workcv,
152 				    &tpool->tp_mutex);
153 			} else {
154 				timestruc_t timeout;
155 
156 				timeout.tv_sec = tpool->tp_linger;
157 				timeout.tv_nsec = 0;
158 				if (sig_cond_reltimedwait(&tpool->tp_workcv,
159 				    &tpool->tp_mutex, &timeout) != 0) {
160 					elapsed = 1;
161 					break;
162 				}
163 			}
164 		}
165 		tpool->tp_idle--;
166 		if (tpool->tp_flags & TP_DESTROY)
167 			break;
168 		if (tpool->tp_flags & TP_ABANDON) {
169 			/* can't abandon a suspended pool */
170 			if (tpool->tp_flags & TP_SUSPEND) {
171 				tpool->tp_flags &= ~TP_SUSPEND;
172 				(void) cond_broadcast(&tpool->tp_workcv);
173 			}
174 			if (tpool->tp_head == NULL)
175 				break;
176 		}
177 		if ((job = tpool->tp_head) != NULL &&
178 		    !(tpool->tp_flags & TP_SUSPEND)) {
179 			elapsed = 0;
180 			func = job->tpj_func;
181 			arg = job->tpj_arg;
182 			tpool->tp_head = job->tpj_next;
183 			if (job == tpool->tp_tail)
184 				tpool->tp_tail = NULL;
185 			tpool->tp_njobs--;
186 			active.tpa_next = tpool->tp_active;
187 			tpool->tp_active = &active;
188 			sig_mutex_unlock(&tpool->tp_mutex);
189 			pthread_cleanup_push(job_cleanup, tpool);
190 			lfree(job, sizeof (*job));
191 			/*
192 			 * Call the specified function.
193 			 */
194 			func(arg);
195 			/*
196 			 * We don't know what this thread has been doing,
197 			 * so we reset its signal mask and cancellation
198 			 * state back to the initial values.
199 			 */
200 			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
201 			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
202 			    NULL);
203 			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
204 			    NULL);
205 			pthread_cleanup_pop(1);
206 		}
207 		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
208 			/*
209 			 * We timed out and there is no work to be done
210 			 * and the number of workers exceeds the minimum.
211 			 * Exit now to reduce the size of the pool.
212 			 */
213 			break;
214 		}
215 	}
216 	pthread_cleanup_pop(1);
217 	return (arg);
218 }
219 
220 /*
221  * Create a worker thread, with all signals blocked.
222  */
223 static int
create_worker(tpool_t * tpool)224 create_worker(tpool_t *tpool)
225 {
226 	sigset_t oset;
227 	int error;
228 
229 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
230 	error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
231 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
232 	return (error);
233 }
234 
235 tpool_t	*
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)236 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
237 	pthread_attr_t *attr)
238 {
239 	tpool_t	*tpool;
240 	void *stackaddr;
241 	size_t stacksize;
242 	size_t minstack;
243 	int error;
244 
245 	if (min_threads > max_threads || max_threads < 1) {
246 		errno = EINVAL;
247 		return (NULL);
248 	}
249 	if (attr != NULL) {
250 		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
251 			errno = EINVAL;
252 			return (NULL);
253 		}
254 		/*
255 		 * Allow only one thread in the pool with a specified stack.
256 		 * Require threads to have at least the minimum stack size.
257 		 */
258 		minstack = thr_min_stack();
259 		if (stackaddr != NULL) {
260 			if (stacksize < minstack || max_threads != 1) {
261 				errno = EINVAL;
262 				return (NULL);
263 			}
264 		} else if (stacksize != 0 && stacksize < minstack) {
265 			errno = EINVAL;
266 			return (NULL);
267 		}
268 	}
269 
270 	tpool = lmalloc(sizeof (*tpool));
271 	if (tpool == NULL) {
272 		errno = ENOMEM;
273 		return (NULL);
274 	}
275 	(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
276 	(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
277 	(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
278 	(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
279 	tpool->tp_minimum = min_threads;
280 	tpool->tp_maximum = max_threads;
281 	tpool->tp_linger = linger;
282 
283 	/*
284 	 * We cannot just copy the attribute pointer.
285 	 * We need to initialize a new pthread_attr_t structure
286 	 * with the values from the user-supplied pthread_attr_t.
287 	 * If the attribute pointer is NULL, we need to initialize
288 	 * the new pthread_attr_t structure with default values.
289 	 */
290 	error = pthread_attr_clone(&tpool->tp_attr, attr);
291 	if (error) {
292 		lfree(tpool, sizeof (*tpool));
293 		errno = error;
294 		return (NULL);
295 	}
296 
297 	/* make all pool threads be detached daemon threads */
298 	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
299 	    PTHREAD_CREATE_DETACHED);
300 	(void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
301 	    PTHREAD_CREATE_DAEMON_NP);
302 
303 	/* insert into the global list of all thread pools */
304 	lmutex_lock(&thread_pool_lock);
305 	if (thread_pools == NULL) {
306 		tpool->tp_forw = tpool;
307 		tpool->tp_back = tpool;
308 		thread_pools = tpool;
309 	} else {
310 		thread_pools->tp_back->tp_forw = tpool;
311 		tpool->tp_forw = thread_pools;
312 		tpool->tp_back = thread_pools->tp_back;
313 		thread_pools->tp_back = tpool;
314 	}
315 	lmutex_unlock(&thread_pool_lock);
316 
317 	return (tpool);
318 }
319 
320 /*
321  * Dispatch a work request to the thread pool.
322  * If there are idle workers, awaken one.
323  * Else, if the maximum number of workers has
324  * not been reached, spawn a new worker thread.
325  * Else just return with the job added to the queue.
326  */
327 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)328 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
329 {
330 	tpool_job_t *job;
331 
332 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
333 
334 	if ((job = lmalloc(sizeof (*job))) == NULL)
335 		return (-1);
336 	job->tpj_next = NULL;
337 	job->tpj_func = func;
338 	job->tpj_arg = arg;
339 
340 	sig_mutex_lock(&tpool->tp_mutex);
341 
342 	if (tpool->tp_head == NULL)
343 		tpool->tp_head = job;
344 	else
345 		tpool->tp_tail->tpj_next = job;
346 	tpool->tp_tail = job;
347 	tpool->tp_njobs++;
348 
349 	if (!(tpool->tp_flags & TP_SUSPEND)) {
350 		if (tpool->tp_idle > 0)
351 			(void) cond_signal(&tpool->tp_workcv);
352 		else if (tpool->tp_current < tpool->tp_maximum &&
353 		    create_worker(tpool) == 0)
354 			tpool->tp_current++;
355 	}
356 
357 	sig_mutex_unlock(&tpool->tp_mutex);
358 	return (0);
359 }
360 
361 /*
362  * Assumes: by the time tpool_destroy() is called no one will use this
363  * thread pool in any way and no one will try to dispatch entries to it.
364  * Calling tpool_destroy() from a job in the pool will cause deadlock.
365  */
366 void
tpool_destroy(tpool_t * tpool)367 tpool_destroy(tpool_t *tpool)
368 {
369 	tpool_active_t *activep;
370 
371 	ASSERT(!tpool_member(tpool));
372 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
373 
374 	sig_mutex_lock(&tpool->tp_mutex);
375 	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
376 
377 	/* mark the pool as being destroyed; wakeup idle workers */
378 	tpool->tp_flags |= TP_DESTROY;
379 	tpool->tp_flags &= ~TP_SUSPEND;
380 	(void) cond_broadcast(&tpool->tp_workcv);
381 
382 	/* cancel all active workers */
383 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
384 		(void) pthread_cancel(activep->tpa_tid);
385 
386 	/* wait for all active workers to finish */
387 	while (tpool->tp_active != NULL) {
388 		tpool->tp_flags |= TP_WAIT;
389 		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
390 	}
391 
392 	/* the last worker to terminate will wake us up */
393 	while (tpool->tp_current != 0)
394 		(void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
395 
396 	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
397 	delete_pool(tpool);
398 }
399 
400 /*
401  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
402  * The last worker to terminate will delete the pool.
403  */
404 void
tpool_abandon(tpool_t * tpool)405 tpool_abandon(tpool_t *tpool)
406 {
407 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
408 
409 	sig_mutex_lock(&tpool->tp_mutex);
410 	if (tpool->tp_current == 0) {
411 		/* no workers, just delete the pool */
412 		sig_mutex_unlock(&tpool->tp_mutex);
413 		delete_pool(tpool);
414 	} else {
415 		/* wake up all workers, last one will delete the pool */
416 		tpool->tp_flags |= TP_ABANDON;
417 		tpool->tp_flags &= ~TP_SUSPEND;
418 		(void) cond_broadcast(&tpool->tp_workcv);
419 		sig_mutex_unlock(&tpool->tp_mutex);
420 	}
421 }
422 
423 /*
424  * Wait for all jobs to complete.
425  * Calling tpool_wait() from a job in the pool will cause deadlock.
426  */
427 void
tpool_wait(tpool_t * tpool)428 tpool_wait(tpool_t *tpool)
429 {
430 	ASSERT(!tpool_member(tpool));
431 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
432 
433 	sig_mutex_lock(&tpool->tp_mutex);
434 	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
435 	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
436 		tpool->tp_flags |= TP_WAIT;
437 		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
438 		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
439 	}
440 	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
441 }
442 
443 void
tpool_suspend(tpool_t * tpool)444 tpool_suspend(tpool_t *tpool)
445 {
446 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
447 
448 	sig_mutex_lock(&tpool->tp_mutex);
449 	tpool->tp_flags |= TP_SUSPEND;
450 	sig_mutex_unlock(&tpool->tp_mutex);
451 }
452 
453 int
tpool_suspended(tpool_t * tpool)454 tpool_suspended(tpool_t *tpool)
455 {
456 	int suspended;
457 
458 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
459 
460 	sig_mutex_lock(&tpool->tp_mutex);
461 	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
462 	sig_mutex_unlock(&tpool->tp_mutex);
463 
464 	return (suspended);
465 }
466 
467 void
tpool_resume(tpool_t * tpool)468 tpool_resume(tpool_t *tpool)
469 {
470 	int excess;
471 
472 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
473 
474 	sig_mutex_lock(&tpool->tp_mutex);
475 	if (!(tpool->tp_flags & TP_SUSPEND)) {
476 		sig_mutex_unlock(&tpool->tp_mutex);
477 		return;
478 	}
479 	tpool->tp_flags &= ~TP_SUSPEND;
480 	(void) cond_broadcast(&tpool->tp_workcv);
481 	excess = tpool->tp_njobs - tpool->tp_idle;
482 	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
483 		if (create_worker(tpool) != 0)
484 			break;		/* pthread_create() failed */
485 		tpool->tp_current++;
486 	}
487 	sig_mutex_unlock(&tpool->tp_mutex);
488 }
489 
490 int
tpool_member(tpool_t * tpool)491 tpool_member(tpool_t *tpool)
492 {
493 	pthread_t my_tid = pthread_self();
494 	tpool_active_t *activep;
495 
496 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
497 
498 	sig_mutex_lock(&tpool->tp_mutex);
499 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
500 		if (activep->tpa_tid == my_tid) {
501 			sig_mutex_unlock(&tpool->tp_mutex);
502 			return (1);
503 		}
504 	}
505 	sig_mutex_unlock(&tpool->tp_mutex);
506 	return (0);
507 }
508 
509 void
postfork1_child_tpool(void)510 postfork1_child_tpool(void)
511 {
512 	pthread_t my_tid = pthread_self();
513 	tpool_t *tpool;
514 	tpool_job_t *job;
515 
516 	/*
517 	 * All of the thread pool workers are gone, except possibly
518 	 * for the current thread, if it is a thread pool worker thread.
519 	 * Retain the thread pools, but make them all empty.  Whatever
520 	 * jobs were queued or running belong to the parent process.
521 	 */
522 top:
523 	if ((tpool = thread_pools) == NULL)
524 		return;
525 
526 	do {
527 		tpool_active_t *activep;
528 
529 		(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
530 		(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
531 		(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
532 		(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
533 		for (job = tpool->tp_head; job; job = tpool->tp_head) {
534 			tpool->tp_head = job->tpj_next;
535 			lfree(job, sizeof (*job));
536 		}
537 		tpool->tp_tail = NULL;
538 		tpool->tp_njobs = 0;
539 		for (activep = tpool->tp_active; activep;
540 		    activep = activep->tpa_next) {
541 			if (activep->tpa_tid == my_tid) {
542 				activep->tpa_next = NULL;
543 				break;
544 			}
545 		}
546 		tpool->tp_idle = 0;
547 		tpool->tp_current = 0;
548 		if ((tpool->tp_active = activep) != NULL)
549 			tpool->tp_current = 1;
550 		tpool->tp_flags &= ~TP_WAIT;
551 		if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
552 			tpool->tp_flags &= ~TP_DESTROY;
553 			tpool->tp_flags |= TP_ABANDON;
554 			if (tpool->tp_current == 0) {
555 				delete_pool(tpool);
556 				goto top;	/* start over */
557 			}
558 		}
559 	} while ((tpool = tpool->tp_forw) != thread_pools);
560 }
561