1 /* 2 * Copyright (c) 2003-2004, 2007 Sendmail, Inc. and its suppliers. 3 * All rights reserved. 4 * 5 * By using this file, you agree to the terms and conditions set 6 * forth in the LICENSE file which can be found at the top level of 7 * the sendmail distribution. 8 * 9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10 * Jose-Marcio.Martins@ensmp.fr 11 */ 12 13 #pragma ident "%Z%%M% %I% %E% SMI" 14 15 #include <sm/gen.h> 16 SM_RCSID("@(#)$Id: worker.c,v 8.10 2007/12/03 22:06:05 ca Exp $") 17 18 #include "libmilter.h" 19 20 #if _FFR_WORKERS_POOL 21 22 typedef struct taskmgr_S taskmgr_T; 23 24 #define TM_SIGNATURE 0x23021957 25 26 struct taskmgr_S 27 { 28 long tm_signature; /* has the controller been initialized */ 29 sthread_t tm_tid; /* thread id of controller */ 30 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 31 32 int tm_nb_workers; /* number of workers in the pool */ 33 int tm_nb_idle; /* number of workers waiting */ 34 35 int tm_p[2]; /* poll control pipe */ 36 37 smutex_t tm_w_mutex; /* linked list access mutex */ 38 scond_t tm_w_cond; /* */ 39 }; 40 41 static taskmgr_T Tskmgr = {0}; 42 43 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head 44 45 #define RD_PIPE (Tskmgr.tm_p[0]) 46 #define WR_PIPE (Tskmgr.tm_p[1]) 47 48 #define PIPE_SEND_SIGNAL() \ 49 do \ 50 { \ 51 char evt = 0x5a; \ 52 int fd = WR_PIPE; \ 53 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 54 smi_log(SMI_LOG_ERR, \ 55 "Error writing to event pipe: %s", \ 56 sm_errstring(errno)); \ 57 } while (0) 58 59 #ifndef USE_PIPE_WAKE_POLL 60 # define USE_PIPE_WAKE_POLL 1 61 #endif /* USE_PIPE_WAKE_POLL */ 62 63 /* poll check periodicity (default 10000 - 10 s) */ 64 #define POLL_TIMEOUT 10000 65 66 /* worker conditional wait timeout (default 10 s) */ 67 #define COND_TIMEOUT 10 68 69 /* functions */ 70 static int mi_close_session __P((SMFICTX_PTR)); 71 72 static void *mi_worker __P((void *)); 73 static void *mi_pool_controller __P((void *)); 74 75 static int mi_list_add_ctx __P((SMFICTX_PTR)); 76 static int mi_list_del_ctx __P((SMFICTX_PTR)); 77 78 /* 79 ** periodicity of cleaning up old sessions (timedout) 80 ** sessions list will be checked to find old inactive 81 ** sessions each DT_CHECK_OLD_SESSIONS sec 82 */ 83 84 #define DT_CHECK_OLD_SESSIONS 600 85 86 #ifndef OLD_SESSION_TIMEOUT 87 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout 88 #endif /* OLD_SESSION_TIMEOUT */ 89 90 /* session states - with respect to the pool of workers */ 91 #define WKST_INIT 0 /* initial state */ 92 #define WKST_READY_TO_RUN 1 /* command ready do be read */ 93 #define WKST_RUNNING 2 /* session running on a worker */ 94 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 95 #define WKST_WAITING 4 /* waiting for new command */ 96 #define WKST_CLOSING 5 /* session finished */ 97 98 #ifndef MIN_WORKERS 99 # define MIN_WORKERS 2 /* minimum number of threads to keep around */ 100 #endif 101 102 #define MIN_IDLE 1 /* minimum number of idle threads */ 103 104 105 /* 106 ** Macros for threads and mutex management 107 */ 108 109 #define TASKMGR_LOCK() \ 110 do \ 111 { \ 112 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 113 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 114 } while (0) 115 116 #define TASKMGR_UNLOCK() \ 117 do \ 118 { \ 119 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 120 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 121 } while (0) 122 123 #define TASKMGR_COND_WAIT() \ 124 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 125 126 #define TASKMGR_COND_SIGNAL() \ 127 do \ 128 { \ 129 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 130 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 131 } while (0) 132 133 #define LAUNCH_WORKER(ctx) \ 134 do \ 135 { \ 136 int r; \ 137 sthread_t tid; \ 138 \ 139 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 140 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 141 sm_errstring(r)); \ 142 } while (0) 143 144 #if POOL_DEBUG 145 # define POOL_LEV_DPRINTF(lev, x) \ 146 do { \ 147 if ((lev) < ctx->ctx_dbg) \ 148 sm_dprintf x; \ 149 } while (0) 150 #else /* POOL_DEBUG */ 151 # define POOL_LEV_DPRINTF(lev, x) 152 #endif /* POOL_DEBUG */ 153 154 /* 155 ** MI_START_SESSION -- Start a session in the pool of workers 156 ** 157 ** Parameters: 158 ** ctx -- context structure 159 ** 160 ** Returns: 161 ** MI_SUCCESS/MI_FAILURE 162 */ 163 164 int 165 mi_start_session(ctx) 166 SMFICTX_PTR ctx; 167 { 168 static long id = 0; 169 170 SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); 171 SM_ASSERT(ctx != NULL); 172 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 173 TASKMGR_LOCK(); 174 175 if (mi_list_add_ctx(ctx) != MI_SUCCESS) 176 { 177 TASKMGR_UNLOCK(); 178 return MI_FAILURE; 179 } 180 181 ctx->ctx_sid = id++; 182 183 /* if there is an idle worker, signal it, otherwise start new worker */ 184 if (Tskmgr.tm_nb_idle > 0) 185 { 186 ctx->ctx_wstate = WKST_READY_TO_RUN; 187 TASKMGR_COND_SIGNAL(); 188 } 189 else 190 { 191 ctx->ctx_wstate = WKST_RUNNING; 192 LAUNCH_WORKER(ctx); 193 } 194 TASKMGR_UNLOCK(); 195 return MI_SUCCESS; 196 } 197 198 /* 199 ** MI_CLOSE_SESSION -- Close a session and clean up data structures 200 ** 201 ** Parameters: 202 ** ctx -- context structure 203 ** 204 ** Returns: 205 ** MI_SUCCESS/MI_FAILURE 206 */ 207 208 static int 209 mi_close_session(ctx) 210 SMFICTX_PTR ctx; 211 { 212 SM_ASSERT(ctx != NULL); 213 214 (void) mi_list_del_ctx(ctx); 215 if (ValidSocket(ctx->ctx_sd)) 216 { 217 (void) closesocket(ctx->ctx_sd); 218 ctx->ctx_sd = INVALID_SOCKET; 219 } 220 if (ctx->ctx_reply != NULL) 221 { 222 free(ctx->ctx_reply); 223 ctx->ctx_reply = NULL; 224 } 225 if (ctx->ctx_privdata != NULL) 226 { 227 smi_log(SMI_LOG_WARN, "%s: private data not NULL", 228 ctx->ctx_smfi->xxfi_name); 229 } 230 mi_clr_macros(ctx, 0); 231 free(ctx); 232 233 return MI_SUCCESS; 234 } 235 236 /* 237 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 238 ** Must be called before starting sessions. 239 ** 240 ** Parameters: 241 ** none 242 ** 243 ** Returns: 244 ** MI_SUCCESS/MI_FAILURE 245 */ 246 247 int 248 mi_pool_controller_init() 249 { 250 sthread_t tid; 251 int r, i; 252 253 if (Tskmgr.tm_signature == TM_SIGNATURE) 254 return MI_SUCCESS; 255 256 SM_TAILQ_INIT(&WRK_CTX_HEAD); 257 Tskmgr.tm_tid = (sthread_t) -1; 258 Tskmgr.tm_nb_workers = 0; 259 Tskmgr.tm_nb_idle = 0; 260 261 if (pipe(Tskmgr.tm_p) != 0) 262 { 263 smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 264 sm_errstring(r)); 265 return MI_FAILURE; 266 } 267 268 (void) smutex_init(&Tskmgr.tm_w_mutex); 269 (void) scond_init(&Tskmgr.tm_w_cond); 270 271 /* Launch the pool controller */ 272 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 273 { 274 smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 275 sm_errstring(r)); 276 return MI_FAILURE; 277 } 278 Tskmgr.tm_tid = tid; 279 Tskmgr.tm_signature = TM_SIGNATURE; 280 281 /* Create the pool of workers */ 282 for (i = 0; i < MIN_WORKERS; i++) 283 { 284 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 285 { 286 smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 287 sm_errstring(r)); 288 return MI_FAILURE; 289 } 290 } 291 292 return MI_SUCCESS; 293 } 294 295 /* 296 ** MI_POOL_CONTROLLER -- manage the pool of workers 297 ** This thread must be running when listener begins 298 ** starting sessions 299 ** 300 ** Parameters: 301 ** arg -- unused 302 ** 303 ** Returns: 304 ** NULL 305 ** 306 ** Control flow: 307 ** for (;;) 308 ** Look for timed out sessions 309 ** Select sessions to wait for sendmail command 310 ** Poll set of file descriptors 311 ** if timeout 312 ** continue 313 ** For each file descriptor ready 314 ** launch new thread if no worker available 315 ** else 316 ** signal waiting worker 317 */ 318 319 /* Poll structure array (pollfd) size step */ 320 #define PFD_STEP 256 321 322 #define WAIT_FD(i) (pfd[i].fd) 323 #define WAITFN "POLL" 324 325 static void * 326 mi_pool_controller(arg) 327 void *arg; 328 { 329 struct pollfd *pfd = NULL; 330 int dim_pfd = 0; 331 bool rebuild_set = true; 332 int pcnt = 0; /* error count for poll() failures */ 333 334 Tskmgr.tm_tid = sthread_get_id(); 335 if (pthread_detach(Tskmgr.tm_tid) != 0) 336 { 337 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 338 return NULL; 339 } 340 341 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 342 if (pfd == NULL) 343 { 344 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 345 sm_errstring(errno)); 346 return NULL; 347 } 348 dim_pfd = PFD_STEP; 349 350 for (;;) 351 { 352 SMFICTX_PTR ctx; 353 int nfd, rfd, i; 354 time_t now; 355 time_t lastcheck; 356 357 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 358 359 if (mi_stop() != MILTER_CONT) 360 break; 361 362 TASKMGR_LOCK(); 363 364 now = time(NULL); 365 366 /* check for timed out sessions? */ 367 if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 368 { 369 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 370 { 371 if (ctx->ctx_wstate == WKST_WAITING) 372 { 373 if (ctx->ctx_wait == 0) 374 { 375 ctx->ctx_wait = now; 376 continue; 377 } 378 379 /* if session timed out, close it */ 380 if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 381 < now) 382 { 383 sfsistat (*fi_close) __P((SMFICTX *)); 384 385 POOL_LEV_DPRINTF(4, 386 ("Closing old connection: sd=%d id=%d", 387 ctx->ctx_sd, 388 ctx->ctx_sid)); 389 390 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 391 (void) (*fi_close)(ctx); 392 393 mi_close_session(ctx); 394 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 395 continue; 396 } 397 } 398 } 399 lastcheck = now; 400 } 401 402 if (rebuild_set) 403 { 404 /* 405 ** Initialize poll set. 406 ** Insert into the poll set the file descriptors of 407 ** all sessions waiting for a command from sendmail. 408 */ 409 410 nfd = 0; 411 412 /* begin with worker pipe */ 413 pfd[nfd].fd = RD_PIPE; 414 pfd[nfd].events = MI_POLL_RD_FLAGS; 415 pfd[nfd].revents = 0; 416 nfd++; 417 418 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 419 { 420 /* 421 ** update ctx_wait - start of wait moment - 422 ** for timeout 423 */ 424 425 if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 426 ctx->ctx_wait = now; 427 428 /* add the session to the pollfd array? */ 429 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 430 (ctx->ctx_wstate == WKST_WAITING)) 431 { 432 /* 433 ** Resize the pollfd array if it 434 ** isn't large enough. 435 */ 436 437 if (nfd >= dim_pfd) 438 { 439 struct pollfd *tpfd; 440 size_t new; 441 442 new = (dim_pfd + PFD_STEP) * 443 sizeof(*tpfd); 444 tpfd = (struct pollfd *) 445 realloc(pfd, new); 446 if (tpfd != NULL) 447 { 448 pfd = tpfd; 449 dim_pfd += PFD_STEP; 450 } 451 else 452 { 453 smi_log(SMI_LOG_ERR, 454 "Failed to realloc pollfd array:%s", 455 sm_errstring(errno)); 456 } 457 } 458 459 /* add the session to pollfd array */ 460 if (nfd < dim_pfd) 461 { 462 ctx->ctx_wstate = WKST_WAITING; 463 pfd[nfd].fd = ctx->ctx_sd; 464 pfd[nfd].events = MI_POLL_RD_FLAGS; 465 pfd[nfd].revents = 0; 466 nfd++; 467 } 468 } 469 } 470 } 471 472 TASKMGR_UNLOCK(); 473 474 /* Everything is ready, let's wait for an event */ 475 rfd = poll(pfd, nfd, POLL_TIMEOUT); 476 477 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 478 WAITFN, now, nfd)); 479 480 /* timeout */ 481 if (rfd == 0) 482 continue; 483 484 rebuild_set = true; 485 486 /* error */ 487 if (rfd < 0) 488 { 489 if (errno == EINTR) 490 continue; 491 pcnt++; 492 smi_log(SMI_LOG_ERR, 493 "%s() failed (%s), %s", 494 WAITFN, sm_errstring(errno), 495 pcnt >= MAX_FAILS_S ? "abort" : "try again"); 496 497 if (pcnt >= MAX_FAILS_S) 498 goto err; 499 } 500 pcnt = 0; 501 502 /* something happened */ 503 for (i = 0; i < nfd; i++) 504 { 505 if (pfd[i].revents == 0) 506 continue; 507 508 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 509 WAITFN, i, nfd, 510 WAIT_FD(i))); 511 512 /* has a worker signaled an end of task ? */ 513 if (WAIT_FD(i) == RD_PIPE) 514 { 515 char evt = 0; 516 int r = 0; 517 518 POOL_LEV_DPRINTF(4, 519 ("PIPE WILL READ evt = %08X %08X", 520 pfd[i].events, pfd[i].revents)); 521 522 if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) 523 { 524 r = read(RD_PIPE, &evt, sizeof(evt)); 525 if (r == sizeof(evt)) 526 { 527 /* Do nothing */ 528 } 529 } 530 531 POOL_LEV_DPRINTF(4, 532 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 533 i, RD_PIPE, r, evt)); 534 535 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 536 { 537 /* Exception handling */ 538 } 539 continue; 540 } 541 542 /* no ! sendmail wants to send a command */ 543 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 544 { 545 if (ctx->ctx_wstate != WKST_WAITING) 546 continue; 547 548 POOL_LEV_DPRINTF(4, 549 ("Checking context sd=%d - fd=%d ", 550 ctx->ctx_sd , WAIT_FD(i))); 551 552 if (ctx->ctx_sd == pfd[i].fd) 553 { 554 TASKMGR_LOCK(); 555 556 POOL_LEV_DPRINTF(4, 557 ("TASK: found %d for fd[%d]=%d", 558 ctx->ctx_sid, i, WAIT_FD(i))); 559 560 if (Tskmgr.tm_nb_idle > 0) 561 { 562 ctx->ctx_wstate = WKST_READY_TO_RUN; 563 TASKMGR_COND_SIGNAL(); 564 } 565 else 566 { 567 ctx->ctx_wstate = WKST_RUNNING; 568 LAUNCH_WORKER(ctx); 569 } 570 TASKMGR_UNLOCK(); 571 break; 572 } 573 } 574 575 POOL_LEV_DPRINTF(4, 576 ("TASK %s FOUND - Checking PIPE for fd[%d]", 577 ctx != NULL ? "" : "NOT", WAIT_FD(i))); 578 } 579 } 580 581 err: 582 if (pfd != NULL) 583 free(pfd); 584 585 Tskmgr.tm_signature = 0; 586 for (;;) 587 { 588 SMFICTX_PTR ctx; 589 590 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 591 if (ctx == NULL) 592 break; 593 mi_close_session(ctx); 594 } 595 596 (void) smutex_destroy(&Tskmgr.tm_w_mutex); 597 (void) scond_destroy(&Tskmgr.tm_w_cond); 598 599 return NULL; 600 } 601 602 /* 603 ** Look for a task ready to run. 604 ** Value of ctx is NULL or a pointer to a task ready to run. 605 */ 606 607 #define GET_TASK_READY_TO_RUN() \ 608 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 609 { \ 610 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 611 { \ 612 ctx->ctx_wstate = WKST_RUNNING; \ 613 break; \ 614 } \ 615 } 616 617 /* 618 ** MI_WORKER -- worker thread 619 ** executes tasks distributed by the mi_pool_controller 620 ** or by mi_start_session 621 ** 622 ** Parameters: 623 ** arg -- pointer to context structure 624 ** 625 ** Returns: 626 ** NULL pointer 627 */ 628 629 static void * 630 mi_worker(arg) 631 void *arg; 632 { 633 SMFICTX_PTR ctx; 634 bool done; 635 sthread_t t_id; 636 int r; 637 638 ctx = (SMFICTX_PTR) arg; 639 done = false; 640 if (ctx != NULL) 641 ctx->ctx_wstate = WKST_RUNNING; 642 643 t_id = sthread_get_id(); 644 if (pthread_detach(t_id) != 0) 645 { 646 smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 647 if (ctx != NULL) 648 ctx->ctx_wstate = WKST_READY_TO_RUN; 649 return NULL; 650 } 651 652 TASKMGR_LOCK(); 653 Tskmgr.tm_nb_workers++; 654 TASKMGR_UNLOCK(); 655 656 while (!done) 657 { 658 if (mi_stop() != MILTER_CONT) 659 break; 660 661 /* let's handle next task... */ 662 if (ctx != NULL) 663 { 664 int res; 665 666 POOL_LEV_DPRINTF(4, 667 ("worker %d: new task -> let's handle it", 668 t_id)); 669 res = mi_engine(ctx); 670 POOL_LEV_DPRINTF(4, 671 ("worker %d: mi_engine returned %d", t_id, res)); 672 673 TASKMGR_LOCK(); 674 if (res != MI_CONTINUE) 675 { 676 ctx->ctx_wstate = WKST_CLOSING; 677 678 /* 679 ** Delete context from linked list of 680 ** sessions and close session. 681 */ 682 683 mi_close_session(ctx); 684 } 685 else 686 { 687 ctx->ctx_wstate = WKST_READY_TO_WAIT; 688 689 POOL_LEV_DPRINTF(4, 690 ("writing to event pipe...")); 691 692 /* 693 ** Signal task controller to add new session 694 ** to poll set. 695 */ 696 697 PIPE_SEND_SIGNAL(); 698 } 699 TASKMGR_UNLOCK(); 700 ctx = NULL; 701 702 } 703 704 /* check if there is any task waiting to be served */ 705 TASKMGR_LOCK(); 706 707 GET_TASK_READY_TO_RUN(); 708 709 /* Got a task? */ 710 if (ctx != NULL) 711 { 712 TASKMGR_UNLOCK(); 713 continue; 714 } 715 716 /* 717 ** if not, let's check if there is enough idle workers 718 ** if yes: quit 719 */ 720 721 if (Tskmgr.tm_nb_workers > MIN_WORKERS && 722 Tskmgr.tm_nb_idle > MIN_IDLE) 723 done = true; 724 725 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 726 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 727 728 if (done) 729 { 730 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 731 Tskmgr.tm_nb_workers--; 732 TASKMGR_UNLOCK(); 733 continue; 734 } 735 736 /* 737 ** if no task ready to run, wait for another one 738 */ 739 740 Tskmgr.tm_nb_idle++; 741 TASKMGR_COND_WAIT(); 742 Tskmgr.tm_nb_idle--; 743 744 /* look for a task */ 745 GET_TASK_READY_TO_RUN(); 746 747 TASKMGR_UNLOCK(); 748 } 749 return NULL; 750 } 751 752 /* 753 ** MI_LIST_ADD_CTX -- add new session to linked list 754 ** 755 ** Parameters: 756 ** ctx -- context structure 757 ** 758 ** Returns: 759 ** MI_FAILURE/MI_SUCCESS 760 */ 761 762 static int 763 mi_list_add_ctx(ctx) 764 SMFICTX_PTR ctx; 765 { 766 SM_ASSERT(ctx != NULL); 767 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 768 return MI_SUCCESS; 769 } 770 771 /* 772 ** MI_LIST_DEL_CTX -- remove session from linked list when finished 773 ** 774 ** Parameters: 775 ** ctx -- context structure 776 ** 777 ** Returns: 778 ** MI_FAILURE/MI_SUCCESS 779 */ 780 781 static int 782 mi_list_del_ctx(ctx) 783 SMFICTX_PTR ctx; 784 { 785 SM_ASSERT(ctx != NULL); 786 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 787 return MI_FAILURE; 788 789 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 790 return MI_SUCCESS; 791 } 792 #endif /* _FFR_WORKERS_POOL */ 793