xref: /freebsd/contrib/sendmail/libmilter/worker.c (revision c6ec7d31830ab1c80edae95ad5e4b9dba10c47ac)
1 /*
2  *  Copyright (c) 2003-2004, 2007, 2009-2011 Sendmail, Inc. and its suppliers.
3  *	All rights reserved.
4  *
5  * By using this file, you agree to the terms and conditions set
6  * forth in the LICENSE file which can be found at the top level of
7  * the sendmail distribution.
8  *
9  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10  *   Jose-Marcio.Martins@ensmp.fr
11  */
12 
13 #include <sm/gen.h>
14 SM_RCSID("@(#)$Id: worker.c,v 8.19 2011/02/14 23:33:48 ca Exp $")
15 
16 #include "libmilter.h"
17 
18 #if _FFR_WORKERS_POOL
19 
20 typedef struct taskmgr_S taskmgr_T;
21 
22 #define TM_SIGNATURE		0x23021957
23 
24 struct taskmgr_S
25 {
26 	long		tm_signature; /* has the controller been initialized */
27 	sthread_t	tm_tid;	/* thread id of controller */
28 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
29 
30 	int		tm_nb_workers;	/* number of workers in the pool */
31 	int		tm_nb_idle;	/* number of workers waiting */
32 
33 	int		tm_p[2];	/* poll control pipe */
34 
35 	smutex_t	tm_w_mutex;	/* linked list access mutex */
36 	scond_t		tm_w_cond;	/* */
37 };
38 
39 static taskmgr_T     Tskmgr = {0};
40 
41 #define WRK_CTX_HEAD	Tskmgr.tm_ctx_head
42 
43 #define RD_PIPE	(Tskmgr.tm_p[0])
44 #define WR_PIPE	(Tskmgr.tm_p[1])
45 
46 #define PIPE_SEND_SIGNAL()						\
47 	do								\
48 	{								\
49 		char evt = 0x5a;					\
50 		int fd = WR_PIPE;					\
51 		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
52 			smi_log(SMI_LOG_ERR,				\
53 				"Error writing to event pipe: %s",	\
54 				sm_errstring(errno));			\
55 	} while (0)
56 
57 #ifndef USE_PIPE_WAKE_POLL
58 # define USE_PIPE_WAKE_POLL 1
59 #endif /* USE_PIPE_WAKE_POLL */
60 
61 /* poll check periodicity (default 10000 - 10 s) */
62 #define POLL_TIMEOUT   10000
63 
64 /* worker conditional wait timeout (default 10 s) */
65 #define COND_TIMEOUT     10
66 
67 /* functions */
68 static int mi_close_session __P((SMFICTX_PTR));
69 
70 static void *mi_worker __P((void *));
71 static void *mi_pool_controller __P((void *));
72 
73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 static int mi_list_del_ctx __P((SMFICTX_PTR));
75 
76 /*
77 **  periodicity of cleaning up old sessions (timedout)
78 **	sessions list will be checked to find old inactive
79 **	sessions each DT_CHECK_OLD_SESSIONS sec
80 */
81 
82 #define DT_CHECK_OLD_SESSIONS   600
83 
84 #ifndef OLD_SESSION_TIMEOUT
85 # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
86 #endif /* OLD_SESSION_TIMEOUT */
87 
88 /* session states - with respect to the pool of workers */
89 #define WKST_INIT		0	/* initial state */
90 #define WKST_READY_TO_RUN	1	/* command ready do be read */
91 #define WKST_RUNNING		2	/* session running on a worker */
92 #define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
93 #define WKST_WAITING		4	/* waiting for new command */
94 #define WKST_CLOSING		5	/* session finished */
95 
96 #ifndef MIN_WORKERS
97 # define MIN_WORKERS	2  /* minimum number of threads to keep around */
98 #endif
99 
100 #define MIN_IDLE	1  /* minimum number of idle threads */
101 
102 
103 /*
104 **  Macros for threads and mutex management
105 */
106 
107 #define TASKMGR_LOCK()							\
108 	do								\
109 	{								\
110 		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
111 			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
112 	} while (0)
113 
114 #define TASKMGR_UNLOCK()						\
115 	do								\
116 	{								\
117 		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
118 			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
119 	} while (0)
120 
121 #define	TASKMGR_COND_WAIT()						\
122 	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123 
124 #define	TASKMGR_COND_SIGNAL()						\
125 	do								\
126 	{								\
127 		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
128 			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129 	} while (0)
130 
131 #define LAUNCH_WORKER(ctx)						\
132 	do								\
133 	{								\
134 		int r;							\
135 		sthread_t tid;						\
136 									\
137 		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
138 			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139 				sm_errstring(r));			\
140 	} while (0)
141 
142 #if POOL_DEBUG
143 # define POOL_LEV_DPRINTF(lev, x)					\
144 	do {								\
145 		if ((lev) < ctx->ctx_dbg)				\
146 			sm_dprintf x;					\
147 	} while (0)
148 #else /* POOL_DEBUG */
149 # define POOL_LEV_DPRINTF(lev, x)
150 #endif /* POOL_DEBUG */
151 
152 /*
153 **  MI_START_SESSION -- Start a session in the pool of workers
154 **
155 **	Parameters:
156 **		ctx -- context structure
157 **
158 **	Returns:
159 **		MI_SUCCESS/MI_FAILURE
160 */
161 
162 int
163 mi_start_session(ctx)
164 	SMFICTX_PTR ctx;
165 {
166 	static long id = 0;
167 
168 	/* this can happen if the milter is shutting down */
169 	if (Tskmgr.tm_signature != TM_SIGNATURE)
170 		return MI_FAILURE;
171 	SM_ASSERT(ctx != NULL);
172 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
173 	TASKMGR_LOCK();
174 
175 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
176 	{
177 		TASKMGR_UNLOCK();
178 		return MI_FAILURE;
179 	}
180 
181 	ctx->ctx_sid = id++;
182 
183 	/* if there is an idle worker, signal it, otherwise start new worker */
184 	if (Tskmgr.tm_nb_idle > 0)
185 	{
186 		ctx->ctx_wstate = WKST_READY_TO_RUN;
187 		TASKMGR_COND_SIGNAL();
188 	}
189 	else
190 	{
191 		ctx->ctx_wstate = WKST_RUNNING;
192 		LAUNCH_WORKER(ctx);
193 	}
194 	TASKMGR_UNLOCK();
195 	return MI_SUCCESS;
196 }
197 
198 /*
199 **  MI_CLOSE_SESSION -- Close a session and clean up data structures
200 **
201 **	Parameters:
202 **		ctx -- context structure
203 **
204 **	Returns:
205 **		MI_SUCCESS/MI_FAILURE
206 */
207 
208 static int
209 mi_close_session(ctx)
210 	SMFICTX_PTR ctx;
211 {
212 	SM_ASSERT(ctx != NULL);
213 
214 	(void) mi_list_del_ctx(ctx);
215 	mi_clr_ctx(ctx);
216 
217 	return MI_SUCCESS;
218 }
219 
220 /*
221 **  NONBLOCKING -- set nonblocking mode for a file descriptor.
222 **
223 **	Parameters:
224 **		fd -- file descriptor
225 **		name -- name for (error) logging
226 **
227 **	Returns:
228 **		MI_SUCCESS/MI_FAILURE
229 */
230 
231 static int
232 nonblocking(int fd, const char *name)
233 {
234 	int r;
235 
236 	errno = 0;
237 	r = fcntl(fd, F_GETFL, 0);
238 	if (r == -1)
239 	{
240 		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
241 			name, sm_errstring(errno));
242 		return MI_FAILURE;
243 	}
244 	errno = 0;
245 	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
246 	if (r == -1)
247 	{
248 		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
249 			name, sm_errstring(errno));
250 		return MI_FAILURE;
251 	}
252 	return MI_SUCCESS;
253 }
254 
255 /*
256 **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
257 **		Must be called before starting sessions.
258 **
259 **	Parameters:
260 **		none
261 **
262 **	Returns:
263 **		MI_SUCCESS/MI_FAILURE
264 */
265 
266 int
267 mi_pool_controller_init()
268 {
269 	sthread_t tid;
270 	int r, i;
271 
272 	if (Tskmgr.tm_signature == TM_SIGNATURE)
273 		return MI_SUCCESS;
274 
275 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
276 	Tskmgr.tm_tid = (sthread_t) -1;
277 	Tskmgr.tm_nb_workers = 0;
278 	Tskmgr.tm_nb_idle = 0;
279 
280 	if (pipe(Tskmgr.tm_p) != 0)
281 	{
282 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
283 			sm_errstring(errno));
284 		return MI_FAILURE;
285 	}
286 	r = nonblocking(WR_PIPE, "WR_PIPE");
287 	if (r != MI_SUCCESS)
288 		return r;
289 	r = nonblocking(RD_PIPE, "RD_PIPE");
290 	if (r != MI_SUCCESS)
291 		return r;
292 
293 	(void) smutex_init(&Tskmgr.tm_w_mutex);
294 	(void) scond_init(&Tskmgr.tm_w_cond);
295 
296 	/* Launch the pool controller */
297 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
298 	{
299 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
300 			sm_errstring(r));
301 		return MI_FAILURE;
302 	}
303 	Tskmgr.tm_tid = tid;
304 	Tskmgr.tm_signature = TM_SIGNATURE;
305 
306 	/* Create the pool of workers */
307 	for (i = 0; i < MIN_WORKERS; i++)
308 	{
309 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
310 		{
311 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
312 				sm_errstring(r));
313 			return MI_FAILURE;
314 		}
315 	}
316 
317 	return MI_SUCCESS;
318 }
319 
320 /*
321 **  MI_POOL_CONTROLLER -- manage the pool of workers
322 **	This thread must be running when listener begins
323 **	starting sessions
324 **
325 **	Parameters:
326 **		arg -- unused
327 **
328 **	Returns:
329 **		NULL
330 **
331 **	Control flow:
332 **		for (;;)
333 **			Look for timed out sessions
334 **			Select sessions to wait for sendmail command
335 **			Poll set of file descriptors
336 **			if timeout
337 **				continue
338 **			For each file descriptor ready
339 **				launch new thread if no worker available
340 **				else
341 **				signal waiting worker
342 */
343 
344 /* Poll structure array (pollfd) size step */
345 #define PFD_STEP	256
346 
347 #define WAIT_FD(i)	(pfd[i].fd)
348 #define WAITFN		"POLL"
349 
350 static void *
351 mi_pool_controller(arg)
352 	void *arg;
353 {
354 	struct pollfd *pfd = NULL;
355 	int dim_pfd = 0;
356 	bool rebuild_set = true;
357 	int pcnt = 0; /* error count for poll() failures */
358 	time_t lastcheck;
359 
360 	Tskmgr.tm_tid = sthread_get_id();
361 	if (pthread_detach(Tskmgr.tm_tid) != 0)
362 	{
363 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
364 		return NULL;
365 	}
366 
367 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
368 	if (pfd == NULL)
369 	{
370 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
371 			sm_errstring(errno));
372 		return NULL;
373 	}
374 	dim_pfd = PFD_STEP;
375 
376 	lastcheck = time(NULL);
377 	for (;;)
378 	{
379 		SMFICTX_PTR ctx;
380 		int nfd, rfd, i;
381 		time_t now;
382 
383 		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
384 
385 		if (mi_stop() != MILTER_CONT)
386 			break;
387 
388 		TASKMGR_LOCK();
389 
390 		now = time(NULL);
391 
392 		/* check for timed out sessions? */
393 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
394 		{
395 			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
396 			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
397 			{
398 				SMFICTX_PTR ctx_nxt;
399 
400 				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
401 				if (ctx->ctx_wstate == WKST_WAITING)
402 				{
403 					if (ctx->ctx_wait == 0)
404 						ctx->ctx_wait = now;
405 					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
406 						 < now)
407 					{
408 						/* if session timed out, close it */
409 						sfsistat (*fi_close) __P((SMFICTX *));
410 
411 						POOL_LEV_DPRINTF(4,
412 							("Closing old connection: sd=%d id=%d",
413 							ctx->ctx_sd,
414 							ctx->ctx_sid));
415 
416 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
417 							(void) (*fi_close)(ctx);
418 
419 						mi_close_session(ctx);
420 					}
421 				}
422 				ctx = ctx_nxt;
423 			}
424 			lastcheck = now;
425 		}
426 
427 		if (rebuild_set)
428 		{
429 			/*
430 			**  Initialize poll set.
431 			**  Insert into the poll set the file descriptors of
432 			**  all sessions waiting for a command from sendmail.
433 			*/
434 
435 			nfd = 0;
436 
437 			/* begin with worker pipe */
438 			pfd[nfd].fd = RD_PIPE;
439 			pfd[nfd].events = MI_POLL_RD_FLAGS;
440 			pfd[nfd].revents = 0;
441 			nfd++;
442 
443 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
444 			{
445 				/*
446 				**  update ctx_wait - start of wait moment -
447 				**  for timeout
448 				*/
449 
450 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
451 					ctx->ctx_wait = now;
452 
453 				/* add the session to the pollfd array? */
454 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
455 				    (ctx->ctx_wstate == WKST_WAITING))
456 				{
457 					/*
458 					**  Resize the pollfd array if it
459 					**  isn't large enough.
460 					*/
461 
462 					if (nfd >= dim_pfd)
463 					{
464 						struct pollfd *tpfd;
465 						size_t new;
466 
467 						new = (dim_pfd + PFD_STEP) *
468 							sizeof(*tpfd);
469 						tpfd = (struct pollfd *)
470 							realloc(pfd, new);
471 						if (tpfd != NULL)
472 						{
473 							pfd = tpfd;
474 							dim_pfd += PFD_STEP;
475 						}
476 						else
477 						{
478 							smi_log(SMI_LOG_ERR,
479 								"Failed to realloc pollfd array:%s",
480 								sm_errstring(errno));
481 						}
482 					}
483 
484 					/* add the session to pollfd array */
485 					if (nfd < dim_pfd)
486 					{
487 						ctx->ctx_wstate = WKST_WAITING;
488 						pfd[nfd].fd = ctx->ctx_sd;
489 						pfd[nfd].events = MI_POLL_RD_FLAGS;
490 						pfd[nfd].revents = 0;
491 						nfd++;
492 					}
493 				}
494 			}
495 			rebuild_set = false;
496 		}
497 
498 		TASKMGR_UNLOCK();
499 
500 		/* Everything is ready, let's wait for an event */
501 		rfd = poll(pfd, nfd, POLL_TIMEOUT);
502 
503 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
504 			WAITFN, now, nfd));
505 
506 		/* timeout */
507 		if (rfd == 0)
508 			continue;
509 
510 		rebuild_set = true;
511 
512 		/* error */
513 		if (rfd < 0)
514 		{
515 			if (errno == EINTR)
516 				continue;
517 			pcnt++;
518 			smi_log(SMI_LOG_ERR,
519 				"%s() failed (%s), %s",
520 				WAITFN, sm_errstring(errno),
521 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
522 
523 			if (pcnt >= MAX_FAILS_S)
524 				goto err;
525 		}
526 		pcnt = 0;
527 
528 		/* something happened */
529 		for (i = 0; i < nfd; i++)
530 		{
531 			if (pfd[i].revents == 0)
532 				continue;
533 
534 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
535 				WAITFN, i, nfd,
536 			WAIT_FD(i)));
537 
538 			/* has a worker signaled an end of task ? */
539 			if (WAIT_FD(i) == RD_PIPE)
540 			{
541 				char evts[256];
542 				ssize_t r;
543 
544 				POOL_LEV_DPRINTF(4,
545 					("PIPE WILL READ evt = %08X %08X",
546 					pfd[i].events, pfd[i].revents));
547 
548 				r = 1;
549 				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
550 					&& r != -1)
551 				{
552 					r = read(RD_PIPE, evts, sizeof(evts));
553 				}
554 
555 				POOL_LEV_DPRINTF(4,
556 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
557 					i, RD_PIPE, (int) r, evts[0]));
558 
559 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
560 				{
561 					/* Exception handling */
562 				}
563 				continue;
564 			}
565 
566 			/* no ! sendmail wants to send a command */
567 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
568 			{
569 				if (ctx->ctx_wstate != WKST_WAITING)
570 					continue;
571 
572 				POOL_LEV_DPRINTF(4,
573 					("Checking context sd=%d - fd=%d ",
574 					ctx->ctx_sd , WAIT_FD(i)));
575 
576 				if (ctx->ctx_sd == pfd[i].fd)
577 				{
578 					TASKMGR_LOCK();
579 
580 					POOL_LEV_DPRINTF(4,
581 						("TASK: found %d for fd[%d]=%d",
582 						ctx->ctx_sid, i, WAIT_FD(i)));
583 
584 					if (Tskmgr.tm_nb_idle > 0)
585 					{
586 						ctx->ctx_wstate = WKST_READY_TO_RUN;
587 						TASKMGR_COND_SIGNAL();
588 					}
589 					else
590 					{
591 						ctx->ctx_wstate = WKST_RUNNING;
592 						LAUNCH_WORKER(ctx);
593 					}
594 					TASKMGR_UNLOCK();
595 					break;
596 				}
597 			}
598 
599 			POOL_LEV_DPRINTF(4,
600 				("TASK %s FOUND - Checking PIPE for fd[%d]",
601 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
602 		}
603 	}
604 
605   err:
606 	if (pfd != NULL)
607 		free(pfd);
608 
609 	Tskmgr.tm_signature = 0;
610 	for (;;)
611 	{
612 		SMFICTX_PTR ctx;
613 
614 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
615 		if (ctx == NULL)
616 			break;
617 		mi_close_session(ctx);
618 	}
619 
620 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
621 	(void) scond_destroy(&Tskmgr.tm_w_cond);
622 
623 	return NULL;
624 }
625 
626 /*
627 **  Look for a task ready to run.
628 **  Value of ctx is NULL or a pointer to a task ready to run.
629 */
630 
631 #define GET_TASK_READY_TO_RUN()					\
632 	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
633 	{							\
634 		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
635 		{						\
636 			ctx->ctx_wstate = WKST_RUNNING;		\
637 			break;					\
638 		}						\
639 	}
640 
641 /*
642 **  MI_WORKER -- worker thread
643 **	executes tasks distributed by the mi_pool_controller
644 **	or by mi_start_session
645 **
646 **	Parameters:
647 **		arg -- pointer to context structure
648 **
649 **	Returns:
650 **		NULL pointer
651 */
652 
653 static void *
654 mi_worker(arg)
655 	void *arg;
656 {
657 	SMFICTX_PTR ctx;
658 	bool done;
659 	sthread_t t_id;
660 	int r;
661 
662 	ctx = (SMFICTX_PTR) arg;
663 	done = false;
664 	if (ctx != NULL)
665 		ctx->ctx_wstate = WKST_RUNNING;
666 
667 	t_id = sthread_get_id();
668 	if (pthread_detach(t_id) != 0)
669 	{
670 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
671 		if (ctx != NULL)
672 			ctx->ctx_wstate = WKST_READY_TO_RUN;
673 		return NULL;
674 	}
675 
676 	TASKMGR_LOCK();
677 	Tskmgr.tm_nb_workers++;
678 	TASKMGR_UNLOCK();
679 
680 	while (!done)
681 	{
682 		if (mi_stop() != MILTER_CONT)
683 			break;
684 
685 		/* let's handle next task... */
686 		if (ctx != NULL)
687 		{
688 			int res;
689 
690 			POOL_LEV_DPRINTF(4,
691 				("worker %d: new task -> let's handle it",
692 				t_id));
693 			res = mi_engine(ctx);
694 			POOL_LEV_DPRINTF(4,
695 				("worker %d: mi_engine returned %d", t_id, res));
696 
697 			TASKMGR_LOCK();
698 			if (res != MI_CONTINUE)
699 			{
700 				ctx->ctx_wstate = WKST_CLOSING;
701 
702 				/*
703 				**  Delete context from linked list of
704 				**  sessions and close session.
705 				*/
706 
707 				mi_close_session(ctx);
708 			}
709 			else
710 			{
711 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
712 
713 				POOL_LEV_DPRINTF(4,
714 					("writing to event pipe..."));
715 
716 				/*
717 				**  Signal task controller to add new session
718 				**  to poll set.
719 				*/
720 
721 				PIPE_SEND_SIGNAL();
722 			}
723 			TASKMGR_UNLOCK();
724 			ctx = NULL;
725 
726 		}
727 
728 		/* check if there is any task waiting to be served */
729 		TASKMGR_LOCK();
730 
731 		GET_TASK_READY_TO_RUN();
732 
733 		/* Got a task? */
734 		if (ctx != NULL)
735 		{
736 			TASKMGR_UNLOCK();
737 			continue;
738 		}
739 
740 		/*
741 		**  if not, let's check if there is enough idle workers
742 		**	if yes: quit
743 		*/
744 
745 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
746 		    Tskmgr.tm_nb_idle > MIN_IDLE)
747 			done = true;
748 
749 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
750 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
751 
752 		if (done)
753 		{
754 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
755 			Tskmgr.tm_nb_workers--;
756 			TASKMGR_UNLOCK();
757 			continue;
758 		}
759 
760 		/*
761 		**  if no task ready to run, wait for another one
762 		*/
763 
764 		Tskmgr.tm_nb_idle++;
765 		TASKMGR_COND_WAIT();
766 		Tskmgr.tm_nb_idle--;
767 
768 		/* look for a task */
769 		GET_TASK_READY_TO_RUN();
770 
771 		TASKMGR_UNLOCK();
772 	}
773 	return NULL;
774 }
775 
776 /*
777 **  MI_LIST_ADD_CTX -- add new session to linked list
778 **
779 **	Parameters:
780 **		ctx -- context structure
781 **
782 **	Returns:
783 **		MI_FAILURE/MI_SUCCESS
784 */
785 
786 static int
787 mi_list_add_ctx(ctx)
788 	SMFICTX_PTR ctx;
789 {
790 	SM_ASSERT(ctx != NULL);
791 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
792 	return MI_SUCCESS;
793 }
794 
795 /*
796 **  MI_LIST_DEL_CTX -- remove session from linked list when finished
797 **
798 **	Parameters:
799 **		ctx -- context structure
800 **
801 **	Returns:
802 **		MI_FAILURE/MI_SUCCESS
803 */
804 
805 static int
806 mi_list_del_ctx(ctx)
807 	SMFICTX_PTR ctx;
808 {
809 	SM_ASSERT(ctx != NULL);
810 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
811 		return MI_FAILURE;
812 
813 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
814 	return MI_SUCCESS;
815 }
816 #endif /* _FFR_WORKERS_POOL */
817