/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2008 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" #include "lint.h" #include "thr_uberdata.h" #include "asyncio.h" #include #include #include #include static int _aio_hash_insert(aio_result_t *, aio_req_t *); static aio_req_t *_aio_req_get(aio_worker_t *); static void _aio_req_add(aio_req_t *, aio_worker_t **, int); static void _aio_req_del(aio_worker_t *, aio_req_t *, int); static void _aio_work_done(aio_worker_t *); static void _aio_enq_doneq(aio_req_t *); extern void _aio_lio_free(aio_lio_t *); extern int __fdsync(int, int); extern int __fcntl(int, int, ...); extern int _port_dispatch(int, int, int, int, uintptr_t, void *); static int _aio_fsync_del(aio_worker_t *, aio_req_t *); static void _aiodone(aio_req_t *, ssize_t, int); static void _aio_cancel_work(aio_worker_t *, int, int *, int *); static void _aio_finish_request(aio_worker_t *, ssize_t, int); /* * switch for kernel async I/O */ int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ /* * Key for thread-specific data */ pthread_key_t _aio_key; /* * Array for determining whether or not a file supports kaio. * Initialized in _kaio_init(). */ uint32_t *_kaio_supported = NULL; /* * workers for read/write requests * (__aio_mutex lock protects circular linked list of workers) */ aio_worker_t *__workers_rw; /* circular list of AIO workers */ aio_worker_t *__nextworker_rw; /* next worker in list of workers */ int __rw_workerscnt; /* number of read/write workers */ /* * worker for notification requests. */ aio_worker_t *__workers_no; /* circular list of AIO workers */ aio_worker_t *__nextworker_no; /* next worker in list of workers */ int __no_workerscnt; /* number of write workers */ aio_req_t *_aio_done_tail; /* list of done requests */ aio_req_t *_aio_done_head; mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ cond_t __aio_initcv = DEFAULTCV; int __aio_initbusy = 0; mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ int _sigio_enabled = 0; /* when set, send SIGIO signal */ aio_hash_t *_aio_hash; aio_req_t *_aio_doneq; /* double linked done queue list */ int _aio_donecnt = 0; int _aio_waitncnt = 0; /* # of requests for aio_waitn */ int _aio_doneq_cnt = 0; int _aio_outstand_cnt = 0; /* # of outstanding requests */ int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ int _aio_kernel_suspend = 0; /* active kernel kaio calls */ int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ int _max_workers = 256; /* max number of workers permitted */ int _min_workers = 4; /* min number of workers */ int _minworkload = 2; /* min number of request in q */ int _aio_worker_cnt = 0; /* number of workers to do requests */ int __uaio_ok = 0; /* AIO has been enabled */ sigset_t _worker_set; /* worker's signal mask */ int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ int _aio_flags = 0; /* see asyncio.h defines for */ aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ int hz; /* clock ticks per second */ static int _kaio_supported_init(void) { void *ptr; size_t size; if (_kaio_supported != NULL) /* already initialized */ return (0); size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, (off_t)0); if (ptr == MAP_FAILED) return (-1); _kaio_supported = ptr; return (0); } /* * The aio subsystem is initialized when an AIO request is made. * Constants are initialized like the max number of workers that * the subsystem can create, and the minimum number of workers * permitted before imposing some restrictions. Also, some * workers are created. */ int __uaio_init(void) { int ret = -1; int i; int cancel_state; lmutex_lock(&__aio_initlock); (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); while (__aio_initbusy) (void) cond_wait(&__aio_initcv, &__aio_initlock); (void) pthread_setcancelstate(cancel_state, NULL); if (__uaio_ok) { /* already initialized */ lmutex_unlock(&__aio_initlock); return (0); } __aio_initbusy = 1; lmutex_unlock(&__aio_initlock); hz = (int)sysconf(_SC_CLK_TCK); __pid = getpid(); setup_cancelsig(SIGAIOCANCEL); if (_kaio_supported_init() != 0) goto out; /* * Allocate and initialize the hash table. * Do this only once, even if __uaio_init() is called twice. */ if (_aio_hash == NULL) { /* LINTED pointer cast */ _aio_hash = (aio_hash_t *)mmap(NULL, HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, (off_t)0); if ((void *)_aio_hash == MAP_FAILED) { _aio_hash = NULL; goto out; } for (i = 0; i < HASHSZ; i++) (void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL); } /* * Initialize worker's signal mask to only catch SIGAIOCANCEL. */ (void) sigfillset(&_worker_set); (void) sigdelset(&_worker_set, SIGAIOCANCEL); /* * Create one worker to send asynchronous notifications. * Do this only once, even if __uaio_init() is called twice. */ if (__no_workerscnt == 0 && (_aio_create_worker(NULL, AIONOTIFY) != 0)) { errno = EAGAIN; goto out; } /* * Create the minimum number of read/write workers. * And later check whether atleast one worker is created; * lwp_create() calls could fail because of segkp exhaustion. */ for (i = 0; i < _min_workers; i++) (void) _aio_create_worker(NULL, AIOREAD); if (__rw_workerscnt == 0) { errno = EAGAIN; goto out; } ret = 0; out: lmutex_lock(&__aio_initlock); if (ret == 0) __uaio_ok = 1; __aio_initbusy = 0; (void) cond_broadcast(&__aio_initcv); lmutex_unlock(&__aio_initlock); return (ret); } /* * Called from close() before actually performing the real _close(). */ void _aio_close(int fd) { if (fd < 0) /* avoid cancelling everything */ return; /* * Cancel all outstanding aio requests for this file descriptor. */ if (__uaio_ok) (void) aiocancel_all(fd); /* * If we have allocated the bit array, clear the bit for this file. * The next open may re-use this file descriptor and the new file * may have different kaio() behaviour. */ if (_kaio_supported != NULL) CLEAR_KAIO_SUPPORTED(fd); } /* * special kaio cleanup thread sits in a loop in the * kernel waiting for pending kaio requests to complete. */ void * _kaio_cleanup_thread(void *arg) { if (pthread_setspecific(_aio_key, arg) != 0) aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); (void) _kaio(AIOSTART); return (arg); } /* * initialize kaio. */ void _kaio_init() { int error; sigset_t oset; int cancel_state; lmutex_lock(&__aio_initlock); (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); while (__aio_initbusy) (void) cond_wait(&__aio_initcv, &__aio_initlock); (void) pthread_setcancelstate(cancel_state, NULL); if (_kaio_ok) { /* already initialized */ lmutex_unlock(&__aio_initlock); return; } __aio_initbusy = 1; lmutex_unlock(&__aio_initlock); if (_kaio_supported_init() != 0) error = ENOMEM; else if ((_kaiowp = _aio_worker_alloc()) == NULL) error = ENOMEM; else if ((error = (int)_kaio(AIOINIT)) == 0) { (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, _kaiowp, THR_DAEMON, &_kaiowp->work_tid); (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); } if (error && _kaiowp != NULL) { _aio_worker_free(_kaiowp); _kaiowp = NULL; } lmutex_lock(&__aio_initlock); if (error) _kaio_ok = -1; else _kaio_ok = 1; __aio_initbusy = 0; (void) cond_broadcast(&__aio_initcv); lmutex_unlock(&__aio_initlock); } int aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, aio_result_t *resultp) { return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); } int aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, aio_result_t *resultp) { return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); } #if !defined(_LP64) int aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, aio_result_t *resultp) { return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); } int aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, aio_result_t *resultp) { return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); } #endif /* !defined(_LP64) */ int _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, aio_result_t *resultp, int mode) { aio_req_t *reqp; aio_args_t *ap; offset_t loffset; struct stat64 stat64; int error = 0; int kerr; int umode; switch (whence) { case SEEK_SET: loffset = offset; break; case SEEK_CUR: if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) error = -1; else loffset += offset; break; case SEEK_END: if (fstat64(fd, &stat64) == -1) error = -1; else loffset = offset + stat64.st_size; break; default: errno = EINVAL; error = -1; } if (error) return (error); /* initialize kaio */ if (!_kaio_ok) _kaio_init(); /* * _aio_do_request() needs the original request code (mode) to be able * to choose the appropiate 32/64 bit function. All other functions * only require the difference between READ and WRITE (umode). */ if (mode == AIOAREAD64 || mode == AIOAWRITE64) umode = mode - AIOAREAD64; else umode = mode; /* * Try kernel aio first. * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. */ if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { resultp->aio_errno = 0; sig_mutex_lock(&__aio_mutex); _kaio_outstand_cnt++; sig_mutex_unlock(&__aio_mutex); kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? (umode | AIO_POLL_BIT) : umode), fd, buf, bufsz, loffset, resultp); if (kerr == 0) { return (0); } sig_mutex_lock(&__aio_mutex); _kaio_outstand_cnt--; sig_mutex_unlock(&__aio_mutex); if (errno != ENOTSUP && errno != EBADFD) return (-1); if (errno == EBADFD) SET_KAIO_NOT_SUPPORTED(fd); } if (!__uaio_ok && __uaio_init() == -1) return (-1); if ((reqp = _aio_req_alloc()) == NULL) { errno = EAGAIN; return (-1); } /* * _aio_do_request() checks reqp->req_op to differentiate * between 32 and 64 bit access. */ reqp->req_op = mode; reqp->req_resultp = resultp; ap = &reqp->req_args; ap->fd = fd; ap->buf = buf; ap->bufsz = bufsz; ap->offset = loffset; if (_aio_hash_insert(resultp, reqp) != 0) { _aio_req_free(reqp); errno = EINVAL; return (-1); } /* * _aio_req_add() only needs the difference between READ and * WRITE to choose the right worker queue. */ _aio_req_add(reqp, &__nextworker_rw, umode); return (0); } int aiocancel(aio_result_t *resultp) { aio_req_t *reqp; aio_worker_t *aiowp; int ret; int done = 0; int canceled = 0; if (!__uaio_ok) { errno = EINVAL; return (-1); } sig_mutex_lock(&__aio_mutex); reqp = _aio_hash_find(resultp); if (reqp == NULL) { if (_aio_outstand_cnt == _aio_req_done_cnt) errno = EINVAL; else errno = EACCES; ret = -1; } else { aiowp = reqp->req_worker; sig_mutex_lock(&aiowp->work_qlock1); (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); sig_mutex_unlock(&aiowp->work_qlock1); if (canceled) { ret = 0; } else { if (_aio_outstand_cnt == 0 || _aio_outstand_cnt == _aio_req_done_cnt) errno = EINVAL; else errno = EACCES; ret = -1; } } sig_mutex_unlock(&__aio_mutex); return (ret); } /* ARGSUSED */ static void _aiowait_cleanup(void *arg) { sig_mutex_lock(&__aio_mutex); _aiowait_flag--; sig_mutex_unlock(&__aio_mutex); } /* * This must be asynch safe and cancel safe */ aio_result_t * aiowait(struct timeval *uwait) { aio_result_t *uresultp; aio_result_t *kresultp; aio_result_t *resultp; int dontblock; int timedwait = 0; int kaio_errno = 0; struct timeval twait; struct timeval *wait = NULL; hrtime_t hrtend; hrtime_t hres; if (uwait) { /* * Check for a valid specified wait time. * If it is invalid, fail the call right away. */ if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || uwait->tv_usec >= MICROSEC) { errno = EINVAL; return ((aio_result_t *)-1); } if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { hrtend = gethrtime() + (hrtime_t)uwait->tv_sec * NANOSEC + (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); twait = *uwait; wait = &twait; timedwait++; } else { /* polling */ sig_mutex_lock(&__aio_mutex); if (_kaio_outstand_cnt == 0) { kresultp = (aio_result_t *)-1; } else { kresultp = (aio_result_t *)_kaio(AIOWAIT, (struct timeval *)-1, 1); if (kresultp != (aio_result_t *)-1 && kresultp != NULL && kresultp != (aio_result_t *)1) { _kaio_outstand_cnt--; sig_mutex_unlock(&__aio_mutex); return (kresultp); } } uresultp = _aio_req_done(); sig_mutex_unlock(&__aio_mutex); if (uresultp != NULL && uresultp != (aio_result_t *)-1) { return (uresultp); } if (uresultp == (aio_result_t *)-1 && kresultp == (aio_result_t *)-1) { errno = EINVAL; return ((aio_result_t *)-1); } else { return (NULL); } } } for (;;) { sig_mutex_lock(&__aio_mutex); uresultp = _aio_req_done(); if (uresultp != NULL && uresultp != (aio_result_t *)-1) { sig_mutex_unlock(&__aio_mutex); resultp = uresultp; break; } _aiowait_flag++; dontblock = (uresultp == (aio_result_t *)-1); if (dontblock && _kaio_outstand_cnt == 0) { kresultp = (aio_result_t *)-1; kaio_errno = EINVAL; } else { sig_mutex_unlock(&__aio_mutex); pthread_cleanup_push(_aiowait_cleanup, NULL); _cancel_prologue(); kresultp = (aio_result_t *)_kaio(AIOWAIT, wait, dontblock); _cancel_epilogue(); pthread_cleanup_pop(0); sig_mutex_lock(&__aio_mutex); kaio_errno = errno; } _aiowait_flag--; sig_mutex_unlock(&__aio_mutex); if (kresultp == (aio_result_t *)1) { /* aiowait() awakened by an aionotify() */ continue; } else if (kresultp != NULL && kresultp != (aio_result_t *)-1) { resultp = kresultp; sig_mutex_lock(&__aio_mutex); _kaio_outstand_cnt--; sig_mutex_unlock(&__aio_mutex); break; } else if (kresultp == (aio_result_t *)-1 && kaio_errno == EINVAL && uresultp == (aio_result_t *)-1) { errno = kaio_errno; resultp = (aio_result_t *)-1; break; } else if (kresultp == (aio_result_t *)-1 && kaio_errno == EINTR) { errno = kaio_errno; resultp = (aio_result_t *)-1; break; } else if (timedwait) { hres = hrtend - gethrtime(); if (hres <= 0) { /* time is up; return */ resultp = NULL; break; } else { /* * Some time left. Round up the remaining time * in nanoseconds to microsec. Retry the call. */ hres += (NANOSEC / MICROSEC) - 1; wait->tv_sec = hres / NANOSEC; wait->tv_usec = (hres % NANOSEC) / (NANOSEC / MICROSEC); } } else { ASSERT(kresultp == NULL && uresultp == NULL); resultp = NULL; continue; } } return (resultp); } /* * _aio_get_timedelta calculates the remaining time and stores the result * into timespec_t *wait. */ int _aio_get_timedelta(timespec_t *end, timespec_t *wait) { int ret = 0; struct timeval cur; timespec_t curtime; (void) gettimeofday(&cur, NULL); curtime.tv_sec = cur.tv_sec; curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ if (end->tv_sec >= curtime.tv_sec) { wait->tv_sec = end->tv_sec - curtime.tv_sec; if (end->tv_nsec >= curtime.tv_nsec) { wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; if (wait->tv_sec == 0 && wait->tv_nsec == 0) ret = -1; /* timer expired */ } else { if (end->tv_sec > curtime.tv_sec) { wait->tv_sec -= 1; wait->tv_nsec = NANOSEC - (curtime.tv_nsec - end->tv_nsec); } else { ret = -1; /* timer expired */ } } } else { ret = -1; } return (ret); } /* * If closing by file descriptor: we will simply cancel all the outstanding * aio`s and return. Those aio's in question will have either noticed the * cancellation notice before, during, or after initiating io. */ int aiocancel_all(int fd) { aio_req_t *reqp; aio_req_t **reqpp, *last; aio_worker_t *first; aio_worker_t *next; int canceled = 0; int done = 0; int cancelall = 0; sig_mutex_lock(&__aio_mutex); if (_aio_outstand_cnt == 0) { sig_mutex_unlock(&__aio_mutex); return (AIO_ALLDONE); } /* * Cancel requests from the read/write workers' queues. */ first = __nextworker_rw; next = first; do { _aio_cancel_work(next, fd, &canceled, &done); } while ((next = next->work_forw) != first); /* * finally, check if there are requests on the done queue that * should be canceled. */ if (fd < 0) cancelall = 1; reqpp = &_aio_done_tail; last = _aio_done_tail; while ((reqp = *reqpp) != NULL) { if (cancelall || reqp->req_args.fd == fd) { *reqpp = reqp->req_next; if (last == reqp) { last = reqp->req_next; } if (_aio_done_head == reqp) { /* this should be the last req in list */ _aio_done_head = last; } _aio_donecnt--; _aio_set_result(reqp, -1, ECANCELED); (void) _aio_hash_del(reqp->req_resultp); _aio_req_free(reqp); } else { reqpp = &reqp->req_next; last = reqp; } } if (cancelall) { ASSERT(_aio_donecnt == 0); _aio_done_head = NULL; } sig_mutex_unlock(&__aio_mutex); if (canceled && done == 0) return (AIO_CANCELED); else if (done && canceled == 0) return (AIO_ALLDONE); else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) return ((int)_kaio(AIOCANCEL, fd, NULL)); return (AIO_NOTCANCELED); } /* * Cancel requests from a given work queue. If the file descriptor * parameter, fd, is non-negative, then only cancel those requests * in this queue that are to this file descriptor. If the fd * parameter is -1, then cancel all requests. */ static void _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) { aio_req_t *reqp; sig_mutex_lock(&aiowp->work_qlock1); /* * cancel queued requests first. */ reqp = aiowp->work_tail1; while (reqp != NULL) { if (fd < 0 || reqp->req_args.fd == fd) { if (_aio_cancel_req(aiowp, reqp, canceled, done)) { /* * Callers locks were dropped. * reqp is invalid; start traversing * the list from the beginning again. */ reqp = aiowp->work_tail1; continue; } } reqp = reqp->req_next; } /* * Since the queued requests have been canceled, there can * only be one inprogress request that should be canceled. */ if ((reqp = aiowp->work_req) != NULL && (fd < 0 || reqp->req_args.fd == fd)) (void) _aio_cancel_req(aiowp, reqp, canceled, done); sig_mutex_unlock(&aiowp->work_qlock1); } /* * Cancel a request. Return 1 if the callers locks were temporarily * dropped, otherwise return 0. */ int _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) { int ostate = reqp->req_state; ASSERT(MUTEX_HELD(&__aio_mutex)); ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); if (ostate == AIO_REQ_CANCELED) return (0); if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) && aiowp->work_prev1 == reqp) { ASSERT(aiowp->work_done1 != 0); /* * If not on the done queue yet, just mark it CANCELED, * _aio_work_done() will do the necessary clean up. * This is required to ensure that aiocancel_all() cancels * all the outstanding requests, including this one which * is not yet on done queue but has been marked done. */ _aio_set_result(reqp, -1, ECANCELED); (void) _aio_hash_del(reqp->req_resultp); reqp->req_state = AIO_REQ_CANCELED; (*canceled)++; return (0); } if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { (*done)++; return (0); } if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { ASSERT(POSIX_AIO(reqp)); /* Cancel the queued aio_fsync() request */ if (!reqp->req_head->lio_canned) { reqp->req_head->lio_canned = 1; _aio_outstand_cnt--; (*canceled)++; } return (0); } reqp->req_state = AIO_REQ_CANCELED; _aio_req_del(aiowp, reqp, ostate); (void) _aio_hash_del(reqp->req_resultp); (*canceled)++; if (reqp == aiowp->work_req) { ASSERT(ostate == AIO_REQ_INPROGRESS); /* * Set the result values now, before _aiodone() is called. * We do this because the application can expect aio_return * and aio_errno to be set to -1 and ECANCELED, respectively, * immediately after a successful return from aiocancel() * or aio_cancel(). */ _aio_set_result(reqp, -1, ECANCELED); (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); return (0); } if (!POSIX_AIO(reqp)) { _aio_outstand_cnt--; _aio_set_result(reqp, -1, ECANCELED); _aio_req_free(reqp); return (0); } sig_mutex_unlock(&aiowp->work_qlock1); sig_mutex_unlock(&__aio_mutex); _aiodone(reqp, -1, ECANCELED); sig_mutex_lock(&__aio_mutex); sig_mutex_lock(&aiowp->work_qlock1); return (1); } int _aio_create_worker(aio_req_t *reqp, int mode) { aio_worker_t *aiowp, **workers, **nextworker; int *aio_workerscnt; void *(*func)(void *); sigset_t oset; int error; /* * Put the new worker thread in the right queue. */ switch (mode) { case AIOREAD: case AIOWRITE: case AIOAREAD: case AIOAWRITE: #if !defined(_LP64) case AIOAREAD64: case AIOAWRITE64: #endif workers = &__workers_rw; nextworker = &__nextworker_rw; aio_workerscnt = &__rw_workerscnt; func = _aio_do_request; break; case AIONOTIFY: workers = &__workers_no; nextworker = &__nextworker_no; func = _aio_do_notify; aio_workerscnt = &__no_workerscnt; break; default: aio_panic("_aio_create_worker: invalid mode"); break; } if ((aiowp = _aio_worker_alloc()) == NULL) return (-1); if (reqp) { reqp->req_state = AIO_REQ_QUEUED; reqp->req_worker = aiowp; aiowp->work_head1 = reqp; aiowp->work_tail1 = reqp; aiowp->work_next1 = reqp; aiowp->work_count1 = 1; aiowp->work_minload1 = 1; } (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); error = thr_create(NULL, AIOSTKSIZE, func, aiowp, THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); if (error) { if (reqp) { reqp->req_state = 0; reqp->req_worker = NULL; } _aio_worker_free(aiowp); return (-1); } lmutex_lock(&__aio_mutex); (*aio_workerscnt)++; if (*workers == NULL) { aiowp->work_forw = aiowp; aiowp->work_backw = aiowp; *nextworker = aiowp; *workers = aiowp; } else { aiowp->work_backw = (*workers)->work_backw; aiowp->work_forw = (*workers); (*workers)->work_backw->work_forw = aiowp; (*workers)->work_backw = aiowp; } _aio_worker_cnt++; lmutex_unlock(&__aio_mutex); (void) thr_continue(aiowp->work_tid); return (0); } /* * This is the worker's main routine. * The task of this function is to execute all queued requests; * once the last pending request is executed this function will block * in _aio_idle(). A new incoming request must wakeup this thread to * restart the work. * Every worker has an own work queue. The queue lock is required * to synchronize the addition of new requests for this worker or * cancellation of pending/running requests. * * Cancellation scenarios: * The cancellation of a request is being done asynchronously using * _aio_cancel_req() from another thread context. * A queued request can be cancelled in different manners : * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): * - lock the queue -> remove the request -> unlock the queue * - this function/thread does not detect this cancellation process * b) request is in progress (AIO_REQ_INPROGRESS) : * - this function first allow the cancellation of the running * request with the flag "work_cancel_flg=1" * see _aio_req_get() -> _aio_cancel_on() * During this phase, it is allowed to interrupt the worker * thread running the request (this thread) using the SIGAIOCANCEL * signal. * Once this thread returns from the kernel (because the request * is just done), then it must disable a possible cancellation * and proceed to finish the request. To disable the cancellation * this thread must use _aio_cancel_off() to set "work_cancel_flg=0". * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ): * same procedure as in a) * * To b) * This thread uses sigsetjmp() to define the position in the code, where * it wish to continue working in the case that a SIGAIOCANCEL signal * is detected. * Normally this thread should get the cancellation signal during the * kernel phase (reading or writing). In that case the signal handler * aiosigcancelhndlr() is activated using the worker thread context, * which again will use the siglongjmp() function to break the standard * code flow and jump to the "sigsetjmp" position, provided that * "work_cancel_flg" is set to "1". * Because the "work_cancel_flg" is only manipulated by this worker * thread and it can only run on one CPU at a given time, it is not * necessary to protect that flag with the queue lock. * Returning from the kernel (read or write system call) we must * first disable the use of the SIGAIOCANCEL signal and accordingly * the use of the siglongjmp() function to prevent a possible deadlock: * - It can happens that this worker thread returns from the kernel and * blocks in "work_qlock1", * - then a second thread cancels the apparently "in progress" request * and sends the SIGAIOCANCEL signal to the worker thread, * - the worker thread gets assigned the "work_qlock1" and will returns * from the kernel, * - the kernel detects the pending signal and activates the signal * handler instead, * - if the "work_cancel_flg" is still set then the signal handler * should use siglongjmp() to cancel the "in progress" request and * it would try to acquire the same work_qlock1 in _aio_req_get() * for a second time => deadlock. * To avoid that situation we disable the cancellation of the request * in progress BEFORE we try to acquire the work_qlock1. * In that case the signal handler will not call siglongjmp() and the * worker thread will continue running the standard code flow. * Then this thread must check the AIO_REQ_CANCELED flag to emulate * an eventually required siglongjmp() freeing the work_qlock1 and * avoiding a deadlock. */ void * _aio_do_request(void *arglist) { aio_worker_t *aiowp = (aio_worker_t *)arglist; ulwp_t *self = curthread; struct aio_args *arg; aio_req_t *reqp; /* current AIO request */ ssize_t retval; int append; int error; if (pthread_setspecific(_aio_key, aiowp) != 0) aio_panic("_aio_do_request, pthread_setspecific()"); (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); ASSERT(aiowp->work_req == NULL); /* * We resume here when an operation is cancelled. * On first entry, aiowp->work_req == NULL, so all * we do is block SIGAIOCANCEL. */ (void) sigsetjmp(aiowp->work_jmp_buf, 0); ASSERT(self->ul_sigdefer == 0); sigoff(self); /* block SIGAIOCANCEL */ if (aiowp->work_req != NULL) _aio_finish_request(aiowp, -1, ECANCELED); for (;;) { /* * Put completed requests on aio_done_list. This has * to be done as part of the main loop to ensure that * we don't artificially starve any aiowait'ers. */ if (aiowp->work_done1) _aio_work_done(aiowp); top: /* consume any deferred SIGAIOCANCEL signal here */ sigon(self); sigoff(self); while ((reqp = _aio_req_get(aiowp)) == NULL) { if (_aio_idle(aiowp) != 0) goto top; } arg = &reqp->req_args; ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || reqp->req_state == AIO_REQ_CANCELED); error = 0; switch (reqp->req_op) { case AIOREAD: case AIOAREAD: sigon(self); /* unblock SIGAIOCANCEL */ retval = pread(arg->fd, arg->buf, arg->bufsz, arg->offset); if (retval == -1) { if (errno == ESPIPE) { retval = read(arg->fd, arg->buf, arg->bufsz); if (retval == -1) error = errno; } else { error = errno; } } sigoff(self); /* block SIGAIOCANCEL */ break; case AIOWRITE: case AIOAWRITE: /* * The SUSv3 POSIX spec for aio_write() states: * If O_APPEND is set for the file descriptor, * write operations append to the file in the * same order as the calls were made. * but, somewhat inconsistently, it requires pwrite() * to ignore the O_APPEND setting. So we have to use * fcntl() to get the open modes and call write() for * the O_APPEND case. */ append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); sigon(self); /* unblock SIGAIOCANCEL */ retval = append? write(arg->fd, arg->buf, arg->bufsz) : pwrite(arg->fd, arg->buf, arg->bufsz, arg->offset); if (retval == -1) { if (errno == ESPIPE) { retval = write(arg->fd, arg->buf, arg->bufsz); if (retval == -1) error = errno; } else { error = errno; } } sigoff(self); /* block SIGAIOCANCEL */ break; #if !defined(_LP64) case AIOAREAD64: sigon(self); /* unblock SIGAIOCANCEL */ retval = pread64(arg->fd, arg->buf, arg->bufsz, arg->offset); if (retval == -1) { if (errno == ESPIPE) { retval = read(arg->fd, arg->buf, arg->bufsz); if (retval == -1) error = errno; } else { error = errno; } } sigoff(self); /* block SIGAIOCANCEL */ break; case AIOAWRITE64: /* * The SUSv3 POSIX spec for aio_write() states: * If O_APPEND is set for the file descriptor, * write operations append to the file in the * same order as the calls were made. * but, somewhat inconsistently, it requires pwrite() * to ignore the O_APPEND setting. So we have to use * fcntl() to get the open modes and call write() for * the O_APPEND case. */ append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); sigon(self); /* unblock SIGAIOCANCEL */ retval = append? write(arg->fd, arg->buf, arg->bufsz) : pwrite64(arg->fd, arg->buf, arg->bufsz, arg->offset); if (retval == -1) { if (errno == ESPIPE) { retval = write(arg->fd, arg->buf, arg->bufsz); if (retval == -1) error = errno; } else { error = errno; } } sigoff(self); /* block SIGAIOCANCEL */ break; #endif /* !defined(_LP64) */ case AIOFSYNC: if (_aio_fsync_del(aiowp, reqp)) goto top; ASSERT(reqp->req_head == NULL); /* * All writes for this fsync request are now * acknowledged. Now make these writes visible * and put the final request into the hash table. */ if (reqp->req_state == AIO_REQ_CANCELED) { /* EMPTY */; } else if (arg->offset == O_SYNC) { if ((retval = __fdsync(arg->fd, FSYNC)) == -1) error = errno; } else { if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) error = errno; } if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) aio_panic("_aio_do_request(): AIOFSYNC: " "request already in hash table"); break; default: aio_panic("_aio_do_request, bad op"); } _aio_finish_request(aiowp, retval, error); } /* NOTREACHED */ return (NULL); } /* * Perform the tail processing for _aio_do_request(). * The in-progress request may or may not have been cancelled. */ static void _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) { aio_req_t *reqp; sig_mutex_lock(&aiowp->work_qlock1); if ((reqp = aiowp->work_req) == NULL) sig_mutex_unlock(&aiowp->work_qlock1); else { aiowp->work_req = NULL; if (reqp->req_state == AIO_REQ_CANCELED) { retval = -1; error = ECANCELED; } if (!POSIX_AIO(reqp)) { int notify; if (reqp->req_state == AIO_REQ_INPROGRESS) { reqp->req_state = AIO_REQ_DONE; _aio_set_result(reqp, retval, error); } sig_mutex_unlock(&aiowp->work_qlock1); sig_mutex_lock(&__aio_mutex); /* * If it was canceled, this request will not be * added to done list. Just free it. */ if (error == ECANCELED) { _aio_outstand_cnt--; _aio_req_free(reqp); } else { _aio_req_done_cnt++; } /* * Notify any thread that may have blocked * because it saw an outstanding request. */ notify = 0; if (_aio_outstand_cnt == 0 && _aiowait_flag) { notify = 1; } sig_mutex_unlock(&__aio_mutex); if (notify) { (void) _kaio(AIONOTIFY); } } else { if (reqp->req_state == AIO_REQ_INPROGRESS) reqp->req_state = AIO_REQ_DONE; sig_mutex_unlock(&aiowp->work_qlock1); _aiodone(reqp, retval, error); } } } void _aio_req_mark_done(aio_req_t *reqp) { #if !defined(_LP64) if (reqp->req_largefile) ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; else #endif ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; } /* * Sleep for 'ticks' clock ticks to give somebody else a chance to run, * hopefully to consume one of our queued signals. */ static void _aio_delay(int ticks) { (void) usleep(ticks * (MICROSEC / hz)); } /* * Actually send the notifications. * We could block indefinitely here if the application * is not listening for the signal or port notifications. */ static void send_notification(notif_param_t *npp) { extern int __sigqueue(pid_t pid, int signo, /* const union sigval */ void *value, int si_code, int block); if (npp->np_signo) (void) __sigqueue(__pid, npp->np_signo, npp->np_user, SI_ASYNCIO, 1); else if (npp->np_port >= 0) (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, npp->np_event, npp->np_object, npp->np_user); if (npp->np_lio_signo) (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, SI_ASYNCIO, 1); else if (npp->np_lio_port >= 0) (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); } /* * Asynchronous notification worker. */ void * _aio_do_notify(void *arg) { aio_worker_t *aiowp = (aio_worker_t *)arg; aio_req_t *reqp; /* * This isn't really necessary. All signals are blocked. */ if (pthread_setspecific(_aio_key, aiowp) != 0) aio_panic("_aio_do_notify, pthread_setspecific()"); /* * Notifications are never cancelled. * All signals remain blocked, forever. */ for (;;) { while ((reqp = _aio_req_get(aiowp)) == NULL) { if (_aio_idle(aiowp) != 0) aio_panic("_aio_do_notify: _aio_idle() failed"); } send_notification(&reqp->req_notify); _aio_req_free(reqp); } /* NOTREACHED */ return (NULL); } /* * Do the completion semantics for a request that was either canceled * by _aio_cancel_req() or was completed by _aio_do_request(). */ static void _aiodone(aio_req_t *reqp, ssize_t retval, int error) { aio_result_t *resultp = reqp->req_resultp; int notify = 0; aio_lio_t *head; int sigev_none; int sigev_signal; int sigev_thread; int sigev_port; notif_param_t np; /* * We call _aiodone() only for Posix I/O. */ ASSERT(POSIX_AIO(reqp)); sigev_none = 0; sigev_signal = 0; sigev_thread = 0; sigev_port = 0; np.np_signo = 0; np.np_port = -1; np.np_lio_signo = 0; np.np_lio_port = -1; switch (reqp->req_sigevent.sigev_notify) { case SIGEV_NONE: sigev_none = 1; break; case SIGEV_SIGNAL: sigev_signal = 1; break; case SIGEV_THREAD: sigev_thread = 1; break; case SIGEV_PORT: sigev_port = 1; break; default: aio_panic("_aiodone: improper sigev_notify"); break; } /* * Figure out the notification parameters while holding __aio_mutex. * Actually perform the notifications after dropping __aio_mutex. * This allows us to sleep for a long time (if the notifications * incur delays) without impeding other async I/O operations. */ sig_mutex_lock(&__aio_mutex); if (sigev_signal) { if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) notify = 1; np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; } else if (sigev_thread | sigev_port) { if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) notify = 1; np.np_event = reqp->req_op; if (np.np_event == AIOFSYNC && reqp->req_largefile) np.np_event = AIOFSYNC64; np.np_object = (uintptr_t)reqp->req_aiocbp; np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; } if (resultp->aio_errno == EINPROGRESS) _aio_set_result(reqp, retval, error); _aio_outstand_cnt--; head = reqp->req_head; reqp->req_head = NULL; if (sigev_none) { _aio_enq_doneq(reqp); reqp = NULL; } else { (void) _aio_hash_del(resultp); _aio_req_mark_done(reqp); } _aio_waitn_wakeup(); /* * __aio_waitn() sets AIO_WAIT_INPROGRESS and * __aio_suspend() increments "_aio_kernel_suspend" * when they are waiting in the kernel for completed I/Os. * * _kaio(AIONOTIFY) awakes the corresponding function * in the kernel; then the corresponding __aio_waitn() or * __aio_suspend() function could reap the recently * completed I/Os (_aiodone()). */ if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) (void) _kaio(AIONOTIFY); sig_mutex_unlock(&__aio_mutex); if (head != NULL) { /* * If all the lio requests have completed, * prepare to notify the waiting thread. */ sig_mutex_lock(&head->lio_mutex); ASSERT(head->lio_refcnt == head->lio_nent); if (head->lio_refcnt == 1) { int waiting = 0; if (head->lio_mode == LIO_WAIT) { if ((waiting = head->lio_waiting) != 0) (void) cond_signal(&head->lio_cond_cv); } else if (head->lio_port < 0) { /* none or signal */ if ((np.np_lio_signo = head->lio_signo) != 0) notify = 1; np.np_lio_user = head->lio_sigval.sival_ptr; } else { /* thread or port */ notify = 1; np.np_lio_port = head->lio_port; np.np_lio_event = head->lio_event; np.np_lio_object = (uintptr_t)head->lio_sigevent; np.np_lio_user = head->lio_sigval.sival_ptr; } head->lio_nent = head->lio_refcnt = 0; sig_mutex_unlock(&head->lio_mutex); if (waiting == 0) _aio_lio_free(head); } else { head->lio_nent--; head->lio_refcnt--; sig_mutex_unlock(&head->lio_mutex); } } /* * The request is completed; now perform the notifications. */ if (notify) { if (reqp != NULL) { /* * We usually put the request on the notification * queue because we don't want to block and delay * other operations behind us in the work queue. * Also we must never block on a cancel notification * because we are being called from an application * thread in this case and that could lead to deadlock * if no other thread is receiving notificatins. */ reqp->req_notify = np; reqp->req_op = AIONOTIFY; _aio_req_add(reqp, &__workers_no, AIONOTIFY); reqp = NULL; } else { /* * We already put the request on the done queue, * so we can't queue it to the notification queue. * Just do the notification directly. */ send_notification(&np); } } if (reqp != NULL) _aio_req_free(reqp); } /* * Delete fsync requests from list head until there is * only one left. Return 0 when there is only one, * otherwise return a non-zero value. */ static int _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) { aio_lio_t *head = reqp->req_head; int rval = 0; ASSERT(reqp == aiowp->work_req); sig_mutex_lock(&aiowp->work_qlock1); sig_mutex_lock(&head->lio_mutex); if (head->lio_refcnt > 1) { head->lio_refcnt--; head->lio_nent--; aiowp->work_req = NULL; sig_mutex_unlock(&head->lio_mutex); sig_mutex_unlock(&aiowp->work_qlock1); sig_mutex_lock(&__aio_mutex); _aio_outstand_cnt--; _aio_waitn_wakeup(); sig_mutex_unlock(&__aio_mutex); _aio_req_free(reqp); return (1); } ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); reqp->req_head = NULL; if (head->lio_canned) reqp->req_state = AIO_REQ_CANCELED; if (head->lio_mode == LIO_DESTROY) { aiowp->work_req = NULL; rval = 1; } sig_mutex_unlock(&head->lio_mutex); sig_mutex_unlock(&aiowp->work_qlock1); head->lio_refcnt--; head->lio_nent--; _aio_lio_free(head); if (rval != 0) _aio_req_free(reqp); return (rval); } /* * A worker is set idle when its work queue is empty. * The worker checks again that it has no more work * and then goes to sleep waiting for more work. */ int _aio_idle(aio_worker_t *aiowp) { int error = 0; sig_mutex_lock(&aiowp->work_qlock1); if (aiowp->work_count1 == 0) { ASSERT(aiowp->work_minload1 == 0); aiowp->work_idleflg = 1; /* * A cancellation handler is not needed here. * aio worker threads are never cancelled via pthread_cancel(). */ error = sig_cond_wait(&aiowp->work_idle_cv, &aiowp->work_qlock1); /* * The idle flag is normally cleared before worker is awakened * by aio_req_add(). On error (EINTR), we clear it ourself. */ if (error) aiowp->work_idleflg = 0; } sig_mutex_unlock(&aiowp->work_qlock1); return (error); } /* * A worker's completed AIO requests are placed onto a global * done queue. The application is only sent a SIGIO signal if * the process has a handler enabled and it is not waiting via * aiowait(). */ static void _aio_work_done(aio_worker_t *aiowp) { aio_req_t *reqp; sig_mutex_lock(&__aio_mutex); sig_mutex_lock(&aiowp->work_qlock1); reqp = aiowp->work_prev1; reqp->req_next = NULL; aiowp->work_done1 = 0; aiowp->work_tail1 = aiowp->work_next1; if (aiowp->work_tail1 == NULL) aiowp->work_head1 = NULL; aiowp->work_prev1 = NULL; _aio_outstand_cnt--; _aio_req_done_cnt--; if (reqp->req_state == AIO_REQ_CANCELED) { /* * Request got cancelled after it was marked done. This can * happen because _aio_finish_request() marks it AIO_REQ_DONE * and drops all locks. Don't add the request to the done * queue and just discard it. */ sig_mutex_unlock(&aiowp->work_qlock1); _aio_req_free(reqp); if (_aio_outstand_cnt == 0 && _aiowait_flag) { sig_mutex_unlock(&__aio_mutex); (void) _kaio(AIONOTIFY); } else { sig_mutex_unlock(&__aio_mutex); } return; } sig_mutex_unlock(&aiowp->work_qlock1); _aio_donecnt++; ASSERT(_aio_donecnt > 0 && _aio_outstand_cnt >= 0 && _aio_req_done_cnt >= 0); ASSERT(reqp != NULL); if (_aio_done_tail == NULL) { _aio_done_head = _aio_done_tail = reqp; } else { _aio_done_head->req_next = reqp; _aio_done_head = reqp; } if (_aiowait_flag) { sig_mutex_unlock(&__aio_mutex); (void) _kaio(AIONOTIFY); } else { sig_mutex_unlock(&__aio_mutex); if (_sigio_enabled) (void) kill(__pid, SIGIO); } } /* * The done queue consists of AIO requests that are in either the * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled * are discarded. If the done queue is empty then NULL is returned. * Otherwise the address of a done aio_result_t is returned. */ aio_result_t * _aio_req_done(void) { aio_req_t *reqp; aio_result_t *resultp; ASSERT(MUTEX_HELD(&__aio_mutex)); if ((reqp = _aio_done_tail) != NULL) { if ((_aio_done_tail = reqp->req_next) == NULL) _aio_done_head = NULL; ASSERT(_aio_donecnt > 0); _aio_donecnt--; (void) _aio_hash_del(reqp->req_resultp); resultp = reqp->req_resultp; ASSERT(reqp->req_state == AIO_REQ_DONE); _aio_req_free(reqp); return (resultp); } /* is queue empty? */ if (reqp == NULL && _aio_outstand_cnt == 0) { return ((aio_result_t *)-1); } return (NULL); } /* * Set the return and errno values for the application's use. * * For the Posix interfaces, we must set the return value first followed * by the errno value because the Posix interfaces allow for a change * in the errno value from EINPROGRESS to something else to signal * the completion of the asynchronous request. * * The opposite is true for the Solaris interfaces. These allow for * a change in the return value from AIO_INPROGRESS to something else * to signal the completion of the asynchronous request. */ void _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) { aio_result_t *resultp = reqp->req_resultp; if (POSIX_AIO(reqp)) { resultp->aio_return = retval; membar_producer(); resultp->aio_errno = error; } else { resultp->aio_errno = error; membar_producer(); resultp->aio_return = retval; } } /* * Add an AIO request onto the next work queue. * A circular list of workers is used to choose the next worker. */ void _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) { ulwp_t *self = curthread; aio_worker_t *aiowp; aio_worker_t *first; int load_bal_flg = 1; int found; ASSERT(reqp->req_state != AIO_REQ_DONEQ); reqp->req_next = NULL; /* * Try to acquire the next worker's work queue. If it is locked, * then search the list of workers until a queue is found unlocked, * or until the list is completely traversed at which point another * worker will be created. */ sigoff(self); /* defer SIGIO */ sig_mutex_lock(&__aio_mutex); first = aiowp = *nextworker; if (mode != AIONOTIFY) _aio_outstand_cnt++; sig_mutex_unlock(&__aio_mutex); switch (mode) { case AIOREAD: case AIOWRITE: case AIOAREAD: case AIOAWRITE: #if !defined(_LP64) case AIOAREAD64: case AIOAWRITE64: #endif /* try to find an idle worker */ found = 0; do { if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { if (aiowp->work_idleflg) { found = 1; break; } sig_mutex_unlock(&aiowp->work_qlock1); } } while ((aiowp = aiowp->work_forw) != first); if (found) { aiowp->work_minload1++; break; } /* try to acquire some worker's queue lock */ do { if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { found = 1; break; } } while ((aiowp = aiowp->work_forw) != first); /* * Create more workers when the workers appear overloaded. * Either all the workers are busy draining their queues * or no worker's queue lock could be acquired. */ if (!found) { if (_aio_worker_cnt < _max_workers) { if (_aio_create_worker(reqp, mode)) aio_panic("_aio_req_add: add worker"); sigon(self); /* reenable SIGIO */ return; } /* * No worker available and we have created * _max_workers, keep going through the * list slowly until we get a lock */ while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { /* * give someone else a chance */ _aio_delay(1); aiowp = aiowp->work_forw; } } ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); if (_aio_worker_cnt < _max_workers && aiowp->work_minload1 >= _minworkload) { sig_mutex_unlock(&aiowp->work_qlock1); sig_mutex_lock(&__aio_mutex); *nextworker = aiowp->work_forw; sig_mutex_unlock(&__aio_mutex); if (_aio_create_worker(reqp, mode)) aio_panic("aio_req_add: add worker"); sigon(self); /* reenable SIGIO */ return; } aiowp->work_minload1++; break; case AIOFSYNC: case AIONOTIFY: load_bal_flg = 0; sig_mutex_lock(&aiowp->work_qlock1); break; default: aio_panic("_aio_req_add: invalid mode"); break; } /* * Put request onto worker's work queue. */ if (aiowp->work_tail1 == NULL) { ASSERT(aiowp->work_count1 == 0); aiowp->work_tail1 = reqp; aiowp->work_next1 = reqp; } else { aiowp->work_head1->req_next = reqp; if (aiowp->work_next1 == NULL) aiowp->work_next1 = reqp; } reqp->req_state = AIO_REQ_QUEUED; reqp->req_worker = aiowp; aiowp->work_head1 = reqp; /* * Awaken worker if it is not currently active. */ if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { aiowp->work_idleflg = 0; (void) cond_signal(&aiowp->work_idle_cv); } sig_mutex_unlock(&aiowp->work_qlock1); if (load_bal_flg) { sig_mutex_lock(&__aio_mutex); *nextworker = aiowp->work_forw; sig_mutex_unlock(&__aio_mutex); } sigon(self); /* reenable SIGIO */ } /* * Get an AIO request for a specified worker. * If the work queue is empty, return NULL. */ aio_req_t * _aio_req_get(aio_worker_t *aiowp) { aio_req_t *reqp; sig_mutex_lock(&aiowp->work_qlock1); if ((reqp = aiowp->work_next1) != NULL) { /* * Remove a POSIX request from the queue; the * request queue is a singularly linked list * with a previous pointer. The request is * removed by updating the previous pointer. * * Non-posix requests are left on the queue * to eventually be placed on the done queue. */ if (POSIX_AIO(reqp)) { if (aiowp->work_prev1 == NULL) { aiowp->work_tail1 = reqp->req_next; if (aiowp->work_tail1 == NULL) aiowp->work_head1 = NULL; } else { aiowp->work_prev1->req_next = reqp->req_next; if (aiowp->work_head1 == reqp) aiowp->work_head1 = reqp->req_next; } } else { aiowp->work_prev1 = reqp; ASSERT(aiowp->work_done1 >= 0); aiowp->work_done1++; } ASSERT(reqp != reqp->req_next); aiowp->work_next1 = reqp->req_next; ASSERT(aiowp->work_count1 >= 1); aiowp->work_count1--; switch (reqp->req_op) { case AIOREAD: case AIOWRITE: case AIOAREAD: case AIOAWRITE: #if !defined(_LP64) case AIOAREAD64: case AIOAWRITE64: #endif ASSERT(aiowp->work_minload1 > 0); aiowp->work_minload1--; break; } reqp->req_state = AIO_REQ_INPROGRESS; } aiowp->work_req = reqp; ASSERT(reqp != NULL || aiowp->work_count1 == 0); sig_mutex_unlock(&aiowp->work_qlock1); return (reqp); } static void _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) { aio_req_t **last; aio_req_t *lastrp; aio_req_t *next; ASSERT(aiowp != NULL); ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); if (POSIX_AIO(reqp)) { if (ostate != AIO_REQ_QUEUED) return; } last = &aiowp->work_tail1; lastrp = aiowp->work_tail1; ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); while ((next = *last) != NULL) { if (next == reqp) { *last = next->req_next; if (aiowp->work_next1 == next) aiowp->work_next1 = next->req_next; /* * if this is the first request on the queue, move * the lastrp pointer forward. */ if (lastrp == next) lastrp = next->req_next; /* * if this request is pointed by work_head1, then * make work_head1 point to the last request that is * present on the queue. */ if (aiowp->work_head1 == next) aiowp->work_head1 = lastrp; /* * work_prev1 is used only in non posix case and it * points to the current AIO_REQ_INPROGRESS request. * If work_prev1 points to this request which is being * deleted, make work_prev1 NULL and set work_done1 * to 0. * * A worker thread can be processing only one request * at a time. */ if (aiowp->work_prev1 == next) { ASSERT(ostate == AIO_REQ_INPROGRESS && !POSIX_AIO(reqp) && aiowp->work_done1 > 0); aiowp->work_prev1 = NULL; aiowp->work_done1--; } if (ostate == AIO_REQ_QUEUED) { ASSERT(aiowp->work_count1 >= 1); aiowp->work_count1--; ASSERT(aiowp->work_minload1 >= 1); aiowp->work_minload1--; } return; } last = &next->req_next; lastrp = next; } /* NOTREACHED */ } static void _aio_enq_doneq(aio_req_t *reqp) { if (_aio_doneq == NULL) { _aio_doneq = reqp; reqp->req_next = reqp->req_prev = reqp; } else { reqp->req_next = _aio_doneq; reqp->req_prev = _aio_doneq->req_prev; _aio_doneq->req_prev->req_next = reqp; _aio_doneq->req_prev = reqp; } reqp->req_state = AIO_REQ_DONEQ; _aio_doneq_cnt++; } /* * caller owns the _aio_mutex */ aio_req_t * _aio_req_remove(aio_req_t *reqp) { if (reqp && reqp->req_state != AIO_REQ_DONEQ) return (NULL); if (reqp) { /* request in done queue */ if (_aio_doneq == reqp) _aio_doneq = reqp->req_next; if (_aio_doneq == reqp) { /* only one request on queue */ _aio_doneq = NULL; } else { aio_req_t *tmp = reqp->req_next; reqp->req_prev->req_next = tmp; tmp->req_prev = reqp->req_prev; } } else if ((reqp = _aio_doneq) != NULL) { if (reqp == reqp->req_next) { /* only one request on queue */ _aio_doneq = NULL; } else { reqp->req_prev->req_next = _aio_doneq = reqp->req_next; _aio_doneq->req_prev = reqp->req_prev; } } if (reqp) { _aio_doneq_cnt--; reqp->req_next = reqp->req_prev = reqp; reqp->req_state = AIO_REQ_DONE; } return (reqp); } /* * An AIO request is identified by an aio_result_t pointer. The library * maps this aio_result_t pointer to its internal representation using a * hash table. This function adds an aio_result_t pointer to the hash table. */ static int _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) { aio_hash_t *hashp; aio_req_t **prev; aio_req_t *next; hashp = _aio_hash + AIOHASH(resultp); lmutex_lock(&hashp->hash_lock); prev = &hashp->hash_ptr; while ((next = *prev) != NULL) { if (resultp == next->req_resultp) { lmutex_unlock(&hashp->hash_lock); return (-1); } prev = &next->req_link; } *prev = reqp; ASSERT(reqp->req_link == NULL); lmutex_unlock(&hashp->hash_lock); return (0); } /* * Remove an entry from the hash table. */ aio_req_t * _aio_hash_del(aio_result_t *resultp) { aio_hash_t *hashp; aio_req_t **prev; aio_req_t *next = NULL; if (_aio_hash != NULL) { hashp = _aio_hash + AIOHASH(resultp); lmutex_lock(&hashp->hash_lock); prev = &hashp->hash_ptr; while ((next = *prev) != NULL) { if (resultp == next->req_resultp) { *prev = next->req_link; next->req_link = NULL; break; } prev = &next->req_link; } lmutex_unlock(&hashp->hash_lock); } return (next); } /* * find an entry in the hash table */ aio_req_t * _aio_hash_find(aio_result_t *resultp) { aio_hash_t *hashp; aio_req_t **prev; aio_req_t *next = NULL; if (_aio_hash != NULL) { hashp = _aio_hash + AIOHASH(resultp); lmutex_lock(&hashp->hash_lock); prev = &hashp->hash_ptr; while ((next = *prev) != NULL) { if (resultp == next->req_resultp) break; prev = &next->req_link; } lmutex_unlock(&hashp->hash_lock); } return (next); } /* * AIO interface for POSIX */ int _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, int mode, int flg) { aio_req_t *reqp; aio_args_t *ap; int kerr; if (aiocbp == NULL) { errno = EINVAL; return (-1); } /* initialize kaio */ if (!_kaio_ok) _kaio_init(); aiocbp->aio_state = NOCHECK; /* * If we have been called because a list I/O * kaio() failed, we dont want to repeat the * system call */ if (flg & AIO_KAIO) { /* * Try kernel aio first. * If errno is ENOTSUP/EBADFD, * fall back to the thread implementation. */ if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { aiocbp->aio_resultp.aio_errno = EINPROGRESS; aiocbp->aio_state = CHECK; kerr = (int)_kaio(mode, aiocbp); if (kerr == 0) return (0); if (errno != ENOTSUP && errno != EBADFD) { aiocbp->aio_resultp.aio_errno = errno; aiocbp->aio_resultp.aio_return = -1; aiocbp->aio_state = NOCHECK; return (-1); } if (errno == EBADFD) SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); } } aiocbp->aio_resultp.aio_errno = EINPROGRESS; aiocbp->aio_state = USERAIO; if (!__uaio_ok && __uaio_init() == -1) return (-1); if ((reqp = _aio_req_alloc()) == NULL) { errno = EAGAIN; return (-1); } /* * If an LIO request, add the list head to the aio request */ reqp->req_head = lio_head; reqp->req_type = AIO_POSIX_REQ; reqp->req_op = mode; reqp->req_largefile = 0; if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { reqp->req_sigevent.sigev_notify = SIGEV_NONE; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; reqp->req_sigevent.sigev_signo = aiocbp->aio_sigevent.sigev_signo; reqp->req_sigevent.sigev_value.sival_ptr = aiocbp->aio_sigevent.sigev_value.sival_ptr; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; reqp->req_sigevent.sigev_notify = SIGEV_PORT; /* * Reuse the sigevent structure to contain the port number * and the user value. Same for SIGEV_THREAD, below. */ reqp->req_sigevent.sigev_signo = pn->portnfy_port; reqp->req_sigevent.sigev_value.sival_ptr = pn->portnfy_user; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { reqp->req_sigevent.sigev_notify = SIGEV_THREAD; /* * The sigevent structure contains the port number * and the user value. Same for SIGEV_PORT, above. */ reqp->req_sigevent.sigev_signo = aiocbp->aio_sigevent.sigev_signo; reqp->req_sigevent.sigev_value.sival_ptr = aiocbp->aio_sigevent.sigev_value.sival_ptr; } reqp->req_resultp = &aiocbp->aio_resultp; reqp->req_aiocbp = aiocbp; ap = &reqp->req_args; ap->fd = aiocbp->aio_fildes; ap->buf = (caddr_t)aiocbp->aio_buf; ap->bufsz = aiocbp->aio_nbytes; ap->offset = aiocbp->aio_offset; if ((flg & AIO_NO_DUPS) && _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { aio_panic("_aio_rw(): request already in hash table"); _aio_req_free(reqp); errno = EINVAL; return (-1); } _aio_req_add(reqp, nextworker, mode); return (0); } #if !defined(_LP64) /* * 64-bit AIO interface for POSIX */ int _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, int mode, int flg) { aio_req_t *reqp; aio_args_t *ap; int kerr; if (aiocbp == NULL) { errno = EINVAL; return (-1); } /* initialize kaio */ if (!_kaio_ok) _kaio_init(); aiocbp->aio_state = NOCHECK; /* * If we have been called because a list I/O * kaio() failed, we dont want to repeat the * system call */ if (flg & AIO_KAIO) { /* * Try kernel aio first. * If errno is ENOTSUP/EBADFD, * fall back to the thread implementation. */ if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { aiocbp->aio_resultp.aio_errno = EINPROGRESS; aiocbp->aio_state = CHECK; kerr = (int)_kaio(mode, aiocbp); if (kerr == 0) return (0); if (errno != ENOTSUP && errno != EBADFD) { aiocbp->aio_resultp.aio_errno = errno; aiocbp->aio_resultp.aio_return = -1; aiocbp->aio_state = NOCHECK; return (-1); } if (errno == EBADFD) SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); } } aiocbp->aio_resultp.aio_errno = EINPROGRESS; aiocbp->aio_state = USERAIO; if (!__uaio_ok && __uaio_init() == -1) return (-1); if ((reqp = _aio_req_alloc()) == NULL) { errno = EAGAIN; return (-1); } /* * If an LIO request, add the list head to the aio request */ reqp->req_head = lio_head; reqp->req_type = AIO_POSIX_REQ; reqp->req_op = mode; reqp->req_largefile = 1; if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { reqp->req_sigevent.sigev_notify = SIGEV_NONE; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; reqp->req_sigevent.sigev_signo = aiocbp->aio_sigevent.sigev_signo; reqp->req_sigevent.sigev_value.sival_ptr = aiocbp->aio_sigevent.sigev_value.sival_ptr; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; reqp->req_sigevent.sigev_notify = SIGEV_PORT; reqp->req_sigevent.sigev_signo = pn->portnfy_port; reqp->req_sigevent.sigev_value.sival_ptr = pn->portnfy_user; } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { reqp->req_sigevent.sigev_notify = SIGEV_THREAD; reqp->req_sigevent.sigev_signo = aiocbp->aio_sigevent.sigev_signo; reqp->req_sigevent.sigev_value.sival_ptr = aiocbp->aio_sigevent.sigev_value.sival_ptr; } reqp->req_resultp = &aiocbp->aio_resultp; reqp->req_aiocbp = aiocbp; ap = &reqp->req_args; ap->fd = aiocbp->aio_fildes; ap->buf = (caddr_t)aiocbp->aio_buf; ap->bufsz = aiocbp->aio_nbytes; ap->offset = aiocbp->aio_offset; if ((flg & AIO_NO_DUPS) && _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { aio_panic("_aio_rw64(): request already in hash table"); _aio_req_free(reqp); errno = EINVAL; return (-1); } _aio_req_add(reqp, nextworker, mode); return (0); } #endif /* !defined(_LP64) */