1058561cbSjbeck /* 2*e9af4bc0SJohn Beck * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers. 3058561cbSjbeck * All rights reserved. 4058561cbSjbeck * 5058561cbSjbeck * By using this file, you agree to the terms and conditions set 6058561cbSjbeck * forth in the LICENSE file which can be found at the top level of 7058561cbSjbeck * the sendmail distribution. 8058561cbSjbeck * 9058561cbSjbeck * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10058561cbSjbeck * Jose-Marcio.Martins@ensmp.fr 11058561cbSjbeck */ 12058561cbSjbeck 13058561cbSjbeck #include <sm/gen.h> 14*e9af4bc0SJohn Beck SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $") 15058561cbSjbeck 16058561cbSjbeck #include "libmilter.h" 17058561cbSjbeck 18058561cbSjbeck #if _FFR_WORKERS_POOL 19058561cbSjbeck 20058561cbSjbeck typedef struct taskmgr_S taskmgr_T; 21058561cbSjbeck 22058561cbSjbeck #define TM_SIGNATURE 0x23021957 23058561cbSjbeck 24058561cbSjbeck struct taskmgr_S 25058561cbSjbeck { 26058561cbSjbeck long tm_signature; /* has the controller been initialized */ 27058561cbSjbeck sthread_t tm_tid; /* thread id of controller */ 28058561cbSjbeck smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 29058561cbSjbeck 30058561cbSjbeck int tm_nb_workers; /* number of workers in the pool */ 31058561cbSjbeck int tm_nb_idle; /* number of workers waiting */ 32058561cbSjbeck 33058561cbSjbeck int tm_p[2]; /* poll control pipe */ 34058561cbSjbeck 35058561cbSjbeck smutex_t tm_w_mutex; /* linked list access mutex */ 36058561cbSjbeck scond_t tm_w_cond; /* */ 37058561cbSjbeck }; 38058561cbSjbeck 39058561cbSjbeck static taskmgr_T Tskmgr = {0}; 40058561cbSjbeck 41058561cbSjbeck #define WRK_CTX_HEAD Tskmgr.tm_ctx_head 42058561cbSjbeck 43058561cbSjbeck #define RD_PIPE (Tskmgr.tm_p[0]) 44058561cbSjbeck #define WR_PIPE (Tskmgr.tm_p[1]) 45058561cbSjbeck 46058561cbSjbeck #define PIPE_SEND_SIGNAL() \ 47058561cbSjbeck do \ 48058561cbSjbeck { \ 49058561cbSjbeck char evt = 0x5a; \ 50058561cbSjbeck int fd = WR_PIPE; \ 51058561cbSjbeck if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 52058561cbSjbeck smi_log(SMI_LOG_ERR, \ 53058561cbSjbeck "Error writing to event pipe: %s", \ 54058561cbSjbeck sm_errstring(errno)); \ 55058561cbSjbeck } while (0) 56058561cbSjbeck 57058561cbSjbeck #ifndef USE_PIPE_WAKE_POLL 58058561cbSjbeck # define USE_PIPE_WAKE_POLL 1 59058561cbSjbeck #endif /* USE_PIPE_WAKE_POLL */ 60058561cbSjbeck 61058561cbSjbeck /* poll check periodicity (default 10000 - 10 s) */ 62058561cbSjbeck #define POLL_TIMEOUT 10000 63058561cbSjbeck 64058561cbSjbeck /* worker conditional wait timeout (default 10 s) */ 65058561cbSjbeck #define COND_TIMEOUT 10 66058561cbSjbeck 67058561cbSjbeck /* functions */ 68058561cbSjbeck static int mi_close_session __P((SMFICTX_PTR)); 69058561cbSjbeck 70058561cbSjbeck static void *mi_worker __P((void *)); 71058561cbSjbeck static void *mi_pool_controller __P((void *)); 72058561cbSjbeck 73058561cbSjbeck static int mi_list_add_ctx __P((SMFICTX_PTR)); 74058561cbSjbeck static int mi_list_del_ctx __P((SMFICTX_PTR)); 75058561cbSjbeck 76058561cbSjbeck /* 77058561cbSjbeck ** periodicity of cleaning up old sessions (timedout) 78058561cbSjbeck ** sessions list will be checked to find old inactive 79058561cbSjbeck ** sessions each DT_CHECK_OLD_SESSIONS sec 80058561cbSjbeck */ 81058561cbSjbeck 82058561cbSjbeck #define DT_CHECK_OLD_SESSIONS 600 83058561cbSjbeck 84058561cbSjbeck #ifndef OLD_SESSION_TIMEOUT 85058561cbSjbeck # define OLD_SESSION_TIMEOUT ctx->ctx_timeout 86058561cbSjbeck #endif /* OLD_SESSION_TIMEOUT */ 87058561cbSjbeck 88058561cbSjbeck /* session states - with respect to the pool of workers */ 89058561cbSjbeck #define WKST_INIT 0 /* initial state */ 90058561cbSjbeck #define WKST_READY_TO_RUN 1 /* command ready do be read */ 91058561cbSjbeck #define WKST_RUNNING 2 /* session running on a worker */ 92058561cbSjbeck #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 93058561cbSjbeck #define WKST_WAITING 4 /* waiting for new command */ 94058561cbSjbeck #define WKST_CLOSING 5 /* session finished */ 95058561cbSjbeck 96058561cbSjbeck #ifndef MIN_WORKERS 97058561cbSjbeck # define MIN_WORKERS 2 /* minimum number of threads to keep around */ 98058561cbSjbeck #endif 99058561cbSjbeck 100058561cbSjbeck #define MIN_IDLE 1 /* minimum number of idle threads */ 101058561cbSjbeck 102058561cbSjbeck 103058561cbSjbeck /* 104058561cbSjbeck ** Macros for threads and mutex management 105058561cbSjbeck */ 106058561cbSjbeck 107058561cbSjbeck #define TASKMGR_LOCK() \ 108058561cbSjbeck do \ 109058561cbSjbeck { \ 110058561cbSjbeck if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 111058561cbSjbeck smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 112058561cbSjbeck } while (0) 113058561cbSjbeck 114058561cbSjbeck #define TASKMGR_UNLOCK() \ 115058561cbSjbeck do \ 116058561cbSjbeck { \ 117058561cbSjbeck if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 118058561cbSjbeck smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 119058561cbSjbeck } while (0) 120058561cbSjbeck 121058561cbSjbeck #define TASKMGR_COND_WAIT() \ 122058561cbSjbeck scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 123058561cbSjbeck 124058561cbSjbeck #define TASKMGR_COND_SIGNAL() \ 125058561cbSjbeck do \ 126058561cbSjbeck { \ 127058561cbSjbeck if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 128058561cbSjbeck smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 129058561cbSjbeck } while (0) 130058561cbSjbeck 131058561cbSjbeck #define LAUNCH_WORKER(ctx) \ 132058561cbSjbeck do \ 133058561cbSjbeck { \ 134058561cbSjbeck int r; \ 135058561cbSjbeck sthread_t tid; \ 136058561cbSjbeck \ 137058561cbSjbeck if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 138058561cbSjbeck smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 139058561cbSjbeck sm_errstring(r)); \ 140058561cbSjbeck } while (0) 141058561cbSjbeck 142058561cbSjbeck #if POOL_DEBUG 143058561cbSjbeck # define POOL_LEV_DPRINTF(lev, x) \ 144058561cbSjbeck do { \ 145058561cbSjbeck if ((lev) < ctx->ctx_dbg) \ 146058561cbSjbeck sm_dprintf x; \ 147058561cbSjbeck } while (0) 148058561cbSjbeck #else /* POOL_DEBUG */ 149058561cbSjbeck # define POOL_LEV_DPRINTF(lev, x) 150058561cbSjbeck #endif /* POOL_DEBUG */ 151058561cbSjbeck 152058561cbSjbeck /* 153058561cbSjbeck ** MI_START_SESSION -- Start a session in the pool of workers 154058561cbSjbeck ** 155058561cbSjbeck ** Parameters: 156058561cbSjbeck ** ctx -- context structure 157058561cbSjbeck ** 158058561cbSjbeck ** Returns: 159058561cbSjbeck ** MI_SUCCESS/MI_FAILURE 160058561cbSjbeck */ 161058561cbSjbeck 162058561cbSjbeck int 163058561cbSjbeck mi_start_session(ctx) 164058561cbSjbeck SMFICTX_PTR ctx; 165058561cbSjbeck { 166058561cbSjbeck static long id = 0; 167058561cbSjbeck 168058561cbSjbeck SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); 169058561cbSjbeck SM_ASSERT(ctx != NULL); 170058561cbSjbeck POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 171058561cbSjbeck TASKMGR_LOCK(); 172058561cbSjbeck 173058561cbSjbeck if (mi_list_add_ctx(ctx) != MI_SUCCESS) 174058561cbSjbeck { 175058561cbSjbeck TASKMGR_UNLOCK(); 176058561cbSjbeck return MI_FAILURE; 177058561cbSjbeck } 178058561cbSjbeck 179058561cbSjbeck ctx->ctx_sid = id++; 180058561cbSjbeck 181058561cbSjbeck /* if there is an idle worker, signal it, otherwise start new worker */ 182058561cbSjbeck if (Tskmgr.tm_nb_idle > 0) 183058561cbSjbeck { 184058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN; 185058561cbSjbeck TASKMGR_COND_SIGNAL(); 186058561cbSjbeck } 187058561cbSjbeck else 188058561cbSjbeck { 189058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING; 190058561cbSjbeck LAUNCH_WORKER(ctx); 191058561cbSjbeck } 192058561cbSjbeck TASKMGR_UNLOCK(); 193058561cbSjbeck return MI_SUCCESS; 194058561cbSjbeck } 195058561cbSjbeck 196058561cbSjbeck /* 197058561cbSjbeck ** MI_CLOSE_SESSION -- Close a session and clean up data structures 198058561cbSjbeck ** 199058561cbSjbeck ** Parameters: 200058561cbSjbeck ** ctx -- context structure 201058561cbSjbeck ** 202058561cbSjbeck ** Returns: 203058561cbSjbeck ** MI_SUCCESS/MI_FAILURE 204058561cbSjbeck */ 205058561cbSjbeck 206058561cbSjbeck static int 207058561cbSjbeck mi_close_session(ctx) 208058561cbSjbeck SMFICTX_PTR ctx; 209058561cbSjbeck { 210058561cbSjbeck SM_ASSERT(ctx != NULL); 211058561cbSjbeck 212058561cbSjbeck (void) mi_list_del_ctx(ctx); 213*e9af4bc0SJohn Beck mi_clr_ctx(ctx); 214058561cbSjbeck 215058561cbSjbeck return MI_SUCCESS; 216058561cbSjbeck } 217058561cbSjbeck 218058561cbSjbeck /* 219058561cbSjbeck ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 220058561cbSjbeck ** Must be called before starting sessions. 221058561cbSjbeck ** 222058561cbSjbeck ** Parameters: 223058561cbSjbeck ** none 224058561cbSjbeck ** 225058561cbSjbeck ** Returns: 226058561cbSjbeck ** MI_SUCCESS/MI_FAILURE 227058561cbSjbeck */ 228058561cbSjbeck 229058561cbSjbeck int 230058561cbSjbeck mi_pool_controller_init() 231058561cbSjbeck { 232058561cbSjbeck sthread_t tid; 233058561cbSjbeck int r, i; 234058561cbSjbeck 235058561cbSjbeck if (Tskmgr.tm_signature == TM_SIGNATURE) 236058561cbSjbeck return MI_SUCCESS; 237058561cbSjbeck 238058561cbSjbeck SM_TAILQ_INIT(&WRK_CTX_HEAD); 239058561cbSjbeck Tskmgr.tm_tid = (sthread_t) -1; 240058561cbSjbeck Tskmgr.tm_nb_workers = 0; 241058561cbSjbeck Tskmgr.tm_nb_idle = 0; 242058561cbSjbeck 243058561cbSjbeck if (pipe(Tskmgr.tm_p) != 0) 244058561cbSjbeck { 245058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 246*e9af4bc0SJohn Beck sm_errstring(errno)); 247058561cbSjbeck return MI_FAILURE; 248058561cbSjbeck } 249058561cbSjbeck 250058561cbSjbeck (void) smutex_init(&Tskmgr.tm_w_mutex); 251058561cbSjbeck (void) scond_init(&Tskmgr.tm_w_cond); 252058561cbSjbeck 253058561cbSjbeck /* Launch the pool controller */ 254058561cbSjbeck if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 255058561cbSjbeck { 256058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 257058561cbSjbeck sm_errstring(r)); 258058561cbSjbeck return MI_FAILURE; 259058561cbSjbeck } 260058561cbSjbeck Tskmgr.tm_tid = tid; 261058561cbSjbeck Tskmgr.tm_signature = TM_SIGNATURE; 262058561cbSjbeck 263058561cbSjbeck /* Create the pool of workers */ 264058561cbSjbeck for (i = 0; i < MIN_WORKERS; i++) 265058561cbSjbeck { 266058561cbSjbeck if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 267058561cbSjbeck { 268058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 269058561cbSjbeck sm_errstring(r)); 270058561cbSjbeck return MI_FAILURE; 271058561cbSjbeck } 272058561cbSjbeck } 273058561cbSjbeck 274058561cbSjbeck return MI_SUCCESS; 275058561cbSjbeck } 276058561cbSjbeck 277058561cbSjbeck /* 278058561cbSjbeck ** MI_POOL_CONTROLLER -- manage the pool of workers 279058561cbSjbeck ** This thread must be running when listener begins 280058561cbSjbeck ** starting sessions 281058561cbSjbeck ** 282058561cbSjbeck ** Parameters: 283058561cbSjbeck ** arg -- unused 284058561cbSjbeck ** 285058561cbSjbeck ** Returns: 286058561cbSjbeck ** NULL 287058561cbSjbeck ** 288058561cbSjbeck ** Control flow: 289058561cbSjbeck ** for (;;) 290058561cbSjbeck ** Look for timed out sessions 291058561cbSjbeck ** Select sessions to wait for sendmail command 292058561cbSjbeck ** Poll set of file descriptors 293058561cbSjbeck ** if timeout 294058561cbSjbeck ** continue 295058561cbSjbeck ** For each file descriptor ready 296058561cbSjbeck ** launch new thread if no worker available 297058561cbSjbeck ** else 298058561cbSjbeck ** signal waiting worker 299058561cbSjbeck */ 300058561cbSjbeck 301058561cbSjbeck /* Poll structure array (pollfd) size step */ 302058561cbSjbeck #define PFD_STEP 256 303058561cbSjbeck 304058561cbSjbeck #define WAIT_FD(i) (pfd[i].fd) 305058561cbSjbeck #define WAITFN "POLL" 306058561cbSjbeck 307058561cbSjbeck static void * 308058561cbSjbeck mi_pool_controller(arg) 309058561cbSjbeck void *arg; 310058561cbSjbeck { 311058561cbSjbeck struct pollfd *pfd = NULL; 312058561cbSjbeck int dim_pfd = 0; 313058561cbSjbeck bool rebuild_set = true; 314058561cbSjbeck int pcnt = 0; /* error count for poll() failures */ 315*e9af4bc0SJohn Beck time_t lastcheck; 316058561cbSjbeck 317058561cbSjbeck Tskmgr.tm_tid = sthread_get_id(); 318058561cbSjbeck if (pthread_detach(Tskmgr.tm_tid) != 0) 319058561cbSjbeck { 320058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 321058561cbSjbeck return NULL; 322058561cbSjbeck } 323058561cbSjbeck 324058561cbSjbeck pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 325058561cbSjbeck if (pfd == NULL) 326058561cbSjbeck { 327058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 328058561cbSjbeck sm_errstring(errno)); 329058561cbSjbeck return NULL; 330058561cbSjbeck } 331058561cbSjbeck dim_pfd = PFD_STEP; 332058561cbSjbeck 333*e9af4bc0SJohn Beck lastcheck = time(NULL); 334058561cbSjbeck for (;;) 335058561cbSjbeck { 336058561cbSjbeck SMFICTX_PTR ctx; 337058561cbSjbeck int nfd, rfd, i; 338058561cbSjbeck time_t now; 339058561cbSjbeck 340058561cbSjbeck POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 341058561cbSjbeck 342058561cbSjbeck if (mi_stop() != MILTER_CONT) 343058561cbSjbeck break; 344058561cbSjbeck 345058561cbSjbeck TASKMGR_LOCK(); 346058561cbSjbeck 347058561cbSjbeck now = time(NULL); 348058561cbSjbeck 349058561cbSjbeck /* check for timed out sessions? */ 350058561cbSjbeck if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 351058561cbSjbeck { 352*e9af4bc0SJohn Beck ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 353*e9af4bc0SJohn Beck while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD)) 354058561cbSjbeck { 355*e9af4bc0SJohn Beck SMFICTX_PTR ctx_nxt; 356*e9af4bc0SJohn Beck 357*e9af4bc0SJohn Beck ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link); 358058561cbSjbeck if (ctx->ctx_wstate == WKST_WAITING) 359058561cbSjbeck { 360058561cbSjbeck if (ctx->ctx_wait == 0) 361058561cbSjbeck ctx->ctx_wait = now; 362*e9af4bc0SJohn Beck else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 363058561cbSjbeck < now) 364058561cbSjbeck { 365*e9af4bc0SJohn Beck /* if session timed out, close it */ 366058561cbSjbeck sfsistat (*fi_close) __P((SMFICTX *)); 367058561cbSjbeck 368058561cbSjbeck POOL_LEV_DPRINTF(4, 369058561cbSjbeck ("Closing old connection: sd=%d id=%d", 370058561cbSjbeck ctx->ctx_sd, 371058561cbSjbeck ctx->ctx_sid)); 372058561cbSjbeck 373058561cbSjbeck if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 374058561cbSjbeck (void) (*fi_close)(ctx); 375058561cbSjbeck 376058561cbSjbeck mi_close_session(ctx); 377058561cbSjbeck } 378058561cbSjbeck } 379*e9af4bc0SJohn Beck ctx = ctx_nxt; 380058561cbSjbeck } 381058561cbSjbeck lastcheck = now; 382058561cbSjbeck } 383058561cbSjbeck 384058561cbSjbeck if (rebuild_set) 385058561cbSjbeck { 386058561cbSjbeck /* 387058561cbSjbeck ** Initialize poll set. 388058561cbSjbeck ** Insert into the poll set the file descriptors of 389058561cbSjbeck ** all sessions waiting for a command from sendmail. 390058561cbSjbeck */ 391058561cbSjbeck 392058561cbSjbeck nfd = 0; 393058561cbSjbeck 394058561cbSjbeck /* begin with worker pipe */ 395058561cbSjbeck pfd[nfd].fd = RD_PIPE; 396058561cbSjbeck pfd[nfd].events = MI_POLL_RD_FLAGS; 397058561cbSjbeck pfd[nfd].revents = 0; 398058561cbSjbeck nfd++; 399058561cbSjbeck 400058561cbSjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 401058561cbSjbeck { 402058561cbSjbeck /* 403058561cbSjbeck ** update ctx_wait - start of wait moment - 404058561cbSjbeck ** for timeout 405058561cbSjbeck */ 406058561cbSjbeck 407058561cbSjbeck if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 408058561cbSjbeck ctx->ctx_wait = now; 409058561cbSjbeck 410058561cbSjbeck /* add the session to the pollfd array? */ 411058561cbSjbeck if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 412058561cbSjbeck (ctx->ctx_wstate == WKST_WAITING)) 413058561cbSjbeck { 414058561cbSjbeck /* 415058561cbSjbeck ** Resize the pollfd array if it 416058561cbSjbeck ** isn't large enough. 417058561cbSjbeck */ 418058561cbSjbeck 419058561cbSjbeck if (nfd >= dim_pfd) 420058561cbSjbeck { 421058561cbSjbeck struct pollfd *tpfd; 422058561cbSjbeck size_t new; 423058561cbSjbeck 424058561cbSjbeck new = (dim_pfd + PFD_STEP) * 425058561cbSjbeck sizeof(*tpfd); 426058561cbSjbeck tpfd = (struct pollfd *) 427058561cbSjbeck realloc(pfd, new); 428058561cbSjbeck if (tpfd != NULL) 429058561cbSjbeck { 430058561cbSjbeck pfd = tpfd; 431058561cbSjbeck dim_pfd += PFD_STEP; 432058561cbSjbeck } 433058561cbSjbeck else 434058561cbSjbeck { 435058561cbSjbeck smi_log(SMI_LOG_ERR, 436058561cbSjbeck "Failed to realloc pollfd array:%s", 437058561cbSjbeck sm_errstring(errno)); 438058561cbSjbeck } 439058561cbSjbeck } 440058561cbSjbeck 441058561cbSjbeck /* add the session to pollfd array */ 442058561cbSjbeck if (nfd < dim_pfd) 443058561cbSjbeck { 444058561cbSjbeck ctx->ctx_wstate = WKST_WAITING; 445058561cbSjbeck pfd[nfd].fd = ctx->ctx_sd; 446058561cbSjbeck pfd[nfd].events = MI_POLL_RD_FLAGS; 447058561cbSjbeck pfd[nfd].revents = 0; 448058561cbSjbeck nfd++; 449058561cbSjbeck } 450058561cbSjbeck } 451058561cbSjbeck } 452*e9af4bc0SJohn Beck rebuild_set = false; 453058561cbSjbeck } 454058561cbSjbeck 455058561cbSjbeck TASKMGR_UNLOCK(); 456058561cbSjbeck 457058561cbSjbeck /* Everything is ready, let's wait for an event */ 458058561cbSjbeck rfd = poll(pfd, nfd, POLL_TIMEOUT); 459058561cbSjbeck 460058561cbSjbeck POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 461058561cbSjbeck WAITFN, now, nfd)); 462058561cbSjbeck 463058561cbSjbeck /* timeout */ 464058561cbSjbeck if (rfd == 0) 465058561cbSjbeck continue; 466058561cbSjbeck 467058561cbSjbeck rebuild_set = true; 468058561cbSjbeck 469058561cbSjbeck /* error */ 470058561cbSjbeck if (rfd < 0) 471058561cbSjbeck { 472058561cbSjbeck if (errno == EINTR) 473058561cbSjbeck continue; 474058561cbSjbeck pcnt++; 475058561cbSjbeck smi_log(SMI_LOG_ERR, 476058561cbSjbeck "%s() failed (%s), %s", 477058561cbSjbeck WAITFN, sm_errstring(errno), 478058561cbSjbeck pcnt >= MAX_FAILS_S ? "abort" : "try again"); 479058561cbSjbeck 480058561cbSjbeck if (pcnt >= MAX_FAILS_S) 481058561cbSjbeck goto err; 482058561cbSjbeck } 483058561cbSjbeck pcnt = 0; 484058561cbSjbeck 485058561cbSjbeck /* something happened */ 486058561cbSjbeck for (i = 0; i < nfd; i++) 487058561cbSjbeck { 488058561cbSjbeck if (pfd[i].revents == 0) 489058561cbSjbeck continue; 490058561cbSjbeck 491058561cbSjbeck POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 492058561cbSjbeck WAITFN, i, nfd, 493058561cbSjbeck WAIT_FD(i))); 494058561cbSjbeck 495058561cbSjbeck /* has a worker signaled an end of task ? */ 496058561cbSjbeck if (WAIT_FD(i) == RD_PIPE) 497058561cbSjbeck { 498058561cbSjbeck char evt = 0; 499058561cbSjbeck int r = 0; 500058561cbSjbeck 501058561cbSjbeck POOL_LEV_DPRINTF(4, 502058561cbSjbeck ("PIPE WILL READ evt = %08X %08X", 503058561cbSjbeck pfd[i].events, pfd[i].revents)); 504058561cbSjbeck 505058561cbSjbeck if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) 506058561cbSjbeck { 507058561cbSjbeck r = read(RD_PIPE, &evt, sizeof(evt)); 508058561cbSjbeck if (r == sizeof(evt)) 509058561cbSjbeck { 510058561cbSjbeck /* Do nothing */ 511058561cbSjbeck } 512058561cbSjbeck } 513058561cbSjbeck 514058561cbSjbeck POOL_LEV_DPRINTF(4, 515058561cbSjbeck ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 516058561cbSjbeck i, RD_PIPE, r, evt)); 517058561cbSjbeck 518058561cbSjbeck if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 519058561cbSjbeck { 520058561cbSjbeck /* Exception handling */ 521058561cbSjbeck } 522058561cbSjbeck continue; 523058561cbSjbeck } 524058561cbSjbeck 525058561cbSjbeck /* no ! sendmail wants to send a command */ 526058561cbSjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 527058561cbSjbeck { 528058561cbSjbeck if (ctx->ctx_wstate != WKST_WAITING) 529058561cbSjbeck continue; 530058561cbSjbeck 531058561cbSjbeck POOL_LEV_DPRINTF(4, 532058561cbSjbeck ("Checking context sd=%d - fd=%d ", 533058561cbSjbeck ctx->ctx_sd , WAIT_FD(i))); 534058561cbSjbeck 535058561cbSjbeck if (ctx->ctx_sd == pfd[i].fd) 536058561cbSjbeck { 537058561cbSjbeck TASKMGR_LOCK(); 538058561cbSjbeck 539058561cbSjbeck POOL_LEV_DPRINTF(4, 540058561cbSjbeck ("TASK: found %d for fd[%d]=%d", 541058561cbSjbeck ctx->ctx_sid, i, WAIT_FD(i))); 542058561cbSjbeck 543058561cbSjbeck if (Tskmgr.tm_nb_idle > 0) 544058561cbSjbeck { 545058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN; 546058561cbSjbeck TASKMGR_COND_SIGNAL(); 547058561cbSjbeck } 548058561cbSjbeck else 549058561cbSjbeck { 550058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING; 551058561cbSjbeck LAUNCH_WORKER(ctx); 552058561cbSjbeck } 553058561cbSjbeck TASKMGR_UNLOCK(); 554058561cbSjbeck break; 555058561cbSjbeck } 556058561cbSjbeck } 557058561cbSjbeck 558058561cbSjbeck POOL_LEV_DPRINTF(4, 559058561cbSjbeck ("TASK %s FOUND - Checking PIPE for fd[%d]", 560058561cbSjbeck ctx != NULL ? "" : "NOT", WAIT_FD(i))); 561058561cbSjbeck } 562058561cbSjbeck } 563058561cbSjbeck 564058561cbSjbeck err: 565058561cbSjbeck if (pfd != NULL) 566058561cbSjbeck free(pfd); 567058561cbSjbeck 568058561cbSjbeck Tskmgr.tm_signature = 0; 569058561cbSjbeck for (;;) 570058561cbSjbeck { 571058561cbSjbeck SMFICTX_PTR ctx; 572058561cbSjbeck 573058561cbSjbeck ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 574058561cbSjbeck if (ctx == NULL) 575058561cbSjbeck break; 576058561cbSjbeck mi_close_session(ctx); 577058561cbSjbeck } 578058561cbSjbeck 579058561cbSjbeck (void) smutex_destroy(&Tskmgr.tm_w_mutex); 580058561cbSjbeck (void) scond_destroy(&Tskmgr.tm_w_cond); 581058561cbSjbeck 582058561cbSjbeck return NULL; 583058561cbSjbeck } 584058561cbSjbeck 585058561cbSjbeck /* 586058561cbSjbeck ** Look for a task ready to run. 587058561cbSjbeck ** Value of ctx is NULL or a pointer to a task ready to run. 588058561cbSjbeck */ 589058561cbSjbeck 590058561cbSjbeck #define GET_TASK_READY_TO_RUN() \ 591058561cbSjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 592058561cbSjbeck { \ 593058561cbSjbeck if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 594058561cbSjbeck { \ 595058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING; \ 596058561cbSjbeck break; \ 597058561cbSjbeck } \ 598058561cbSjbeck } 599058561cbSjbeck 600058561cbSjbeck /* 601058561cbSjbeck ** MI_WORKER -- worker thread 602058561cbSjbeck ** executes tasks distributed by the mi_pool_controller 603058561cbSjbeck ** or by mi_start_session 604058561cbSjbeck ** 605058561cbSjbeck ** Parameters: 606058561cbSjbeck ** arg -- pointer to context structure 607058561cbSjbeck ** 608058561cbSjbeck ** Returns: 609058561cbSjbeck ** NULL pointer 610058561cbSjbeck */ 611058561cbSjbeck 612058561cbSjbeck static void * 613058561cbSjbeck mi_worker(arg) 614058561cbSjbeck void *arg; 615058561cbSjbeck { 616058561cbSjbeck SMFICTX_PTR ctx; 617058561cbSjbeck bool done; 618058561cbSjbeck sthread_t t_id; 619058561cbSjbeck int r; 620058561cbSjbeck 621058561cbSjbeck ctx = (SMFICTX_PTR) arg; 622058561cbSjbeck done = false; 623058561cbSjbeck if (ctx != NULL) 624058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING; 625058561cbSjbeck 626058561cbSjbeck t_id = sthread_get_id(); 627058561cbSjbeck if (pthread_detach(t_id) != 0) 628058561cbSjbeck { 629058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 630058561cbSjbeck if (ctx != NULL) 631058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN; 632058561cbSjbeck return NULL; 633058561cbSjbeck } 634058561cbSjbeck 635058561cbSjbeck TASKMGR_LOCK(); 636058561cbSjbeck Tskmgr.tm_nb_workers++; 637058561cbSjbeck TASKMGR_UNLOCK(); 638058561cbSjbeck 639058561cbSjbeck while (!done) 640058561cbSjbeck { 641058561cbSjbeck if (mi_stop() != MILTER_CONT) 642058561cbSjbeck break; 643058561cbSjbeck 644058561cbSjbeck /* let's handle next task... */ 645058561cbSjbeck if (ctx != NULL) 646058561cbSjbeck { 647058561cbSjbeck int res; 648058561cbSjbeck 649058561cbSjbeck POOL_LEV_DPRINTF(4, 650058561cbSjbeck ("worker %d: new task -> let's handle it", 651058561cbSjbeck t_id)); 652058561cbSjbeck res = mi_engine(ctx); 653058561cbSjbeck POOL_LEV_DPRINTF(4, 654058561cbSjbeck ("worker %d: mi_engine returned %d", t_id, res)); 655058561cbSjbeck 656058561cbSjbeck TASKMGR_LOCK(); 657058561cbSjbeck if (res != MI_CONTINUE) 658058561cbSjbeck { 659058561cbSjbeck ctx->ctx_wstate = WKST_CLOSING; 660058561cbSjbeck 661058561cbSjbeck /* 662058561cbSjbeck ** Delete context from linked list of 663058561cbSjbeck ** sessions and close session. 664058561cbSjbeck */ 665058561cbSjbeck 666058561cbSjbeck mi_close_session(ctx); 667058561cbSjbeck } 668058561cbSjbeck else 669058561cbSjbeck { 670058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_WAIT; 671058561cbSjbeck 672058561cbSjbeck POOL_LEV_DPRINTF(4, 673058561cbSjbeck ("writing to event pipe...")); 674058561cbSjbeck 675058561cbSjbeck /* 676058561cbSjbeck ** Signal task controller to add new session 677058561cbSjbeck ** to poll set. 678058561cbSjbeck */ 679058561cbSjbeck 680058561cbSjbeck PIPE_SEND_SIGNAL(); 681058561cbSjbeck } 682058561cbSjbeck TASKMGR_UNLOCK(); 683058561cbSjbeck ctx = NULL; 684058561cbSjbeck 685058561cbSjbeck } 686058561cbSjbeck 687058561cbSjbeck /* check if there is any task waiting to be served */ 688058561cbSjbeck TASKMGR_LOCK(); 689058561cbSjbeck 690058561cbSjbeck GET_TASK_READY_TO_RUN(); 691058561cbSjbeck 692058561cbSjbeck /* Got a task? */ 693058561cbSjbeck if (ctx != NULL) 694058561cbSjbeck { 695058561cbSjbeck TASKMGR_UNLOCK(); 696058561cbSjbeck continue; 697058561cbSjbeck } 698058561cbSjbeck 699058561cbSjbeck /* 700058561cbSjbeck ** if not, let's check if there is enough idle workers 701058561cbSjbeck ** if yes: quit 702058561cbSjbeck */ 703058561cbSjbeck 704058561cbSjbeck if (Tskmgr.tm_nb_workers > MIN_WORKERS && 705058561cbSjbeck Tskmgr.tm_nb_idle > MIN_IDLE) 706058561cbSjbeck done = true; 707058561cbSjbeck 708058561cbSjbeck POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 709058561cbSjbeck Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 710058561cbSjbeck 711058561cbSjbeck if (done) 712058561cbSjbeck { 713058561cbSjbeck POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 714058561cbSjbeck Tskmgr.tm_nb_workers--; 715058561cbSjbeck TASKMGR_UNLOCK(); 716058561cbSjbeck continue; 717058561cbSjbeck } 718058561cbSjbeck 719058561cbSjbeck /* 720058561cbSjbeck ** if no task ready to run, wait for another one 721058561cbSjbeck */ 722058561cbSjbeck 723058561cbSjbeck Tskmgr.tm_nb_idle++; 724058561cbSjbeck TASKMGR_COND_WAIT(); 725058561cbSjbeck Tskmgr.tm_nb_idle--; 726058561cbSjbeck 727058561cbSjbeck /* look for a task */ 728058561cbSjbeck GET_TASK_READY_TO_RUN(); 729058561cbSjbeck 730058561cbSjbeck TASKMGR_UNLOCK(); 731058561cbSjbeck } 732058561cbSjbeck return NULL; 733058561cbSjbeck } 734058561cbSjbeck 735058561cbSjbeck /* 736058561cbSjbeck ** MI_LIST_ADD_CTX -- add new session to linked list 737058561cbSjbeck ** 738058561cbSjbeck ** Parameters: 739058561cbSjbeck ** ctx -- context structure 740058561cbSjbeck ** 741058561cbSjbeck ** Returns: 742058561cbSjbeck ** MI_FAILURE/MI_SUCCESS 743058561cbSjbeck */ 744058561cbSjbeck 745058561cbSjbeck static int 746058561cbSjbeck mi_list_add_ctx(ctx) 747058561cbSjbeck SMFICTX_PTR ctx; 748058561cbSjbeck { 749058561cbSjbeck SM_ASSERT(ctx != NULL); 750058561cbSjbeck SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 751058561cbSjbeck return MI_SUCCESS; 752058561cbSjbeck } 753058561cbSjbeck 754058561cbSjbeck /* 755058561cbSjbeck ** MI_LIST_DEL_CTX -- remove session from linked list when finished 756058561cbSjbeck ** 757058561cbSjbeck ** Parameters: 758058561cbSjbeck ** ctx -- context structure 759058561cbSjbeck ** 760058561cbSjbeck ** Returns: 761058561cbSjbeck ** MI_FAILURE/MI_SUCCESS 762058561cbSjbeck */ 763058561cbSjbeck 764058561cbSjbeck static int 765058561cbSjbeck mi_list_del_ctx(ctx) 766058561cbSjbeck SMFICTX_PTR ctx; 767058561cbSjbeck { 768058561cbSjbeck SM_ASSERT(ctx != NULL); 769058561cbSjbeck if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 770058561cbSjbeck return MI_FAILURE; 771058561cbSjbeck 772058561cbSjbeck SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 773058561cbSjbeck return MI_SUCCESS; 774058561cbSjbeck } 775058561cbSjbeck #endif /* _FFR_WORKERS_POOL */ 776