1 /* 2 * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers. 3 * All rights reserved. 4 * 5 * By using this file, you agree to the terms and conditions set 6 * forth in the LICENSE file which can be found at the top level of 7 * the sendmail distribution. 8 * 9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10 * Jose-Marcio.Martins@ensmp.fr 11 */ 12 13 #include <sm/gen.h> 14 SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $") 15 16 #include "libmilter.h" 17 18 #if _FFR_WORKERS_POOL 19 20 typedef struct taskmgr_S taskmgr_T; 21 22 #define TM_SIGNATURE 0x23021957 23 24 struct taskmgr_S 25 { 26 long tm_signature; /* has the controller been initialized */ 27 sthread_t tm_tid; /* thread id of controller */ 28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 29 30 int tm_nb_workers; /* number of workers in the pool */ 31 int tm_nb_idle; /* number of workers waiting */ 32 33 int tm_p[2]; /* poll control pipe */ 34 35 smutex_t tm_w_mutex; /* linked list access mutex */ 36 scond_t tm_w_cond; /* */ 37 }; 38 39 static taskmgr_T Tskmgr = {0}; 40 41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head 42 43 #define RD_PIPE (Tskmgr.tm_p[0]) 44 #define WR_PIPE (Tskmgr.tm_p[1]) 45 46 #define PIPE_SEND_SIGNAL() \ 47 do \ 48 { \ 49 char evt = 0x5a; \ 50 int fd = WR_PIPE; \ 51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 52 smi_log(SMI_LOG_ERR, \ 53 "Error writing to event pipe: %s", \ 54 sm_errstring(errno)); \ 55 } while (0) 56 57 #ifndef USE_PIPE_WAKE_POLL 58 # define USE_PIPE_WAKE_POLL 1 59 #endif /* USE_PIPE_WAKE_POLL */ 60 61 /* poll check periodicity (default 10000 - 10 s) */ 62 #define POLL_TIMEOUT 10000 63 64 /* worker conditional wait timeout (default 10 s) */ 65 #define COND_TIMEOUT 10 66 67 /* functions */ 68 static int mi_close_session __P((SMFICTX_PTR)); 69 70 static void *mi_worker __P((void *)); 71 static void *mi_pool_controller __P((void *)); 72 73 static int mi_list_add_ctx __P((SMFICTX_PTR)); 74 static int mi_list_del_ctx __P((SMFICTX_PTR)); 75 76 /* 77 ** periodicity of cleaning up old sessions (timedout) 78 ** sessions list will be checked to find old inactive 79 ** sessions each DT_CHECK_OLD_SESSIONS sec 80 */ 81 82 #define DT_CHECK_OLD_SESSIONS 600 83 84 #ifndef OLD_SESSION_TIMEOUT 85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout 86 #endif /* OLD_SESSION_TIMEOUT */ 87 88 /* session states - with respect to the pool of workers */ 89 #define WKST_INIT 0 /* initial state */ 90 #define WKST_READY_TO_RUN 1 /* command ready do be read */ 91 #define WKST_RUNNING 2 /* session running on a worker */ 92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 93 #define WKST_WAITING 4 /* waiting for new command */ 94 #define WKST_CLOSING 5 /* session finished */ 95 96 #ifndef MIN_WORKERS 97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */ 98 #endif 99 100 #define MIN_IDLE 1 /* minimum number of idle threads */ 101 102 103 /* 104 ** Macros for threads and mutex management 105 */ 106 107 #define TASKMGR_LOCK() \ 108 do \ 109 { \ 110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 112 } while (0) 113 114 #define TASKMGR_UNLOCK() \ 115 do \ 116 { \ 117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 119 } while (0) 120 121 #define TASKMGR_COND_WAIT() \ 122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 123 124 #define TASKMGR_COND_SIGNAL() \ 125 do \ 126 { \ 127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 129 } while (0) 130 131 #define LAUNCH_WORKER(ctx) \ 132 do \ 133 { \ 134 int r; \ 135 sthread_t tid; \ 136 \ 137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 139 sm_errstring(r)); \ 140 } while (0) 141 142 #if POOL_DEBUG 143 # define POOL_LEV_DPRINTF(lev, x) \ 144 do { \ 145 if ((lev) < ctx->ctx_dbg) \ 146 sm_dprintf x; \ 147 } while (0) 148 #else /* POOL_DEBUG */ 149 # define POOL_LEV_DPRINTF(lev, x) 150 #endif /* POOL_DEBUG */ 151 152 /* 153 ** MI_START_SESSION -- Start a session in the pool of workers 154 ** 155 ** Parameters: 156 ** ctx -- context structure 157 ** 158 ** Returns: 159 ** MI_SUCCESS/MI_FAILURE 160 */ 161 162 int 163 mi_start_session(ctx) 164 SMFICTX_PTR ctx; 165 { 166 static long id = 0; 167 168 SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); 169 SM_ASSERT(ctx != NULL); 170 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 171 TASKMGR_LOCK(); 172 173 if (mi_list_add_ctx(ctx) != MI_SUCCESS) 174 { 175 TASKMGR_UNLOCK(); 176 return MI_FAILURE; 177 } 178 179 ctx->ctx_sid = id++; 180 181 /* if there is an idle worker, signal it, otherwise start new worker */ 182 if (Tskmgr.tm_nb_idle > 0) 183 { 184 ctx->ctx_wstate = WKST_READY_TO_RUN; 185 TASKMGR_COND_SIGNAL(); 186 } 187 else 188 { 189 ctx->ctx_wstate = WKST_RUNNING; 190 LAUNCH_WORKER(ctx); 191 } 192 TASKMGR_UNLOCK(); 193 return MI_SUCCESS; 194 } 195 196 /* 197 ** MI_CLOSE_SESSION -- Close a session and clean up data structures 198 ** 199 ** Parameters: 200 ** ctx -- context structure 201 ** 202 ** Returns: 203 ** MI_SUCCESS/MI_FAILURE 204 */ 205 206 static int 207 mi_close_session(ctx) 208 SMFICTX_PTR ctx; 209 { 210 SM_ASSERT(ctx != NULL); 211 212 (void) mi_list_del_ctx(ctx); 213 mi_clr_ctx(ctx); 214 215 return MI_SUCCESS; 216 } 217 218 /* 219 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 220 ** Must be called before starting sessions. 221 ** 222 ** Parameters: 223 ** none 224 ** 225 ** Returns: 226 ** MI_SUCCESS/MI_FAILURE 227 */ 228 229 int 230 mi_pool_controller_init() 231 { 232 sthread_t tid; 233 int r, i; 234 235 if (Tskmgr.tm_signature == TM_SIGNATURE) 236 return MI_SUCCESS; 237 238 SM_TAILQ_INIT(&WRK_CTX_HEAD); 239 Tskmgr.tm_tid = (sthread_t) -1; 240 Tskmgr.tm_nb_workers = 0; 241 Tskmgr.tm_nb_idle = 0; 242 243 if (pipe(Tskmgr.tm_p) != 0) 244 { 245 smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 246 sm_errstring(errno)); 247 return MI_FAILURE; 248 } 249 250 (void) smutex_init(&Tskmgr.tm_w_mutex); 251 (void) scond_init(&Tskmgr.tm_w_cond); 252 253 /* Launch the pool controller */ 254 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 255 { 256 smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 257 sm_errstring(r)); 258 return MI_FAILURE; 259 } 260 Tskmgr.tm_tid = tid; 261 Tskmgr.tm_signature = TM_SIGNATURE; 262 263 /* Create the pool of workers */ 264 for (i = 0; i < MIN_WORKERS; i++) 265 { 266 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 267 { 268 smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 269 sm_errstring(r)); 270 return MI_FAILURE; 271 } 272 } 273 274 return MI_SUCCESS; 275 } 276 277 /* 278 ** MI_POOL_CONTROLLER -- manage the pool of workers 279 ** This thread must be running when listener begins 280 ** starting sessions 281 ** 282 ** Parameters: 283 ** arg -- unused 284 ** 285 ** Returns: 286 ** NULL 287 ** 288 ** Control flow: 289 ** for (;;) 290 ** Look for timed out sessions 291 ** Select sessions to wait for sendmail command 292 ** Poll set of file descriptors 293 ** if timeout 294 ** continue 295 ** For each file descriptor ready 296 ** launch new thread if no worker available 297 ** else 298 ** signal waiting worker 299 */ 300 301 /* Poll structure array (pollfd) size step */ 302 #define PFD_STEP 256 303 304 #define WAIT_FD(i) (pfd[i].fd) 305 #define WAITFN "POLL" 306 307 static void * 308 mi_pool_controller(arg) 309 void *arg; 310 { 311 struct pollfd *pfd = NULL; 312 int dim_pfd = 0; 313 bool rebuild_set = true; 314 int pcnt = 0; /* error count for poll() failures */ 315 time_t lastcheck; 316 317 Tskmgr.tm_tid = sthread_get_id(); 318 if (pthread_detach(Tskmgr.tm_tid) != 0) 319 { 320 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 321 return NULL; 322 } 323 324 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 325 if (pfd == NULL) 326 { 327 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 328 sm_errstring(errno)); 329 return NULL; 330 } 331 dim_pfd = PFD_STEP; 332 333 lastcheck = time(NULL); 334 for (;;) 335 { 336 SMFICTX_PTR ctx; 337 int nfd, rfd, i; 338 time_t now; 339 340 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 341 342 if (mi_stop() != MILTER_CONT) 343 break; 344 345 TASKMGR_LOCK(); 346 347 now = time(NULL); 348 349 /* check for timed out sessions? */ 350 if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 351 { 352 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 353 while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD)) 354 { 355 SMFICTX_PTR ctx_nxt; 356 357 ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link); 358 if (ctx->ctx_wstate == WKST_WAITING) 359 { 360 if (ctx->ctx_wait == 0) 361 ctx->ctx_wait = now; 362 else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 363 < now) 364 { 365 /* if session timed out, close it */ 366 sfsistat (*fi_close) __P((SMFICTX *)); 367 368 POOL_LEV_DPRINTF(4, 369 ("Closing old connection: sd=%d id=%d", 370 ctx->ctx_sd, 371 ctx->ctx_sid)); 372 373 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 374 (void) (*fi_close)(ctx); 375 376 mi_close_session(ctx); 377 } 378 } 379 ctx = ctx_nxt; 380 } 381 lastcheck = now; 382 } 383 384 if (rebuild_set) 385 { 386 /* 387 ** Initialize poll set. 388 ** Insert into the poll set the file descriptors of 389 ** all sessions waiting for a command from sendmail. 390 */ 391 392 nfd = 0; 393 394 /* begin with worker pipe */ 395 pfd[nfd].fd = RD_PIPE; 396 pfd[nfd].events = MI_POLL_RD_FLAGS; 397 pfd[nfd].revents = 0; 398 nfd++; 399 400 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 401 { 402 /* 403 ** update ctx_wait - start of wait moment - 404 ** for timeout 405 */ 406 407 if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 408 ctx->ctx_wait = now; 409 410 /* add the session to the pollfd array? */ 411 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 412 (ctx->ctx_wstate == WKST_WAITING)) 413 { 414 /* 415 ** Resize the pollfd array if it 416 ** isn't large enough. 417 */ 418 419 if (nfd >= dim_pfd) 420 { 421 struct pollfd *tpfd; 422 size_t new; 423 424 new = (dim_pfd + PFD_STEP) * 425 sizeof(*tpfd); 426 tpfd = (struct pollfd *) 427 realloc(pfd, new); 428 if (tpfd != NULL) 429 { 430 pfd = tpfd; 431 dim_pfd += PFD_STEP; 432 } 433 else 434 { 435 smi_log(SMI_LOG_ERR, 436 "Failed to realloc pollfd array:%s", 437 sm_errstring(errno)); 438 } 439 } 440 441 /* add the session to pollfd array */ 442 if (nfd < dim_pfd) 443 { 444 ctx->ctx_wstate = WKST_WAITING; 445 pfd[nfd].fd = ctx->ctx_sd; 446 pfd[nfd].events = MI_POLL_RD_FLAGS; 447 pfd[nfd].revents = 0; 448 nfd++; 449 } 450 } 451 } 452 rebuild_set = false; 453 } 454 455 TASKMGR_UNLOCK(); 456 457 /* Everything is ready, let's wait for an event */ 458 rfd = poll(pfd, nfd, POLL_TIMEOUT); 459 460 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 461 WAITFN, now, nfd)); 462 463 /* timeout */ 464 if (rfd == 0) 465 continue; 466 467 rebuild_set = true; 468 469 /* error */ 470 if (rfd < 0) 471 { 472 if (errno == EINTR) 473 continue; 474 pcnt++; 475 smi_log(SMI_LOG_ERR, 476 "%s() failed (%s), %s", 477 WAITFN, sm_errstring(errno), 478 pcnt >= MAX_FAILS_S ? "abort" : "try again"); 479 480 if (pcnt >= MAX_FAILS_S) 481 goto err; 482 } 483 pcnt = 0; 484 485 /* something happened */ 486 for (i = 0; i < nfd; i++) 487 { 488 if (pfd[i].revents == 0) 489 continue; 490 491 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 492 WAITFN, i, nfd, 493 WAIT_FD(i))); 494 495 /* has a worker signaled an end of task ? */ 496 if (WAIT_FD(i) == RD_PIPE) 497 { 498 char evt = 0; 499 int r = 0; 500 501 POOL_LEV_DPRINTF(4, 502 ("PIPE WILL READ evt = %08X %08X", 503 pfd[i].events, pfd[i].revents)); 504 505 if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) 506 { 507 r = read(RD_PIPE, &evt, sizeof(evt)); 508 if (r == sizeof(evt)) 509 { 510 /* Do nothing */ 511 } 512 } 513 514 POOL_LEV_DPRINTF(4, 515 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 516 i, RD_PIPE, r, evt)); 517 518 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 519 { 520 /* Exception handling */ 521 } 522 continue; 523 } 524 525 /* no ! sendmail wants to send a command */ 526 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 527 { 528 if (ctx->ctx_wstate != WKST_WAITING) 529 continue; 530 531 POOL_LEV_DPRINTF(4, 532 ("Checking context sd=%d - fd=%d ", 533 ctx->ctx_sd , WAIT_FD(i))); 534 535 if (ctx->ctx_sd == pfd[i].fd) 536 { 537 TASKMGR_LOCK(); 538 539 POOL_LEV_DPRINTF(4, 540 ("TASK: found %d for fd[%d]=%d", 541 ctx->ctx_sid, i, WAIT_FD(i))); 542 543 if (Tskmgr.tm_nb_idle > 0) 544 { 545 ctx->ctx_wstate = WKST_READY_TO_RUN; 546 TASKMGR_COND_SIGNAL(); 547 } 548 else 549 { 550 ctx->ctx_wstate = WKST_RUNNING; 551 LAUNCH_WORKER(ctx); 552 } 553 TASKMGR_UNLOCK(); 554 break; 555 } 556 } 557 558 POOL_LEV_DPRINTF(4, 559 ("TASK %s FOUND - Checking PIPE for fd[%d]", 560 ctx != NULL ? "" : "NOT", WAIT_FD(i))); 561 } 562 } 563 564 err: 565 if (pfd != NULL) 566 free(pfd); 567 568 Tskmgr.tm_signature = 0; 569 for (;;) 570 { 571 SMFICTX_PTR ctx; 572 573 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 574 if (ctx == NULL) 575 break; 576 mi_close_session(ctx); 577 } 578 579 (void) smutex_destroy(&Tskmgr.tm_w_mutex); 580 (void) scond_destroy(&Tskmgr.tm_w_cond); 581 582 return NULL; 583 } 584 585 /* 586 ** Look for a task ready to run. 587 ** Value of ctx is NULL or a pointer to a task ready to run. 588 */ 589 590 #define GET_TASK_READY_TO_RUN() \ 591 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 592 { \ 593 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 594 { \ 595 ctx->ctx_wstate = WKST_RUNNING; \ 596 break; \ 597 } \ 598 } 599 600 /* 601 ** MI_WORKER -- worker thread 602 ** executes tasks distributed by the mi_pool_controller 603 ** or by mi_start_session 604 ** 605 ** Parameters: 606 ** arg -- pointer to context structure 607 ** 608 ** Returns: 609 ** NULL pointer 610 */ 611 612 static void * 613 mi_worker(arg) 614 void *arg; 615 { 616 SMFICTX_PTR ctx; 617 bool done; 618 sthread_t t_id; 619 int r; 620 621 ctx = (SMFICTX_PTR) arg; 622 done = false; 623 if (ctx != NULL) 624 ctx->ctx_wstate = WKST_RUNNING; 625 626 t_id = sthread_get_id(); 627 if (pthread_detach(t_id) != 0) 628 { 629 smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 630 if (ctx != NULL) 631 ctx->ctx_wstate = WKST_READY_TO_RUN; 632 return NULL; 633 } 634 635 TASKMGR_LOCK(); 636 Tskmgr.tm_nb_workers++; 637 TASKMGR_UNLOCK(); 638 639 while (!done) 640 { 641 if (mi_stop() != MILTER_CONT) 642 break; 643 644 /* let's handle next task... */ 645 if (ctx != NULL) 646 { 647 int res; 648 649 POOL_LEV_DPRINTF(4, 650 ("worker %d: new task -> let's handle it", 651 t_id)); 652 res = mi_engine(ctx); 653 POOL_LEV_DPRINTF(4, 654 ("worker %d: mi_engine returned %d", t_id, res)); 655 656 TASKMGR_LOCK(); 657 if (res != MI_CONTINUE) 658 { 659 ctx->ctx_wstate = WKST_CLOSING; 660 661 /* 662 ** Delete context from linked list of 663 ** sessions and close session. 664 */ 665 666 mi_close_session(ctx); 667 } 668 else 669 { 670 ctx->ctx_wstate = WKST_READY_TO_WAIT; 671 672 POOL_LEV_DPRINTF(4, 673 ("writing to event pipe...")); 674 675 /* 676 ** Signal task controller to add new session 677 ** to poll set. 678 */ 679 680 PIPE_SEND_SIGNAL(); 681 } 682 TASKMGR_UNLOCK(); 683 ctx = NULL; 684 685 } 686 687 /* check if there is any task waiting to be served */ 688 TASKMGR_LOCK(); 689 690 GET_TASK_READY_TO_RUN(); 691 692 /* Got a task? */ 693 if (ctx != NULL) 694 { 695 TASKMGR_UNLOCK(); 696 continue; 697 } 698 699 /* 700 ** if not, let's check if there is enough idle workers 701 ** if yes: quit 702 */ 703 704 if (Tskmgr.tm_nb_workers > MIN_WORKERS && 705 Tskmgr.tm_nb_idle > MIN_IDLE) 706 done = true; 707 708 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 709 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 710 711 if (done) 712 { 713 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 714 Tskmgr.tm_nb_workers--; 715 TASKMGR_UNLOCK(); 716 continue; 717 } 718 719 /* 720 ** if no task ready to run, wait for another one 721 */ 722 723 Tskmgr.tm_nb_idle++; 724 TASKMGR_COND_WAIT(); 725 Tskmgr.tm_nb_idle--; 726 727 /* look for a task */ 728 GET_TASK_READY_TO_RUN(); 729 730 TASKMGR_UNLOCK(); 731 } 732 return NULL; 733 } 734 735 /* 736 ** MI_LIST_ADD_CTX -- add new session to linked list 737 ** 738 ** Parameters: 739 ** ctx -- context structure 740 ** 741 ** Returns: 742 ** MI_FAILURE/MI_SUCCESS 743 */ 744 745 static int 746 mi_list_add_ctx(ctx) 747 SMFICTX_PTR ctx; 748 { 749 SM_ASSERT(ctx != NULL); 750 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 751 return MI_SUCCESS; 752 } 753 754 /* 755 ** MI_LIST_DEL_CTX -- remove session from linked list when finished 756 ** 757 ** Parameters: 758 ** ctx -- context structure 759 ** 760 ** Returns: 761 ** MI_FAILURE/MI_SUCCESS 762 */ 763 764 static int 765 mi_list_del_ctx(ctx) 766 SMFICTX_PTR ctx; 767 { 768 SM_ASSERT(ctx != NULL); 769 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 770 return MI_FAILURE; 771 772 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 773 return MI_SUCCESS; 774 } 775 #endif /* _FFR_WORKERS_POOL */ 776