xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision 34b3058f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "synonyms.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43 
44 extern void _aio_lio_free(aio_lio_t *);
45 
46 extern int __fdsync(int, int);
47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
48 
49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
50 static void _aiodone(aio_req_t *, ssize_t, int);
51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
52 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53 
54 /*
55  * switch for kernel async I/O
56  */
57 int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
58 
59 /*
60  * Key for thread-specific data
61  */
62 pthread_key_t _aio_key;
63 
64 /*
65  * Array for determining whether or not a file supports kaio.
66  * Initialized in _kaio_init().
67  */
68 uint32_t *_kaio_supported = NULL;
69 
70 /*
71  *  workers for read/write requests
72  * (__aio_mutex lock protects circular linked list of workers)
73  */
74 aio_worker_t *__workers_rw;	/* circular list of AIO workers */
75 aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
76 int __rw_workerscnt;		/* number of read/write workers */
77 
78 /*
79  * worker for notification requests.
80  */
81 aio_worker_t *__workers_no;	/* circular list of AIO workers */
82 aio_worker_t *__nextworker_no;	/* next worker in list of workers */
83 int __no_workerscnt;		/* number of write workers */
84 
85 aio_req_t *_aio_done_tail;		/* list of done requests */
86 aio_req_t *_aio_done_head;
87 
88 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
89 cond_t __aio_initcv = DEFAULTCV;
90 int __aio_initbusy = 0;
91 
92 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
93 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
94 
95 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
96 int _sigio_enabled = 0;			/* when set, send SIGIO signal */
97 
98 aio_hash_t *_aio_hash;
99 
100 aio_req_t *_aio_doneq;			/* double linked done queue list */
101 
102 int _aio_donecnt = 0;
103 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
104 int _aio_doneq_cnt = 0;
105 int _aio_outstand_cnt = 0;		/* # of outstanding requests */
106 int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
107 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
108 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
109 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
110 
111 int _max_workers = 256;			/* max number of workers permitted */
112 int _min_workers = 4;			/* min number of workers */
113 int _minworkload = 2;			/* min number of request in q */
114 int _aio_worker_cnt = 0;		/* number of workers to do requests */
115 int __uaio_ok = 0;			/* AIO has been enabled */
116 sigset_t _worker_set;			/* worker's signal mask */
117 
118 int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
119 int _aio_flags = 0;			/* see asyncio.h defines for */
120 
121 aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
122 
123 int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when an AIO request is made.
145  * Constants are initialized like the max number of workers that
146  * the subsystem can create, and the minimum number of workers
147  * permitted before imposing some restrictions.  Also, some
148  * workers are created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 
156 	lmutex_lock(&__aio_initlock);
157 	while (__aio_initbusy)
158 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
159 	if (__uaio_ok) {	/* already initialized */
160 		lmutex_unlock(&__aio_initlock);
161 		return (0);
162 	}
163 	__aio_initbusy = 1;
164 	lmutex_unlock(&__aio_initlock);
165 
166 	hz = (int)sysconf(_SC_CLK_TCK);
167 	__pid = getpid();
168 
169 	setup_cancelsig(SIGAIOCANCEL);
170 
171 	if (_kaio_supported_init() != 0)
172 		goto out;
173 
174 	/*
175 	 * Allocate and initialize the hash table.
176 	 * Do this only once, even if __uaio_init() is called twice.
177 	 */
178 	if (_aio_hash == NULL) {
179 		/* LINTED pointer cast */
180 		_aio_hash = (aio_hash_t *)mmap(NULL,
181 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
182 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
183 		if ((void *)_aio_hash == MAP_FAILED) {
184 			_aio_hash = NULL;
185 			goto out;
186 		}
187 		for (i = 0; i < HASHSZ; i++)
188 			(void) mutex_init(&_aio_hash[i].hash_lock,
189 			    USYNC_THREAD, NULL);
190 	}
191 
192 	/*
193 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
194 	 */
195 	(void) sigfillset(&_worker_set);
196 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
197 
198 	/*
199 	 * Create one worker to send asynchronous notifications.
200 	 * Do this only once, even if __uaio_init() is called twice.
201 	 */
202 	if (__no_workerscnt == 0 &&
203 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
204 		errno = EAGAIN;
205 		goto out;
206 	}
207 
208 	/*
209 	 * Create the minimum number of read/write workers.
210 	 * And later check whether atleast one worker is created;
211 	 * lwp_create() calls could fail because of segkp exhaustion.
212 	 */
213 	for (i = 0; i < _min_workers; i++)
214 		(void) _aio_create_worker(NULL, AIOREAD);
215 	if (__rw_workerscnt == 0) {
216 		errno = EAGAIN;
217 		goto out;
218 	}
219 
220 	ret = 0;
221 out:
222 	lmutex_lock(&__aio_initlock);
223 	if (ret == 0)
224 		__uaio_ok = 1;
225 	__aio_initbusy = 0;
226 	(void) cond_broadcast(&__aio_initcv);
227 	lmutex_unlock(&__aio_initlock);
228 	return (ret);
229 }
230 
231 /*
232  * Called from close() before actually performing the real _close().
233  */
234 void
235 _aio_close(int fd)
236 {
237 	if (fd < 0)	/* avoid cancelling everything */
238 		return;
239 	/*
240 	 * Cancel all outstanding aio requests for this file descriptor.
241 	 */
242 	if (__uaio_ok)
243 		(void) aiocancel_all(fd);
244 	/*
245 	 * If we have allocated the bit array, clear the bit for this file.
246 	 * The next open may re-use this file descriptor and the new file
247 	 * may have different kaio() behaviour.
248 	 */
249 	if (_kaio_supported != NULL)
250 		CLEAR_KAIO_SUPPORTED(fd);
251 }
252 
253 /*
254  * special kaio cleanup thread sits in a loop in the
255  * kernel waiting for pending kaio requests to complete.
256  */
257 void *
258 _kaio_cleanup_thread(void *arg)
259 {
260 	if (pthread_setspecific(_aio_key, arg) != 0)
261 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
262 	(void) _kaio(AIOSTART);
263 	return (arg);
264 }
265 
266 /*
267  * initialize kaio.
268  */
269 void
270 _kaio_init()
271 {
272 	int error;
273 	sigset_t oset;
274 
275 	lmutex_lock(&__aio_initlock);
276 	while (__aio_initbusy)
277 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
278 	if (_kaio_ok) {		/* already initialized */
279 		lmutex_unlock(&__aio_initlock);
280 		return;
281 	}
282 	__aio_initbusy = 1;
283 	lmutex_unlock(&__aio_initlock);
284 
285 	if (_kaio_supported_init() != 0)
286 		error = ENOMEM;
287 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
288 		error = ENOMEM;
289 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
290 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
291 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
292 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
293 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
294 	}
295 	if (error && _kaiowp != NULL) {
296 		_aio_worker_free(_kaiowp);
297 		_kaiowp = NULL;
298 	}
299 
300 	lmutex_lock(&__aio_initlock);
301 	if (error)
302 		_kaio_ok = -1;
303 	else
304 		_kaio_ok = 1;
305 	__aio_initbusy = 0;
306 	(void) cond_broadcast(&__aio_initcv);
307 	lmutex_unlock(&__aio_initlock);
308 }
309 
310 int
311 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
312     aio_result_t *resultp)
313 {
314 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
315 }
316 
317 int
318 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
319     aio_result_t *resultp)
320 {
321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
322 }
323 
324 #if !defined(_LP64)
325 int
326 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
327     aio_result_t *resultp)
328 {
329 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
330 }
331 
332 int
333 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
334     aio_result_t *resultp)
335 {
336 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
337 }
338 #endif	/* !defined(_LP64) */
339 
340 int
341 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
342     aio_result_t *resultp, int mode)
343 {
344 	aio_req_t *reqp;
345 	aio_args_t *ap;
346 	offset_t loffset;
347 	struct stat stat;
348 	int error = 0;
349 	int kerr;
350 	int umode;
351 
352 	switch (whence) {
353 
354 	case SEEK_SET:
355 		loffset = offset;
356 		break;
357 	case SEEK_CUR:
358 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
359 			error = -1;
360 		else
361 			loffset += offset;
362 		break;
363 	case SEEK_END:
364 		if (fstat(fd, &stat) == -1)
365 			error = -1;
366 		else
367 			loffset = offset + stat.st_size;
368 		break;
369 	default:
370 		errno = EINVAL;
371 		error = -1;
372 	}
373 
374 	if (error)
375 		return (error);
376 
377 	/* initialize kaio */
378 	if (!_kaio_ok)
379 		_kaio_init();
380 
381 	/*
382 	 * _aio_do_request() needs the original request code (mode) to be able
383 	 * to choose the appropiate 32/64 bit function.  All other functions
384 	 * only require the difference between READ and WRITE (umode).
385 	 */
386 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
387 		umode = mode - AIOAREAD64;
388 	else
389 		umode = mode;
390 
391 	/*
392 	 * Try kernel aio first.
393 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
394 	 */
395 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
396 		resultp->aio_errno = 0;
397 		sig_mutex_lock(&__aio_mutex);
398 		_kaio_outstand_cnt++;
399 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
400 		    (umode | AIO_POLL_BIT) : umode),
401 		    fd, buf, bufsz, loffset, resultp);
402 		if (kerr == 0) {
403 			sig_mutex_unlock(&__aio_mutex);
404 			return (0);
405 		}
406 		_kaio_outstand_cnt--;
407 		sig_mutex_unlock(&__aio_mutex);
408 		if (errno != ENOTSUP && errno != EBADFD)
409 			return (-1);
410 		if (errno == EBADFD)
411 			SET_KAIO_NOT_SUPPORTED(fd);
412 	}
413 
414 	if (!__uaio_ok && __uaio_init() == -1)
415 		return (-1);
416 
417 	if ((reqp = _aio_req_alloc()) == NULL) {
418 		errno = EAGAIN;
419 		return (-1);
420 	}
421 
422 	/*
423 	 * _aio_do_request() checks reqp->req_op to differentiate
424 	 * between 32 and 64 bit access.
425 	 */
426 	reqp->req_op = mode;
427 	reqp->req_resultp = resultp;
428 	ap = &reqp->req_args;
429 	ap->fd = fd;
430 	ap->buf = buf;
431 	ap->bufsz = bufsz;
432 	ap->offset = loffset;
433 
434 	if (_aio_hash_insert(resultp, reqp) != 0) {
435 		_aio_req_free(reqp);
436 		errno = EINVAL;
437 		return (-1);
438 	}
439 	/*
440 	 * _aio_req_add() only needs the difference between READ and
441 	 * WRITE to choose the right worker queue.
442 	 */
443 	_aio_req_add(reqp, &__nextworker_rw, umode);
444 	return (0);
445 }
446 
447 int
448 aiocancel(aio_result_t *resultp)
449 {
450 	aio_req_t *reqp;
451 	aio_worker_t *aiowp;
452 	int ret;
453 	int done = 0;
454 	int canceled = 0;
455 
456 	if (!__uaio_ok) {
457 		errno = EINVAL;
458 		return (-1);
459 	}
460 
461 	sig_mutex_lock(&__aio_mutex);
462 	reqp = _aio_hash_find(resultp);
463 	if (reqp == NULL) {
464 		if (_aio_outstand_cnt == _aio_req_done_cnt)
465 			errno = EINVAL;
466 		else
467 			errno = EACCES;
468 		ret = -1;
469 	} else {
470 		aiowp = reqp->req_worker;
471 		sig_mutex_lock(&aiowp->work_qlock1);
472 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
473 		sig_mutex_unlock(&aiowp->work_qlock1);
474 
475 		if (canceled) {
476 			ret = 0;
477 		} else {
478 			if (_aio_outstand_cnt == 0 ||
479 			    _aio_outstand_cnt == _aio_req_done_cnt)
480 				errno = EINVAL;
481 			else
482 				errno = EACCES;
483 			ret = -1;
484 		}
485 	}
486 	sig_mutex_unlock(&__aio_mutex);
487 	return (ret);
488 }
489 
490 /*
491  * This must be asynch safe
492  */
493 aio_result_t *
494 aiowait(struct timeval *uwait)
495 {
496 	aio_result_t *uresultp;
497 	aio_result_t *kresultp;
498 	aio_result_t *resultp;
499 	int dontblock;
500 	int timedwait = 0;
501 	int kaio_errno = 0;
502 	struct timeval twait;
503 	struct timeval *wait = NULL;
504 	hrtime_t hrtend;
505 	hrtime_t hres;
506 
507 	if (uwait) {
508 		/*
509 		 * Check for a valid specified wait time.
510 		 * If it is invalid, fail the call right away.
511 		 */
512 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
513 		    uwait->tv_usec >= MICROSEC) {
514 			errno = EINVAL;
515 			return ((aio_result_t *)-1);
516 		}
517 
518 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
519 			hrtend = gethrtime() +
520 			    (hrtime_t)uwait->tv_sec * NANOSEC +
521 			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
522 			twait = *uwait;
523 			wait = &twait;
524 			timedwait++;
525 		} else {
526 			/* polling */
527 			sig_mutex_lock(&__aio_mutex);
528 			if (_kaio_outstand_cnt == 0) {
529 				kresultp = (aio_result_t *)-1;
530 			} else {
531 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
532 				    (struct timeval *)-1, 1);
533 				if (kresultp != (aio_result_t *)-1 &&
534 				    kresultp != NULL &&
535 				    kresultp != (aio_result_t *)1) {
536 					_kaio_outstand_cnt--;
537 					sig_mutex_unlock(&__aio_mutex);
538 					return (kresultp);
539 				}
540 			}
541 			uresultp = _aio_req_done();
542 			sig_mutex_unlock(&__aio_mutex);
543 			if (uresultp != NULL &&
544 			    uresultp != (aio_result_t *)-1) {
545 				return (uresultp);
546 			}
547 			if (uresultp == (aio_result_t *)-1 &&
548 			    kresultp == (aio_result_t *)-1) {
549 				errno = EINVAL;
550 				return ((aio_result_t *)-1);
551 			} else {
552 				return (NULL);
553 			}
554 		}
555 	}
556 
557 	for (;;) {
558 		sig_mutex_lock(&__aio_mutex);
559 		uresultp = _aio_req_done();
560 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
561 			sig_mutex_unlock(&__aio_mutex);
562 			resultp = uresultp;
563 			break;
564 		}
565 		_aiowait_flag++;
566 		dontblock = (uresultp == (aio_result_t *)-1);
567 		if (dontblock && _kaio_outstand_cnt == 0) {
568 			kresultp = (aio_result_t *)-1;
569 			kaio_errno = EINVAL;
570 		} else {
571 			sig_mutex_unlock(&__aio_mutex);
572 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
573 			    wait, dontblock);
574 			sig_mutex_lock(&__aio_mutex);
575 			kaio_errno = errno;
576 		}
577 		_aiowait_flag--;
578 		sig_mutex_unlock(&__aio_mutex);
579 		if (kresultp == (aio_result_t *)1) {
580 			/* aiowait() awakened by an aionotify() */
581 			continue;
582 		} else if (kresultp != NULL &&
583 		    kresultp != (aio_result_t *)-1) {
584 			resultp = kresultp;
585 			sig_mutex_lock(&__aio_mutex);
586 			_kaio_outstand_cnt--;
587 			sig_mutex_unlock(&__aio_mutex);
588 			break;
589 		} else if (kresultp == (aio_result_t *)-1 &&
590 		    kaio_errno == EINVAL &&
591 		    uresultp == (aio_result_t *)-1) {
592 			errno = kaio_errno;
593 			resultp = (aio_result_t *)-1;
594 			break;
595 		} else if (kresultp == (aio_result_t *)-1 &&
596 		    kaio_errno == EINTR) {
597 			errno = kaio_errno;
598 			resultp = (aio_result_t *)-1;
599 			break;
600 		} else if (timedwait) {
601 			hres = hrtend - gethrtime();
602 			if (hres <= 0) {
603 				/* time is up; return */
604 				resultp = NULL;
605 				break;
606 			} else {
607 				/*
608 				 * Some time left.  Round up the remaining time
609 				 * in nanoseconds to microsec.  Retry the call.
610 				 */
611 				hres += (NANOSEC / MICROSEC) - 1;
612 				wait->tv_sec = hres / NANOSEC;
613 				wait->tv_usec =
614 				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
615 			}
616 		} else {
617 			ASSERT(kresultp == NULL && uresultp == NULL);
618 			resultp = NULL;
619 			continue;
620 		}
621 	}
622 	return (resultp);
623 }
624 
625 /*
626  * _aio_get_timedelta calculates the remaining time and stores the result
627  * into timespec_t *wait.
628  */
629 
630 int
631 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
632 {
633 	int	ret = 0;
634 	struct	timeval cur;
635 	timespec_t curtime;
636 
637 	(void) gettimeofday(&cur, NULL);
638 	curtime.tv_sec = cur.tv_sec;
639 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
640 
641 	if (end->tv_sec >= curtime.tv_sec) {
642 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
643 		if (end->tv_nsec >= curtime.tv_nsec) {
644 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
645 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
646 				ret = -1;	/* timer expired */
647 		} else {
648 			if (end->tv_sec > curtime.tv_sec) {
649 				wait->tv_sec -= 1;
650 				wait->tv_nsec = NANOSEC -
651 				    (curtime.tv_nsec - end->tv_nsec);
652 			} else {
653 				ret = -1;	/* timer expired */
654 			}
655 		}
656 	} else {
657 		ret = -1;
658 	}
659 	return (ret);
660 }
661 
662 /*
663  * If closing by file descriptor: we will simply cancel all the outstanding
664  * aio`s and return.  Those aio's in question will have either noticed the
665  * cancellation notice before, during, or after initiating io.
666  */
667 int
668 aiocancel_all(int fd)
669 {
670 	aio_req_t *reqp;
671 	aio_req_t **reqpp;
672 	aio_worker_t *first;
673 	aio_worker_t *next;
674 	int canceled = 0;
675 	int done = 0;
676 	int cancelall = 0;
677 
678 	sig_mutex_lock(&__aio_mutex);
679 
680 	if (_aio_outstand_cnt == 0) {
681 		sig_mutex_unlock(&__aio_mutex);
682 		return (AIO_ALLDONE);
683 	}
684 
685 	/*
686 	 * Cancel requests from the read/write workers' queues.
687 	 */
688 	first = __nextworker_rw;
689 	next = first;
690 	do {
691 		_aio_cancel_work(next, fd, &canceled, &done);
692 	} while ((next = next->work_forw) != first);
693 
694 	/*
695 	 * finally, check if there are requests on the done queue that
696 	 * should be canceled.
697 	 */
698 	if (fd < 0)
699 		cancelall = 1;
700 	reqpp = &_aio_done_tail;
701 	while ((reqp = *reqpp) != NULL) {
702 		if (cancelall || reqp->req_args.fd == fd) {
703 			*reqpp = reqp->req_next;
704 			_aio_donecnt--;
705 			(void) _aio_hash_del(reqp->req_resultp);
706 			_aio_req_free(reqp);
707 		} else
708 			reqpp = &reqp->req_next;
709 	}
710 	if (cancelall) {
711 		ASSERT(_aio_donecnt == 0);
712 		_aio_done_head = NULL;
713 	}
714 	sig_mutex_unlock(&__aio_mutex);
715 
716 	if (canceled && done == 0)
717 		return (AIO_CANCELED);
718 	else if (done && canceled == 0)
719 		return (AIO_ALLDONE);
720 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
721 		return ((int)_kaio(AIOCANCEL, fd, NULL));
722 	return (AIO_NOTCANCELED);
723 }
724 
725 /*
726  * Cancel requests from a given work queue.  If the file descriptor
727  * parameter, fd, is non-negative, then only cancel those requests
728  * in this queue that are to this file descriptor.  If the fd
729  * parameter is -1, then cancel all requests.
730  */
731 static void
732 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
733 {
734 	aio_req_t *reqp;
735 
736 	sig_mutex_lock(&aiowp->work_qlock1);
737 	/*
738 	 * cancel queued requests first.
739 	 */
740 	reqp = aiowp->work_tail1;
741 	while (reqp != NULL) {
742 		if (fd < 0 || reqp->req_args.fd == fd) {
743 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
744 				/*
745 				 * Callers locks were dropped.
746 				 * reqp is invalid; start traversing
747 				 * the list from the beginning again.
748 				 */
749 				reqp = aiowp->work_tail1;
750 				continue;
751 			}
752 		}
753 		reqp = reqp->req_next;
754 	}
755 	/*
756 	 * Since the queued requests have been canceled, there can
757 	 * only be one inprogress request that should be canceled.
758 	 */
759 	if ((reqp = aiowp->work_req) != NULL &&
760 	    (fd < 0 || reqp->req_args.fd == fd))
761 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
762 	sig_mutex_unlock(&aiowp->work_qlock1);
763 }
764 
765 /*
766  * Cancel a request.  Return 1 if the callers locks were temporarily
767  * dropped, otherwise return 0.
768  */
769 int
770 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
771 {
772 	int ostate = reqp->req_state;
773 
774 	ASSERT(MUTEX_HELD(&__aio_mutex));
775 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
776 	if (ostate == AIO_REQ_CANCELED)
777 		return (0);
778 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
779 		(*done)++;
780 		return (0);
781 	}
782 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
783 		ASSERT(POSIX_AIO(reqp));
784 		/* Cancel the queued aio_fsync() request */
785 		if (!reqp->req_head->lio_canned) {
786 			reqp->req_head->lio_canned = 1;
787 			_aio_outstand_cnt--;
788 			(*canceled)++;
789 		}
790 		return (0);
791 	}
792 	reqp->req_state = AIO_REQ_CANCELED;
793 	_aio_req_del(aiowp, reqp, ostate);
794 	(void) _aio_hash_del(reqp->req_resultp);
795 	(*canceled)++;
796 	if (reqp == aiowp->work_req) {
797 		ASSERT(ostate == AIO_REQ_INPROGRESS);
798 		/*
799 		 * Set the result values now, before _aiodone() is called.
800 		 * We do this because the application can expect aio_return
801 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
802 		 * immediately after a successful return from aiocancel()
803 		 * or aio_cancel().
804 		 */
805 		_aio_set_result(reqp, -1, ECANCELED);
806 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
807 		return (0);
808 	}
809 	if (!POSIX_AIO(reqp)) {
810 		_aio_outstand_cnt--;
811 		_aio_set_result(reqp, -1, ECANCELED);
812 		return (0);
813 	}
814 	sig_mutex_unlock(&aiowp->work_qlock1);
815 	sig_mutex_unlock(&__aio_mutex);
816 	_aiodone(reqp, -1, ECANCELED);
817 	sig_mutex_lock(&__aio_mutex);
818 	sig_mutex_lock(&aiowp->work_qlock1);
819 	return (1);
820 }
821 
822 int
823 _aio_create_worker(aio_req_t *reqp, int mode)
824 {
825 	aio_worker_t *aiowp, **workers, **nextworker;
826 	int *aio_workerscnt;
827 	void *(*func)(void *);
828 	sigset_t oset;
829 	int error;
830 
831 	/*
832 	 * Put the new worker thread in the right queue.
833 	 */
834 	switch (mode) {
835 	case AIOREAD:
836 	case AIOWRITE:
837 	case AIOAREAD:
838 	case AIOAWRITE:
839 #if !defined(_LP64)
840 	case AIOAREAD64:
841 	case AIOAWRITE64:
842 #endif
843 		workers = &__workers_rw;
844 		nextworker = &__nextworker_rw;
845 		aio_workerscnt = &__rw_workerscnt;
846 		func = _aio_do_request;
847 		break;
848 	case AIONOTIFY:
849 		workers = &__workers_no;
850 		nextworker = &__nextworker_no;
851 		func = _aio_do_notify;
852 		aio_workerscnt = &__no_workerscnt;
853 		break;
854 	default:
855 		aio_panic("_aio_create_worker: invalid mode");
856 		break;
857 	}
858 
859 	if ((aiowp = _aio_worker_alloc()) == NULL)
860 		return (-1);
861 
862 	if (reqp) {
863 		reqp->req_state = AIO_REQ_QUEUED;
864 		reqp->req_worker = aiowp;
865 		aiowp->work_head1 = reqp;
866 		aiowp->work_tail1 = reqp;
867 		aiowp->work_next1 = reqp;
868 		aiowp->work_count1 = 1;
869 		aiowp->work_minload1 = 1;
870 	}
871 
872 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
873 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
874 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
875 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
876 	if (error) {
877 		if (reqp) {
878 			reqp->req_state = 0;
879 			reqp->req_worker = NULL;
880 		}
881 		_aio_worker_free(aiowp);
882 		return (-1);
883 	}
884 
885 	lmutex_lock(&__aio_mutex);
886 	(*aio_workerscnt)++;
887 	if (*workers == NULL) {
888 		aiowp->work_forw = aiowp;
889 		aiowp->work_backw = aiowp;
890 		*nextworker = aiowp;
891 		*workers = aiowp;
892 	} else {
893 		aiowp->work_backw = (*workers)->work_backw;
894 		aiowp->work_forw = (*workers);
895 		(*workers)->work_backw->work_forw = aiowp;
896 		(*workers)->work_backw = aiowp;
897 	}
898 	_aio_worker_cnt++;
899 	lmutex_unlock(&__aio_mutex);
900 
901 	(void) thr_continue(aiowp->work_tid);
902 
903 	return (0);
904 }
905 
906 /*
907  * This is the worker's main routine.
908  * The task of this function is to execute all queued requests;
909  * once the last pending request is executed this function will block
910  * in _aio_idle().  A new incoming request must wakeup this thread to
911  * restart the work.
912  * Every worker has an own work queue.  The queue lock is required
913  * to synchronize the addition of new requests for this worker or
914  * cancellation of pending/running requests.
915  *
916  * Cancellation scenarios:
917  * The cancellation of a request is being done asynchronously using
918  * _aio_cancel_req() from another thread context.
919  * A queued request can be cancelled in different manners :
920  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
921  *	- lock the queue -> remove the request -> unlock the queue
922  *	- this function/thread does not detect this cancellation process
923  * b) request is in progress (AIO_REQ_INPROGRESS) :
924  *	- this function first allow the cancellation of the running
925  *	  request with the flag "work_cancel_flg=1"
926  * 		see _aio_req_get() -> _aio_cancel_on()
927  *	  During this phase, it is allowed to interrupt the worker
928  *	  thread running the request (this thread) using the SIGAIOCANCEL
929  *	  signal.
930  *	  Once this thread returns from the kernel (because the request
931  *	  is just done), then it must disable a possible cancellation
932  *	  and proceed to finish the request.  To disable the cancellation
933  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
934  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
935  *	  same procedure as in a)
936  *
937  * To b)
938  *	This thread uses sigsetjmp() to define the position in the code, where
939  *	it wish to continue working in the case that a SIGAIOCANCEL signal
940  *	is detected.
941  *	Normally this thread should get the cancellation signal during the
942  *	kernel phase (reading or writing).  In that case the signal handler
943  *	aiosigcancelhndlr() is activated using the worker thread context,
944  *	which again will use the siglongjmp() function to break the standard
945  *	code flow and jump to the "sigsetjmp" position, provided that
946  *	"work_cancel_flg" is set to "1".
947  *	Because the "work_cancel_flg" is only manipulated by this worker
948  *	thread and it can only run on one CPU at a given time, it is not
949  *	necessary to protect that flag with the queue lock.
950  *	Returning from the kernel (read or write system call) we must
951  *	first disable the use of the SIGAIOCANCEL signal and accordingly
952  *	the use of the siglongjmp() function to prevent a possible deadlock:
953  *	- It can happens that this worker thread returns from the kernel and
954  *	  blocks in "work_qlock1",
955  *	- then a second thread cancels the apparently "in progress" request
956  *	  and sends the SIGAIOCANCEL signal to the worker thread,
957  *	- the worker thread gets assigned the "work_qlock1" and will returns
958  *	  from the kernel,
959  *	- the kernel detects the pending signal and activates the signal
960  *	  handler instead,
961  *	- if the "work_cancel_flg" is still set then the signal handler
962  *	  should use siglongjmp() to cancel the "in progress" request and
963  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
964  *	  for a second time => deadlock.
965  *	To avoid that situation we disable the cancellation of the request
966  *	in progress BEFORE we try to acquire the work_qlock1.
967  *	In that case the signal handler will not call siglongjmp() and the
968  *	worker thread will continue running the standard code flow.
969  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
970  *	an eventually required siglongjmp() freeing the work_qlock1 and
971  *	avoiding a deadlock.
972  */
973 void *
974 _aio_do_request(void *arglist)
975 {
976 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
977 	ulwp_t *self = curthread;
978 	struct aio_args *arg;
979 	aio_req_t *reqp;		/* current AIO request */
980 	ssize_t retval;
981 	int error;
982 
983 	if (pthread_setspecific(_aio_key, aiowp) != 0)
984 		aio_panic("_aio_do_request, pthread_setspecific()");
985 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
986 	ASSERT(aiowp->work_req == NULL);
987 
988 	/*
989 	 * We resume here when an operation is cancelled.
990 	 * On first entry, aiowp->work_req == NULL, so all
991 	 * we do is block SIGAIOCANCEL.
992 	 */
993 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
994 	ASSERT(self->ul_sigdefer == 0);
995 
996 	sigoff(self);	/* block SIGAIOCANCEL */
997 	if (aiowp->work_req != NULL)
998 		_aio_finish_request(aiowp, -1, ECANCELED);
999 
1000 	for (;;) {
1001 		/*
1002 		 * Put completed requests on aio_done_list.  This has
1003 		 * to be done as part of the main loop to ensure that
1004 		 * we don't artificially starve any aiowait'ers.
1005 		 */
1006 		if (aiowp->work_done1)
1007 			_aio_work_done(aiowp);
1008 
1009 top:
1010 		/* consume any deferred SIGAIOCANCEL signal here */
1011 		sigon(self);
1012 		sigoff(self);
1013 
1014 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1015 			if (_aio_idle(aiowp) != 0)
1016 				goto top;
1017 		}
1018 		arg = &reqp->req_args;
1019 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1020 		    reqp->req_state == AIO_REQ_CANCELED);
1021 		error = 0;
1022 
1023 		switch (reqp->req_op) {
1024 		case AIOREAD:
1025 		case AIOAREAD:
1026 			sigon(self);	/* unblock SIGAIOCANCEL */
1027 			retval = pread(arg->fd, arg->buf,
1028 			    arg->bufsz, arg->offset);
1029 			if (retval == -1) {
1030 				if (errno == ESPIPE) {
1031 					retval = read(arg->fd,
1032 					    arg->buf, arg->bufsz);
1033 					if (retval == -1)
1034 						error = errno;
1035 				} else {
1036 					error = errno;
1037 				}
1038 			}
1039 			sigoff(self);	/* block SIGAIOCANCEL */
1040 			break;
1041 		case AIOWRITE:
1042 		case AIOAWRITE:
1043 			sigon(self);	/* unblock SIGAIOCANCEL */
1044 			retval = pwrite(arg->fd, arg->buf,
1045 			    arg->bufsz, arg->offset);
1046 			if (retval == -1) {
1047 				if (errno == ESPIPE) {
1048 					retval = write(arg->fd,
1049 					    arg->buf, arg->bufsz);
1050 					if (retval == -1)
1051 						error = errno;
1052 				} else {
1053 					error = errno;
1054 				}
1055 			}
1056 			sigoff(self);	/* block SIGAIOCANCEL */
1057 			break;
1058 #if !defined(_LP64)
1059 		case AIOAREAD64:
1060 			sigon(self);	/* unblock SIGAIOCANCEL */
1061 			retval = pread64(arg->fd, arg->buf,
1062 			    arg->bufsz, arg->offset);
1063 			if (retval == -1) {
1064 				if (errno == ESPIPE) {
1065 					retval = read(arg->fd,
1066 					    arg->buf, arg->bufsz);
1067 					if (retval == -1)
1068 						error = errno;
1069 				} else {
1070 					error = errno;
1071 				}
1072 			}
1073 			sigoff(self);	/* block SIGAIOCANCEL */
1074 			break;
1075 		case AIOAWRITE64:
1076 			sigon(self);	/* unblock SIGAIOCANCEL */
1077 			retval = pwrite64(arg->fd, arg->buf,
1078 			    arg->bufsz, arg->offset);
1079 			if (retval == -1) {
1080 				if (errno == ESPIPE) {
1081 					retval = write(arg->fd,
1082 					    arg->buf, arg->bufsz);
1083 					if (retval == -1)
1084 						error = errno;
1085 				} else {
1086 					error = errno;
1087 				}
1088 			}
1089 			sigoff(self);	/* block SIGAIOCANCEL */
1090 			break;
1091 #endif	/* !defined(_LP64) */
1092 		case AIOFSYNC:
1093 			if (_aio_fsync_del(aiowp, reqp))
1094 				goto top;
1095 			ASSERT(reqp->req_head == NULL);
1096 			/*
1097 			 * All writes for this fsync request are now
1098 			 * acknowledged.  Now make these writes visible
1099 			 * and put the final request into the hash table.
1100 			 */
1101 			if (reqp->req_state == AIO_REQ_CANCELED) {
1102 				/* EMPTY */;
1103 			} else if (arg->offset == O_SYNC) {
1104 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1105 					error = errno;
1106 			} else {
1107 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1108 					error = errno;
1109 			}
1110 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1111 				aio_panic("_aio_do_request(): AIOFSYNC: "
1112 				    "request already in hash table");
1113 			break;
1114 		default:
1115 			aio_panic("_aio_do_request, bad op");
1116 		}
1117 
1118 		_aio_finish_request(aiowp, retval, error);
1119 	}
1120 	/* NOTREACHED */
1121 	return (NULL);
1122 }
1123 
1124 /*
1125  * Perform the tail processing for _aio_do_request().
1126  * The in-progress request may or may not have been cancelled.
1127  */
1128 static void
1129 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1130 {
1131 	aio_req_t *reqp;
1132 
1133 	sig_mutex_lock(&aiowp->work_qlock1);
1134 	if ((reqp = aiowp->work_req) == NULL)
1135 		sig_mutex_unlock(&aiowp->work_qlock1);
1136 	else {
1137 		aiowp->work_req = NULL;
1138 		if (reqp->req_state == AIO_REQ_CANCELED) {
1139 			retval = -1;
1140 			error = ECANCELED;
1141 		}
1142 		if (!POSIX_AIO(reqp)) {
1143 			int notify;
1144 			sig_mutex_unlock(&aiowp->work_qlock1);
1145 			sig_mutex_lock(&__aio_mutex);
1146 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1147 				reqp->req_state = AIO_REQ_DONE;
1148 			/*
1149 			 * If it was canceled, this request will not be
1150 			 * added to done list. Just free it.
1151 			 */
1152 			if (error == ECANCELED) {
1153 				_aio_outstand_cnt--;
1154 				_aio_req_free(reqp);
1155 			} else {
1156 				_aio_set_result(reqp, retval, error);
1157 				_aio_req_done_cnt++;
1158 			}
1159 			/*
1160 			 * Notify any thread that may have blocked
1161 			 * because it saw an outstanding request.
1162 			 */
1163 			notify = 0;
1164 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1165 				notify = 1;
1166 			}
1167 			sig_mutex_unlock(&__aio_mutex);
1168 			if (notify) {
1169 				(void) _kaio(AIONOTIFY);
1170 			}
1171 		} else {
1172 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1173 				reqp->req_state = AIO_REQ_DONE;
1174 			sig_mutex_unlock(&aiowp->work_qlock1);
1175 			_aiodone(reqp, retval, error);
1176 		}
1177 	}
1178 }
1179 
1180 void
1181 _aio_req_mark_done(aio_req_t *reqp)
1182 {
1183 #if !defined(_LP64)
1184 	if (reqp->req_largefile)
1185 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1186 	else
1187 #endif
1188 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1189 }
1190 
1191 /*
1192  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1193  * hopefully to consume one of our queued signals.
1194  */
1195 static void
1196 _aio_delay(int ticks)
1197 {
1198 	(void) usleep(ticks * (MICROSEC / hz));
1199 }
1200 
1201 /*
1202  * Actually send the notifications.
1203  * We could block indefinitely here if the application
1204  * is not listening for the signal or port notifications.
1205  */
1206 static void
1207 send_notification(notif_param_t *npp)
1208 {
1209 	extern int __sigqueue(pid_t pid, int signo,
1210 	    /* const union sigval */ void *value, int si_code, int block);
1211 
1212 	if (npp->np_signo)
1213 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1214 		    SI_ASYNCIO, 1);
1215 	else if (npp->np_port >= 0)
1216 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1217 		    npp->np_event, npp->np_object, npp->np_user);
1218 
1219 	if (npp->np_lio_signo)
1220 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1221 		    SI_ASYNCIO, 1);
1222 	else if (npp->np_lio_port >= 0)
1223 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1224 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1225 }
1226 
1227 /*
1228  * Asynchronous notification worker.
1229  */
1230 void *
1231 _aio_do_notify(void *arg)
1232 {
1233 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1234 	aio_req_t *reqp;
1235 
1236 	/*
1237 	 * This isn't really necessary.  All signals are blocked.
1238 	 */
1239 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1240 		aio_panic("_aio_do_notify, pthread_setspecific()");
1241 
1242 	/*
1243 	 * Notifications are never cancelled.
1244 	 * All signals remain blocked, forever.
1245 	 */
1246 	for (;;) {
1247 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1248 			if (_aio_idle(aiowp) != 0)
1249 				aio_panic("_aio_do_notify: _aio_idle() failed");
1250 		}
1251 		send_notification(&reqp->req_notify);
1252 		_aio_req_free(reqp);
1253 	}
1254 
1255 	/* NOTREACHED */
1256 	return (NULL);
1257 }
1258 
1259 /*
1260  * Do the completion semantics for a request that was either canceled
1261  * by _aio_cancel_req() or was completed by _aio_do_request().
1262  */
1263 static void
1264 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1265 {
1266 	aio_result_t *resultp = reqp->req_resultp;
1267 	int notify = 0;
1268 	aio_lio_t *head;
1269 	int sigev_none;
1270 	int sigev_signal;
1271 	int sigev_thread;
1272 	int sigev_port;
1273 	notif_param_t np;
1274 
1275 	/*
1276 	 * We call _aiodone() only for Posix I/O.
1277 	 */
1278 	ASSERT(POSIX_AIO(reqp));
1279 
1280 	sigev_none = 0;
1281 	sigev_signal = 0;
1282 	sigev_thread = 0;
1283 	sigev_port = 0;
1284 	np.np_signo = 0;
1285 	np.np_port = -1;
1286 	np.np_lio_signo = 0;
1287 	np.np_lio_port = -1;
1288 
1289 	switch (reqp->req_sigevent.sigev_notify) {
1290 	case SIGEV_NONE:
1291 		sigev_none = 1;
1292 		break;
1293 	case SIGEV_SIGNAL:
1294 		sigev_signal = 1;
1295 		break;
1296 	case SIGEV_THREAD:
1297 		sigev_thread = 1;
1298 		break;
1299 	case SIGEV_PORT:
1300 		sigev_port = 1;
1301 		break;
1302 	default:
1303 		aio_panic("_aiodone: improper sigev_notify");
1304 		break;
1305 	}
1306 
1307 	/*
1308 	 * Figure out the notification parameters while holding __aio_mutex.
1309 	 * Actually perform the notifications after dropping __aio_mutex.
1310 	 * This allows us to sleep for a long time (if the notifications
1311 	 * incur delays) without impeding other async I/O operations.
1312 	 */
1313 
1314 	sig_mutex_lock(&__aio_mutex);
1315 
1316 	if (sigev_signal) {
1317 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1318 			notify = 1;
1319 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1320 	} else if (sigev_thread | sigev_port) {
1321 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1322 			notify = 1;
1323 		np.np_event = reqp->req_op;
1324 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1325 			np.np_event = AIOFSYNC64;
1326 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1327 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1328 	}
1329 
1330 	if (resultp->aio_errno == EINPROGRESS)
1331 		_aio_set_result(reqp, retval, error);
1332 
1333 	_aio_outstand_cnt--;
1334 
1335 	head = reqp->req_head;
1336 	reqp->req_head = NULL;
1337 
1338 	if (sigev_none) {
1339 		_aio_enq_doneq(reqp);
1340 		reqp = NULL;
1341 	} else {
1342 		(void) _aio_hash_del(resultp);
1343 		_aio_req_mark_done(reqp);
1344 	}
1345 
1346 	_aio_waitn_wakeup();
1347 
1348 	/*
1349 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1350 	 * __aio_suspend() increments "_aio_kernel_suspend"
1351 	 * when they are waiting in the kernel for completed I/Os.
1352 	 *
1353 	 * _kaio(AIONOTIFY) awakes the corresponding function
1354 	 * in the kernel; then the corresponding __aio_waitn() or
1355 	 * __aio_suspend() function could reap the recently
1356 	 * completed I/Os (_aiodone()).
1357 	 */
1358 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1359 		(void) _kaio(AIONOTIFY);
1360 
1361 	sig_mutex_unlock(&__aio_mutex);
1362 
1363 	if (head != NULL) {
1364 		/*
1365 		 * If all the lio requests have completed,
1366 		 * prepare to notify the waiting thread.
1367 		 */
1368 		sig_mutex_lock(&head->lio_mutex);
1369 		ASSERT(head->lio_refcnt == head->lio_nent);
1370 		if (head->lio_refcnt == 1) {
1371 			int waiting = 0;
1372 			if (head->lio_mode == LIO_WAIT) {
1373 				if ((waiting = head->lio_waiting) != 0)
1374 					(void) cond_signal(&head->lio_cond_cv);
1375 			} else if (head->lio_port < 0) { /* none or signal */
1376 				if ((np.np_lio_signo = head->lio_signo) != 0)
1377 					notify = 1;
1378 				np.np_lio_user = head->lio_sigval.sival_ptr;
1379 			} else {			/* thread or port */
1380 				notify = 1;
1381 				np.np_lio_port = head->lio_port;
1382 				np.np_lio_event = head->lio_event;
1383 				np.np_lio_object =
1384 				    (uintptr_t)head->lio_sigevent;
1385 				np.np_lio_user = head->lio_sigval.sival_ptr;
1386 			}
1387 			head->lio_nent = head->lio_refcnt = 0;
1388 			sig_mutex_unlock(&head->lio_mutex);
1389 			if (waiting == 0)
1390 				_aio_lio_free(head);
1391 		} else {
1392 			head->lio_nent--;
1393 			head->lio_refcnt--;
1394 			sig_mutex_unlock(&head->lio_mutex);
1395 		}
1396 	}
1397 
1398 	/*
1399 	 * The request is completed; now perform the notifications.
1400 	 */
1401 	if (notify) {
1402 		if (reqp != NULL) {
1403 			/*
1404 			 * We usually put the request on the notification
1405 			 * queue because we don't want to block and delay
1406 			 * other operations behind us in the work queue.
1407 			 * Also we must never block on a cancel notification
1408 			 * because we are being called from an application
1409 			 * thread in this case and that could lead to deadlock
1410 			 * if no other thread is receiving notificatins.
1411 			 */
1412 			reqp->req_notify = np;
1413 			reqp->req_op = AIONOTIFY;
1414 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1415 			reqp = NULL;
1416 		} else {
1417 			/*
1418 			 * We already put the request on the done queue,
1419 			 * so we can't queue it to the notification queue.
1420 			 * Just do the notification directly.
1421 			 */
1422 			send_notification(&np);
1423 		}
1424 	}
1425 
1426 	if (reqp != NULL)
1427 		_aio_req_free(reqp);
1428 }
1429 
1430 /*
1431  * Delete fsync requests from list head until there is
1432  * only one left.  Return 0 when there is only one,
1433  * otherwise return a non-zero value.
1434  */
1435 static int
1436 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1437 {
1438 	aio_lio_t *head = reqp->req_head;
1439 	int rval = 0;
1440 
1441 	ASSERT(reqp == aiowp->work_req);
1442 	sig_mutex_lock(&aiowp->work_qlock1);
1443 	sig_mutex_lock(&head->lio_mutex);
1444 	if (head->lio_refcnt > 1) {
1445 		head->lio_refcnt--;
1446 		head->lio_nent--;
1447 		aiowp->work_req = NULL;
1448 		sig_mutex_unlock(&head->lio_mutex);
1449 		sig_mutex_unlock(&aiowp->work_qlock1);
1450 		sig_mutex_lock(&__aio_mutex);
1451 		_aio_outstand_cnt--;
1452 		_aio_waitn_wakeup();
1453 		sig_mutex_unlock(&__aio_mutex);
1454 		_aio_req_free(reqp);
1455 		return (1);
1456 	}
1457 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1458 	reqp->req_head = NULL;
1459 	if (head->lio_canned)
1460 		reqp->req_state = AIO_REQ_CANCELED;
1461 	if (head->lio_mode == LIO_DESTROY) {
1462 		aiowp->work_req = NULL;
1463 		rval = 1;
1464 	}
1465 	sig_mutex_unlock(&head->lio_mutex);
1466 	sig_mutex_unlock(&aiowp->work_qlock1);
1467 	head->lio_refcnt--;
1468 	head->lio_nent--;
1469 	_aio_lio_free(head);
1470 	if (rval != 0)
1471 		_aio_req_free(reqp);
1472 	return (rval);
1473 }
1474 
1475 /*
1476  * A worker is set idle when its work queue is empty.
1477  * The worker checks again that it has no more work
1478  * and then goes to sleep waiting for more work.
1479  */
1480 int
1481 _aio_idle(aio_worker_t *aiowp)
1482 {
1483 	int error = 0;
1484 
1485 	sig_mutex_lock(&aiowp->work_qlock1);
1486 	if (aiowp->work_count1 == 0) {
1487 		ASSERT(aiowp->work_minload1 == 0);
1488 		aiowp->work_idleflg = 1;
1489 		/*
1490 		 * A cancellation handler is not needed here.
1491 		 * aio worker threads are never cancelled via pthread_cancel().
1492 		 */
1493 		error = sig_cond_wait(&aiowp->work_idle_cv,
1494 		    &aiowp->work_qlock1);
1495 		/*
1496 		 * The idle flag is normally cleared before worker is awakened
1497 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1498 		 */
1499 		if (error)
1500 			aiowp->work_idleflg = 0;
1501 	}
1502 	sig_mutex_unlock(&aiowp->work_qlock1);
1503 	return (error);
1504 }
1505 
1506 /*
1507  * A worker's completed AIO requests are placed onto a global
1508  * done queue.  The application is only sent a SIGIO signal if
1509  * the process has a handler enabled and it is not waiting via
1510  * aiowait().
1511  */
1512 static void
1513 _aio_work_done(aio_worker_t *aiowp)
1514 {
1515 	aio_req_t *reqp;
1516 
1517 	sig_mutex_lock(&aiowp->work_qlock1);
1518 	reqp = aiowp->work_prev1;
1519 	reqp->req_next = NULL;
1520 	aiowp->work_done1 = 0;
1521 	aiowp->work_tail1 = aiowp->work_next1;
1522 	if (aiowp->work_tail1 == NULL)
1523 		aiowp->work_head1 = NULL;
1524 	aiowp->work_prev1 = NULL;
1525 	sig_mutex_unlock(&aiowp->work_qlock1);
1526 	sig_mutex_lock(&__aio_mutex);
1527 	_aio_donecnt++;
1528 	_aio_outstand_cnt--;
1529 	_aio_req_done_cnt--;
1530 	ASSERT(_aio_donecnt > 0 &&
1531 	    _aio_outstand_cnt >= 0 &&
1532 	    _aio_req_done_cnt >= 0);
1533 	ASSERT(reqp != NULL);
1534 
1535 	if (_aio_done_tail == NULL) {
1536 		_aio_done_head = _aio_done_tail = reqp;
1537 	} else {
1538 		_aio_done_head->req_next = reqp;
1539 		_aio_done_head = reqp;
1540 	}
1541 
1542 	if (_aiowait_flag) {
1543 		sig_mutex_unlock(&__aio_mutex);
1544 		(void) _kaio(AIONOTIFY);
1545 	} else {
1546 		sig_mutex_unlock(&__aio_mutex);
1547 		if (_sigio_enabled)
1548 			(void) kill(__pid, SIGIO);
1549 	}
1550 }
1551 
1552 /*
1553  * The done queue consists of AIO requests that are in either the
1554  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1555  * are discarded.  If the done queue is empty then NULL is returned.
1556  * Otherwise the address of a done aio_result_t is returned.
1557  */
1558 aio_result_t *
1559 _aio_req_done(void)
1560 {
1561 	aio_req_t *reqp;
1562 	aio_result_t *resultp;
1563 
1564 	ASSERT(MUTEX_HELD(&__aio_mutex));
1565 
1566 	if ((reqp = _aio_done_tail) != NULL) {
1567 		if ((_aio_done_tail = reqp->req_next) == NULL)
1568 			_aio_done_head = NULL;
1569 		ASSERT(_aio_donecnt > 0);
1570 		_aio_donecnt--;
1571 		(void) _aio_hash_del(reqp->req_resultp);
1572 		resultp = reqp->req_resultp;
1573 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1574 		_aio_req_free(reqp);
1575 		return (resultp);
1576 	}
1577 	/* is queue empty? */
1578 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1579 		return ((aio_result_t *)-1);
1580 	}
1581 	return (NULL);
1582 }
1583 
1584 /*
1585  * Set the return and errno values for the application's use.
1586  *
1587  * For the Posix interfaces, we must set the return value first followed
1588  * by the errno value because the Posix interfaces allow for a change
1589  * in the errno value from EINPROGRESS to something else to signal
1590  * the completion of the asynchronous request.
1591  *
1592  * The opposite is true for the Solaris interfaces.  These allow for
1593  * a change in the return value from AIO_INPROGRESS to something else
1594  * to signal the completion of the asynchronous request.
1595  */
1596 void
1597 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1598 {
1599 	aio_result_t *resultp = reqp->req_resultp;
1600 
1601 	if (POSIX_AIO(reqp)) {
1602 		resultp->aio_return = retval;
1603 		membar_producer();
1604 		resultp->aio_errno = error;
1605 	} else {
1606 		resultp->aio_errno = error;
1607 		membar_producer();
1608 		resultp->aio_return = retval;
1609 	}
1610 }
1611 
1612 /*
1613  * Add an AIO request onto the next work queue.
1614  * A circular list of workers is used to choose the next worker.
1615  */
1616 void
1617 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1618 {
1619 	ulwp_t *self = curthread;
1620 	aio_worker_t *aiowp;
1621 	aio_worker_t *first;
1622 	int load_bal_flg = 1;
1623 	int found;
1624 
1625 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1626 	reqp->req_next = NULL;
1627 	/*
1628 	 * Try to acquire the next worker's work queue.  If it is locked,
1629 	 * then search the list of workers until a queue is found unlocked,
1630 	 * or until the list is completely traversed at which point another
1631 	 * worker will be created.
1632 	 */
1633 	sigoff(self);		/* defer SIGIO */
1634 	sig_mutex_lock(&__aio_mutex);
1635 	first = aiowp = *nextworker;
1636 	if (mode != AIONOTIFY)
1637 		_aio_outstand_cnt++;
1638 	sig_mutex_unlock(&__aio_mutex);
1639 
1640 	switch (mode) {
1641 	case AIOREAD:
1642 	case AIOWRITE:
1643 	case AIOAREAD:
1644 	case AIOAWRITE:
1645 #if !defined(_LP64)
1646 	case AIOAREAD64:
1647 	case AIOAWRITE64:
1648 #endif
1649 		/* try to find an idle worker */
1650 		found = 0;
1651 		do {
1652 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1653 				if (aiowp->work_idleflg) {
1654 					found = 1;
1655 					break;
1656 				}
1657 				sig_mutex_unlock(&aiowp->work_qlock1);
1658 			}
1659 		} while ((aiowp = aiowp->work_forw) != first);
1660 
1661 		if (found) {
1662 			aiowp->work_minload1++;
1663 			break;
1664 		}
1665 
1666 		/* try to acquire some worker's queue lock */
1667 		do {
1668 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1669 				found = 1;
1670 				break;
1671 			}
1672 		} while ((aiowp = aiowp->work_forw) != first);
1673 
1674 		/*
1675 		 * Create more workers when the workers appear overloaded.
1676 		 * Either all the workers are busy draining their queues
1677 		 * or no worker's queue lock could be acquired.
1678 		 */
1679 		if (!found) {
1680 			if (_aio_worker_cnt < _max_workers) {
1681 				if (_aio_create_worker(reqp, mode))
1682 					aio_panic("_aio_req_add: add worker");
1683 				sigon(self);	/* reenable SIGIO */
1684 				return;
1685 			}
1686 
1687 			/*
1688 			 * No worker available and we have created
1689 			 * _max_workers, keep going through the
1690 			 * list slowly until we get a lock
1691 			 */
1692 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1693 				/*
1694 				 * give someone else a chance
1695 				 */
1696 				_aio_delay(1);
1697 				aiowp = aiowp->work_forw;
1698 			}
1699 		}
1700 
1701 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1702 		if (_aio_worker_cnt < _max_workers &&
1703 		    aiowp->work_minload1 >= _minworkload) {
1704 			sig_mutex_unlock(&aiowp->work_qlock1);
1705 			sig_mutex_lock(&__aio_mutex);
1706 			*nextworker = aiowp->work_forw;
1707 			sig_mutex_unlock(&__aio_mutex);
1708 			if (_aio_create_worker(reqp, mode))
1709 				aio_panic("aio_req_add: add worker");
1710 			sigon(self);	/* reenable SIGIO */
1711 			return;
1712 		}
1713 		aiowp->work_minload1++;
1714 		break;
1715 	case AIOFSYNC:
1716 	case AIONOTIFY:
1717 		load_bal_flg = 0;
1718 		sig_mutex_lock(&aiowp->work_qlock1);
1719 		break;
1720 	default:
1721 		aio_panic("_aio_req_add: invalid mode");
1722 		break;
1723 	}
1724 	/*
1725 	 * Put request onto worker's work queue.
1726 	 */
1727 	if (aiowp->work_tail1 == NULL) {
1728 		ASSERT(aiowp->work_count1 == 0);
1729 		aiowp->work_tail1 = reqp;
1730 		aiowp->work_next1 = reqp;
1731 	} else {
1732 		aiowp->work_head1->req_next = reqp;
1733 		if (aiowp->work_next1 == NULL)
1734 			aiowp->work_next1 = reqp;
1735 	}
1736 	reqp->req_state = AIO_REQ_QUEUED;
1737 	reqp->req_worker = aiowp;
1738 	aiowp->work_head1 = reqp;
1739 	/*
1740 	 * Awaken worker if it is not currently active.
1741 	 */
1742 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1743 		aiowp->work_idleflg = 0;
1744 		(void) cond_signal(&aiowp->work_idle_cv);
1745 	}
1746 	sig_mutex_unlock(&aiowp->work_qlock1);
1747 
1748 	if (load_bal_flg) {
1749 		sig_mutex_lock(&__aio_mutex);
1750 		*nextworker = aiowp->work_forw;
1751 		sig_mutex_unlock(&__aio_mutex);
1752 	}
1753 	sigon(self);	/* reenable SIGIO */
1754 }
1755 
1756 /*
1757  * Get an AIO request for a specified worker.
1758  * If the work queue is empty, return NULL.
1759  */
1760 aio_req_t *
1761 _aio_req_get(aio_worker_t *aiowp)
1762 {
1763 	aio_req_t *reqp;
1764 
1765 	sig_mutex_lock(&aiowp->work_qlock1);
1766 	if ((reqp = aiowp->work_next1) != NULL) {
1767 		/*
1768 		 * Remove a POSIX request from the queue; the
1769 		 * request queue is a singularly linked list
1770 		 * with a previous pointer.  The request is
1771 		 * removed by updating the previous pointer.
1772 		 *
1773 		 * Non-posix requests are left on the queue
1774 		 * to eventually be placed on the done queue.
1775 		 */
1776 
1777 		if (POSIX_AIO(reqp)) {
1778 			if (aiowp->work_prev1 == NULL) {
1779 				aiowp->work_tail1 = reqp->req_next;
1780 				if (aiowp->work_tail1 == NULL)
1781 					aiowp->work_head1 = NULL;
1782 			} else {
1783 				aiowp->work_prev1->req_next = reqp->req_next;
1784 				if (aiowp->work_head1 == reqp)
1785 					aiowp->work_head1 = reqp->req_next;
1786 			}
1787 
1788 		} else {
1789 			aiowp->work_prev1 = reqp;
1790 			ASSERT(aiowp->work_done1 >= 0);
1791 			aiowp->work_done1++;
1792 		}
1793 		ASSERT(reqp != reqp->req_next);
1794 		aiowp->work_next1 = reqp->req_next;
1795 		ASSERT(aiowp->work_count1 >= 1);
1796 		aiowp->work_count1--;
1797 		switch (reqp->req_op) {
1798 		case AIOREAD:
1799 		case AIOWRITE:
1800 		case AIOAREAD:
1801 		case AIOAWRITE:
1802 #if !defined(_LP64)
1803 		case AIOAREAD64:
1804 		case AIOAWRITE64:
1805 #endif
1806 			ASSERT(aiowp->work_minload1 > 0);
1807 			aiowp->work_minload1--;
1808 			break;
1809 		}
1810 		reqp->req_state = AIO_REQ_INPROGRESS;
1811 	}
1812 	aiowp->work_req = reqp;
1813 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1814 	sig_mutex_unlock(&aiowp->work_qlock1);
1815 	return (reqp);
1816 }
1817 
1818 static void
1819 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1820 {
1821 	aio_req_t **last;
1822 	aio_req_t *lastrp;
1823 	aio_req_t *next;
1824 
1825 	ASSERT(aiowp != NULL);
1826 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1827 	if (POSIX_AIO(reqp)) {
1828 		if (ostate != AIO_REQ_QUEUED)
1829 			return;
1830 	}
1831 	last = &aiowp->work_tail1;
1832 	lastrp = aiowp->work_tail1;
1833 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1834 	while ((next = *last) != NULL) {
1835 		if (next == reqp) {
1836 			*last = next->req_next;
1837 			if (aiowp->work_next1 == next)
1838 				aiowp->work_next1 = next->req_next;
1839 
1840 			if ((next->req_next != NULL) ||
1841 			    (aiowp->work_done1 == 0)) {
1842 				if (aiowp->work_head1 == next)
1843 					aiowp->work_head1 = next->req_next;
1844 				if (aiowp->work_prev1 == next)
1845 					aiowp->work_prev1 = next->req_next;
1846 			} else {
1847 				if (aiowp->work_head1 == next)
1848 					aiowp->work_head1 = lastrp;
1849 				if (aiowp->work_prev1 == next)
1850 					aiowp->work_prev1 = lastrp;
1851 			}
1852 
1853 			if (ostate == AIO_REQ_QUEUED) {
1854 				ASSERT(aiowp->work_count1 >= 1);
1855 				aiowp->work_count1--;
1856 				ASSERT(aiowp->work_minload1 >= 1);
1857 				aiowp->work_minload1--;
1858 			} else {
1859 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1860 				    !POSIX_AIO(reqp));
1861 				aiowp->work_done1--;
1862 			}
1863 			return;
1864 		}
1865 		last = &next->req_next;
1866 		lastrp = next;
1867 	}
1868 	/* NOTREACHED */
1869 }
1870 
1871 static void
1872 _aio_enq_doneq(aio_req_t *reqp)
1873 {
1874 	if (_aio_doneq == NULL) {
1875 		_aio_doneq = reqp;
1876 		reqp->req_next = reqp->req_prev = reqp;
1877 	} else {
1878 		reqp->req_next = _aio_doneq;
1879 		reqp->req_prev = _aio_doneq->req_prev;
1880 		_aio_doneq->req_prev->req_next = reqp;
1881 		_aio_doneq->req_prev = reqp;
1882 	}
1883 	reqp->req_state = AIO_REQ_DONEQ;
1884 	_aio_doneq_cnt++;
1885 }
1886 
1887 /*
1888  * caller owns the _aio_mutex
1889  */
1890 aio_req_t *
1891 _aio_req_remove(aio_req_t *reqp)
1892 {
1893 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1894 		return (NULL);
1895 
1896 	if (reqp) {
1897 		/* request in done queue */
1898 		if (_aio_doneq == reqp)
1899 			_aio_doneq = reqp->req_next;
1900 		if (_aio_doneq == reqp) {
1901 			/* only one request on queue */
1902 			_aio_doneq = NULL;
1903 		} else {
1904 			aio_req_t *tmp = reqp->req_next;
1905 			reqp->req_prev->req_next = tmp;
1906 			tmp->req_prev = reqp->req_prev;
1907 		}
1908 	} else if ((reqp = _aio_doneq) != NULL) {
1909 		if (reqp == reqp->req_next) {
1910 			/* only one request on queue */
1911 			_aio_doneq = NULL;
1912 		} else {
1913 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1914 			_aio_doneq->req_prev = reqp->req_prev;
1915 		}
1916 	}
1917 	if (reqp) {
1918 		_aio_doneq_cnt--;
1919 		reqp->req_next = reqp->req_prev = reqp;
1920 		reqp->req_state = AIO_REQ_DONE;
1921 	}
1922 	return (reqp);
1923 }
1924 
1925 /*
1926  * An AIO request is identified by an aio_result_t pointer.  The library
1927  * maps this aio_result_t pointer to its internal representation using a
1928  * hash table.  This function adds an aio_result_t pointer to the hash table.
1929  */
1930 static int
1931 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1932 {
1933 	aio_hash_t *hashp;
1934 	aio_req_t **prev;
1935 	aio_req_t *next;
1936 
1937 	hashp = _aio_hash + AIOHASH(resultp);
1938 	lmutex_lock(&hashp->hash_lock);
1939 	prev = &hashp->hash_ptr;
1940 	while ((next = *prev) != NULL) {
1941 		if (resultp == next->req_resultp) {
1942 			lmutex_unlock(&hashp->hash_lock);
1943 			return (-1);
1944 		}
1945 		prev = &next->req_link;
1946 	}
1947 	*prev = reqp;
1948 	ASSERT(reqp->req_link == NULL);
1949 	lmutex_unlock(&hashp->hash_lock);
1950 	return (0);
1951 }
1952 
1953 /*
1954  * Remove an entry from the hash table.
1955  */
1956 aio_req_t *
1957 _aio_hash_del(aio_result_t *resultp)
1958 {
1959 	aio_hash_t *hashp;
1960 	aio_req_t **prev;
1961 	aio_req_t *next = NULL;
1962 
1963 	if (_aio_hash != NULL) {
1964 		hashp = _aio_hash + AIOHASH(resultp);
1965 		lmutex_lock(&hashp->hash_lock);
1966 		prev = &hashp->hash_ptr;
1967 		while ((next = *prev) != NULL) {
1968 			if (resultp == next->req_resultp) {
1969 				*prev = next->req_link;
1970 				next->req_link = NULL;
1971 				break;
1972 			}
1973 			prev = &next->req_link;
1974 		}
1975 		lmutex_unlock(&hashp->hash_lock);
1976 	}
1977 	return (next);
1978 }
1979 
1980 /*
1981  *  find an entry in the hash table
1982  */
1983 aio_req_t *
1984 _aio_hash_find(aio_result_t *resultp)
1985 {
1986 	aio_hash_t *hashp;
1987 	aio_req_t **prev;
1988 	aio_req_t *next = NULL;
1989 
1990 	if (_aio_hash != NULL) {
1991 		hashp = _aio_hash + AIOHASH(resultp);
1992 		lmutex_lock(&hashp->hash_lock);
1993 		prev = &hashp->hash_ptr;
1994 		while ((next = *prev) != NULL) {
1995 			if (resultp == next->req_resultp)
1996 				break;
1997 			prev = &next->req_link;
1998 		}
1999 		lmutex_unlock(&hashp->hash_lock);
2000 	}
2001 	return (next);
2002 }
2003 
2004 /*
2005  * AIO interface for POSIX
2006  */
2007 int
2008 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2009     int mode, int flg)
2010 {
2011 	aio_req_t *reqp;
2012 	aio_args_t *ap;
2013 	int kerr;
2014 
2015 	if (aiocbp == NULL) {
2016 		errno = EINVAL;
2017 		return (-1);
2018 	}
2019 
2020 	/* initialize kaio */
2021 	if (!_kaio_ok)
2022 		_kaio_init();
2023 
2024 	aiocbp->aio_state = NOCHECK;
2025 
2026 	/*
2027 	 * If we have been called because a list I/O
2028 	 * kaio() failed, we dont want to repeat the
2029 	 * system call
2030 	 */
2031 
2032 	if (flg & AIO_KAIO) {
2033 		/*
2034 		 * Try kernel aio first.
2035 		 * If errno is ENOTSUP/EBADFD,
2036 		 * fall back to the thread implementation.
2037 		 */
2038 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2039 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2040 			aiocbp->aio_state = CHECK;
2041 			kerr = (int)_kaio(mode, aiocbp);
2042 			if (kerr == 0)
2043 				return (0);
2044 			if (errno != ENOTSUP && errno != EBADFD) {
2045 				aiocbp->aio_resultp.aio_errno = errno;
2046 				aiocbp->aio_resultp.aio_return = -1;
2047 				aiocbp->aio_state = NOCHECK;
2048 				return (-1);
2049 			}
2050 			if (errno == EBADFD)
2051 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2052 		}
2053 	}
2054 
2055 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2056 	aiocbp->aio_state = USERAIO;
2057 
2058 	if (!__uaio_ok && __uaio_init() == -1)
2059 		return (-1);
2060 
2061 	if ((reqp = _aio_req_alloc()) == NULL) {
2062 		errno = EAGAIN;
2063 		return (-1);
2064 	}
2065 
2066 	/*
2067 	 * If an LIO request, add the list head to the aio request
2068 	 */
2069 	reqp->req_head = lio_head;
2070 	reqp->req_type = AIO_POSIX_REQ;
2071 	reqp->req_op = mode;
2072 	reqp->req_largefile = 0;
2073 
2074 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2075 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2076 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2077 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2078 		reqp->req_sigevent.sigev_signo =
2079 		    aiocbp->aio_sigevent.sigev_signo;
2080 		reqp->req_sigevent.sigev_value.sival_ptr =
2081 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2082 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2083 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2084 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2085 		/*
2086 		 * Reuse the sigevent structure to contain the port number
2087 		 * and the user value.  Same for SIGEV_THREAD, below.
2088 		 */
2089 		reqp->req_sigevent.sigev_signo =
2090 		    pn->portnfy_port;
2091 		reqp->req_sigevent.sigev_value.sival_ptr =
2092 		    pn->portnfy_user;
2093 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2094 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2095 		/*
2096 		 * The sigevent structure contains the port number
2097 		 * and the user value.  Same for SIGEV_PORT, above.
2098 		 */
2099 		reqp->req_sigevent.sigev_signo =
2100 		    aiocbp->aio_sigevent.sigev_signo;
2101 		reqp->req_sigevent.sigev_value.sival_ptr =
2102 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2103 	}
2104 
2105 	reqp->req_resultp = &aiocbp->aio_resultp;
2106 	reqp->req_aiocbp = aiocbp;
2107 	ap = &reqp->req_args;
2108 	ap->fd = aiocbp->aio_fildes;
2109 	ap->buf = (caddr_t)aiocbp->aio_buf;
2110 	ap->bufsz = aiocbp->aio_nbytes;
2111 	ap->offset = aiocbp->aio_offset;
2112 
2113 	if ((flg & AIO_NO_DUPS) &&
2114 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2115 		aio_panic("_aio_rw(): request already in hash table");
2116 		_aio_req_free(reqp);
2117 		errno = EINVAL;
2118 		return (-1);
2119 	}
2120 	_aio_req_add(reqp, nextworker, mode);
2121 	return (0);
2122 }
2123 
2124 #if !defined(_LP64)
2125 /*
2126  * 64-bit AIO interface for POSIX
2127  */
2128 int
2129 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2130     int mode, int flg)
2131 {
2132 	aio_req_t *reqp;
2133 	aio_args_t *ap;
2134 	int kerr;
2135 
2136 	if (aiocbp == NULL) {
2137 		errno = EINVAL;
2138 		return (-1);
2139 	}
2140 
2141 	/* initialize kaio */
2142 	if (!_kaio_ok)
2143 		_kaio_init();
2144 
2145 	aiocbp->aio_state = NOCHECK;
2146 
2147 	/*
2148 	 * If we have been called because a list I/O
2149 	 * kaio() failed, we dont want to repeat the
2150 	 * system call
2151 	 */
2152 
2153 	if (flg & AIO_KAIO) {
2154 		/*
2155 		 * Try kernel aio first.
2156 		 * If errno is ENOTSUP/EBADFD,
2157 		 * fall back to the thread implementation.
2158 		 */
2159 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2160 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2161 			aiocbp->aio_state = CHECK;
2162 			kerr = (int)_kaio(mode, aiocbp);
2163 			if (kerr == 0)
2164 				return (0);
2165 			if (errno != ENOTSUP && errno != EBADFD) {
2166 				aiocbp->aio_resultp.aio_errno = errno;
2167 				aiocbp->aio_resultp.aio_return = -1;
2168 				aiocbp->aio_state = NOCHECK;
2169 				return (-1);
2170 			}
2171 			if (errno == EBADFD)
2172 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2173 		}
2174 	}
2175 
2176 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2177 	aiocbp->aio_state = USERAIO;
2178 
2179 	if (!__uaio_ok && __uaio_init() == -1)
2180 		return (-1);
2181 
2182 	if ((reqp = _aio_req_alloc()) == NULL) {
2183 		errno = EAGAIN;
2184 		return (-1);
2185 	}
2186 
2187 	/*
2188 	 * If an LIO request, add the list head to the aio request
2189 	 */
2190 	reqp->req_head = lio_head;
2191 	reqp->req_type = AIO_POSIX_REQ;
2192 	reqp->req_op = mode;
2193 	reqp->req_largefile = 1;
2194 
2195 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2196 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2197 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2198 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2199 		reqp->req_sigevent.sigev_signo =
2200 		    aiocbp->aio_sigevent.sigev_signo;
2201 		reqp->req_sigevent.sigev_value.sival_ptr =
2202 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2203 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2204 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2205 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2206 		reqp->req_sigevent.sigev_signo =
2207 		    pn->portnfy_port;
2208 		reqp->req_sigevent.sigev_value.sival_ptr =
2209 		    pn->portnfy_user;
2210 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2211 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2212 		reqp->req_sigevent.sigev_signo =
2213 		    aiocbp->aio_sigevent.sigev_signo;
2214 		reqp->req_sigevent.sigev_value.sival_ptr =
2215 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2216 	}
2217 
2218 	reqp->req_resultp = &aiocbp->aio_resultp;
2219 	reqp->req_aiocbp = aiocbp;
2220 	ap = &reqp->req_args;
2221 	ap->fd = aiocbp->aio_fildes;
2222 	ap->buf = (caddr_t)aiocbp->aio_buf;
2223 	ap->bufsz = aiocbp->aio_nbytes;
2224 	ap->offset = aiocbp->aio_offset;
2225 
2226 	if ((flg & AIO_NO_DUPS) &&
2227 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2228 		aio_panic("_aio_rw64(): request already in hash table");
2229 		_aio_req_free(reqp);
2230 		errno = EINVAL;
2231 		return (-1);
2232 	}
2233 	_aio_req_add(reqp, nextworker, mode);
2234 	return (0);
2235 }
2236 #endif	/* !defined(_LP64) */
2237