1 /* 2 * Copyright (c) 2003-2004, 2007 Sendmail, Inc. and its suppliers. 3 * All rights reserved. 4 * 5 * By using this file, you agree to the terms and conditions set 6 * forth in the LICENSE file which can be found at the top level of 7 * the sendmail distribution. 8 * 9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10 * Jose-Marcio.Martins@ensmp.fr 11 */ 12 13 #include <sm/gen.h> 14 SM_RCSID("@(#)$Id: worker.c,v 8.10 2007/12/03 22:06:05 ca Exp $") 15 16 #include "libmilter.h" 17 18 #if _FFR_WORKERS_POOL 19 20 typedef struct taskmgr_S taskmgr_T; 21 22 #define TM_SIGNATURE 0x23021957 23 24 struct taskmgr_S 25 { 26 long tm_signature; /* has the controller been initialized */ 27 sthread_t tm_tid; /* thread id of controller */ 28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 29 30 int tm_nb_workers; /* number of workers in the pool */ 31 int tm_nb_idle; /* number of workers waiting */ 32 33 int tm_p[2]; /* poll control pipe */ 34 35 smutex_t tm_w_mutex; /* linked list access mutex */ 36 scond_t tm_w_cond; /* */ 37 }; 38 39 static taskmgr_T Tskmgr = {0}; 40 41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head 42 43 #define RD_PIPE (Tskmgr.tm_p[0]) 44 #define WR_PIPE (Tskmgr.tm_p[1]) 45 46 #define PIPE_SEND_SIGNAL() \ 47 do \ 48 { \ 49 char evt = 0x5a; \ 50 int fd = WR_PIPE; \ 51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 52 smi_log(SMI_LOG_ERR, \ 53 "Error writing to event pipe: %s", \ 54 sm_errstring(errno)); \ 55 } while (0) 56 57 #ifndef USE_PIPE_WAKE_POLL 58 # define USE_PIPE_WAKE_POLL 1 59 #endif /* USE_PIPE_WAKE_POLL */ 60 61 /* poll check periodicity (default 10000 - 10 s) */ 62 #define POLL_TIMEOUT 10000 63 64 /* worker conditional wait timeout (default 10 s) */ 65 #define COND_TIMEOUT 10 66 67 /* functions */ 68 static int mi_close_session __P((SMFICTX_PTR)); 69 70 static void *mi_worker __P((void *)); 71 static void *mi_pool_controller __P((void *)); 72 73 static int mi_list_add_ctx __P((SMFICTX_PTR)); 74 static int mi_list_del_ctx __P((SMFICTX_PTR)); 75 76 /* 77 ** periodicity of cleaning up old sessions (timedout) 78 ** sessions list will be checked to find old inactive 79 ** sessions each DT_CHECK_OLD_SESSIONS sec 80 */ 81 82 #define DT_CHECK_OLD_SESSIONS 600 83 84 #ifndef OLD_SESSION_TIMEOUT 85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout 86 #endif /* OLD_SESSION_TIMEOUT */ 87 88 /* session states - with respect to the pool of workers */ 89 #define WKST_INIT 0 /* initial state */ 90 #define WKST_READY_TO_RUN 1 /* command ready do be read */ 91 #define WKST_RUNNING 2 /* session running on a worker */ 92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 93 #define WKST_WAITING 4 /* waiting for new command */ 94 #define WKST_CLOSING 5 /* session finished */ 95 96 #ifndef MIN_WORKERS 97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */ 98 #endif 99 100 #define MIN_IDLE 1 /* minimum number of idle threads */ 101 102 103 /* 104 ** Macros for threads and mutex management 105 */ 106 107 #define TASKMGR_LOCK() \ 108 do \ 109 { \ 110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 112 } while (0) 113 114 #define TASKMGR_UNLOCK() \ 115 do \ 116 { \ 117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 119 } while (0) 120 121 #define TASKMGR_COND_WAIT() \ 122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 123 124 #define TASKMGR_COND_SIGNAL() \ 125 do \ 126 { \ 127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 129 } while (0) 130 131 #define LAUNCH_WORKER(ctx) \ 132 do \ 133 { \ 134 int r; \ 135 sthread_t tid; \ 136 \ 137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 139 sm_errstring(r)); \ 140 } while (0) 141 142 #if POOL_DEBUG 143 # define POOL_LEV_DPRINTF(lev, x) \ 144 do { \ 145 if ((lev) < ctx->ctx_dbg) \ 146 sm_dprintf x; \ 147 } while (0) 148 #else /* POOL_DEBUG */ 149 # define POOL_LEV_DPRINTF(lev, x) 150 #endif /* POOL_DEBUG */ 151 152 /* 153 ** MI_START_SESSION -- Start a session in the pool of workers 154 ** 155 ** Parameters: 156 ** ctx -- context structure 157 ** 158 ** Returns: 159 ** MI_SUCCESS/MI_FAILURE 160 */ 161 162 int 163 mi_start_session(ctx) 164 SMFICTX_PTR ctx; 165 { 166 static long id = 0; 167 168 SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); 169 SM_ASSERT(ctx != NULL); 170 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 171 TASKMGR_LOCK(); 172 173 if (mi_list_add_ctx(ctx) != MI_SUCCESS) 174 { 175 TASKMGR_UNLOCK(); 176 return MI_FAILURE; 177 } 178 179 ctx->ctx_sid = id++; 180 181 /* if there is an idle worker, signal it, otherwise start new worker */ 182 if (Tskmgr.tm_nb_idle > 0) 183 { 184 ctx->ctx_wstate = WKST_READY_TO_RUN; 185 TASKMGR_COND_SIGNAL(); 186 } 187 else 188 { 189 ctx->ctx_wstate = WKST_RUNNING; 190 LAUNCH_WORKER(ctx); 191 } 192 TASKMGR_UNLOCK(); 193 return MI_SUCCESS; 194 } 195 196 /* 197 ** MI_CLOSE_SESSION -- Close a session and clean up data structures 198 ** 199 ** Parameters: 200 ** ctx -- context structure 201 ** 202 ** Returns: 203 ** MI_SUCCESS/MI_FAILURE 204 */ 205 206 static int 207 mi_close_session(ctx) 208 SMFICTX_PTR ctx; 209 { 210 SM_ASSERT(ctx != NULL); 211 212 (void) mi_list_del_ctx(ctx); 213 if (ValidSocket(ctx->ctx_sd)) 214 { 215 (void) closesocket(ctx->ctx_sd); 216 ctx->ctx_sd = INVALID_SOCKET; 217 } 218 if (ctx->ctx_reply != NULL) 219 { 220 free(ctx->ctx_reply); 221 ctx->ctx_reply = NULL; 222 } 223 if (ctx->ctx_privdata != NULL) 224 { 225 smi_log(SMI_LOG_WARN, "%s: private data not NULL", 226 ctx->ctx_smfi->xxfi_name); 227 } 228 mi_clr_macros(ctx, 0); 229 free(ctx); 230 231 return MI_SUCCESS; 232 } 233 234 /* 235 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 236 ** Must be called before starting sessions. 237 ** 238 ** Parameters: 239 ** none 240 ** 241 ** Returns: 242 ** MI_SUCCESS/MI_FAILURE 243 */ 244 245 int 246 mi_pool_controller_init() 247 { 248 sthread_t tid; 249 int r, i; 250 251 if (Tskmgr.tm_signature == TM_SIGNATURE) 252 return MI_SUCCESS; 253 254 SM_TAILQ_INIT(&WRK_CTX_HEAD); 255 Tskmgr.tm_tid = (sthread_t) -1; 256 Tskmgr.tm_nb_workers = 0; 257 Tskmgr.tm_nb_idle = 0; 258 259 if (pipe(Tskmgr.tm_p) != 0) 260 { 261 smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 262 sm_errstring(r)); 263 return MI_FAILURE; 264 } 265 266 (void) smutex_init(&Tskmgr.tm_w_mutex); 267 (void) scond_init(&Tskmgr.tm_w_cond); 268 269 /* Launch the pool controller */ 270 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 271 { 272 smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 273 sm_errstring(r)); 274 return MI_FAILURE; 275 } 276 Tskmgr.tm_tid = tid; 277 Tskmgr.tm_signature = TM_SIGNATURE; 278 279 /* Create the pool of workers */ 280 for (i = 0; i < MIN_WORKERS; i++) 281 { 282 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 283 { 284 smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 285 sm_errstring(r)); 286 return MI_FAILURE; 287 } 288 } 289 290 return MI_SUCCESS; 291 } 292 293 /* 294 ** MI_POOL_CONTROLLER -- manage the pool of workers 295 ** This thread must be running when listener begins 296 ** starting sessions 297 ** 298 ** Parameters: 299 ** arg -- unused 300 ** 301 ** Returns: 302 ** NULL 303 ** 304 ** Control flow: 305 ** for (;;) 306 ** Look for timed out sessions 307 ** Select sessions to wait for sendmail command 308 ** Poll set of file descriptors 309 ** if timeout 310 ** continue 311 ** For each file descriptor ready 312 ** launch new thread if no worker available 313 ** else 314 ** signal waiting worker 315 */ 316 317 /* Poll structure array (pollfd) size step */ 318 #define PFD_STEP 256 319 320 #define WAIT_FD(i) (pfd[i].fd) 321 #define WAITFN "POLL" 322 323 static void * 324 mi_pool_controller(arg) 325 void *arg; 326 { 327 struct pollfd *pfd = NULL; 328 int dim_pfd = 0; 329 bool rebuild_set = true; 330 int pcnt = 0; /* error count for poll() failures */ 331 332 Tskmgr.tm_tid = sthread_get_id(); 333 if (pthread_detach(Tskmgr.tm_tid) != 0) 334 { 335 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 336 return NULL; 337 } 338 339 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 340 if (pfd == NULL) 341 { 342 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 343 sm_errstring(errno)); 344 return NULL; 345 } 346 dim_pfd = PFD_STEP; 347 348 for (;;) 349 { 350 SMFICTX_PTR ctx; 351 int nfd, rfd, i; 352 time_t now; 353 time_t lastcheck; 354 355 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 356 357 if (mi_stop() != MILTER_CONT) 358 break; 359 360 TASKMGR_LOCK(); 361 362 now = time(NULL); 363 364 /* check for timed out sessions? */ 365 if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 366 { 367 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 368 { 369 if (ctx->ctx_wstate == WKST_WAITING) 370 { 371 if (ctx->ctx_wait == 0) 372 { 373 ctx->ctx_wait = now; 374 continue; 375 } 376 377 /* if session timed out, close it */ 378 if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 379 < now) 380 { 381 sfsistat (*fi_close) __P((SMFICTX *)); 382 383 POOL_LEV_DPRINTF(4, 384 ("Closing old connection: sd=%d id=%d", 385 ctx->ctx_sd, 386 ctx->ctx_sid)); 387 388 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 389 (void) (*fi_close)(ctx); 390 391 mi_close_session(ctx); 392 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 393 continue; 394 } 395 } 396 } 397 lastcheck = now; 398 } 399 400 if (rebuild_set) 401 { 402 /* 403 ** Initialize poll set. 404 ** Insert into the poll set the file descriptors of 405 ** all sessions waiting for a command from sendmail. 406 */ 407 408 nfd = 0; 409 410 /* begin with worker pipe */ 411 pfd[nfd].fd = RD_PIPE; 412 pfd[nfd].events = MI_POLL_RD_FLAGS; 413 pfd[nfd].revents = 0; 414 nfd++; 415 416 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 417 { 418 /* 419 ** update ctx_wait - start of wait moment - 420 ** for timeout 421 */ 422 423 if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 424 ctx->ctx_wait = now; 425 426 /* add the session to the pollfd array? */ 427 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 428 (ctx->ctx_wstate == WKST_WAITING)) 429 { 430 /* 431 ** Resize the pollfd array if it 432 ** isn't large enough. 433 */ 434 435 if (nfd >= dim_pfd) 436 { 437 struct pollfd *tpfd; 438 size_t new; 439 440 new = (dim_pfd + PFD_STEP) * 441 sizeof(*tpfd); 442 tpfd = (struct pollfd *) 443 realloc(pfd, new); 444 if (tpfd != NULL) 445 { 446 pfd = tpfd; 447 dim_pfd += PFD_STEP; 448 } 449 else 450 { 451 smi_log(SMI_LOG_ERR, 452 "Failed to realloc pollfd array:%s", 453 sm_errstring(errno)); 454 } 455 } 456 457 /* add the session to pollfd array */ 458 if (nfd < dim_pfd) 459 { 460 ctx->ctx_wstate = WKST_WAITING; 461 pfd[nfd].fd = ctx->ctx_sd; 462 pfd[nfd].events = MI_POLL_RD_FLAGS; 463 pfd[nfd].revents = 0; 464 nfd++; 465 } 466 } 467 } 468 } 469 470 TASKMGR_UNLOCK(); 471 472 /* Everything is ready, let's wait for an event */ 473 rfd = poll(pfd, nfd, POLL_TIMEOUT); 474 475 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 476 WAITFN, now, nfd)); 477 478 /* timeout */ 479 if (rfd == 0) 480 continue; 481 482 rebuild_set = true; 483 484 /* error */ 485 if (rfd < 0) 486 { 487 if (errno == EINTR) 488 continue; 489 pcnt++; 490 smi_log(SMI_LOG_ERR, 491 "%s() failed (%s), %s", 492 WAITFN, sm_errstring(errno), 493 pcnt >= MAX_FAILS_S ? "abort" : "try again"); 494 495 if (pcnt >= MAX_FAILS_S) 496 goto err; 497 } 498 pcnt = 0; 499 500 /* something happened */ 501 for (i = 0; i < nfd; i++) 502 { 503 if (pfd[i].revents == 0) 504 continue; 505 506 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 507 WAITFN, i, nfd, 508 WAIT_FD(i))); 509 510 /* has a worker signaled an end of task ? */ 511 if (WAIT_FD(i) == RD_PIPE) 512 { 513 char evt = 0; 514 int r = 0; 515 516 POOL_LEV_DPRINTF(4, 517 ("PIPE WILL READ evt = %08X %08X", 518 pfd[i].events, pfd[i].revents)); 519 520 if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) 521 { 522 r = read(RD_PIPE, &evt, sizeof(evt)); 523 if (r == sizeof(evt)) 524 { 525 /* Do nothing */ 526 } 527 } 528 529 POOL_LEV_DPRINTF(4, 530 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 531 i, RD_PIPE, r, evt)); 532 533 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 534 { 535 /* Exception handling */ 536 } 537 continue; 538 } 539 540 /* no ! sendmail wants to send a command */ 541 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 542 { 543 if (ctx->ctx_wstate != WKST_WAITING) 544 continue; 545 546 POOL_LEV_DPRINTF(4, 547 ("Checking context sd=%d - fd=%d ", 548 ctx->ctx_sd , WAIT_FD(i))); 549 550 if (ctx->ctx_sd == pfd[i].fd) 551 { 552 TASKMGR_LOCK(); 553 554 POOL_LEV_DPRINTF(4, 555 ("TASK: found %d for fd[%d]=%d", 556 ctx->ctx_sid, i, WAIT_FD(i))); 557 558 if (Tskmgr.tm_nb_idle > 0) 559 { 560 ctx->ctx_wstate = WKST_READY_TO_RUN; 561 TASKMGR_COND_SIGNAL(); 562 } 563 else 564 { 565 ctx->ctx_wstate = WKST_RUNNING; 566 LAUNCH_WORKER(ctx); 567 } 568 TASKMGR_UNLOCK(); 569 break; 570 } 571 } 572 573 POOL_LEV_DPRINTF(4, 574 ("TASK %s FOUND - Checking PIPE for fd[%d]", 575 ctx != NULL ? "" : "NOT", WAIT_FD(i))); 576 } 577 } 578 579 err: 580 if (pfd != NULL) 581 free(pfd); 582 583 Tskmgr.tm_signature = 0; 584 for (;;) 585 { 586 SMFICTX_PTR ctx; 587 588 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 589 if (ctx == NULL) 590 break; 591 mi_close_session(ctx); 592 } 593 594 (void) smutex_destroy(&Tskmgr.tm_w_mutex); 595 (void) scond_destroy(&Tskmgr.tm_w_cond); 596 597 return NULL; 598 } 599 600 /* 601 ** Look for a task ready to run. 602 ** Value of ctx is NULL or a pointer to a task ready to run. 603 */ 604 605 #define GET_TASK_READY_TO_RUN() \ 606 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 607 { \ 608 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 609 { \ 610 ctx->ctx_wstate = WKST_RUNNING; \ 611 break; \ 612 } \ 613 } 614 615 /* 616 ** MI_WORKER -- worker thread 617 ** executes tasks distributed by the mi_pool_controller 618 ** or by mi_start_session 619 ** 620 ** Parameters: 621 ** arg -- pointer to context structure 622 ** 623 ** Returns: 624 ** NULL pointer 625 */ 626 627 static void * 628 mi_worker(arg) 629 void *arg; 630 { 631 SMFICTX_PTR ctx; 632 bool done; 633 sthread_t t_id; 634 int r; 635 636 ctx = (SMFICTX_PTR) arg; 637 done = false; 638 if (ctx != NULL) 639 ctx->ctx_wstate = WKST_RUNNING; 640 641 t_id = sthread_get_id(); 642 if (pthread_detach(t_id) != 0) 643 { 644 smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 645 if (ctx != NULL) 646 ctx->ctx_wstate = WKST_READY_TO_RUN; 647 return NULL; 648 } 649 650 TASKMGR_LOCK(); 651 Tskmgr.tm_nb_workers++; 652 TASKMGR_UNLOCK(); 653 654 while (!done) 655 { 656 if (mi_stop() != MILTER_CONT) 657 break; 658 659 /* let's handle next task... */ 660 if (ctx != NULL) 661 { 662 int res; 663 664 POOL_LEV_DPRINTF(4, 665 ("worker %d: new task -> let's handle it", 666 t_id)); 667 res = mi_engine(ctx); 668 POOL_LEV_DPRINTF(4, 669 ("worker %d: mi_engine returned %d", t_id, res)); 670 671 TASKMGR_LOCK(); 672 if (res != MI_CONTINUE) 673 { 674 ctx->ctx_wstate = WKST_CLOSING; 675 676 /* 677 ** Delete context from linked list of 678 ** sessions and close session. 679 */ 680 681 mi_close_session(ctx); 682 } 683 else 684 { 685 ctx->ctx_wstate = WKST_READY_TO_WAIT; 686 687 POOL_LEV_DPRINTF(4, 688 ("writing to event pipe...")); 689 690 /* 691 ** Signal task controller to add new session 692 ** to poll set. 693 */ 694 695 PIPE_SEND_SIGNAL(); 696 } 697 TASKMGR_UNLOCK(); 698 ctx = NULL; 699 700 } 701 702 /* check if there is any task waiting to be served */ 703 TASKMGR_LOCK(); 704 705 GET_TASK_READY_TO_RUN(); 706 707 /* Got a task? */ 708 if (ctx != NULL) 709 { 710 TASKMGR_UNLOCK(); 711 continue; 712 } 713 714 /* 715 ** if not, let's check if there is enough idle workers 716 ** if yes: quit 717 */ 718 719 if (Tskmgr.tm_nb_workers > MIN_WORKERS && 720 Tskmgr.tm_nb_idle > MIN_IDLE) 721 done = true; 722 723 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 724 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 725 726 if (done) 727 { 728 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 729 Tskmgr.tm_nb_workers--; 730 TASKMGR_UNLOCK(); 731 continue; 732 } 733 734 /* 735 ** if no task ready to run, wait for another one 736 */ 737 738 Tskmgr.tm_nb_idle++; 739 TASKMGR_COND_WAIT(); 740 Tskmgr.tm_nb_idle--; 741 742 /* look for a task */ 743 GET_TASK_READY_TO_RUN(); 744 745 TASKMGR_UNLOCK(); 746 } 747 return NULL; 748 } 749 750 /* 751 ** MI_LIST_ADD_CTX -- add new session to linked list 752 ** 753 ** Parameters: 754 ** ctx -- context structure 755 ** 756 ** Returns: 757 ** MI_FAILURE/MI_SUCCESS 758 */ 759 760 static int 761 mi_list_add_ctx(ctx) 762 SMFICTX_PTR ctx; 763 { 764 SM_ASSERT(ctx != NULL); 765 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 766 return MI_SUCCESS; 767 } 768 769 /* 770 ** MI_LIST_DEL_CTX -- remove session from linked list when finished 771 ** 772 ** Parameters: 773 ** ctx -- context structure 774 ** 775 ** Returns: 776 ** MI_FAILURE/MI_SUCCESS 777 */ 778 779 static int 780 mi_list_del_ctx(ctx) 781 SMFICTX_PTR ctx; 782 { 783 SM_ASSERT(ctx != NULL); 784 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 785 return MI_FAILURE; 786 787 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 788 return MI_SUCCESS; 789 } 790 #endif /* _FFR_WORKERS_POOL */ 791