/*
 *  Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 *
 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
 *   Jose-Marcio.Martins@ensmp.fr
 */

#include <sm/gen.h>
SM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")

#include "libmilter.h"

#if _FFR_WORKERS_POOL

typedef struct taskmgr_S taskmgr_T;

#define TM_SIGNATURE		0x23021957

struct taskmgr_S
{
	long		tm_signature; /* has the controller been initialized */
	sthread_t	tm_tid;	/* thread id of controller */
	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */

	int		tm_nb_workers;	/* number of workers in the pool */
	int		tm_nb_idle;	/* number of workers waiting */

	int		tm_p[2];	/* poll control pipe */

	smutex_t	tm_w_mutex;	/* linked list access mutex */
	scond_t		tm_w_cond;	/* worker wait condition variable */
};

static taskmgr_T     Tskmgr = {0};

#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])
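
/*
**  Wake-up pipe: a worker writes one byte to WR_PIPE (PIPE_SEND_SIGNAL
**  below) when a session goes back to the waiting state; the controller
**  keeps RD_PIPE in its poll() set, drains whatever was written and then
**  rebuilds the poll set.  The byte value itself is ignored.
*/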

#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char evt = 0x5a;					\
		int fd = WR_PIPE;					\
		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif

/* poll check timeout in milliseconds (default 10000 = 10 s) */
#define POLL_TIMEOUT   10000

/* worker condition wait timeout in seconds (default 10 s) */
#define COND_TIMEOUT     10
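
/*
**  Both values bound how long the pool sleeps when there is no traffic:
**  the controller's poll() returns after POLL_TIMEOUT ms and an idle
**  worker wakes up after COND_TIMEOUT s, so a shutdown request
**  (mi_stop()) is noticed within roughly ten seconds.
*/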

/* functions */
static int mi_close_session __P((SMFICTX_PTR));

static void *mi_worker __P((void *));
static void *mi_pool_controller __P((void *));

static int mi_list_add_ctx __P((SMFICTX_PTR));
static int mi_list_del_ctx __P((SMFICTX_PTR));

/*
**  Periodicity of cleaning up old (timed-out) sessions:
**	the session list is scanned for old inactive sessions
**	every DT_CHECK_OLD_SESSIONS seconds.
*/

#define DT_CHECK_OLD_SESSIONS   600

#ifndef OLD_SESSION_TIMEOUT
# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
#endif
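
/*
**  By default a waiting session is considered stale once its own
**  ctx_timeout has passed without a new command from the MTA; the
**  controller (which scans the list every DT_CHECK_OLD_SESSIONS
**  seconds) then calls the milter's xxfi_close and discards it.
*/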

/* session states - with respect to the pool of workers */
#define WKST_INIT		0	/* initial state */
#define WKST_READY_TO_RUN	1	/* command ready to be read */
#define WKST_RUNNING		2	/* session running on a worker */
#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
#define WKST_WAITING		4	/* waiting for new command */
#define WKST_CLOSING		5	/* session finished */

#ifndef MIN_WORKERS
# define MIN_WORKERS	2  /* minimum number of threads to keep around */
#endif

#define MIN_IDLE	1  /* minimum number of idle threads */
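
/*
**  A worker thread exits only when more than MIN_WORKERS threads exist
**  and more than MIN_IDLE of them are already idle (see mi_worker()),
**  so the pool never shrinks below MIN_WORKERS threads.
*/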


/*
**  Macros for threads and mutex management
*/

#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

#define	TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
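
/*
**  As used here, TASKMGR_COND_WAIT() is always called with tm_w_mutex
**  held and may return either on a signal or when COND_TIMEOUT expires,
**  so callers re-scan the session list for work after waking up.
*/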

#define	TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		int r;							\
		sthread_t tid;						\
									\
		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",	\
				sm_errstring(r));			\
	} while (0)

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do								\
	{								\
		if (ctx != NULL && (lev) < ctx->ctx_dbg)		\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */

/*
**  MI_START_SESSION -- Start a session in the pool of workers
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_start_session(ctx)
	SMFICTX_PTR ctx;
{
	static long id = 0;

	/* this can happen if the milter is shutting down */
	if (Tskmgr.tm_signature != TM_SIGNATURE)
		return MI_FAILURE;
	SM_ASSERT(ctx != NULL);
	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
	TASKMGR_LOCK();

	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
	{
		TASKMGR_UNLOCK();
		return MI_FAILURE;
	}

	ctx->ctx_sid = id++;

	/* if there is an idle worker, signal it, otherwise start new worker */
	if (Tskmgr.tm_nb_idle > 0)
	{
		ctx->ctx_wstate = WKST_READY_TO_RUN;
		TASKMGR_COND_SIGNAL();
	}
	else
	{
		ctx->ctx_wstate = WKST_RUNNING;
		LAUNCH_WORKER(ctx);
	}
	TASKMGR_UNLOCK();
	return MI_SUCCESS;
}

/*
**  MI_CLOSE_SESSION -- Close a session and clean up data structures
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
mi_close_session(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);

	(void) mi_list_del_ctx(ctx);
	mi_clr_ctx(ctx);

	return MI_SUCCESS;
}

/*
**  NONBLOCKING -- set nonblocking mode for a file descriptor.
**
**	Parameters:
**		fd -- file descriptor
**		name -- name for (error) logging
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
nonblocking(int fd, const char *name)
{
	int r;

	errno = 0;
	r = fcntl(fd, F_GETFL, 0);
	if (r == -1)
	{
		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
			name, sm_errstring(errno));
		return MI_FAILURE;
	}
	errno = 0;
	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
	if (r == -1)
	{
		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
			name, sm_errstring(errno));
		return MI_FAILURE;
	}
	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
**		Must be called before starting sessions.
**
**	Parameters:
**		none
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_pool_controller_init()
{
	sthread_t tid;
	int r, i;

	if (Tskmgr.tm_signature == TM_SIGNATURE)
		return MI_SUCCESS;

	SM_TAILQ_INIT(&WRK_CTX_HEAD);
	Tskmgr.tm_tid = (sthread_t) -1;
	Tskmgr.tm_nb_workers = 0;
	Tskmgr.tm_nb_idle = 0;

	if (pipe(Tskmgr.tm_p) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
			sm_errstring(errno));
		return MI_FAILURE;
	}
	r = nonblocking(WR_PIPE, "WR_PIPE");
	if (r != MI_SUCCESS)
		return r;
	r = nonblocking(RD_PIPE, "RD_PIPE");
	if (r != MI_SUCCESS)
		return r;

	(void) smutex_init(&Tskmgr.tm_w_mutex);
	(void) scond_init(&Tskmgr.tm_w_cond);

	/* Launch the pool controller */
	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
			sm_errstring(r));
		return MI_FAILURE;
	}
	Tskmgr.tm_tid = tid;
	Tskmgr.tm_signature = TM_SIGNATURE;

	/* Create the pool of workers */
	for (i = 0; i < MIN_WORKERS; i++)
	{
		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
		{
			smi_log(SMI_LOG_ERR, "can't create worker threads: %s",
				sm_errstring(r));
			return MI_FAILURE;
		}
	}

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER -- manage the pool of workers
**	This thread must be running when the listener begins
**	starting sessions
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL
**
**	Control flow:
**		for (;;)
**			Look for timed out sessions
**			Select sessions to wait for sendmail command
**			Poll set of file descriptors
**			if timeout
**				continue
**			For each file descriptor ready
**				launch new thread if no worker available
**				else
**				signal waiting worker
*/

/* Poll structure array (pollfd) size step */
#define PFD_STEP	256

#define WAIT_FD(i)	(pfd[i].fd)
#define WAITFN		"POLL"

static void *
mi_pool_controller(arg)
	void *arg;
{
	struct pollfd *pfd = NULL;
	int dim_pfd = 0;
	bool rebuild_set = true;
	int pcnt = 0; /* error count for poll() failures */
	time_t lastcheck;

	Tskmgr.tm_tid = sthread_get_id();
	if (pthread_detach(Tskmgr.tm_tid) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
		return NULL;
	}

	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
	if (pfd == NULL)
	{
		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
			sm_errstring(errno));
		return NULL;
	}
	dim_pfd = PFD_STEP;

	lastcheck = time(NULL);
	for (;;)
	{
		SMFICTX_PTR ctx;
		int nfd, r, i;
		time_t now;

		if (mi_stop() != MILTER_CONT)
			break;

		TASKMGR_LOCK();

		now = time(NULL);

		/* check for timed out sessions? */
		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
		{
			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
			{
				SMFICTX_PTR ctx_nxt;

				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
				if (ctx->ctx_wstate == WKST_WAITING)
				{
					if (ctx->ctx_wait == 0)
						ctx->ctx_wait = now;
					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
						 < now)
					{
						/* if session timed out, close it */
						sfsistat (*fi_close) __P((SMFICTX *));

						POOL_LEV_DPRINTF(4,
							("Closing old connection: sd=%d id=%d",
							ctx->ctx_sd,
							ctx->ctx_sid));

						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
							(void) (*fi_close)(ctx);

						mi_close_session(ctx);
					}
				}
				ctx = ctx_nxt;
			}
			lastcheck = now;
		}

		if (rebuild_set)
		{
			/*
			**  Initialize poll set.
			**  Insert into the poll set the file descriptors of
			**  all sessions waiting for a command from sendmail.
			*/

			nfd = 0;

			/* begin with worker pipe */
			pfd[nfd].fd = RD_PIPE;
			pfd[nfd].events = MI_POLL_RD_FLAGS;
			pfd[nfd].revents = 0;
			nfd++;

			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				/*
				**  update ctx_wait - start of wait moment -
				**  for timeout
				*/

				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
					ctx->ctx_wait = now;

				/* add the session to the pollfd array? */
				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
				    (ctx->ctx_wstate == WKST_WAITING))
				{
					/*
					**  Resize the pollfd array if it
					**  isn't large enough.
					*/

					if (nfd >= dim_pfd)
					{
						struct pollfd *tpfd;
						size_t new;

						new = (dim_pfd + PFD_STEP) *
							sizeof(*tpfd);
						tpfd = (struct pollfd *)
							realloc(pfd, new);
						if (tpfd != NULL)
						{
							pfd = tpfd;
							dim_pfd += PFD_STEP;
						}
						else
						{
							smi_log(SMI_LOG_ERR,
								"Failed to realloc pollfd array: %s",
								sm_errstring(errno));
						}
					}

					/* add the session to pollfd array */
					if (nfd < dim_pfd)
					{
						ctx->ctx_wstate = WKST_WAITING;
						pfd[nfd].fd = ctx->ctx_sd;
						pfd[nfd].events = MI_POLL_RD_FLAGS;
						pfd[nfd].revents = 0;
						nfd++;
					}
				}
			}
			rebuild_set = false;
		}

		TASKMGR_UNLOCK();

		/* Everything is ready, let's wait for an event */
		r = poll(pfd, nfd, POLL_TIMEOUT);

		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
			WAITFN, now, nfd));

		/* timeout */
		if (r == 0)
			continue;

		rebuild_set = true;

		/* error */
		if (r < 0)
		{
			if (errno == EINTR)
				continue;
			pcnt++;
			smi_log(SMI_LOG_ERR,
				"%s() failed (%s), %s",
				WAITFN, sm_errstring(errno),
				pcnt >= MAX_FAILS_S ? "abort" : "try again");

			if (pcnt >= MAX_FAILS_S)
				goto err;
			continue;
		}
		pcnt = 0;

		/* something happened */
		for (i = 0; i < nfd; i++)
		{
			if (pfd[i].revents == 0)
				continue;

			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
				WAITFN, i, nfd,
				WAIT_FD(i)));

			/* has a worker signaled an end of task? */
			if (WAIT_FD(i) == RD_PIPE)
			{
				char evts[256];
				ssize_t r;

				POOL_LEV_DPRINTF(4,
					("PIPE WILL READ evt = %08X %08X",
					pfd[i].events, pfd[i].revents));

				r = 1;
				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
					&& r != -1)
				{
					r = read(RD_PIPE, evts, sizeof(evts));
				}

				POOL_LEV_DPRINTF(4,
					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
					i, RD_PIPE, (int) r, evts[0]));

				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
				{
					/* Exception handling */
				}
				continue;
			}

			/*
			**  Not the pipe for workers waking us,
			**  so must be something on an MTA connection.
			*/

			TASKMGR_LOCK();
			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				if (ctx->ctx_wstate != WKST_WAITING)
					continue;

				POOL_LEV_DPRINTF(4,
					("Checking context sd=%d - fd=%d ",
					ctx->ctx_sd, WAIT_FD(i)));

				if (ctx->ctx_sd == pfd[i].fd)
				{
					POOL_LEV_DPRINTF(4,
						("TASK: found %d for fd[%d]=%d",
						ctx->ctx_sid, i, WAIT_FD(i)));

					if (Tskmgr.tm_nb_idle > 0)
					{
						ctx->ctx_wstate = WKST_READY_TO_RUN;
						TASKMGR_COND_SIGNAL();
					}
					else
					{
						ctx->ctx_wstate = WKST_RUNNING;
						LAUNCH_WORKER(ctx);
					}
					break;
				}
			}
			TASKMGR_UNLOCK();

			POOL_LEV_DPRINTF(4,
				("TASK %s FOUND - Checking PIPE for fd[%d]",
				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
		}
	}

  err:
	if (pfd != NULL)
		free(pfd);

	Tskmgr.tm_signature = 0;
#if 0
	/*
	**  Do not clean up ctx -- it can cause double-free()s.
	**  The program is shutting down anyway, so it's not worth the trouble.
	**  There is a more complex solution that prevents race conditions
	**  while accessing ctx, but that's maybe for a later version.
	*/

	for (;;)
	{
		SMFICTX_PTR ctx;

		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
		if (ctx == NULL)
			break;
		mi_close_session(ctx);
	}
#endif

	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
	(void) scond_destroy(&Tskmgr.tm_w_cond);

	return NULL;
}

/*
**  Look for a task ready to run.
**  On exit, ctx is either NULL or points to a task ready to run.
*/

#define GET_TASK_READY_TO_RUN()					\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
	{							\
		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
		{						\
			ctx->ctx_wstate = WKST_RUNNING;		\
			break;					\
		}						\
	}
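
/*
**  Note: the caller must hold tm_w_mutex while using this macro; when
**  no session is ready the loop runs to completion and leaves ctx
**  set to NULL.
*/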

/*
**  MI_WORKER -- worker thread
**	executes tasks distributed by the mi_pool_controller
**	or by mi_start_session
**
**	Parameters:
**		arg -- pointer to context structure
**
**	Returns:
**		NULL pointer
*/

static void *
mi_worker(arg)
	void *arg;
{
	SMFICTX_PTR ctx;
	bool done;
	sthread_t t_id;
	int r;

	ctx = (SMFICTX_PTR) arg;
	done = false;
	if (ctx != NULL)
		ctx->ctx_wstate = WKST_RUNNING;

	t_id = sthread_get_id();
	if (pthread_detach(t_id) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
		if (ctx != NULL)
			ctx->ctx_wstate = WKST_READY_TO_RUN;
		return NULL;
	}

	TASKMGR_LOCK();
	Tskmgr.tm_nb_workers++;
	TASKMGR_UNLOCK();

	while (!done)
	{
		if (mi_stop() != MILTER_CONT)
			break;

		/* let's handle next task... */
		if (ctx != NULL)
		{
			int res;

			POOL_LEV_DPRINTF(4,
				("worker %d: new task -> let's handle it",
				t_id));
			res = mi_engine(ctx);
			POOL_LEV_DPRINTF(4,
				("worker %d: mi_engine returned %d", t_id, res));

			TASKMGR_LOCK();
			if (res != MI_CONTINUE)
			{
				ctx->ctx_wstate = WKST_CLOSING;

				/*
				**  Delete context from linked list of
				**  sessions and close session.
				*/

				mi_close_session(ctx);
			}
			else
			{
				ctx->ctx_wstate = WKST_READY_TO_WAIT;

				POOL_LEV_DPRINTF(4,
					("writing to event pipe..."));

				/*
				**  Signal task controller to add new session
				**  to poll set.
				*/

				PIPE_SEND_SIGNAL();
			}
			TASKMGR_UNLOCK();
			ctx = NULL;
		}

		/* check if there is any task waiting to be served */
		TASKMGR_LOCK();

		GET_TASK_READY_TO_RUN();

		/* Got a task? */
		if (ctx != NULL)
		{
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  if not, check whether there are enough idle workers;
		**	if so, quit
		*/

		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
		    Tskmgr.tm_nb_idle > MIN_IDLE)
			done = true;

		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));

		if (done)
		{
			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
			Tskmgr.tm_nb_workers--;
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  if there is no task ready to run, wait for one
		*/

		Tskmgr.tm_nb_idle++;
		TASKMGR_COND_WAIT();
		Tskmgr.tm_nb_idle--;

		/* look for a task */
		GET_TASK_READY_TO_RUN();

		TASKMGR_UNLOCK();
	}
	return NULL;
}

/*
**  MI_LIST_ADD_CTX -- add new session to linked list
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_add_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}

/*
**  MI_LIST_DEL_CTX -- remove session from linked list when finished
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_del_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
		return MI_FAILURE;

	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}
#endif /* _FFR_WORKERS_POOL */