/*
 * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 *
 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
 *	Jose-Marcio.Martins@ensmp.fr
 */

#include <sm/gen.h>
SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")

#include "libmilter.h"

#if _FFR_WORKERS_POOL

typedef struct taskmgr_S taskmgr_T;

#define TM_SIGNATURE		0x23021957

struct taskmgr_S
{
	long		tm_signature;	/* has the controller been initialized */
	sthread_t	tm_tid;		/* thread id of controller */
	smfi_hd_T	tm_ctx_head;	/* head of the linked list of contexts */

	int		tm_nb_workers;	/* number of workers in the pool */
	int		tm_nb_idle;	/* number of workers waiting */

	int		tm_p[2];	/* poll control pipe */

	smutex_t	tm_w_mutex;	/* linked list access mutex */
	scond_t		tm_w_cond;	/* worker wait/wake-up condition variable */
};

static taskmgr_T Tskmgr = {0};

#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])

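/*
**  PIPE_SEND_SIGNAL -- wake up the pool controller.
**	A worker writes one byte to WR_PIPE; the controller always keeps
**	RD_PIPE in its poll() set, so the write makes poll() return and
**	the set of watched sessions gets rebuilt.
*/
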
#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char evt = 0x5a;					\
		int fd = WR_PIPE;					\
		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif /* USE_PIPE_WAKE_POLL */

/* poll check periodicity (default 10000 ms, i.e. 10 s) */
#define POLL_TIMEOUT	10000

/* worker conditional wait timeout (default 10 s) */
#define COND_TIMEOUT	10

/* functions */
static int mi_close_session __P((SMFICTX_PTR));

static void *mi_worker __P((void *));
static void *mi_pool_controller __P((void *));

static int mi_list_add_ctx __P((SMFICTX_PTR));
static int mi_list_del_ctx __P((SMFICTX_PTR));

/*
**  Periodicity of cleaning up old (timed-out) sessions:
**  the session list is scanned for old inactive sessions
**  every DT_CHECK_OLD_SESSIONS seconds.
*/

#define DT_CHECK_OLD_SESSIONS	600

#ifndef OLD_SESSION_TIMEOUT
# define OLD_SESSION_TIMEOUT	ctx->ctx_timeout
#endif /* OLD_SESSION_TIMEOUT */
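
/*
**  Note: OLD_SESSION_TIMEOUT expands to ctx->ctx_timeout, so it can only
**  be used where a context pointer named "ctx" is in scope (as in the
**  old-session check in mi_pool_controller() below).
*/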

/* session states - with respect to the pool of workers */
#define WKST_INIT		0	/* initial state */
#define WKST_READY_TO_RUN	1	/* command ready to be read */
#define WKST_RUNNING		2	/* session running on a worker */
#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
#define WKST_WAITING		4	/* waiting for new command */
#define WKST_CLOSING		5	/* session finished */
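
/*
**  Typical state transitions (as implemented below):
**	mi_start_session():	-> READY_TO_RUN (idle worker signalled)
**				or -> RUNNING (new worker launched)
**	mi_worker():		RUNNING -> READY_TO_WAIT (mi_engine() expects
**				more commands) or RUNNING -> CLOSING
**	mi_pool_controller():	READY_TO_WAIT -> WAITING (added to poll set),
**				WAITING -> READY_TO_RUN or RUNNING (data ready)
*/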

#ifndef MIN_WORKERS
# define MIN_WORKERS	2	/* minimum number of threads to keep around */
#endif

#define MIN_IDLE	1	/* minimum number of idle threads */


/*
**  Macros for threads and mutex management
*/

#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

#define TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
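
/*
**  TASKMGR_COND_WAIT uses a timed wait (COND_TIMEOUT seconds), so an idle
**  worker wakes up periodically even without a signal and can re-check
**  for pending work or decide to exit.
*/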

#define TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		int r;							\
		sthread_t tid;						\
									\
		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",	\
				sm_errstring(r));			\
	} while (0)
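
/*
**  Note: worker threads detach themselves (see pthread_detach() in
**  mi_worker()), so LAUNCH_WORKER never joins the thread it creates.
*/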

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do {								\
		if ((lev) < ctx->ctx_dbg)				\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */

/*
**  MI_START_SESSION -- Start a session in the pool of workers
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_start_session(ctx)
	SMFICTX_PTR ctx;
{
	static long id = 0;

	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
	SM_ASSERT(ctx != NULL);
	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
	TASKMGR_LOCK();

	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
	{
		TASKMGR_UNLOCK();
		return MI_FAILURE;
	}

	ctx->ctx_sid = id++;

	/* if there is an idle worker, signal it, otherwise start new worker */
	if (Tskmgr.tm_nb_idle > 0)
	{
		ctx->ctx_wstate = WKST_READY_TO_RUN;
		TASKMGR_COND_SIGNAL();
	}
	else
	{
		ctx->ctx_wstate = WKST_RUNNING;
		LAUNCH_WORKER(ctx);
	}
	TASKMGR_UNLOCK();
	return MI_SUCCESS;
}

/*
**  MI_CLOSE_SESSION -- Close a session and clean up data structures
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
mi_close_session(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);

	(void) mi_list_del_ctx(ctx);
	mi_clr_ctx(ctx);

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
**	Must be called before starting sessions.
**
**	Parameters:
**		none
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_pool_controller_init()
{
	sthread_t tid;
	int r, i;

	if (Tskmgr.tm_signature == TM_SIGNATURE)
		return MI_SUCCESS;

	SM_TAILQ_INIT(&WRK_CTX_HEAD);
	Tskmgr.tm_tid = (sthread_t) -1;
	Tskmgr.tm_nb_workers = 0;
	Tskmgr.tm_nb_idle = 0;

	if (pipe(Tskmgr.tm_p) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
			sm_errstring(errno));
		return MI_FAILURE;
	}

	(void) smutex_init(&Tskmgr.tm_w_mutex);
	(void) scond_init(&Tskmgr.tm_w_cond);

	/* Launch the pool controller */
	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
			sm_errstring(r));
		return MI_FAILURE;
	}
	Tskmgr.tm_tid = tid;
	Tskmgr.tm_signature = TM_SIGNATURE;

	/* Create the pool of workers */
	for (i = 0; i < MIN_WORKERS; i++)
	{
		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
		{
			smi_log(SMI_LOG_ERR, "can't create worker threads: %s",
				sm_errstring(r));
			return MI_FAILURE;
		}
	}

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER -- manage the pool of workers
**	This thread must be running before the listener starts
**	handing out sessions.
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL
**
**	Control flow:
**		for (;;)
**			Look for timed out sessions
**			Select sessions to wait for sendmail command
**			Poll set of file descriptors
**			if timeout
**				continue
**			For each file descriptor ready
**				launch new thread if no worker available
**				else
**					signal waiting worker
*/

/* Poll structure array (pollfd) size step */
#define PFD_STEP	256

#define WAIT_FD(i)	(pfd[i].fd)
#define WAITFN		"POLL"
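
/*
**  The controller's poll() set always keeps the wake-up pipe (RD_PIPE) in
**  slot 0; the remaining slots hold the sockets of sessions in state
**  WKST_WAITING.  WAIT_FD(i) returns the descriptor stored in slot i.
*/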

static void *
mi_pool_controller(arg)
	void *arg;
{
	struct pollfd *pfd = NULL;
	int dim_pfd = 0;
	bool rebuild_set = true;
	int pcnt = 0;	/* error count for poll() failures */
	time_t lastcheck;

	Tskmgr.tm_tid = sthread_get_id();
	if (pthread_detach(Tskmgr.tm_tid) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
		return NULL;
	}

	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
	if (pfd == NULL)
	{
		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
			sm_errstring(errno));
		return NULL;
	}
	dim_pfd = PFD_STEP;

	lastcheck = time(NULL);
	for (;;)
	{
		SMFICTX_PTR ctx;
		int nfd, rfd, i;
		time_t now;

		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));

		if (mi_stop() != MILTER_CONT)
			break;

		TASKMGR_LOCK();

		now = time(NULL);

		/* check for timed out sessions? */
		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
		{
			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
			{
				SMFICTX_PTR ctx_nxt;

				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
				if (ctx->ctx_wstate == WKST_WAITING)
				{
					if (ctx->ctx_wait == 0)
						ctx->ctx_wait = now;
					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
						 < now)
					{
						/* if session timed out, close it */
						sfsistat (*fi_close) __P((SMFICTX *));

						POOL_LEV_DPRINTF(4,
							("Closing old connection: sd=%d id=%d",
							ctx->ctx_sd,
							ctx->ctx_sid));

						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
							(void) (*fi_close)(ctx);

						mi_close_session(ctx);
					}
				}
				ctx = ctx_nxt;
			}
			lastcheck = now;
		}

		if (rebuild_set)
		{
			/*
			**  Initialize poll set.
			**  Insert into the poll set the file descriptors of
			**  all sessions waiting for a command from sendmail.
			*/

			nfd = 0;

			/* begin with worker pipe */
			pfd[nfd].fd = RD_PIPE;
			pfd[nfd].events = MI_POLL_RD_FLAGS;
			pfd[nfd].revents = 0;
			nfd++;

			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				/*
				**  Update ctx_wait (the moment this session
				**  started waiting), used for the timeout check.
				*/

				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
					ctx->ctx_wait = now;

				/* add the session to the pollfd array? */
				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
				    (ctx->ctx_wstate == WKST_WAITING))
				{
					/*
					**  Resize the pollfd array if it
					**  isn't large enough.
					*/

					if (nfd >= dim_pfd)
					{
						struct pollfd *tpfd;
						size_t new;

						new = (dim_pfd + PFD_STEP) *
							sizeof(*tpfd);
						tpfd = (struct pollfd *)
							realloc(pfd, new);
						if (tpfd != NULL)
						{
							pfd = tpfd;
							dim_pfd += PFD_STEP;
						}
						else
						{
							smi_log(SMI_LOG_ERR,
							    "Failed to realloc pollfd array: %s",
							    sm_errstring(errno));
						}
					}

					/* add the session to pollfd array */
					if (nfd < dim_pfd)
					{
						ctx->ctx_wstate = WKST_WAITING;
						pfd[nfd].fd = ctx->ctx_sd;
						pfd[nfd].events = MI_POLL_RD_FLAGS;
						pfd[nfd].revents = 0;
						nfd++;
					}
				}
			}
			rebuild_set = false;
		}

		TASKMGR_UNLOCK();

		/* Everything is ready, let's wait for an event */
		rfd = poll(pfd, nfd, POLL_TIMEOUT);

		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
			WAITFN, now, nfd));

		/* timeout */
		if (rfd == 0)
			continue;

		rebuild_set = true;

		/* error */
		if (rfd < 0)
		{
			if (errno == EINTR)
				continue;
			pcnt++;
			smi_log(SMI_LOG_ERR,
				"%s() failed (%s), %s",
				WAITFN, sm_errstring(errno),
				pcnt >= MAX_FAILS_S ? "abort" : "try again");

			if (pcnt >= MAX_FAILS_S)
				goto err;

			/*
			**  Non-fatal poll() failure: retry instead of
			**  scanning stale revents; pcnt keeps counting
			**  consecutive failures and is reset only after
			**  a successful poll().
			*/

			continue;
		}
		pcnt = 0;

		/* something happened */
		for (i = 0; i < nfd; i++)
		{
			if (pfd[i].revents == 0)
				continue;

			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
				WAITFN, i, nfd,
				WAIT_FD(i)));

			/* has a worker signaled an end of task? */
			if (WAIT_FD(i) == RD_PIPE)
			{
				char evt = 0;
				int r = 0;

				POOL_LEV_DPRINTF(4,
					("PIPE WILL READ evt = %08X %08X",
					pfd[i].events, pfd[i].revents));

				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
				{
					r = read(RD_PIPE, &evt, sizeof(evt));
					if (r == sizeof(evt))
					{
						/* Do nothing */
					}
				}

				POOL_LEV_DPRINTF(4,
					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
					i, RD_PIPE, r, evt));

				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
				{
					/* Exception handling */
				}
				continue;
			}

			/* no: sendmail wants to send a command */
			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				if (ctx->ctx_wstate != WKST_WAITING)
					continue;

				POOL_LEV_DPRINTF(4,
					("Checking context sd=%d - fd=%d ",
					ctx->ctx_sd, WAIT_FD(i)));

				if (ctx->ctx_sd == pfd[i].fd)
				{
					TASKMGR_LOCK();

					POOL_LEV_DPRINTF(4,
						("TASK: found %d for fd[%d]=%d",
						ctx->ctx_sid, i, WAIT_FD(i)));

					if (Tskmgr.tm_nb_idle > 0)
					{
						ctx->ctx_wstate = WKST_READY_TO_RUN;
						TASKMGR_COND_SIGNAL();
					}
					else
					{
						ctx->ctx_wstate = WKST_RUNNING;
						LAUNCH_WORKER(ctx);
					}
					TASKMGR_UNLOCK();
					break;
				}
			}

			POOL_LEV_DPRINTF(4,
				("TASK %s FOUND - Checking PIPE for fd[%d]",
				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
		}
	}

  err:
	if (pfd != NULL)
		free(pfd);

	Tskmgr.tm_signature = 0;
	for (;;)
	{
		SMFICTX_PTR ctx;

		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
		if (ctx == NULL)
			break;
		mi_close_session(ctx);
	}

	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
	(void) scond_destroy(&Tskmgr.tm_w_cond);

	return NULL;
}

/*
**  Look for a task ready to run.
**  On return, ctx is either NULL or a pointer to a task whose state has
**  been switched from WKST_READY_TO_RUN to WKST_RUNNING.
*/

#define GET_TASK_READY_TO_RUN()					\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
	{							\
		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
		{						\
			ctx->ctx_wstate = WKST_RUNNING;		\
			break;					\
		}						\
	}
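
/*
**  GET_TASK_READY_TO_RUN must be invoked with Tskmgr.tm_w_mutex held:
**  it walks the shared context list and changes ctx_wstate.
*/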

/*
**  MI_WORKER -- worker thread
**	executes tasks distributed by the mi_pool_controller
**	or by mi_start_session
**
**	Parameters:
**		arg -- pointer to context structure
**
**	Returns:
**		NULL pointer
*/

static void *
mi_worker(arg)
	void *arg;
{
	SMFICTX_PTR ctx;
	bool done;
	sthread_t t_id;
	int r;

	ctx = (SMFICTX_PTR) arg;
	done = false;
	if (ctx != NULL)
		ctx->ctx_wstate = WKST_RUNNING;

	t_id = sthread_get_id();
	if (pthread_detach(t_id) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
		if (ctx != NULL)
			ctx->ctx_wstate = WKST_READY_TO_RUN;
		return NULL;
	}

	TASKMGR_LOCK();
	Tskmgr.tm_nb_workers++;
	TASKMGR_UNLOCK();

	while (!done)
	{
		if (mi_stop() != MILTER_CONT)
			break;

		/* let's handle next task... */
		if (ctx != NULL)
		{
			int res;

			POOL_LEV_DPRINTF(4,
				("worker %d: new task -> let's handle it",
				t_id));
			res = mi_engine(ctx);
			POOL_LEV_DPRINTF(4,
				("worker %d: mi_engine returned %d", t_id, res));

			TASKMGR_LOCK();
			if (res != MI_CONTINUE)
			{
				ctx->ctx_wstate = WKST_CLOSING;

				/*
				**  Delete context from linked list of
				**  sessions and close session.
				*/

				mi_close_session(ctx);
			}
			else
			{
				ctx->ctx_wstate = WKST_READY_TO_WAIT;

				POOL_LEV_DPRINTF(4,
					("writing to event pipe..."));

				/*
				**  Signal task controller to add new session
				**  to poll set.
				*/

				PIPE_SEND_SIGNAL();
			}
			TASKMGR_UNLOCK();
			ctx = NULL;

		}

		/* check if there is any task waiting to be served */
		TASKMGR_LOCK();

		GET_TASK_READY_TO_RUN();

		/* Got a task? */
		if (ctx != NULL)
		{
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  If not, check whether there are already enough idle
		**  workers; if so, this one can quit.
		*/

		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
		    Tskmgr.tm_nb_idle > MIN_IDLE)
			done = true;

		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));

		if (done)
		{
			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
			Tskmgr.tm_nb_workers--;
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  If no task is ready to run, wait for one.
		*/

		Tskmgr.tm_nb_idle++;
		TASKMGR_COND_WAIT();
		Tskmgr.tm_nb_idle--;

		/* look for a task */
		GET_TASK_READY_TO_RUN();

		TASKMGR_UNLOCK();
	}
	return NULL;
}

/*
**  MI_LIST_ADD_CTX -- add new session to linked list
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_add_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}

/*
**  MI_LIST_DEL_CTX -- remove session from linked list when finished
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_del_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
		return MI_FAILURE;

	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}
#endif /* _FFR_WORKERS_POOL */
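
/*
**  Informational usage sketch (an assumption about typical use, not part
**  of this library): the worker pool above is only compiled in when
**  libmilter is built with -D_FFR_WORKERS_POOL=1; a filter program does
**  not call it directly, but goes through the normal public API, roughly:
**
**	#include <string.h>
**	#include <libmilter/mfapi.h>
**
**	static sfsistat
**	mlfi_envfrom(SMFICTX *ctx, char **argv)
**	{
**		return SMFIS_CONTINUE;	// accept every envelope sender
**	}
**
**	int
**	main(void)
**	{
**		struct smfiDesc desc;
**
**		memset(&desc, '\0', sizeof(desc));
**		desc.xxfi_name = "example";	// hypothetical filter name
**		desc.xxfi_version = SMFI_VERSION;
**		desc.xxfi_envfrom = mlfi_envfrom;
**
**		(void) smfi_setconn("inet:8891@localhost");
**		if (smfi_register(desc) == MI_FAILURE)
**			return 1;
**		return smfi_main() == MI_SUCCESS ? 0 : 1;
**	}
*/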