/* * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. * * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris * Jose-Marcio.Martins@ensmp.fr */ #pragma ident "%Z%%M% %I% %E% SMI" #include SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $") #include "libmilter.h" #if _FFR_WORKERS_POOL typedef struct taskmgr_S taskmgr_T; #define TM_SIGNATURE 0x23021957 struct taskmgr_S { long tm_signature; /* has the controller been initialized */ sthread_t tm_tid; /* thread id of controller */ smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ int tm_nb_workers; /* number of workers in the pool */ int tm_nb_idle; /* number of workers waiting */ int tm_p[2]; /* poll control pipe */ smutex_t tm_w_mutex; /* linked list access mutex */ scond_t tm_w_cond; /* */ }; static taskmgr_T Tskmgr = {0}; #define WRK_CTX_HEAD Tskmgr.tm_ctx_head #define RD_PIPE (Tskmgr.tm_p[0]) #define WR_PIPE (Tskmgr.tm_p[1]) #define PIPE_SEND_SIGNAL() \ do \ { \ char evt = 0x5a; \ int fd = WR_PIPE; \ if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ smi_log(SMI_LOG_ERR, \ "Error writing to event pipe: %s", \ sm_errstring(errno)); \ } while (0) #ifndef USE_PIPE_WAKE_POLL # define USE_PIPE_WAKE_POLL 1 #endif /* USE_PIPE_WAKE_POLL */ /* poll check periodicity (default 10000 - 10 s) */ #define POLL_TIMEOUT 10000 /* worker conditional wait timeout (default 10 s) */ #define COND_TIMEOUT 10 /* functions */ static int mi_close_session __P((SMFICTX_PTR)); static void *mi_worker __P((void *)); static void *mi_pool_controller __P((void *)); static int mi_list_add_ctx __P((SMFICTX_PTR)); static int mi_list_del_ctx __P((SMFICTX_PTR)); /* ** periodicity of cleaning up old sessions (timedout) ** sessions list will be checked to find old inactive ** sessions each DT_CHECK_OLD_SESSIONS sec */ #define DT_CHECK_OLD_SESSIONS 600 #ifndef OLD_SESSION_TIMEOUT # define OLD_SESSION_TIMEOUT ctx->ctx_timeout #endif /* OLD_SESSION_TIMEOUT */ /* session states - with respect to the pool of workers */ #define WKST_INIT 0 /* initial state */ #define WKST_READY_TO_RUN 1 /* command ready do be read */ #define WKST_RUNNING 2 /* session running on a worker */ #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ #define WKST_WAITING 4 /* waiting for new command */ #define WKST_CLOSING 5 /* session finished */ #ifndef MIN_WORKERS # define MIN_WORKERS 2 /* minimum number of threads to keep around */ #endif #define MIN_IDLE 1 /* minimum number of idle threads */ /* ** Macros for threads and mutex management */ #define TASKMGR_LOCK() \ do \ { \ if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ } while (0) #define TASKMGR_UNLOCK() \ do \ { \ if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ } while (0) #define TASKMGR_COND_WAIT() \ scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) #define TASKMGR_COND_SIGNAL() \ do \ { \ if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ } while (0) #define LAUNCH_WORKER(ctx) \ do \ { \ int r; \ sthread_t tid; \ \ if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ sm_errstring(r)); \ } while (0) #if POOL_DEBUG # define POOL_LEV_DPRINTF(lev, x) \ do { \ if ((lev) < ctx->ctx_dbg) \ sm_dprintf x; \ } while (0) #else /* POOL_DEBUG */ # define POOL_LEV_DPRINTF(lev, x) #endif /* POOL_DEBUG */ /* ** MI_START_SESSION -- Start a session in the pool of workers ** ** Parameters: ** ctx -- context structure ** ** Returns: ** MI_SUCCESS/MI_FAILURE */ int mi_start_session(ctx) SMFICTX_PTR ctx; { static long id = 0; SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); SM_ASSERT(ctx != NULL); POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); TASKMGR_LOCK(); if (mi_list_add_ctx(ctx) != MI_SUCCESS) { TASKMGR_UNLOCK(); return MI_FAILURE; } ctx->ctx_sid = id++; /* if there is an idle worker, signal it, otherwise start new worker */ if (Tskmgr.tm_nb_idle > 0) { ctx->ctx_wstate = WKST_READY_TO_RUN; TASKMGR_COND_SIGNAL(); } else { ctx->ctx_wstate = WKST_RUNNING; LAUNCH_WORKER(ctx); } TASKMGR_UNLOCK(); return MI_SUCCESS; } /* ** MI_CLOSE_SESSION -- Close a session and clean up data structures ** ** Parameters: ** ctx -- context structure ** ** Returns: ** MI_SUCCESS/MI_FAILURE */ static int mi_close_session(ctx) SMFICTX_PTR ctx; { SM_ASSERT(ctx != NULL); (void) mi_list_del_ctx(ctx); if (ValidSocket(ctx->ctx_sd)) { (void) closesocket(ctx->ctx_sd); ctx->ctx_sd = INVALID_SOCKET; } if (ctx->ctx_reply != NULL) { free(ctx->ctx_reply); ctx->ctx_reply = NULL; } if (ctx->ctx_privdata != NULL) { smi_log(SMI_LOG_WARN, "%s: private data not NULL", ctx->ctx_smfi->xxfi_name); } mi_clr_macros(ctx, 0); free(ctx); return MI_SUCCESS; } /* ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller ** Must be called before starting sessions. ** ** Parameters: ** none ** ** Returns: ** MI_SUCCESS/MI_FAILURE */ int mi_pool_controller_init() { sthread_t tid; int r, i; if (Tskmgr.tm_signature == TM_SIGNATURE) return MI_SUCCESS; SM_TAILQ_INIT(&WRK_CTX_HEAD); Tskmgr.tm_tid = (sthread_t) -1; Tskmgr.tm_nb_workers = 0; Tskmgr.tm_nb_idle = 0; if (pipe(Tskmgr.tm_p) != 0) { smi_log(SMI_LOG_ERR, "can't create event pipe: %s", sm_errstring(r)); return MI_FAILURE; } POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); (void) smutex_init(&Tskmgr.tm_w_mutex); (void) scond_init(&Tskmgr.tm_w_cond); /* Launch the pool controller */ if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) { smi_log(SMI_LOG_ERR, "can't create controller thread: %s", sm_errstring(r)); return MI_FAILURE; } Tskmgr.tm_tid = tid; Tskmgr.tm_signature = TM_SIGNATURE; /* Create the pool of workers */ for (i = 0; i < MIN_WORKERS; i++) { if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) { smi_log(SMI_LOG_ERR, "can't create workers crew: %s", sm_errstring(r)); return MI_FAILURE; } } return MI_SUCCESS; } /* ** MI_POOL_CONTROLLER -- manage the pool of workers ** This thread must be running when listener begins ** starting sessions ** ** Parameters: ** arg -- unused ** ** Returns: ** NULL ** ** Control flow: ** for (;;) ** Look for timed out sessions ** Select sessions to wait for sendmail command ** Poll set of file descriptors ** if timeout ** continue ** For each file descriptor ready ** launch new thread if no worker available ** else ** signal waiting worker */ /* Poll structure array (pollfd) size step */ #define PFD_STEP 256 #define WAIT_FD(i) (pfd[i].fd) #define WAITFN "POLL" static void * mi_pool_controller(arg) void *arg; { struct pollfd *pfd = NULL; int dim_pfd = 0; bool rebuild_set = true; int pcnt = 0; /* error count for poll() failures */ Tskmgr.tm_tid = sthread_get_id(); if (pthread_detach(Tskmgr.tm_tid) != 0) { smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); return NULL; } pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); if (pfd == NULL) { smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", sm_errstring(errno)); return NULL; } dim_pfd = PFD_STEP; for (;;) { SMFICTX_PTR ctx; int nfd, rfd, i; time_t now; time_t lastcheck; POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); if (mi_stop() != MILTER_CONT) break; TASKMGR_LOCK(); now = time(NULL); /* check for timed out sessions? */ if (lastcheck + DT_CHECK_OLD_SESSIONS < now) { SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) { if (ctx->ctx_wstate == WKST_WAITING) { if (ctx->ctx_wait == 0) { ctx->ctx_wait = now; continue; } /* if session timed out, close it */ if (ctx->ctx_wait + OLD_SESSION_TIMEOUT < now) { sfsistat (*fi_close) __P((SMFICTX *)); POOL_LEV_DPRINTF(4, ("Closing old connection: sd=%d id=%d", ctx->ctx_sd, ctx->ctx_sid)); if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) (void) (*fi_close)(ctx); mi_close_session(ctx); ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); continue; } } } lastcheck = now; } if (rebuild_set) { /* ** Initialize poll set. ** Insert into the poll set the file descriptors of ** all sessions waiting for a command from sendmail. */ nfd = 0; /* begin with worker pipe */ pfd[nfd].fd = RD_PIPE; pfd[nfd].events = MI_POLL_RD_FLAGS; pfd[nfd].revents = 0; nfd++; SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) { /* ** update ctx_wait - start of wait moment - ** for timeout */ if (ctx->ctx_wstate == WKST_READY_TO_WAIT) ctx->ctx_wait = now; /* add the session to the pollfd array? */ if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || (ctx->ctx_wstate == WKST_WAITING)) { /* ** Resize the pollfd array if it ** isn't large enough. */ if (nfd >= dim_pfd) { struct pollfd *tpfd; size_t new; new = (dim_pfd + PFD_STEP) * sizeof(*tpfd); tpfd = (struct pollfd *) realloc(pfd, new); if (tpfd != NULL) { pfd = tpfd; dim_pfd += PFD_STEP; } else { smi_log(SMI_LOG_ERR, "Failed to realloc pollfd array:%s", sm_errstring(errno)); } } /* add the session to pollfd array */ if (nfd < dim_pfd) { ctx->ctx_wstate = WKST_WAITING; pfd[nfd].fd = ctx->ctx_sd; pfd[nfd].events = MI_POLL_RD_FLAGS; pfd[nfd].revents = 0; nfd++; } } } } TASKMGR_UNLOCK(); /* Everything is ready, let's wait for an event */ rfd = poll(pfd, nfd, POLL_TIMEOUT); POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", WAITFN, now, nfd)); /* timeout */ if (rfd == 0) continue; rebuild_set = true; /* error */ if (rfd < 0) { if (errno == EINTR) continue; pcnt++; smi_log(SMI_LOG_ERR, "%s() failed (%s), %s", WAITFN, sm_errstring(errno), pcnt >= MAX_FAILS_S ? "abort" : "try again"); if (pcnt >= MAX_FAILS_S) goto err; } pcnt = 0; /* something happened */ for (i = 0; i < nfd; i++) { if (pfd[i].revents == 0) continue; POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", WAITFN, i, nfd, WAIT_FD(i))); /* has a worker signaled an end of task ? */ if (WAIT_FD(i) == RD_PIPE) { char evt = 0; int r = 0; POOL_LEV_DPRINTF(4, ("PIPE WILL READ evt = %08X %08X", pfd[i].events, pfd[i].revents)); if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) { r = read(RD_PIPE, &evt, sizeof(evt)); if (r == sizeof(evt)) { /* Do nothing */ } } POOL_LEV_DPRINTF(4, ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", i, RD_PIPE, r, evt)); if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) { /* Exception handling */ } continue; } /* no ! sendmail wants to send a command */ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) { if (ctx->ctx_wstate != WKST_WAITING) continue; POOL_LEV_DPRINTF(4, ("Checking context sd=%d - fd=%d ", ctx->ctx_sd , WAIT_FD(i))); if (ctx->ctx_sd == pfd[i].fd) { TASKMGR_LOCK(); POOL_LEV_DPRINTF(4, ("TASK: found %d for fd[%d]=%d", ctx->ctx_sid, i, WAIT_FD(i))); if (Tskmgr.tm_nb_idle > 0) { ctx->ctx_wstate = WKST_READY_TO_RUN; TASKMGR_COND_SIGNAL(); } else { ctx->ctx_wstate = WKST_RUNNING; LAUNCH_WORKER(ctx); } TASKMGR_UNLOCK(); break; } } POOL_LEV_DPRINTF(4, ("TASK %s FOUND - Checking PIPE for fd[%d]", ctx != NULL ? "" : "NOT", WAIT_FD(i))); } } err: if (pfd != NULL) free(pfd); Tskmgr.tm_signature = 0; for (;;) { SMFICTX_PTR ctx; ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); if (ctx == NULL) break; mi_close_session(ctx); } (void) smutex_destroy(&Tskmgr.tm_w_mutex); (void) scond_destroy(&Tskmgr.tm_w_cond); return NULL; } /* ** Look for a task ready to run. ** Value of ctx is NULL or a pointer to a task ready to run. */ #define GET_TASK_READY_TO_RUN() \ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ { \ if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ { \ ctx->ctx_wstate = WKST_RUNNING; \ break; \ } \ } /* ** MI_WORKER -- worker thread ** executes tasks distributed by the mi_pool_controller ** or by mi_start_session ** ** Parameters: ** arg -- pointer to context structure ** ** Returns: ** NULL pointer */ static void * mi_worker(arg) void *arg; { SMFICTX_PTR ctx; bool done; sthread_t t_id; int r; ctx = (SMFICTX_PTR) arg; done = false; if (ctx != NULL) ctx->ctx_wstate = WKST_RUNNING; t_id = sthread_get_id(); if (pthread_detach(t_id) != 0) { smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); if (ctx != NULL) ctx->ctx_wstate = WKST_READY_TO_RUN; return NULL; } TASKMGR_LOCK(); Tskmgr.tm_nb_workers++; TASKMGR_UNLOCK(); while (!done) { if (mi_stop() != MILTER_CONT) break; /* let's handle next task... */ if (ctx != NULL) { int res; POOL_LEV_DPRINTF(4, ("worker %d: new task -> let's handle it", t_id)); res = mi_engine(ctx); POOL_LEV_DPRINTF(4, ("worker %d: mi_engine returned %d", t_id, res)); TASKMGR_LOCK(); if (res != MI_CONTINUE) { ctx->ctx_wstate = WKST_CLOSING; /* ** Delete context from linked list of ** sessions and close session. */ mi_close_session(ctx); } else { ctx->ctx_wstate = WKST_READY_TO_WAIT; POOL_LEV_DPRINTF(4, ("writing to event pipe...")); /* ** Signal task controller to add new session ** to poll set. */ PIPE_SEND_SIGNAL(); } TASKMGR_UNLOCK(); ctx = NULL; } /* check if there is any task waiting to be served */ TASKMGR_LOCK(); GET_TASK_READY_TO_RUN(); /* Got a task? */ if (ctx != NULL) { TASKMGR_UNLOCK(); continue; } /* ** if not, let's check if there is enough idle workers ** if yes: quit */ if (Tskmgr.tm_nb_workers > MIN_WORKERS && Tskmgr.tm_nb_idle > MIN_IDLE) done = true; POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); if (done) { POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); Tskmgr.tm_nb_workers--; TASKMGR_UNLOCK(); continue; } /* ** if no task ready to run, wait for another one */ Tskmgr.tm_nb_idle++; TASKMGR_COND_WAIT(); Tskmgr.tm_nb_idle--; /* look for a task */ GET_TASK_READY_TO_RUN(); TASKMGR_UNLOCK(); } return NULL; } /* ** MI_LIST_ADD_CTX -- add new session to linked list ** ** Parameters: ** ctx -- context structure ** ** Returns: ** MI_FAILURE/MI_SUCCESS */ static int mi_list_add_ctx(ctx) SMFICTX_PTR ctx; { SM_ASSERT(ctx != NULL); SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); return MI_SUCCESS; } /* ** MI_LIST_DEL_CTX -- remove session from linked list when finished ** ** Parameters: ** ctx -- context structure ** ** Returns: ** MI_FAILURE/MI_SUCCESS */ static int mi_list_del_ctx(ctx) SMFICTX_PTR ctx; { SM_ASSERT(ctx != NULL); if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) return MI_FAILURE; SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); return MI_SUCCESS; } #endif /* _FFR_WORKERS_POOL */