/*
 *  Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 *
 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
 *   Jose-Marcio.Martins@ensmp.fr
 */

#include <sm/gen.h>
SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")

#include "libmilter.h"

#if _FFR_WORKERS_POOL

typedef struct taskmgr_S taskmgr_T;

#define TM_SIGNATURE		0x23021957

struct taskmgr_S
{
	long		tm_signature; /* has the controller been initialized */
	sthread_t	tm_tid;	/* thread id of controller */
	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */

	int		tm_nb_workers;	/* number of workers in the pool */
	int		tm_nb_idle;	/* number of workers waiting */

	int		tm_p[2];	/* poll control pipe */

	smutex_t	tm_w_mutex;	/* linked list access mutex */
	scond_t		tm_w_cond;	/* */
};

static taskmgr_T     Tskmgr = {0};

#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])

#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char evt = 0x5a;					\
		int fd = WR_PIPE;					\
		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif /* USE_PIPE_WAKE_POLL */

/* poll check periodicity (default 10000 - 10 s) */
#define POLL_TIMEOUT   10000

/* worker conditional wait timeout (default 10 s) */
#define COND_TIMEOUT     10

/* functions */
static int mi_close_session __P((SMFICTX_PTR));

static void *mi_worker __P((void *));
static void *mi_pool_controller __P((void *));

static int mi_list_add_ctx __P((SMFICTX_PTR));
static int mi_list_del_ctx __P((SMFICTX_PTR));

/*
**  periodicity of cleaning up old sessions (timedout)
**	sessions list will be checked to find old inactive
**	sessions each DT_CHECK_OLD_SESSIONS sec
*/

#define DT_CHECK_OLD_SESSIONS   600

#ifndef OLD_SESSION_TIMEOUT
# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
#endif /* OLD_SESSION_TIMEOUT */

/* session states - with respect to the pool of workers */
#define WKST_INIT		0	/* initial state */
#define WKST_READY_TO_RUN	1	/* command ready do be read */
#define WKST_RUNNING		2	/* session running on a worker */
#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
#define WKST_WAITING		4	/* waiting for new command */
#define WKST_CLOSING		5	/* session finished */

#ifndef MIN_WORKERS
# define MIN_WORKERS	2  /* minimum number of threads to keep around */
#endif

#define MIN_IDLE	1  /* minimum number of idle threads */


/*
**  Macros for threads and mutex management
*/

#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

#define	TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

#define	TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		int r;							\
		sthread_t tid;						\
									\
		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
				sm_errstring(r));			\
	} while (0)

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do {								\
		if ((lev) < ctx->ctx_dbg)				\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */

/*
**  MI_START_SESSION -- Start a session in the pool of workers
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_start_session(ctx)
	SMFICTX_PTR ctx;
{
	static long id = 0;

	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
	SM_ASSERT(ctx != NULL);
	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
	TASKMGR_LOCK();

	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
	{
		TASKMGR_UNLOCK();
		return MI_FAILURE;
	}

	ctx->ctx_sid = id++;

	/* if there is an idle worker, signal it, otherwise start new worker */
	if (Tskmgr.tm_nb_idle > 0)
	{
		ctx->ctx_wstate = WKST_READY_TO_RUN;
		TASKMGR_COND_SIGNAL();
	}
	else
	{
		ctx->ctx_wstate = WKST_RUNNING;
		LAUNCH_WORKER(ctx);
	}
	TASKMGR_UNLOCK();
	return MI_SUCCESS;
}

/*
**  MI_CLOSE_SESSION -- Close a session and clean up data structures
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

static int
mi_close_session(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);

	(void) mi_list_del_ctx(ctx);
	mi_clr_ctx(ctx);

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
**		Must be called before starting sessions.
**
**	Parameters:
**		none
**
**	Returns:
**		MI_SUCCESS/MI_FAILURE
*/

int
mi_pool_controller_init()
{
	sthread_t tid;
	int r, i;

	if (Tskmgr.tm_signature == TM_SIGNATURE)
		return MI_SUCCESS;

	SM_TAILQ_INIT(&WRK_CTX_HEAD);
	Tskmgr.tm_tid = (sthread_t) -1;
	Tskmgr.tm_nb_workers = 0;
	Tskmgr.tm_nb_idle = 0;

	if (pipe(Tskmgr.tm_p) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
			sm_errstring(errno));
		return MI_FAILURE;
	}

	(void) smutex_init(&Tskmgr.tm_w_mutex);
	(void) scond_init(&Tskmgr.tm_w_cond);

	/* Launch the pool controller */
	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
	{
		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
			sm_errstring(r));
		return MI_FAILURE;
	}
	Tskmgr.tm_tid = tid;
	Tskmgr.tm_signature = TM_SIGNATURE;

	/* Create the pool of workers */
	for (i = 0; i < MIN_WORKERS; i++)
	{
		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
		{
			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
				sm_errstring(r));
			return MI_FAILURE;
		}
	}

	return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER -- manage the pool of workers
**	This thread must be running when listener begins
**	starting sessions
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL
**
**	Control flow:
**		for (;;)
**			Look for timed out sessions
**			Select sessions to wait for sendmail command
**			Poll set of file descriptors
**			if timeout
**				continue
**			For each file descriptor ready
**				launch new thread if no worker available
**				else
**				signal waiting worker
*/

/* Poll structure array (pollfd) size step */
#define PFD_STEP	256

#define WAIT_FD(i)	(pfd[i].fd)
#define WAITFN		"POLL"

static void *
mi_pool_controller(arg)
	void *arg;
{
	struct pollfd *pfd = NULL;
	int dim_pfd = 0;
	bool rebuild_set = true;
	int pcnt = 0; /* error count for poll() failures */
	time_t lastcheck;

	Tskmgr.tm_tid = sthread_get_id();
	if (pthread_detach(Tskmgr.tm_tid) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
		return NULL;
	}

	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
	if (pfd == NULL)
	{
		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
			sm_errstring(errno));
		return NULL;
	}
	dim_pfd = PFD_STEP;

	lastcheck = time(NULL);
	for (;;)
	{
		SMFICTX_PTR ctx;
		int nfd, rfd, i;
		time_t now;

		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));

		if (mi_stop() != MILTER_CONT)
			break;

		TASKMGR_LOCK();

		now = time(NULL);

		/* check for timed out sessions? */
		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
		{
			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
			{
				SMFICTX_PTR ctx_nxt;

				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
				if (ctx->ctx_wstate == WKST_WAITING)
				{
					if (ctx->ctx_wait == 0)
						ctx->ctx_wait = now;
					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
						 < now)
					{
						/* if session timed out, close it */
						sfsistat (*fi_close) __P((SMFICTX *));

						POOL_LEV_DPRINTF(4,
							("Closing old connection: sd=%d id=%d",
							ctx->ctx_sd,
							ctx->ctx_sid));

						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
							(void) (*fi_close)(ctx);

						mi_close_session(ctx);
					}
				}
				ctx = ctx_nxt;
			}
			lastcheck = now;
		}

		if (rebuild_set)
		{
			/*
			**  Initialize poll set.
			**  Insert into the poll set the file descriptors of
			**  all sessions waiting for a command from sendmail.
			*/

			nfd = 0;

			/* begin with worker pipe */
			pfd[nfd].fd = RD_PIPE;
			pfd[nfd].events = MI_POLL_RD_FLAGS;
			pfd[nfd].revents = 0;
			nfd++;

			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				/*
				**  update ctx_wait - start of wait moment -
				**  for timeout
				*/

				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
					ctx->ctx_wait = now;

				/* add the session to the pollfd array? */
				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
				    (ctx->ctx_wstate == WKST_WAITING))
				{
					/*
					**  Resize the pollfd array if it
					**  isn't large enough.
					*/

					if (nfd >= dim_pfd)
					{
						struct pollfd *tpfd;
						size_t new;

						new = (dim_pfd + PFD_STEP) *
							sizeof(*tpfd);
						tpfd = (struct pollfd *)
							realloc(pfd, new);
						if (tpfd != NULL)
						{
							pfd = tpfd;
							dim_pfd += PFD_STEP;
						}
						else
						{
							smi_log(SMI_LOG_ERR,
								"Failed to realloc pollfd array:%s",
								sm_errstring(errno));
						}
					}

					/* add the session to pollfd array */
					if (nfd < dim_pfd)
					{
						ctx->ctx_wstate = WKST_WAITING;
						pfd[nfd].fd = ctx->ctx_sd;
						pfd[nfd].events = MI_POLL_RD_FLAGS;
						pfd[nfd].revents = 0;
						nfd++;
					}
				}
			}
			rebuild_set = false;
		}

		TASKMGR_UNLOCK();

		/* Everything is ready, let's wait for an event */
		rfd = poll(pfd, nfd, POLL_TIMEOUT);

		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
			WAITFN, now, nfd));

		/* timeout */
		if (rfd == 0)
			continue;

		rebuild_set = true;

		/* error */
		if (rfd < 0)
		{
			if (errno == EINTR)
				continue;
			pcnt++;
			smi_log(SMI_LOG_ERR,
				"%s() failed (%s), %s",
				WAITFN, sm_errstring(errno),
				pcnt >= MAX_FAILS_S ? "abort" : "try again");

			if (pcnt >= MAX_FAILS_S)
				goto err;
		}
		pcnt = 0;

		/* something happened */
		for (i = 0; i < nfd; i++)
		{
			if (pfd[i].revents == 0)
				continue;

			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
				WAITFN, i, nfd,
			WAIT_FD(i)));

			/* has a worker signaled an end of task ? */
			if (WAIT_FD(i) == RD_PIPE)
			{
				char evt = 0;
				int r = 0;

				POOL_LEV_DPRINTF(4,
					("PIPE WILL READ evt = %08X %08X",
					pfd[i].events, pfd[i].revents));

				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
				{
					r = read(RD_PIPE, &evt, sizeof(evt));
					if (r == sizeof(evt))
					{
						/* Do nothing */
					}
				}

				POOL_LEV_DPRINTF(4,
					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
					i, RD_PIPE, r, evt));

				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
				{
					/* Exception handling */
				}
				continue;
			}

			/* no ! sendmail wants to send a command */
			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
			{
				if (ctx->ctx_wstate != WKST_WAITING)
					continue;

				POOL_LEV_DPRINTF(4,
					("Checking context sd=%d - fd=%d ",
					ctx->ctx_sd , WAIT_FD(i)));

				if (ctx->ctx_sd == pfd[i].fd)
				{
					TASKMGR_LOCK();

					POOL_LEV_DPRINTF(4,
						("TASK: found %d for fd[%d]=%d",
						ctx->ctx_sid, i, WAIT_FD(i)));

					if (Tskmgr.tm_nb_idle > 0)
					{
						ctx->ctx_wstate = WKST_READY_TO_RUN;
						TASKMGR_COND_SIGNAL();
					}
					else
					{
						ctx->ctx_wstate = WKST_RUNNING;
						LAUNCH_WORKER(ctx);
					}
					TASKMGR_UNLOCK();
					break;
				}
			}

			POOL_LEV_DPRINTF(4,
				("TASK %s FOUND - Checking PIPE for fd[%d]",
				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
		}
	}

  err:
	if (pfd != NULL)
		free(pfd);

	Tskmgr.tm_signature = 0;
	for (;;)
	{
		SMFICTX_PTR ctx;

		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
		if (ctx == NULL)
			break;
		mi_close_session(ctx);
	}

	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
	(void) scond_destroy(&Tskmgr.tm_w_cond);

	return NULL;
}

/*
**  Look for a task ready to run.
**  Value of ctx is NULL or a pointer to a task ready to run.
*/

#define GET_TASK_READY_TO_RUN()					\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
	{							\
		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
		{						\
			ctx->ctx_wstate = WKST_RUNNING;		\
			break;					\
		}						\
	}

/*
**  MI_WORKER -- worker thread
**	executes tasks distributed by the mi_pool_controller
**	or by mi_start_session
**
**	Parameters:
**		arg -- pointer to context structure
**
**	Returns:
**		NULL pointer
*/

static void *
mi_worker(arg)
	void *arg;
{
	SMFICTX_PTR ctx;
	bool done;
	sthread_t t_id;
	int r;

	ctx = (SMFICTX_PTR) arg;
	done = false;
	if (ctx != NULL)
		ctx->ctx_wstate = WKST_RUNNING;

	t_id = sthread_get_id();
	if (pthread_detach(t_id) != 0)
	{
		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
		if (ctx != NULL)
			ctx->ctx_wstate = WKST_READY_TO_RUN;
		return NULL;
	}

	TASKMGR_LOCK();
	Tskmgr.tm_nb_workers++;
	TASKMGR_UNLOCK();

	while (!done)
	{
		if (mi_stop() != MILTER_CONT)
			break;

		/* let's handle next task... */
		if (ctx != NULL)
		{
			int res;

			POOL_LEV_DPRINTF(4,
				("worker %d: new task -> let's handle it",
				t_id));
			res = mi_engine(ctx);
			POOL_LEV_DPRINTF(4,
				("worker %d: mi_engine returned %d", t_id, res));

			TASKMGR_LOCK();
			if (res != MI_CONTINUE)
			{
				ctx->ctx_wstate = WKST_CLOSING;

				/*
				**  Delete context from linked list of
				**  sessions and close session.
				*/

				mi_close_session(ctx);
			}
			else
			{
				ctx->ctx_wstate = WKST_READY_TO_WAIT;

				POOL_LEV_DPRINTF(4,
					("writing to event pipe..."));

				/*
				**  Signal task controller to add new session
				**  to poll set.
				*/

				PIPE_SEND_SIGNAL();
			}
			TASKMGR_UNLOCK();
			ctx = NULL;

		}

		/* check if there is any task waiting to be served */
		TASKMGR_LOCK();

		GET_TASK_READY_TO_RUN();

		/* Got a task? */
		if (ctx != NULL)
		{
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  if not, let's check if there is enough idle workers
		**	if yes: quit
		*/

		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
		    Tskmgr.tm_nb_idle > MIN_IDLE)
			done = true;

		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));

		if (done)
		{
			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
			Tskmgr.tm_nb_workers--;
			TASKMGR_UNLOCK();
			continue;
		}

		/*
		**  if no task ready to run, wait for another one
		*/

		Tskmgr.tm_nb_idle++;
		TASKMGR_COND_WAIT();
		Tskmgr.tm_nb_idle--;

		/* look for a task */
		GET_TASK_READY_TO_RUN();

		TASKMGR_UNLOCK();
	}
	return NULL;
}

/*
**  MI_LIST_ADD_CTX -- add new session to linked list
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_add_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}

/*
**  MI_LIST_DEL_CTX -- remove session from linked list when finished
**
**	Parameters:
**		ctx -- context structure
**
**	Returns:
**		MI_FAILURE/MI_SUCCESS
*/

static int
mi_list_del_ctx(ctx)
	SMFICTX_PTR ctx;
{
	SM_ASSERT(ctx != NULL);
	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
		return MI_FAILURE;

	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
	return MI_SUCCESS;
}
#endif /* _FFR_WORKERS_POOL */