1058561cbSjbeck /*
2*e9af4bc0SJohn Beck  *  Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
3058561cbSjbeck  *	All rights reserved.
4058561cbSjbeck  *
5058561cbSjbeck  * By using this file, you agree to the terms and conditions set
6058561cbSjbeck  * forth in the LICENSE file which can be found at the top level of
7058561cbSjbeck  * the sendmail distribution.
8058561cbSjbeck  *
9058561cbSjbeck  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10058561cbSjbeck  *   Jose-Marcio.Martins@ensmp.fr
11058561cbSjbeck  */
12058561cbSjbeck 
13058561cbSjbeck #include <sm/gen.h>
14*e9af4bc0SJohn Beck SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")
15058561cbSjbeck 
16058561cbSjbeck #include "libmilter.h"
17058561cbSjbeck 
18058561cbSjbeck #if _FFR_WORKERS_POOL
19058561cbSjbeck 
20058561cbSjbeck typedef struct taskmgr_S taskmgr_T;
21058561cbSjbeck 
22058561cbSjbeck #define TM_SIGNATURE		0x23021957
23058561cbSjbeck 
24058561cbSjbeck struct taskmgr_S
25058561cbSjbeck {
26058561cbSjbeck 	long		tm_signature; /* has the controller been initialized */
27058561cbSjbeck 	sthread_t	tm_tid;	/* thread id of controller */
28058561cbSjbeck 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
29058561cbSjbeck 
30058561cbSjbeck 	int		tm_nb_workers;	/* number of workers in the pool */
31058561cbSjbeck 	int		tm_nb_idle;	/* number of workers waiting */
32058561cbSjbeck 
33058561cbSjbeck 	int		tm_p[2];	/* poll control pipe */
34058561cbSjbeck 
35058561cbSjbeck 	smutex_t	tm_w_mutex;	/* linked list access mutex */
36058561cbSjbeck 	scond_t		tm_w_cond;	/* */
37058561cbSjbeck };
38058561cbSjbeck 
39058561cbSjbeck static taskmgr_T     Tskmgr = {0};
40058561cbSjbeck 
41058561cbSjbeck #define WRK_CTX_HEAD	Tskmgr.tm_ctx_head
42058561cbSjbeck 
43058561cbSjbeck #define RD_PIPE	(Tskmgr.tm_p[0])
44058561cbSjbeck #define WR_PIPE	(Tskmgr.tm_p[1])
45058561cbSjbeck 
46058561cbSjbeck #define PIPE_SEND_SIGNAL()						\
47058561cbSjbeck 	do								\
48058561cbSjbeck 	{								\
49058561cbSjbeck 		char evt = 0x5a;					\
50058561cbSjbeck 		int fd = WR_PIPE;					\
51058561cbSjbeck 		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
52058561cbSjbeck 			smi_log(SMI_LOG_ERR,				\
53058561cbSjbeck 				"Error writing to event pipe: %s",	\
54058561cbSjbeck 				sm_errstring(errno));			\
55058561cbSjbeck 	} while (0)
56058561cbSjbeck 
57058561cbSjbeck #ifndef USE_PIPE_WAKE_POLL
58058561cbSjbeck # define USE_PIPE_WAKE_POLL 1
59058561cbSjbeck #endif /* USE_PIPE_WAKE_POLL */
60058561cbSjbeck 
61058561cbSjbeck /* poll check periodicity (default 10000 - 10 s) */
62058561cbSjbeck #define POLL_TIMEOUT   10000
63058561cbSjbeck 
64058561cbSjbeck /* worker conditional wait timeout (default 10 s) */
65058561cbSjbeck #define COND_TIMEOUT     10
66058561cbSjbeck 
67058561cbSjbeck /* functions */
68058561cbSjbeck static int mi_close_session __P((SMFICTX_PTR));
69058561cbSjbeck 
70058561cbSjbeck static void *mi_worker __P((void *));
71058561cbSjbeck static void *mi_pool_controller __P((void *));
72058561cbSjbeck 
73058561cbSjbeck static int mi_list_add_ctx __P((SMFICTX_PTR));
74058561cbSjbeck static int mi_list_del_ctx __P((SMFICTX_PTR));
75058561cbSjbeck 
76058561cbSjbeck /*
77058561cbSjbeck **  periodicity of cleaning up old sessions (timedout)
78058561cbSjbeck **	sessions list will be checked to find old inactive
79058561cbSjbeck **	sessions each DT_CHECK_OLD_SESSIONS sec
80058561cbSjbeck */
81058561cbSjbeck 
82058561cbSjbeck #define DT_CHECK_OLD_SESSIONS   600
83058561cbSjbeck 
84058561cbSjbeck #ifndef OLD_SESSION_TIMEOUT
85058561cbSjbeck # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
86058561cbSjbeck #endif /* OLD_SESSION_TIMEOUT */
87058561cbSjbeck 
88058561cbSjbeck /* session states - with respect to the pool of workers */
89058561cbSjbeck #define WKST_INIT		0	/* initial state */
90058561cbSjbeck #define WKST_READY_TO_RUN	1	/* command ready do be read */
91058561cbSjbeck #define WKST_RUNNING		2	/* session running on a worker */
92058561cbSjbeck #define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
93058561cbSjbeck #define WKST_WAITING		4	/* waiting for new command */
94058561cbSjbeck #define WKST_CLOSING		5	/* session finished */
95058561cbSjbeck 
96058561cbSjbeck #ifndef MIN_WORKERS
97058561cbSjbeck # define MIN_WORKERS	2  /* minimum number of threads to keep around */
98058561cbSjbeck #endif
99058561cbSjbeck 
100058561cbSjbeck #define MIN_IDLE	1  /* minimum number of idle threads */
101058561cbSjbeck 
102058561cbSjbeck 
103058561cbSjbeck /*
104058561cbSjbeck **  Macros for threads and mutex management
105058561cbSjbeck */
106058561cbSjbeck 
107058561cbSjbeck #define TASKMGR_LOCK()							\
108058561cbSjbeck 	do								\
109058561cbSjbeck 	{								\
110058561cbSjbeck 		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
111058561cbSjbeck 			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
112058561cbSjbeck 	} while (0)
113058561cbSjbeck 
114058561cbSjbeck #define TASKMGR_UNLOCK()						\
115058561cbSjbeck 	do								\
116058561cbSjbeck 	{								\
117058561cbSjbeck 		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
118058561cbSjbeck 			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
119058561cbSjbeck 	} while (0)
120058561cbSjbeck 
121058561cbSjbeck #define	TASKMGR_COND_WAIT()						\
122058561cbSjbeck 	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123058561cbSjbeck 
124058561cbSjbeck #define	TASKMGR_COND_SIGNAL()						\
125058561cbSjbeck 	do								\
126058561cbSjbeck 	{								\
127058561cbSjbeck 		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
128058561cbSjbeck 			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129058561cbSjbeck 	} while (0)
130058561cbSjbeck 
131058561cbSjbeck #define LAUNCH_WORKER(ctx)						\
132058561cbSjbeck 	do								\
133058561cbSjbeck 	{								\
134058561cbSjbeck 		int r;							\
135058561cbSjbeck 		sthread_t tid;						\
136058561cbSjbeck 									\
137058561cbSjbeck 		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
138058561cbSjbeck 			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139058561cbSjbeck 				sm_errstring(r));			\
140058561cbSjbeck 	} while (0)
141058561cbSjbeck 
142058561cbSjbeck #if POOL_DEBUG
143058561cbSjbeck # define POOL_LEV_DPRINTF(lev, x)					\
144058561cbSjbeck 	do {								\
145058561cbSjbeck 		if ((lev) < ctx->ctx_dbg)				\
146058561cbSjbeck 			sm_dprintf x;					\
147058561cbSjbeck 	} while (0)
148058561cbSjbeck #else /* POOL_DEBUG */
149058561cbSjbeck # define POOL_LEV_DPRINTF(lev, x)
150058561cbSjbeck #endif /* POOL_DEBUG */
151058561cbSjbeck 
152058561cbSjbeck /*
153058561cbSjbeck **  MI_START_SESSION -- Start a session in the pool of workers
154058561cbSjbeck **
155058561cbSjbeck **	Parameters:
156058561cbSjbeck **		ctx -- context structure
157058561cbSjbeck **
158058561cbSjbeck **	Returns:
159058561cbSjbeck **		MI_SUCCESS/MI_FAILURE
160058561cbSjbeck */
161058561cbSjbeck 
162058561cbSjbeck int
mi_start_session(ctx)163058561cbSjbeck mi_start_session(ctx)
164058561cbSjbeck 	SMFICTX_PTR ctx;
165058561cbSjbeck {
166058561cbSjbeck 	static long id = 0;
167058561cbSjbeck 
168058561cbSjbeck 	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
169058561cbSjbeck 	SM_ASSERT(ctx != NULL);
170058561cbSjbeck 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
171058561cbSjbeck 	TASKMGR_LOCK();
172058561cbSjbeck 
173058561cbSjbeck 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
174058561cbSjbeck 	{
175058561cbSjbeck 		TASKMGR_UNLOCK();
176058561cbSjbeck 		return MI_FAILURE;
177058561cbSjbeck 	}
178058561cbSjbeck 
179058561cbSjbeck 	ctx->ctx_sid = id++;
180058561cbSjbeck 
181058561cbSjbeck 	/* if there is an idle worker, signal it, otherwise start new worker */
182058561cbSjbeck 	if (Tskmgr.tm_nb_idle > 0)
183058561cbSjbeck 	{
184058561cbSjbeck 		ctx->ctx_wstate = WKST_READY_TO_RUN;
185058561cbSjbeck 		TASKMGR_COND_SIGNAL();
186058561cbSjbeck 	}
187058561cbSjbeck 	else
188058561cbSjbeck 	{
189058561cbSjbeck 		ctx->ctx_wstate = WKST_RUNNING;
190058561cbSjbeck 		LAUNCH_WORKER(ctx);
191058561cbSjbeck 	}
192058561cbSjbeck 	TASKMGR_UNLOCK();
193058561cbSjbeck 	return MI_SUCCESS;
194058561cbSjbeck }
195058561cbSjbeck 
196058561cbSjbeck /*
197058561cbSjbeck **  MI_CLOSE_SESSION -- Close a session and clean up data structures
198058561cbSjbeck **
199058561cbSjbeck **	Parameters:
200058561cbSjbeck **		ctx -- context structure
201058561cbSjbeck **
202058561cbSjbeck **	Returns:
203058561cbSjbeck **		MI_SUCCESS/MI_FAILURE
204058561cbSjbeck */
205058561cbSjbeck 
206058561cbSjbeck static int
mi_close_session(ctx)207058561cbSjbeck mi_close_session(ctx)
208058561cbSjbeck 	SMFICTX_PTR ctx;
209058561cbSjbeck {
210058561cbSjbeck 	SM_ASSERT(ctx != NULL);
211058561cbSjbeck 
212058561cbSjbeck 	(void) mi_list_del_ctx(ctx);
213*e9af4bc0SJohn Beck 	mi_clr_ctx(ctx);
214058561cbSjbeck 
215058561cbSjbeck 	return MI_SUCCESS;
216058561cbSjbeck }
217058561cbSjbeck 
218058561cbSjbeck /*
219058561cbSjbeck **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
220058561cbSjbeck **		Must be called before starting sessions.
221058561cbSjbeck **
222058561cbSjbeck **	Parameters:
223058561cbSjbeck **		none
224058561cbSjbeck **
225058561cbSjbeck **	Returns:
226058561cbSjbeck **		MI_SUCCESS/MI_FAILURE
227058561cbSjbeck */
228058561cbSjbeck 
229058561cbSjbeck int
mi_pool_controller_init()230058561cbSjbeck mi_pool_controller_init()
231058561cbSjbeck {
232058561cbSjbeck 	sthread_t tid;
233058561cbSjbeck 	int r, i;
234058561cbSjbeck 
235058561cbSjbeck 	if (Tskmgr.tm_signature == TM_SIGNATURE)
236058561cbSjbeck 		return MI_SUCCESS;
237058561cbSjbeck 
238058561cbSjbeck 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
239058561cbSjbeck 	Tskmgr.tm_tid = (sthread_t) -1;
240058561cbSjbeck 	Tskmgr.tm_nb_workers = 0;
241058561cbSjbeck 	Tskmgr.tm_nb_idle = 0;
242058561cbSjbeck 
243058561cbSjbeck 	if (pipe(Tskmgr.tm_p) != 0)
244058561cbSjbeck 	{
245058561cbSjbeck 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
246*e9af4bc0SJohn Beck 			sm_errstring(errno));
247058561cbSjbeck 		return MI_FAILURE;
248058561cbSjbeck 	}
249058561cbSjbeck 
250058561cbSjbeck 	(void) smutex_init(&Tskmgr.tm_w_mutex);
251058561cbSjbeck 	(void) scond_init(&Tskmgr.tm_w_cond);
252058561cbSjbeck 
253058561cbSjbeck 	/* Launch the pool controller */
254058561cbSjbeck 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
255058561cbSjbeck 	{
256058561cbSjbeck 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
257058561cbSjbeck 			sm_errstring(r));
258058561cbSjbeck 		return MI_FAILURE;
259058561cbSjbeck 	}
260058561cbSjbeck 	Tskmgr.tm_tid = tid;
261058561cbSjbeck 	Tskmgr.tm_signature = TM_SIGNATURE;
262058561cbSjbeck 
263058561cbSjbeck 	/* Create the pool of workers */
264058561cbSjbeck 	for (i = 0; i < MIN_WORKERS; i++)
265058561cbSjbeck 	{
266058561cbSjbeck 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
267058561cbSjbeck 		{
268058561cbSjbeck 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
269058561cbSjbeck 				sm_errstring(r));
270058561cbSjbeck 			return MI_FAILURE;
271058561cbSjbeck 		}
272058561cbSjbeck 	}
273058561cbSjbeck 
274058561cbSjbeck 	return MI_SUCCESS;
275058561cbSjbeck }
276058561cbSjbeck 
277058561cbSjbeck /*
278058561cbSjbeck **  MI_POOL_CONTROLLER -- manage the pool of workers
279058561cbSjbeck **	This thread must be running when listener begins
280058561cbSjbeck **	starting sessions
281058561cbSjbeck **
282058561cbSjbeck **	Parameters:
283058561cbSjbeck **		arg -- unused
284058561cbSjbeck **
285058561cbSjbeck **	Returns:
286058561cbSjbeck **		NULL
287058561cbSjbeck **
288058561cbSjbeck **	Control flow:
289058561cbSjbeck **		for (;;)
290058561cbSjbeck **			Look for timed out sessions
291058561cbSjbeck **			Select sessions to wait for sendmail command
292058561cbSjbeck **			Poll set of file descriptors
293058561cbSjbeck **			if timeout
294058561cbSjbeck **				continue
295058561cbSjbeck **			For each file descriptor ready
296058561cbSjbeck **				launch new thread if no worker available
297058561cbSjbeck **				else
298058561cbSjbeck **				signal waiting worker
299058561cbSjbeck */
300058561cbSjbeck 
301058561cbSjbeck /* Poll structure array (pollfd) size step */
302058561cbSjbeck #define PFD_STEP	256
303058561cbSjbeck 
304058561cbSjbeck #define WAIT_FD(i)	(pfd[i].fd)
305058561cbSjbeck #define WAITFN		"POLL"
306058561cbSjbeck 
307058561cbSjbeck static void *
mi_pool_controller(arg)308058561cbSjbeck mi_pool_controller(arg)
309058561cbSjbeck 	void *arg;
310058561cbSjbeck {
311058561cbSjbeck 	struct pollfd *pfd = NULL;
312058561cbSjbeck 	int dim_pfd = 0;
313058561cbSjbeck 	bool rebuild_set = true;
314058561cbSjbeck 	int pcnt = 0; /* error count for poll() failures */
315*e9af4bc0SJohn Beck 	time_t lastcheck;
316058561cbSjbeck 
317058561cbSjbeck 	Tskmgr.tm_tid = sthread_get_id();
318058561cbSjbeck 	if (pthread_detach(Tskmgr.tm_tid) != 0)
319058561cbSjbeck 	{
320058561cbSjbeck 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
321058561cbSjbeck 		return NULL;
322058561cbSjbeck 	}
323058561cbSjbeck 
324058561cbSjbeck 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
325058561cbSjbeck 	if (pfd == NULL)
326058561cbSjbeck 	{
327058561cbSjbeck 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
328058561cbSjbeck 			sm_errstring(errno));
329058561cbSjbeck 		return NULL;
330058561cbSjbeck 	}
331058561cbSjbeck 	dim_pfd = PFD_STEP;
332058561cbSjbeck 
333*e9af4bc0SJohn Beck 	lastcheck = time(NULL);
334058561cbSjbeck 	for (;;)
335058561cbSjbeck 	{
336058561cbSjbeck 		SMFICTX_PTR ctx;
337058561cbSjbeck 		int nfd, rfd, i;
338058561cbSjbeck 		time_t now;
339058561cbSjbeck 
340058561cbSjbeck 		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
341058561cbSjbeck 
342058561cbSjbeck 		if (mi_stop() != MILTER_CONT)
343058561cbSjbeck 			break;
344058561cbSjbeck 
345058561cbSjbeck 		TASKMGR_LOCK();
346058561cbSjbeck 
347058561cbSjbeck 		now = time(NULL);
348058561cbSjbeck 
349058561cbSjbeck 		/* check for timed out sessions? */
350058561cbSjbeck 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
351058561cbSjbeck 		{
352*e9af4bc0SJohn Beck 			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
353*e9af4bc0SJohn Beck 			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
354058561cbSjbeck 			{
355*e9af4bc0SJohn Beck 				SMFICTX_PTR ctx_nxt;
356*e9af4bc0SJohn Beck 
357*e9af4bc0SJohn Beck 				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
358058561cbSjbeck 				if (ctx->ctx_wstate == WKST_WAITING)
359058561cbSjbeck 				{
360058561cbSjbeck 					if (ctx->ctx_wait == 0)
361058561cbSjbeck 						ctx->ctx_wait = now;
362*e9af4bc0SJohn Beck 					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
363*e9af4bc0SJohn Beck 						 < now)
364058561cbSjbeck 					{
365*e9af4bc0SJohn Beck 						/* if session timed out, close it */
366058561cbSjbeck 						sfsistat (*fi_close) __P((SMFICTX *));
367058561cbSjbeck 
368058561cbSjbeck 						POOL_LEV_DPRINTF(4,
369058561cbSjbeck 							("Closing old connection: sd=%d id=%d",
370058561cbSjbeck 							ctx->ctx_sd,
371058561cbSjbeck 							ctx->ctx_sid));
372058561cbSjbeck 
373058561cbSjbeck 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
374058561cbSjbeck 							(void) (*fi_close)(ctx);
375058561cbSjbeck 
376058561cbSjbeck 						mi_close_session(ctx);
377058561cbSjbeck 					}
378058561cbSjbeck 				}
379*e9af4bc0SJohn Beck 				ctx = ctx_nxt;
380058561cbSjbeck 			}
381058561cbSjbeck 			lastcheck = now;
382058561cbSjbeck 		}
383058561cbSjbeck 
384058561cbSjbeck 		if (rebuild_set)
385058561cbSjbeck 		{
386058561cbSjbeck 			/*
387058561cbSjbeck 			**  Initialize poll set.
388058561cbSjbeck 			**  Insert into the poll set the file descriptors of
389058561cbSjbeck 			**  all sessions waiting for a command from sendmail.
390058561cbSjbeck 			*/
391058561cbSjbeck 
392058561cbSjbeck 			nfd = 0;
393058561cbSjbeck 
394058561cbSjbeck 			/* begin with worker pipe */
395058561cbSjbeck 			pfd[nfd].fd = RD_PIPE;
396058561cbSjbeck 			pfd[nfd].events = MI_POLL_RD_FLAGS;
397058561cbSjbeck 			pfd[nfd].revents = 0;
398058561cbSjbeck 			nfd++;
399058561cbSjbeck 
400058561cbSjbeck 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
401058561cbSjbeck 			{
402058561cbSjbeck 				/*
403058561cbSjbeck 				**  update ctx_wait - start of wait moment -
404058561cbSjbeck 				**  for timeout
405058561cbSjbeck 				*/
406058561cbSjbeck 
407058561cbSjbeck 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
408058561cbSjbeck 					ctx->ctx_wait = now;
409058561cbSjbeck 
410058561cbSjbeck 				/* add the session to the pollfd array? */
411058561cbSjbeck 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
412058561cbSjbeck 				    (ctx->ctx_wstate == WKST_WAITING))
413058561cbSjbeck 				{
414058561cbSjbeck 					/*
415058561cbSjbeck 					**  Resize the pollfd array if it
416058561cbSjbeck 					**  isn't large enough.
417058561cbSjbeck 					*/
418058561cbSjbeck 
419058561cbSjbeck 					if (nfd >= dim_pfd)
420058561cbSjbeck 					{
421058561cbSjbeck 						struct pollfd *tpfd;
422058561cbSjbeck 						size_t new;
423058561cbSjbeck 
424058561cbSjbeck 						new = (dim_pfd + PFD_STEP) *
425058561cbSjbeck 							sizeof(*tpfd);
426058561cbSjbeck 						tpfd = (struct pollfd *)
427058561cbSjbeck 							realloc(pfd, new);
428058561cbSjbeck 						if (tpfd != NULL)
429058561cbSjbeck 						{
430058561cbSjbeck 							pfd = tpfd;
431058561cbSjbeck 							dim_pfd += PFD_STEP;
432058561cbSjbeck 						}
433058561cbSjbeck 						else
434058561cbSjbeck 						{
435058561cbSjbeck 							smi_log(SMI_LOG_ERR,
436058561cbSjbeck 								"Failed to realloc pollfd array:%s",
437058561cbSjbeck 								sm_errstring(errno));
438058561cbSjbeck 						}
439058561cbSjbeck 					}
440058561cbSjbeck 
441058561cbSjbeck 					/* add the session to pollfd array */
442058561cbSjbeck 					if (nfd < dim_pfd)
443058561cbSjbeck 					{
444058561cbSjbeck 						ctx->ctx_wstate = WKST_WAITING;
445058561cbSjbeck 						pfd[nfd].fd = ctx->ctx_sd;
446058561cbSjbeck 						pfd[nfd].events = MI_POLL_RD_FLAGS;
447058561cbSjbeck 						pfd[nfd].revents = 0;
448058561cbSjbeck 						nfd++;
449058561cbSjbeck 					}
450058561cbSjbeck 				}
451058561cbSjbeck 			}
452*e9af4bc0SJohn Beck 			rebuild_set = false;
453058561cbSjbeck 		}
454058561cbSjbeck 
455058561cbSjbeck 		TASKMGR_UNLOCK();
456058561cbSjbeck 
457058561cbSjbeck 		/* Everything is ready, let's wait for an event */
458058561cbSjbeck 		rfd = poll(pfd, nfd, POLL_TIMEOUT);
459058561cbSjbeck 
460058561cbSjbeck 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
461058561cbSjbeck 			WAITFN, now, nfd));
462058561cbSjbeck 
463058561cbSjbeck 		/* timeout */
464058561cbSjbeck 		if (rfd == 0)
465058561cbSjbeck 			continue;
466058561cbSjbeck 
467058561cbSjbeck 		rebuild_set = true;
468058561cbSjbeck 
469058561cbSjbeck 		/* error */
470058561cbSjbeck 		if (rfd < 0)
471058561cbSjbeck 		{
472058561cbSjbeck 			if (errno == EINTR)
473058561cbSjbeck 				continue;
474058561cbSjbeck 			pcnt++;
475058561cbSjbeck 			smi_log(SMI_LOG_ERR,
476058561cbSjbeck 				"%s() failed (%s), %s",
477058561cbSjbeck 				WAITFN, sm_errstring(errno),
478058561cbSjbeck 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
479058561cbSjbeck 
480058561cbSjbeck 			if (pcnt >= MAX_FAILS_S)
481058561cbSjbeck 				goto err;
482058561cbSjbeck 		}
483058561cbSjbeck 		pcnt = 0;
484058561cbSjbeck 
485058561cbSjbeck 		/* something happened */
486058561cbSjbeck 		for (i = 0; i < nfd; i++)
487058561cbSjbeck 		{
488058561cbSjbeck 			if (pfd[i].revents == 0)
489058561cbSjbeck 				continue;
490058561cbSjbeck 
491058561cbSjbeck 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
492058561cbSjbeck 				WAITFN, i, nfd,
493058561cbSjbeck 			WAIT_FD(i)));
494058561cbSjbeck 
495058561cbSjbeck 			/* has a worker signaled an end of task ? */
496058561cbSjbeck 			if (WAIT_FD(i) == RD_PIPE)
497058561cbSjbeck 			{
498058561cbSjbeck 				char evt = 0;
499058561cbSjbeck 				int r = 0;
500058561cbSjbeck 
501058561cbSjbeck 				POOL_LEV_DPRINTF(4,
502058561cbSjbeck 					("PIPE WILL READ evt = %08X %08X",
503058561cbSjbeck 					pfd[i].events, pfd[i].revents));
504058561cbSjbeck 
505058561cbSjbeck 				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
506058561cbSjbeck 				{
507058561cbSjbeck 					r = read(RD_PIPE, &evt, sizeof(evt));
508058561cbSjbeck 					if (r == sizeof(evt))
509058561cbSjbeck 					{
510058561cbSjbeck 						/* Do nothing */
511058561cbSjbeck 					}
512058561cbSjbeck 				}
513058561cbSjbeck 
514058561cbSjbeck 				POOL_LEV_DPRINTF(4,
515058561cbSjbeck 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
516058561cbSjbeck 					i, RD_PIPE, r, evt));
517058561cbSjbeck 
518058561cbSjbeck 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
519058561cbSjbeck 				{
520058561cbSjbeck 					/* Exception handling */
521058561cbSjbeck 				}
522058561cbSjbeck 				continue;
523058561cbSjbeck 			}
524058561cbSjbeck 
525058561cbSjbeck 			/* no ! sendmail wants to send a command */
526058561cbSjbeck 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
527058561cbSjbeck 			{
528058561cbSjbeck 				if (ctx->ctx_wstate != WKST_WAITING)
529058561cbSjbeck 					continue;
530058561cbSjbeck 
531058561cbSjbeck 				POOL_LEV_DPRINTF(4,
532058561cbSjbeck 					("Checking context sd=%d - fd=%d ",
533058561cbSjbeck 					ctx->ctx_sd , WAIT_FD(i)));
534058561cbSjbeck 
535058561cbSjbeck 				if (ctx->ctx_sd == pfd[i].fd)
536058561cbSjbeck 				{
537058561cbSjbeck 					TASKMGR_LOCK();
538058561cbSjbeck 
539058561cbSjbeck 					POOL_LEV_DPRINTF(4,
540058561cbSjbeck 						("TASK: found %d for fd[%d]=%d",
541058561cbSjbeck 						ctx->ctx_sid, i, WAIT_FD(i)));
542058561cbSjbeck 
543058561cbSjbeck 					if (Tskmgr.tm_nb_idle > 0)
544058561cbSjbeck 					{
545058561cbSjbeck 						ctx->ctx_wstate = WKST_READY_TO_RUN;
546058561cbSjbeck 						TASKMGR_COND_SIGNAL();
547058561cbSjbeck 					}
548058561cbSjbeck 					else
549058561cbSjbeck 					{
550058561cbSjbeck 						ctx->ctx_wstate = WKST_RUNNING;
551058561cbSjbeck 						LAUNCH_WORKER(ctx);
552058561cbSjbeck 					}
553058561cbSjbeck 					TASKMGR_UNLOCK();
554058561cbSjbeck 					break;
555058561cbSjbeck 				}
556058561cbSjbeck 			}
557058561cbSjbeck 
558058561cbSjbeck 			POOL_LEV_DPRINTF(4,
559058561cbSjbeck 				("TASK %s FOUND - Checking PIPE for fd[%d]",
560058561cbSjbeck 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
561058561cbSjbeck 		}
562058561cbSjbeck 	}
563058561cbSjbeck 
564058561cbSjbeck   err:
565058561cbSjbeck 	if (pfd != NULL)
566058561cbSjbeck 		free(pfd);
567058561cbSjbeck 
568058561cbSjbeck 	Tskmgr.tm_signature = 0;
569058561cbSjbeck 	for (;;)
570058561cbSjbeck 	{
571058561cbSjbeck 		SMFICTX_PTR ctx;
572058561cbSjbeck 
573058561cbSjbeck 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
574058561cbSjbeck 		if (ctx == NULL)
575058561cbSjbeck 			break;
576058561cbSjbeck 		mi_close_session(ctx);
577058561cbSjbeck 	}
578058561cbSjbeck 
579058561cbSjbeck 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
580058561cbSjbeck 	(void) scond_destroy(&Tskmgr.tm_w_cond);
581058561cbSjbeck 
582058561cbSjbeck 	return NULL;
583058561cbSjbeck }
584058561cbSjbeck 
585058561cbSjbeck /*
586058561cbSjbeck **  Look for a task ready to run.
587058561cbSjbeck **  Value of ctx is NULL or a pointer to a task ready to run.
588058561cbSjbeck */
589058561cbSjbeck 
590058561cbSjbeck #define GET_TASK_READY_TO_RUN()					\
591058561cbSjbeck 	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
592058561cbSjbeck 	{							\
593058561cbSjbeck 		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
594058561cbSjbeck 		{						\
595058561cbSjbeck 			ctx->ctx_wstate = WKST_RUNNING;		\
596058561cbSjbeck 			break;					\
597058561cbSjbeck 		}						\
598058561cbSjbeck 	}
599058561cbSjbeck 
600058561cbSjbeck /*
601058561cbSjbeck **  MI_WORKER -- worker thread
602058561cbSjbeck **	executes tasks distributed by the mi_pool_controller
603058561cbSjbeck **	or by mi_start_session
604058561cbSjbeck **
605058561cbSjbeck **	Parameters:
606058561cbSjbeck **		arg -- pointer to context structure
607058561cbSjbeck **
608058561cbSjbeck **	Returns:
609058561cbSjbeck **		NULL pointer
610058561cbSjbeck */
611058561cbSjbeck 
612058561cbSjbeck static void *
mi_worker(arg)613058561cbSjbeck mi_worker(arg)
614058561cbSjbeck 	void *arg;
615058561cbSjbeck {
616058561cbSjbeck 	SMFICTX_PTR ctx;
617058561cbSjbeck 	bool done;
618058561cbSjbeck 	sthread_t t_id;
619058561cbSjbeck 	int r;
620058561cbSjbeck 
621058561cbSjbeck 	ctx = (SMFICTX_PTR) arg;
622058561cbSjbeck 	done = false;
623058561cbSjbeck 	if (ctx != NULL)
624058561cbSjbeck 		ctx->ctx_wstate = WKST_RUNNING;
625058561cbSjbeck 
626058561cbSjbeck 	t_id = sthread_get_id();
627058561cbSjbeck 	if (pthread_detach(t_id) != 0)
628058561cbSjbeck 	{
629058561cbSjbeck 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
630058561cbSjbeck 		if (ctx != NULL)
631058561cbSjbeck 			ctx->ctx_wstate = WKST_READY_TO_RUN;
632058561cbSjbeck 		return NULL;
633058561cbSjbeck 	}
634058561cbSjbeck 
635058561cbSjbeck 	TASKMGR_LOCK();
636058561cbSjbeck 	Tskmgr.tm_nb_workers++;
637058561cbSjbeck 	TASKMGR_UNLOCK();
638058561cbSjbeck 
639058561cbSjbeck 	while (!done)
640058561cbSjbeck 	{
641058561cbSjbeck 		if (mi_stop() != MILTER_CONT)
642058561cbSjbeck 			break;
643058561cbSjbeck 
644058561cbSjbeck 		/* let's handle next task... */
645058561cbSjbeck 		if (ctx != NULL)
646058561cbSjbeck 		{
647058561cbSjbeck 			int res;
648058561cbSjbeck 
649058561cbSjbeck 			POOL_LEV_DPRINTF(4,
650058561cbSjbeck 				("worker %d: new task -> let's handle it",
651058561cbSjbeck 				t_id));
652058561cbSjbeck 			res = mi_engine(ctx);
653058561cbSjbeck 			POOL_LEV_DPRINTF(4,
654058561cbSjbeck 				("worker %d: mi_engine returned %d", t_id, res));
655058561cbSjbeck 
656058561cbSjbeck 			TASKMGR_LOCK();
657058561cbSjbeck 			if (res != MI_CONTINUE)
658058561cbSjbeck 			{
659058561cbSjbeck 				ctx->ctx_wstate = WKST_CLOSING;
660058561cbSjbeck 
661058561cbSjbeck 				/*
662058561cbSjbeck 				**  Delete context from linked list of
663058561cbSjbeck 				**  sessions and close session.
664058561cbSjbeck 				*/
665058561cbSjbeck 
666058561cbSjbeck 				mi_close_session(ctx);
667058561cbSjbeck 			}
668058561cbSjbeck 			else
669058561cbSjbeck 			{
670058561cbSjbeck 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
671058561cbSjbeck 
672058561cbSjbeck 				POOL_LEV_DPRINTF(4,
673058561cbSjbeck 					("writing to event pipe..."));
674058561cbSjbeck 
675058561cbSjbeck 				/*
676058561cbSjbeck 				**  Signal task controller to add new session
677058561cbSjbeck 				**  to poll set.
678058561cbSjbeck 				*/
679058561cbSjbeck 
680058561cbSjbeck 				PIPE_SEND_SIGNAL();
681058561cbSjbeck 			}
682058561cbSjbeck 			TASKMGR_UNLOCK();
683058561cbSjbeck 			ctx = NULL;
684058561cbSjbeck 
685058561cbSjbeck 		}
686058561cbSjbeck 
687058561cbSjbeck 		/* check if there is any task waiting to be served */
688058561cbSjbeck 		TASKMGR_LOCK();
689058561cbSjbeck 
690058561cbSjbeck 		GET_TASK_READY_TO_RUN();
691058561cbSjbeck 
692058561cbSjbeck 		/* Got a task? */
693058561cbSjbeck 		if (ctx != NULL)
694058561cbSjbeck 		{
695058561cbSjbeck 			TASKMGR_UNLOCK();
696058561cbSjbeck 			continue;
697058561cbSjbeck 		}
698058561cbSjbeck 
699058561cbSjbeck 		/*
700058561cbSjbeck 		**  if not, let's check if there is enough idle workers
701058561cbSjbeck 		**	if yes: quit
702058561cbSjbeck 		*/
703058561cbSjbeck 
704058561cbSjbeck 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
705058561cbSjbeck 		    Tskmgr.tm_nb_idle > MIN_IDLE)
706058561cbSjbeck 			done = true;
707058561cbSjbeck 
708058561cbSjbeck 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
709058561cbSjbeck 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
710058561cbSjbeck 
711058561cbSjbeck 		if (done)
712058561cbSjbeck 		{
713058561cbSjbeck 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
714058561cbSjbeck 			Tskmgr.tm_nb_workers--;
715058561cbSjbeck 			TASKMGR_UNLOCK();
716058561cbSjbeck 			continue;
717058561cbSjbeck 		}
718058561cbSjbeck 
719058561cbSjbeck 		/*
720058561cbSjbeck 		**  if no task ready to run, wait for another one
721058561cbSjbeck 		*/
722058561cbSjbeck 
723058561cbSjbeck 		Tskmgr.tm_nb_idle++;
724058561cbSjbeck 		TASKMGR_COND_WAIT();
725058561cbSjbeck 		Tskmgr.tm_nb_idle--;
726058561cbSjbeck 
727058561cbSjbeck 		/* look for a task */
728058561cbSjbeck 		GET_TASK_READY_TO_RUN();
729058561cbSjbeck 
730058561cbSjbeck 		TASKMGR_UNLOCK();
731058561cbSjbeck 	}
732058561cbSjbeck 	return NULL;
733058561cbSjbeck }
734058561cbSjbeck 
735058561cbSjbeck /*
736058561cbSjbeck **  MI_LIST_ADD_CTX -- add new session to linked list
737058561cbSjbeck **
738058561cbSjbeck **	Parameters:
739058561cbSjbeck **		ctx -- context structure
740058561cbSjbeck **
741058561cbSjbeck **	Returns:
742058561cbSjbeck **		MI_FAILURE/MI_SUCCESS
743058561cbSjbeck */
744058561cbSjbeck 
745058561cbSjbeck static int
mi_list_add_ctx(ctx)746058561cbSjbeck mi_list_add_ctx(ctx)
747058561cbSjbeck 	SMFICTX_PTR ctx;
748058561cbSjbeck {
749058561cbSjbeck 	SM_ASSERT(ctx != NULL);
750058561cbSjbeck 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
751058561cbSjbeck 	return MI_SUCCESS;
752058561cbSjbeck }
753058561cbSjbeck 
754058561cbSjbeck /*
755058561cbSjbeck **  MI_LIST_DEL_CTX -- remove session from linked list when finished
756058561cbSjbeck **
757058561cbSjbeck **	Parameters:
758058561cbSjbeck **		ctx -- context structure
759058561cbSjbeck **
760058561cbSjbeck **	Returns:
761058561cbSjbeck **		MI_FAILURE/MI_SUCCESS
762058561cbSjbeck */
763058561cbSjbeck 
764058561cbSjbeck static int
mi_list_del_ctx(ctx)765058561cbSjbeck mi_list_del_ctx(ctx)
766058561cbSjbeck 	SMFICTX_PTR ctx;
767058561cbSjbeck {
768058561cbSjbeck 	SM_ASSERT(ctx != NULL);
769058561cbSjbeck 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
770058561cbSjbeck 		return MI_FAILURE;
771058561cbSjbeck 
772058561cbSjbeck 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
773058561cbSjbeck 	return MI_SUCCESS;
774058561cbSjbeck }
775058561cbSjbeck #endif /* _FFR_WORKERS_POOL */
776