xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision c2575b5e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "libaio.h"
30 #include <atomic.h>
31 #include <sys/param.h>
32 #include <sys/file.h>
33 #include <sys/port.h>
34 
35 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
36 static aio_req_t *_aio_req_get(aio_worker_t *);
37 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
38 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
39 static void _aio_work_done(aio_worker_t *);
40 aio_req_t *_aio_req_remove(aio_req_t *);
41 static void _aio_enq_doneq(aio_req_t *);
42 
43 extern void _aio_lio_free(aio_lio_t *);
44 
45 extern int __fdsync(int, int);
46 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
47 
48 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
49 static void _aiodone(aio_req_t *, ssize_t, int);
50 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
51 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
52 
/*
 * Switch for kernel async I/O.  _kaio_init() moves it from 0 to 1 or -1.
 */
int _kaio_ok = 0;		/* 0 = not yet initialized, 1 = on, -1 = error */

/*
 * Key for thread-specific data (each worker stores its aio_worker_t here)
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 *  workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* request hash, mapped by __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 8;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */
sigset_t _full_set;			/* all signals (sigfillset()) */

int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
int _aio_flags = 0;			/* see libaio.h for the flag defines */

aio_worker_t *_kaiowp;			/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
121 
122 static int
123 _kaio_supported_init(void)
124 {
125 	void *ptr;
126 	size_t size;
127 
128 	if (_kaio_supported != NULL)	/* already initialized */
129 		return (0);
130 
131 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
132 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
133 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
134 	if (ptr == MAP_FAILED)
135 		return (-1);
136 	_kaio_supported = ptr;
137 	return (0);
138 }
139 
140 /*
141  * libaio is initialized when an AIO request is made.  Important
142  * constants are initialized like the max number of workers that
143  * libaio can create, and the minimum number of workers permitted before
144  * imposing some restrictions.  Also, some workers are created.
145  */
146 int
147 __uaio_init(void)
148 {
149 	int i;
150 	int ret;
151 
152 	sig_mutex_lock(&__aio_initlock);
153 	if (__uaio_ok) {	/* already initialized */
154 		sig_mutex_unlock(&__aio_initlock);
155 		return (0);
156 	}
157 
158 	ret = -1;
159 
160 	hz = (int)sysconf(_SC_CLK_TCK);
161 	__pid = getpid();
162 
163 	init_signals();
164 
165 	if (_kaio_supported_init() != 0)
166 		goto out;
167 
168 	/*
169 	 * Allocate and initialize the hash table.
170 	 */
171 	/* LINTED pointer cast */
172 	_aio_hash = (aio_hash_t *)mmap(NULL,
173 	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
174 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
175 	if ((void *)_aio_hash == MAP_FAILED) {
176 		_aio_hash = NULL;
177 		goto out;
178 	}
179 	for (i = 0; i < HASHSZ; i++)
180 		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);
181 
182 	/*
183 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
184 	 */
185 	(void) sigfillset(&_full_set);
186 	(void) sigfillset(&_worker_set);
187 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
188 
189 	/*
190 	 * Create the minimum number of workers.
191 	 */
192 	for (i = 0; i < _min_workers; i++)
193 		(void) _aio_create_worker(NULL, AIOREAD);
194 
195 	/*
196 	 * Create one worker to send asynchronous notifications.
197 	 */
198 	(void) _aio_create_worker(NULL, AIONOTIFY);
199 
200 	__uaio_ok = 1;
201 	ret = 0;
202 
203 out:
204 	sig_mutex_unlock(&__aio_initlock);
205 	return (ret);
206 }
207 
208 /*
209  * special kaio cleanup thread sits in a loop in the
210  * kernel waiting for pending kaio requests to complete.
211  */
212 void *
213 _kaio_cleanup_thread(void *arg)
214 {
215 	if (pthread_setspecific(_aio_key, arg) != 0)
216 		_aiopanic("_kaio_cleanup_thread, pthread_setspecific()");
217 	(void) _kaio(AIOSTART);
218 	return (arg);
219 }
220 
221 /*
222  * initialize kaio.
223  */
224 void
225 _kaio_init()
226 {
227 	int error;
228 	sigset_t set;
229 	sigset_t oset;
230 
231 	sig_mutex_lock(&__aio_initlock);
232 	if (_kaio_supported_init() != 0)
233 		_kaio_ok = -1;
234 	if (_kaio_ok == 0) {
235 		if ((_kaiowp = _aio_worker_alloc()) == NULL) {
236 			error =  ENOMEM;
237 		} else {
238 			if ((error = (int)_kaio(AIOINIT)) == 0) {
239 				(void) sigfillset(&set);
240 				(void) pthread_sigmask(SIG_SETMASK,
241 				    &set, &oset);
242 				error = thr_create(NULL, AIOSTKSIZE,
243 				    _kaio_cleanup_thread, _kaiowp,
244 				    THR_DAEMON, &_kaiowp->work_tid);
245 				(void) pthread_sigmask(SIG_SETMASK,
246 				    &oset, NULL);
247 			}
248 			if (error) {
249 				_aio_worker_free(_kaiowp);
250 				_kaiowp = NULL;
251 			}
252 		}
253 		if (error)
254 			_kaio_ok = -1;
255 		else
256 			_kaio_ok = 1;
257 	}
258 	sig_mutex_unlock(&__aio_initlock);
259 }
260 
261 int
262 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
263     aio_result_t *resultp)
264 {
265 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
266 }
267 
268 int
269 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
270     aio_result_t *resultp)
271 {
272 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
273 }
274 
275 #if !defined(_LP64)
276 int
277 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
278     aio_result_t *resultp)
279 {
280 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
281 }
282 
283 int
284 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
285     aio_result_t *resultp)
286 {
287 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
288 }
289 #endif	/* !defined(_LP64) */
290 
291 int
292 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
293     aio_result_t *resultp, int mode)
294 {
295 	aio_req_t *reqp;
296 	aio_args_t *ap;
297 	offset_t loffset;
298 	struct stat stat;
299 	int error = 0;
300 	int kerr;
301 	int umode;
302 
303 	switch (whence) {
304 
305 	case SEEK_SET:
306 		loffset = offset;
307 		break;
308 	case SEEK_CUR:
309 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
310 			error = -1;
311 		else
312 			loffset += offset;
313 		break;
314 	case SEEK_END:
315 		if (fstat(fd, &stat) == -1)
316 			error = -1;
317 		else
318 			loffset = offset + stat.st_size;
319 		break;
320 	default:
321 		errno = EINVAL;
322 		error = -1;
323 	}
324 
325 	if (error)
326 		return (error);
327 
328 	/* initialize kaio */
329 	if (!_kaio_ok)
330 		_kaio_init();
331 
332 	/*
333 	 * _aio_do_request() needs the original request code (mode) to be able
334 	 * to choose the appropiate 32/64 bit function.  All other functions
335 	 * only require the difference between READ and WRITE (umode).
336 	 */
337 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
338 		umode = mode - AIOAREAD64;
339 	else
340 		umode = mode;
341 
342 	/*
343 	 * Try kernel aio first.
344 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
345 	 */
346 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
347 		resultp->aio_errno = 0;
348 		sig_mutex_lock(&__aio_mutex);
349 		_kaio_outstand_cnt++;
350 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
351 		    (umode | AIO_POLL_BIT) : umode),
352 		    fd, buf, bufsz, loffset, resultp);
353 		if (kerr == 0) {
354 			sig_mutex_unlock(&__aio_mutex);
355 			return (0);
356 		}
357 		_kaio_outstand_cnt--;
358 		sig_mutex_unlock(&__aio_mutex);
359 		if (errno != ENOTSUP && errno != EBADFD)
360 			return (-1);
361 		if (errno == EBADFD)
362 			SET_KAIO_NOT_SUPPORTED(fd);
363 	}
364 
365 	if (!__uaio_ok && __uaio_init() == -1)
366 		return (-1);
367 
368 	if ((reqp = _aio_req_alloc()) == NULL) {
369 		errno = EAGAIN;
370 		return (-1);
371 	}
372 
373 	/*
374 	 * _aio_do_request() checks reqp->req_op to differentiate
375 	 * between 32 and 64 bit access.
376 	 */
377 	reqp->req_op = mode;
378 	reqp->req_resultp = resultp;
379 	ap = &reqp->req_args;
380 	ap->fd = fd;
381 	ap->buf = buf;
382 	ap->bufsz = bufsz;
383 	ap->offset = loffset;
384 
385 	if (_aio_hash_insert(resultp, reqp) != 0) {
386 		_aio_req_free(reqp);
387 		errno = EINVAL;
388 		return (-1);
389 	}
390 	/*
391 	 * _aio_req_add() only needs the difference between READ and
392 	 * WRITE to choose the right worker queue.
393 	 */
394 	_aio_req_add(reqp, &__nextworker_rw, umode);
395 	return (0);
396 }
397 
398 int
399 aiocancel(aio_result_t *resultp)
400 {
401 	aio_req_t *reqp;
402 	aio_worker_t *aiowp;
403 	int ret;
404 	int done = 0;
405 	int canceled = 0;
406 
407 	if (!__uaio_ok) {
408 		errno = EINVAL;
409 		return (-1);
410 	}
411 
412 	sig_mutex_lock(&__aio_mutex);
413 	reqp = _aio_hash_find(resultp);
414 	if (reqp == NULL) {
415 		if (_aio_outstand_cnt == _aio_req_done_cnt)
416 			errno = EINVAL;
417 		else
418 			errno = EACCES;
419 		ret = -1;
420 	} else {
421 		aiowp = reqp->req_worker;
422 		sig_mutex_lock(&aiowp->work_qlock1);
423 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
424 		sig_mutex_unlock(&aiowp->work_qlock1);
425 
426 		if (canceled) {
427 			ret = 0;
428 		} else {
429 			if (_aio_outstand_cnt == 0 ||
430 			    _aio_outstand_cnt == _aio_req_done_cnt)
431 				errno = EINVAL;
432 			else
433 				errno = EACCES;
434 			ret = -1;
435 		}
436 	}
437 	sig_mutex_unlock(&__aio_mutex);
438 	return (ret);
439 }
440 
/*
 * Wait for an asynchronous request (kernel or user-level) to complete.
 * This must be asynch safe.
 *
 * uwait == NULL	block until a request completes
 * uwait == {0, 0}	poll: never block
 * otherwise		wait at most the given time
 *
 * Returns the aio_result_t of a completed request, NULL if nothing
 * completed (poll found nothing or the timeout expired), or
 * (aio_result_t *)-1 with errno set: EINVAL for an invalid timeout or
 * when neither kaio nor user-level aio has anything to wait for, EINTR
 * when the kernel wait was interrupted.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result from the user-level done list */
	aio_result_t *kresultp;		/* result from the kernel (kaio) */
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;		/* private copy; updated on each retry */
	struct timeval *wait = NULL;
	hrtime_t hrtend;		/* absolute deadline, timed wait only */
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
				(hrtime_t)uwait->tv_sec * NANOSEC +
				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL or 1 is the
				 * result pointer of a finished kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/*
		 * If _aio_req_done() returned -1, do not block in the
		 * kernel; with no kaio outstanding either, skip the
		 * kernel call entirely.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/* the kernel wait is done without holding the lock */
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* nothing to wait for on either side */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
					(hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
575 
576 /*
577  * _aio_get_timedelta calculates the remaining time and stores the result
578  * into timespec_t *wait.
579  */
580 
581 int
582 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
583 {
584 	int	ret = 0;
585 	struct	timeval cur;
586 	timespec_t curtime;
587 
588 	(void) gettimeofday(&cur, NULL);
589 	curtime.tv_sec = cur.tv_sec;
590 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
591 
592 	if (end->tv_sec >= curtime.tv_sec) {
593 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
594 		if (end->tv_nsec >= curtime.tv_nsec) {
595 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
596 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
597 				ret = -1;	/* timer expired */
598 		} else {
599 			if (end->tv_sec > curtime.tv_sec) {
600 				wait->tv_sec -= 1;
601 				wait->tv_nsec = NANOSEC -
602 				    (curtime.tv_nsec - end->tv_nsec);
603 			} else {
604 				ret = -1;	/* timer expired */
605 			}
606 		}
607 	} else {
608 		ret = -1;
609 	}
610 	return (ret);
611 }
612 
613 /*
614  * If closing by file descriptor: we will simply cancel all the outstanding
615  * aio`s and return.  Those aio's in question will have either noticed the
616  * cancellation notice before, during, or after initiating io.
617  */
618 int
619 aiocancel_all(int fd)
620 {
621 	aio_req_t *reqp;
622 	aio_req_t **reqpp;
623 	aio_worker_t *first;
624 	aio_worker_t *next;
625 	int canceled = 0;
626 	int done = 0;
627 	int cancelall = 0;
628 
629 	sig_mutex_lock(&__aio_mutex);
630 
631 	if (_aio_outstand_cnt == 0) {
632 		sig_mutex_unlock(&__aio_mutex);
633 		return (AIO_ALLDONE);
634 	}
635 
636 	/*
637 	 * Cancel requests from the read/write workers' queues.
638 	 */
639 	first = __nextworker_rw;
640 	next = first;
641 	do {
642 		_aio_cancel_work(next, fd, &canceled, &done);
643 	} while ((next = next->work_forw) != first);
644 
645 	/*
646 	 * finally, check if there are requests on the done queue that
647 	 * should be canceled.
648 	 */
649 	if (fd < 0)
650 		cancelall = 1;
651 	reqpp = &_aio_done_tail;
652 	while ((reqp = *reqpp) != NULL) {
653 		if (cancelall || reqp->req_args.fd == fd) {
654 			*reqpp = reqp->req_next;
655 			_aio_donecnt--;
656 			(void) _aio_hash_del(reqp->req_resultp);
657 			_aio_req_free(reqp);
658 		} else
659 			reqpp = &reqp->req_next;
660 	}
661 	if (cancelall) {
662 		ASSERT(_aio_donecnt == 0);
663 		_aio_done_head = NULL;
664 	}
665 	sig_mutex_unlock(&__aio_mutex);
666 
667 	if (canceled && done == 0)
668 		return (AIO_CANCELED);
669 	else if (done && canceled == 0)
670 		return (AIO_ALLDONE);
671 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
672 		return ((int)_kaio(AIOCANCEL, fd, NULL));
673 	return (AIO_NOTCANCELED);
674 }
675 
676 /*
677  * Cancel requests from a given work queue.  If the file descriptor
678  * parameter, fd, is non-negative, then only cancel those requests
679  * in this queue that are to this file descriptor.  If the fd
680  * parameter is -1, then cancel all requests.
681  */
682 static void
683 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
684 {
685 	aio_req_t *reqp;
686 
687 	sig_mutex_lock(&aiowp->work_qlock1);
688 	/*
689 	 * cancel queued requests first.
690 	 */
691 	reqp = aiowp->work_tail1;
692 	while (reqp != NULL) {
693 		if (fd < 0 || reqp->req_args.fd == fd) {
694 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
695 				/*
696 				 * Callers locks were dropped.
697 				 * reqp is invalid; start traversing
698 				 * the list from the beginning again.
699 				 */
700 				reqp = aiowp->work_tail1;
701 				continue;
702 			}
703 		}
704 		reqp = reqp->req_next;
705 	}
706 	/*
707 	 * Since the queued requests have been canceled, there can
708 	 * only be one inprogress request that should be canceled.
709 	 */
710 	if ((reqp = aiowp->work_req) != NULL &&
711 	    (fd < 0 || reqp->req_args.fd == fd))
712 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
713 	sig_mutex_unlock(&aiowp->work_qlock1);
714 }
715 
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1 (asserted
 * below).  *canceled is incremented when this call cancels the request;
 * *done is incremented when the request had already completed.  A
 * request that is already marked cancelled changes neither counter.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;	/* state before we modify it */

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* count the cancellation only once per list head */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker that is executing the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/*
		 * Queued Solaris-style (non-POSIX) request.
		 * NOTE(review): the request has been unlinked from the
		 * queue and the hash table but is not freed on this
		 * path -- confirm who releases it.
		 */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	/*
	 * Queued POSIX request: _aiodone() is called with both locks
	 * released; they are re-acquired in the original order before
	 * returning 1 to tell the caller that they were dropped.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
772 
773 /*
774  * This is the worker's main routine.
775  * The task of this function is to execute all queued requests;
776  * once the last pending request is executed this function will block
777  * in _aio_idle().  A new incoming request must wakeup this thread to
778  * restart the work.
 * Every worker has its own work queue.  The queue lock is required
780  * to synchronize the addition of new requests for this worker or
781  * cancellation of pending/running requests.
782  *
783  * Cancellation scenarios:
784  * The cancellation of a request is being done asynchronously using
785  * _aio_cancel_req() from another thread context.
786  * A queued request can be cancelled in different manners :
787  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
788  *	- lock the queue -> remove the request -> unlock the queue
789  *	- this function/thread does not detect this cancellation process
790  * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
792  *	  request with the flag "work_cancel_flg=1"
793  * 		see _aio_req_get() -> _aio_cancel_on()
794  *	  During this phase, it is allowed to interrupt the worker
795  *	  thread running the request (this thread) using the SIGAIOCANCEL
796  *	  signal.
797  *	  Once this thread returns from the kernel (because the request
798  *	  is just done), then it must disable a possible cancellation
799  *	  and proceed to finish the request.  To disable the cancellation
800  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
801  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
802  *	  same procedure as in a)
803  *
804  * To b)
805  *	This thread uses sigsetjmp() to define the position in the code, where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
807  *	is detected.
808  *	Normally this thread should get the cancellation signal during the
809  *	kernel phase (reading or writing).  In that case the signal handler
810  *	aiosigcancelhndlr() is activated using the worker thread context,
811  *	which again will use the siglongjmp() function to break the standard
812  *	code flow and jump to the "sigsetjmp" position, provided that
813  *	"work_cancel_flg" is set to "1".
814  *	Because the "work_cancel_flg" is only manipulated by this worker
815  *	thread and it can only run on one CPU at a given time, it is not
816  *	necessary to protect that flag with the queue lock.
817  *	Returning from the kernel (read or write system call) we must
818  *	first disable the use of the SIGAIOCANCEL signal and accordingly
819  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
821  *	  blocks in "work_qlock1",
822  *	- then a second thread cancels the apparently "in progress" request
823  *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
825  *	  from the kernel,
826  *	- the kernel detects the pending signal and activates the signal
827  *	  handler instead,
828  *	- if the "work_cancel_flg" is still set then the signal handler
829  *	  should use siglongjmp() to cancel the "in progress" request and
830  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
831  *	  for a second time => deadlock.
832  *	To avoid that situation we disable the cancellation of the request
833  *	in progress BEFORE we try to acquire the work_qlock1.
834  *	In that case the signal handler will not call siglongjmp() and the
835  *	worker thread will continue running the standard code flow.
836  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
837  *	an eventually required siglongjmp() freeing the work_qlock1 and
838  *	avoiding a deadlock.
839  */
840 void *
841 _aio_do_request(void *arglist)
842 {
843 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
844 	struct aio_args *arg;
845 	aio_req_t *reqp;		/* current AIO request */
846 	ssize_t retval;
847 	int error;
848 
849 	if (pthread_setspecific(_aio_key, aiowp) != 0)
850 		_aiopanic("_aio_do_request, pthread_setspecific()");
851 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
852 	ASSERT(aiowp->work_req == NULL);
853 
854 	/*
855 	 * We resume here when an operation is cancelled.
856 	 * On first entry, aiowp->work_req == NULL, so all
857 	 * we do is block SIGAIOCANCEL.
858 	 */
859 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
860 
861 	_sigoff();	/* block SIGAIOCANCEL */
862 	if (aiowp->work_req != NULL)
863 		_aio_finish_request(aiowp, -1, ECANCELED);
864 
865 	for (;;) {
866 		/*
867 		 * Put completed requests on aio_done_list.  This has
868 		 * to be done as part of the main loop to ensure that
869 		 * we don't artificially starve any aiowait'ers.
870 		 */
871 		if (aiowp->work_done1)
872 			_aio_work_done(aiowp);
873 
874 top:
875 		/* consume any deferred SIGAIOCANCEL signal here */
876 		_sigon();
877 		_sigoff();
878 
879 		while ((reqp = _aio_req_get(aiowp)) == NULL)
880 			_aio_idle(aiowp);
881 		arg = &reqp->req_args;
882 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
883 		    reqp->req_state == AIO_REQ_CANCELED);
884 		error = 0;
885 
886 		switch (reqp->req_op) {
887 		case AIOREAD:
888 		case AIOAREAD:
889 			_sigon();	/* unblock SIGAIOCANCEL */
890 			retval = pread(arg->fd, arg->buf,
891 			    arg->bufsz, arg->offset);
892 			if (retval == -1) {
893 				if (errno == ESPIPE) {
894 					retval = read(arg->fd,
895 					    arg->buf, arg->bufsz);
896 					if (retval == -1)
897 						error = errno;
898 				} else {
899 					error = errno;
900 				}
901 			}
902 			_sigoff();	/* block SIGAIOCANCEL */
903 			break;
904 		case AIOWRITE:
905 		case AIOAWRITE:
906 			_sigon();	/* unblock SIGAIOCANCEL */
907 			retval = pwrite(arg->fd, arg->buf,
908 			    arg->bufsz, arg->offset);
909 			if (retval == -1) {
910 				if (errno == ESPIPE) {
911 					retval = write(arg->fd,
912 					    arg->buf, arg->bufsz);
913 					if (retval == -1)
914 						error = errno;
915 				} else {
916 					error = errno;
917 				}
918 			}
919 			_sigoff();	/* block SIGAIOCANCEL */
920 			break;
921 #if !defined(_LP64)
922 		case AIOAREAD64:
923 			_sigon();	/* unblock SIGAIOCANCEL */
924 			retval = pread64(arg->fd, arg->buf,
925 			    arg->bufsz, arg->offset);
926 			if (retval == -1) {
927 				if (errno == ESPIPE) {
928 					retval = read(arg->fd,
929 					    arg->buf, arg->bufsz);
930 					if (retval == -1)
931 						error = errno;
932 				} else {
933 					error = errno;
934 				}
935 			}
936 			_sigoff();	/* block SIGAIOCANCEL */
937 			break;
938 		case AIOAWRITE64:
939 			_sigon();	/* unblock SIGAIOCANCEL */
940 			retval = pwrite64(arg->fd, arg->buf,
941 			    arg->bufsz, arg->offset);
942 			if (retval == -1) {
943 				if (errno == ESPIPE) {
944 					retval = write(arg->fd,
945 					    arg->buf, arg->bufsz);
946 					if (retval == -1)
947 						error = errno;
948 				} else {
949 					error = errno;
950 				}
951 			}
952 			_sigoff();	/* block SIGAIOCANCEL */
953 			break;
954 #endif	/* !defined(_LP64) */
955 		case AIOFSYNC:
956 			if (_aio_fsync_del(aiowp, reqp))
957 				goto top;
958 			ASSERT(reqp->req_head == NULL);
959 			/*
960 			 * All writes for this fsync request are now
961 			 * acknowledged.  Now make these writes visible
962 			 * and put the final request into the hash table.
963 			 */
964 			if (reqp->req_state == AIO_REQ_CANCELED) {
965 				/* EMPTY */;
966 			} else if (arg->offset == O_SYNC) {
967 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
968 					error = errno;
969 			} else {
970 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
971 					error = errno;
972 			}
973 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
974 				_aiopanic("_aio_do_request(): AIOFSYNC: "
975 				    "request already in hash table");
976 			break;
977 		default:
978 			_aiopanic("_aio_do_request, bad op");
979 		}
980 
981 		_aio_finish_request(aiowp, retval, error);
982 	}
983 	/* NOTREACHED */
984 	return (NULL);
985 }
986 
987 /*
988  * Perform the tail processing for _aio_do_request().
989  * The in-progress request may or may not have been cancelled.
990  */
991 static void
992 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
993 {
994 	aio_req_t *reqp;
995 
996 	sig_mutex_lock(&aiowp->work_qlock1);
997 	if ((reqp = aiowp->work_req) == NULL)
998 		sig_mutex_unlock(&aiowp->work_qlock1);
999 	else {
1000 		aiowp->work_req = NULL;
1001 		if (reqp->req_state == AIO_REQ_CANCELED) {
1002 			retval = -1;
1003 			error = ECANCELED;
1004 		}
1005 		if (!POSIX_AIO(reqp)) {
1006 			sig_mutex_unlock(&aiowp->work_qlock1);
1007 			sig_mutex_lock(&__aio_mutex);
1008 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1009 				reqp->req_state = AIO_REQ_DONE;
1010 			_aio_req_done_cnt++;
1011 			_aio_set_result(reqp, retval, error);
1012 			if (error == ECANCELED)
1013 				_aio_outstand_cnt--;
1014 			sig_mutex_unlock(&__aio_mutex);
1015 		} else {
1016 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1017 				reqp->req_state = AIO_REQ_DONE;
1018 			sig_mutex_unlock(&aiowp->work_qlock1);
1019 			_aiodone(reqp, retval, error);
1020 		}
1021 	}
1022 }
1023 
1024 void
1025 _aio_req_mark_done(aio_req_t *reqp)
1026 {
1027 #if !defined(_LP64)
1028 	if (reqp->req_largefile)
1029 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1030 	else
1031 #endif
1032 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1033 }
1034 
1035 /*
1036  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1037  * hopefully to consume one of our queued signals.
1038  */
1039 static void
1040 _aio_delay(int ticks)
1041 {
1042 	(void) usleep(ticks * (MICROSEC / hz));
1043 }
1044 
1045 /*
1046  * Actually send the notifications.
1047  * We could block indefinitely here if the application
1048  * is not listening for the signal or port notifications.
1049  */
1050 static void
1051 send_notification(notif_param_t *npp)
1052 {
1053 	int backoff;
1054 
1055 	if (npp->np_signo) {
1056 		backoff = 0;
1057 		while (__sigqueue(__pid, npp->np_signo, npp->np_user,
1058 		    SI_ASYNCIO) == -1) {
1059 			ASSERT(errno == EAGAIN);
1060 			if (++backoff > 10)
1061 				backoff = 10;
1062 			_aio_delay(backoff);
1063 		}
1064 	} else if (npp->np_port >= 0) {
1065 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1066 		    npp->np_event, npp->np_object, npp->np_user);
1067 	}
1068 	if (npp->np_lio_signo) {
1069 		backoff = 0;
1070 		while (__sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1071 		    SI_ASYNCIO) == -1) {
1072 			ASSERT(errno == EAGAIN);
1073 			if (++backoff > 10)
1074 				backoff = 10;
1075 			_aio_delay(backoff);
1076 		}
1077 	} else if (npp->np_lio_port >= 0) {
1078 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1079 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1080 	}
1081 }
1082 
1083 /*
1084  * Asynchronous notification worker.
1085  */
1086 void *
1087 _aio_do_notify(void *arg)
1088 {
1089 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1090 	aio_req_t *reqp;
1091 
1092 	/*
1093 	 * This isn't really necessary.  All signals are blocked.
1094 	 */
1095 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1096 		_aiopanic("_aio_do_notify, pthread_setspecific()");
1097 
1098 	/*
1099 	 * Notifications are never cancelled.
1100 	 * All signals remain blocked, forever.
1101 	 */
1102 
1103 	for (;;) {
1104 		while ((reqp = _aio_req_get(aiowp)) == NULL)
1105 			_aio_idle(aiowp);
1106 		send_notification(&reqp->req_notify);
1107 		_aio_req_free(reqp);
1108 	}
1109 
1110 	/* NOTREACHED */
1111 	return (NULL);
1112 }
1113 
/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 *
 * reqp   - the POSIX request being completed
 * retval - return value to publish in the request's aio_result_t
 * error  - errno value to publish in the request's aio_result_t
 *
 * The function proceeds in three phases:
 *   1. Under __aio_mutex: work out the per-request notification, store
 *	the result (only if it is still EINPROGRESS), drop the
 *	outstanding count, and either put the request on the done queue
 *	(SIGEV_NONE) or remove it from the hash table and mark its
 *	control block done.  Waiters are then woken.
 *   2. Under the list head's lio_mutex (if the request belongs to a
 *	list I/O): drop this request's reference and, if it was the
 *	last one, wake the LIO_WAIT waiter or work out the list
 *	notification.
 *   3. With no locks held: deliver the notification(s), either by
 *	queueing the request to the notification worker or by sending
 *	them directly.
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;		/* set when anything must be sent in phase 3 */
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	/*
	 * Start with "nothing to send": send_notification() treats a
	 * signal number of 0 and a negative port number as absent.
	 */
	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		_aiopanic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		/*
		 * For thread and port notification the request's
		 * sigev_signo field is read as the port number.
		 */
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	/* Publish the result only if nobody has done so already. */
	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	/* Detach the list I/O head; it is processed after the unlock. */
	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		/*
		 * The done queue now owns the request; clearing reqp
		 * keeps the code below from queueing or freeing it.
		 */
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			/* This was the last request of the list. */
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {			/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			/*
			 * The head is freed here only when no thread was
			 * recorded as waiting on it.
			 */
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	/* Not on the done queue and not handed to the notifier: free it. */
	if (reqp != NULL)
		_aio_req_free(reqp);
}
1284 
/*
 * Delete fsync requests from list head until there is
 * only one left.
 *
 * aiowp - the worker whose current request (work_req) is reqp
 * reqp  - an fsync request attached to a list head (req_head)
 *
 * While more than one reference to the head remains, this request's
 * reference is dropped, the request is freed and 1 is returned.
 * For the last remaining request the head is freed and the return
 * value is 0, unless the head's mode is LIO_DESTROY, in which case
 * the request is freed as well and 1 is returned.  A non-zero return
 * therefore always means reqp has been freed here.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	/* Lock order: the worker's queue lock first, then the head's lock. */
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		/* Not the last one: drop our reference and discard reqp. */
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	/* Carry a cancellation recorded on the head over to the request. */
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	/*
	 * The counts are adjusted without lio_mutex held; the assertion
	 * above shows this was the only remaining reference to the head.
	 */
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}
1329 
1330 /*
1331  * worker is set idle when its work queue is empty.
1332  * The worker checks again that it has no more work and then
1333  * goes to sleep waiting for more work.
1334  */
1335 void
1336 _aio_idle(aio_worker_t *aiowp)
1337 {
1338 	int error = 0;
1339 
1340 	sig_mutex_lock(&aiowp->work_qlock1);
1341 	if (aiowp->work_count1 == 0) {
1342 		ASSERT(aiowp->work_minload1 == 0);
1343 		aiowp->work_idleflg = 1;
1344 		/*
1345 		 * A cancellation handler is not needed here.
1346 		 * aio worker threads are never cancelled via pthread_cancel().
1347 		 */
1348 		error = sig_cond_wait(&aiowp->work_idle_cv,
1349 		    &aiowp->work_qlock1);
1350 		/*
1351 		 * The idle flag is normally cleared before worker is awakened
1352 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1353 		 */
1354 		if (error)
1355 			aiowp->work_idleflg = 0;
1356 	}
1357 	sig_mutex_unlock(&aiowp->work_qlock1);
1358 }
1359 
1360 /*
1361  * A worker's completed AIO requests are placed onto a global
1362  * done queue.  The application is only sent a SIGIO signal if
1363  * the process has a handler enabled and it is not waiting via
1364  * aiowait().
1365  */
1366 static void
1367 _aio_work_done(aio_worker_t *aiowp)
1368 {
1369 	aio_req_t *reqp;
1370 
1371 	sig_mutex_lock(&aiowp->work_qlock1);
1372 	reqp = aiowp->work_prev1;
1373 	reqp->req_next = NULL;
1374 	aiowp->work_done1 = 0;
1375 	aiowp->work_tail1 = aiowp->work_next1;
1376 	if (aiowp->work_tail1 == NULL)
1377 		aiowp->work_head1 = NULL;
1378 	aiowp->work_prev1 = NULL;
1379 	sig_mutex_unlock(&aiowp->work_qlock1);
1380 	sig_mutex_lock(&__aio_mutex);
1381 	_aio_donecnt++;
1382 	_aio_outstand_cnt--;
1383 	_aio_req_done_cnt--;
1384 	ASSERT(_aio_donecnt > 0 &&
1385 	    _aio_outstand_cnt >= 0 &&
1386 	    _aio_req_done_cnt >= 0);
1387 	ASSERT(reqp != NULL);
1388 
1389 	if (_aio_done_tail == NULL) {
1390 		_aio_done_head = _aio_done_tail = reqp;
1391 	} else {
1392 		_aio_done_head->req_next = reqp;
1393 		_aio_done_head = reqp;
1394 	}
1395 
1396 	if (_aiowait_flag) {
1397 		sig_mutex_unlock(&__aio_mutex);
1398 		(void) _kaio(AIONOTIFY);
1399 	} else {
1400 		sig_mutex_unlock(&__aio_mutex);
1401 		if (_sigio_enabled)
1402 			(void) kill(__pid, SIGIO);
1403 	}
1404 }
1405 
1406 /*
1407  * The done queue consists of AIO requests that are in either the
1408  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1409  * are discarded.  If the done queue is empty then NULL is returned.
1410  * Otherwise the address of a done aio_result_t is returned.
1411  */
1412 aio_result_t *
1413 _aio_req_done(void)
1414 {
1415 	aio_req_t *reqp;
1416 	aio_result_t *resultp;
1417 
1418 	ASSERT(MUTEX_HELD(&__aio_mutex));
1419 
1420 	if ((reqp = _aio_done_tail) != NULL) {
1421 		if ((_aio_done_tail = reqp->req_next) == NULL)
1422 			_aio_done_head = NULL;
1423 		ASSERT(_aio_donecnt > 0);
1424 		_aio_donecnt--;
1425 		(void) _aio_hash_del(reqp->req_resultp);
1426 		resultp = reqp->req_resultp;
1427 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1428 		_aio_req_free(reqp);
1429 		return (resultp);
1430 	}
1431 	/* is queue empty? */
1432 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1433 		return ((aio_result_t *)-1);
1434 	}
1435 	return (NULL);
1436 }
1437 
1438 /*
1439  * Set the return and errno values for the application's use.
1440  *
1441  * For the Posix interfaces, we must set the return value first followed
1442  * by the errno value because the Posix interfaces allow for a change
1443  * in the errno value from EINPROGRESS to something else to signal
1444  * the completion of the asynchronous request.
1445  *
1446  * The opposite is true for the Solaris interfaces.  These allow for
1447  * a change in the return value from AIO_INPROGRESS to something else
1448  * to signal the completion of the asynchronous request.
1449  */
1450 void
1451 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1452 {
1453 	aio_result_t *resultp = reqp->req_resultp;
1454 
1455 	if (POSIX_AIO(reqp)) {
1456 		resultp->aio_return = retval;
1457 		membar_producer();
1458 		resultp->aio_errno = error;
1459 	} else {
1460 		resultp->aio_errno = error;
1461 		membar_producer();
1462 		resultp->aio_return = retval;
1463 	}
1464 }
1465 
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * reqp       - the request to queue
 * nextworker - address of the pointer to the worker at which the
 *		search starts; for read/write modes it is advanced to
 *		the worker following the one chosen
 * mode       - the kind of request; read/write modes are load
 *		balanced across workers, AIOFSYNC and AIONOTIFY always
 *		go to *nextworker.  Any other value panics.
 *
 * For read/write modes a new worker may be created to take the request
 * instead of queueing it to an existing one.  SIGIO is deferred for
 * the duration of the call.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;	/* cleared for AIOFSYNC and AIONOTIFY */
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	_sigoff();		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	/* Notification requests are not counted as outstanding. */
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/* An idle worker was found; its queue lock is still held. */
		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					_aiopanic("_aio_req_add: add worker");
				_sigon();	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		/*
		 * The chosen worker already carries at least _minworkload
		 * requests and the worker limit has not been reached:
		 * release its queue and give the request to a new worker.
		 */
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				_aiopanic("aio_req_add: add worker");
			_sigon();	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* No balancing: block until *nextworker's queue is ours. */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		_aiopanic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 * New requests are linked in at work_head1; work_next1 is the
	 * request _aio_req_get() will hand out next.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	/* Start the next search at the worker after the one just used. */
	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	_sigon();	/* reenable SIGIO */
}
1608 
/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 *
 * The request returned (or NULL) is also recorded as the worker's
 * current request in work_req.  A request that is handed out is moved
 * to the AIO_REQ_INPROGRESS state and the worker's queue counters are
 * adjusted accordingly.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-posix requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				/* reqp is the first entry on the list. */
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				/* Bypass reqp via its predecessor. */
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			/* Stays on the list; remember it as the latest done. */
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		/* Only read/write requests are counted in work_minload1. */
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	/* work_req becomes NULL when there was nothing to hand out. */
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}
1670 
/*
 * Unlink a request from a worker's work queue.
 * The caller must hold the worker's work_qlock1.
 *
 * aiowp  - the worker whose queue is searched
 * reqp   - the request to remove
 * ostate - the state the request was in; AIO_REQ_QUEUED or
 *	    AIO_REQ_INPROGRESS
 *
 * A POSIX request that is not in the AIO_REQ_QUEUED state is left
 * alone: _aio_req_get() already unlinks POSIX requests when it hands
 * them out.
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;	/* address of the link that points at 'next' */
	aio_req_t *lastrp;	/* entry visited before 'next' */
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			/*
			 * Repair work_head1 and work_prev1 if they point at
			 * the removed entry.  They move on to its successor,
			 * except when the entry was last on the list and
			 * work_done1 is non-zero; then they fall back to
			 * 'lastrp'.
			 */
			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			/* Undo the accounting that matches the old state. */
			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/*
	 * NOTE(review): control arrives here if reqp is not on the
	 * list; the marker below suggests callers guarantee it is.
	 */
	/* NOTREACHED */
}
1723 
1724 static void
1725 _aio_enq_doneq(aio_req_t *reqp)
1726 {
1727 	if (_aio_doneq == NULL) {
1728 		_aio_doneq = reqp;
1729 		reqp->req_next = reqp->req_prev = reqp;
1730 	} else {
1731 		reqp->req_next = _aio_doneq;
1732 		reqp->req_prev = _aio_doneq->req_prev;
1733 		_aio_doneq->req_prev->req_next = reqp;
1734 		_aio_doneq->req_prev = reqp;
1735 	}
1736 	reqp->req_state = AIO_REQ_DONEQ;
1737 	_aio_doneq_cnt++;
1738 }
1739 
1740 /*
1741  * caller owns the _aio_mutex
1742  */
1743 aio_req_t *
1744 _aio_req_remove(aio_req_t *reqp)
1745 {
1746 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1747 		return (NULL);
1748 
1749 	if (reqp) {
1750 		/* request in done queue */
1751 		if (_aio_doneq == reqp)
1752 			_aio_doneq = reqp->req_next;
1753 		if (_aio_doneq == reqp) {
1754 			/* only one request on queue */
1755 			_aio_doneq = NULL;
1756 		} else {
1757 			aio_req_t *tmp = reqp->req_next;
1758 			reqp->req_prev->req_next = tmp;
1759 			tmp->req_prev = reqp->req_prev;
1760 		}
1761 	} else if ((reqp = _aio_doneq) != NULL) {
1762 		if (reqp == reqp->req_next) {
1763 			/* only one request on queue */
1764 			_aio_doneq = NULL;
1765 		} else {
1766 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1767 			_aio_doneq->req_prev = reqp->req_prev;
1768 		}
1769 	}
1770 	if (reqp) {
1771 		_aio_doneq_cnt--;
1772 		reqp->req_next = reqp->req_prev = reqp;
1773 		reqp->req_state = AIO_REQ_DONE;
1774 	}
1775 	return (reqp);
1776 }
1777 
1778 /*
1779  * An AIO request is identified by an aio_result_t pointer.  The library
1780  * maps this aio_result_t pointer to its internal representation using a
1781  * hash table.  This function adds an aio_result_t pointer to the hash table.
1782  */
1783 static int
1784 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1785 {
1786 	aio_hash_t *hashp;
1787 	aio_req_t **prev;
1788 	aio_req_t *next;
1789 
1790 	hashp = _aio_hash + AIOHASH(resultp);
1791 	sig_mutex_lock(&hashp->hash_lock);
1792 	prev = &hashp->hash_ptr;
1793 	while ((next = *prev) != NULL) {
1794 		if (resultp == next->req_resultp) {
1795 			sig_mutex_unlock(&hashp->hash_lock);
1796 			return (-1);
1797 		}
1798 		prev = &next->req_link;
1799 	}
1800 	*prev = reqp;
1801 	ASSERT(reqp->req_link == NULL);
1802 	sig_mutex_unlock(&hashp->hash_lock);
1803 	return (0);
1804 }
1805 
1806 /*
1807  * Remove an entry from the hash table.
1808  */
1809 aio_req_t *
1810 _aio_hash_del(aio_result_t *resultp)
1811 {
1812 	aio_hash_t *hashp;
1813 	aio_req_t **prev;
1814 	aio_req_t *next = NULL;
1815 
1816 	if (_aio_hash != NULL) {
1817 		hashp = _aio_hash + AIOHASH(resultp);
1818 		sig_mutex_lock(&hashp->hash_lock);
1819 		prev = &hashp->hash_ptr;
1820 		while ((next = *prev) != NULL) {
1821 			if (resultp == next->req_resultp) {
1822 				*prev = next->req_link;
1823 				next->req_link = NULL;
1824 				break;
1825 			}
1826 			prev = &next->req_link;
1827 		}
1828 		sig_mutex_unlock(&hashp->hash_lock);
1829 	}
1830 	return (next);
1831 }
1832 
1833 /*
1834  *  find an entry in the hash table
1835  */
1836 aio_req_t *
1837 _aio_hash_find(aio_result_t *resultp)
1838 {
1839 	aio_hash_t *hashp;
1840 	aio_req_t **prev;
1841 	aio_req_t *next = NULL;
1842 
1843 	if (_aio_hash != NULL) {
1844 		hashp = _aio_hash + AIOHASH(resultp);
1845 		sig_mutex_lock(&hashp->hash_lock);
1846 		prev = &hashp->hash_ptr;
1847 		while ((next = *prev) != NULL) {
1848 			if (resultp == next->req_resultp)
1849 				break;
1850 			prev = &next->req_link;
1851 		}
1852 		sig_mutex_unlock(&hashp->hash_lock);
1853 	}
1854 	return (next);
1855 }
1856 
1857 /*
1858  * AIO interface for POSIX
1859  */
1860 int
1861 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
1862     int mode, int flg)
1863 {
1864 	aio_req_t *reqp;
1865 	aio_args_t *ap;
1866 	int kerr;
1867 
1868 	if (aiocbp == NULL) {
1869 		errno = EINVAL;
1870 		return (-1);
1871 	}
1872 
1873 	/* initialize kaio */
1874 	if (!_kaio_ok)
1875 		_kaio_init();
1876 
1877 	aiocbp->aio_state = NOCHECK;
1878 
1879 	/*
1880 	 * If we have been called because a list I/O
1881 	 * kaio() failed, we dont want to repeat the
1882 	 * system call
1883 	 */
1884 
1885 	if (flg & AIO_KAIO) {
1886 		/*
1887 		 * Try kernel aio first.
1888 		 * If errno is ENOTSUP/EBADFD,
1889 		 * fall back to the thread implementation.
1890 		 */
1891 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
1892 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
1893 			aiocbp->aio_state = CHECK;
1894 			kerr = (int)_kaio(mode, aiocbp);
1895 			if (kerr == 0)
1896 				return (0);
1897 			if (errno != ENOTSUP && errno != EBADFD) {
1898 				aiocbp->aio_resultp.aio_errno = errno;
1899 				aiocbp->aio_resultp.aio_return = -1;
1900 				aiocbp->aio_state = NOCHECK;
1901 				return (-1);
1902 			}
1903 			if (errno == EBADFD)
1904 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
1905 		}
1906 	}
1907 
1908 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
1909 	aiocbp->aio_state = USERAIO;
1910 
1911 	if (!__uaio_ok && __uaio_init() == -1)
1912 		return (-1);
1913 
1914 	if ((reqp = _aio_req_alloc()) == NULL) {
1915 		errno = EAGAIN;
1916 		return (-1);
1917 	}
1918 
1919 	/*
1920 	 * If an LIO request, add the list head to the aio request
1921 	 */
1922 	reqp->req_head = lio_head;
1923 	reqp->req_type = AIO_POSIX_REQ;
1924 	reqp->req_op = mode;
1925 	reqp->req_largefile = 0;
1926 
1927 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
1928 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
1929 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
1930 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
1931 		reqp->req_sigevent.sigev_signo =
1932 		    aiocbp->aio_sigevent.sigev_signo;
1933 		reqp->req_sigevent.sigev_value.sival_ptr =
1934 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
1935 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
1936 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
1937 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
1938 		/*
1939 		 * Reuse the sigevent structure to contain the port number
1940 		 * and the user value.  Same for SIGEV_THREAD, below.
1941 		 */
1942 		reqp->req_sigevent.sigev_signo =
1943 		    pn->portnfy_port;
1944 		reqp->req_sigevent.sigev_value.sival_ptr =
1945 		    pn->portnfy_user;
1946 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
1947 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
1948 		/*
1949 		 * The sigevent structure contains the port number
1950 		 * and the user value.  Same for SIGEV_PORT, above.
1951 		 */
1952 		reqp->req_sigevent.sigev_signo =
1953 		    aiocbp->aio_sigevent.sigev_signo;
1954 		reqp->req_sigevent.sigev_value.sival_ptr =
1955 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
1956 	}
1957 
1958 	reqp->req_resultp = &aiocbp->aio_resultp;
1959 	reqp->req_aiocbp = aiocbp;
1960 	ap = &reqp->req_args;
1961 	ap->fd = aiocbp->aio_fildes;
1962 	ap->buf = (caddr_t)aiocbp->aio_buf;
1963 	ap->bufsz = aiocbp->aio_nbytes;
1964 	ap->offset = aiocbp->aio_offset;
1965 
1966 	if ((flg & AIO_NO_DUPS) &&
1967 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
1968 		_aiopanic("_aio_rw(): request already in hash table");
1969 		_aio_req_free(reqp);
1970 		errno = EINVAL;
1971 		return (-1);
1972 	}
1973 	_aio_req_add(reqp, nextworker, mode);
1974 	return (0);
1975 }
1976 
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX: start one asynchronous read or
 * write described by a large-file control block.
 *
 * aiocbp     - the application's large-file control block
 * lio_head   - list I/O head the request belongs to, stored in the
 *		request as given
 * nextworker - worker selection pointer passed on to _aio_req_add()
 * mode       - the operation, stored as the request's req_op
 * flg        - AIO_KAIO: try kernel aio first;
 *		AIO_NO_DUPS: enter the request into the hash table
 *
 * Returns 0 if the request was accepted (by the kernel or by the
 * worker threads) and -1 otherwise.  errno is set to EINVAL for a
 * NULL control block and to EAGAIN if no request structure could be
 * allocated; a kernel aio failure leaves the kernel's errno in place.
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int notify;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * Try kernel aio first, unless we were called because a list
	 * I/O kaio() already failed (AIO_KAIO clear), in which case the
	 * system call is not repeated.  If the kernel answers ENOTSUP
	 * or EBADFD we fall back to the thread implementation; any
	 * other error is final.
	 */
	if ((flg & AIO_KAIO) && _kaio_ok > 0 &&
	    KAIO_SUPPORTED(aiocbp->aio_fildes)) {
		aiocbp->aio_resultp.aio_errno = EINPROGRESS;
		aiocbp->aio_state = CHECK;
		if ((int)_kaio(mode, aiocbp) == 0)
			return (0);
		if (errno == EBADFD) {
			SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		} else if (errno != ENOTSUP) {
			aiocbp->aio_resultp.aio_errno = errno;
			aiocbp->aio_resultp.aio_return = -1;
			aiocbp->aio_state = NOCHECK;
			return (-1);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	reqp = _aio_req_alloc();
	if (reqp == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	/*
	 * Copy the notification settings into the request.  For port
	 * and thread notification the request's sigevent structure is
	 * reused to carry the port number (in sigev_signo) and the
	 * user value.
	 */
	notify = aiocbp->aio_sigevent.sigev_notify;
	switch (notify) {
	case SIGEV_NONE:
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
		break;
	case SIGEV_SIGNAL:
	case SIGEV_THREAD:
		reqp->req_sigevent.sigev_notify = notify;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;
	case SIGEV_PORT: {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;

		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo = pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
		break;
	}
	default:
		break;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		_aiopanic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
2090