/*
 *  Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 *
 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
 *		  Jose-Marcio.Martins@ensmp.fr
 */

#include <sm/gen.h>
SM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")

#include "libmilter.h"

#if _FFR_WORKERS_POOL

typedef struct taskmgr_S taskmgr_T;

#define TM_SIGNATURE		0x23021957

struct taskmgr_S
{
	long		tm_signature;	/* has the controller been initialized */
	sthread_t	tm_tid;		/* thread id of controller */
	smfi_hd_T	tm_ctx_head;	/* head of the linked list of contexts */

	int		tm_nb_workers;	/* number of workers in the pool */
	int		tm_nb_idle;	/* number of workers waiting */

	int		tm_p[2];	/* poll control pipe */

	smutex_t	tm_w_mutex;	/* linked list access mutex */
	scond_t		tm_w_cond;	/* idle workers wait on this condition */
};

static taskmgr_T Tskmgr = {0};

#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])

#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char evt = 0x5a;					\
		int fd = WR_PIPE;					\
		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif /* USE_PIPE_WAKE_POLL */

/* poll check periodicity in milliseconds (default 10000 = 10 s) */
#define POLL_TIMEOUT	10000

/* worker condition-wait timeout in seconds (default 10 s) */
#define COND_TIMEOUT	10

/* functions */
static int mi_close_session __P((SMFICTX_PTR));

static void *mi_worker __P((void *));
static void *mi_pool_controller __P((void *));

static int mi_list_add_ctx __P((SMFICTX_PTR));
static int mi_list_del_ctx __P((SMFICTX_PTR));

/*
**  Periodicity of cleaning up old (timed out) sessions:
**	the session list is checked for old inactive sessions
**	every DT_CHECK_OLD_SESSIONS seconds.
*/

#define DT_CHECK_OLD_SESSIONS	600

#ifndef OLD_SESSION_TIMEOUT
# define OLD_SESSION_TIMEOUT	ctx->ctx_timeout
#endif /* OLD_SESSION_TIMEOUT */

/* session states - with respect to the pool of workers */
#define WKST_INIT		0	/* initial state */
#define WKST_READY_TO_RUN	1	/* command ready to be read */
#define WKST_RUNNING		2	/* session running on a worker */
#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
#define WKST_WAITING		4	/* waiting for new command */
#define WKST_CLOSING		5	/* session finished */

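/*
**  Typical state transitions of a session (summary derived from the
**	code below):
**	WKST_INIT -> WKST_READY_TO_RUN or WKST_RUNNING (mi_start_session)
**	WKST_RUNNING -> WKST_READY_TO_WAIT (command handled, session
**		stays open) or WKST_CLOSING (session is over)
**	WKST_READY_TO_WAIT -> WKST_WAITING (controller added the socket
**		to its poll set)
**	WKST_WAITING -> WKST_READY_TO_RUN (handed to an idle worker) or
**		WKST_RUNNING (a new worker is launched for it)
*/
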
#ifndef MIN_WORKERS
# define MIN_WORKERS	2	/* minimum number of threads to keep around */
#endif

#define MIN_IDLE	1	/* minimum number of idle threads */


/*
**  Macros for threads and mutex management
*/

#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

#define TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

#define TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		int r;							\
		sthread_t tid;						\
									\
		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",	\
				sm_errstring(r));			\
	} while (0)

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do								\
	{								\
		if ((lev) < ctx->ctx_dbg)				\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */

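/*
**  Synchronization: tm_w_mutex protects the linked list of contexts
**	and the tm_nb_workers/tm_nb_idle counters; tm_w_cond is
**	signalled when a session becomes WKST_READY_TO_RUN.  Idle
**	workers wait with a bounded timeout (COND_TIMEOUT) so that
**	surplus workers can wake up and exit even if no new task
**	arrives.
*/
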
/*
**  MI_START_SESSION -- Start a session in the pool of workers
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_start_session(ctx)
	SMFICTX_PTR ctx;
{
	static long id = 0;

	/* this can happen if the milter is shutting down */
	if (Tskmgr.tm_signature != TM_SIGNATURE)
		return MI_FAILURE;
	SM_ASSERT(ctx != NULL);
	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
	TASKMGR_LOCK();

	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
	{
		TASKMGR_UNLOCK();
		return MI_FAILURE;
	}

	ctx->ctx_sid = id++;

	/* if there is an idle worker, signal it, otherwise start a new worker */
	if (Tskmgr.tm_nb_idle > 0)
	{
		ctx->ctx_wstate = WKST_READY_TO_RUN;
		TASKMGR_COND_SIGNAL();
	}
	else
	{
		ctx->ctx_wstate = WKST_RUNNING;
		LAUNCH_WORKER(ctx);
	}
	TASKMGR_UNLOCK();
	return MI_SUCCESS;
}

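/*
**  Note: the listener calls mi_start_session() for each new connection;
**	mi_pool_controller_init() must have been called first, otherwise
**	mi_start_session() fails (the taskmgr signature is not set).
*/
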
/*
**  MI_CLOSE_SESSION -- Close a session and clean up data structures
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
mi_close_session(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);

	(void) mi_list_del_ctx(ctx);
	mi_clr_ctx(ctx);

	return MI_SUCCESS;
}

/*
**  NONBLOCKING -- set nonblocking mode for a file descriptor.
**
**	Parameters:
**		fd -- file descriptor
**		name -- name for (error) logging
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
nonblocking(int fd, const char *name)
{
	int r;

	errno = 0;
	r = fcntl(fd, F_GETFL, 0);
	if (r == -1)
	{
		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
			name, sm_errstring(errno));
		return MI_FAILURE;
	}
	errno = 0;
	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
	if (r == -1)
	{
		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
			name, sm_errstring(errno));
		return MI_FAILURE;
	}
	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller.
**	Must be called before starting sessions.
**
**	Parameters:
**		none
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_pool_controller_init()
{
	sthread_t tid;
	int r, i;

	if (Tskmgr.tm_signature == TM_SIGNATURE)
		return MI_SUCCESS;

	SM_TAILQ_INIT(&WRK_CTX_HEAD);
	Tskmgr.tm_tid = (sthread_t) -1;
	Tskmgr.tm_nb_workers = 0;
	Tskmgr.tm_nb_idle = 0;

	if (pipe(Tskmgr.tm_p) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
			sm_errstring(errno));
		return MI_FAILURE;
	}
	r = nonblocking(WR_PIPE, "WR_PIPE");
	if (r != MI_SUCCESS)
		return r;
	r = nonblocking(RD_PIPE, "RD_PIPE");
	if (r != MI_SUCCESS)
		return r;

	(void) smutex_init(&Tskmgr.tm_w_mutex);
	(void) scond_init(&Tskmgr.tm_w_cond);

	/* Launch the pool controller */
	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
			sm_errstring(r));
		return MI_FAILURE;
	}
	Tskmgr.tm_tid = tid;
	Tskmgr.tm_signature = TM_SIGNATURE;

	/* Create the pool of workers */
	for (i = 0; i < MIN_WORKERS; i++)
	{
		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
		{
			smi_log(SMI_LOG_ERR, "can't create worker thread: %s",
				sm_errstring(r));
			return MI_FAILURE;
		}
	}

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER -- manage the pool of workers.
**	This thread must be running when the listener begins
**	starting sessions.
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL
**
**	Control flow:
**		for (;;)
**			Look for timed out sessions
**			Select sessions waiting for a sendmail command
**			Poll the set of file descriptors
**			if timeout
**				continue
**			For each file descriptor ready
**				launch a new thread if no worker is available
**				else
**				signal a waiting worker
*/

/* Poll structure array (pollfd) size step */
#define PFD_STEP	256

#define WAIT_FD(i)	(pfd[i].fd)
#define WAITFN		"POLL"

static void *
mi_pool_controller(arg)
	void *arg;
{
	struct pollfd *pfd = NULL;
	int dim_pfd = 0;
	bool rebuild_set = true;
	int pcnt = 0;	/* error count for poll() failures */
	time_t lastcheck;

	Tskmgr.tm_tid = sthread_get_id();
	if (pthread_detach(Tskmgr.tm_tid) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
		return NULL;
	}

	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
	if (pfd == NULL)
	{
		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
			sm_errstring(errno));
		return NULL;
	}
	dim_pfd = PFD_STEP;

	lastcheck = time(NULL);
	for (;;)
	{
		SMFICTX_PTR ctx;
		int nfd, r, i;
		time_t now;

		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));

		if (mi_stop() != MILTER_CONT)
			break;

		TASKMGR_LOCK();

		now = time(NULL);

		/* check for timed out sessions */
		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
		{
			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
			{
				SMFICTX_PTR ctx_nxt;

				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
				if (ctx->ctx_wstate == WKST_WAITING)
				{
					if (ctx->ctx_wait == 0)
						ctx->ctx_wait = now;
					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
						 < now)
					{
						/* if the session timed out, close it */
						sfsistat (*fi_close) __P((SMFICTX *));

						POOL_LEV_DPRINTF(4,
							("Closing old connection: sd=%d id=%d",
							ctx->ctx_sd,
							ctx->ctx_sid));

						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
							(void) (*fi_close)(ctx);

						mi_close_session(ctx);
					}
				}
				ctx = ctx_nxt;
			}
			lastcheck = now;
		}

		if (rebuild_set)
		{
			/*
			**  Initialize the poll set.
			**	Insert into the poll set the file descriptors
			**	of all sessions waiting for a command from
			**	sendmail.
			*/

			nfd = 0;

			/* begin with the worker pipe */
			pfd[nfd].fd = RD_PIPE;
			pfd[nfd].events = MI_POLL_RD_FLAGS;
			pfd[nfd].revents = 0;
			nfd++;

			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				/*
				**  update ctx_wait - start of wait moment -
				**	for timeout
				*/

				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
					ctx->ctx_wait = now;

				/* add the session to the pollfd array? */
				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
				    (ctx->ctx_wstate == WKST_WAITING))
				{
					/*
					**  Resize the pollfd array if it
					**	isn't large enough.
					*/

					if (nfd >= dim_pfd)
					{
						struct pollfd *tpfd;
						size_t new;

						new = (dim_pfd + PFD_STEP) *
							sizeof(*tpfd);
						tpfd = (struct pollfd *)
							realloc(pfd, new);
						if (tpfd != NULL)
						{
							pfd = tpfd;
							dim_pfd += PFD_STEP;
						}
						else
						{
							smi_log(SMI_LOG_ERR,
								"Failed to realloc pollfd array: %s",
								sm_errstring(errno));
						}
					}

					/* add the session to the pollfd array */
					if (nfd < dim_pfd)
					{
						ctx->ctx_wstate = WKST_WAITING;
						pfd[nfd].fd = ctx->ctx_sd;
						pfd[nfd].events = MI_POLL_RD_FLAGS;
						pfd[nfd].revents = 0;
						nfd++;
					}
				}
			}
			rebuild_set = false;
		}

		TASKMGR_UNLOCK();

		/* Everything is ready, let's wait for an event */
		r = poll(pfd, nfd, POLL_TIMEOUT);

		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
			WAITFN, now, nfd));

		/* timeout */
		if (r == 0)
			continue;

		rebuild_set = true;

		/* error */
		if (r < 0)
		{
			if (errno == EINTR)
				continue;
			pcnt++;
			smi_log(SMI_LOG_ERR,
				"%s() failed (%s), %s",
				WAITFN, sm_errstring(errno),
				pcnt >= MAX_FAILS_S ? "abort" : "try again");

			if (pcnt >= MAX_FAILS_S)
				goto err;
			continue;
		}
		pcnt = 0;

		/* something happened */
		for (i = 0; i < nfd; i++)
		{
			if (pfd[i].revents == 0)
				continue;

			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
				WAITFN, i, nfd,
				WAIT_FD(i)));

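			/*
			**  If the ready descriptor is the internal event
			**	pipe, a worker wrote to it (PIPE_SEND_SIGNAL())
			**	to ask that its finished session be put back
			**	into the poll set; drain the pipe so that
			**	several wake-ups collapse into one rebuild.
			*/
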
			/* has a worker signaled an end of task? */
			if (WAIT_FD(i) == RD_PIPE)
			{
				char evts[256];
				ssize_t r;

				POOL_LEV_DPRINTF(4,
					("PIPE WILL READ evt = %08X %08X",
					pfd[i].events, pfd[i].revents));

				r = 1;
				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
					&& r != -1)
				{
					r = read(RD_PIPE, evts, sizeof(evts));
				}

				POOL_LEV_DPRINTF(4,
					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
					i, RD_PIPE, (int) r, evts[0]));

				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
				{
					/* Exception handling */
				}
				continue;
			}

			/*
			**  Not the pipe for workers waking us,
			**	so it must be something on an MTA connection.
			*/

			TASKMGR_LOCK();
			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				if (ctx->ctx_wstate != WKST_WAITING)
					continue;

				POOL_LEV_DPRINTF(4,
					("Checking context sd=%d - fd=%d ",
					ctx->ctx_sd, WAIT_FD(i)));

				if (ctx->ctx_sd == pfd[i].fd)
				{
					POOL_LEV_DPRINTF(4,
						("TASK: found %d for fd[%d]=%d",
						ctx->ctx_sid, i, WAIT_FD(i)));

					if (Tskmgr.tm_nb_idle > 0)
					{
						ctx->ctx_wstate = WKST_READY_TO_RUN;
						TASKMGR_COND_SIGNAL();
					}
					else
					{
						ctx->ctx_wstate = WKST_RUNNING;
						LAUNCH_WORKER(ctx);
					}
					break;
				}
			}
			TASKMGR_UNLOCK();

			POOL_LEV_DPRINTF(4,
				("TASK %s FOUND - Checking PIPE for fd[%d]",
				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
		}
	}

  err:
	if (pfd != NULL)
		free(pfd);

	Tskmgr.tm_signature = 0;
#if 0
	/*
	**  Do not clean up ctx -- it can cause double-free()s.
	**	The program is shutting down anyway, so it's not worth
	**	the trouble.  There is a more complex solution that
	**	prevents race conditions while accessing ctx, but that's
	**	maybe for a later version.
	*/

	for (;;)
	{
		SMFICTX_PTR ctx;

		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
		if (ctx == NULL)
			break;
		mi_close_session(ctx);
	}
#endif

	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
	(void) scond_destroy(&Tskmgr.tm_w_cond);

	return NULL;
}

/*
**  Look for a task ready to run.
**	Afterwards ctx is NULL or a pointer to a task ready to run.
*/

#define GET_TASK_READY_TO_RUN()					\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
	{							\
		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
		{						\
			ctx->ctx_wstate = WKST_RUNNING;		\
			break;					\
		}						\
	}

/*
**  MI_WORKER -- worker thread.
**	Executes tasks distributed by mi_pool_controller()
**	or by mi_start_session().
**
**	Parameters:
**		arg -- pointer to context structure
**
**	Returns:
**		NULL pointer
*/

static void *
mi_worker(arg)
	void *arg;
{
	SMFICTX_PTR ctx;
	bool done;
	sthread_t t_id;
	int r;

	ctx = (SMFICTX_PTR) arg;
	done = false;
	if (ctx != NULL)
		ctx->ctx_wstate = WKST_RUNNING;

	t_id = sthread_get_id();
	if (pthread_detach(t_id) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
		if (ctx != NULL)
			ctx->ctx_wstate = WKST_READY_TO_RUN;
		return NULL;
	}

	TASKMGR_LOCK();
	Tskmgr.tm_nb_workers++;
	TASKMGR_UNLOCK();

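	/*
	**  Main loop: run the current task (if any) through mi_engine(),
	**	then either close the session or hand it back to the
	**	controller through the event pipe; afterwards pick up the
	**	next task that is ready to run, exit if the pool already
	**	has enough idle workers, or wait (with a timeout) for the
	**	controller to signal new work.
	*/
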
	while (!done)
	{
		if (mi_stop() != MILTER_CONT)
			break;

		/* let's handle the next task... */
		if (ctx != NULL)
		{
			int res;

			POOL_LEV_DPRINTF(4,
				("worker %d: new task -> let's handle it",
				t_id));
			res = mi_engine(ctx);
			POOL_LEV_DPRINTF(4,
				("worker %d: mi_engine returned %d", t_id, res));

			TASKMGR_LOCK();
			if (res != MI_CONTINUE)
			{
				ctx->ctx_wstate = WKST_CLOSING;

				/*
				**  Delete the context from the linked list
				**	of sessions and close the session.
				*/

				mi_close_session(ctx);
			}
			else
			{
				ctx->ctx_wstate = WKST_READY_TO_WAIT;

				POOL_LEV_DPRINTF(4,
					("writing to event pipe..."));

				/*
				**  Signal the task controller to add the
				**	session to its poll set.
				*/

				PIPE_SEND_SIGNAL();
			}
			TASKMGR_UNLOCK();
			ctx = NULL;
		}

		/* check if there is any task waiting to be served */
		TASKMGR_LOCK();

		GET_TASK_READY_TO_RUN();

		/* Got a task? */
		if (ctx != NULL)
		{
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  If not, check whether there are enough idle workers;
		**	if so, quit.
		*/

		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
		    Tskmgr.tm_nb_idle > MIN_IDLE)
			done = true;

		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));

		if (done)
		{
			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
			Tskmgr.tm_nb_workers--;
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  If no task is ready to run, wait for another one.
		*/

		Tskmgr.tm_nb_idle++;
		TASKMGR_COND_WAIT();
		Tskmgr.tm_nb_idle--;

		/* look for a task */
		GET_TASK_READY_TO_RUN();

		TASKMGR_UNLOCK();
	}
	return NULL;
}

/*
**  MI_LIST_ADD_CTX -- add a new session to the linked list
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_add_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}

/*
**  MI_LIST_DEL_CTX -- remove a session from the linked list when finished
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_del_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
		return MI_FAILURE;

	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}
#endif /* _FFR_WORKERS_POOL */