xref: /illumos-gate/usr/src/cmd/sendmail/libmilter/worker.c (revision 34a0f871d192b33b865455a8812a3d34c1866315)
1 /*
2  *  Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
3  *	All rights reserved.
4  *
5  * By using this file, you agree to the terms and conditions set
6  * forth in the LICENSE file which can be found at the top level of
7  * the sendmail distribution.
8  *
9  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10  *   Jose-Marcio.Martins@ensmp.fr
11  */
12 
13 #pragma ident	"%Z%%M%	%I%	%E% SMI"
14 
15 #include <sm/gen.h>
16 SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
17 
18 #include "libmilter.h"
19 
20 #if _FFR_WORKERS_POOL
21 
22 typedef struct taskmgr_S taskmgr_T;
23 
24 #define TM_SIGNATURE		0x23021957
25 
26 struct taskmgr_S
27 {
28 	long		tm_signature; /* has the controller been initialized */
29 	sthread_t	tm_tid;	/* thread id of controller */
30 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
31 
32 	int		tm_nb_workers;	/* number of workers in the pool */
33 	int		tm_nb_idle;	/* number of workers waiting */
34 
35 	int		tm_p[2];	/* poll control pipe */
36 
37 	smutex_t	tm_w_mutex;	/* linked list access mutex */
38 	scond_t		tm_w_cond;	/* */
39 };
40 
41 static taskmgr_T     Tskmgr = {0};
42 
43 #define WRK_CTX_HEAD	Tskmgr.tm_ctx_head
44 
45 #define RD_PIPE	(Tskmgr.tm_p[0])
46 #define WR_PIPE	(Tskmgr.tm_p[1])
47 
48 #define PIPE_SEND_SIGNAL()						\
49 	do								\
50 	{								\
51 		char evt = 0x5a;					\
52 		int fd = WR_PIPE;					\
53 		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
54 			smi_log(SMI_LOG_ERR,				\
55 				"Error writing to event pipe: %s",	\
56 				sm_errstring(errno));			\
57 	} while (0)
58 
59 #ifndef USE_PIPE_WAKE_POLL
60 # define USE_PIPE_WAKE_POLL 1
61 #endif /* USE_PIPE_WAKE_POLL */
62 
63 /* poll check periodicity (default 10000 - 10 s) */
64 #define POLL_TIMEOUT   10000
65 
66 /* worker conditional wait timeout (default 10 s) */
67 #define COND_TIMEOUT     10
68 
69 /* functions */
70 static int mi_close_session __P((SMFICTX_PTR));
71 
72 static void *mi_worker __P((void *));
73 static void *mi_pool_controller __P((void *));
74 
75 static int mi_list_add_ctx __P((SMFICTX_PTR));
76 static int mi_list_del_ctx __P((SMFICTX_PTR));
77 
78 /*
79 **  periodicity of cleaning up old sessions (timedout)
80 **	sessions list will be checked to find old inactive
81 **	sessions each DT_CHECK_OLD_SESSIONS sec
82 */
83 
84 #define DT_CHECK_OLD_SESSIONS   600
85 
86 #ifndef OLD_SESSION_TIMEOUT
87 # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
88 #endif /* OLD_SESSION_TIMEOUT */
89 
90 /* session states - with respect to the pool of workers */
91 #define WKST_INIT		0	/* initial state */
92 #define WKST_READY_TO_RUN	1	/* command ready do be read */
93 #define WKST_RUNNING		2	/* session running on a worker */
94 #define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
95 #define WKST_WAITING		4	/* waiting for new command */
96 #define WKST_CLOSING		5	/* session finished */
97 
98 #ifndef MIN_WORKERS
99 # define MIN_WORKERS	2  /* minimum number of threads to keep around */
100 #endif
101 
102 #define MIN_IDLE	1  /* minimum number of idle threads */
103 
104 
105 /*
106 **  Macros for threads and mutex management
107 */
108 
109 #define TASKMGR_LOCK()							\
110 	do								\
111 	{								\
112 		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
113 			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
114 	} while (0)
115 
116 #define TASKMGR_UNLOCK()						\
117 	do								\
118 	{								\
119 		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
120 			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
121 	} while (0)
122 
123 #define	TASKMGR_COND_WAIT()						\
124 	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
125 
126 #define	TASKMGR_COND_SIGNAL()						\
127 	do								\
128 	{								\
129 		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
130 			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
131 	} while (0)
132 
133 #define LAUNCH_WORKER(ctx)						\
134 	do								\
135 	{								\
136 		int r;							\
137 		sthread_t tid;						\
138 									\
139 		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
140 			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
141 				sm_errstring(r));			\
142 	} while (0)
143 
144 #if POOL_DEBUG
145 # define POOL_LEV_DPRINTF(lev, x)					\
146 	do {								\
147 		if ((lev) < ctx->ctx_dbg)				\
148 			sm_dprintf x;					\
149 	} while (0)
150 #else /* POOL_DEBUG */
151 # define POOL_LEV_DPRINTF(lev, x)
152 #endif /* POOL_DEBUG */
153 
154 /*
155 **  MI_START_SESSION -- Start a session in the pool of workers
156 **
157 **	Parameters:
158 **		ctx -- context structure
159 **
160 **	Returns:
161 **		MI_SUCCESS/MI_FAILURE
162 */
163 
164 int
165 mi_start_session(ctx)
166 	SMFICTX_PTR ctx;
167 {
168 	static long id = 0;
169 
170 	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
171 	SM_ASSERT(ctx != NULL);
172 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
173 	TASKMGR_LOCK();
174 
175 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
176 	{
177 		TASKMGR_UNLOCK();
178 		return MI_FAILURE;
179 	}
180 
181 	ctx->ctx_sid = id++;
182 
183 	/* if there is an idle worker, signal it, otherwise start new worker */
184 	if (Tskmgr.tm_nb_idle > 0)
185 	{
186 		ctx->ctx_wstate = WKST_READY_TO_RUN;
187 		TASKMGR_COND_SIGNAL();
188 	}
189 	else
190 	{
191 		ctx->ctx_wstate = WKST_RUNNING;
192 		LAUNCH_WORKER(ctx);
193 	}
194 	TASKMGR_UNLOCK();
195 	return MI_SUCCESS;
196 }
197 
198 /*
199 **  MI_CLOSE_SESSION -- Close a session and clean up data structures
200 **
201 **	Parameters:
202 **		ctx -- context structure
203 **
204 **	Returns:
205 **		MI_SUCCESS/MI_FAILURE
206 */
207 
208 static int
209 mi_close_session(ctx)
210 	SMFICTX_PTR ctx;
211 {
212 	SM_ASSERT(ctx != NULL);
213 
214 	(void) mi_list_del_ctx(ctx);
215 	if (ValidSocket(ctx->ctx_sd))
216 	{
217 		(void) closesocket(ctx->ctx_sd);
218 		ctx->ctx_sd = INVALID_SOCKET;
219 	}
220 	if (ctx->ctx_reply != NULL)
221 	{
222 		free(ctx->ctx_reply);
223 		ctx->ctx_reply = NULL;
224 	}
225 	if (ctx->ctx_privdata != NULL)
226 	{
227 		smi_log(SMI_LOG_WARN, "%s: private data not NULL",
228 			ctx->ctx_smfi->xxfi_name);
229 	}
230 	mi_clr_macros(ctx, 0);
231 	free(ctx);
232 
233 	return MI_SUCCESS;
234 }
235 
236 /*
237 **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
238 **		Must be called before starting sessions.
239 **
240 **	Parameters:
241 **		none
242 **
243 **	Returns:
244 **		MI_SUCCESS/MI_FAILURE
245 */
246 
247 int
248 mi_pool_controller_init()
249 {
250 	sthread_t tid;
251 	int r, i;
252 
253 	if (Tskmgr.tm_signature == TM_SIGNATURE)
254 		return MI_SUCCESS;
255 
256 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
257 	Tskmgr.tm_tid = (sthread_t) -1;
258 	Tskmgr.tm_nb_workers = 0;
259 	Tskmgr.tm_nb_idle = 0;
260 
261 	if (pipe(Tskmgr.tm_p) != 0)
262 	{
263 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
264 			sm_errstring(r));
265 		return MI_FAILURE;
266 	}
267 
268 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
269 
270 	(void) smutex_init(&Tskmgr.tm_w_mutex);
271 	(void) scond_init(&Tskmgr.tm_w_cond);
272 
273 	/* Launch the pool controller */
274 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
275 	{
276 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
277 			sm_errstring(r));
278 		return MI_FAILURE;
279 	}
280 	Tskmgr.tm_tid = tid;
281 	Tskmgr.tm_signature = TM_SIGNATURE;
282 
283 	/* Create the pool of workers */
284 	for (i = 0; i < MIN_WORKERS; i++)
285 	{
286 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
287 		{
288 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
289 				sm_errstring(r));
290 			return MI_FAILURE;
291 		}
292 	}
293 
294 	return MI_SUCCESS;
295 }
296 
297 /*
298 **  MI_POOL_CONTROLLER -- manage the pool of workers
299 **	This thread must be running when listener begins
300 **	starting sessions
301 **
302 **	Parameters:
303 **		arg -- unused
304 **
305 **	Returns:
306 **		NULL
307 **
308 **	Control flow:
309 **		for (;;)
310 **			Look for timed out sessions
311 **			Select sessions to wait for sendmail command
312 **			Poll set of file descriptors
313 **			if timeout
314 **				continue
315 **			For each file descriptor ready
316 **				launch new thread if no worker available
317 **				else
318 **				signal waiting worker
319 */
320 
321 /* Poll structure array (pollfd) size step */
322 #define PFD_STEP	256
323 
324 #define WAIT_FD(i)	(pfd[i].fd)
325 #define WAITFN		"POLL"
326 
327 static void *
328 mi_pool_controller(arg)
329 	void *arg;
330 {
331 	struct pollfd *pfd = NULL;
332 	int dim_pfd = 0;
333 	bool rebuild_set = true;
334 	int pcnt = 0; /* error count for poll() failures */
335 
336 	Tskmgr.tm_tid = sthread_get_id();
337 	if (pthread_detach(Tskmgr.tm_tid) != 0)
338 	{
339 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
340 		return NULL;
341 	}
342 
343 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
344 	if (pfd == NULL)
345 	{
346 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
347 			sm_errstring(errno));
348 		return NULL;
349 	}
350 	dim_pfd = PFD_STEP;
351 
352 	for (;;)
353 	{
354 		SMFICTX_PTR ctx;
355 		int nfd, rfd, i;
356 		time_t now;
357 		time_t lastcheck;
358 
359 		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
360 
361 		if (mi_stop() != MILTER_CONT)
362 			break;
363 
364 		TASKMGR_LOCK();
365 
366 		now = time(NULL);
367 
368 		/* check for timed out sessions? */
369 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
370 		{
371 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
372 			{
373 				if (ctx->ctx_wstate == WKST_WAITING)
374 				{
375 					if (ctx->ctx_wait == 0)
376 					{
377 						ctx->ctx_wait = now;
378 						continue;
379 					}
380 
381 					/* if session timed out, close it */
382 					if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
383 					    < now)
384 					{
385 						sfsistat (*fi_close) __P((SMFICTX *));
386 
387 						POOL_LEV_DPRINTF(4,
388 							("Closing old connection: sd=%d id=%d",
389 							ctx->ctx_sd,
390 							ctx->ctx_sid));
391 
392 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
393 							(void) (*fi_close)(ctx);
394 
395 						mi_close_session(ctx);
396 						ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
397 						continue;
398 					}
399 				}
400 			}
401 			lastcheck = now;
402 		}
403 
404 		if (rebuild_set)
405 		{
406 			/*
407 			**  Initialize poll set.
408 			**  Insert into the poll set the file descriptors of
409 			**  all sessions waiting for a command from sendmail.
410 			*/
411 
412 			nfd = 0;
413 
414 			/* begin with worker pipe */
415 			pfd[nfd].fd = RD_PIPE;
416 			pfd[nfd].events = MI_POLL_RD_FLAGS;
417 			pfd[nfd].revents = 0;
418 			nfd++;
419 
420 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
421 			{
422 				/*
423 				**  update ctx_wait - start of wait moment -
424 				**  for timeout
425 				*/
426 
427 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
428 					ctx->ctx_wait = now;
429 
430 				/* add the session to the pollfd array? */
431 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
432 				    (ctx->ctx_wstate == WKST_WAITING))
433 				{
434 					/*
435 					**  Resize the pollfd array if it
436 					**  isn't large enough.
437 					*/
438 
439 					if (nfd >= dim_pfd)
440 					{
441 						struct pollfd *tpfd;
442 						size_t new;
443 
444 						new = (dim_pfd + PFD_STEP) *
445 							sizeof(*tpfd);
446 						tpfd = (struct pollfd *)
447 							realloc(pfd, new);
448 						if (tpfd != NULL)
449 						{
450 							pfd = tpfd;
451 							dim_pfd += PFD_STEP;
452 						}
453 						else
454 						{
455 							smi_log(SMI_LOG_ERR,
456 								"Failed to realloc pollfd array:%s",
457 								sm_errstring(errno));
458 						}
459 					}
460 
461 					/* add the session to pollfd array */
462 					if (nfd < dim_pfd)
463 					{
464 						ctx->ctx_wstate = WKST_WAITING;
465 						pfd[nfd].fd = ctx->ctx_sd;
466 						pfd[nfd].events = MI_POLL_RD_FLAGS;
467 						pfd[nfd].revents = 0;
468 						nfd++;
469 					}
470 				}
471 			}
472 		}
473 
474 		TASKMGR_UNLOCK();
475 
476 		/* Everything is ready, let's wait for an event */
477 		rfd = poll(pfd, nfd, POLL_TIMEOUT);
478 
479 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
480 			WAITFN, now, nfd));
481 
482 		/* timeout */
483 		if (rfd == 0)
484 			continue;
485 
486 		rebuild_set = true;
487 
488 		/* error */
489 		if (rfd < 0)
490 		{
491 			if (errno == EINTR)
492 				continue;
493 			pcnt++;
494 			smi_log(SMI_LOG_ERR,
495 				"%s() failed (%s), %s",
496 				WAITFN, sm_errstring(errno),
497 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
498 
499 			if (pcnt >= MAX_FAILS_S)
500 				goto err;
501 		}
502 		pcnt = 0;
503 
504 		/* something happened */
505 		for (i = 0; i < nfd; i++)
506 		{
507 			if (pfd[i].revents == 0)
508 				continue;
509 
510 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
511 				WAITFN, i, nfd,
512 			WAIT_FD(i)));
513 
514 			/* has a worker signaled an end of task ? */
515 			if (WAIT_FD(i) == RD_PIPE)
516 			{
517 				char evt = 0;
518 				int r = 0;
519 
520 				POOL_LEV_DPRINTF(4,
521 					("PIPE WILL READ evt = %08X %08X",
522 					pfd[i].events, pfd[i].revents));
523 
524 				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
525 				{
526 					r = read(RD_PIPE, &evt, sizeof(evt));
527 					if (r == sizeof(evt))
528 					{
529 						/* Do nothing */
530 					}
531 				}
532 
533 				POOL_LEV_DPRINTF(4,
534 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
535 					i, RD_PIPE, r, evt));
536 
537 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
538 				{
539 					/* Exception handling */
540 				}
541 				continue;
542 			}
543 
544 			/* no ! sendmail wants to send a command */
545 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
546 			{
547 				if (ctx->ctx_wstate != WKST_WAITING)
548 					continue;
549 
550 				POOL_LEV_DPRINTF(4,
551 					("Checking context sd=%d - fd=%d ",
552 					ctx->ctx_sd , WAIT_FD(i)));
553 
554 				if (ctx->ctx_sd == pfd[i].fd)
555 				{
556 					TASKMGR_LOCK();
557 
558 					POOL_LEV_DPRINTF(4,
559 						("TASK: found %d for fd[%d]=%d",
560 						ctx->ctx_sid, i, WAIT_FD(i)));
561 
562 					if (Tskmgr.tm_nb_idle > 0)
563 					{
564 						ctx->ctx_wstate = WKST_READY_TO_RUN;
565 						TASKMGR_COND_SIGNAL();
566 					}
567 					else
568 					{
569 						ctx->ctx_wstate = WKST_RUNNING;
570 						LAUNCH_WORKER(ctx);
571 					}
572 					TASKMGR_UNLOCK();
573 					break;
574 				}
575 			}
576 
577 			POOL_LEV_DPRINTF(4,
578 				("TASK %s FOUND - Checking PIPE for fd[%d]",
579 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
580 		}
581 	}
582 
583   err:
584 	if (pfd != NULL)
585 		free(pfd);
586 
587 	Tskmgr.tm_signature = 0;
588 	for (;;)
589 	{
590 		SMFICTX_PTR ctx;
591 
592 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
593 		if (ctx == NULL)
594 			break;
595 		mi_close_session(ctx);
596 	}
597 
598 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
599 	(void) scond_destroy(&Tskmgr.tm_w_cond);
600 
601 	return NULL;
602 }
603 
604 /*
605 **  Look for a task ready to run.
606 **  Value of ctx is NULL or a pointer to a task ready to run.
607 */
608 
609 #define GET_TASK_READY_TO_RUN()					\
610 	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
611 	{							\
612 		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
613 		{						\
614 			ctx->ctx_wstate = WKST_RUNNING;		\
615 			break;					\
616 		}						\
617 	}
618 
619 /*
620 **  MI_WORKER -- worker thread
621 **	executes tasks distributed by the mi_pool_controller
622 **	or by mi_start_session
623 **
624 **	Parameters:
625 **		arg -- pointer to context structure
626 **
627 **	Returns:
628 **		NULL pointer
629 */
630 
631 static void *
632 mi_worker(arg)
633 	void *arg;
634 {
635 	SMFICTX_PTR ctx;
636 	bool done;
637 	sthread_t t_id;
638 	int r;
639 
640 	ctx = (SMFICTX_PTR) arg;
641 	done = false;
642 	if (ctx != NULL)
643 		ctx->ctx_wstate = WKST_RUNNING;
644 
645 	t_id = sthread_get_id();
646 	if (pthread_detach(t_id) != 0)
647 	{
648 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
649 		if (ctx != NULL)
650 			ctx->ctx_wstate = WKST_READY_TO_RUN;
651 		return NULL;
652 	}
653 
654 	TASKMGR_LOCK();
655 	Tskmgr.tm_nb_workers++;
656 	TASKMGR_UNLOCK();
657 
658 	while (!done)
659 	{
660 		if (mi_stop() != MILTER_CONT)
661 			break;
662 
663 		/* let's handle next task... */
664 		if (ctx != NULL)
665 		{
666 			int res;
667 
668 			POOL_LEV_DPRINTF(4,
669 				("worker %d: new task -> let's handle it",
670 				t_id));
671 			res = mi_engine(ctx);
672 			POOL_LEV_DPRINTF(4,
673 				("worker %d: mi_engine returned %d", t_id, res));
674 
675 			TASKMGR_LOCK();
676 			if (res != MI_CONTINUE)
677 			{
678 				ctx->ctx_wstate = WKST_CLOSING;
679 
680 				/*
681 				**  Delete context from linked list of
682 				**  sessions and close session.
683 				*/
684 
685 				mi_close_session(ctx);
686 			}
687 			else
688 			{
689 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
690 
691 				POOL_LEV_DPRINTF(4,
692 					("writing to event pipe..."));
693 
694 				/*
695 				**  Signal task controller to add new session
696 				**  to poll set.
697 				*/
698 
699 				PIPE_SEND_SIGNAL();
700 			}
701 			TASKMGR_UNLOCK();
702 			ctx = NULL;
703 
704 		}
705 
706 		/* check if there is any task waiting to be served */
707 		TASKMGR_LOCK();
708 
709 		GET_TASK_READY_TO_RUN();
710 
711 		/* Got a task? */
712 		if (ctx != NULL)
713 		{
714 			TASKMGR_UNLOCK();
715 			continue;
716 		}
717 
718 		/*
719 		**  if not, let's check if there is enough idle workers
720 		**	if yes: quit
721 		*/
722 
723 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
724 		    Tskmgr.tm_nb_idle > MIN_IDLE)
725 			done = true;
726 
727 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
728 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
729 
730 		if (done)
731 		{
732 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
733 			Tskmgr.tm_nb_workers--;
734 			TASKMGR_UNLOCK();
735 			continue;
736 		}
737 
738 		/*
739 		**  if no task ready to run, wait for another one
740 		*/
741 
742 		Tskmgr.tm_nb_idle++;
743 		TASKMGR_COND_WAIT();
744 		Tskmgr.tm_nb_idle--;
745 
746 		/* look for a task */
747 		GET_TASK_READY_TO_RUN();
748 
749 		TASKMGR_UNLOCK();
750 	}
751 	return NULL;
752 }
753 
754 /*
755 **  MI_LIST_ADD_CTX -- add new session to linked list
756 **
757 **	Parameters:
758 **		ctx -- context structure
759 **
760 **	Returns:
761 **		MI_FAILURE/MI_SUCCESS
762 */
763 
764 static int
765 mi_list_add_ctx(ctx)
766 	SMFICTX_PTR ctx;
767 {
768 	SM_ASSERT(ctx != NULL);
769 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
770 	return MI_SUCCESS;
771 }
772 
773 /*
774 **  MI_LIST_DEL_CTX -- remove session from linked list when finished
775 **
776 **	Parameters:
777 **		ctx -- context structure
778 **
779 **	Returns:
780 **		MI_FAILURE/MI_SUCCESS
781 */
782 
783 static int
784 mi_list_del_ctx(ctx)
785 	SMFICTX_PTR ctx;
786 {
787 	SM_ASSERT(ctx != NULL);
788 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
789 		return MI_FAILURE;
790 
791 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
792 	return MI_SUCCESS;
793 }
794 #endif /* _FFR_WORKERS_POOL */
795