xref: /freebsd/contrib/sendmail/libmilter/worker.c (revision 1e413cf93298b5b97441a21d9a50fdcd0ee9945e)
1 /*
2  *  Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
3  *	All rights reserved.
4  *
5  * By using this file, you agree to the terms and conditions set
6  * forth in the LICENSE file which can be found at the top level of
7  * the sendmail distribution.
8  *
9  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10  *   Jose-Marcio.Martins@ensmp.fr
11  */
12 
13 #include <sm/gen.h>
14 SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
15 
16 #include "libmilter.h"
17 
18 #if _FFR_WORKERS_POOL
19 
20 typedef struct taskmgr_S taskmgr_T;
21 
22 #define TM_SIGNATURE		0x23021957
23 
24 struct taskmgr_S
25 {
26 	long		tm_signature; /* has the controller been initialized */
27 	sthread_t	tm_tid;	/* thread id of controller */
28 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
29 
30 	int		tm_nb_workers;	/* number of workers in the pool */
31 	int		tm_nb_idle;	/* number of workers waiting */
32 
33 	int		tm_p[2];	/* poll control pipe */
34 
35 	smutex_t	tm_w_mutex;	/* linked list access mutex */
36 	scond_t		tm_w_cond;	/* */
37 };
38 
39 static taskmgr_T     Tskmgr = {0};
40 
/* shorthand for the list of sessions kept by the task manager */
#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

/* the two ends of the event pipe */
#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])

/*
**  PIPE_SEND_SIGNAL -- write one byte to the event pipe
**
**	The controller polls the read end of the pipe, so this wakes it
**	up.  A short or failed write is logged and otherwise ignored.
*/

#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char wake = 0x5a;					\
		int wfd = WR_PIPE;					\
									\
		if (write(wfd, &wake, sizeof(wake)) != sizeof(wake))	\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#if !defined(USE_PIPE_WAKE_POLL)
# define USE_PIPE_WAKE_POLL 1
#endif /* !defined(USE_PIPE_WAKE_POLL) */

/* timeout handed to poll() by the controller, in ms (10000 = 10 s) */
#define POLL_TIMEOUT   10000

/* timeout of the conditional wait done by an idle worker, in s */
#define COND_TIMEOUT     10
66 
67 /* functions */
68 static int mi_close_session __P((SMFICTX_PTR));
69 
70 static void *mi_worker __P((void *));
71 static void *mi_pool_controller __P((void *));
72 
73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 static int mi_list_del_ctx __P((SMFICTX_PTR));
75 
/*
**  Cleaning up old (timed out) sessions:
**	every DT_CHECK_OLD_SESSIONS seconds the controller walks the
**	list of sessions looking for those that have been waiting for
**	more than OLD_SESSION_TIMEOUT seconds.
*/

#define DT_CHECK_OLD_SESSIONS   600

/* the default expands to a field of "ctx", which must be in scope */
#if !defined(OLD_SESSION_TIMEOUT)
# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
#endif /* !defined(OLD_SESSION_TIMEOUT) */

/*
**  Session states (ctx_wstate) - with respect to the pool of workers
*/

#define WKST_INIT		0	/* initial state */
#define WKST_READY_TO_RUN	1	/* command ready to be read */
#define WKST_RUNNING		2	/* session running on a worker */
#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
#define WKST_WAITING		4	/* waiting for new command */
#define WKST_CLOSING		5	/* session finished */

/* a worker only quits while more than MIN_WORKERS threads exist ... */
#if !defined(MIN_WORKERS)
# define MIN_WORKERS	2  /* minimum number of threads to keep around */
#endif

/* ... and more than MIN_IDLE of them are idle */
#define MIN_IDLE	1  /* minimum number of idle threads */
101 
102 
/*
**  Macros for threads and mutex management
**
**	All of them operate on the single task manager (Tskmgr).
**	Failures are logged; none of the macros reports them to the caller.
*/

/* acquire the task manager mutex */
#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

/* release the task manager mutex */
#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

/* wait (at most COND_TIMEOUT) on the condition; mutex must be held */
#define	TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

/* signal the condition the idle workers are waiting on */
#define	TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

/* start a new worker thread with ctx as its first task */
#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		sthread_t new_tid;					\
		int launch_err;						\
									\
		launch_err = thread_create(&new_tid, mi_worker, ctx);	\
		if (launch_err != 0)					\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
				sm_errstring(launch_err));		\
	} while (0)

/*
**  Debug output, compiled in only if POOL_DEBUG is set.
**	The debug level is taken from "ctx", which must be in scope
**	wherever the macro is used.
*/

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do {								\
		if (ctx->ctx_dbg > (lev))				\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */
151 
152 /*
153 **  MI_START_SESSION -- Start a session in the pool of workers
154 **
155 **	Parameters:
156 **		ctx -- context structure
157 **
158 **	Returns:
159 **		MI_SUCCESS/MI_FAILURE
160 */
161 
162 int
163 mi_start_session(ctx)
164 	SMFICTX_PTR ctx;
165 {
166 	static long id = 0;
167 
168 	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
169 	SM_ASSERT(ctx != NULL);
170 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
171 	TASKMGR_LOCK();
172 
173 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
174 	{
175 		TASKMGR_UNLOCK();
176 		return MI_FAILURE;
177 	}
178 
179 	ctx->ctx_sid = id++;
180 
181 	/* if there is an idle worker, signal it, otherwise start new worker */
182 	if (Tskmgr.tm_nb_idle > 0)
183 	{
184 		ctx->ctx_wstate = WKST_READY_TO_RUN;
185 		TASKMGR_COND_SIGNAL();
186 	}
187 	else
188 	{
189 		ctx->ctx_wstate = WKST_RUNNING;
190 		LAUNCH_WORKER(ctx);
191 	}
192 	TASKMGR_UNLOCK();
193 	return MI_SUCCESS;
194 }
195 
196 /*
197 **  MI_CLOSE_SESSION -- Close a session and clean up data structures
198 **
199 **	Parameters:
200 **		ctx -- context structure
201 **
202 **	Returns:
203 **		MI_SUCCESS/MI_FAILURE
204 */
205 
206 static int
207 mi_close_session(ctx)
208 	SMFICTX_PTR ctx;
209 {
210 	SM_ASSERT(ctx != NULL);
211 
212 	(void) mi_list_del_ctx(ctx);
213 	if (ValidSocket(ctx->ctx_sd))
214 	{
215 		(void) closesocket(ctx->ctx_sd);
216 		ctx->ctx_sd = INVALID_SOCKET;
217 	}
218 	if (ctx->ctx_reply != NULL)
219 	{
220 		free(ctx->ctx_reply);
221 		ctx->ctx_reply = NULL;
222 	}
223 	if (ctx->ctx_privdata != NULL)
224 	{
225 		smi_log(SMI_LOG_WARN, "%s: private data not NULL",
226 			ctx->ctx_smfi->xxfi_name);
227 	}
228 	mi_clr_macros(ctx, 0);
229 	free(ctx);
230 
231 	return MI_SUCCESS;
232 }
233 
234 /*
235 **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
236 **		Must be called before starting sessions.
237 **
238 **	Parameters:
239 **		none
240 **
241 **	Returns:
242 **		MI_SUCCESS/MI_FAILURE
243 */
244 
245 int
246 mi_pool_controller_init()
247 {
248 	sthread_t tid;
249 	int r, i;
250 
251 	if (Tskmgr.tm_signature == TM_SIGNATURE)
252 		return MI_SUCCESS;
253 
254 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
255 	Tskmgr.tm_tid = (sthread_t) -1;
256 	Tskmgr.tm_nb_workers = 0;
257 	Tskmgr.tm_nb_idle = 0;
258 
259 	if (pipe(Tskmgr.tm_p) != 0)
260 	{
261 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
262 			sm_errstring(r));
263 		return MI_FAILURE;
264 	}
265 
266 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
267 
268 	(void) smutex_init(&Tskmgr.tm_w_mutex);
269 	(void) scond_init(&Tskmgr.tm_w_cond);
270 
271 	/* Launch the pool controller */
272 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
273 	{
274 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
275 			sm_errstring(r));
276 		return MI_FAILURE;
277 	}
278 	Tskmgr.tm_tid = tid;
279 	Tskmgr.tm_signature = TM_SIGNATURE;
280 
281 	/* Create the pool of workers */
282 	for (i = 0; i < MIN_WORKERS; i++)
283 	{
284 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
285 		{
286 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
287 				sm_errstring(r));
288 			return MI_FAILURE;
289 		}
290 	}
291 
292 	return MI_SUCCESS;
293 }
294 
295 /*
296 **  MI_POOL_CONTROLLER -- manage the pool of workers
297 **	This thread must be running when listener begins
298 **	starting sessions
299 **
300 **	Parameters:
301 **		arg -- unused
302 **
303 **	Returns:
304 **		NULL
305 **
306 **	Control flow:
307 **		for (;;)
308 **			Look for timed out sessions
309 **			Select sessions to wait for sendmail command
310 **			Poll set of file descriptors
311 **			if timeout
312 **				continue
313 **			For each file descriptor ready
314 **				launch new thread if no worker available
315 **				else
316 **				signal waiting worker
317 */
318 
319 /* Poll structure array (pollfd) size step */
320 #define PFD_STEP	256
321 
322 #define WAIT_FD(i)	(pfd[i].fd)
323 #define WAITFN		"POLL"
324 
325 static void *
326 mi_pool_controller(arg)
327 	void *arg;
328 {
329 	struct pollfd *pfd = NULL;
330 	int dim_pfd = 0;
331 	bool rebuild_set = true;
332 	int pcnt = 0; /* error count for poll() failures */
333 
334 	Tskmgr.tm_tid = sthread_get_id();
335 	if (pthread_detach(Tskmgr.tm_tid) != 0)
336 	{
337 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
338 		return NULL;
339 	}
340 
341 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
342 	if (pfd == NULL)
343 	{
344 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
345 			sm_errstring(errno));
346 		return NULL;
347 	}
348 	dim_pfd = PFD_STEP;
349 
350 	for (;;)
351 	{
352 		SMFICTX_PTR ctx;
353 		int nfd, rfd, i;
354 		time_t now;
355 		time_t lastcheck;
356 
357 		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
358 
359 		if (mi_stop() != MILTER_CONT)
360 			break;
361 
362 		TASKMGR_LOCK();
363 
364 		now = time(NULL);
365 
366 		/* check for timed out sessions? */
367 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
368 		{
369 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
370 			{
371 				if (ctx->ctx_wstate == WKST_WAITING)
372 				{
373 					if (ctx->ctx_wait == 0)
374 					{
375 						ctx->ctx_wait = now;
376 						continue;
377 					}
378 
379 					/* if session timed out, close it */
380 					if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
381 					    < now)
382 					{
383 						sfsistat (*fi_close) __P((SMFICTX *));
384 
385 						POOL_LEV_DPRINTF(4,
386 							("Closing old connection: sd=%d id=%d",
387 							ctx->ctx_sd,
388 							ctx->ctx_sid));
389 
390 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
391 							(void) (*fi_close)(ctx);
392 
393 						mi_close_session(ctx);
394 						ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
395 						continue;
396 					}
397 				}
398 			}
399 			lastcheck = now;
400 		}
401 
402 		if (rebuild_set)
403 		{
404 			/*
405 			**  Initialize poll set.
406 			**  Insert into the poll set the file descriptors of
407 			**  all sessions waiting for a command from sendmail.
408 			*/
409 
410 			nfd = 0;
411 
412 			/* begin with worker pipe */
413 			pfd[nfd].fd = RD_PIPE;
414 			pfd[nfd].events = MI_POLL_RD_FLAGS;
415 			pfd[nfd].revents = 0;
416 			nfd++;
417 
418 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
419 			{
420 				/*
421 				**  update ctx_wait - start of wait moment -
422 				**  for timeout
423 				*/
424 
425 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
426 					ctx->ctx_wait = now;
427 
428 				/* add the session to the pollfd array? */
429 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
430 				    (ctx->ctx_wstate == WKST_WAITING))
431 				{
432 					/*
433 					**  Resize the pollfd array if it
434 					**  isn't large enough.
435 					*/
436 
437 					if (nfd >= dim_pfd)
438 					{
439 						struct pollfd *tpfd;
440 						size_t new;
441 
442 						new = (dim_pfd + PFD_STEP) *
443 							sizeof(*tpfd);
444 						tpfd = (struct pollfd *)
445 							realloc(pfd, new);
446 						if (tpfd != NULL)
447 						{
448 							pfd = tpfd;
449 							dim_pfd += PFD_STEP;
450 						}
451 						else
452 						{
453 							smi_log(SMI_LOG_ERR,
454 								"Failed to realloc pollfd array:%s",
455 								sm_errstring(errno));
456 						}
457 					}
458 
459 					/* add the session to pollfd array */
460 					if (nfd < dim_pfd)
461 					{
462 						ctx->ctx_wstate = WKST_WAITING;
463 						pfd[nfd].fd = ctx->ctx_sd;
464 						pfd[nfd].events = MI_POLL_RD_FLAGS;
465 						pfd[nfd].revents = 0;
466 						nfd++;
467 					}
468 				}
469 			}
470 		}
471 
472 		TASKMGR_UNLOCK();
473 
474 		/* Everything is ready, let's wait for an event */
475 		rfd = poll(pfd, nfd, POLL_TIMEOUT);
476 
477 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
478 			WAITFN, now, nfd));
479 
480 		/* timeout */
481 		if (rfd == 0)
482 			continue;
483 
484 		rebuild_set = true;
485 
486 		/* error */
487 		if (rfd < 0)
488 		{
489 			if (errno == EINTR)
490 				continue;
491 			pcnt++;
492 			smi_log(SMI_LOG_ERR,
493 				"%s() failed (%s), %s",
494 				WAITFN, sm_errstring(errno),
495 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
496 
497 			if (pcnt >= MAX_FAILS_S)
498 				goto err;
499 		}
500 		pcnt = 0;
501 
502 		/* something happened */
503 		for (i = 0; i < nfd; i++)
504 		{
505 			if (pfd[i].revents == 0)
506 				continue;
507 
508 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
509 				WAITFN, i, nfd,
510 			WAIT_FD(i)));
511 
512 			/* has a worker signaled an end of task ? */
513 			if (WAIT_FD(i) == RD_PIPE)
514 			{
515 				char evt = 0;
516 				int r = 0;
517 
518 				POOL_LEV_DPRINTF(4,
519 					("PIPE WILL READ evt = %08X %08X",
520 					pfd[i].events, pfd[i].revents));
521 
522 				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
523 				{
524 					r = read(RD_PIPE, &evt, sizeof(evt));
525 					if (r == sizeof(evt))
526 					{
527 						/* Do nothing */
528 					}
529 				}
530 
531 				POOL_LEV_DPRINTF(4,
532 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
533 					i, RD_PIPE, r, evt));
534 
535 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
536 				{
537 					/* Exception handling */
538 				}
539 				continue;
540 			}
541 
542 			/* no ! sendmail wants to send a command */
543 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
544 			{
545 				if (ctx->ctx_wstate != WKST_WAITING)
546 					continue;
547 
548 				POOL_LEV_DPRINTF(4,
549 					("Checking context sd=%d - fd=%d ",
550 					ctx->ctx_sd , WAIT_FD(i)));
551 
552 				if (ctx->ctx_sd == pfd[i].fd)
553 				{
554 					TASKMGR_LOCK();
555 
556 					POOL_LEV_DPRINTF(4,
557 						("TASK: found %d for fd[%d]=%d",
558 						ctx->ctx_sid, i, WAIT_FD(i)));
559 
560 					if (Tskmgr.tm_nb_idle > 0)
561 					{
562 						ctx->ctx_wstate = WKST_READY_TO_RUN;
563 						TASKMGR_COND_SIGNAL();
564 					}
565 					else
566 					{
567 						ctx->ctx_wstate = WKST_RUNNING;
568 						LAUNCH_WORKER(ctx);
569 					}
570 					TASKMGR_UNLOCK();
571 					break;
572 				}
573 			}
574 
575 			POOL_LEV_DPRINTF(4,
576 				("TASK %s FOUND - Checking PIPE for fd[%d]",
577 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
578 		}
579 	}
580 
581   err:
582 	if (pfd != NULL)
583 		free(pfd);
584 
585 	Tskmgr.tm_signature = 0;
586 	for (;;)
587 	{
588 		SMFICTX_PTR ctx;
589 
590 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
591 		if (ctx == NULL)
592 			break;
593 		mi_close_session(ctx);
594 	}
595 
596 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
597 	(void) scond_destroy(&Tskmgr.tm_w_cond);
598 
599 	return NULL;
600 }
601 
/*
**  GET_TASK_READY_TO_RUN -- look for a task ready to run
**
**	Walks the list of sessions using the variable "ctx" of the
**	caller.  The first session found in state WKST_READY_TO_RUN is
**	marked WKST_RUNNING and left in ctx; if there is none, ctx is
**	NULL after the walk.
**	Both uses in this file hold the task manager mutex.
*/

#define GET_TASK_READY_TO_RUN()					\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
	{							\
		if (ctx->ctx_wstate != WKST_READY_TO_RUN)	\
			continue;				\
		ctx->ctx_wstate = WKST_RUNNING;			\
		break;						\
	}
616 
617 /*
618 **  MI_WORKER -- worker thread
619 **	executes tasks distributed by the mi_pool_controller
620 **	or by mi_start_session
621 **
622 **	Parameters:
623 **		arg -- pointer to context structure
624 **
625 **	Returns:
626 **		NULL pointer
627 */
628 
629 static void *
630 mi_worker(arg)
631 	void *arg;
632 {
633 	SMFICTX_PTR ctx;
634 	bool done;
635 	sthread_t t_id;
636 	int r;
637 
638 	ctx = (SMFICTX_PTR) arg;
639 	done = false;
640 	if (ctx != NULL)
641 		ctx->ctx_wstate = WKST_RUNNING;
642 
643 	t_id = sthread_get_id();
644 	if (pthread_detach(t_id) != 0)
645 	{
646 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
647 		if (ctx != NULL)
648 			ctx->ctx_wstate = WKST_READY_TO_RUN;
649 		return NULL;
650 	}
651 
652 	TASKMGR_LOCK();
653 	Tskmgr.tm_nb_workers++;
654 	TASKMGR_UNLOCK();
655 
656 	while (!done)
657 	{
658 		if (mi_stop() != MILTER_CONT)
659 			break;
660 
661 		/* let's handle next task... */
662 		if (ctx != NULL)
663 		{
664 			int res;
665 
666 			POOL_LEV_DPRINTF(4,
667 				("worker %d: new task -> let's handle it",
668 				t_id));
669 			res = mi_engine(ctx);
670 			POOL_LEV_DPRINTF(4,
671 				("worker %d: mi_engine returned %d", t_id, res));
672 
673 			TASKMGR_LOCK();
674 			if (res != MI_CONTINUE)
675 			{
676 				ctx->ctx_wstate = WKST_CLOSING;
677 
678 				/*
679 				**  Delete context from linked list of
680 				**  sessions and close session.
681 				*/
682 
683 				mi_close_session(ctx);
684 			}
685 			else
686 			{
687 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
688 
689 				POOL_LEV_DPRINTF(4,
690 					("writing to event pipe..."));
691 
692 				/*
693 				**  Signal task controller to add new session
694 				**  to poll set.
695 				*/
696 
697 				PIPE_SEND_SIGNAL();
698 			}
699 			TASKMGR_UNLOCK();
700 			ctx = NULL;
701 
702 		}
703 
704 		/* check if there is any task waiting to be served */
705 		TASKMGR_LOCK();
706 
707 		GET_TASK_READY_TO_RUN();
708 
709 		/* Got a task? */
710 		if (ctx != NULL)
711 		{
712 			TASKMGR_UNLOCK();
713 			continue;
714 		}
715 
716 		/*
717 		**  if not, let's check if there is enough idle workers
718 		**	if yes: quit
719 		*/
720 
721 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
722 		    Tskmgr.tm_nb_idle > MIN_IDLE)
723 			done = true;
724 
725 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
726 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
727 
728 		if (done)
729 		{
730 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
731 			Tskmgr.tm_nb_workers--;
732 			TASKMGR_UNLOCK();
733 			continue;
734 		}
735 
736 		/*
737 		**  if no task ready to run, wait for another one
738 		*/
739 
740 		Tskmgr.tm_nb_idle++;
741 		TASKMGR_COND_WAIT();
742 		Tskmgr.tm_nb_idle--;
743 
744 		/* look for a task */
745 		GET_TASK_READY_TO_RUN();
746 
747 		TASKMGR_UNLOCK();
748 	}
749 	return NULL;
750 }
751 
752 /*
753 **  MI_LIST_ADD_CTX -- add new session to linked list
754 **
755 **	Parameters:
756 **		ctx -- context structure
757 **
758 **	Returns:
759 **		MI_FAILURE/MI_SUCCESS
760 */
761 
762 static int
763 mi_list_add_ctx(ctx)
764 	SMFICTX_PTR ctx;
765 {
766 	SM_ASSERT(ctx != NULL);
767 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
768 	return MI_SUCCESS;
769 }
770 
771 /*
772 **  MI_LIST_DEL_CTX -- remove session from linked list when finished
773 **
774 **	Parameters:
775 **		ctx -- context structure
776 **
777 **	Returns:
778 **		MI_FAILURE/MI_SUCCESS
779 */
780 
781 static int
782 mi_list_del_ctx(ctx)
783 	SMFICTX_PTR ctx;
784 {
785 	SM_ASSERT(ctx != NULL);
786 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
787 		return MI_FAILURE;
788 
789 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
790 	return MI_SUCCESS;
791 }
792 #endif /* _FFR_WORKERS_POOL */
793