114e22b5gshapiro/*
2b649239gshapiro *  Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
314e22b5gshapiro *	All rights reserved.
414e22b5gshapiro *
514e22b5gshapiro * By using this file, you agree to the terms and conditions set
614e22b5gshapiro * forth in the LICENSE file which can be found at the top level of
714e22b5gshapiro * the sendmail distribution.
814e22b5gshapiro *
914e22b5gshapiro * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
1014e22b5gshapiro *   Jose-Marcio.Martins@ensmp.fr
1114e22b5gshapiro */
1214e22b5gshapiro
1314e22b5gshapiro#include <sm/gen.h>
14e978ee2gshapiroSM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")
1514e22b5gshapiro
1614e22b5gshapiro#include "libmilter.h"
1714e22b5gshapiro
1814e22b5gshapiro#if _FFR_WORKERS_POOL
1914e22b5gshapiro
2014e22b5gshapirotypedef struct taskmgr_S taskmgr_T;
2114e22b5gshapiro
2214e22b5gshapiro#define TM_SIGNATURE		0x23021957
2314e22b5gshapiro
2414e22b5gshapirostruct taskmgr_S
2514e22b5gshapiro{
2614e22b5gshapiro	long		tm_signature; /* has the controller been initialized */
2714e22b5gshapiro	sthread_t	tm_tid;	/* thread id of controller */
2814e22b5gshapiro	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
2914e22b5gshapiro
3014e22b5gshapiro	int		tm_nb_workers;	/* number of workers in the pool */
3114e22b5gshapiro	int		tm_nb_idle;	/* number of workers waiting */
3214e22b5gshapiro
3314e22b5gshapiro	int		tm_p[2];	/* poll control pipe */
3414e22b5gshapiro
3514e22b5gshapiro	smutex_t	tm_w_mutex;	/* linked list access mutex */
3614e22b5gshapiro	scond_t		tm_w_cond;	/* */
3714e22b5gshapiro};
3814e22b5gshapiro
3914e22b5gshapirostatic taskmgr_T     Tskmgr = {0};
4014e22b5gshapiro
4114e22b5gshapiro#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head
4214e22b5gshapiro
4314e22b5gshapiro#define RD_PIPE	(Tskmgr.tm_p[0])
4414e22b5gshapiro#define WR_PIPE	(Tskmgr.tm_p[1])
4514e22b5gshapiro
4614e22b5gshapiro#define PIPE_SEND_SIGNAL()						\
4714e22b5gshapiro	do								\
4814e22b5gshapiro	{								\
4914e22b5gshapiro		char evt = 0x5a;					\
5014e22b5gshapiro		int fd = WR_PIPE;					\
5114e22b5gshapiro		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
5214e22b5gshapiro			smi_log(SMI_LOG_ERR,				\
5314e22b5gshapiro				"Error writing to event pipe: %s",	\
5414e22b5gshapiro				sm_errstring(errno));			\
5514e22b5gshapiro	} while (0)
5614e22b5gshapiro
5714e22b5gshapiro#ifndef USE_PIPE_WAKE_POLL
5814e22b5gshapiro# define USE_PIPE_WAKE_POLL 1
5914e22b5gshapiro#endif /* USE_PIPE_WAKE_POLL */
6014e22b5gshapiro
6114e22b5gshapiro/* poll check periodicity (default 10000 - 10 s) */
6214e22b5gshapiro#define POLL_TIMEOUT   10000
6314e22b5gshapiro
6414e22b5gshapiro/* worker conditional wait timeout (default 10 s) */
6514e22b5gshapiro#define COND_TIMEOUT     10
6614e22b5gshapiro
6714e22b5gshapiro/* functions */
6814e22b5gshapirostatic int mi_close_session __P((SMFICTX_PTR));
6914e22b5gshapiro
7014e22b5gshapirostatic void *mi_worker __P((void *));
7114e22b5gshapirostatic void *mi_pool_controller __P((void *));
7214e22b5gshapiro
7314e22b5gshapirostatic int mi_list_add_ctx __P((SMFICTX_PTR));
7414e22b5gshapirostatic int mi_list_del_ctx __P((SMFICTX_PTR));
7514e22b5gshapiro
7614e22b5gshapiro/*
7714e22b5gshapiro**  periodicity of cleaning up old sessions (timedout)
7814e22b5gshapiro**	sessions list will be checked to find old inactive
7914e22b5gshapiro**	sessions each DT_CHECK_OLD_SESSIONS sec
8014e22b5gshapiro*/
8114e22b5gshapiro
8214e22b5gshapiro#define DT_CHECK_OLD_SESSIONS   600
8314e22b5gshapiro
8414e22b5gshapiro#ifndef OLD_SESSION_TIMEOUT
8514e22b5gshapiro# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
8614e22b5gshapiro#endif /* OLD_SESSION_TIMEOUT */
8714e22b5gshapiro
8814e22b5gshapiro/* session states - with respect to the pool of workers */
8914e22b5gshapiro#define WKST_INIT		0	/* initial state */
9014e22b5gshapiro#define WKST_READY_TO_RUN	1	/* command ready do be read */
9114e22b5gshapiro#define WKST_RUNNING		2	/* session running on a worker */
9214e22b5gshapiro#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
9314e22b5gshapiro#define WKST_WAITING		4	/* waiting for new command */
9414e22b5gshapiro#define WKST_CLOSING		5	/* session finished */
9514e22b5gshapiro
9614e22b5gshapiro#ifndef MIN_WORKERS
9714e22b5gshapiro# define MIN_WORKERS	2  /* minimum number of threads to keep around */
9814e22b5gshapiro#endif
9914e22b5gshapiro
10014e22b5gshapiro#define MIN_IDLE	1  /* minimum number of idle threads */
10114e22b5gshapiro
10214e22b5gshapiro
10314e22b5gshapiro/*
10414e22b5gshapiro**  Macros for threads and mutex management
10514e22b5gshapiro*/
10614e22b5gshapiro
10714e22b5gshapiro#define TASKMGR_LOCK()							\
10814e22b5gshapiro	do								\
10914e22b5gshapiro	{								\
11014e22b5gshapiro		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
11114e22b5gshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
11214e22b5gshapiro	} while (0)
11314e22b5gshapiro
11414e22b5gshapiro#define TASKMGR_UNLOCK()						\
11514e22b5gshapiro	do								\
11614e22b5gshapiro	{								\
11714e22b5gshapiro		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
11814e22b5gshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
11914e22b5gshapiro	} while (0)
12014e22b5gshapiro
12114e22b5gshapiro#define	TASKMGR_COND_WAIT()						\
12214e22b5gshapiro	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
12314e22b5gshapiro
12414e22b5gshapiro#define	TASKMGR_COND_SIGNAL()						\
12514e22b5gshapiro	do								\
12614e22b5gshapiro	{								\
12714e22b5gshapiro		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
12814e22b5gshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
12914e22b5gshapiro	} while (0)
13014e22b5gshapiro
13114e22b5gshapiro#define LAUNCH_WORKER(ctx)						\
13214e22b5gshapiro	do								\
13314e22b5gshapiro	{								\
13414e22b5gshapiro		int r;							\
13514e22b5gshapiro		sthread_t tid;						\
13614e22b5gshapiro									\
13714e22b5gshapiro		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
13814e22b5gshapiro			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
13914e22b5gshapiro				sm_errstring(r));			\
14014e22b5gshapiro	} while (0)
14114e22b5gshapiro
14214e22b5gshapiro#if POOL_DEBUG
14314e22b5gshapiro# define POOL_LEV_DPRINTF(lev, x)					\
1449deed03gshapiro	do								\
1459deed03gshapiro	{								\
14614e22b5gshapiro		if ((lev) < ctx->ctx_dbg)				\
14714e22b5gshapiro			sm_dprintf x;					\
14814e22b5gshapiro	} while (0)
14914e22b5gshapiro#else /* POOL_DEBUG */
15014e22b5gshapiro# define POOL_LEV_DPRINTF(lev, x)
15114e22b5gshapiro#endif /* POOL_DEBUG */
15214e22b5gshapiro
15314e22b5gshapiro/*
15414e22b5gshapiro**  MI_START_SESSION -- Start a session in the pool of workers
15514e22b5gshapiro**
15614e22b5gshapiro**	Parameters:
15714e22b5gshapiro**		ctx -- context structure
15814e22b5gshapiro**
15914e22b5gshapiro**	Returns:
16014e22b5gshapiro**		MI_SUCCESS/MI_FAILURE
16114e22b5gshapiro*/
16214e22b5gshapiro
16314e22b5gshapiroint
16414e22b5gshapiromi_start_session(ctx)
16514e22b5gshapiro	SMFICTX_PTR ctx;
16614e22b5gshapiro{
16714e22b5gshapiro	static long id = 0;
16814e22b5gshapiro
169dd85ecegshapiro	/* this can happen if the milter is shutting down */
170dd85ecegshapiro	if (Tskmgr.tm_signature != TM_SIGNATURE)
171dd85ecegshapiro		return MI_FAILURE;
17214e22b5gshapiro	SM_ASSERT(ctx != NULL);
17314e22b5gshapiro	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
17414e22b5gshapiro	TASKMGR_LOCK();
17514e22b5gshapiro
17614e22b5gshapiro	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
17714e22b5gshapiro	{
17814e22b5gshapiro		TASKMGR_UNLOCK();
17914e22b5gshapiro		return MI_FAILURE;
18014e22b5gshapiro	}
18114e22b5gshapiro
18214e22b5gshapiro	ctx->ctx_sid = id++;
18314e22b5gshapiro
18414e22b5gshapiro	/* if there is an idle worker, signal it, otherwise start new worker */
18514e22b5gshapiro	if (Tskmgr.tm_nb_idle > 0)
18614e22b5gshapiro	{
18714e22b5gshapiro		ctx->ctx_wstate = WKST_READY_TO_RUN;
18814e22b5gshapiro		TASKMGR_COND_SIGNAL();
18914e22b5gshapiro	}
19014e22b5gshapiro	else
19114e22b5gshapiro	{
19214e22b5gshapiro		ctx->ctx_wstate = WKST_RUNNING;
19314e22b5gshapiro		LAUNCH_WORKER(ctx);
19414e22b5gshapiro	}
19514e22b5gshapiro	TASKMGR_UNLOCK();
19614e22b5gshapiro	return MI_SUCCESS;
19714e22b5gshapiro}
19814e22b5gshapiro
19914e22b5gshapiro/*
20014e22b5gshapiro**  MI_CLOSE_SESSION -- Close a session and clean up data structures
20114e22b5gshapiro**
20214e22b5gshapiro**	Parameters:
20314e22b5gshapiro**		ctx -- context structure
20414e22b5gshapiro**
20514e22b5gshapiro**	Returns:
20614e22b5gshapiro**		MI_SUCCESS/MI_FAILURE
20714e22b5gshapiro*/
20814e22b5gshapiro
20914e22b5gshapirostatic int
21014e22b5gshapiromi_close_session(ctx)
21114e22b5gshapiro	SMFICTX_PTR ctx;
21214e22b5gshapiro{
21314e22b5gshapiro	SM_ASSERT(ctx != NULL);
21414e22b5gshapiro
21514e22b5gshapiro	(void) mi_list_del_ctx(ctx);
216a845449gshapiro	mi_clr_ctx(ctx);
21714e22b5gshapiro
21814e22b5gshapiro	return MI_SUCCESS;
21914e22b5gshapiro}
22014e22b5gshapiro
22114e22b5gshapiro/*
222dd85ecegshapiro**  NONBLOCKING -- set nonblocking mode for a file descriptor.
223dd85ecegshapiro**
224dd85ecegshapiro**	Parameters:
225dd85ecegshapiro**		fd -- file descriptor
226dd85ecegshapiro**		name -- name for (error) logging
227dd85ecegshapiro**
228dd85ecegshapiro**	Returns:
229dd85ecegshapiro**		MI_SUCCESS/MI_FAILURE
230dd85ecegshapiro*/
231dd85ecegshapiro
232dd85ecegshapirostatic int
233dd85ecegshapirononblocking(int fd, const char *name)
234dd85ecegshapiro{
235dd85ecegshapiro	int r;
236dd85ecegshapiro
237dd85ecegshapiro	errno = 0;
238dd85ecegshapiro	r = fcntl(fd, F_GETFL, 0);
239dd85ecegshapiro	if (r == -1)
240dd85ecegshapiro	{
241dd85ecegshapiro		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
242dd85ecegshapiro			name, sm_errstring(errno));
243dd85ecegshapiro		return MI_FAILURE;
244dd85ecegshapiro	}
245dd85ecegshapiro	errno = 0;
246dd85ecegshapiro	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
247dd85ecegshapiro	if (r == -1)
248dd85ecegshapiro	{
249dd85ecegshapiro		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
250dd85ecegshapiro			name, sm_errstring(errno));
251dd85ecegshapiro		return MI_FAILURE;
252dd85ecegshapiro	}
253dd85ecegshapiro	return MI_SUCCESS;
254dd85ecegshapiro}
255dd85ecegshapiro
256dd85ecegshapiro/*
257fe7726fgshapiro**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
25814e22b5gshapiro**		Must be called before starting sessions.
25914e22b5gshapiro**
26014e22b5gshapiro**	Parameters:
26114e22b5gshapiro**		none
26214e22b5gshapiro**
26314e22b5gshapiro**	Returns:
26414e22b5gshapiro**		MI_SUCCESS/MI_FAILURE
26514e22b5gshapiro*/
26614e22b5gshapiro
26714e22b5gshapiroint
26814e22b5gshapiromi_pool_controller_init()
26914e22b5gshapiro{
27014e22b5gshapiro	sthread_t tid;
27114e22b5gshapiro	int r, i;
27214e22b5gshapiro
27314e22b5gshapiro	if (Tskmgr.tm_signature == TM_SIGNATURE)
27414e22b5gshapiro		return MI_SUCCESS;
27514e22b5gshapiro
27614e22b5gshapiro	SM_TAILQ_INIT(&WRK_CTX_HEAD);
27714e22b5gshapiro	Tskmgr.tm_tid = (sthread_t) -1;
27814e22b5gshapiro	Tskmgr.tm_nb_workers = 0;
27914e22b5gshapiro	Tskmgr.tm_nb_idle = 0;
28014e22b5gshapiro
28114e22b5gshapiro	if (pipe(Tskmgr.tm_p) != 0)
28214e22b5gshapiro	{
28314e22b5gshapiro		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
284a845449gshapiro			sm_errstring(errno));
28514e22b5gshapiro		return MI_FAILURE;
28614e22b5gshapiro	}
287dd85ecegshapiro	r = nonblocking(WR_PIPE, "WR_PIPE");
288dd85ecegshapiro	if (r != MI_SUCCESS)
289dd85ecegshapiro		return r;
290dd85ecegshapiro	r = nonblocking(RD_PIPE, "RD_PIPE");
291dd85ecegshapiro	if (r != MI_SUCCESS)
292dd85ecegshapiro		return r;
29314e22b5gshapiro
29414e22b5gshapiro	(void) smutex_init(&Tskmgr.tm_w_mutex);
29514e22b5gshapiro	(void) scond_init(&Tskmgr.tm_w_cond);
29614e22b5gshapiro
29714e22b5gshapiro	/* Launch the pool controller */
29814e22b5gshapiro	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
29914e22b5gshapiro	{
30014e22b5gshapiro		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
30114e22b5gshapiro			sm_errstring(r));
30214e22b5gshapiro		return MI_FAILURE;
30314e22b5gshapiro	}
30414e22b5gshapiro	Tskmgr.tm_tid = tid;
30514e22b5gshapiro	Tskmgr.tm_signature = TM_SIGNATURE;
30614e22b5gshapiro
30714e22b5gshapiro	/* Create the pool of workers */
30814e22b5gshapiro	for (i = 0; i < MIN_WORKERS; i++)
30914e22b5gshapiro	{
31014e22b5gshapiro		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
31114e22b5gshapiro		{
31214e22b5gshapiro			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
31314e22b5gshapiro				sm_errstring(r));
31414e22b5gshapiro			return MI_FAILURE;
31514e22b5gshapiro		}
31614e22b5gshapiro	}
31714e22b5gshapiro
31814e22b5gshapiro	return MI_SUCCESS;
31914e22b5gshapiro}
32014e22b5gshapiro
32114e22b5gshapiro/*
32214e22b5gshapiro**  MI_POOL_CONTROLLER -- manage the pool of workers
32314e22b5gshapiro**	This thread must be running when listener begins
32414e22b5gshapiro**	starting sessions
32514e22b5gshapiro**
32614e22b5gshapiro**	Parameters:
32714e22b5gshapiro**		arg -- unused
32814e22b5gshapiro**
32914e22b5gshapiro**	Returns:
33014e22b5gshapiro**		NULL
33114e22b5gshapiro**
33214e22b5gshapiro**	Control flow:
33314e22b5gshapiro**		for (;;)
33414e22b5gshapiro**			Look for timed out sessions
33514e22b5gshapiro**			Select sessions to wait for sendmail command
33614e22b5gshapiro**			Poll set of file descriptors
33714e22b5gshapiro**			if timeout
33814e22b5gshapiro**				continue
33914e22b5gshapiro**			For each file descriptor ready
34014e22b5gshapiro**				launch new thread if no worker available
34114e22b5gshapiro**				else
34214e22b5gshapiro**				signal waiting worker
34314e22b5gshapiro*/
34414e22b5gshapiro
34514e22b5gshapiro/* Poll structure array (pollfd) size step */
34614e22b5gshapiro#define PFD_STEP	256
34714e22b5gshapiro
34814e22b5gshapiro#define WAIT_FD(i)	(pfd[i].fd)
34914e22b5gshapiro#define WAITFN		"POLL"
35014e22b5gshapiro
35114e22b5gshapirostatic void *
35214e22b5gshapiromi_pool_controller(arg)
35314e22b5gshapiro	void *arg;
35414e22b5gshapiro{
35514e22b5gshapiro	struct pollfd *pfd = NULL;
35614e22b5gshapiro	int dim_pfd = 0;
35714e22b5gshapiro	bool rebuild_set = true;
35814e22b5gshapiro	int pcnt = 0; /* error count for poll() failures */
359a845449gshapiro	time_t lastcheck;
36014e22b5gshapiro
36114e22b5gshapiro	Tskmgr.tm_tid = sthread_get_id();
36214e22b5gshapiro	if (pthread_detach(Tskmgr.tm_tid) != 0)
36314e22b5gshapiro	{
36414e22b5gshapiro		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
36514e22b5gshapiro		return NULL;
36614e22b5gshapiro	}
36714e22b5gshapiro
36814e22b5gshapiro	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
36914e22b5gshapiro	if (pfd == NULL)
37014e22b5gshapiro	{
37114e22b5gshapiro		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
37214e22b5gshapiro			sm_errstring(errno));
37314e22b5gshapiro		return NULL;
37414e22b5gshapiro	}
37514e22b5gshapiro	dim_pfd = PFD_STEP;
37614e22b5gshapiro
377a845449gshapiro	lastcheck = time(NULL);
37814e22b5gshapiro	for (;;)
37914e22b5gshapiro	{
38014e22b5gshapiro		SMFICTX_PTR ctx;
3819deed03gshapiro		int nfd, r, i;
38214e22b5gshapiro		time_t now;
38314e22b5gshapiro
38414e22b5gshapiro		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
38514e22b5gshapiro
38614e22b5gshapiro		if (mi_stop() != MILTER_CONT)
38714e22b5gshapiro			break;
38814e22b5gshapiro
38914e22b5gshapiro		TASKMGR_LOCK();
39014e22b5gshapiro
39114e22b5gshapiro		now = time(NULL);
39214e22b5gshapiro
39314e22b5gshapiro		/* check for timed out sessions? */
39414e22b5gshapiro		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
39514e22b5gshapiro		{
396a845449gshapiro			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
397a845449gshapiro			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
39814e22b5gshapiro			{
399a845449gshapiro				SMFICTX_PTR ctx_nxt;
400a845449gshapiro
401a845449gshapiro				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
40214e22b5gshapiro				if (ctx->ctx_wstate == WKST_WAITING)
40314e22b5gshapiro				{
40414e22b5gshapiro					if (ctx->ctx_wait == 0)
40514e22b5gshapiro						ctx->ctx_wait = now;
406a845449gshapiro					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
407a845449gshapiro						 < now)
40814e22b5gshapiro					{
409a845449gshapiro						/* if session timed out, close it */
41014e22b5gshapiro						sfsistat (*fi_close) __P((SMFICTX *));
41114e22b5gshapiro
41214e22b5gshapiro						POOL_LEV_DPRINTF(4,
41314e22b5gshapiro							("Closing old connection: sd=%d id=%d",
41414e22b5gshapiro							ctx->ctx_sd,
41514e22b5gshapiro							ctx->ctx_sid));
41614e22b5gshapiro
41714e22b5gshapiro						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
41814e22b5gshapiro							(void) (*fi_close)(ctx);
41914e22b5gshapiro
42014e22b5gshapiro						mi_close_session(ctx);
42114e22b5gshapiro					}
42214e22b5gshapiro				}
423a845449gshapiro				ctx = ctx_nxt;
42414e22b5gshapiro			}
42514e22b5gshapiro			lastcheck = now;
42614e22b5gshapiro		}
42714e22b5gshapiro
42814e22b5gshapiro		if (rebuild_set)
42914e22b5gshapiro		{
43014e22b5gshapiro			/*
43114e22b5gshapiro			**  Initialize poll set.
43214e22b5gshapiro			**  Insert into the poll set the file descriptors of
43314e22b5gshapiro			**  all sessions waiting for a command from sendmail.
43414e22b5gshapiro			*/
43514e22b5gshapiro
43614e22b5gshapiro			nfd = 0;
43714e22b5gshapiro
43814e22b5gshapiro			/* begin with worker pipe */
43914e22b5gshapiro			pfd[nfd].fd = RD_PIPE;
44014e22b5gshapiro			pfd[nfd].events = MI_POLL_RD_FLAGS;
44114e22b5gshapiro			pfd[nfd].revents = 0;
44214e22b5gshapiro			nfd++;
44314e22b5gshapiro
44414e22b5gshapiro			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
44514e22b5gshapiro			{
44614e22b5gshapiro				/*
44714e22b5gshapiro				**  update ctx_wait - start of wait moment -
44814e22b5gshapiro				**  for timeout
44914e22b5gshapiro				*/
45014e22b5gshapiro
45114e22b5gshapiro				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
45214e22b5gshapiro					ctx->ctx_wait = now;
45314e22b5gshapiro
45414e22b5gshapiro				/* add the session to the pollfd array? */
45514e22b5gshapiro				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
45614e22b5gshapiro				    (ctx->ctx_wstate == WKST_WAITING))
45714e22b5gshapiro				{
45814e22b5gshapiro					/*
45914e22b5gshapiro					**  Resize the pollfd array if it
46014e22b5gshapiro					**  isn't large enough.
46114e22b5gshapiro					*/
46214e22b5gshapiro
46314e22b5gshapiro					if (nfd >= dim_pfd)
46414e22b5gshapiro					{
46514e22b5gshapiro						struct pollfd *tpfd;
46614e22b5gshapiro						size_t new;
46714e22b5gshapiro
46814e22b5gshapiro						new = (dim_pfd + PFD_STEP) *
46914e22b5gshapiro							sizeof(*tpfd);
47014e22b5gshapiro						tpfd = (struct pollfd *)
47114e22b5gshapiro							realloc(pfd, new);
47214e22b5gshapiro						if (tpfd != NULL)
47314e22b5gshapiro						{
47414e22b5gshapiro							pfd = tpfd;
47514e22b5gshapiro							dim_pfd += PFD_STEP;
47614e22b5gshapiro						}
47714e22b5gshapiro						else
47814e22b5gshapiro						{
47914e22b5gshapiro							smi_log(SMI_LOG_ERR,
48014e22b5gshapiro								"Failed to realloc pollfd array:%s",
48114e22b5gshapiro								sm_errstring(errno));
48214e22b5gshapiro						}
48314e22b5gshapiro					}
48414e22b5gshapiro
48514e22b5gshapiro					/* add the session to pollfd array */
48614e22b5gshapiro					if (nfd < dim_pfd)
48714e22b5gshapiro					{
48814e22b5gshapiro						ctx->ctx_wstate = WKST_WAITING;
48914e22b5gshapiro						pfd[nfd].fd = ctx->ctx_sd;
49014e22b5gshapiro						pfd[nfd].events = MI_POLL_RD_FLAGS;
49114e22b5gshapiro						pfd[nfd].revents = 0;
49214e22b5gshapiro						nfd++;
49314e22b5gshapiro					}
49414e22b5gshapiro				}
49514e22b5gshapiro			}
496a845449gshapiro			rebuild_set = false;
49714e22b5gshapiro		}
49814e22b5gshapiro
49914e22b5gshapiro		TASKMGR_UNLOCK();
50014e22b5gshapiro
50114e22b5gshapiro		/* Everything is ready, let's wait for an event */
5029deed03gshapiro		r = poll(pfd, nfd, POLL_TIMEOUT);
50314e22b5gshapiro
50414e22b5gshapiro		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
50514e22b5gshapiro			WAITFN, now, nfd));
50614e22b5gshapiro
50714e22b5gshapiro		/* timeout */
5089deed03gshapiro		if (r == 0)
50914e22b5gshapiro			continue;
51014e22b5gshapiro
51114e22b5gshapiro		rebuild_set = true;
51214e22b5gshapiro
51314e22b5gshapiro		/* error */
5149deed03gshapiro		if (r < 0)
51514e22b5gshapiro		{
51614e22b5gshapiro			if (errno == EINTR)
51714e22b5gshapiro				continue;
51814e22b5gshapiro			pcnt++;
51914e22b5gshapiro			smi_log(SMI_LOG_ERR,
52014e22b5gshapiro				"%s() failed (%s), %s",
52114e22b5gshapiro				WAITFN, sm_errstring(errno),
52214e22b5gshapiro				pcnt >= MAX_FAILS_S ? "abort" : "try again");
52314e22b5gshapiro
52414e22b5gshapiro			if (pcnt >= MAX_FAILS_S)
52514e22b5gshapiro				goto err;
5269deed03gshapiro			continue;
52714e22b5gshapiro		}
52814e22b5gshapiro		pcnt = 0;
52914e22b5gshapiro
53014e22b5gshapiro		/* something happened */
53114e22b5gshapiro		for (i = 0; i < nfd; i++)
53214e22b5gshapiro		{
53314e22b5gshapiro			if (pfd[i].revents == 0)
53414e22b5gshapiro				continue;
53514e22b5gshapiro
53614e22b5gshapiro			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
53714e22b5gshapiro				WAITFN, i, nfd,
53814e22b5gshapiro			WAIT_FD(i)));
53914e22b5gshapiro
5409deed03gshapiro			/* has a worker signaled an end of task? */
54114e22b5gshapiro			if (WAIT_FD(i) == RD_PIPE)
54214e22b5gshapiro			{
543dd85ecegshapiro				char evts[256];
544dd85ecegshapiro				ssize_t r;
54514e22b5gshapiro
54614e22b5gshapiro				POOL_LEV_DPRINTF(4,
54714e22b5gshapiro					("PIPE WILL READ evt = %08X %08X",
54814e22b5gshapiro					pfd[i].events, pfd[i].revents));
54914e22b5gshapiro
550dd85ecegshapiro				r = 1;
551dd85ecegshapiro				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
552dd85ecegshapiro					&& r != -1)
55314e22b5gshapiro				{
554dd85ecegshapiro					r = read(RD_PIPE, evts, sizeof(evts));
55514e22b5gshapiro				}
55614e22b5gshapiro
55714e22b5gshapiro				POOL_LEV_DPRINTF(4,
55814e22b5gshapiro					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
559dd85ecegshapiro					i, RD_PIPE, (int) r, evts[0]));
56014e22b5gshapiro
56114e22b5gshapiro				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
56214e22b5gshapiro				{
56314e22b5gshapiro					/* Exception handling */
56414e22b5gshapiro				}
56514e22b5gshapiro				continue;
56614e22b5gshapiro			}
56714e22b5gshapiro
5689deed03gshapiro			/*
5699deed03gshapiro			**  Not the pipe for workers waking us,
5709deed03gshapiro			**  so must be something on an MTA connection.
5719deed03gshapiro			*/
5729deed03gshapiro
5739deed03gshapiro			TASKMGR_LOCK();
57414e22b5gshapiro			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
57514e22b5gshapiro			{
57614e22b5gshapiro				if (ctx->ctx_wstate != WKST_WAITING)
57714e22b5gshapiro					continue;
57814e22b5gshapiro
57914e22b5gshapiro				POOL_LEV_DPRINTF(4,
58014e22b5gshapiro					("Checking context sd=%d - fd=%d ",
58114e22b5gshapiro					ctx->ctx_sd , WAIT_FD(i)));
58214e22b5gshapiro
58314e22b5gshapiro				if (ctx->ctx_sd == pfd[i].fd)
58414e22b5gshapiro				{
58514e22b5gshapiro
58614e22b5gshapiro					POOL_LEV_DPRINTF(4,
58714e22b5gshapiro						("TASK: found %d for fd[%d]=%d",
58814e22b5gshapiro						ctx->ctx_sid, i, WAIT_FD(i)));
58914e22b5gshapiro
59014e22b5gshapiro					if (Tskmgr.tm_nb_idle > 0)
59114e22b5gshapiro					{
59214e22b5gshapiro						ctx->ctx_wstate = WKST_READY_TO_RUN;
59314e22b5gshapiro						TASKMGR_COND_SIGNAL();
59414e22b5gshapiro					}
59514e22b5gshapiro					else
59614e22b5gshapiro					{
59714e22b5gshapiro						ctx->ctx_wstate = WKST_RUNNING;
59814e22b5gshapiro						LAUNCH_WORKER(ctx);
59914e22b5gshapiro					}
60014e22b5gshapiro					break;
60114e22b5gshapiro				}
60214e22b5gshapiro			}
6039deed03gshapiro			TASKMGR_UNLOCK();
60414e22b5gshapiro
60514e22b5gshapiro			POOL_LEV_DPRINTF(4,
60614e22b5gshapiro				("TASK %s FOUND - Checking PIPE for fd[%d]",
60714e22b5gshapiro				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
60814e22b5gshapiro		}
60914e22b5gshapiro	}
61014e22b5gshapiro
61114e22b5gshapiro  err:
61214e22b5gshapiro	if (pfd != NULL)
61314e22b5gshapiro		free(pfd);
61414e22b5gshapiro
61514e22b5gshapiro	Tskmgr.tm_signature = 0;
6169deed03gshapiro#if 0
6179deed03gshapiro	/*
6189deed03gshapiro	**  Do not clean up ctx -- it can cause double-free()s.
6199deed03gshapiro	**  The program is shutting down anyway, so it's not worth the trouble.
6209deed03gshapiro	**  There is a more complex solution that prevents race conditions
6219deed03gshapiro	**  while accessing ctx, but that's maybe for a later version.
6229deed03gshapiro	*/
6239deed03gshapiro
62414e22b5gshapiro	for (;;)
62514e22b5gshapiro	{
62614e22b5gshapiro		SMFICTX_PTR ctx;
62714e22b5gshapiro
62814e22b5gshapiro		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
62914e22b5gshapiro		if (ctx == NULL)
63014e22b5gshapiro			break;
63114e22b5gshapiro		mi_close_session(ctx);
63214e22b5gshapiro	}
6339deed03gshapiro#endif
63414e22b5gshapiro
63514e22b5gshapiro	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
63614e22b5gshapiro	(void) scond_destroy(&Tskmgr.tm_w_cond);
63714e22b5gshapiro
63814e22b5gshapiro	return NULL;
63914e22b5gshapiro}
64014e22b5gshapiro
64114e22b5gshapiro/*
64214e22b5gshapiro**  Look for a task ready to run.
64314e22b5gshapiro**  Value of ctx is NULL or a pointer to a task ready to run.
64414e22b5gshapiro*/
64514e22b5gshapiro
64614e22b5gshapiro#define GET_TASK_READY_TO_RUN()					\
64714e22b5gshapiro	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
64814e22b5gshapiro	{							\
64914e22b5gshapiro		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
65014e22b5gshapiro		{						\
65114e22b5gshapiro			ctx->ctx_wstate = WKST_RUNNING;		\
65214e22b5gshapiro			break;					\
65314e22b5gshapiro		}						\
65414e22b5gshapiro	}
65514e22b5gshapiro
65614e22b5gshapiro/*
65714e22b5gshapiro**  MI_WORKER -- worker thread
65814e22b5gshapiro**	executes tasks distributed by the mi_pool_controller
65914e22b5gshapiro**	or by mi_start_session
66014e22b5gshapiro**
66114e22b5gshapiro**	Parameters:
66214e22b5gshapiro**		arg -- pointer to context structure
66314e22b5gshapiro**
66414e22b5gshapiro**	Returns:
66514e22b5gshapiro**		NULL pointer
66614e22b5gshapiro*/
66714e22b5gshapiro
66814e22b5gshapirostatic void *
66914e22b5gshapiromi_worker(arg)
67014e22b5gshapiro	void *arg;
67114e22b5gshapiro{
67214e22b5gshapiro	SMFICTX_PTR ctx;
67314e22b5gshapiro	bool done;
67414e22b5gshapiro	sthread_t t_id;
67514e22b5gshapiro	int r;
67614e22b5gshapiro
67714e22b5gshapiro	ctx = (SMFICTX_PTR) arg;
67814e22b5gshapiro	done = false;
67914e22b5gshapiro	if (ctx != NULL)
68014e22b5gshapiro		ctx->ctx_wstate = WKST_RUNNING;
68114e22b5gshapiro
68214e22b5gshapiro	t_id = sthread_get_id();
68314e22b5gshapiro	if (pthread_detach(t_id) != 0)
68414e22b5gshapiro	{
68514e22b5gshapiro		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
68614e22b5gshapiro		if (ctx != NULL)
68714e22b5gshapiro			ctx->ctx_wstate = WKST_READY_TO_RUN;
68814e22b5gshapiro		return NULL;
68914e22b5gshapiro	}
69014e22b5gshapiro
69114e22b5gshapiro	TASKMGR_LOCK();
69214e22b5gshapiro	Tskmgr.tm_nb_workers++;
69314e22b5gshapiro	TASKMGR_UNLOCK();
69414e22b5gshapiro
69514e22b5gshapiro	while (!done)
69614e22b5gshapiro	{
69714e22b5gshapiro		if (mi_stop() != MILTER_CONT)
69814e22b5gshapiro			break;
69914e22b5gshapiro
70014e22b5gshapiro		/* let's handle next task... */
70114e22b5gshapiro		if (ctx != NULL)
70214e22b5gshapiro		{
70314e22b5gshapiro			int res;
70414e22b5gshapiro
70514e22b5gshapiro			POOL_LEV_DPRINTF(4,
70614e22b5gshapiro				("worker %d: new task -> let's handle it",
70714e22b5gshapiro				t_id));
70814e22b5gshapiro			res = mi_engine(ctx);
70914e22b5gshapiro			POOL_LEV_DPRINTF(4,
71014e22b5gshapiro				("worker %d: mi_engine returned %d", t_id, res));
71114e22b5gshapiro
71214e22b5gshapiro			TASKMGR_LOCK();
71314e22b5gshapiro			if (res != MI_CONTINUE)
71414e22b5gshapiro			{
71514e22b5gshapiro				ctx->ctx_wstate = WKST_CLOSING;
71614e22b5gshapiro
71714e22b5gshapiro				/*
71814e22b5gshapiro				**  Delete context from linked list of
71914e22b5gshapiro				**  sessions and close session.
72014e22b5gshapiro				*/
72114e22b5gshapiro
72214e22b5gshapiro				mi_close_session(ctx);
72314e22b5gshapiro			}
72414e22b5gshapiro			else
72514e22b5gshapiro			{
72614e22b5gshapiro				ctx->ctx_wstate = WKST_READY_TO_WAIT;
72714e22b5gshapiro
72814e22b5gshapiro				POOL_LEV_DPRINTF(4,
72914e22b5gshapiro					("writing to event pipe..."));
73014e22b5gshapiro
73114e22b5gshapiro				/*
73214e22b5gshapiro				**  Signal task controller to add new session
73314e22b5gshapiro				**  to poll set.
73414e22b5gshapiro				*/
73514e22b5gshapiro
73614e22b5gshapiro				PIPE_SEND_SIGNAL();
73714e22b5gshapiro			}
73814e22b5gshapiro			TASKMGR_UNLOCK();
73914e22b5gshapiro			ctx = NULL;
74014e22b5gshapiro
74114e22b5gshapiro		}
74214e22b5gshapiro
74314e22b5gshapiro		/* check if there is any task waiting to be served */
74414e22b5gshapiro		TASKMGR_LOCK();
74514e22b5gshapiro
74614e22b5gshapiro		GET_TASK_READY_TO_RUN();
74714e22b5gshapiro
74814e22b5gshapiro		/* Got a task? */
74914e22b5gshapiro		if (ctx != NULL)
75014e22b5gshapiro		{
75114e22b5gshapiro			TASKMGR_UNLOCK();
75214e22b5gshapiro			continue;
75314e22b5gshapiro		}
75414e22b5gshapiro
75514e22b5gshapiro		/*
75614e22b5gshapiro		**  if not, let's check if there is enough idle workers
75714e22b5gshapiro		**	if yes: quit
75814e22b5gshapiro		*/
75914e22b5gshapiro
76014e22b5gshapiro		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
76114e22b5gshapiro		    Tskmgr.tm_nb_idle > MIN_IDLE)
76214e22b5gshapiro			done = true;
76314e22b5gshapiro
76414e22b5gshapiro		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
76514e22b5gshapiro			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
76614e22b5gshapiro
76714e22b5gshapiro		if (done)
76814e22b5gshapiro		{
76914e22b5gshapiro			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
77014e22b5gshapiro			Tskmgr.tm_nb_workers--;
77114e22b5gshapiro			TASKMGR_UNLOCK();
77214e22b5gshapiro			continue;
77314e22b5gshapiro		}
77414e22b5gshapiro
77514e22b5gshapiro		/*
77614e22b5gshapiro		**  if no task ready to run, wait for another one
77714e22b5gshapiro		*/
77814e22b5gshapiro
77914e22b5gshapiro		Tskmgr.tm_nb_idle++;
78014e22b5gshapiro		TASKMGR_COND_WAIT();
78114e22b5gshapiro		Tskmgr.tm_nb_idle--;
78214e22b5gshapiro
78314e22b5gshapiro		/* look for a task */
78414e22b5gshapiro		GET_TASK_READY_TO_RUN();
78514e22b5gshapiro
78614e22b5gshapiro		TASKMGR_UNLOCK();
78714e22b5gshapiro	}
78814e22b5gshapiro	return NULL;
78914e22b5gshapiro}
79014e22b5gshapiro
79114e22b5gshapiro/*
79214e22b5gshapiro**  MI_LIST_ADD_CTX -- add new session to linked list
79314e22b5gshapiro**
79414e22b5gshapiro**	Parameters:
79514e22b5gshapiro**		ctx -- context structure
79614e22b5gshapiro**
79714e22b5gshapiro**	Returns:
79814e22b5gshapiro**		MI_FAILURE/MI_SUCCESS
79914e22b5gshapiro*/
80014e22b5gshapiro
80114e22b5gshapirostatic int
80214e22b5gshapiromi_list_add_ctx(ctx)
80314e22b5gshapiro	SMFICTX_PTR ctx;
80414e22b5gshapiro{
80514e22b5gshapiro	SM_ASSERT(ctx != NULL);
80614e22b5gshapiro	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
80714e22b5gshapiro	return MI_SUCCESS;
80814e22b5gshapiro}
80914e22b5gshapiro
81014e22b5gshapiro/*
81114e22b5gshapiro**  MI_LIST_DEL_CTX -- remove session from linked list when finished
81214e22b5gshapiro**
81314e22b5gshapiro**	Parameters:
81414e22b5gshapiro**		ctx -- context structure
81514e22b5gshapiro**
81614e22b5gshapiro**	Returns:
81714e22b5gshapiro**		MI_FAILURE/MI_SUCCESS
81814e22b5gshapiro*/
81914e22b5gshapiro
82014e22b5gshapirostatic int
82114e22b5gshapiromi_list_del_ctx(ctx)
82214e22b5gshapiro	SMFICTX_PTR ctx;
82314e22b5gshapiro{
82414e22b5gshapiro	SM_ASSERT(ctx != NULL);
82514e22b5gshapiro	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
82614e22b5gshapiro		return MI_FAILURE;
82714e22b5gshapiro
82814e22b5gshapiro	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
82914e22b5gshapiro	return MI_SUCCESS;
83014e22b5gshapiro}
83114e22b5gshapiro#endif /* _FFR_WORKERS_POOL */
832