134709573Sraf /*
234709573Sraf * CDDL HEADER START
334709573Sraf *
434709573Sraf * The contents of this file are subject to the terms of the
534709573Sraf * Common Development and Distribution License (the "License").
634709573Sraf * You may not use this file except in compliance with the License.
734709573Sraf *
834709573Sraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
934709573Sraf * or http://www.opensolaris.org/os/licensing.
1034709573Sraf * See the License for the specific language governing permissions
1134709573Sraf * and limitations under the License.
1234709573Sraf *
1334709573Sraf * When distributing Covered Code, include this CDDL HEADER in each
1434709573Sraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
1534709573Sraf * If applicable, add the following below this CDDL HEADER, with the
1634709573Sraf * fields enclosed by brackets "[]" replaced with your own identifying
1734709573Sraf * information: Portions Copyright [yyyy] [name of copyright owner]
1834709573Sraf *
1934709573Sraf * CDDL HEADER END
2034709573Sraf */
2134709573Sraf
2234709573Sraf /*
23a574db85Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
2434709573Sraf * Use is subject to license terms.
2534709573Sraf */
2634709573Sraf
27*7257d1b4Sraf #include "lint.h"
28f841f6adSraf #include "thr_uberdata.h"
2934709573Sraf #include <sys/types.h>
3034709573Sraf #include <pthread.h>
3134709573Sraf #include <unistd.h>
3234709573Sraf #include <stdlib.h>
3334709573Sraf #include <thread.h>
3434709573Sraf #include <pthread.h>
3534709573Sraf #include <synch.h>
3634709573Sraf #include <port.h>
3734709573Sraf #include <signal.h>
3834709573Sraf #include <stdio.h>
3934709573Sraf #include <errno.h>
4034709573Sraf #include <stdarg.h>
4134709573Sraf #include <string.h>
4234709573Sraf #include <sys/aiocb.h>
4334709573Sraf #include <time.h>
4434709573Sraf #include <signal.h>
4534709573Sraf #include <fcntl.h>
4634709573Sraf #include "sigev_thread.h"
4734709573Sraf
48f841f6adSraf /*
49f841f6adSraf * There is but one spawner for all aio operations.
50f841f6adSraf */
51f841f6adSraf thread_communication_data_t *sigev_aio_tcd = NULL;
5234709573Sraf
5334709573Sraf /*
54f841f6adSraf * Set non-zero via _RT_DEBUG to enable debugging printf's.
5534709573Sraf */
56f841f6adSraf static int _rt_debug = 0;
5734709573Sraf
58f841f6adSraf void
init_sigev_thread(void)59f841f6adSraf init_sigev_thread(void)
6034709573Sraf {
6134709573Sraf char *ldebug;
6234709573Sraf
63f841f6adSraf if ((ldebug = getenv("_RT_DEBUG")) != NULL)
64f841f6adSraf _rt_debug = atoi(ldebug);
6534709573Sraf }
6634709573Sraf
6734709573Sraf /*
6834709573Sraf * Routine to print debug messages:
69f841f6adSraf * If _rt_debug is set, printf the debug message to stderr
7034709573Sraf * with an appropriate prefix.
7134709573Sraf */
7234709573Sraf /*PRINTFLIKE1*/
7334709573Sraf static void
dprintf(const char * format,...)7434709573Sraf dprintf(const char *format, ...)
7534709573Sraf {
76f841f6adSraf if (_rt_debug) {
7734709573Sraf va_list alist;
7834709573Sraf
7934709573Sraf va_start(alist, format);
8034709573Sraf flockfile(stderr);
81a574db85Sraf pthread_cleanup_push(funlockfile, stderr);
82f841f6adSraf (void) fputs("DEBUG: ", stderr);
8334709573Sraf (void) vfprintf(stderr, format, alist);
84a574db85Sraf pthread_cleanup_pop(1); /* funlockfile(stderr) */
8534709573Sraf va_end(alist);
8634709573Sraf }
8734709573Sraf }
8834709573Sraf
8934709573Sraf /*
9034709573Sraf * The notify_thread() function can be used as the start function of a new
9134709573Sraf * thread but it is normally called from notifier(), below, in the context
9234709573Sraf * of a thread pool worker thread. It is used as the start function of a
9334709573Sraf * new thread only when individual pthread attributes differ from those
9434709573Sraf * that are common to all workers. This only occurs in the AIO case.
9534709573Sraf */
9634709573Sraf static void *
notify_thread(void * arg)9734709573Sraf notify_thread(void *arg)
9834709573Sraf {
9934709573Sraf sigev_thread_data_t *stdp = arg;
10034709573Sraf void (*function)(union sigval) = stdp->std_func;
10134709573Sraf union sigval argument = stdp->std_arg;
10234709573Sraf
103f841f6adSraf lfree(stdp, sizeof (*stdp));
10434709573Sraf function(argument);
10534709573Sraf return (NULL);
10634709573Sraf }
10734709573Sraf
10834709573Sraf /*
10934709573Sraf * Thread pool interface to call the user-supplied notification function.
11034709573Sraf */
11134709573Sraf static void
notifier(void * arg)11234709573Sraf notifier(void *arg)
11334709573Sraf {
11434709573Sraf (void) notify_thread(arg);
11534709573Sraf }
11634709573Sraf
11734709573Sraf /*
11834709573Sraf * This routine adds a new work request, described by function
11934709573Sraf * and argument, to the list of outstanding jobs.
12034709573Sraf * It returns 0 indicating success. A value != 0 indicates an error.
12134709573Sraf */
12234709573Sraf static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)12334709573Sraf sigev_add_work(thread_communication_data_t *tcdp,
12434709573Sraf void (*function)(union sigval), union sigval argument)
12534709573Sraf {
12634709573Sraf tpool_t *tpool = tcdp->tcd_poolp;
12734709573Sraf sigev_thread_data_t *stdp;
12834709573Sraf
12934709573Sraf if (tpool == NULL)
13034709573Sraf return (EINVAL);
131f841f6adSraf if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
13234709573Sraf return (errno);
13334709573Sraf stdp->std_func = function;
13434709573Sraf stdp->std_arg = argument;
13534709573Sraf if (tpool_dispatch(tpool, notifier, stdp) != 0) {
136f841f6adSraf lfree(stdp, sizeof (*stdp));
13734709573Sraf return (errno);
13834709573Sraf }
13934709573Sraf return (0);
14034709573Sraf }
14134709573Sraf
14234709573Sraf static void
sigev_destroy_pool(thread_communication_data_t * tcdp)14334709573Sraf sigev_destroy_pool(thread_communication_data_t *tcdp)
14434709573Sraf {
14534709573Sraf if (tcdp->tcd_poolp != NULL)
14634709573Sraf tpool_abandon(tcdp->tcd_poolp);
14734709573Sraf tcdp->tcd_poolp = NULL;
14834709573Sraf
14934709573Sraf if (tcdp->tcd_subsystem == MQ) {
15034709573Sraf /*
15134709573Sraf * synchronize with del_sigev_mq()
15234709573Sraf */
153f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock);
15434709573Sraf tcdp->tcd_server_id = 0;
15534709573Sraf if (tcdp->tcd_msg_closing) {
15634709573Sraf (void) cond_broadcast(&tcdp->tcd_cv);
157f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
15834709573Sraf return; /* del_sigev_mq() will free the tcd */
15934709573Sraf }
160f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
16134709573Sraf }
16234709573Sraf
16334709573Sraf /*
16434709573Sraf * now delete everything
16534709573Sraf */
16634709573Sraf free_sigev_handler(tcdp);
16734709573Sraf }
16834709573Sraf
16934709573Sraf /*
17034709573Sraf * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
17134709573Sraf * functions for the daemon threads that get the event(s) for the
17234709573Sraf * respective SIGEV_THREAD subsystems. There is one timer spawner for
17334709573Sraf * each timer_create(), one mqueue spawner for every mq_open(), and
17434709573Sraf * exactly one aio spawner for all aio requests. These spawners add
17534709573Sraf * work requests to be done by a pool of daemon worker threads. In case
17634709573Sraf * the event requires creation of a worker thread with different pthread
17734709573Sraf * attributes than those from the pool of workers, a new daemon thread
17834709573Sraf * with these attributes is spawned apart from the pool of workers.
17934709573Sraf * If the spawner fails to add work or fails to create an additional
18034709573Sraf * thread because of lacking resources, it puts the event back into
18134709573Sraf * the kernel queue and re-tries some time later.
18234709573Sraf */
18334709573Sraf
18434709573Sraf void *
timer_spawner(void * arg)18534709573Sraf timer_spawner(void *arg)
18634709573Sraf {
18734709573Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
18834709573Sraf port_event_t port_event;
18934709573Sraf
19034709573Sraf /* destroy the pool if we are cancelled */
19134709573Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
19234709573Sraf
19334709573Sraf for (;;) {
19434709573Sraf if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
19534709573Sraf dprintf("port_get on port %d failed with %d <%s>\n",
19634709573Sraf tcdp->tcd_port, errno, strerror(errno));
19734709573Sraf break;
19834709573Sraf }
19934709573Sraf switch (port_event.portev_source) {
20034709573Sraf case PORT_SOURCE_TIMER:
20134709573Sraf break;
20234709573Sraf case PORT_SOURCE_ALERT:
20334709573Sraf if (port_event.portev_events != SIGEV_THREAD_TERM)
20434709573Sraf errno = EPROTO;
20534709573Sraf goto out;
20634709573Sraf default:
20734709573Sraf dprintf("port_get on port %d returned %u "
20834709573Sraf "(not PORT_SOURCE_TIMER)\n",
20934709573Sraf tcdp->tcd_port, port_event.portev_source);
21034709573Sraf errno = EPROTO;
21134709573Sraf goto out;
21234709573Sraf }
21334709573Sraf
21434709573Sraf tcdp->tcd_overruns = port_event.portev_events - 1;
21534709573Sraf if (sigev_add_work(tcdp,
21634709573Sraf tcdp->tcd_notif.sigev_notify_function,
21734709573Sraf tcdp->tcd_notif.sigev_value) != 0)
21834709573Sraf break;
21934709573Sraf /* wait until job is done before looking for another */
22034709573Sraf tpool_wait(tcdp->tcd_poolp);
22134709573Sraf }
22234709573Sraf out:
22334709573Sraf pthread_cleanup_pop(1);
22434709573Sraf return (NULL);
22534709573Sraf }
22634709573Sraf
22734709573Sraf void *
mqueue_spawner(void * arg)22834709573Sraf mqueue_spawner(void *arg)
22934709573Sraf {
23034709573Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
23134709573Sraf int ret = 0;
23234709573Sraf int ntype;
23334709573Sraf void (*function)(union sigval);
23434709573Sraf union sigval argument;
23534709573Sraf
23634709573Sraf /* destroy the pool if we are cancelled */
23734709573Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
23834709573Sraf
23934709573Sraf while (ret == 0) {
240f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock);
241f841f6adSraf pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
24234709573Sraf while ((ntype = tcdp->tcd_msg_enabled) == 0)
243f841f6adSraf (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
24434709573Sraf pthread_cleanup_pop(1);
24534709573Sraf
24634709573Sraf while (sem_wait(tcdp->tcd_msg_avail) == -1)
24734709573Sraf continue;
24834709573Sraf
249f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock);
25034709573Sraf tcdp->tcd_msg_enabled = 0;
251f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
25234709573Sraf
25334709573Sraf /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
25434709573Sraf if (ntype == SIGEV_THREAD) {
25534709573Sraf function = tcdp->tcd_notif.sigev_notify_function;
25634709573Sraf argument.sival_ptr = tcdp->tcd_msg_userval;
25734709573Sraf ret = sigev_add_work(tcdp, function, argument);
25834709573Sraf } else { /* ntype == SIGEV_PORT */
25934709573Sraf ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
26034709573Sraf 0, (uintptr_t)tcdp->tcd_msg_object,
26134709573Sraf tcdp->tcd_msg_userval);
26234709573Sraf }
26334709573Sraf }
264f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
26534709573Sraf
26634709573Sraf pthread_cleanup_pop(1);
26734709573Sraf return (NULL);
26834709573Sraf }
26934709573Sraf
27034709573Sraf void *
aio_spawner(void * arg)27134709573Sraf aio_spawner(void *arg)
27234709573Sraf {
27334709573Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
27434709573Sraf int error = 0;
27534709573Sraf void (*function)(union sigval);
27634709573Sraf union sigval argument;
27734709573Sraf port_event_t port_event;
27834709573Sraf struct sigevent *sigevp;
27934709573Sraf timespec_t delta;
28034709573Sraf pthread_attr_t *attrp;
28134709573Sraf
28234709573Sraf /* destroy the pool if we are cancelled */
28334709573Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
28434709573Sraf
28534709573Sraf while (error == 0) {
28634709573Sraf if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
28734709573Sraf error = errno;
28834709573Sraf dprintf("port_get on port %d failed with %d <%s>\n",
28934709573Sraf tcdp->tcd_port, error, strerror(error));
29034709573Sraf break;
29134709573Sraf }
29234709573Sraf switch (port_event.portev_source) {
29334709573Sraf case PORT_SOURCE_AIO:
29434709573Sraf break;
29534709573Sraf case PORT_SOURCE_ALERT:
29634709573Sraf if (port_event.portev_events != SIGEV_THREAD_TERM)
29734709573Sraf errno = EPROTO;
29834709573Sraf goto out;
29934709573Sraf default:
30034709573Sraf dprintf("port_get on port %d returned %u "
30134709573Sraf "(not PORT_SOURCE_AIO)\n",
30234709573Sraf tcdp->tcd_port, port_event.portev_source);
30334709573Sraf errno = EPROTO;
30434709573Sraf goto out;
30534709573Sraf }
30634709573Sraf argument.sival_ptr = port_event.portev_user;
30734709573Sraf switch (port_event.portev_events) {
30834709573Sraf case AIOLIO:
30934709573Sraf #if !defined(_LP64)
31034709573Sraf case AIOLIO64:
31134709573Sraf #endif
31234709573Sraf sigevp = (struct sigevent *)port_event.portev_object;
31334709573Sraf function = sigevp->sigev_notify_function;
31434709573Sraf attrp = sigevp->sigev_notify_attributes;
31534709573Sraf break;
31634709573Sraf case AIOAREAD:
31734709573Sraf case AIOAWRITE:
31834709573Sraf case AIOFSYNC:
319a574db85Sraf {
32034709573Sraf aiocb_t *aiocbp =
32134709573Sraf (aiocb_t *)port_event.portev_object;
32234709573Sraf function = aiocbp->aio_sigevent.sigev_notify_function;
32334709573Sraf attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
32434709573Sraf break;
325a574db85Sraf }
32634709573Sraf #if !defined(_LP64)
32734709573Sraf case AIOAREAD64:
32834709573Sraf case AIOAWRITE64:
32934709573Sraf case AIOFSYNC64:
330a574db85Sraf {
33134709573Sraf aiocb64_t *aiocbp =
33234709573Sraf (aiocb64_t *)port_event.portev_object;
33334709573Sraf function = aiocbp->aio_sigevent.sigev_notify_function;
33434709573Sraf attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
33534709573Sraf break;
336a574db85Sraf }
33734709573Sraf #endif
33834709573Sraf default:
33934709573Sraf function = NULL;
34034709573Sraf attrp = NULL;
34134709573Sraf break;
34234709573Sraf }
34334709573Sraf
34434709573Sraf if (function == NULL)
34534709573Sraf error = EINVAL;
346*7257d1b4Sraf else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
34734709573Sraf error = sigev_add_work(tcdp, function, argument);
34834709573Sraf else {
34934709573Sraf /*
35034709573Sraf * The attributes don't match.
35134709573Sraf * Spawn a thread with the non-matching attributes.
35234709573Sraf */
35334709573Sraf pthread_attr_t local_attr;
35434709573Sraf sigev_thread_data_t *stdp;
35534709573Sraf
356f841f6adSraf if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
35734709573Sraf error = ENOMEM;
35834709573Sraf else
359*7257d1b4Sraf error = pthread_attr_clone(&local_attr, attrp);
36034709573Sraf
36134709573Sraf if (error == 0) {
36234709573Sraf (void) pthread_attr_setdetachstate(
36334709573Sraf &local_attr, PTHREAD_CREATE_DETACHED);
364*7257d1b4Sraf (void) pthread_attr_setdaemonstate_np(
36534709573Sraf &local_attr, PTHREAD_CREATE_DAEMON_NP);
36634709573Sraf stdp->std_func = function;
36734709573Sraf stdp->std_arg = argument;
36834709573Sraf error = pthread_create(NULL, &local_attr,
36934709573Sraf notify_thread, stdp);
37034709573Sraf (void) pthread_attr_destroy(&local_attr);
37134709573Sraf }
37234709573Sraf if (error && stdp != NULL)
373f841f6adSraf lfree(stdp, sizeof (*stdp));
37434709573Sraf }
37534709573Sraf
37634709573Sraf if (error) {
37734709573Sraf dprintf("Cannot add work, error=%d <%s>.\n",
37834709573Sraf error, strerror(error));
37934709573Sraf if (error == EAGAIN || error == ENOMEM) {
38034709573Sraf /* (Temporary) no resources are available. */
38134709573Sraf if (_port_dispatch(tcdp->tcd_port, 0,
38234709573Sraf PORT_SOURCE_AIO, port_event.portev_events,
38334709573Sraf port_event.portev_object,
38434709573Sraf port_event.portev_user) != 0)
38534709573Sraf break;
38634709573Sraf error = 0;
38734709573Sraf delta.tv_sec = 0;
38834709573Sraf delta.tv_nsec = NANOSEC / 20; /* 50 msec */
38934709573Sraf (void) nanosleep(&delta, NULL);
39034709573Sraf }
39134709573Sraf }
39234709573Sraf }
39334709573Sraf out:
39434709573Sraf pthread_cleanup_pop(1);
39534709573Sraf return (NULL);
39634709573Sraf }
39734709573Sraf
39834709573Sraf /*
39934709573Sraf * Allocate a thread_communication_data_t block.
40034709573Sraf */
40134709573Sraf static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)40234709573Sraf alloc_sigev_handler(subsystem_t caller)
40334709573Sraf {
40434709573Sraf thread_communication_data_t *tcdp;
40534709573Sraf
406f841f6adSraf if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
407f841f6adSraf tcdp->tcd_subsystem = caller;
408f841f6adSraf tcdp->tcd_port = -1;
409f841f6adSraf (void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
410f841f6adSraf (void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
41134709573Sraf }
41234709573Sraf return (tcdp);
41334709573Sraf }
41434709573Sraf
41534709573Sraf /*
41634709573Sraf * Free a thread_communication_data_t block.
41734709573Sraf */
41834709573Sraf void
free_sigev_handler(thread_communication_data_t * tcdp)41934709573Sraf free_sigev_handler(thread_communication_data_t *tcdp)
42034709573Sraf {
42134709573Sraf if (tcdp->tcd_attrp) {
42234709573Sraf (void) pthread_attr_destroy(tcdp->tcd_attrp);
42334709573Sraf tcdp->tcd_attrp = NULL;
42434709573Sraf }
42534709573Sraf (void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
42634709573Sraf
42734709573Sraf switch (tcdp->tcd_subsystem) {
42834709573Sraf case TIMER:
42934709573Sraf case AIO:
43034709573Sraf if (tcdp->tcd_port >= 0)
43134709573Sraf (void) close(tcdp->tcd_port);
43234709573Sraf break;
43334709573Sraf case MQ:
43434709573Sraf tcdp->tcd_msg_avail = NULL;
43534709573Sraf tcdp->tcd_msg_object = NULL;
43634709573Sraf tcdp->tcd_msg_userval = NULL;
43734709573Sraf tcdp->tcd_msg_enabled = 0;
43834709573Sraf break;
43934709573Sraf }
44034709573Sraf
441f841f6adSraf lfree(tcdp, sizeof (*tcdp));
44234709573Sraf }
44334709573Sraf
44434709573Sraf /*
44534709573Sraf * Initialize data structure and create the port.
44634709573Sraf */
44734709573Sraf thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)44834709573Sraf setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
44934709573Sraf {
45034709573Sraf thread_communication_data_t *tcdp;
45134709573Sraf int error;
45234709573Sraf
45334709573Sraf if (sigevp == NULL) {
45434709573Sraf errno = EINVAL;
45534709573Sraf return (NULL);
45634709573Sraf }
45734709573Sraf
45834709573Sraf if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
45934709573Sraf errno = ENOMEM;
46034709573Sraf return (NULL);
46134709573Sraf }
46234709573Sraf
46334709573Sraf if (sigevp->sigev_notify_attributes == NULL)
46434709573Sraf tcdp->tcd_attrp = NULL; /* default attributes */
46534709573Sraf else {
46634709573Sraf /*
46734709573Sraf * We cannot just copy the sigevp->sigev_notify_attributes
46834709573Sraf * pointer. We need to initialize a new pthread_attr_t
46934709573Sraf * structure with the values from the user-supplied
47034709573Sraf * pthread_attr_t.
47134709573Sraf */
47234709573Sraf tcdp->tcd_attrp = &tcdp->tcd_user_attr;
473*7257d1b4Sraf error = pthread_attr_clone(tcdp->tcd_attrp,
474a574db85Sraf sigevp->sigev_notify_attributes);
47534709573Sraf if (error) {
47634709573Sraf tcdp->tcd_attrp = NULL;
47734709573Sraf free_sigev_handler(tcdp);
47834709573Sraf errno = error;
47934709573Sraf return (NULL);
48034709573Sraf }
48134709573Sraf }
48234709573Sraf tcdp->tcd_notif = *sigevp;
48334709573Sraf tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
48434709573Sraf
48534709573Sraf if (caller == TIMER || caller == AIO) {
48634709573Sraf if ((tcdp->tcd_port = port_create()) < 0 ||
48734709573Sraf fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
48834709573Sraf free_sigev_handler(tcdp);
48934709573Sraf errno = EBADF;
49034709573Sraf return (NULL);
49134709573Sraf }
49234709573Sraf }
49334709573Sraf return (tcdp);
49434709573Sraf }
49534709573Sraf
49634709573Sraf /*
49734709573Sraf * Create a thread pool and launch the spawner.
49834709573Sraf */
49934709573Sraf int
launch_spawner(thread_communication_data_t * tcdp)50034709573Sraf launch_spawner(thread_communication_data_t *tcdp)
50134709573Sraf {
50234709573Sraf int ret;
50334709573Sraf int maxworkers;
50434709573Sraf void *(*spawner)(void *);
50534709573Sraf sigset_t set;
50634709573Sraf sigset_t oset;
50734709573Sraf
50834709573Sraf switch (tcdp->tcd_subsystem) {
50934709573Sraf case TIMER:
51034709573Sraf spawner = timer_spawner;
51134709573Sraf maxworkers = 1;
51234709573Sraf break;
51334709573Sraf case MQ:
51434709573Sraf spawner = mqueue_spawner;
51534709573Sraf maxworkers = 1;
51634709573Sraf break;
51734709573Sraf case AIO:
51834709573Sraf spawner = aio_spawner;
51934709573Sraf maxworkers = 100;
52034709573Sraf break;
52134709573Sraf default:
52234709573Sraf return (-1);
52334709573Sraf }
52434709573Sraf tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
52534709573Sraf tcdp->tcd_notif.sigev_notify_attributes);
52634709573Sraf if (tcdp->tcd_poolp == NULL)
52734709573Sraf return (-1);
52834709573Sraf /* create the spawner with all signals blocked */
52934709573Sraf (void) sigfillset(&set);
53034709573Sraf (void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
53134709573Sraf ret = thr_create(NULL, 0, spawner, tcdp,
53234709573Sraf THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
53334709573Sraf (void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
53434709573Sraf if (ret != 0) {
53534709573Sraf tpool_destroy(tcdp->tcd_poolp);
53634709573Sraf tcdp->tcd_poolp = NULL;
53734709573Sraf return (-1);
53834709573Sraf }
53934709573Sraf return (0);
54034709573Sraf }
54134709573Sraf
54234709573Sraf /*
54334709573Sraf * Delete the data associated with the sigev_thread timer, if timer is
54434709573Sraf * associated with such a notification option.
54534709573Sraf * Destroy the timer_spawner thread.
54634709573Sraf */
54734709573Sraf int
del_sigev_timer(timer_t timer)54834709573Sraf del_sigev_timer(timer_t timer)
54934709573Sraf {
55034709573Sraf int rc = 0;
55134709573Sraf thread_communication_data_t *tcdp;
55234709573Sraf
55334709573Sraf if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
554f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock);
55534709573Sraf if (tcdp->tcd_port >= 0) {
55634709573Sraf if ((rc = port_alert(tcdp->tcd_port,
55734709573Sraf PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
55834709573Sraf dprintf("del_sigev_timer(%d) OK.\n", timer);
55934709573Sraf }
56034709573Sraf }
56134709573Sraf timer_tcd[timer] = NULL;
562f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
56334709573Sraf }
56434709573Sraf return (rc);
56534709573Sraf }
56634709573Sraf
56734709573Sraf int
sigev_timer_getoverrun(timer_t timer)56834709573Sraf sigev_timer_getoverrun(timer_t timer)
56934709573Sraf {
57034709573Sraf thread_communication_data_t *tcdp;
57134709573Sraf
57234709573Sraf if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
57334709573Sraf return (tcdp->tcd_overruns);
57434709573Sraf return (0);
57534709573Sraf }
57634709573Sraf
577f841f6adSraf static void
del_sigev_mq_cleanup(thread_communication_data_t * tcdp)578f841f6adSraf del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
579f841f6adSraf {
580f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
581f841f6adSraf free_sigev_handler(tcdp);
582f841f6adSraf }
583f841f6adSraf
58434709573Sraf /*
58534709573Sraf * Delete the data associated with the sigev_thread message queue,
58634709573Sraf * if the message queue is associated with such a notification option.
58734709573Sraf * Destroy the mqueue_spawner thread.
58834709573Sraf */
58934709573Sraf void
del_sigev_mq(thread_communication_data_t * tcdp)59034709573Sraf del_sigev_mq(thread_communication_data_t *tcdp)
59134709573Sraf {
59234709573Sraf pthread_t server_id;
59334709573Sraf int rc;
59434709573Sraf
595f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock);
596f841f6adSraf
59734709573Sraf server_id = tcdp->tcd_server_id;
59834709573Sraf tcdp->tcd_msg_closing = 1;
599f841f6adSraf if ((rc = pthread_cancel(server_id)) != 0) { /* "can't happen" */
600f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock);
60134709573Sraf dprintf("Fail to cancel %u with error %d <%s>.\n",
60234709573Sraf server_id, rc, strerror(rc));
603f841f6adSraf return;
604f841f6adSraf }
605f841f6adSraf
606f841f6adSraf /*
607f841f6adSraf * wait for sigev_destroy_pool() to finish
608f841f6adSraf */
609f841f6adSraf pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
610f841f6adSraf while (tcdp->tcd_server_id == server_id)
611f841f6adSraf (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
612f841f6adSraf pthread_cleanup_pop(1);
613f841f6adSraf }
614f841f6adSraf
615f841f6adSraf /*
616f841f6adSraf * POSIX aio:
617f841f6adSraf * If the notification type is SIGEV_THREAD, set up
618f841f6adSraf * the port number for notifications. Create the
619f841f6adSraf * thread pool and launch the spawner if necessary.
620f841f6adSraf * If the notification type is not SIGEV_THREAD, do nothing.
621f841f6adSraf */
622f841f6adSraf int
_aio_sigev_thread_init(struct sigevent * sigevp)623f841f6adSraf _aio_sigev_thread_init(struct sigevent *sigevp)
624f841f6adSraf {
625f841f6adSraf static mutex_t sigev_aio_lock = DEFAULTMUTEX;
626f841f6adSraf static cond_t sigev_aio_cv = DEFAULTCV;
627f841f6adSraf static int sigev_aio_busy = 0;
628f841f6adSraf
629f841f6adSraf thread_communication_data_t *tcdp;
630f841f6adSraf int port;
631a574db85Sraf int cancel_state;
632f841f6adSraf int rc = 0;
633f841f6adSraf
634f841f6adSraf if (sigevp == NULL ||
635f841f6adSraf sigevp->sigev_notify != SIGEV_THREAD ||
636f841f6adSraf sigevp->sigev_notify_function == NULL)
637f841f6adSraf return (0);
638f841f6adSraf
639f841f6adSraf lmutex_lock(&sigev_aio_lock);
640a574db85Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
641f841f6adSraf while (sigev_aio_busy)
642a574db85Sraf (void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
643a574db85Sraf (void) pthread_setcancelstate(cancel_state, NULL);
644f841f6adSraf if ((tcdp = sigev_aio_tcd) != NULL)
645f841f6adSraf port = tcdp->tcd_port;
646f841f6adSraf else {
647f841f6adSraf sigev_aio_busy = 1;
648f841f6adSraf lmutex_unlock(&sigev_aio_lock);
649f841f6adSraf
650f841f6adSraf tcdp = setup_sigev_handler(sigevp, AIO);
651f841f6adSraf if (tcdp == NULL) {
652f841f6adSraf port = -1;
653f841f6adSraf rc = -1;
654f841f6adSraf } else if (launch_spawner(tcdp) != 0) {
655f841f6adSraf free_sigev_handler(tcdp);
656f841f6adSraf tcdp = NULL;
657f841f6adSraf port = -1;
658f841f6adSraf rc = -1;
659f841f6adSraf } else {
660f841f6adSraf port = tcdp->tcd_port;
661f841f6adSraf }
662f841f6adSraf
663f841f6adSraf lmutex_lock(&sigev_aio_lock);
664f841f6adSraf sigev_aio_tcd = tcdp;
665f841f6adSraf sigev_aio_busy = 0;
666f841f6adSraf (void) cond_broadcast(&sigev_aio_cv);
667f841f6adSraf }
668f841f6adSraf lmutex_unlock(&sigev_aio_lock);
669f841f6adSraf sigevp->sigev_signo = port;
670f841f6adSraf return (rc);
671f841f6adSraf }
672f841f6adSraf
673f841f6adSraf int
_aio_sigev_thread(aiocb_t * aiocbp)674f841f6adSraf _aio_sigev_thread(aiocb_t *aiocbp)
675f841f6adSraf {
676f841f6adSraf if (aiocbp == NULL)
677f841f6adSraf return (0);
678f841f6adSraf return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
679f841f6adSraf }
680f841f6adSraf
681f841f6adSraf #if !defined(_LP64)
682f841f6adSraf int
_aio_sigev_thread64(aiocb64_t * aiocbp)683f841f6adSraf _aio_sigev_thread64(aiocb64_t *aiocbp)
684f841f6adSraf {
685f841f6adSraf if (aiocbp == NULL)
686f841f6adSraf return (0);
687f841f6adSraf return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
688f841f6adSraf }
689f841f6adSraf #endif
690f841f6adSraf
691f841f6adSraf /*
692f841f6adSraf * Cleanup POSIX aio after fork1() in the child process.
693f841f6adSraf */
694f841f6adSraf void
postfork1_child_sigev_aio(void)695f841f6adSraf postfork1_child_sigev_aio(void)
696f841f6adSraf {
697f841f6adSraf thread_communication_data_t *tcdp;
698f841f6adSraf
699f841f6adSraf if ((tcdp = sigev_aio_tcd) != NULL) {
700f841f6adSraf sigev_aio_tcd = NULL;
701f841f6adSraf tcd_teardown(tcdp);
70234709573Sraf }
703f841f6adSraf }
704f841f6adSraf
705f841f6adSraf /*
706f841f6adSraf * Utility function for the various postfork1_child_sigev_*() functions.
707f841f6adSraf * Clean up the tcdp data structure and close the port.
708f841f6adSraf */
709f841f6adSraf void
tcd_teardown(thread_communication_data_t * tcdp)710f841f6adSraf tcd_teardown(thread_communication_data_t *tcdp)
711f841f6adSraf {
712f841f6adSraf if (tcdp->tcd_poolp != NULL)
713f841f6adSraf tpool_abandon(tcdp->tcd_poolp);
714f841f6adSraf tcdp->tcd_poolp = NULL;
715f841f6adSraf tcdp->tcd_server_id = 0;
716f841f6adSraf free_sigev_handler(tcdp);
71734709573Sraf }
718