1058561cbSjbeck /*
2*e9af4bc0SJohn Beck * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
3058561cbSjbeck * All rights reserved.
4058561cbSjbeck *
5058561cbSjbeck * By using this file, you agree to the terms and conditions set
6058561cbSjbeck * forth in the LICENSE file which can be found at the top level of
7058561cbSjbeck * the sendmail distribution.
8058561cbSjbeck *
9058561cbSjbeck * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10058561cbSjbeck * Jose-Marcio.Martins@ensmp.fr
11058561cbSjbeck */
12058561cbSjbeck
13058561cbSjbeck #include <sm/gen.h>
14*e9af4bc0SJohn Beck SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")
15058561cbSjbeck
16058561cbSjbeck #include "libmilter.h"
17058561cbSjbeck
18058561cbSjbeck #if _FFR_WORKERS_POOL
19058561cbSjbeck
20058561cbSjbeck typedef struct taskmgr_S taskmgr_T;
21058561cbSjbeck
22058561cbSjbeck #define TM_SIGNATURE 0x23021957
23058561cbSjbeck
24058561cbSjbeck struct taskmgr_S
25058561cbSjbeck {
26058561cbSjbeck long tm_signature; /* has the controller been initialized */
27058561cbSjbeck sthread_t tm_tid; /* thread id of controller */
28058561cbSjbeck smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
29058561cbSjbeck
30058561cbSjbeck int tm_nb_workers; /* number of workers in the pool */
31058561cbSjbeck int tm_nb_idle; /* number of workers waiting */
32058561cbSjbeck
33058561cbSjbeck int tm_p[2]; /* poll control pipe */
34058561cbSjbeck
35058561cbSjbeck smutex_t tm_w_mutex; /* linked list access mutex */
36058561cbSjbeck scond_t tm_w_cond; /* */
37058561cbSjbeck };
38058561cbSjbeck
39058561cbSjbeck static taskmgr_T Tskmgr = {0};
40058561cbSjbeck
41058561cbSjbeck #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
42058561cbSjbeck
43058561cbSjbeck #define RD_PIPE (Tskmgr.tm_p[0])
44058561cbSjbeck #define WR_PIPE (Tskmgr.tm_p[1])
45058561cbSjbeck
/*
**  PIPE_SEND_SIGNAL -- write one byte to the write end of the control pipe.
**	The pool controller polls the read end, so the byte makes its poll()
**	return.  A failed or short write is only logged.
*/

#define PIPE_SEND_SIGNAL()	\
	do	\
	{	\
		const char wake_byte = 0x5a;	\
	\
		if (write(WR_PIPE, &wake_byte, sizeof(wake_byte)) !=	\
		    sizeof(wake_byte))	\
			smi_log(SMI_LOG_ERR,	\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));	\
	} while (0)
56058561cbSjbeck
57058561cbSjbeck #ifndef USE_PIPE_WAKE_POLL
58058561cbSjbeck # define USE_PIPE_WAKE_POLL 1
59058561cbSjbeck #endif /* USE_PIPE_WAKE_POLL */
60058561cbSjbeck
61058561cbSjbeck /* poll check periodicity (default 10000 - 10 s) */
62058561cbSjbeck #define POLL_TIMEOUT 10000
63058561cbSjbeck
64058561cbSjbeck /* worker conditional wait timeout (default 10 s) */
65058561cbSjbeck #define COND_TIMEOUT 10
66058561cbSjbeck
67058561cbSjbeck /* functions */
68058561cbSjbeck static int mi_close_session __P((SMFICTX_PTR));
69058561cbSjbeck
70058561cbSjbeck static void *mi_worker __P((void *));
71058561cbSjbeck static void *mi_pool_controller __P((void *));
72058561cbSjbeck
73058561cbSjbeck static int mi_list_add_ctx __P((SMFICTX_PTR));
74058561cbSjbeck static int mi_list_del_ctx __P((SMFICTX_PTR));
75058561cbSjbeck
76058561cbSjbeck /*
77058561cbSjbeck ** periodicity of cleaning up old sessions (timedout)
78058561cbSjbeck ** sessions list will be checked to find old inactive
79058561cbSjbeck ** sessions each DT_CHECK_OLD_SESSIONS sec
80058561cbSjbeck */
81058561cbSjbeck
82058561cbSjbeck #define DT_CHECK_OLD_SESSIONS 600
83058561cbSjbeck
84058561cbSjbeck #ifndef OLD_SESSION_TIMEOUT
85058561cbSjbeck # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
86058561cbSjbeck #endif /* OLD_SESSION_TIMEOUT */
87058561cbSjbeck
88058561cbSjbeck /* session states - with respect to the pool of workers */
89058561cbSjbeck #define WKST_INIT 0 /* initial state */
90058561cbSjbeck #define WKST_READY_TO_RUN 1 /* command ready do be read */
91058561cbSjbeck #define WKST_RUNNING 2 /* session running on a worker */
92058561cbSjbeck #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
93058561cbSjbeck #define WKST_WAITING 4 /* waiting for new command */
94058561cbSjbeck #define WKST_CLOSING 5 /* session finished */
95058561cbSjbeck
96058561cbSjbeck #ifndef MIN_WORKERS
97058561cbSjbeck # define MIN_WORKERS 2 /* minimum number of threads to keep around */
98058561cbSjbeck #endif
99058561cbSjbeck
100058561cbSjbeck #define MIN_IDLE 1 /* minimum number of idle threads */
101058561cbSjbeck
102058561cbSjbeck
103058561cbSjbeck /*
104058561cbSjbeck ** Macros for threads and mutex management
105058561cbSjbeck */
106058561cbSjbeck
/* take the pool mutex; a failure is logged but not propagated */
#define TASKMGR_LOCK()	\
	do	\
	{	\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))	\
		{	\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
		}	\
	} while (0)

/* release the pool mutex; a failure is logged but not propagated */
#define TASKMGR_UNLOCK()	\
	do	\
	{	\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))	\
		{	\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
		}	\
	} while (0)

/*
**  wait on the pool condition variable for at most COND_TIMEOUT;
**  the pool mutex is handed to the wait, so it must be held by the caller.
*/

#define TASKMGR_COND_WAIT()	\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

/* signal the pool condition variable; a failure is only logged */
#define TASKMGR_COND_SIGNAL()	\
	do	\
	{	\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)	\
		{	\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
		}	\
	} while (0)
130058561cbSjbeck
/*
**  LAUNCH_WORKER -- start a new mi_worker thread with ctx as its argument.
**	A creation failure is logged; the result is not made available
**	to the code using the macro.
*/

#define LAUNCH_WORKER(ctx)	\
	do	\
	{	\
		sthread_t lw_tid;	\
		int lw_err;	\
	\
		lw_err = thread_create(&lw_tid, mi_worker, ctx);	\
		if (lw_err != 0)	\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
				sm_errstring(lw_err));	\
	} while (0)
141058561cbSjbeck
/*
**  POOL_LEV_DPRINTF -- debug output for the pool of workers.
**	With POOL_DEBUG set, x (a parenthesized sm_dprintf() argument list)
**	is printed when lev is below ctx->ctx_dbg; a variable named ctx must
**	therefore be in scope wherever the macro is used.
**	Without POOL_DEBUG the macro expands to nothing.
*/

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)	\
	do	\
	{	\
		if (ctx->ctx_dbg > (lev))	\
			sm_dprintf x;	\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */
151058561cbSjbeck
152058561cbSjbeck /*
153058561cbSjbeck ** MI_START_SESSION -- Start a session in the pool of workers
154058561cbSjbeck **
155058561cbSjbeck ** Parameters:
156058561cbSjbeck ** ctx -- context structure
157058561cbSjbeck **
158058561cbSjbeck ** Returns:
159058561cbSjbeck ** MI_SUCCESS/MI_FAILURE
160058561cbSjbeck */
161058561cbSjbeck
162058561cbSjbeck int
mi_start_session(ctx)163058561cbSjbeck mi_start_session(ctx)
164058561cbSjbeck SMFICTX_PTR ctx;
165058561cbSjbeck {
166058561cbSjbeck static long id = 0;
167058561cbSjbeck
168058561cbSjbeck SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
169058561cbSjbeck SM_ASSERT(ctx != NULL);
170058561cbSjbeck POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
171058561cbSjbeck TASKMGR_LOCK();
172058561cbSjbeck
173058561cbSjbeck if (mi_list_add_ctx(ctx) != MI_SUCCESS)
174058561cbSjbeck {
175058561cbSjbeck TASKMGR_UNLOCK();
176058561cbSjbeck return MI_FAILURE;
177058561cbSjbeck }
178058561cbSjbeck
179058561cbSjbeck ctx->ctx_sid = id++;
180058561cbSjbeck
181058561cbSjbeck /* if there is an idle worker, signal it, otherwise start new worker */
182058561cbSjbeck if (Tskmgr.tm_nb_idle > 0)
183058561cbSjbeck {
184058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
185058561cbSjbeck TASKMGR_COND_SIGNAL();
186058561cbSjbeck }
187058561cbSjbeck else
188058561cbSjbeck {
189058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING;
190058561cbSjbeck LAUNCH_WORKER(ctx);
191058561cbSjbeck }
192058561cbSjbeck TASKMGR_UNLOCK();
193058561cbSjbeck return MI_SUCCESS;
194058561cbSjbeck }
195058561cbSjbeck
196058561cbSjbeck /*
197058561cbSjbeck ** MI_CLOSE_SESSION -- Close a session and clean up data structures
198058561cbSjbeck **
199058561cbSjbeck ** Parameters:
200058561cbSjbeck ** ctx -- context structure
201058561cbSjbeck **
202058561cbSjbeck ** Returns:
203058561cbSjbeck ** MI_SUCCESS/MI_FAILURE
204058561cbSjbeck */
205058561cbSjbeck
206058561cbSjbeck static int
mi_close_session(ctx)207058561cbSjbeck mi_close_session(ctx)
208058561cbSjbeck SMFICTX_PTR ctx;
209058561cbSjbeck {
210058561cbSjbeck SM_ASSERT(ctx != NULL);
211058561cbSjbeck
212058561cbSjbeck (void) mi_list_del_ctx(ctx);
213*e9af4bc0SJohn Beck mi_clr_ctx(ctx);
214058561cbSjbeck
215058561cbSjbeck return MI_SUCCESS;
216058561cbSjbeck }
217058561cbSjbeck
218058561cbSjbeck /*
219058561cbSjbeck ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
220058561cbSjbeck ** Must be called before starting sessions.
221058561cbSjbeck **
222058561cbSjbeck ** Parameters:
223058561cbSjbeck ** none
224058561cbSjbeck **
225058561cbSjbeck ** Returns:
226058561cbSjbeck ** MI_SUCCESS/MI_FAILURE
227058561cbSjbeck */
228058561cbSjbeck
229058561cbSjbeck int
mi_pool_controller_init()230058561cbSjbeck mi_pool_controller_init()
231058561cbSjbeck {
232058561cbSjbeck sthread_t tid;
233058561cbSjbeck int r, i;
234058561cbSjbeck
235058561cbSjbeck if (Tskmgr.tm_signature == TM_SIGNATURE)
236058561cbSjbeck return MI_SUCCESS;
237058561cbSjbeck
238058561cbSjbeck SM_TAILQ_INIT(&WRK_CTX_HEAD);
239058561cbSjbeck Tskmgr.tm_tid = (sthread_t) -1;
240058561cbSjbeck Tskmgr.tm_nb_workers = 0;
241058561cbSjbeck Tskmgr.tm_nb_idle = 0;
242058561cbSjbeck
243058561cbSjbeck if (pipe(Tskmgr.tm_p) != 0)
244058561cbSjbeck {
245058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
246*e9af4bc0SJohn Beck sm_errstring(errno));
247058561cbSjbeck return MI_FAILURE;
248058561cbSjbeck }
249058561cbSjbeck
250058561cbSjbeck (void) smutex_init(&Tskmgr.tm_w_mutex);
251058561cbSjbeck (void) scond_init(&Tskmgr.tm_w_cond);
252058561cbSjbeck
253058561cbSjbeck /* Launch the pool controller */
254058561cbSjbeck if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
255058561cbSjbeck {
256058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
257058561cbSjbeck sm_errstring(r));
258058561cbSjbeck return MI_FAILURE;
259058561cbSjbeck }
260058561cbSjbeck Tskmgr.tm_tid = tid;
261058561cbSjbeck Tskmgr.tm_signature = TM_SIGNATURE;
262058561cbSjbeck
263058561cbSjbeck /* Create the pool of workers */
264058561cbSjbeck for (i = 0; i < MIN_WORKERS; i++)
265058561cbSjbeck {
266058561cbSjbeck if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
267058561cbSjbeck {
268058561cbSjbeck smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
269058561cbSjbeck sm_errstring(r));
270058561cbSjbeck return MI_FAILURE;
271058561cbSjbeck }
272058561cbSjbeck }
273058561cbSjbeck
274058561cbSjbeck return MI_SUCCESS;
275058561cbSjbeck }
276058561cbSjbeck
277058561cbSjbeck /*
278058561cbSjbeck ** MI_POOL_CONTROLLER -- manage the pool of workers
279058561cbSjbeck ** This thread must be running when listener begins
280058561cbSjbeck ** starting sessions
281058561cbSjbeck **
282058561cbSjbeck ** Parameters:
283058561cbSjbeck ** arg -- unused
284058561cbSjbeck **
285058561cbSjbeck ** Returns:
286058561cbSjbeck ** NULL
287058561cbSjbeck **
288058561cbSjbeck ** Control flow:
289058561cbSjbeck ** for (;;)
290058561cbSjbeck ** Look for timed out sessions
291058561cbSjbeck ** Select sessions to wait for sendmail command
292058561cbSjbeck ** Poll set of file descriptors
293058561cbSjbeck ** if timeout
294058561cbSjbeck ** continue
295058561cbSjbeck ** For each file descriptor ready
296058561cbSjbeck ** launch new thread if no worker available
297058561cbSjbeck ** else
298058561cbSjbeck ** signal waiting worker
299058561cbSjbeck */
300058561cbSjbeck
301058561cbSjbeck /* Poll structure array (pollfd) size step */
302058561cbSjbeck #define PFD_STEP 256
303058561cbSjbeck
304058561cbSjbeck #define WAIT_FD(i) (pfd[i].fd)
305058561cbSjbeck #define WAITFN "POLL"
306058561cbSjbeck
307058561cbSjbeck static void *
mi_pool_controller(arg)308058561cbSjbeck mi_pool_controller(arg)
309058561cbSjbeck void *arg;
310058561cbSjbeck {
311058561cbSjbeck struct pollfd *pfd = NULL;
312058561cbSjbeck int dim_pfd = 0;
313058561cbSjbeck bool rebuild_set = true;
314058561cbSjbeck int pcnt = 0; /* error count for poll() failures */
315*e9af4bc0SJohn Beck time_t lastcheck;
316058561cbSjbeck
317058561cbSjbeck Tskmgr.tm_tid = sthread_get_id();
318058561cbSjbeck if (pthread_detach(Tskmgr.tm_tid) != 0)
319058561cbSjbeck {
320058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
321058561cbSjbeck return NULL;
322058561cbSjbeck }
323058561cbSjbeck
324058561cbSjbeck pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
325058561cbSjbeck if (pfd == NULL)
326058561cbSjbeck {
327058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
328058561cbSjbeck sm_errstring(errno));
329058561cbSjbeck return NULL;
330058561cbSjbeck }
331058561cbSjbeck dim_pfd = PFD_STEP;
332058561cbSjbeck
333*e9af4bc0SJohn Beck lastcheck = time(NULL);
334058561cbSjbeck for (;;)
335058561cbSjbeck {
336058561cbSjbeck SMFICTX_PTR ctx;
337058561cbSjbeck int nfd, rfd, i;
338058561cbSjbeck time_t now;
339058561cbSjbeck
340058561cbSjbeck POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
341058561cbSjbeck
342058561cbSjbeck if (mi_stop() != MILTER_CONT)
343058561cbSjbeck break;
344058561cbSjbeck
345058561cbSjbeck TASKMGR_LOCK();
346058561cbSjbeck
347058561cbSjbeck now = time(NULL);
348058561cbSjbeck
349058561cbSjbeck /* check for timed out sessions? */
350058561cbSjbeck if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
351058561cbSjbeck {
352*e9af4bc0SJohn Beck ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
353*e9af4bc0SJohn Beck while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
354058561cbSjbeck {
355*e9af4bc0SJohn Beck SMFICTX_PTR ctx_nxt;
356*e9af4bc0SJohn Beck
357*e9af4bc0SJohn Beck ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
358058561cbSjbeck if (ctx->ctx_wstate == WKST_WAITING)
359058561cbSjbeck {
360058561cbSjbeck if (ctx->ctx_wait == 0)
361058561cbSjbeck ctx->ctx_wait = now;
362*e9af4bc0SJohn Beck else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
363*e9af4bc0SJohn Beck < now)
364058561cbSjbeck {
365*e9af4bc0SJohn Beck /* if session timed out, close it */
366058561cbSjbeck sfsistat (*fi_close) __P((SMFICTX *));
367058561cbSjbeck
368058561cbSjbeck POOL_LEV_DPRINTF(4,
369058561cbSjbeck ("Closing old connection: sd=%d id=%d",
370058561cbSjbeck ctx->ctx_sd,
371058561cbSjbeck ctx->ctx_sid));
372058561cbSjbeck
373058561cbSjbeck if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
374058561cbSjbeck (void) (*fi_close)(ctx);
375058561cbSjbeck
376058561cbSjbeck mi_close_session(ctx);
377058561cbSjbeck }
378058561cbSjbeck }
379*e9af4bc0SJohn Beck ctx = ctx_nxt;
380058561cbSjbeck }
381058561cbSjbeck lastcheck = now;
382058561cbSjbeck }
383058561cbSjbeck
384058561cbSjbeck if (rebuild_set)
385058561cbSjbeck {
386058561cbSjbeck /*
387058561cbSjbeck ** Initialize poll set.
388058561cbSjbeck ** Insert into the poll set the file descriptors of
389058561cbSjbeck ** all sessions waiting for a command from sendmail.
390058561cbSjbeck */
391058561cbSjbeck
392058561cbSjbeck nfd = 0;
393058561cbSjbeck
394058561cbSjbeck /* begin with worker pipe */
395058561cbSjbeck pfd[nfd].fd = RD_PIPE;
396058561cbSjbeck pfd[nfd].events = MI_POLL_RD_FLAGS;
397058561cbSjbeck pfd[nfd].revents = 0;
398058561cbSjbeck nfd++;
399058561cbSjbeck
400058561cbSjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
401058561cbSjbeck {
402058561cbSjbeck /*
403058561cbSjbeck ** update ctx_wait - start of wait moment -
404058561cbSjbeck ** for timeout
405058561cbSjbeck */
406058561cbSjbeck
407058561cbSjbeck if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
408058561cbSjbeck ctx->ctx_wait = now;
409058561cbSjbeck
410058561cbSjbeck /* add the session to the pollfd array? */
411058561cbSjbeck if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
412058561cbSjbeck (ctx->ctx_wstate == WKST_WAITING))
413058561cbSjbeck {
414058561cbSjbeck /*
415058561cbSjbeck ** Resize the pollfd array if it
416058561cbSjbeck ** isn't large enough.
417058561cbSjbeck */
418058561cbSjbeck
419058561cbSjbeck if (nfd >= dim_pfd)
420058561cbSjbeck {
421058561cbSjbeck struct pollfd *tpfd;
422058561cbSjbeck size_t new;
423058561cbSjbeck
424058561cbSjbeck new = (dim_pfd + PFD_STEP) *
425058561cbSjbeck sizeof(*tpfd);
426058561cbSjbeck tpfd = (struct pollfd *)
427058561cbSjbeck realloc(pfd, new);
428058561cbSjbeck if (tpfd != NULL)
429058561cbSjbeck {
430058561cbSjbeck pfd = tpfd;
431058561cbSjbeck dim_pfd += PFD_STEP;
432058561cbSjbeck }
433058561cbSjbeck else
434058561cbSjbeck {
435058561cbSjbeck smi_log(SMI_LOG_ERR,
436058561cbSjbeck "Failed to realloc pollfd array:%s",
437058561cbSjbeck sm_errstring(errno));
438058561cbSjbeck }
439058561cbSjbeck }
440058561cbSjbeck
441058561cbSjbeck /* add the session to pollfd array */
442058561cbSjbeck if (nfd < dim_pfd)
443058561cbSjbeck {
444058561cbSjbeck ctx->ctx_wstate = WKST_WAITING;
445058561cbSjbeck pfd[nfd].fd = ctx->ctx_sd;
446058561cbSjbeck pfd[nfd].events = MI_POLL_RD_FLAGS;
447058561cbSjbeck pfd[nfd].revents = 0;
448058561cbSjbeck nfd++;
449058561cbSjbeck }
450058561cbSjbeck }
451058561cbSjbeck }
452*e9af4bc0SJohn Beck rebuild_set = false;
453058561cbSjbeck }
454058561cbSjbeck
455058561cbSjbeck TASKMGR_UNLOCK();
456058561cbSjbeck
457058561cbSjbeck /* Everything is ready, let's wait for an event */
458058561cbSjbeck rfd = poll(pfd, nfd, POLL_TIMEOUT);
459058561cbSjbeck
460058561cbSjbeck POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
461058561cbSjbeck WAITFN, now, nfd));
462058561cbSjbeck
463058561cbSjbeck /* timeout */
464058561cbSjbeck if (rfd == 0)
465058561cbSjbeck continue;
466058561cbSjbeck
467058561cbSjbeck rebuild_set = true;
468058561cbSjbeck
469058561cbSjbeck /* error */
470058561cbSjbeck if (rfd < 0)
471058561cbSjbeck {
472058561cbSjbeck if (errno == EINTR)
473058561cbSjbeck continue;
474058561cbSjbeck pcnt++;
475058561cbSjbeck smi_log(SMI_LOG_ERR,
476058561cbSjbeck "%s() failed (%s), %s",
477058561cbSjbeck WAITFN, sm_errstring(errno),
478058561cbSjbeck pcnt >= MAX_FAILS_S ? "abort" : "try again");
479058561cbSjbeck
480058561cbSjbeck if (pcnt >= MAX_FAILS_S)
481058561cbSjbeck goto err;
482058561cbSjbeck }
483058561cbSjbeck pcnt = 0;
484058561cbSjbeck
485058561cbSjbeck /* something happened */
486058561cbSjbeck for (i = 0; i < nfd; i++)
487058561cbSjbeck {
488058561cbSjbeck if (pfd[i].revents == 0)
489058561cbSjbeck continue;
490058561cbSjbeck
491058561cbSjbeck POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
492058561cbSjbeck WAITFN, i, nfd,
493058561cbSjbeck WAIT_FD(i)));
494058561cbSjbeck
495058561cbSjbeck /* has a worker signaled an end of task ? */
496058561cbSjbeck if (WAIT_FD(i) == RD_PIPE)
497058561cbSjbeck {
498058561cbSjbeck char evt = 0;
499058561cbSjbeck int r = 0;
500058561cbSjbeck
501058561cbSjbeck POOL_LEV_DPRINTF(4,
502058561cbSjbeck ("PIPE WILL READ evt = %08X %08X",
503058561cbSjbeck pfd[i].events, pfd[i].revents));
504058561cbSjbeck
505058561cbSjbeck if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
506058561cbSjbeck {
507058561cbSjbeck r = read(RD_PIPE, &evt, sizeof(evt));
508058561cbSjbeck if (r == sizeof(evt))
509058561cbSjbeck {
510058561cbSjbeck /* Do nothing */
511058561cbSjbeck }
512058561cbSjbeck }
513058561cbSjbeck
514058561cbSjbeck POOL_LEV_DPRINTF(4,
515058561cbSjbeck ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
516058561cbSjbeck i, RD_PIPE, r, evt));
517058561cbSjbeck
518058561cbSjbeck if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
519058561cbSjbeck {
520058561cbSjbeck /* Exception handling */
521058561cbSjbeck }
522058561cbSjbeck continue;
523058561cbSjbeck }
524058561cbSjbeck
525058561cbSjbeck /* no ! sendmail wants to send a command */
526058561cbSjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
527058561cbSjbeck {
528058561cbSjbeck if (ctx->ctx_wstate != WKST_WAITING)
529058561cbSjbeck continue;
530058561cbSjbeck
531058561cbSjbeck POOL_LEV_DPRINTF(4,
532058561cbSjbeck ("Checking context sd=%d - fd=%d ",
533058561cbSjbeck ctx->ctx_sd , WAIT_FD(i)));
534058561cbSjbeck
535058561cbSjbeck if (ctx->ctx_sd == pfd[i].fd)
536058561cbSjbeck {
537058561cbSjbeck TASKMGR_LOCK();
538058561cbSjbeck
539058561cbSjbeck POOL_LEV_DPRINTF(4,
540058561cbSjbeck ("TASK: found %d for fd[%d]=%d",
541058561cbSjbeck ctx->ctx_sid, i, WAIT_FD(i)));
542058561cbSjbeck
543058561cbSjbeck if (Tskmgr.tm_nb_idle > 0)
544058561cbSjbeck {
545058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
546058561cbSjbeck TASKMGR_COND_SIGNAL();
547058561cbSjbeck }
548058561cbSjbeck else
549058561cbSjbeck {
550058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING;
551058561cbSjbeck LAUNCH_WORKER(ctx);
552058561cbSjbeck }
553058561cbSjbeck TASKMGR_UNLOCK();
554058561cbSjbeck break;
555058561cbSjbeck }
556058561cbSjbeck }
557058561cbSjbeck
558058561cbSjbeck POOL_LEV_DPRINTF(4,
559058561cbSjbeck ("TASK %s FOUND - Checking PIPE for fd[%d]",
560058561cbSjbeck ctx != NULL ? "" : "NOT", WAIT_FD(i)));
561058561cbSjbeck }
562058561cbSjbeck }
563058561cbSjbeck
564058561cbSjbeck err:
565058561cbSjbeck if (pfd != NULL)
566058561cbSjbeck free(pfd);
567058561cbSjbeck
568058561cbSjbeck Tskmgr.tm_signature = 0;
569058561cbSjbeck for (;;)
570058561cbSjbeck {
571058561cbSjbeck SMFICTX_PTR ctx;
572058561cbSjbeck
573058561cbSjbeck ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
574058561cbSjbeck if (ctx == NULL)
575058561cbSjbeck break;
576058561cbSjbeck mi_close_session(ctx);
577058561cbSjbeck }
578058561cbSjbeck
579058561cbSjbeck (void) smutex_destroy(&Tskmgr.tm_w_mutex);
580058561cbSjbeck (void) scond_destroy(&Tskmgr.tm_w_cond);
581058561cbSjbeck
582058561cbSjbeck return NULL;
583058561cbSjbeck }
584058561cbSjbeck
/*
**  GET_TASK_READY_TO_RUN -- look for a task ready to run.
**	Walks the list of sessions using the variable ctx of the enclosing
**	scope; the first session found in state WKST_READY_TO_RUN is switched
**	to WKST_RUNNING and the walk stops there.
**	Afterwards ctx is NULL or a pointer to a task ready to run.
*/

#define GET_TASK_READY_TO_RUN()	\
	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)	\
	{	\
		if (ctx->ctx_wstate != WKST_READY_TO_RUN)	\
			continue;	\
		ctx->ctx_wstate = WKST_RUNNING;	\
		break;	\
	}
599058561cbSjbeck
600058561cbSjbeck /*
601058561cbSjbeck ** MI_WORKER -- worker thread
602058561cbSjbeck ** executes tasks distributed by the mi_pool_controller
603058561cbSjbeck ** or by mi_start_session
604058561cbSjbeck **
605058561cbSjbeck ** Parameters:
606058561cbSjbeck ** arg -- pointer to context structure
607058561cbSjbeck **
608058561cbSjbeck ** Returns:
609058561cbSjbeck ** NULL pointer
610058561cbSjbeck */
611058561cbSjbeck
612058561cbSjbeck static void *
mi_worker(arg)613058561cbSjbeck mi_worker(arg)
614058561cbSjbeck void *arg;
615058561cbSjbeck {
616058561cbSjbeck SMFICTX_PTR ctx;
617058561cbSjbeck bool done;
618058561cbSjbeck sthread_t t_id;
619058561cbSjbeck int r;
620058561cbSjbeck
621058561cbSjbeck ctx = (SMFICTX_PTR) arg;
622058561cbSjbeck done = false;
623058561cbSjbeck if (ctx != NULL)
624058561cbSjbeck ctx->ctx_wstate = WKST_RUNNING;
625058561cbSjbeck
626058561cbSjbeck t_id = sthread_get_id();
627058561cbSjbeck if (pthread_detach(t_id) != 0)
628058561cbSjbeck {
629058561cbSjbeck smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
630058561cbSjbeck if (ctx != NULL)
631058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
632058561cbSjbeck return NULL;
633058561cbSjbeck }
634058561cbSjbeck
635058561cbSjbeck TASKMGR_LOCK();
636058561cbSjbeck Tskmgr.tm_nb_workers++;
637058561cbSjbeck TASKMGR_UNLOCK();
638058561cbSjbeck
639058561cbSjbeck while (!done)
640058561cbSjbeck {
641058561cbSjbeck if (mi_stop() != MILTER_CONT)
642058561cbSjbeck break;
643058561cbSjbeck
644058561cbSjbeck /* let's handle next task... */
645058561cbSjbeck if (ctx != NULL)
646058561cbSjbeck {
647058561cbSjbeck int res;
648058561cbSjbeck
649058561cbSjbeck POOL_LEV_DPRINTF(4,
650058561cbSjbeck ("worker %d: new task -> let's handle it",
651058561cbSjbeck t_id));
652058561cbSjbeck res = mi_engine(ctx);
653058561cbSjbeck POOL_LEV_DPRINTF(4,
654058561cbSjbeck ("worker %d: mi_engine returned %d", t_id, res));
655058561cbSjbeck
656058561cbSjbeck TASKMGR_LOCK();
657058561cbSjbeck if (res != MI_CONTINUE)
658058561cbSjbeck {
659058561cbSjbeck ctx->ctx_wstate = WKST_CLOSING;
660058561cbSjbeck
661058561cbSjbeck /*
662058561cbSjbeck ** Delete context from linked list of
663058561cbSjbeck ** sessions and close session.
664058561cbSjbeck */
665058561cbSjbeck
666058561cbSjbeck mi_close_session(ctx);
667058561cbSjbeck }
668058561cbSjbeck else
669058561cbSjbeck {
670058561cbSjbeck ctx->ctx_wstate = WKST_READY_TO_WAIT;
671058561cbSjbeck
672058561cbSjbeck POOL_LEV_DPRINTF(4,
673058561cbSjbeck ("writing to event pipe..."));
674058561cbSjbeck
675058561cbSjbeck /*
676058561cbSjbeck ** Signal task controller to add new session
677058561cbSjbeck ** to poll set.
678058561cbSjbeck */
679058561cbSjbeck
680058561cbSjbeck PIPE_SEND_SIGNAL();
681058561cbSjbeck }
682058561cbSjbeck TASKMGR_UNLOCK();
683058561cbSjbeck ctx = NULL;
684058561cbSjbeck
685058561cbSjbeck }
686058561cbSjbeck
687058561cbSjbeck /* check if there is any task waiting to be served */
688058561cbSjbeck TASKMGR_LOCK();
689058561cbSjbeck
690058561cbSjbeck GET_TASK_READY_TO_RUN();
691058561cbSjbeck
692058561cbSjbeck /* Got a task? */
693058561cbSjbeck if (ctx != NULL)
694058561cbSjbeck {
695058561cbSjbeck TASKMGR_UNLOCK();
696058561cbSjbeck continue;
697058561cbSjbeck }
698058561cbSjbeck
699058561cbSjbeck /*
700058561cbSjbeck ** if not, let's check if there is enough idle workers
701058561cbSjbeck ** if yes: quit
702058561cbSjbeck */
703058561cbSjbeck
704058561cbSjbeck if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
705058561cbSjbeck Tskmgr.tm_nb_idle > MIN_IDLE)
706058561cbSjbeck done = true;
707058561cbSjbeck
708058561cbSjbeck POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
709058561cbSjbeck Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
710058561cbSjbeck
711058561cbSjbeck if (done)
712058561cbSjbeck {
713058561cbSjbeck POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
714058561cbSjbeck Tskmgr.tm_nb_workers--;
715058561cbSjbeck TASKMGR_UNLOCK();
716058561cbSjbeck continue;
717058561cbSjbeck }
718058561cbSjbeck
719058561cbSjbeck /*
720058561cbSjbeck ** if no task ready to run, wait for another one
721058561cbSjbeck */
722058561cbSjbeck
723058561cbSjbeck Tskmgr.tm_nb_idle++;
724058561cbSjbeck TASKMGR_COND_WAIT();
725058561cbSjbeck Tskmgr.tm_nb_idle--;
726058561cbSjbeck
727058561cbSjbeck /* look for a task */
728058561cbSjbeck GET_TASK_READY_TO_RUN();
729058561cbSjbeck
730058561cbSjbeck TASKMGR_UNLOCK();
731058561cbSjbeck }
732058561cbSjbeck return NULL;
733058561cbSjbeck }
734058561cbSjbeck
735058561cbSjbeck /*
736058561cbSjbeck ** MI_LIST_ADD_CTX -- add new session to linked list
737058561cbSjbeck **
738058561cbSjbeck ** Parameters:
739058561cbSjbeck ** ctx -- context structure
740058561cbSjbeck **
741058561cbSjbeck ** Returns:
742058561cbSjbeck ** MI_FAILURE/MI_SUCCESS
743058561cbSjbeck */
744058561cbSjbeck
745058561cbSjbeck static int
mi_list_add_ctx(ctx)746058561cbSjbeck mi_list_add_ctx(ctx)
747058561cbSjbeck SMFICTX_PTR ctx;
748058561cbSjbeck {
749058561cbSjbeck SM_ASSERT(ctx != NULL);
750058561cbSjbeck SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
751058561cbSjbeck return MI_SUCCESS;
752058561cbSjbeck }
753058561cbSjbeck
754058561cbSjbeck /*
755058561cbSjbeck ** MI_LIST_DEL_CTX -- remove session from linked list when finished
756058561cbSjbeck **
757058561cbSjbeck ** Parameters:
758058561cbSjbeck ** ctx -- context structure
759058561cbSjbeck **
760058561cbSjbeck ** Returns:
761058561cbSjbeck ** MI_FAILURE/MI_SUCCESS
762058561cbSjbeck */
763058561cbSjbeck
764058561cbSjbeck static int
mi_list_del_ctx(ctx)765058561cbSjbeck mi_list_del_ctx(ctx)
766058561cbSjbeck SMFICTX_PTR ctx;
767058561cbSjbeck {
768058561cbSjbeck SM_ASSERT(ctx != NULL);
769058561cbSjbeck if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
770058561cbSjbeck return MI_FAILURE;
771058561cbSjbeck
772058561cbSjbeck SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
773058561cbSjbeck return MI_SUCCESS;
774058561cbSjbeck }
775058561cbSjbeck #endif /* _FFR_WORKERS_POOL */
776