xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision 7c478bd9)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "libaio.h"
30 #include <sys/param.h>
31 #include <sys/file.h>
32 #include <sys/port.h>
33 
34 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
35 static aio_req_t *_aio_req_alloc(void);
36 static aio_req_t *_aio_req_get(aio_worker_t *);
37 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
38 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
39 static aio_result_t *_aio_req_done(void);
40 static void _aio_work_done(aio_worker_t *);
41 aio_req_t *_aio_req_remove(aio_req_t *reqp);
42 static void _aio_enq_doneq(aio_req_t *reqp);
43 int _aio_get_timedelta(struct timespec *end, struct timespec *wait);
44 
45 aio_req_t *_aio_hash_find(aio_result_t *);
46 void _aio_req_free(aio_req_t *);
47 void _aio_lock(void);
48 void _aio_unlock(void);
49 
50 extern int __fdsync(int fd, int mode);
51 extern int _sigprocmask(int, const sigset_t *, sigset_t *);
52 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
53 
54 static int _aio_fsync_del(aio_req_t *, aio_lio_t *);
55 static int _aiodone(aio_req_t *, aio_lio_t *, int, ssize_t, int);
56 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
57 
58 #ifdef DEBUG
59 void _aio_stats(void);
60 #endif
61 
62 int _pagesize;
63 
64 #define	AIOREQSZ		(sizeof (struct aio_req))
65 #define	AIOCLICKS		((_pagesize)/AIOREQSZ)
66 #define	HASHSZ			8192L	/* power of 2 */
67 #define	AIOHASH(resultp)	((((uintptr_t)(resultp) >> 13) ^ \
68 				    ((uintptr_t)(resultp))) & (HASHSZ-1))
69 #define	POSIX_AIO(x)		((x)->req_type == AIO_POSIX_REQ)
70 
71 /*
72  * switch for kernel async I/O
73  */
74 int _kaio_ok = 0;			/* 0 = disabled, 1 = on, -1 = error */
75 
76 /*
77  * Key for thread-specific data
78  */
79 thread_key_t _aio_key = 0;
80 
81 /*
82  * Array for determining whether or not a file supports kaio
83  */
84 uint32_t _kaio_supported[MAX_KAIO_FDARRAY_SIZE];
85 
86 int _aioreqsize = AIOREQSZ;
87 
88 #ifdef DEBUG
89 int *_donecnt;				/* per worker AIO done count */
90 int *_idlecnt;				/* per worker idle count */
91 int *_qfullcnt;				/* per worker full q count */
92 int *_firstqcnt;			/* num times queue one is used */
93 int *_newworker;			/* num times new worker is created */
94 int _clogged = 0;			/* num times all queues are locked */
95 int _qlocked = 0;			/* num times submitter finds q locked */
96 int _aio_submitcnt = 0;
97 int _aio_submitcnt2 = 0;
98 int _submitcnt = 0;
99 int _avesubmitcnt = 0;
100 int _aiowaitcnt = 0;
101 int _startaiowaitcnt = 1;
102 int _avedone = 0;
103 int _new_workers = 0;
104 #endif
105 
106 /*
107  *  workers for read requests.
108  * (__aio_mutex lock protects circular linked list of workers.)
109  */
110 aio_worker_t *__workers_rd;	/* circular list of AIO workers */
111 aio_worker_t *__nextworker_rd;	/* next worker in list of workers */
112 int __rd_workerscnt;		/* number of read workers */
113 
114 /*
115  * workers for write requests.
116  * (__aio_mutex lock protects circular linked list of workers.)
117  */
118 aio_worker_t *__workers_wr;	/* circular list of AIO workers */
119 aio_worker_t *__nextworker_wr;	/* next worker in list of workers */
120 int __wr_workerscnt;		/* number of write workers */
121 
122 /*
123  * worker for sigevent requests.
124  */
125 aio_worker_t *__workers_si;	/* circular list of AIO workers */
126 aio_worker_t *__nextworker_si;	/* next worker in list of workers */
127 int __si_workerscnt;		/* number of sigevent workers */
128 
129 struct aio_req *_aio_done_tail;		/* list of done requests */
130 struct aio_req *_aio_done_head;
131 
132 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
133 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
134 mutex_t __aio_cachefillock = DEFAULTMUTEX; /* single-thread aio cache filling */
135 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
136 cond_t __aio_cachefillcv = DEFAULTCV;	/* sleep cv for cache filling */
137 
138 mutex_t __lio_mutex = DEFAULTMUTEX;	/* protects lio lists */
139 
140 int __aiostksz;				/* aio worker's stack size */
141 int __aio_cachefilling = 0;		/* set when aio cache is filling */
142 int __sigio_masked = 0;			/* bit mask for SIGIO signal */
143 int __sigio_maskedcnt = 0;		/* mask count for SIGIO signal */
144 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
145 static struct aio_req **_aio_hash;
146 static struct aio_req *_aio_freelist;
147 static struct aio_req *_aio_doneq;	/* double linked done queue list */
148 static int _aio_freelist_cnt;
149 
150 static struct sigaction act;
151 
152 cond_t _aio_done_cv = DEFAULTCV;
153 
154 /*
155  * Input queue of requests which is serviced by the aux. threads.
156  */
157 cond_t _aio_idle_cv = DEFAULTCV;
158 
159 int _aio_cnt = 0;
160 int _aio_donecnt = 0;
161 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
162 int _aio_doneq_cnt = 0;
163 int _aio_outstand_cnt = 0;		/* number of outstanding requests */
164 int _aio_outstand_waitn = 0;		/* # of queued requests for aio_waitn */
165 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
166 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
167 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
168 
169 int _max_workers = 256;			/* max number of workers permitted */
170 int _min_workers = 8;			/* min number of workers */
171 int _maxworkload = 32;			/* max length of worker's request q */
172 int _minworkload = 2;			/* min number of request in q */
173 int _aio_worker_cnt = 0;		/* number of workers to do requests */
174 int _idle_workers = 0;			/* number of idle workers */
175 int __uaio_ok = 0;			/* AIO has been enabled */
176 sigset_t _worker_set;			/* worker's signal mask */
177 
178 int _aiowait_flag = 0;			/* when set, aiowait() is in progress */
179 int _aio_flags = 0;			/* see libaio.h defines for */
180 
181 struct aio_worker *_kaiowp;		/* points to kaio cleanup thread */
182 
183 /*
184  * called by the child when the main thread forks. the child is
185  * cleaned up so that it can use libaio.
186  */
187 void
188 _aio_forkinit(void)
189 {
190 	__uaio_ok = 0;
191 	__workers_rd = NULL;
192 	__nextworker_rd = NULL;
193 	__workers_wr = NULL;
194 	__nextworker_wr = NULL;
195 	_aio_done_tail = NULL;
196 	_aio_done_head = NULL;
197 	_aio_hash = NULL;
198 	_aio_freelist = NULL;
199 	_aio_freelist_cnt = 0;
200 	_aio_doneq = NULL;
201 	_aio_doneq_cnt = 0;
202 	_aio_waitncnt = 0;
203 	_aio_outstand_cnt = 0;
204 	_aio_outstand_waitn = 0;
205 	_aio_req_done_cnt = 0;
206 	_aio_kernel_suspend = 0;
207 	_aio_suscv_cnt = 0;
208 	_aio_flags = 0;
209 	_aio_worker_cnt = 0;
210 	_idle_workers = 0;
211 	_kaio_ok = 0;
212 #ifdef	DEBUG
213 	_clogged = 0;
214 	_qlocked = 0;
215 #endif
216 }
217 
218 #ifdef DEBUG
219 /*
220  * print out a bunch of interesting statistics when the process
221  * exits.
222  */
223 void
224 _aio_stats()
225 {
226 	int i;
227 	char *fmt;
228 	int cnt;
229 	FILE *fp;
230 
231 	fp = fopen("/tmp/libaio.log", "w+a");
232 	if (fp == NULL)
233 		return;
234 	fprintf(fp, "size of AIO request struct = %d bytes\n", _aioreqsize);
235 	fprintf(fp, "number of AIO workers = %d\n", _aio_worker_cnt);
236 	cnt = _aio_worker_cnt + 1;
237 	for (i = 2; i <= cnt; i++) {
238 		fmt = "%d done %d, idle = %d, qfull = %d, newworker = %d\n";
239 		fprintf(fp, fmt, i, _donecnt[i], _idlecnt[i], _qfullcnt[i],
240 		    _newworker[i]);
241 	}
242 	fprintf(fp, "num times submitter found next work queue locked = %d\n",
243 	    _qlocked);
244 	fprintf(fp, "num times submitter found all work queues locked = %d\n",
245 	    _clogged);
246 	fprintf(fp, "average submit request = %d\n", _avesubmitcnt);
247 	fprintf(fp, "average number of submit requests per new worker = %d\n",
248 	    _avedone);
249 }
250 #endif
251 
252 /*
253  * libaio is initialized when the first AIO request is made. important
254  * constants are initialized, such as the max number of workers that
255  * libaio can create and the minimum number of workers permitted before
256  * some restrictions are imposed. also, some initial workers are created.
257  */
258 int
259 __uaio_init(void)
260 {
261 	int i;
262 	size_t size;
263 	extern sigset_t __sigiomask;
264 	struct sigaction oact;
265 
266 	(void) mutex_lock(&__aio_initlock);
267 	if (_aio_key == 0 &&
268 	    thr_keycreate(&_aio_key, _aio_free_worker) != 0)
269 		_aiopanic("__uaio_init, thr_keycreate()\n");
270 	if (!__uaio_ok) {
271 		__pid = getpid();
272 
273 		if (_sigaction(SIGAIOCANCEL, NULL, &oact) == -1) {
274 			(void) mutex_unlock(&__aio_initlock);
275 			return (-1);
276 		}
277 
278 		if (oact.sa_handler != aiosigcancelhndlr) {
279 			act.sa_handler = aiosigcancelhndlr;
280 			act.sa_flags = SA_SIGINFO;
281 			if (_sigaction(SIGAIOCANCEL, &act, &sigcanact) == -1) {
282 				(void) mutex_unlock(&__aio_initlock);
283 				return (-1);
284 			}
285 		}
286 
287 		/*
288 		 * Constant sigiomask, used by _aiosendsig()
289 		 */
290 		(void) sigaddset(&__sigiomask, SIGIO);
291 #ifdef DEBUG
292 		size = _max_workers * (sizeof (int) * 5 +
293 		    sizeof (int));
294 		_donecnt = malloc(size);
295 		(void) memset((caddr_t)_donecnt, 0, size);
296 		_idlecnt = _donecnt + _max_workers;
297 		_qfullcnt = _idlecnt + _max_workers;
298 		_firstqcnt = _qfullcnt + _max_workers;
299 		_newworker = _firstqcnt + _max_workers;
300 		atexit(_aio_stats);
301 #endif
302 		size = HASHSZ * sizeof (struct aio_req *);
303 		_aio_hash = malloc(size);
304 		if (_aio_hash == NULL) {
305 			(void) mutex_unlock(&__aio_initlock);
306 			return (-1);
307 		}
308 		(void) memset((caddr_t)_aio_hash, 0, size);
309 
310 		/* initialize worker's signal mask to only catch SIGAIOCANCEL */
311 		(void) sigfillset(&_worker_set);
312 		(void) sigdelset(&_worker_set, SIGAIOCANCEL);
313 
314 		/*
315 		 * Create equal number of READ and WRITE workers.
316 		 */
317 		i = 0;
318 		while (i++ < (_min_workers/2))
319 			(void) _aio_create_worker(NULL, AIOREAD);
320 		i = 0;
321 		while (i++ < (_min_workers/2))
322 			(void) _aio_create_worker(NULL, AIOWRITE);
323 
324 		/* create one worker to send completion signals. */
325 		(void) _aio_create_worker(NULL, AIOSIGEV);
326 		(void) mutex_unlock(&__aio_initlock);
327 		__uaio_ok = 1;
328 		return (0);
329 	}
330 
331 	(void) mutex_unlock(&__aio_initlock);
332 	return (0);
333 }
334 
335 /*
336  * special kaio cleanup thread sits in a loop in the
337  * kernel waiting for pending kaio requests to complete.
338  */
339 void *
340 _kaio_cleanup_thread(void *arg)
341 {
342 	if (thr_setspecific(_aio_key, arg) != 0)
343 		_aiopanic("_kaio_cleanup_thread, thr_setspecific()\n");
344 	(void) _kaio(AIOSTART);
345 	return (arg);
346 }
347 
348 /*
349  * initialize kaio.
350  */
351 void
352 _kaio_init()
353 {
354 	int error;
355 	sigset_t set, oset;
356 
357 	(void) mutex_lock(&__aio_initlock);
358 	if (_aio_key == 0 &&
359 	    thr_keycreate(&_aio_key, _aio_free_worker) != 0)
360 		_aiopanic("_kaio_init, thr_keycreate()\n");
361 	if (!_kaio_ok) {
362 		_pagesize = (int)PAGESIZE;
363 		__aiostksz = 8 * _pagesize;
364 		if ((_kaiowp = _aio_alloc_worker()) == NULL) {
365 			error =  ENOMEM;
366 		} else {
367 			if ((error = (int)_kaio(AIOINIT)) == 0) {
368 				(void) sigfillset(&set);
369 				(void) _sigprocmask(SIG_SETMASK, &set, &oset);
370 				error = thr_create(NULL, __aiostksz,
371 				    _kaio_cleanup_thread, _kaiowp,
372 				    THR_BOUND | THR_DAEMON, &_kaiowp->work_tid);
373 				(void) _sigprocmask(SIG_SETMASK, &oset, NULL);
374 			}
375 			if (error) {
376 				_aio_free_worker(_kaiowp);
377 				_kaiowp = NULL;
378 			}
379 		}
380 		if (error)
381 			_kaio_ok = -1;
382 		else
383 			_kaio_ok = 1;
384 	}
385 	(void) mutex_unlock(&__aio_initlock);
386 }
387 
388 int
389 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
390     aio_result_t *resultp)
391 {
392 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
393 }
394 
395 int
396 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
397     aio_result_t *resultp)
398 {
399 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
400 }
401 
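/*
 * Illustrative caller-side sketch (not part of the library; headers and
 * error handling abbreviated, see aioread(3AIO) and aiowait(3AIO)).  It
 * shows how the Solaris-style aioread()/aiowait() pair defined here is
 * typically used by an application:
 *
 *	aio_result_t res;
 *	aio_result_t *donep;
 *	char buf[8192];
 *
 *	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &res) == -1)
 *		return (-1);			submission itself failed
 *	donep = aiowait(NULL);			block for any completion
 *	if (donep == &res && res.aio_return == -1)
 *		errno = res.aio_errno;		the I/O itself failed
 */
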
402 #if	defined(_LARGEFILE64_SOURCE) && !defined(_LP64)
403 int
404 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
405     aio_result_t *resultp)
406 {
407 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
408 }
409 
410 int
411 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
412     aio_result_t *resultp)
413 {
414 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
415 }
416 #endif	/* (_LARGEFILE64_SOURCE) && !defined(_LP64) */
417 
418 int
419 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
420     aio_result_t *resultp, int mode)
421 {
422 	aio_worker_t **nextworker;
423 	aio_req_t *aiorp = NULL;
424 	aio_args_t *ap = NULL;
425 	offset_t loffset = 0;
426 	struct stat stat;
427 	int err = 0;
428 	int kerr;
429 	int umode;
430 
431 	switch (whence) {
432 
433 	case SEEK_SET:
434 		loffset = offset;
435 		break;
436 	case SEEK_CUR:
437 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
438 			err = -1;
439 		else
440 			loffset += offset;
441 		break;
442 	case SEEK_END:
443 		if (fstat(fd, &stat) == -1)
444 			err = -1;
445 		else
446 			loffset = offset + stat.st_size;
447 		break;
448 	default:
449 		errno = EINVAL;
450 		err = -1;
451 	}
452 
453 	if (err)
454 		return (err);
455 
456 	/* initialize kaio */
457 	if (!_kaio_ok)
458 		_kaio_init();
459 
460 	/*
461 	 * _aio_do_request() needs the original request code (mode) to be able
462 	 * to choose the appropriate 32/64 bit function. All other functions
463 	 * only require the difference between READ and WRITE (umode).
464 	 */
465 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
466 		umode = mode - AIOAREAD64;
467 	else
468 		umode = mode;
469 
470 	/*
471 	 * Try kernel aio first.
472 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
473 	 */
474 	if ((_kaio_ok > 0) && (KAIO_SUPPORTED(fd))) {
475 		resultp->aio_errno = 0;
476 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
477 		    (umode | AIO_POLL_BIT) : umode),
478 		    fd, buf, bufsz, loffset, resultp);
479 		if (kerr == 0)
480 			return (0);
481 		else if ((errno != ENOTSUP) && (errno != EBADFD))
482 			return (-1);
483 		if (errno == EBADFD)
484 			SET_KAIO_NOT_SUPPORTED(fd);
485 	}
486 	if (!__uaio_ok) {
487 		if (__uaio_init() == -1)
488 			return (-1);
489 	}
490 
491 	aiorp = _aio_req_alloc();
492 	if (aiorp == (aio_req_t *)-1) {
493 		errno = EAGAIN;
494 		return (-1);
495 	}
496 
497 	/*
498 	 * _aio_do_request() checks aiorp->req_op to differentiate
499 	 * between 32 and 64 bit access.
500 	 */
501 	aiorp->req_op = mode;
502 	aiorp->req_resultp = resultp;
503 	ap = &(aiorp->req_args);
504 	ap->fd = fd;
505 	ap->buf = buf;
506 	ap->bufsz = bufsz;
507 	ap->offset = loffset;
508 
509 	nextworker = ((umode == AIOWRITE) ? &__nextworker_wr :
510 	    &__nextworker_rd);
511 	_aio_lock();
512 	if (_aio_hash_insert(resultp, aiorp)) {
513 		_aio_req_free(aiorp);
514 		_aio_unlock();
515 		errno = EINVAL;
516 		return (-1);
517 	} else {
518 		_aio_unlock();
519 
520 		/*
521 		 * _aio_req_add() only needs the difference between READ and
522 		 * WRITE to choose the right worker queue.
523 		 */
524 		_aio_req_add(aiorp, nextworker, umode);
525 		return (0);
526 	}
527 }
528 
529 int
530 aiocancel(aio_result_t *resultp)
531 {
532 	aio_req_t *aiorp;
533 	struct aio_worker *aiowp;
534 	int done = 0, canceled = 0;
535 
536 	if (!__uaio_ok) {
537 		errno = EINVAL;
538 		return (-1);
539 	}
540 
541 	_aio_lock();
542 	aiorp = _aio_hash_find(resultp);
543 	if (aiorp == NULL) {
544 		if (_aio_outstand_cnt == _aio_req_done_cnt)
545 			errno = EINVAL;
546 		else
547 			errno = EACCES;
548 
549 		_aio_unlock();
550 		return (-1);
551 	} else {
552 		aiowp = aiorp->req_worker;
553 		(void) mutex_lock(&aiowp->work_qlock1);
554 		(void) _aio_cancel_req(aiowp, aiorp, &canceled, &done);
555 		(void) mutex_unlock(&aiowp->work_qlock1);
556 
557 		if (canceled) {
558 			_aio_unlock();
559 			return (0);
560 		}
561 
562 		if (_aio_outstand_cnt == 0) {
563 			_aio_unlock();
564 			errno = EINVAL;
565 			return (-1);
566 		}
567 
568 		if (_aio_outstand_cnt == _aio_req_done_cnt)  {
569 			errno = EINVAL;
570 		} else {
571 			errno = EACCES;
572 		}
573 
574 		_aio_unlock();
575 		return (-1);
576 
577 	}
578 }
579 
580 /*
581  * This must be asynch safe
582  */
583 aio_result_t *
584 aiowait(struct timeval *uwait)
585 {
586 	aio_result_t *uresultp, *kresultp, *resultp;
587 	int dontblock;
588 	int timedwait = 0;
589 	int kaio_errno = 0;
590 	struct timeval twait, *wait = NULL;
591 	hrtime_t hrtend;
592 	hrtime_t hres;
593 
594 	if (uwait) {
595 		/*
596 		 * Check that the specified wait time is valid. If it is invalid,
597 		 * fail the call right away.
598 		 */
599 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
600 		    uwait->tv_usec >= MICROSEC) {
601 			errno = EINVAL;
602 			return ((aio_result_t *)-1);
603 		}
604 
605 		if ((uwait->tv_sec > 0) || (uwait->tv_usec > 0)) {
606 			hrtend = gethrtime() +
607 				(hrtime_t)uwait->tv_sec * NANOSEC +
608 				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
609 			twait = *uwait;
610 			wait = &twait;
611 			timedwait++;
612 		} else {
613 			/* polling */
614 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
615 						(struct timeval *)-1, 1);
616 			if (kresultp != (aio_result_t *)-1 &&
617 			    kresultp != NULL && kresultp != (aio_result_t *)1)
618 				return (kresultp);
619 			_aio_lock();
620 			uresultp = _aio_req_done();
621 			if (uresultp != NULL && uresultp !=
622 			    (aio_result_t *)-1) {
623 				_aio_unlock();
624 				return (uresultp);
625 			}
626 			_aio_unlock();
627 			if (uresultp == (aio_result_t *)-1 &&
628 			    kresultp == (aio_result_t *)-1) {
629 				errno = EINVAL;
630 				return ((aio_result_t *)-1);
631 			} else
632 				return (NULL);
633 		}
634 	}
635 
636 	for (;;) {
637 		_aio_lock();
638 		uresultp = _aio_req_done();
639 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
640 			_aio_unlock();
641 			resultp = uresultp;
642 			break;
643 		}
644 		_aiowait_flag++;
645 		_aio_unlock();
646 		dontblock = (uresultp == (aio_result_t *)-1);
647 		kresultp = (aio_result_t *)_kaio(AIOWAIT, wait, dontblock);
648 		kaio_errno = errno;
649 		_aio_lock();
650 		_aiowait_flag--;
651 		_aio_unlock();
652 		if (kresultp == (aio_result_t *)1) {
653 			/* aiowait() awakened by an aionotify() */
654 			continue;
655 		} else if (kresultp != NULL && kresultp != (aio_result_t *)-1) {
656 			resultp = kresultp;
657 			break;
658 		} else if (kresultp == (aio_result_t *)-1 && kaio_errno ==
659 		    EINVAL && uresultp == (aio_result_t *)-1) {
660 			errno = kaio_errno;
661 			resultp = (aio_result_t *)-1;
662 			break;
663 		} else if (kresultp == (aio_result_t *)-1 &&
664 		    kaio_errno == EINTR) {
665 			errno = kaio_errno;
666 			resultp = (aio_result_t *)-1;
667 			break;
668 		} else if (timedwait) {
669 			hres = hrtend - gethrtime();
670 			if (hres <= 0) {
671 				/* time is up. Return */
672 				resultp = NULL;
673 				break;
674 			} else {
675 				/*
676 				 * some time left. Round the remaining time up
677 				 * from nanoseconds to microseconds. Retry the call.
678 				 */
679 				hres += (NANOSEC / MICROSEC)-1;
680 				wait->tv_sec = hres / NANOSEC;
681 				wait->tv_usec =
682 					(hres % NANOSEC) / (NANOSEC / MICROSEC);
683 			}
684 		} else {
685 			ASSERT((kresultp == NULL && uresultp == NULL));
686 			resultp = NULL;
687 			continue;
688 		}
689 	}
690 	return (resultp);
691 }
692 
693 /*
694  * _aio_get_timedelta calculates the remaining time and stores the result
695  * into struct timespec *wait.
696  */
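/*
 * For example (values chosen for illustration): with *end == {5, 100000000}
 * and a current time of {4, 900000000}, 200 msec remain, so *wait is set to
 * {0, 200000000} and 0 is returned.  Once the current time reaches or passes
 * *end, -1 is returned to indicate that the timer has expired.
 */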
697 
698 int
699 _aio_get_timedelta(struct timespec *end, struct timespec *wait)
700 {
701 
702 	int	ret = 0;
703 	struct	timeval cur;
704 	struct	timespec curtime;
705 
706 	(void) gettimeofday(&cur, NULL);
707 	curtime.tv_sec = cur.tv_sec;
708 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
709 
710 	if (end->tv_sec >= curtime.tv_sec) {
711 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
712 		if (end->tv_nsec >= curtime.tv_nsec) {
713 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
714 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
715 				ret = -1;	/* timer expired */
716 		} else {
717 			if (end->tv_sec > curtime.tv_sec) {
718 				wait->tv_sec -= 1;
719 				wait->tv_nsec = NANOSEC -
720 				    (curtime.tv_nsec - end->tv_nsec);
721 			} else {
722 				ret = -1;	/* timer expired */
723 			}
724 		}
725 	} else {
726 		ret = -1;
727 	}
728 	return (ret);
729 }
730 
731 /*
732  * If closing by file descriptor: we will simply cancel all the outstanding
733  * aios and return. The aios in question will have noticed the
734  * cancellation either before, during, or after initiating the I/O.
735  */
736 int
737 aiocancel_all(int fd)
738 {
739 	aio_req_t *aiorp;
740 	aio_req_t **aiorpp;
741 	struct aio_worker *first, *next;
742 	int canceled = 0;
743 	int done = 0;
744 	int cancelall = 0;
745 
746 	if (_aio_outstand_cnt == 0)
747 		return (AIO_ALLDONE);
748 
749 	_aio_lock();
750 	/*
751 	 * cancel read requests from the read worker's queue.
752 	 */
753 	first = __nextworker_rd;
754 	next = first;
755 	do {
756 		_aio_cancel_work(next, fd, &canceled, &done);
757 	} while ((next = next->work_forw) != first);
758 
759 	/*
760 	 * cancel write requests from the write workers queue.
761 	 */
762 
763 	first = __nextworker_wr;
764 	next = first;
765 	do {
766 		_aio_cancel_work(next, fd, &canceled, &done);
767 	} while ((next = next->work_forw) != first);
768 
769 	/*
770 	 * finally, check if there are requests on the done queue that
771 	 * should be canceled.
772 	 */
773 	if (fd < 0)
774 		cancelall = 1;
775 	aiorpp = &_aio_done_tail;
776 	while ((aiorp = *aiorpp) != NULL) {
777 		if (cancelall || aiorp->req_args.fd == fd) {
778 			*aiorpp = aiorp->req_next;
779 			_aio_donecnt--;
780 			(void) _aio_hash_del(aiorp->req_resultp);
781 			_aio_req_free(aiorp);
782 		} else
783 			aiorpp = &aiorp->req_next;
784 	}
785 	if (cancelall) {
786 		ASSERT(_aio_donecnt == 0);
787 		_aio_done_head = NULL;
788 	}
789 	_aio_unlock();
790 
791 	if (canceled && done == 0)
792 		return (AIO_CANCELED);
793 	else if (done && canceled == 0)
794 		return (AIO_ALLDONE);
795 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
796 		return ((int)_kaio(AIOCANCEL, fd, NULL));
797 	return (AIO_NOTCANCELED);
798 }
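
/*
 * Illustrative caller-side sketch (not part of the library, error handling
 * abbreviated): before closing a descriptor, a caller of this routine might
 * act on the return value roughly as follows:
 *
 *	switch (aiocancel_all(fd)) {
 *	case AIO_CANCELED:		every pending request was canceled
 *	case AIO_ALLDONE:		or nothing was outstanding
 *		break;
 *	case AIO_NOTCANCELED:		some requests could not be canceled
 *		(void) aiowait(NULL);	reap at least one completion first
 *		break;
 *	}
 */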
799 
800 /*
801  * cancel requests from a given work queue. if the file descriptor
802  * parameter, fd, is non-negative, then only cancel those requests in
803  * this queue that are for this file descriptor. if the "fd"
804  * parameter is -1, then cancel all requests.
805  */
806 static void
807 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
808 {
809 	aio_req_t *aiorp;
810 
811 	(void) mutex_lock(&aiowp->work_qlock1);
812 	/*
813 	 * cancel queued requests first.
814 	 */
815 	aiorp = aiowp->work_tail1;
816 	while (aiorp != NULL) {
817 		if (fd < 0 || aiorp->req_args.fd == fd) {
818 			if (_aio_cancel_req(aiowp, aiorp, canceled, done)) {
819 				/*
820 				 * the caller's locks were dropped. aiorp is
821 				 * invalid; start traversing the list from
822 				 * the beginning.
823 				 */
824 				aiorp = aiowp->work_tail1;
825 				continue;
826 			}
827 		}
828 		aiorp = aiorp->req_next;
829 	}
830 	/*
831 	 * since the queued requests have been canceled, there can
832 	 * only be one in-progress request that should be canceled.
833 	 */
834 	if ((aiorp = aiowp->work_req) != NULL) {
835 		if (fd < 0 || aiorp->req_args.fd == fd) {
836 			(void) _aio_cancel_req(aiowp, aiorp, canceled, done);
837 			aiowp->work_req = NULL;
838 		}
839 	}
840 	(void) mutex_unlock(&aiowp->work_qlock1);
841 }
842 
843 /*
844  * cancel a request. return 1 if the caller's locks were temporarily
845  * dropped, otherwise return 0.
846  */
847 int
848 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *aiorp, int *canceled, int *done)
849 {
850 	int ostate;
851 	int rwflg = 1;
852 	int siqueued;
853 	int canned;
854 
855 	ASSERT(MUTEX_HELD(&__aio_mutex));
856 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
857 	ostate = aiorp->req_state;
858 	if (ostate == AIO_REQ_CANCELED) {
859 		return (0);
860 	}
861 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
862 		(*done)++;
863 		return (0);
864 	}
865 	if (ostate == AIO_REQ_FREE)
866 		return (0);
867 	if (aiorp->req_op == AIOFSYNC) {
868 		canned = aiorp->lio_head->lio_canned;
869 		aiorp->lio_head->lio_canned = 1;
870 		rwflg = 0;
871 		if (canned)
872 			return (0);
873 	}
874 	aiorp->req_state = AIO_REQ_CANCELED;
875 	_aio_req_del(aiowp, aiorp, ostate);
876 	if (ostate == AIO_REQ_INPROGRESS)
877 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
878 	(void) mutex_unlock(&aiowp->work_qlock1);
879 	(void) _aio_hash_del(aiorp->req_resultp);
880 	(void) mutex_unlock(&__aio_mutex);
881 	siqueued = _aiodone(aiorp, aiorp->lio_head, rwflg, -1, ECANCELED);
882 	(void) mutex_lock(&__aio_mutex);
883 	(void) mutex_lock(&aiowp->work_qlock1);
884 	_lio_remove(aiorp->lio_head);
885 	if (!siqueued)
886 		_aio_req_free(aiorp);
887 	(*canceled)++;
888 	return (1);
889 }
890 
891 /*
892  * This is the worker's main routine.
893  * The task of this function is to execute all queued requests;
894  * once the last pending request is executed this function will block
895  * in _aio_idle(). A new incoming request must wake up this thread to
896  * restart the work.
897  * Every worker has its own work queue. The queue lock is required
898  * to synchronize the addition of new requests for this worker or
899  * cancellation of pending/running requests.
900  *
901  * Cancellation scenarios:
902  * The cancellation of a request is being done asynchronously using
903  * _aio_cancel_req() from another thread context.
904  * A queued request can be cancelled in different ways:
905  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
906  *	- lock the queue -> remove the request -> unlock the queue
907  *	- this function/thread does not detect this cancellation process
908  * b) request is in progress (AIO_REQ_INPROGRESS) :
909  *	- this function first allows the cancellation of the running
910  *	  request with the flag "work_cancel_flg=1"
911  * 		see _aio_req_get() -> _aio_cancel_on()
912  *	  During this phase, it is allowed to interrupt the worker
913  *	  thread running the request (this thread) using the SIGAIOCANCEL
914  *	  signal.
915  *	  Once this thread returns from the kernel (because the request
916  *	  is just done), it must disable a possible cancellation
917  *	  and proceed to finish the request. To disable the cancellation
918  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
919  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
920  *	  same procedure as in a)
921  *
922  * To b)
923  *	This thread uses sigsetjmp() to define the position in the code where
924  *	it wishes to continue working in case a SIGAIOCANCEL signal
925  *	is detected.
926  *	Normally this thread should get the cancellation signal during the
927  *	kernel phase (reading or writing). In that case the signal handler
928  *	aiosigcancelhndlr() is activated using the worker thread context,
929  *	which again will use the siglongjmp() function to break the standard
930  *	code flow and jump to the "sigsetjmp" position, provided that
931  *	"work_cancel_flg" is set to "1".
932  *	Because the "work_cancel_flg" is only manipulated by this worker
933  *	thread and it can only run on one CPU at a given time, it is not
934  *	necessary to protect that flag with the queue lock.
935  *	When returning from the kernel (read or write system call), we must
936  *	first disable the use of the SIGAIOCANCEL signal and accordingly
937  *	the use of the siglongjmp() function to prevent a possible deadlock:
938  *	- It can happen that this worker thread returns from the kernel and
939  *	  blocks in "work_qlock1",
940  *	- then a second thread cancels the apparently "in progress" request
941  *	  and sends the SIGAIOCANCEL signal to the worker thread,
942  *	- the worker thread is granted the "work_qlock1" and returns
943  *	  from the kernel,
944  *	- the kernel detects the pending signal and activates the signal
945  *	  handler instead,
946  *	- if the "work_cancel_flg" is still set then the signal handler
947  *	  should use siglongjmp() to cancel the "in progress" request and
948  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
949  *	  for a second time => deadlock.
950  *	To avoid that situation we disable the cancellation of the request
951  *	in progress BEFORE we try to acquire the work_qlock1.
952  *	In that case the signal handler will not call siglongjmp() and the
953  *	worker thread will continue running the standard code flow.
954  *	Then this thread must check for the AIO_REQ_CANCELED state to emulate
955  *	a possibly required siglongjmp(), freeing the work_qlock1 and
956  *	avoiding a deadlock.
957  */
958 void *
959 _aio_do_request(void *arglist)
960 {
961 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
962 	struct aio_args *arg;
963 	aio_req_t *aiorp;		/* current AIO request */
964 	int ostate;
965 	ssize_t retval;
966 	int rwflg;
967 
968 	aiowp->work_tid = thr_self();
969 	if (thr_setspecific(_aio_key, aiowp) != 0)
970 		_aiopanic("_aio_do_request, thr_setspecific()\n");
971 
972 cancelit:
973 	if (sigsetjmp(aiowp->work_jmp_buf, 0)) {
974 		_sigprocmask(SIG_SETMASK, &_worker_set, NULL);
975 		goto cancelit;
976 	}
977 
978 	for (;;) {
979 		int err = 0;
980 
981 		/*
982 		 * Put completed requests on aio_done_list. This has
983 		 * to be done as part of the main loop to ensure that
984 		 * we don't artificially starve any aiowait'ers.
985 		 */
986 		if (aiowp->work_done1)
987 			_aio_work_done(aiowp);
988 
989 		while ((aiorp = _aio_req_get(aiowp)) == NULL) {
990 			_aio_idle(aiowp);
991 		}
992 #ifdef DEBUG
993 		_donecnt[aiowp->work_tid]++;
994 #endif
995 		arg = &aiorp->req_args;
996 
997 		err = 0;
998 		rwflg = 1;
999 		switch (aiorp->req_op) {
1000 			case AIOREAD:
1001 				retval = pread(arg->fd, arg->buf,
1002 				    arg->bufsz, arg->offset);
1003 				if (retval == -1) {
1004 					if (errno == ESPIPE) {
1005 						retval = read(arg->fd,
1006 						    arg->buf, arg->bufsz);
1007 						if (retval == -1)
1008 							err = errno;
1009 					} else {
1010 						err = errno;
1011 					}
1012 				}
1013 				break;
1014 			case AIOWRITE:
1015 				retval = pwrite(arg->fd, arg->buf,
1016 				    arg->bufsz, arg->offset);
1017 				if (retval == -1) {
1018 					if (errno == ESPIPE) {
1019 						retval = write(arg->fd,
1020 						    arg->buf, arg->bufsz);
1021 						if (retval == -1)
1022 							err = errno;
1023 					} else {
1024 						err = errno;
1025 					}
1026 				}
1027 				break;
1028 #if	defined(_LARGEFILE64_SOURCE) && !defined(_LP64)
1029 			case AIOAREAD64:
1030 				retval = pread64(arg->fd, arg->buf,
1031 				    arg->bufsz, arg->offset);
1032 				if (retval == -1) {
1033 					if (errno == ESPIPE) {
1034 						retval = read(arg->fd,
1035 						    arg->buf, arg->bufsz);
1036 						if (retval == -1)
1037 							err = errno;
1038 					} else {
1039 						err = errno;
1040 					}
1041 				}
1042 				break;
1043 			case AIOAWRITE64:
1044 				retval = pwrite64(arg->fd, arg->buf,
1045 				    arg->bufsz, arg->offset);
1046 				if (retval == -1) {
1047 					if (errno == ESPIPE) {
1048 						retval = write(arg->fd,
1049 						    arg->buf, arg->bufsz);
1050 						if (retval == -1)
1051 							err = errno;
1052 					} else {
1053 						err = errno;
1054 					}
1055 				}
1056 				break;
1057 #endif	/* (_LARGEFILE64_SOURCE) && !defined(_LP64) */
1058 			case AIOFSYNC:
1059 				if (_aio_fsync_del(aiorp, aiorp->lio_head))
1060 					continue;
1061 				(void) mutex_lock(&aiowp->work_qlock1);
1062 				ostate = aiorp->req_state;
1063 				(void) mutex_unlock(&aiowp->work_qlock1);
1064 				if (ostate == AIO_REQ_CANCELED) {
1065 					(void) mutex_lock(&aiorp->req_lock);
1066 					aiorp->req_canned = 1;
1067 					(void) cond_broadcast(
1068 						&aiorp->req_cancv);
1069 					(void) mutex_unlock(&aiorp->req_lock);
1070 					continue;
1071 				}
1072 				rwflg = 0;
1073 				/*
1074 				 * all writes for this fsync request are
1075 				 * now acknowledged. now, make these writes
1076 				 * visible.
1077 				 */
1078 				if (arg->offset == O_SYNC)
1079 					retval = __fdsync(arg->fd, FSYNC);
1080 				else
1081 					retval = __fdsync(arg->fd, FDSYNC);
1082 				if (retval == -1)
1083 					err = errno;
1084 				break;
1085 			default:
1086 				rwflg = 0;
1087 				_aiopanic("_aio_do_request, bad op\n");
1088 		}
1089 
1090 		/*
1091 		 * Disable the cancellation of the "in progress"
1092 		 * request before trying to acquire the lock of the queue.
1093 		 *
1094 		 * It is not necessary to protect "work_cancel_flg" with
1095 		 * work_qlock1, because this thread can only run on one
1096 		 * CPU at a time.
1097 		 */
1098 
1099 		_aio_cancel_off(aiowp);
1100 		(void) mutex_lock(&aiowp->work_qlock1);
1101 
1102 		/*
1103 		 * if we return here either
1104 		 * - we got the lock and can close the transaction
1105 		 *   as usual or
1106 		 * - the current transaction was cancelled, but siglongjmp
1107 		 *   was not executed
1108 		 */
1109 
1110 		if (aiorp->req_state == AIO_REQ_CANCELED) {
1111 			(void) mutex_unlock(&aiowp->work_qlock1);
1112 			continue;
1113 		}
1114 
1115 		aiorp->req_state = AIO_REQ_DONE;
1116 		_aio_req_done_cnt++;
1117 		(void) mutex_unlock(&aiowp->work_qlock1);
1118 		(void) _aiodone(aiorp, aiorp->lio_head, rwflg, retval, err);
1119 	}
1120 	/* NOTREACHED */
1121 	return (NULL);
1122 }
1123 
1124 /*
1125  * posix supports signal notification for completed aio requests.
1126  * when _aio_do_request() notices that an aio request should send
1127  * a signal, the aio request is moved to the signal notification
1128  * queue. this routine drains this queue, and guarantees that the
1129  * signal notification is sent.
1130  */
1131 void *
1132 _aio_send_sigev(void *arg)
1133 {
1134 	aio_req_t *rp;
1135 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1136 
1137 	aiowp->work_tid = thr_self();
1138 	if (thr_setspecific(_aio_key, aiowp) != 0)
1139 		_aiopanic("_aio_send_sigev, thr_setspecific()\n");
1140 
1141 	for (;;) {
1142 		while ((rp = _aio_req_get(aiowp)) == NULL) {
1143 			_aio_idle(aiowp);
1144 		}
1145 		if (rp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
1146 			while (__sigqueue(__pid, rp->aio_sigevent.sigev_signo,
1147 			    rp->aio_sigevent.sigev_value.sival_ptr,
1148 			    SI_ASYNCIO) == -1)
1149 				thr_yield();
1150 		}
1151 		if (rp->lio_signo) {
1152 			while (__sigqueue(__pid, rp->lio_signo,
1153 			    rp->lio_sigval.sival_ptr, SI_ASYNCIO) == -1)
1154 				thr_yield();
1155 		}
1156 		_aio_lock();
1157 		_lio_remove(rp->lio_head);
1158 		_aio_req_free(rp);
1159 		_aio_unlock();
1160 	}
1161 	/* NOTREACHED */
1162 	return (NULL);
1163 }
1164 
1165 /*
1166  * do the completion semantics for a request that was either canceled
1167  * by _aio_cancel_req(), or was completed by _aio_do_request(). return
1168  * the value 1 when a sigevent was queued, otherwise return 0.
1169  */
1170 
1171 static int
1172 _aiodone(aio_req_t *rp, aio_lio_t *head, int rwflg, ssize_t retval, int err)
1173 {
1174 	volatile aio_result_t *resultp;
1175 #if defined(_LARGEFILE64_SOURCE) && !defined(_LP64)
1176 	aiocb64_t	*aiop64;
1177 #endif
1178 	int sigev;
1179 
1180 	_aio_lock();
1181 
1182 	if (POSIX_AIO(rp)) {
1183 		void	*user;
1184 		int	port;
1185 		int	error;
1186 
1187 		if (rp->aio_sigevent.sigev_notify == SIGEV_PORT) {
1188 			resultp = rp->req_resultp;
1189 			resultp->aio_return = retval;
1190 			resultp->aio_errno = err;
1191 
1192 			if (err == ECANCELED || rwflg)
1193 				_aio_outstand_cnt--;
1194 
1195 #if defined(_LARGEFILE64_SOURCE) && !defined(_LP64)
1196 			if (rp->req_op == AIOAREAD64 ||
1197 			    rp->req_op == AIOAWRITE64) {
1198 				aiop64 = (void *)rp->req_iocb;
1199 				aiop64->aio_state = USERAIO_DONE;
1200 			} else
1201 #endif
1202 				rp->req_iocb->aio_state = USERAIO_DONE;
1203 
1204 			port = rp->aio_sigevent.sigev_signo;
1205 			user = rp->aio_sigevent.sigev_value.sival_ptr;
1206 			error = _port_dispatch(port, 0, PORT_SOURCE_AIO, 0,
1207 			    (uintptr_t)rp->req_iocb, user);
1208 			if (error == 0) {
1209 				(void) _aio_hash_del(rp->req_resultp);
1210 				_aio_req_free(rp);
1211 				_aio_unlock();
1212 				return (1);
1213 			}
1214 			/*
1215 			 * Could not submit the I/O completion to the port;
1216 			 * set the notification type of the transaction to SIGEV_NONE
1217 			 */
1218 			rp->aio_sigevent.sigev_notify = SIGEV_NONE;
1219 			if (err == ECANCELED || rwflg)
1220 				_aio_outstand_cnt++;
1221 		}
1222 
1223 		sigev = (rp->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
1224 		    (head && head->lio_signo));
1225 		if (sigev)
1226 			(void) _aio_hash_del(rp->req_resultp);
1227 
1228 		resultp = rp->req_resultp;
1229 		/*
1230 		 * resultp is declared "volatile" (above) to prevent the
1231 		 * compiler from reordering these stores; reordering could
1232 		 * let aio_error() observe a particular aio_errno value
1233 		 * before the corresponding aio_return has been stored
1234 		 * (i.e. aio_return would not have been set yet)
1235 		 */
1236 		resultp->aio_return = retval;
1237 		resultp->aio_errno = err;
1238 
1239 		if (err == ECANCELED) {
1240 			_aio_outstand_cnt--;
1241 		} else {
1242 			if (rwflg) {
1243 				if (!sigev)
1244 					_aio_enq_doneq(rp);
1245 				_aio_outstand_cnt--;
1246 			}
1247 
1248 		}
1249 
1250 		/*
1251 		 * __aio_waitn() sets AIO_IO_WAITING to notify _aiodone() that
1252 		 * it is waiting for completed I/Os. The number of required
1253 		 * completed I/Os is stored into "_aio_waitncnt".
1254 		 * aio_waitn() is woken up when
1255 		 * - there are no further outstanding I/Os
1256 		 *   (_aio_outstand_cnt == 0) or
1257 		 * - the expected number of I/Os has completed.
1258 		 * Only one __aio_waitn() function waits for completed I/Os at
1259 		 * a time.
1260 		 *
1261 		 * __aio_suspend() increments "_aio_suscv_cnt" to notify
1262 		 * _aiodone() that at least one __aio_suspend() call is
1263 		 * waiting for completed I/Os.
1264 		 * There could be more than one __aio_suspend() function
1265 		 * waiting for completed I/Os. Because every function should
1266 		 * be waiting for different I/Os, _aiodone() has to wake up all
1267 		 * __aio_suspend() functions each time.
1268 		 * Every __aio_suspend() function will compare the recently
1269 		 * completed I/O with its own list.
1270 		 */
1271 		if (_aio_flags & AIO_IO_WAITING) {
1272 			if (_aio_waitncnt > 0)
1273 				_aio_waitncnt--;
1274 			if (_aio_outstand_cnt == 0 || _aio_waitncnt == 0 ||
1275 			    _aio_suscv_cnt > 0)
1276 				(void) cond_broadcast(&_aio_iowait_cv);
1277 		} else {
1278 			/* Wake up waiting aio_suspend calls */
1279 			if (_aio_suscv_cnt > 0)
1280 				(void) cond_broadcast(&_aio_iowait_cv);
1281 		}
1282 
1283 		_aio_unlock();
1284 
1285 		/*
1286 		 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1287 		 * __aio_suspend() increments "_aio_kernel_suspend"
1288 		 * when they are waiting in the kernel for completed I/Os.
1289 		 *
1290 		 * _kaio(AIONOTIFY) awakes the corresponding function
1291 		 * in the kernel; then the corresponding __aio_waitn() or
1292 		 * __aio_suspend() function could reap the recently
1293 		 * completed I/Os (_aiodone()).
1294 		 */
1295 		if (err != ECANCELED) {
1296 			if (_aio_flags & AIO_WAIT_INPROGRESS ||
1297 			    _aio_kernel_suspend > 0) {
1298 				(void) _kaio(AIONOTIFY);
1299 			}
1300 		}
1301 
1302 		rp->lio_signo = 0;
1303 		rp->lio_sigval.sival_int = 0;
1304 		if (head) {
1305 			/*
1306 			 * If all the lio requests have completed,
1307 			 * signal the waiting process
1308 			 */
1309 			(void) mutex_lock(&head->lio_mutex);
1310 			if (--head->lio_refcnt == 0) {
1311 				if (head->lio_mode == LIO_WAIT)
1312 					(void) cond_signal(&head->lio_cond_cv);
1313 				else {
1314 					rp->lio_signo = head->lio_signo;
1315 					rp->lio_sigval = head->lio_sigval;
1316 				}
1317 			}
1318 			(void) mutex_unlock(&head->lio_mutex);
1319 		}
1320 		if (sigev) {
1321 			_aio_req_add(rp, &__workers_si, AIOSIGEV);
1322 			return (1);
1323 		}
1324 	} else {
1325 		/* Solaris I/O */
1326 		if (err == ECANCELED)
1327 			_aio_outstand_cnt--;
1328 
1329 		_aio_unlock();
1330 
1331 		resultp = rp->req_resultp;
1332 		resultp->aio_return = retval;
1333 		resultp->aio_errno = err;
1334 	}
1335 	return (0);
1336 }
1337 
1338 /*
1339  * delete fsync requests from list head until there is
1340  * only one left. return 0 when there is only one, otherwise
1341  * return a non-zero value.
1342  */
1343 static int
1344 _aio_fsync_del(aio_req_t *rp, aio_lio_t *head)
1345 {
1346 	int refcnt;
1347 
1348 	(void) mutex_lock(&head->lio_mutex);
1349 	if (head->lio_refcnt > 1 || head->lio_mode == LIO_DESTROY ||
1350 	    head->lio_canned) {
1351 		refcnt = --head->lio_refcnt;
1352 		if (refcnt || head->lio_canned) {
1353 			head->lio_nent--;
1354 			(void) mutex_unlock(&head->lio_mutex);
1355 			(void) mutex_lock(&__aio_mutex);
1356 			_aio_req_free(rp);
1357 			(void) mutex_unlock(&__aio_mutex);
1358 			if (head->lio_canned) {
1359 				ASSERT(refcnt >= 0);
1360 				return (0);
1361 			}
1362 			return (1);
1363 		}
1364 		ASSERT(head->lio_mode == LIO_DESTROY);
1365 		ASSERT(head->lio_nent == 1 && head->lio_refcnt == 0);
1366 		(void) mutex_unlock(&head->lio_mutex);
1367 		_aio_remove(rp);
1368 		return (0);
1369 	}
1370 	ASSERT(head->lio_refcnt == head->lio_nent);
1371 	(void) mutex_unlock(&head->lio_mutex);
1372 	return (0);
1373 }
1374 
1375 /*
1376  * worker is set idle when its work queue is empty.
1377  * The worker checks again that it has no more work and then
1378  * goes to sleep waiting for more work.
1379  */
1380 void
1381 _aio_idle(aio_worker_t *aiowp)
1382 {
1383 	(void) mutex_lock(&aiowp->work_lock);
1384 	if (aiowp->work_cnt1 == 0) {
1385 #ifdef DEBUG
1386 		_idlecnt[aiowp->work_tid]++;
1387 #endif
1388 		aiowp->work_idleflg = 1;
1389 		(void) cond_wait(&aiowp->work_idle_cv, &aiowp->work_lock);
1390 		/*
1391 		 * idle flag is cleared before worker is awakened
1392 		 * by _aio_req_add().
1393 		 */
1394 	}
1395 	(void) mutex_unlock(&aiowp->work_lock);
1396 }
1397 
1398 /*
1399  * A worker's completed AIO requests are placed onto a global
1400  * done queue. The application is only sent a SIGIO signal if
1401  * the process has a handler enabled and it is not waiting via
1402  * aiowait().
1403  */
1404 static void
1405 _aio_work_done(struct aio_worker *aiowp)
1406 {
1407 	struct aio_req *done_req = NULL;
1408 
1409 	(void) mutex_lock(&aiowp->work_qlock1);
1410 	done_req = aiowp->work_prev1;
1411 	done_req->req_next = NULL;
1412 	aiowp->work_done1 = 0;
1413 	aiowp->work_tail1 = aiowp->work_next1;
1414 	if (aiowp->work_tail1 == NULL)
1415 		aiowp->work_head1 = NULL;
1416 	aiowp->work_prev1 = NULL;
1417 	(void) mutex_unlock(&aiowp->work_qlock1);
1418 	(void) mutex_lock(&__aio_mutex);
1419 	_aio_donecnt++;
1420 	_aio_outstand_cnt--;
1421 	_aio_req_done_cnt--;
1422 	ASSERT(_aio_donecnt > 0 && _aio_outstand_cnt >= 0);
1423 	ASSERT(done_req != NULL);
1424 
1425 	if (_aio_done_tail == NULL) {
1426 		_aio_done_head = _aio_done_tail = done_req;
1427 	} else {
1428 		_aio_done_head->req_next = done_req;
1429 		_aio_done_head = done_req;
1430 	}
1431 
1432 	if (_aiowait_flag) {
1433 		(void) mutex_unlock(&__aio_mutex);
1434 		(void) _kaio(AIONOTIFY);
1435 	} else {
1436 		(void) mutex_unlock(&__aio_mutex);
1437 		if (_sigio_enabled) {
1438 			(void) kill(__pid, SIGIO);
1439 		}
1440 	}
1441 }
1442 
1443 /*
1444  * the done queue consists of AIO requests that are in either the
1445  * AIO_REQ_DONE or AIO_REQ_CANCELED state. requests that were cancelled
1446  * are discarded. if the done queue is empty then NULL is returned.
1447  * otherwise the address of a done aio_result_t is returned.
1448  */
1449 struct aio_result_t *
1450 _aio_req_done(void)
1451 {
1452 	struct aio_req *next;
1453 	aio_result_t *resultp;
1454 
1455 	ASSERT(MUTEX_HELD(&__aio_mutex));
1456 
1457 	if ((next = _aio_done_tail) != NULL) {
1458 		_aio_done_tail = next->req_next;
1459 		ASSERT(_aio_donecnt > 0);
1460 		_aio_donecnt--;
1461 		(void) _aio_hash_del(next->req_resultp);
1462 		resultp = next->req_resultp;
1463 		ASSERT(next->req_state == AIO_REQ_DONE);
1464 		_aio_req_free(next);
1465 		return (resultp);
1466 	}
1467 	/* is queue empty? */
1468 	if (next == NULL && _aio_outstand_cnt == 0) {
1469 		return ((aio_result_t *)-1);
1470 	}
1471 	return (NULL);
1472 }
1473 
1474 /*
1475  * add an AIO request onto the next work queue. a circular list of
1476  * workers is used to choose the next worker. each worker has two
1477  * work queues. if the lock for the first queue is busy then the
1478  * request is placed on the second queue. the request is always
1479  * placed on one of the two queues depending on which one is locked.
1480  */
1481 void
1482 _aio_req_add(aio_req_t *aiorp, aio_worker_t **nextworker, int mode)
1483 {
1484 	struct aio_worker *aiowp;
1485 	struct aio_worker *first;
1486 	int clogged = 0;
1487 	int found = 0;
1488 	int load_bal_flg;
1489 	int idleflg;
1490 	int qactive;
1491 
1492 	aiorp->req_next = NULL;
1493 	ASSERT(*nextworker != NULL);
1494 	aiowp = *nextworker;
1495 	/*
1496 	 * try to acquire the next worker's work queue. if it is locked,
1497 	 * then search the list of workers until a queue is found unlocked,
1498 	 * or until the list is completely traversed at which point another
1499 	 * worker will be created.
1500 	 */
1501 	first = aiowp;
1502 	_aio_lock();
1503 	__sigio_maskedcnt++;	/* disable SIGIO */
1504 	if (mode == AIOREAD || mode == AIOWRITE) {
1505 		_aio_outstand_cnt++;
1506 		load_bal_flg = 1;
1507 	}
1508 	_aio_unlock();
1509 	switch (mode) {
1510 		case AIOREAD:
1511 			/* try to find an idle worker. */
1512 			do {
1513 				if (mutex_trylock(&aiowp->work_qlock1) == 0) {
1514 					if (aiowp->work_idleflg) {
1515 						found = 1;
1516 						break;
1517 					}
1518 					(void) mutex_unlock(
1519 						&aiowp->work_qlock1);
1520 				}
1521 			} while ((aiowp = aiowp->work_forw) != first);
1522 			if (found)
1523 				break;
1524 			/*FALLTHROUGH*/
1525 		case AIOWRITE:
1526 			while (mutex_trylock(&aiowp->work_qlock1)) {
1527 #ifdef DEBUG
1528 				_qlocked++;
1529 #endif
1530 				if (((aiowp = aiowp->work_forw)) == first) {
1531 					clogged = 1;
1532 					break;
1533 				}
1534 			}
1535 			/*
1536 			 * create more workers when the workers appear
1537 			 * overloaded. either all the workers are busy
1538 			 * draining their queues, no worker's queue lock
1539 			 * could be acquired, or the selected worker has
1540 			 * exceeded its minimum work load and the max
1541 			 * number of workers has not yet been created.
1542 			 */
1543 			if (clogged) {
1544 #ifdef DEBUG
1545 				_new_workers++;
1546 				_clogged++;
1547 #endif
1548 				if (_aio_worker_cnt < _max_workers) {
1549 					if (_aio_create_worker(aiorp, mode))
1550 						_aiopanic(
1551 						    "_aio_req_add: clogged");
1552 					_aio_lock();
1553 					__sigio_maskedcnt--;
1554 					_aio_unlock();
1555 					return;
1556 				}
1557 
1558 				/*
1559 				 * No worker available and we have created
1560 				 * _max_workers, keep going through the
1561 				 * list until we get a lock
1562 				 */
1563 				while (mutex_trylock(&aiowp->work_qlock1)) {
1564 					/*
1565 					 * give someone else a chance
1566 					 */
1567 					thr_yield();
1568 					aiowp = aiowp->work_forw;
1569 				}
1570 
1571 			}
1572 			ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1573 			aiowp->work_minload1++;
1574 			if (_aio_worker_cnt < _max_workers &&
1575 			    aiowp->work_minload1 > _minworkload) {
1576 				aiowp->work_minload1 = 0;
1577 				(void) mutex_unlock(&aiowp->work_qlock1);
1578 #ifdef DEBUG
1579 				_qfullcnt[aiowp->work_tid]++;
1580 				_new_workers++;
1581 				_newworker[aiowp->work_tid]++;
1582 				_avedone = _aio_submitcnt2/_new_workers;
1583 #endif
1584 				(void) mutex_lock(&__aio_mutex);
1585 				*nextworker = aiowp->work_forw;
1586 				(void) mutex_unlock(&__aio_mutex);
1587 				if (_aio_create_worker(aiorp, mode))
1588 					_aiopanic("aio_req_add: add worker");
1589 				_aio_lock();
1590 				__sigio_maskedcnt--; /* enable signals again */
1591 				_aio_unlock(); /* send evt. SIGIO signal */
1592 				return;
1593 			}
1594 			break;
1595 		case AIOFSYNC:
1596 			aiorp->req_op = mode;
1597 			/*FALLTHROUGH*/
1598 		case AIOSIGEV:
1599 			load_bal_flg = 0;
1600 			(void) mutex_lock(&aiowp->work_qlock1);
1601 			break;
1602 	}
1603 	/*
1604 	 * Put request onto worker's work queue.
1605 	 */
1606 	if (aiowp->work_tail1 == NULL) {
1607 		ASSERT(aiowp->work_cnt1 == 0);
1608 		aiowp->work_tail1 = aiorp;
1609 		aiowp->work_next1 = aiorp;
1610 	} else {
1611 		aiowp->work_head1->req_next = aiorp;
1612 		if (aiowp->work_next1 == NULL)
1613 			aiowp->work_next1 = aiorp;
1614 	}
1615 	aiorp->req_state = AIO_REQ_QUEUED;
1616 	aiorp->req_worker = aiowp;
1617 	aiowp->work_head1 = aiorp;
1618 	qactive = aiowp->work_cnt1++;
1619 	(void) mutex_unlock(&aiowp->work_qlock1);
1620 	if (load_bal_flg) {
1621 		_aio_lock();
1622 		*nextworker = aiowp->work_forw;
1623 		_aio_unlock();
1624 	}
1625 	/*
1626 	 * Awaken worker if it is not currently active.
1627 	 */
1628 	if (!qactive) {
1629 		(void) mutex_lock(&aiowp->work_lock);
1630 		idleflg = aiowp->work_idleflg;
1631 		aiowp->work_idleflg = 0;
1632 		(void) mutex_unlock(&aiowp->work_lock);
1633 		if (idleflg)
1634 			(void) cond_signal(&aiowp->work_idle_cv);
1635 	}
1636 	_aio_lock();
1637 	__sigio_maskedcnt--;	/* enable signals again */
1638 	_aio_unlock();		/* send SIGIO signal if pending */
1639 }
1640 
1641 /*
1642  * get an AIO request for a specified worker. each worker has
1643  * two work queues. find the first one that is not empty and
1644  * remove a request from the queue and return it to the
1645  * caller. if both queues are empty, then return NULL.
1646  */
1647 aio_req_t *
1648 _aio_req_get(aio_worker_t *aiowp)
1649 {
1650 	aio_req_t *next;
1651 	int mode;
1652 
1653 	(void) mutex_lock(&aiowp->work_qlock1);
1654 	if ((next = aiowp->work_next1) != NULL) {
1655 		/*
1656 		 * remove a POSIX request from the queue; the
1657 		 * request queue is a singly linked list
1658 		 * with a previous pointer. The request is removed
1659 		 * by updating the previous pointer.
1660 		 *
1661 		 * non-posix requests are left on the queue to
1662 		 * eventually be placed on the done queue.
1663 		 */
1664 
1665 		if (next->req_type == AIO_POSIX_REQ) {
1666 			if (aiowp->work_prev1 == NULL) {
1667 				aiowp->work_tail1 = next->req_next;
1668 				if (aiowp->work_tail1 == NULL)
1669 					aiowp->work_head1 = NULL;
1670 			} else {
1671 				aiowp->work_prev1->req_next = next->req_next;
1672 				if (aiowp->work_head1 == next)
1673 					aiowp->work_head1 = next->req_next;
1674 			}
1675 
1676 		} else {
1677 			aiowp->work_prev1 = next;
1678 			ASSERT(aiowp->work_done1 >= 0);
1679 			aiowp->work_done1++;
1680 		}
1681 		ASSERT(next != next->req_next);
1682 		aiowp->work_next1 = next->req_next;
1683 		ASSERT(aiowp->work_cnt1 >= 1);
1684 		aiowp->work_cnt1--;
1685 		mode = next->req_op;
1686 		if (mode == AIOWRITE || mode == AIOREAD || mode == AIOAREAD64 ||
1687 		    mode == AIOAWRITE64)
1688 			aiowp->work_minload1--;
1689 #ifdef DEBUG
1690 		_firstqcnt[aiowp->work_tid]++;
1691 #endif
1692 		next->req_state = AIO_REQ_INPROGRESS;
1693 		_aio_cancel_on(aiowp);
1694 	}
1695 	aiowp->work_req = next;
1696 	ASSERT(next != NULL || (next == NULL && aiowp->work_cnt1 == 0));
1697 	(void) mutex_unlock(&aiowp->work_qlock1);
1698 	return (next);
1699 }
1700 
1701 static void
1702 _aio_req_del(aio_worker_t *aiowp, aio_req_t *rp, int ostate)
1703 {
1704 	aio_req_t **last, *lastrp, *next;
1705 
1706 	ASSERT(aiowp != NULL);
1707 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1708 	if (POSIX_AIO(rp)) {
1709 		if (ostate != AIO_REQ_QUEUED)
1710 			return;
1711 	}
1712 	last = &aiowp->work_tail1;
1713 	lastrp = aiowp->work_tail1;
1714 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1715 	while ((next = *last) != NULL) {
1716 		if (next == rp) {
1717 			*last = next->req_next;
1718 			if (aiowp->work_next1 == next)
1719 				aiowp->work_next1 = next->req_next;
1720 
1721 			if ((next->req_next != NULL) ||
1722 			    (aiowp->work_done1 == 0)) {
1723 				if (aiowp->work_head1 == next)
1724 					aiowp->work_head1 = next->req_next;
1725 				if (aiowp->work_prev1 == next)
1726 					aiowp->work_prev1 = next->req_next;
1727 			} else {
1728 				if (aiowp->work_head1 == next)
1729 					aiowp->work_head1 = lastrp;
1730 				if (aiowp->work_prev1 == next)
1731 					aiowp->work_prev1 = lastrp;
1732 			}
1733 
1734 			if (ostate == AIO_REQ_QUEUED) {
1735 				ASSERT(aiowp->work_cnt1 >= 1);
1736 				aiowp->work_cnt1--;
1737 			} else {
1738 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1739 				    !POSIX_AIO(rp));
1740 				aiowp->work_done1--;
1741 			}
1742 			return;
1743 		}
1744 		last = &next->req_next;
1745 		lastrp = next;
1746 	}
1747 	/* NOTREACHED */
1748 }
1749 
1750 
1751 static void
1752 _aio_enq_doneq(aio_req_t *reqp)
1753 {
1754 	if (_aio_doneq == NULL) {
1755 		_aio_doneq = reqp;
1756 		reqp->req_next = reqp;
1757 		reqp->req_prev = reqp;
1758 	} else {
1759 		reqp->req_next = _aio_doneq;
1760 		reqp->req_prev = _aio_doneq->req_prev;
1761 		reqp->req_prev->req_next = reqp;
1762 		_aio_doneq->req_prev = reqp;
1763 	}
1764 	reqp->req_state = AIO_REQ_DONEQ;
1765 	_aio_doneq_cnt++;
1766 }
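
/*
 * A minimal illustration (not code in this library): after _aio_enq_doneq()
 * has queued request A and then request B, the links look like
 *
 *	_aio_doneq --> A, with A->req_next == B, B->req_next == A,
 *	A->req_prev == B and B->req_prev == A,
 *
 * i.e. req_next/req_prev always form a closed ring, so _aio_req_remove()
 * can unlink any element without scanning the queue.
 */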
1767 
1768 /*
1769  * caller owns the _aio_mutex
1770  */
1771 
1772 aio_req_t *
1773 _aio_req_remove(aio_req_t *reqp)
1774 {
1775 	aio_req_t *head;
1776 
1777 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1778 		return (NULL);
1779 
1780 	if (reqp) {
1781 		/* request in done queue */
1782 		if (reqp->req_next == reqp) {
1783 			/* only one request on queue */
1784 			_aio_doneq = NULL;
1785 		} else {
1786 			reqp->req_next->req_prev = reqp->req_prev;
1787 			reqp->req_prev->req_next = reqp->req_next;
1788 			if (reqp == _aio_doneq)
1789 				_aio_doneq = reqp->req_next;
1790 		}
1791 		_aio_doneq_cnt--;
1792 		return (reqp);
1793 	}
1794 
1795 	if (_aio_doneq) {
1796 		head = _aio_doneq;
1797 		if (head == head->req_next) {
1798 			/* only one request on queue */
1799 			_aio_doneq = NULL;
1800 		} else {
1801 			head->req_prev->req_next = head->req_next;
1802 			head->req_next->req_prev = head->req_prev;
1803 			_aio_doneq = head->req_next;
1804 		}
1805 		_aio_doneq_cnt--;
1806 		return (head);
1807 	}
1808 	return (NULL);
1809 
1810 }
1811 
1812 /*
1813  * An AIO request is identified by an aio_result_t pointer.  The AIO
1814  * library maps this aio_result_t pointer to its internal representation
1815  * via a hash table.  This function adds an aio_result_t pointer to
1816  * the hash table.
1817  */
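/*
 * For example (illustrative arithmetic only): with HASHSZ == 8192, a result
 * pointer whose value is 0x2000 hashes to
 *	((0x2000 >> 13) ^ 0x2000) & (HASHSZ - 1) == 1,
 * so that request is linked onto the chain rooted at _aio_hash[1].
 */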
1818 static int
1819 _aio_hash_insert(aio_result_t *resultp, aio_req_t *aiorp)
1820 {
1821 	uintptr_t i;
1822 	aio_req_t *next, **last;
1823 
1824 	ASSERT(MUTEX_HELD(&__aio_mutex));
1825 	i = AIOHASH(resultp);
1826 	last = (_aio_hash + i);
1827 	while ((next = *last) != NULL) {
1828 		if (resultp == next->req_resultp)
1829 			return (-1);
1830 		last = &next->req_link;
1831 	}
1832 	*last = aiorp;
1833 	ASSERT(aiorp->req_link == NULL);
1834 	return (0);
1835 }
1836 
1837 /*
1838  * remove an entry from the hash table.
1839  */
1840 struct aio_req *
1841 _aio_hash_del(aio_result_t *resultp)
1842 {
1843 	struct aio_req *next, **prev;
1844 	uintptr_t i;
1845 
1846 	ASSERT(MUTEX_HELD(&__aio_mutex));
1847 	i = AIOHASH(resultp);
1848 	prev = (_aio_hash + i);
1849 	while ((next = *prev) != NULL) {
1850 		if (resultp == next->req_resultp) {
1851 			*prev = next->req_link;
1852 			return (next);
1853 		}
1854 		prev = &next->req_link;
1855 	}
1856 	ASSERT(next == NULL);
1857 	return ((struct aio_req *)NULL);
1858 }
1859 
1860 /*
1861  * Find an entry in the hash table.
1862  */
1863 struct aio_req *
1864 _aio_hash_find(aio_result_t *resultp)
1865 {
1866 	struct aio_req *next, **prev;
1867 	uintptr_t i;
1868 
1869 	/*
1870 	 * User-level aio has not been initialized; there is no hash table.
1871 	 */
1872 	if (_aio_hash == NULL)
1873 		return (NULL);
1874 
1875 	i = AIOHASH(resultp);
1876 	prev = (_aio_hash + i);
1877 	while ((next = *prev) != NULL) {
1878 		if (resultp == next->req_resultp) {
1879 			return (next);
1880 		}
1881 		prev = &next->req_link;
1882 	}
1883 	return (NULL);
1884 }
1885 
1886 /*
1887  * Allocate and free aio requests; they are cached on a freelist.
1888  */
1889 aio_req_t *
1890 _aio_req_alloc(void)
1891 {
1892 	aio_req_t *aiorp;
1893 	int err;
1894 
1895 	_aio_lock();
1896 	while (_aio_freelist == NULL) {
1897 		_aio_unlock();
1898 		err = 0;
1899 		(void) mutex_lock(&__aio_cachefillock);
1900 		if (__aio_cachefilling)
1901 			(void) cond_wait(&__aio_cachefillcv,
1902 			    &__aio_cachefillock);
1903 		else
1904 			err = _fill_aiocache(HASHSZ);
1905 		(void) mutex_unlock(&__aio_cachefillock);
1906 		if (err)
1907 			return ((aio_req_t *)-1);
1908 		_aio_lock();
1909 	}
1910 	aiorp = _aio_freelist;
1911 	_aio_freelist = _aio_freelist->req_link;
1912 	aiorp->req_type = 0;
1913 	aiorp->req_link = NULL;
1914 	aiorp->req_next = NULL;
1915 	aiorp->lio_head = NULL;
1916 	aiorp->aio_sigevent.sigev_notify = SIGEV_NONE;
1917 	_aio_freelist_cnt--;
1918 	_aio_unlock();
1919 	return (aiorp);
1920 }
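/*
 * Note on the refill loop above: __aio_mutex is dropped before refilling,
 * and __aio_cachefillock/__aio_cachefillcv ensure that only one thread
 * calls _fill_aiocache() while any later arrivals wait for it to finish.
 */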
1921 
1922 /*
1923  * fill the aio request cache with empty aio request structures.
1924  */
1925 int
1926 _fill_aiocache(int n)
1927 {
1928 	aio_req_t *next, *aiorp, *first;
1929 	int cnt;
1930 	uintptr_t ptr;
1931 	int i;
1932 
1933 	__aio_cachefilling = 1;
1934 	if ((ptr = (uintptr_t)malloc(sizeof (struct aio_req) * n)) == 0) {
1935 		__aio_cachefilling = 0;
1936 		(void) cond_broadcast(&__aio_cachefillcv);
1937 		return (-1);
1938 	}
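	/*
	 * Sanity check: the block is carved into aio_req_t structures,
	 * so it must be at least 8-byte aligned.
	 */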
1939 	if (ptr & 0x7)
1940 		_aiopanic("_fill_aiocache");
1941 	first = (struct aio_req *)ptr;
1942 	next = first;
1943 	cnt = n - 1;
1944 	for (i = 0; i < cnt; i++) {
1945 		aiorp = next++;
1946 		aiorp->req_state = AIO_REQ_FREE;
1947 		aiorp->req_link = next;
1948 		(void) mutex_init(&aiorp->req_lock, USYNC_THREAD, NULL);
1949 		(void) cond_init(&aiorp->req_cancv, USYNC_THREAD, NULL);
1950 	}
1951 	__aio_cachefilling = 0;
1952 	(void) cond_broadcast(&__aio_cachefillcv);
1953 	next->req_state = AIO_REQ_FREE;
1954 	next->req_link = NULL;
1955 	(void) mutex_init(&next->req_lock, USYNC_THREAD, NULL);
1956 	(void) cond_init(&next->req_cancv, USYNC_THREAD, NULL);
1957 	_aio_lock();
1958 	_aio_freelist_cnt = n;
1959 	_aio_freelist = first;
1960 	_aio_unlock();
1961 	return (0);
1962 }
1963 
1964 /*
1965  * put an aio request back onto the freelist.
1966  */
1967 void
1968 _aio_req_free(aio_req_t *aiorp)
1969 {
1970 	ASSERT(MUTEX_HELD(&__aio_mutex));
1971 	aiorp->req_state = AIO_REQ_FREE;
1972 	aiorp->req_link = _aio_freelist;
1973 	_aio_freelist = aiorp;
1974 	_aio_freelist_cnt++;
1975 }
1976 
1977 /*
1978  * global aio lock that masks SIGIO signals.
1979  */
1980 void
1981 _aio_lock(void)
1982 {
1983 	__sigio_masked = 1;
1984 	(void) mutex_lock(&__aio_mutex);
1985 	__sigio_maskedcnt++;
1986 }
1987 
1988 /*
1989  * release global aio lock. send SIGIO signal if one
1990  * is pending.
1991  */
1992 void
1993 _aio_unlock(void)
1994 {
1995 	if (--__sigio_maskedcnt == 0)
1996 		__sigio_masked = 0;
1997 	(void) mutex_unlock(&__aio_mutex);
1998 	if (__sigio_pending)
1999 		__aiosendsig();
2000 }
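/*
 * Typical usage (hypothetical caller): the lock/unlock pair brackets any
 * access to the shared hash table, freelist and done queue, e.g.
 *
 *	_aio_lock();
 *	reqp = _aio_hash_find(resultp);
 *	_aio_unlock();
 */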
2001 
2002 /*
2003  * AIO interface for POSIX
2004  */
2005 int
2006 _aio_rw(aiocb_t *cb, aio_lio_t *lio_head, aio_worker_t **nextworker,
2007     int mode, int flg, struct sigevent *sigp)
2008 {
2009 	aio_req_t *aiorp = NULL;
2010 	aio_args_t *ap = NULL;
2011 	int kerr;
2012 	int umode;
2013 
2014 	if (cb == NULL) {
2015 		errno = EINVAL;
2016 		return (-1);
2017 	}
2018 
2019 	/* initialize kaio */
2020 	if (!_kaio_ok)
2021 		_kaio_init();
2022 
2023 	cb->aio_state = NOCHECK;
2024 
2025 	/*
2026 	 * If _aio_rw() is called because a list I/O kaio()
2027 	 * call failed, we don't want to repeat the
2028 	 * system call.
2029 	 */
2030 
2031 	if (flg & AIO_KAIO) {
2032 		/*
2033 		 * Try kernel aio first.
2034 		 * If errno is ENOTSUP/EBADFD,
2035 		 * fall back to the thread implementation.
2036 		 */
2037 		if ((_kaio_ok > 0) && (KAIO_SUPPORTED(cb->aio_fildes)))  {
2038 			cb->aio_resultp.aio_errno = EINPROGRESS;
2039 			cb->aio_state = CHECK;
2040 			kerr = (int)_kaio(mode, cb);
2041 			if (kerr == 0)
2042 				return (0);
2043 			else if ((errno != ENOTSUP) && (errno != EBADFD)) {
2044 				cb->aio_resultp.aio_errno = errno;
2045 				cb->aio_resultp.aio_return = -1;
2046 				cb->aio_state = NOCHECK;
2047 				return (-1);
2048 			}
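			/*
			 * EBADFD means this descriptor does not support
			 * kaio; remember that so later requests skip the
			 * kernel attempt.
			 */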
2049 			if (errno == EBADFD)
2050 				SET_KAIO_NOT_SUPPORTED(cb->aio_fildes);
2051 		}
2052 	}
2053 
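	/*
	 * kaio was either not attempted or not usable for this request;
	 * hand the request to the user-level worker threads.
	 */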
2054 	cb->aio_resultp.aio_errno = EINPROGRESS;
2055 	cb->aio_state = USERAIO;
2056 
2057 	if (!__uaio_ok) {
2058 		if (__uaio_init() == -1)
2059 			return (-1);
2060 	}
2061 
2062 	aiorp = _aio_req_alloc();
2063 	if (aiorp == (aio_req_t *)-1) {
2064 		errno = EAGAIN;
2065 		return (-1);
2066 	}
2067 
2068 	/*
2069 	 * If this is a list I/O request, record the
2070 	 * list head in the aio request.
2071 	 */
2072 	aiorp->lio_head = lio_head;
2073 	aiorp->req_type = AIO_POSIX_REQ;
2074 	umode = ((mode == AIOFSYNC) ? mode : mode - AIOAREAD);
2075 	aiorp->req_op = umode;
2076 
2077 	if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2078 		aiorp->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
2079 		aiorp->aio_sigevent.sigev_signo =
2080 		    cb->aio_sigevent.sigev_signo;
2081 		aiorp->aio_sigevent.sigev_value.sival_ptr =
2082 		    cb->aio_sigevent.sigev_value.sival_ptr;
2083 	}
2084 
2085 	if (sigp) {
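	/*
	 * Event port notification: an explicit sigevent supplied by the
	 * caller (sigp) takes precedence over a SIGEV_PORT setting in
	 * the aiocb itself.
	 */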
2086 		/* SIGEV_PORT */
2087 		port_notify_t *pn = sigp->sigev_value.sival_ptr;
2088 		aiorp->aio_sigevent.sigev_notify = SIGEV_PORT;
2089 		aiorp->aio_sigevent.sigev_signo = pn->portnfy_port;
2090 		aiorp->aio_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
2091 	} else if (cb->aio_sigevent.sigev_notify == SIGEV_PORT) {
2092 		port_notify_t *pn;
2093 		pn = cb->aio_sigevent.sigev_value.sival_ptr;
2094 		aiorp->aio_sigevent.sigev_notify = SIGEV_PORT;
2095 		aiorp->aio_sigevent.sigev_signo = pn->portnfy_port;
2096 		aiorp->aio_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
2097 	}
2098 
2099 	aiorp->req_resultp = &cb->aio_resultp;
2100 	aiorp->req_iocb = cb;
2101 	ap = &(aiorp->req_args);
2102 	ap->fd = cb->aio_fildes;
2103 	ap->buf = (caddr_t)cb->aio_buf;
2104 	ap->bufsz = cb->aio_nbytes;
2105 	ap->offset = cb->aio_offset;
2106 
2107 	_aio_lock();
2108 	if ((flg & AIO_NO_DUPS) && _aio_hash_insert(&cb->aio_resultp, aiorp)) {
2109 		_aio_req_free(aiorp);
2110 		_aio_unlock();
2111 		errno = EINVAL;
2112 		return (-1);
2113 	} else {
2114 		_aio_unlock();
2115 		_aio_req_add(aiorp, nextworker, umode);
2116 		return (0);
2117 	}
2118 }
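
/*
 * Illustrative sketch (not part of this file): the application-visible
 * path that ends up in _aio_rw().  The caller fills in an aiocb and
 * submits it with the standard POSIX interface; aio_read()/aio_write()
 * in this library are expected to forward the aiocb here with the
 * corresponding AIOAREAD/AIOAWRITE mode.  The file name and buffer
 * below are hypothetical.
 *
 *	#include <aio.h>
 *	#include <fcntl.h>
 *	#include <string.h>
 *
 *	char buf[512];
 *	struct aiocb cb;
 *	int fd = open("/tmp/example", O_RDONLY);
 *
 *	(void) memset(&cb, 0, sizeof (cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof (buf);
 *	cb.aio_offset = 0;
 *	cb.aio_sigevent.sigev_notify = SIGEV_NONE;
 *
 *	if (aio_read(&cb) == 0) {
 *		const struct aiocb *list[1] = { &cb };
 *
 *		(void) aio_suspend(list, 1, NULL);
 *		if (aio_error(&cb) == 0)
 *			(void) aio_return(&cb);
 *	}
 */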
2119 
2120 #if	defined(_LARGEFILE64_SOURCE) && !defined(_LP64)
2121 /*
2122  * 64-bit AIO interface for POSIX
2123  */
2124 int
2125 _aio_rw64(aiocb64_t *cb, aio_lio_t *lio_head, aio_worker_t **nextworker,
2126     int mode, int flg, struct sigevent *sigp)
2127 {
2128 	aio_req_t *aiorp = NULL;
2129 	aio_args_t *ap = NULL;
2130 	int kerr;
2131 	int umode;
2132 
2133 	if (cb == NULL) {
2134 		errno = EINVAL;
2135 		return (-1);
2136 	}
2137 
2138 	/* initialize kaio */
2139 	if (!_kaio_ok)
2140 		_kaio_init();
2141 
2142 	cb->aio_state = NOCHECK;
2143 
2144 	/*
2145 	 * If _aio_rw64() is called because a list I/O kaio()
2146 	 * call failed, we don't want to repeat the
2147 	 * system call.
2148 	 */
2149 
2150 	if (flg & AIO_KAIO) {
2151 		/*
2152 		 * Try kernel aio first.
2153 		 * If errno is ENOTSUP/EBADFD,
2154 		 * fall back to the thread implementation.
2155 		 */
2156 		if ((_kaio_ok > 0) && (KAIO_SUPPORTED(cb->aio_fildes))) {
2157 			cb->aio_resultp.aio_errno = EINPROGRESS;
2158 			cb->aio_state = CHECK;
2159 			kerr = (int)_kaio(mode, cb);
2160 			if (kerr == 0)
2161 				return (0);
2162 			else if ((errno != ENOTSUP) && (errno != EBADFD)) {
2163 				cb->aio_resultp.aio_errno = errno;
2164 				cb->aio_resultp.aio_return = -1;
2165 				cb->aio_state = NOCHECK;
2166 				return (-1);
2167 			}
2168 			if (errno == EBADFD)
2169 				SET_KAIO_NOT_SUPPORTED(cb->aio_fildes);
2170 		}
2171 	}
2172 
2173 	cb->aio_resultp.aio_errno = EINPROGRESS;
2174 	cb->aio_state = USERAIO;
2175 
2176 	if (!__uaio_ok) {
2177 		if (__uaio_init() == -1)
2178 			return (-1);
2179 	}
2180 
2182 	aiorp = _aio_req_alloc();
2183 	if (aiorp == (aio_req_t *)-1) {
2184 		errno = EAGAIN;
2185 		return (-1);
2186 	}
2187 
2188 	/*
2189 	 * If this is a list I/O request, record the
2190 	 * list head in the aio request.
2191 	 */
2192 	aiorp->lio_head = lio_head;
2193 	aiorp->req_type = AIO_POSIX_REQ;
2194 
2195 	/*
2196 	 * _aio_do_request() needs the original request code to be able
2197 	 * to choose the appropriate 32/64 bit function.
2198 	 */
2199 	aiorp->req_op = mode;
2200 
2201 	if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2202 		aiorp->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
2203 		aiorp->aio_sigevent.sigev_signo =
2204 		    cb->aio_sigevent.sigev_signo;
2205 		aiorp->aio_sigevent.sigev_value.sival_ptr =
2206 		    cb->aio_sigevent.sigev_value.sival_ptr;
2207 	}
2208 
2209 	if (sigp) {
2210 		/* SIGEV_PORT */
2211 		port_notify_t *pn = sigp->sigev_value.sival_ptr;
2212 		aiorp->aio_sigevent.sigev_notify = SIGEV_PORT;
2213 		aiorp->aio_sigevent.sigev_signo = pn->portnfy_port;
2214 		aiorp->aio_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
2215 	} else if (cb->aio_sigevent.sigev_notify == SIGEV_PORT) {
2216 		port_notify_t *pn;
2217 		pn = cb->aio_sigevent.sigev_value.sival_ptr;
2218 		aiorp->aio_sigevent.sigev_notify = SIGEV_PORT;
2219 		aiorp->aio_sigevent.sigev_signo = pn->portnfy_port;
2220 		aiorp->aio_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
2221 	}
2222 
2223 	aiorp->req_resultp = &cb->aio_resultp;
2224 	aiorp->req_iocb = (aiocb_t *)cb;
2225 	ap = &(aiorp->req_args);
2226 	ap->fd = cb->aio_fildes;
2227 	ap->buf = (caddr_t)cb->aio_buf;
2228 	ap->bufsz = cb->aio_nbytes;
2229 	ap->offset = cb->aio_offset;
2230 
2231 	_aio_lock();
2232 	if ((flg & AIO_NO_DUPS) && _aio_hash_insert(&cb->aio_resultp, aiorp)) {
2233 		_aio_req_free(aiorp);
2234 		_aio_unlock();
2235 		errno = EINVAL;
2236 		return (-1);
2237 	} else {
2238 		_aio_unlock();
2239 
2240 		/*
2241 		 * _aio_req_add() only needs the difference between READ,
2242 		 * WRITE and other to choose the right worker queue.
2243 		 * AIOAREAD64 is mapped to AIOREAD and
2244 		 * AIOAWRITE64 is mapped to AIOWRITE.
2245 		 * mode is AIOAREAD64, AIOAWRITE64 or AIOFSYNC.
2246 		 */
2247 		umode = ((mode == AIOFSYNC) ? mode : mode - AIOAREAD64);
2248 		_aio_req_add(aiorp, nextworker, umode);
2249 		return (0);
2250 	}
2251 }
2252 #endif	/* (_LARGEFILE64_SOURCE) && !defined(_LP64) */
2253