/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
2134709573Sraf 
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
2634709573Sraf 
27*7257d1b4Sraf #include "lint.h"
28f841f6adSraf #include "thr_uberdata.h"
2934709573Sraf #include <sys/types.h>
3034709573Sraf #include <pthread.h>
3134709573Sraf #include <unistd.h>
3234709573Sraf #include <stdlib.h>
3334709573Sraf #include <thread.h>
3434709573Sraf #include <pthread.h>
3534709573Sraf #include <synch.h>
3634709573Sraf #include <port.h>
3734709573Sraf #include <signal.h>
3834709573Sraf #include <stdio.h>
3934709573Sraf #include <errno.h>
4034709573Sraf #include <stdarg.h>
4134709573Sraf #include <string.h>
4234709573Sraf #include <sys/aiocb.h>
4334709573Sraf #include <time.h>
4434709573Sraf #include <signal.h>
4534709573Sraf #include <fcntl.h>
4634709573Sraf #include "sigev_thread.h"
4734709573Sraf 
48f841f6adSraf /*
49f841f6adSraf  * There is but one spawner for all aio operations.
50f841f6adSraf  */
51f841f6adSraf thread_communication_data_t *sigev_aio_tcd = NULL;
5234709573Sraf 
/*
 * Debug switch: a non-zero value turns on the debugging printf's.
 * The value is taken from the _RT_DEBUG environment variable.
 */
static int _rt_debug = 0;
5734709573Sraf 
58f841f6adSraf void
init_sigev_thread(void)59f841f6adSraf init_sigev_thread(void)
6034709573Sraf {
6134709573Sraf 	char *ldebug;
6234709573Sraf 
63f841f6adSraf 	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
64f841f6adSraf 		_rt_debug = atoi(ldebug);
6534709573Sraf }
6634709573Sraf 
6734709573Sraf /*
6834709573Sraf  * Routine to print debug messages:
69f841f6adSraf  * If _rt_debug is set, printf the debug message to stderr
7034709573Sraf  * with an appropriate prefix.
7134709573Sraf  */
7234709573Sraf /*PRINTFLIKE1*/
7334709573Sraf static void
dprintf(const char * format,...)7434709573Sraf dprintf(const char *format, ...)
7534709573Sraf {
76f841f6adSraf 	if (_rt_debug) {
7734709573Sraf 		va_list alist;
7834709573Sraf 
7934709573Sraf 		va_start(alist, format);
8034709573Sraf 		flockfile(stderr);
81a574db85Sraf 		pthread_cleanup_push(funlockfile, stderr);
82f841f6adSraf 		(void) fputs("DEBUG: ", stderr);
8334709573Sraf 		(void) vfprintf(stderr, format, alist);
84a574db85Sraf 		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
8534709573Sraf 		va_end(alist);
8634709573Sraf 	}
8734709573Sraf }
8834709573Sraf 
8934709573Sraf /*
9034709573Sraf  * The notify_thread() function can be used as the start function of a new
9134709573Sraf  * thread but it is normally called from notifier(), below, in the context
9234709573Sraf  * of a thread pool worker thread.  It is used as the start function of a
9334709573Sraf  * new thread only when individual pthread attributes differ from those
9434709573Sraf  * that are common to all workers.  This only occurs in the AIO case.
9534709573Sraf  */
9634709573Sraf static void *
notify_thread(void * arg)9734709573Sraf notify_thread(void *arg)
9834709573Sraf {
9934709573Sraf 	sigev_thread_data_t *stdp = arg;
10034709573Sraf 	void (*function)(union sigval) = stdp->std_func;
10134709573Sraf 	union sigval argument = stdp->std_arg;
10234709573Sraf 
103f841f6adSraf 	lfree(stdp, sizeof (*stdp));
10434709573Sraf 	function(argument);
10534709573Sraf 	return (NULL);
10634709573Sraf }
10734709573Sraf 
/*
 * Thread pool job function: adapts notify_thread() to the void-returning
 * signature that tpool_dispatch() is given in sigev_add_work().
 */
static void
notifier(void *arg)
{
	/* a pool job has no use for the thread-style return value */
	(void) notify_thread(arg);
}
11634709573Sraf 
11734709573Sraf /*
11834709573Sraf  * This routine adds a new work request, described by function
11934709573Sraf  * and argument, to the list of outstanding jobs.
12034709573Sraf  * It returns 0 indicating success.  A value != 0 indicates an error.
12134709573Sraf  */
12234709573Sraf static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)12334709573Sraf sigev_add_work(thread_communication_data_t *tcdp,
12434709573Sraf 	void (*function)(union sigval), union sigval argument)
12534709573Sraf {
12634709573Sraf 	tpool_t *tpool = tcdp->tcd_poolp;
12734709573Sraf 	sigev_thread_data_t *stdp;
12834709573Sraf 
12934709573Sraf 	if (tpool == NULL)
13034709573Sraf 		return (EINVAL);
131f841f6adSraf 	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
13234709573Sraf 		return (errno);
13334709573Sraf 	stdp->std_func = function;
13434709573Sraf 	stdp->std_arg = argument;
13534709573Sraf 	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
136f841f6adSraf 		lfree(stdp, sizeof (*stdp));
13734709573Sraf 		return (errno);
13834709573Sraf 	}
13934709573Sraf 	return (0);
14034709573Sraf }
14134709573Sraf 
14234709573Sraf static void
sigev_destroy_pool(thread_communication_data_t * tcdp)14334709573Sraf sigev_destroy_pool(thread_communication_data_t *tcdp)
14434709573Sraf {
14534709573Sraf 	if (tcdp->tcd_poolp != NULL)
14634709573Sraf 		tpool_abandon(tcdp->tcd_poolp);
14734709573Sraf 	tcdp->tcd_poolp = NULL;
14834709573Sraf 
14934709573Sraf 	if (tcdp->tcd_subsystem == MQ) {
15034709573Sraf 		/*
15134709573Sraf 		 * synchronize with del_sigev_mq()
15234709573Sraf 		 */
153f841f6adSraf 		sig_mutex_lock(&tcdp->tcd_lock);
15434709573Sraf 		tcdp->tcd_server_id = 0;
15534709573Sraf 		if (tcdp->tcd_msg_closing) {
15634709573Sraf 			(void) cond_broadcast(&tcdp->tcd_cv);
157f841f6adSraf 			sig_mutex_unlock(&tcdp->tcd_lock);
15834709573Sraf 			return;		/* del_sigev_mq() will free the tcd */
15934709573Sraf 		}
160f841f6adSraf 		sig_mutex_unlock(&tcdp->tcd_lock);
16134709573Sraf 	}
16234709573Sraf 
16334709573Sraf 	/*
16434709573Sraf 	 * now delete everything
16534709573Sraf 	 */
16634709573Sraf 	free_sigev_handler(tcdp);
16734709573Sraf }
16834709573Sraf 
/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  In case
 * the event requires creation of a worker thread with different pthread
 * attributes than those from the pool of workers, a new daemon thread
 * with these attributes is spawned apart from the pool of workers.
 * If the spawner fails to add work or fails to create an additional
 * thread because of lacking resources, it puts the event back into
 * the kernel queue and re-tries some time later.
 */
18334709573Sraf 
18434709573Sraf void *
timer_spawner(void * arg)18534709573Sraf timer_spawner(void *arg)
18634709573Sraf {
18734709573Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
18834709573Sraf 	port_event_t port_event;
18934709573Sraf 
19034709573Sraf 	/* destroy the pool if we are cancelled */
19134709573Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
19234709573Sraf 
19334709573Sraf 	for (;;) {
19434709573Sraf 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
19534709573Sraf 			dprintf("port_get on port %d failed with %d <%s>\n",
19634709573Sraf 			    tcdp->tcd_port, errno, strerror(errno));
19734709573Sraf 			break;
19834709573Sraf 		}
19934709573Sraf 		switch (port_event.portev_source) {
20034709573Sraf 		case PORT_SOURCE_TIMER:
20134709573Sraf 			break;
20234709573Sraf 		case PORT_SOURCE_ALERT:
20334709573Sraf 			if (port_event.portev_events != SIGEV_THREAD_TERM)
20434709573Sraf 				errno = EPROTO;
20534709573Sraf 			goto out;
20634709573Sraf 		default:
20734709573Sraf 			dprintf("port_get on port %d returned %u "
20834709573Sraf 			    "(not PORT_SOURCE_TIMER)\n",
20934709573Sraf 			    tcdp->tcd_port, port_event.portev_source);
21034709573Sraf 			errno = EPROTO;
21134709573Sraf 			goto out;
21234709573Sraf 		}
21334709573Sraf 
21434709573Sraf 		tcdp->tcd_overruns = port_event.portev_events - 1;
21534709573Sraf 		if (sigev_add_work(tcdp,
21634709573Sraf 		    tcdp->tcd_notif.sigev_notify_function,
21734709573Sraf 		    tcdp->tcd_notif.sigev_value) != 0)
21834709573Sraf 			break;
21934709573Sraf 		/* wait until job is done before looking for another */
22034709573Sraf 		tpool_wait(tcdp->tcd_poolp);
22134709573Sraf 	}
22234709573Sraf out:
22334709573Sraf 	pthread_cleanup_pop(1);
22434709573Sraf 	return (NULL);
22534709573Sraf }
22634709573Sraf 
22734709573Sraf void *
mqueue_spawner(void * arg)22834709573Sraf mqueue_spawner(void *arg)
22934709573Sraf {
23034709573Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
23134709573Sraf 	int ret = 0;
23234709573Sraf 	int ntype;
23334709573Sraf 	void (*function)(union sigval);
23434709573Sraf 	union sigval argument;
23534709573Sraf 
23634709573Sraf 	/* destroy the pool if we are cancelled */
23734709573Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
23834709573Sraf 
23934709573Sraf 	while (ret == 0) {
240f841f6adSraf 		sig_mutex_lock(&tcdp->tcd_lock);
241f841f6adSraf 		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
24234709573Sraf 		while ((ntype = tcdp->tcd_msg_enabled) == 0)
243f841f6adSraf 			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
24434709573Sraf 		pthread_cleanup_pop(1);
24534709573Sraf 
24634709573Sraf 		while (sem_wait(tcdp->tcd_msg_avail) == -1)
24734709573Sraf 			continue;
24834709573Sraf 
249f841f6adSraf 		sig_mutex_lock(&tcdp->tcd_lock);
25034709573Sraf 		tcdp->tcd_msg_enabled = 0;
251f841f6adSraf 		sig_mutex_unlock(&tcdp->tcd_lock);
25234709573Sraf 
25334709573Sraf 		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
25434709573Sraf 		if (ntype == SIGEV_THREAD) {
25534709573Sraf 			function = tcdp->tcd_notif.sigev_notify_function;
25634709573Sraf 			argument.sival_ptr = tcdp->tcd_msg_userval;
25734709573Sraf 			ret = sigev_add_work(tcdp, function, argument);
25834709573Sraf 		} else {	/* ntype == SIGEV_PORT */
25934709573Sraf 			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
26034709573Sraf 			    0, (uintptr_t)tcdp->tcd_msg_object,
26134709573Sraf 			    tcdp->tcd_msg_userval);
26234709573Sraf 		}
26334709573Sraf 	}
264f841f6adSraf 	sig_mutex_unlock(&tcdp->tcd_lock);
26534709573Sraf 
26634709573Sraf 	pthread_cleanup_pop(1);
26734709573Sraf 	return (NULL);
26834709573Sraf }
26934709573Sraf 
27034709573Sraf void *
aio_spawner(void * arg)27134709573Sraf aio_spawner(void *arg)
27234709573Sraf {
27334709573Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
27434709573Sraf 	int error = 0;
27534709573Sraf 	void (*function)(union sigval);
27634709573Sraf 	union sigval argument;
27734709573Sraf 	port_event_t port_event;
27834709573Sraf 	struct sigevent *sigevp;
27934709573Sraf 	timespec_t delta;
28034709573Sraf 	pthread_attr_t *attrp;
28134709573Sraf 
28234709573Sraf 	/* destroy the pool if we are cancelled */
28334709573Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
28434709573Sraf 
28534709573Sraf 	while (error == 0) {
28634709573Sraf 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
28734709573Sraf 			error = errno;
28834709573Sraf 			dprintf("port_get on port %d failed with %d <%s>\n",
28934709573Sraf 			    tcdp->tcd_port, error, strerror(error));
29034709573Sraf 			break;
29134709573Sraf 		}
29234709573Sraf 		switch (port_event.portev_source) {
29334709573Sraf 		case PORT_SOURCE_AIO:
29434709573Sraf 			break;
29534709573Sraf 		case PORT_SOURCE_ALERT:
29634709573Sraf 			if (port_event.portev_events != SIGEV_THREAD_TERM)
29734709573Sraf 				errno = EPROTO;
29834709573Sraf 			goto out;
29934709573Sraf 		default:
30034709573Sraf 			dprintf("port_get on port %d returned %u "
30134709573Sraf 			    "(not PORT_SOURCE_AIO)\n",
30234709573Sraf 			    tcdp->tcd_port, port_event.portev_source);
30334709573Sraf 			errno = EPROTO;
30434709573Sraf 			goto out;
30534709573Sraf 		}
30634709573Sraf 		argument.sival_ptr = port_event.portev_user;
30734709573Sraf 		switch (port_event.portev_events) {
30834709573Sraf 		case AIOLIO:
30934709573Sraf #if !defined(_LP64)
31034709573Sraf 		case AIOLIO64:
31134709573Sraf #endif
31234709573Sraf 			sigevp = (struct sigevent *)port_event.portev_object;
31334709573Sraf 			function = sigevp->sigev_notify_function;
31434709573Sraf 			attrp = sigevp->sigev_notify_attributes;
31534709573Sraf 			break;
31634709573Sraf 		case AIOAREAD:
31734709573Sraf 		case AIOAWRITE:
31834709573Sraf 		case AIOFSYNC:
319a574db85Sraf 			{
32034709573Sraf 			aiocb_t *aiocbp =
32134709573Sraf 			    (aiocb_t *)port_event.portev_object;
32234709573Sraf 			function = aiocbp->aio_sigevent.sigev_notify_function;
32334709573Sraf 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
32434709573Sraf 			break;
325a574db85Sraf 			}
32634709573Sraf #if !defined(_LP64)
32734709573Sraf 		case AIOAREAD64:
32834709573Sraf 		case AIOAWRITE64:
32934709573Sraf 		case AIOFSYNC64:
330a574db85Sraf 			{
33134709573Sraf 			aiocb64_t *aiocbp =
33234709573Sraf 			    (aiocb64_t *)port_event.portev_object;
33334709573Sraf 			function = aiocbp->aio_sigevent.sigev_notify_function;
33434709573Sraf 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
33534709573Sraf 			break;
336a574db85Sraf 			}
33734709573Sraf #endif
33834709573Sraf 		default:
33934709573Sraf 			function = NULL;
34034709573Sraf 			attrp = NULL;
34134709573Sraf 			break;
34234709573Sraf 		}
34334709573Sraf 
34434709573Sraf 		if (function == NULL)
34534709573Sraf 			error = EINVAL;
346*7257d1b4Sraf 		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
34734709573Sraf 			error = sigev_add_work(tcdp, function, argument);
34834709573Sraf 		else {
34934709573Sraf 			/*
35034709573Sraf 			 * The attributes don't match.
35134709573Sraf 			 * Spawn a thread with the non-matching attributes.
35234709573Sraf 			 */
35334709573Sraf 			pthread_attr_t local_attr;
35434709573Sraf 			sigev_thread_data_t *stdp;
35534709573Sraf 
356f841f6adSraf 			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
35734709573Sraf 				error = ENOMEM;
35834709573Sraf 			else
359*7257d1b4Sraf 				error = pthread_attr_clone(&local_attr, attrp);
36034709573Sraf 
36134709573Sraf 			if (error == 0) {
36234709573Sraf 				(void) pthread_attr_setdetachstate(
36334709573Sraf 				    &local_attr, PTHREAD_CREATE_DETACHED);
364*7257d1b4Sraf 				(void) pthread_attr_setdaemonstate_np(
36534709573Sraf 				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
36634709573Sraf 				stdp->std_func = function;
36734709573Sraf 				stdp->std_arg = argument;
36834709573Sraf 				error = pthread_create(NULL, &local_attr,
36934709573Sraf 				    notify_thread, stdp);
37034709573Sraf 				(void) pthread_attr_destroy(&local_attr);
37134709573Sraf 			}
37234709573Sraf 			if (error && stdp != NULL)
373f841f6adSraf 				lfree(stdp, sizeof (*stdp));
37434709573Sraf 		}
37534709573Sraf 
37634709573Sraf 		if (error) {
37734709573Sraf 			dprintf("Cannot add work, error=%d <%s>.\n",
37834709573Sraf 			    error, strerror(error));
37934709573Sraf 			if (error == EAGAIN || error == ENOMEM) {
38034709573Sraf 				/* (Temporary) no resources are available. */
38134709573Sraf 				if (_port_dispatch(tcdp->tcd_port, 0,
38234709573Sraf 				    PORT_SOURCE_AIO, port_event.portev_events,
38334709573Sraf 				    port_event.portev_object,
38434709573Sraf 				    port_event.portev_user) != 0)
38534709573Sraf 					break;
38634709573Sraf 				error = 0;
38734709573Sraf 				delta.tv_sec = 0;
38834709573Sraf 				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
38934709573Sraf 				(void) nanosleep(&delta, NULL);
39034709573Sraf 			}
39134709573Sraf 		}
39234709573Sraf 	}
39334709573Sraf out:
39434709573Sraf 	pthread_cleanup_pop(1);
39534709573Sraf 	return (NULL);
39634709573Sraf }
39734709573Sraf 
39834709573Sraf /*
39934709573Sraf  * Allocate a thread_communication_data_t block.
40034709573Sraf  */
40134709573Sraf static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)40234709573Sraf alloc_sigev_handler(subsystem_t caller)
40334709573Sraf {
40434709573Sraf 	thread_communication_data_t *tcdp;
40534709573Sraf 
406f841f6adSraf 	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
407f841f6adSraf 		tcdp->tcd_subsystem = caller;
408f841f6adSraf 		tcdp->tcd_port = -1;
409f841f6adSraf 		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
410f841f6adSraf 		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
41134709573Sraf 	}
41234709573Sraf 	return (tcdp);
41334709573Sraf }
41434709573Sraf 
41534709573Sraf /*
41634709573Sraf  * Free a thread_communication_data_t block.
41734709573Sraf  */
41834709573Sraf void
free_sigev_handler(thread_communication_data_t * tcdp)41934709573Sraf free_sigev_handler(thread_communication_data_t *tcdp)
42034709573Sraf {
42134709573Sraf 	if (tcdp->tcd_attrp) {
42234709573Sraf 		(void) pthread_attr_destroy(tcdp->tcd_attrp);
42334709573Sraf 		tcdp->tcd_attrp = NULL;
42434709573Sraf 	}
42534709573Sraf 	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
42634709573Sraf 
42734709573Sraf 	switch (tcdp->tcd_subsystem) {
42834709573Sraf 	case TIMER:
42934709573Sraf 	case AIO:
43034709573Sraf 		if (tcdp->tcd_port >= 0)
43134709573Sraf 			(void) close(tcdp->tcd_port);
43234709573Sraf 		break;
43334709573Sraf 	case MQ:
43434709573Sraf 		tcdp->tcd_msg_avail = NULL;
43534709573Sraf 		tcdp->tcd_msg_object = NULL;
43634709573Sraf 		tcdp->tcd_msg_userval = NULL;
43734709573Sraf 		tcdp->tcd_msg_enabled = 0;
43834709573Sraf 		break;
43934709573Sraf 	}
44034709573Sraf 
441f841f6adSraf 	lfree(tcdp, sizeof (*tcdp));
44234709573Sraf }
44334709573Sraf 
44434709573Sraf /*
44534709573Sraf  * Initialize data structure and create the port.
44634709573Sraf  */
44734709573Sraf thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)44834709573Sraf setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
44934709573Sraf {
45034709573Sraf 	thread_communication_data_t *tcdp;
45134709573Sraf 	int error;
45234709573Sraf 
45334709573Sraf 	if (sigevp == NULL) {
45434709573Sraf 		errno = EINVAL;
45534709573Sraf 		return (NULL);
45634709573Sraf 	}
45734709573Sraf 
45834709573Sraf 	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
45934709573Sraf 		errno = ENOMEM;
46034709573Sraf 		return (NULL);
46134709573Sraf 	}
46234709573Sraf 
46334709573Sraf 	if (sigevp->sigev_notify_attributes == NULL)
46434709573Sraf 		tcdp->tcd_attrp = NULL;		/* default attributes */
46534709573Sraf 	else {
46634709573Sraf 		/*
46734709573Sraf 		 * We cannot just copy the sigevp->sigev_notify_attributes
46834709573Sraf 		 * pointer.  We need to initialize a new pthread_attr_t
46934709573Sraf 		 * structure with the values from the user-supplied
47034709573Sraf 		 * pthread_attr_t.
47134709573Sraf 		 */
47234709573Sraf 		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
473*7257d1b4Sraf 		error = pthread_attr_clone(tcdp->tcd_attrp,
474a574db85Sraf 		    sigevp->sigev_notify_attributes);
47534709573Sraf 		if (error) {
47634709573Sraf 			tcdp->tcd_attrp = NULL;
47734709573Sraf 			free_sigev_handler(tcdp);
47834709573Sraf 			errno = error;
47934709573Sraf 			return (NULL);
48034709573Sraf 		}
48134709573Sraf 	}
48234709573Sraf 	tcdp->tcd_notif = *sigevp;
48334709573Sraf 	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
48434709573Sraf 
48534709573Sraf 	if (caller == TIMER || caller == AIO) {
48634709573Sraf 		if ((tcdp->tcd_port = port_create()) < 0 ||
48734709573Sraf 		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
48834709573Sraf 			free_sigev_handler(tcdp);
48934709573Sraf 			errno = EBADF;
49034709573Sraf 			return (NULL);
49134709573Sraf 		}
49234709573Sraf 	}
49334709573Sraf 	return (tcdp);
49434709573Sraf }
49534709573Sraf 
49634709573Sraf /*
49734709573Sraf  * Create a thread pool and launch the spawner.
49834709573Sraf  */
49934709573Sraf int
launch_spawner(thread_communication_data_t * tcdp)50034709573Sraf launch_spawner(thread_communication_data_t *tcdp)
50134709573Sraf {
50234709573Sraf 	int ret;
50334709573Sraf 	int maxworkers;
50434709573Sraf 	void *(*spawner)(void *);
50534709573Sraf 	sigset_t set;
50634709573Sraf 	sigset_t oset;
50734709573Sraf 
50834709573Sraf 	switch (tcdp->tcd_subsystem) {
50934709573Sraf 	case TIMER:
51034709573Sraf 		spawner = timer_spawner;
51134709573Sraf 		maxworkers = 1;
51234709573Sraf 		break;
51334709573Sraf 	case MQ:
51434709573Sraf 		spawner = mqueue_spawner;
51534709573Sraf 		maxworkers = 1;
51634709573Sraf 		break;
51734709573Sraf 	case AIO:
51834709573Sraf 		spawner = aio_spawner;
51934709573Sraf 		maxworkers = 100;
52034709573Sraf 		break;
52134709573Sraf 	default:
52234709573Sraf 		return (-1);
52334709573Sraf 	}
52434709573Sraf 	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
52534709573Sraf 	    tcdp->tcd_notif.sigev_notify_attributes);
52634709573Sraf 	if (tcdp->tcd_poolp == NULL)
52734709573Sraf 		return (-1);
52834709573Sraf 	/* create the spawner with all signals blocked */
52934709573Sraf 	(void) sigfillset(&set);
53034709573Sraf 	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
53134709573Sraf 	ret = thr_create(NULL, 0, spawner, tcdp,
53234709573Sraf 	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
53334709573Sraf 	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
53434709573Sraf 	if (ret != 0) {
53534709573Sraf 		tpool_destroy(tcdp->tcd_poolp);
53634709573Sraf 		tcdp->tcd_poolp = NULL;
53734709573Sraf 		return (-1);
53834709573Sraf 	}
53934709573Sraf 	return (0);
54034709573Sraf }
54134709573Sraf 
54234709573Sraf /*
54334709573Sraf  * Delete the data associated with the sigev_thread timer, if timer is
54434709573Sraf  * associated with such a notification option.
54534709573Sraf  * Destroy the timer_spawner thread.
54634709573Sraf  */
54734709573Sraf int
del_sigev_timer(timer_t timer)54834709573Sraf del_sigev_timer(timer_t timer)
54934709573Sraf {
55034709573Sraf 	int rc = 0;
55134709573Sraf 	thread_communication_data_t *tcdp;
55234709573Sraf 
55334709573Sraf 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
554f841f6adSraf 		sig_mutex_lock(&tcdp->tcd_lock);
55534709573Sraf 		if (tcdp->tcd_port >= 0) {
55634709573Sraf 			if ((rc = port_alert(tcdp->tcd_port,
55734709573Sraf 			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
55834709573Sraf 				dprintf("del_sigev_timer(%d) OK.\n", timer);
55934709573Sraf 			}
56034709573Sraf 		}
56134709573Sraf 		timer_tcd[timer] = NULL;
562f841f6adSraf 		sig_mutex_unlock(&tcdp->tcd_lock);
56334709573Sraf 	}
56434709573Sraf 	return (rc);
56534709573Sraf }
56634709573Sraf 
56734709573Sraf int
sigev_timer_getoverrun(timer_t timer)56834709573Sraf sigev_timer_getoverrun(timer_t timer)
56934709573Sraf {
57034709573Sraf 	thread_communication_data_t *tcdp;
57134709573Sraf 
57234709573Sraf 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
57334709573Sraf 		return (tcdp->tcd_overruns);
57434709573Sraf 	return (0);
57534709573Sraf }
57634709573Sraf 
577f841f6adSraf static void
del_sigev_mq_cleanup(thread_communication_data_t * tcdp)578f841f6adSraf del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
579f841f6adSraf {
580f841f6adSraf 	sig_mutex_unlock(&tcdp->tcd_lock);
581f841f6adSraf 	free_sigev_handler(tcdp);
582f841f6adSraf }
583f841f6adSraf 
58434709573Sraf /*
58534709573Sraf  * Delete the data associated with the sigev_thread message queue,
58634709573Sraf  * if the message queue is associated with such a notification option.
58734709573Sraf  * Destroy the mqueue_spawner thread.
58834709573Sraf  */
58934709573Sraf void
del_sigev_mq(thread_communication_data_t * tcdp)59034709573Sraf del_sigev_mq(thread_communication_data_t *tcdp)
59134709573Sraf {
59234709573Sraf 	pthread_t server_id;
59334709573Sraf 	int rc;
59434709573Sraf 
595f841f6adSraf 	sig_mutex_lock(&tcdp->tcd_lock);
596f841f6adSraf 
59734709573Sraf 	server_id = tcdp->tcd_server_id;
59834709573Sraf 	tcdp->tcd_msg_closing = 1;
599f841f6adSraf 	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
600f841f6adSraf 		sig_mutex_unlock(&tcdp->tcd_lock);
60134709573Sraf 		dprintf("Fail to cancel %u with error %d <%s>.\n",
60234709573Sraf 		    server_id, rc, strerror(rc));
603f841f6adSraf 		return;
604f841f6adSraf 	}
605f841f6adSraf 
606f841f6adSraf 	/*
607f841f6adSraf 	 * wait for sigev_destroy_pool() to finish
608f841f6adSraf 	 */
609f841f6adSraf 	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
610f841f6adSraf 	while (tcdp->tcd_server_id == server_id)
611f841f6adSraf 		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
612f841f6adSraf 	pthread_cleanup_pop(1);
613f841f6adSraf }
614f841f6adSraf 
615f841f6adSraf /*
616f841f6adSraf  * POSIX aio:
617f841f6adSraf  * If the notification type is SIGEV_THREAD, set up
618f841f6adSraf  * the port number for notifications.  Create the
619f841f6adSraf  * thread pool and launch the spawner if necessary.
620f841f6adSraf  * If the notification type is not SIGEV_THREAD, do nothing.
621f841f6adSraf  */
622f841f6adSraf int
_aio_sigev_thread_init(struct sigevent * sigevp)623f841f6adSraf _aio_sigev_thread_init(struct sigevent *sigevp)
624f841f6adSraf {
625f841f6adSraf 	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
626f841f6adSraf 	static cond_t sigev_aio_cv = DEFAULTCV;
627f841f6adSraf 	static int sigev_aio_busy = 0;
628f841f6adSraf 
629f841f6adSraf 	thread_communication_data_t *tcdp;
630f841f6adSraf 	int port;
631a574db85Sraf 	int cancel_state;
632f841f6adSraf 	int rc = 0;
633f841f6adSraf 
634f841f6adSraf 	if (sigevp == NULL ||
635f841f6adSraf 	    sigevp->sigev_notify != SIGEV_THREAD ||
636f841f6adSraf 	    sigevp->sigev_notify_function == NULL)
637f841f6adSraf 		return (0);
638f841f6adSraf 
639f841f6adSraf 	lmutex_lock(&sigev_aio_lock);
640a574db85Sraf 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
641f841f6adSraf 	while (sigev_aio_busy)
642a574db85Sraf 		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
643a574db85Sraf 	(void) pthread_setcancelstate(cancel_state, NULL);
644f841f6adSraf 	if ((tcdp = sigev_aio_tcd) != NULL)
645f841f6adSraf 		port = tcdp->tcd_port;
646f841f6adSraf 	else {
647f841f6adSraf 		sigev_aio_busy = 1;
648f841f6adSraf 		lmutex_unlock(&sigev_aio_lock);
649f841f6adSraf 
650f841f6adSraf 		tcdp = setup_sigev_handler(sigevp, AIO);
651f841f6adSraf 		if (tcdp == NULL) {
652f841f6adSraf 			port = -1;
653f841f6adSraf 			rc = -1;
654f841f6adSraf 		} else if (launch_spawner(tcdp) != 0) {
655f841f6adSraf 			free_sigev_handler(tcdp);
656f841f6adSraf 			tcdp = NULL;
657f841f6adSraf 			port = -1;
658f841f6adSraf 			rc = -1;
659f841f6adSraf 		} else {
660f841f6adSraf 			port = tcdp->tcd_port;
661f841f6adSraf 		}
662f841f6adSraf 
663f841f6adSraf 		lmutex_lock(&sigev_aio_lock);
664f841f6adSraf 		sigev_aio_tcd = tcdp;
665f841f6adSraf 		sigev_aio_busy = 0;
666f841f6adSraf 		(void) cond_broadcast(&sigev_aio_cv);
667f841f6adSraf 	}
668f841f6adSraf 	lmutex_unlock(&sigev_aio_lock);
669f841f6adSraf 	sigevp->sigev_signo = port;
670f841f6adSraf 	return (rc);
671f841f6adSraf }
672f841f6adSraf 
673f841f6adSraf int
_aio_sigev_thread(aiocb_t * aiocbp)674f841f6adSraf _aio_sigev_thread(aiocb_t *aiocbp)
675f841f6adSraf {
676f841f6adSraf 	if (aiocbp == NULL)
677f841f6adSraf 		return (0);
678f841f6adSraf 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
679f841f6adSraf }
680f841f6adSraf 
#if !defined(_LP64)
/*
 * Apply _aio_sigev_thread_init() to the sigevent embedded in an
 * aiocb64_t.  A NULL aiocbp is accepted and yields 0.
 * Compiled only when _LP64 is not defined.
 */
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	if (aiocbp != NULL)
		return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
	return (0);
}
#endif
690f841f6adSraf 
691f841f6adSraf /*
692f841f6adSraf  * Cleanup POSIX aio after fork1() in the child process.
693f841f6adSraf  */
694f841f6adSraf void
postfork1_child_sigev_aio(void)695f841f6adSraf postfork1_child_sigev_aio(void)
696f841f6adSraf {
697f841f6adSraf 	thread_communication_data_t *tcdp;
698f841f6adSraf 
699f841f6adSraf 	if ((tcdp = sigev_aio_tcd) != NULL) {
700f841f6adSraf 		sigev_aio_tcd = NULL;
701f841f6adSraf 		tcd_teardown(tcdp);
70234709573Sraf 	}
703f841f6adSraf }
704f841f6adSraf 
705f841f6adSraf /*
706f841f6adSraf  * Utility function for the various postfork1_child_sigev_*() functions.
707f841f6adSraf  * Clean up the tcdp data structure and close the port.
708f841f6adSraf  */
709f841f6adSraf void
tcd_teardown(thread_communication_data_t * tcdp)710f841f6adSraf tcd_teardown(thread_communication_data_t *tcdp)
711f841f6adSraf {
712f841f6adSraf 	if (tcdp->tcd_poolp != NULL)
713f841f6adSraf 		tpool_abandon(tcdp->tcd_poolp);
714f841f6adSraf 	tcdp->tcd_poolp = NULL;
715f841f6adSraf 	tcdp->tcd_server_id = 0;
716f841f6adSraf 	free_sigev_handler(tcdp);
71734709573Sraf }
718