1 /* 2 * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers. 3 * All rights reserved. 4 * 5 * By using this file, you agree to the terms and conditions set 6 * forth in the LICENSE file which can be found at the top level of 7 * the sendmail distribution. 8 * 9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10 * Jose-Marcio.Martins@ensmp.fr 11 */ 12 13 #include <sm/gen.h> 14 SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $") 15 16 #include "libmilter.h" 17 18 #if _FFR_WORKERS_POOL 19 20 typedef struct taskmgr_S taskmgr_T; 21 22 #define TM_SIGNATURE 0x23021957 23 24 struct taskmgr_S 25 { 26 long tm_signature; /* has the controller been initialized */ 27 sthread_t tm_tid; /* thread id of controller */ 28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 29 30 int tm_nb_workers; /* number of workers in the pool */ 31 int tm_nb_idle; /* number of workers waiting */ 32 33 int tm_p[2]; /* poll control pipe */ 34 35 smutex_t tm_w_mutex; /* linked list access mutex */ 36 scond_t tm_w_cond; /* */ 37 }; 38 39 static taskmgr_T Tskmgr = {0}; 40 41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head 42 43 #define RD_PIPE (Tskmgr.tm_p[0]) 44 #define WR_PIPE (Tskmgr.tm_p[1]) 45 46 #define PIPE_SEND_SIGNAL() \ 47 do \ 48 { \ 49 char evt = 0x5a; \ 50 int fd = WR_PIPE; \ 51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 52 smi_log(SMI_LOG_ERR, \ 53 "Error writing to event pipe: %s", \ 54 sm_errstring(errno)); \ 55 } while (0) 56 57 #ifndef USE_PIPE_WAKE_POLL 58 # define USE_PIPE_WAKE_POLL 1 59 #endif /* USE_PIPE_WAKE_POLL */ 60 61 /* poll check periodicity (default 10000 - 10 s) */ 62 #define POLL_TIMEOUT 10000 63 64 /* worker conditional wait timeout (default 10 s) */ 65 #define COND_TIMEOUT 10 66 67 /* functions */ 68 static int mi_close_session __P((SMFICTX_PTR)); 69 70 static void *mi_worker __P((void *)); 71 static void *mi_pool_controller __P((void *)); 72 73 static int mi_list_add_ctx __P((SMFICTX_PTR)); 74 static int mi_list_del_ctx __P((SMFICTX_PTR)); 75 76 /* 77 ** periodicity of cleaning up old sessions (timedout) 78 ** sessions list will be checked to find old inactive 79 ** sessions each DT_CHECK_OLD_SESSIONS sec 80 */ 81 82 #define DT_CHECK_OLD_SESSIONS 600 83 84 #ifndef OLD_SESSION_TIMEOUT 85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout 86 #endif /* OLD_SESSION_TIMEOUT */ 87 88 /* session states - with respect to the pool of workers */ 89 #define WKST_INIT 0 /* initial state */ 90 #define WKST_READY_TO_RUN 1 /* command ready do be read */ 91 #define WKST_RUNNING 2 /* session running on a worker */ 92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 93 #define WKST_WAITING 4 /* waiting for new command */ 94 #define WKST_CLOSING 5 /* session finished */ 95 96 #ifndef MIN_WORKERS 97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */ 98 #endif 99 100 #define MIN_IDLE 1 /* minimum number of idle threads */ 101 102 103 /* 104 ** Macros for threads and mutex management 105 */ 106 107 #define TASKMGR_LOCK() \ 108 do \ 109 { \ 110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 112 } while (0) 113 114 #define TASKMGR_UNLOCK() \ 115 do \ 116 { \ 117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 119 } while (0) 120 121 #define TASKMGR_COND_WAIT() \ 122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 123 124 #define TASKMGR_COND_SIGNAL() \ 125 do \ 126 { \ 127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 129 } while (0) 130 131 #define LAUNCH_WORKER(ctx) \ 132 do \ 133 { \ 134 int r; \ 135 sthread_t tid; \ 136 \ 137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 139 sm_errstring(r)); \ 140 } while (0) 141 142 #if POOL_DEBUG 143 # define POOL_LEV_DPRINTF(lev, x) \ 144 do { \ 145 if ((lev) < ctx->ctx_dbg) \ 146 sm_dprintf x; \ 147 } while (0) 148 #else /* POOL_DEBUG */ 149 # define POOL_LEV_DPRINTF(lev, x) 150 #endif /* POOL_DEBUG */ 151 152 /* 153 ** MI_START_SESSION -- Start a session in the pool of workers 154 ** 155 ** Parameters: 156 ** ctx -- context structure 157 ** 158 ** Returns: 159 ** MI_SUCCESS/MI_FAILURE 160 */ 161 162 int 163 mi_start_session(ctx) 164 SMFICTX_PTR ctx; 165 { 166 static long id = 0; 167 168 SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); 169 SM_ASSERT(ctx != NULL); 170 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 171 TASKMGR_LOCK(); 172 173 if (mi_list_add_ctx(ctx) != MI_SUCCESS) 174 { 175 TASKMGR_UNLOCK(); 176 return MI_FAILURE; 177 } 178 179 ctx->ctx_sid = id++; 180 181 /* if there is an idle worker, signal it, otherwise start new worker */ 182 if (Tskmgr.tm_nb_idle > 0) 183 { 184 ctx->ctx_wstate = WKST_READY_TO_RUN; 185 TASKMGR_COND_SIGNAL(); 186 } 187 else 188 { 189 ctx->ctx_wstate = WKST_RUNNING; 190 LAUNCH_WORKER(ctx); 191 } 192 TASKMGR_UNLOCK(); 193 return MI_SUCCESS; 194 } 195 196 /* 197 ** MI_CLOSE_SESSION -- Close a session and clean up data structures 198 ** 199 ** Parameters: 200 ** ctx -- context structure 201 ** 202 ** Returns: 203 ** MI_SUCCESS/MI_FAILURE 204 */ 205 206 static int 207 mi_close_session(ctx) 208 SMFICTX_PTR ctx; 209 { 210 SM_ASSERT(ctx != NULL); 211 212 (void) mi_list_del_ctx(ctx); 213 if (ValidSocket(ctx->ctx_sd)) 214 { 215 (void) closesocket(ctx->ctx_sd); 216 ctx->ctx_sd = INVALID_SOCKET; 217 } 218 if (ctx->ctx_reply != NULL) 219 { 220 free(ctx->ctx_reply); 221 ctx->ctx_reply = NULL; 222 } 223 if (ctx->ctx_privdata != NULL) 224 { 225 smi_log(SMI_LOG_WARN, "%s: private data not NULL", 226 ctx->ctx_smfi->xxfi_name); 227 } 228 mi_clr_macros(ctx, 0); 229 free(ctx); 230 231 return MI_SUCCESS; 232 } 233 234 /* 235 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 236 ** Must be called before starting sessions. 237 ** 238 ** Parameters: 239 ** none 240 ** 241 ** Returns: 242 ** MI_SUCCESS/MI_FAILURE 243 */ 244 245 int 246 mi_pool_controller_init() 247 { 248 sthread_t tid; 249 int r, i; 250 251 if (Tskmgr.tm_signature == TM_SIGNATURE) 252 return MI_SUCCESS; 253 254 SM_TAILQ_INIT(&WRK_CTX_HEAD); 255 Tskmgr.tm_tid = (sthread_t) -1; 256 Tskmgr.tm_nb_workers = 0; 257 Tskmgr.tm_nb_idle = 0; 258 259 if (pipe(Tskmgr.tm_p) != 0) 260 { 261 smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 262 sm_errstring(r)); 263 return MI_FAILURE; 264 } 265 266 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 267 268 (void) smutex_init(&Tskmgr.tm_w_mutex); 269 (void) scond_init(&Tskmgr.tm_w_cond); 270 271 /* Launch the pool controller */ 272 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 273 { 274 smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 275 sm_errstring(r)); 276 return MI_FAILURE; 277 } 278 Tskmgr.tm_tid = tid; 279 Tskmgr.tm_signature = TM_SIGNATURE; 280 281 /* Create the pool of workers */ 282 for (i = 0; i < MIN_WORKERS; i++) 283 { 284 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 285 { 286 smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 287 sm_errstring(r)); 288 return MI_FAILURE; 289 } 290 } 291 292 return MI_SUCCESS; 293 } 294 295 /* 296 ** MI_POOL_CONTROLLER -- manage the pool of workers 297 ** This thread must be running when listener begins 298 ** starting sessions 299 ** 300 ** Parameters: 301 ** arg -- unused 302 ** 303 ** Returns: 304 ** NULL 305 ** 306 ** Control flow: 307 ** for (;;) 308 ** Look for timed out sessions 309 ** Select sessions to wait for sendmail command 310 ** Poll set of file descriptors 311 ** if timeout 312 ** continue 313 ** For each file descriptor ready 314 ** launch new thread if no worker available 315 ** else 316 ** signal waiting worker 317 */ 318 319 /* Poll structure array (pollfd) size step */ 320 #define PFD_STEP 256 321 322 #define WAIT_FD(i) (pfd[i].fd) 323 #define WAITFN "POLL" 324 325 static void * 326 mi_pool_controller(arg) 327 void *arg; 328 { 329 struct pollfd *pfd = NULL; 330 int dim_pfd = 0; 331 bool rebuild_set = true; 332 int pcnt = 0; /* error count for poll() failures */ 333 334 Tskmgr.tm_tid = sthread_get_id(); 335 if (pthread_detach(Tskmgr.tm_tid) != 0) 336 { 337 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 338 return NULL; 339 } 340 341 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 342 if (pfd == NULL) 343 { 344 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 345 sm_errstring(errno)); 346 return NULL; 347 } 348 dim_pfd = PFD_STEP; 349 350 for (;;) 351 { 352 SMFICTX_PTR ctx; 353 int nfd, rfd, i; 354 time_t now; 355 time_t lastcheck; 356 357 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 358 359 if (mi_stop() != MILTER_CONT) 360 break; 361 362 TASKMGR_LOCK(); 363 364 now = time(NULL); 365 366 /* check for timed out sessions? */ 367 if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 368 { 369 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 370 { 371 if (ctx->ctx_wstate == WKST_WAITING) 372 { 373 if (ctx->ctx_wait == 0) 374 { 375 ctx->ctx_wait = now; 376 continue; 377 } 378 379 /* if session timed out, close it */ 380 if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 381 < now) 382 { 383 sfsistat (*fi_close) __P((SMFICTX *)); 384 385 POOL_LEV_DPRINTF(4, 386 ("Closing old connection: sd=%d id=%d", 387 ctx->ctx_sd, 388 ctx->ctx_sid)); 389 390 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 391 (void) (*fi_close)(ctx); 392 393 mi_close_session(ctx); 394 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 395 continue; 396 } 397 } 398 } 399 lastcheck = now; 400 } 401 402 if (rebuild_set) 403 { 404 /* 405 ** Initialize poll set. 406 ** Insert into the poll set the file descriptors of 407 ** all sessions waiting for a command from sendmail. 408 */ 409 410 nfd = 0; 411 412 /* begin with worker pipe */ 413 pfd[nfd].fd = RD_PIPE; 414 pfd[nfd].events = MI_POLL_RD_FLAGS; 415 pfd[nfd].revents = 0; 416 nfd++; 417 418 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 419 { 420 /* 421 ** update ctx_wait - start of wait moment - 422 ** for timeout 423 */ 424 425 if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 426 ctx->ctx_wait = now; 427 428 /* add the session to the pollfd array? */ 429 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 430 (ctx->ctx_wstate == WKST_WAITING)) 431 { 432 /* 433 ** Resize the pollfd array if it 434 ** isn't large enough. 435 */ 436 437 if (nfd >= dim_pfd) 438 { 439 struct pollfd *tpfd; 440 size_t new; 441 442 new = (dim_pfd + PFD_STEP) * 443 sizeof(*tpfd); 444 tpfd = (struct pollfd *) 445 realloc(pfd, new); 446 if (tpfd != NULL) 447 { 448 pfd = tpfd; 449 dim_pfd += PFD_STEP; 450 } 451 else 452 { 453 smi_log(SMI_LOG_ERR, 454 "Failed to realloc pollfd array:%s", 455 sm_errstring(errno)); 456 } 457 } 458 459 /* add the session to pollfd array */ 460 if (nfd < dim_pfd) 461 { 462 ctx->ctx_wstate = WKST_WAITING; 463 pfd[nfd].fd = ctx->ctx_sd; 464 pfd[nfd].events = MI_POLL_RD_FLAGS; 465 pfd[nfd].revents = 0; 466 nfd++; 467 } 468 } 469 } 470 } 471 472 TASKMGR_UNLOCK(); 473 474 /* Everything is ready, let's wait for an event */ 475 rfd = poll(pfd, nfd, POLL_TIMEOUT); 476 477 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 478 WAITFN, now, nfd)); 479 480 /* timeout */ 481 if (rfd == 0) 482 continue; 483 484 rebuild_set = true; 485 486 /* error */ 487 if (rfd < 0) 488 { 489 if (errno == EINTR) 490 continue; 491 pcnt++; 492 smi_log(SMI_LOG_ERR, 493 "%s() failed (%s), %s", 494 WAITFN, sm_errstring(errno), 495 pcnt >= MAX_FAILS_S ? "abort" : "try again"); 496 497 if (pcnt >= MAX_FAILS_S) 498 goto err; 499 } 500 pcnt = 0; 501 502 /* something happened */ 503 for (i = 0; i < nfd; i++) 504 { 505 if (pfd[i].revents == 0) 506 continue; 507 508 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 509 WAITFN, i, nfd, 510 WAIT_FD(i))); 511 512 /* has a worker signaled an end of task ? */ 513 if (WAIT_FD(i) == RD_PIPE) 514 { 515 char evt = 0; 516 int r = 0; 517 518 POOL_LEV_DPRINTF(4, 519 ("PIPE WILL READ evt = %08X %08X", 520 pfd[i].events, pfd[i].revents)); 521 522 if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) 523 { 524 r = read(RD_PIPE, &evt, sizeof(evt)); 525 if (r == sizeof(evt)) 526 { 527 /* Do nothing */ 528 } 529 } 530 531 POOL_LEV_DPRINTF(4, 532 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 533 i, RD_PIPE, r, evt)); 534 535 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 536 { 537 /* Exception handling */ 538 } 539 continue; 540 } 541 542 /* no ! sendmail wants to send a command */ 543 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 544 { 545 if (ctx->ctx_wstate != WKST_WAITING) 546 continue; 547 548 POOL_LEV_DPRINTF(4, 549 ("Checking context sd=%d - fd=%d ", 550 ctx->ctx_sd , WAIT_FD(i))); 551 552 if (ctx->ctx_sd == pfd[i].fd) 553 { 554 TASKMGR_LOCK(); 555 556 POOL_LEV_DPRINTF(4, 557 ("TASK: found %d for fd[%d]=%d", 558 ctx->ctx_sid, i, WAIT_FD(i))); 559 560 if (Tskmgr.tm_nb_idle > 0) 561 { 562 ctx->ctx_wstate = WKST_READY_TO_RUN; 563 TASKMGR_COND_SIGNAL(); 564 } 565 else 566 { 567 ctx->ctx_wstate = WKST_RUNNING; 568 LAUNCH_WORKER(ctx); 569 } 570 TASKMGR_UNLOCK(); 571 break; 572 } 573 } 574 575 POOL_LEV_DPRINTF(4, 576 ("TASK %s FOUND - Checking PIPE for fd[%d]", 577 ctx != NULL ? "" : "NOT", WAIT_FD(i))); 578 } 579 } 580 581 err: 582 if (pfd != NULL) 583 free(pfd); 584 585 Tskmgr.tm_signature = 0; 586 for (;;) 587 { 588 SMFICTX_PTR ctx; 589 590 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 591 if (ctx == NULL) 592 break; 593 mi_close_session(ctx); 594 } 595 596 (void) smutex_destroy(&Tskmgr.tm_w_mutex); 597 (void) scond_destroy(&Tskmgr.tm_w_cond); 598 599 return NULL; 600 } 601 602 /* 603 ** Look for a task ready to run. 604 ** Value of ctx is NULL or a pointer to a task ready to run. 605 */ 606 607 #define GET_TASK_READY_TO_RUN() \ 608 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 609 { \ 610 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 611 { \ 612 ctx->ctx_wstate = WKST_RUNNING; \ 613 break; \ 614 } \ 615 } 616 617 /* 618 ** MI_WORKER -- worker thread 619 ** executes tasks distributed by the mi_pool_controller 620 ** or by mi_start_session 621 ** 622 ** Parameters: 623 ** arg -- pointer to context structure 624 ** 625 ** Returns: 626 ** NULL pointer 627 */ 628 629 static void * 630 mi_worker(arg) 631 void *arg; 632 { 633 SMFICTX_PTR ctx; 634 bool done; 635 sthread_t t_id; 636 int r; 637 638 ctx = (SMFICTX_PTR) arg; 639 done = false; 640 if (ctx != NULL) 641 ctx->ctx_wstate = WKST_RUNNING; 642 643 t_id = sthread_get_id(); 644 if (pthread_detach(t_id) != 0) 645 { 646 smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 647 if (ctx != NULL) 648 ctx->ctx_wstate = WKST_READY_TO_RUN; 649 return NULL; 650 } 651 652 TASKMGR_LOCK(); 653 Tskmgr.tm_nb_workers++; 654 TASKMGR_UNLOCK(); 655 656 while (!done) 657 { 658 if (mi_stop() != MILTER_CONT) 659 break; 660 661 /* let's handle next task... */ 662 if (ctx != NULL) 663 { 664 int res; 665 666 POOL_LEV_DPRINTF(4, 667 ("worker %d: new task -> let's handle it", 668 t_id)); 669 res = mi_engine(ctx); 670 POOL_LEV_DPRINTF(4, 671 ("worker %d: mi_engine returned %d", t_id, res)); 672 673 TASKMGR_LOCK(); 674 if (res != MI_CONTINUE) 675 { 676 ctx->ctx_wstate = WKST_CLOSING; 677 678 /* 679 ** Delete context from linked list of 680 ** sessions and close session. 681 */ 682 683 mi_close_session(ctx); 684 } 685 else 686 { 687 ctx->ctx_wstate = WKST_READY_TO_WAIT; 688 689 POOL_LEV_DPRINTF(4, 690 ("writing to event pipe...")); 691 692 /* 693 ** Signal task controller to add new session 694 ** to poll set. 695 */ 696 697 PIPE_SEND_SIGNAL(); 698 } 699 TASKMGR_UNLOCK(); 700 ctx = NULL; 701 702 } 703 704 /* check if there is any task waiting to be served */ 705 TASKMGR_LOCK(); 706 707 GET_TASK_READY_TO_RUN(); 708 709 /* Got a task? */ 710 if (ctx != NULL) 711 { 712 TASKMGR_UNLOCK(); 713 continue; 714 } 715 716 /* 717 ** if not, let's check if there is enough idle workers 718 ** if yes: quit 719 */ 720 721 if (Tskmgr.tm_nb_workers > MIN_WORKERS && 722 Tskmgr.tm_nb_idle > MIN_IDLE) 723 done = true; 724 725 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 726 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 727 728 if (done) 729 { 730 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 731 Tskmgr.tm_nb_workers--; 732 TASKMGR_UNLOCK(); 733 continue; 734 } 735 736 /* 737 ** if no task ready to run, wait for another one 738 */ 739 740 Tskmgr.tm_nb_idle++; 741 TASKMGR_COND_WAIT(); 742 Tskmgr.tm_nb_idle--; 743 744 /* look for a task */ 745 GET_TASK_READY_TO_RUN(); 746 747 TASKMGR_UNLOCK(); 748 } 749 return NULL; 750 } 751 752 /* 753 ** MI_LIST_ADD_CTX -- add new session to linked list 754 ** 755 ** Parameters: 756 ** ctx -- context structure 757 ** 758 ** Returns: 759 ** MI_FAILURE/MI_SUCCESS 760 */ 761 762 static int 763 mi_list_add_ctx(ctx) 764 SMFICTX_PTR ctx; 765 { 766 SM_ASSERT(ctx != NULL); 767 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 768 return MI_SUCCESS; 769 } 770 771 /* 772 ** MI_LIST_DEL_CTX -- remove session from linked list when finished 773 ** 774 ** Parameters: 775 ** ctx -- context structure 776 ** 777 ** Returns: 778 ** MI_FAILURE/MI_SUCCESS 779 */ 780 781 static int 782 mi_list_del_ctx(ctx) 783 SMFICTX_PTR ctx; 784 { 785 SM_ASSERT(ctx != NULL); 786 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 787 return MI_FAILURE; 788 789 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 790 return MI_SUCCESS; 791 } 792 #endif /* _FFR_WORKERS_POOL */ 793