xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision bced1f33)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43 
44 extern void _aio_lio_free(aio_lio_t *);
45 
46 extern int __fdsync(int, int);
47 extern int __fcntl(int, int, ...);
48 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
49 
50 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
51 static void _aiodone(aio_req_t *, ssize_t, int);
52 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
53 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
54 
55 /*
56  * switch for kernel async I/O
57  */
58 int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
59 
60 /*
61  * Key for thread-specific data
62  */
63 pthread_key_t _aio_key;
64 
65 /*
66  * Array for determining whether or not a file supports kaio.
67  * Initialized in _kaio_init().
68  */
69 uint32_t *_kaio_supported = NULL;
70 
71 /*
72  *  workers for read/write requests
73  * (__aio_mutex lock protects circular linked list of workers)
74  */
75 aio_worker_t *__workers_rw;	/* circular list of AIO workers */
76 aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
77 int __rw_workerscnt;		/* number of read/write workers */
78 
79 /*
80  * worker for notification requests.
81  */
82 aio_worker_t *__workers_no;	/* circular list of AIO workers */
83 aio_worker_t *__nextworker_no;	/* next worker in list of workers */
84 int __no_workerscnt;		/* number of write workers */
85 
86 aio_req_t *_aio_done_tail;		/* list of done requests */
87 aio_req_t *_aio_done_head;
88 
89 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
90 cond_t __aio_initcv = DEFAULTCV;
91 int __aio_initbusy = 0;
92 
93 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
94 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
95 
96 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
97 int _sigio_enabled = 0;			/* when set, send SIGIO signal */
98 
99 aio_hash_t *_aio_hash;
100 
101 aio_req_t *_aio_doneq;			/* double linked done queue list */
102 
103 int _aio_donecnt = 0;
104 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
105 int _aio_doneq_cnt = 0;
106 int _aio_outstand_cnt = 0;		/* # of outstanding requests */
107 int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
108 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
109 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
110 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
111 
112 int _max_workers = 256;			/* max number of workers permitted */
113 int _min_workers = 4;			/* min number of workers */
114 int _minworkload = 2;			/* min number of request in q */
115 int _aio_worker_cnt = 0;		/* number of workers to do requests */
116 int __uaio_ok = 0;			/* AIO has been enabled */
117 sigset_t _worker_set;			/* worker's signal mask */
118 
119 int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
120 int _aio_flags = 0;			/* see asyncio.h defines for */
121 
122 aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
123 
124 int hz;					/* clock ticks per second */
125 
126 static int
_kaio_supported_init(void)127 _kaio_supported_init(void)
128 {
129 	void *ptr;
130 	size_t size;
131 
132 	if (_kaio_supported != NULL)	/* already initialized */
133 		return (0);
134 
135 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
136 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
137 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
138 	if (ptr == MAP_FAILED)
139 		return (-1);
140 	_kaio_supported = ptr;
141 	return (0);
142 }
143 
144 /*
145  * The aio subsystem is initialized when an AIO request is made.
146  * Constants are initialized like the max number of workers that
147  * the subsystem can create, and the minimum number of workers
148  * permitted before imposing some restrictions.  Also, some
149  * workers are created.
150  */
151 int
__uaio_init(void)152 __uaio_init(void)
153 {
154 	int ret = -1;
155 	int i;
156 	int cancel_state;
157 
158 	lmutex_lock(&__aio_initlock);
159 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
160 	while (__aio_initbusy)
161 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
162 	(void) pthread_setcancelstate(cancel_state, NULL);
163 	if (__uaio_ok) {	/* already initialized */
164 		lmutex_unlock(&__aio_initlock);
165 		return (0);
166 	}
167 	__aio_initbusy = 1;
168 	lmutex_unlock(&__aio_initlock);
169 
170 	hz = (int)sysconf(_SC_CLK_TCK);
171 	__pid = getpid();
172 
173 	setup_cancelsig(SIGAIOCANCEL);
174 
175 	if (_kaio_supported_init() != 0)
176 		goto out;
177 
178 	/*
179 	 * Allocate and initialize the hash table.
180 	 * Do this only once, even if __uaio_init() is called twice.
181 	 */
182 	if (_aio_hash == NULL) {
183 		/* LINTED pointer cast */
184 		_aio_hash = (aio_hash_t *)mmap(NULL,
185 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
186 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
187 		if ((void *)_aio_hash == MAP_FAILED) {
188 			_aio_hash = NULL;
189 			goto out;
190 		}
191 		for (i = 0; i < HASHSZ; i++)
192 			(void) mutex_init(&_aio_hash[i].hash_lock,
193 			    USYNC_THREAD, NULL);
194 	}
195 
196 	/*
197 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
198 	 */
199 	(void) sigfillset(&_worker_set);
200 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
201 
202 	/*
203 	 * Create one worker to send asynchronous notifications.
204 	 * Do this only once, even if __uaio_init() is called twice.
205 	 */
206 	if (__no_workerscnt == 0 &&
207 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
208 		errno = EAGAIN;
209 		goto out;
210 	}
211 
212 	/*
213 	 * Create the minimum number of read/write workers.
214 	 * And later check whether atleast one worker is created;
215 	 * lwp_create() calls could fail because of segkp exhaustion.
216 	 */
217 	for (i = 0; i < _min_workers; i++)
218 		(void) _aio_create_worker(NULL, AIOREAD);
219 	if (__rw_workerscnt == 0) {
220 		errno = EAGAIN;
221 		goto out;
222 	}
223 
224 	ret = 0;
225 out:
226 	lmutex_lock(&__aio_initlock);
227 	if (ret == 0)
228 		__uaio_ok = 1;
229 	__aio_initbusy = 0;
230 	(void) cond_broadcast(&__aio_initcv);
231 	lmutex_unlock(&__aio_initlock);
232 	return (ret);
233 }
234 
235 /*
236  * Called from close() before actually performing the real _close().
237  */
238 void
_aio_close(int fd)239 _aio_close(int fd)
240 {
241 	if (fd < 0)	/* avoid cancelling everything */
242 		return;
243 	/*
244 	 * Cancel all outstanding aio requests for this file descriptor.
245 	 */
246 	if (__uaio_ok)
247 		(void) aiocancel_all(fd);
248 	/*
249 	 * If we have allocated the bit array, clear the bit for this file.
250 	 * The next open may re-use this file descriptor and the new file
251 	 * may have different kaio() behaviour.
252 	 */
253 	if (_kaio_supported != NULL)
254 		CLEAR_KAIO_SUPPORTED(fd);
255 }
256 
257 /*
258  * special kaio cleanup thread sits in a loop in the
259  * kernel waiting for pending kaio requests to complete.
260  */
261 void *
_kaio_cleanup_thread(void * arg)262 _kaio_cleanup_thread(void *arg)
263 {
264 	if (pthread_setspecific(_aio_key, arg) != 0)
265 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
266 	(void) _kaio(AIOSTART);
267 	return (arg);
268 }
269 
270 /*
271  * initialize kaio.
272  */
273 void
_kaio_init()274 _kaio_init()
275 {
276 	int error;
277 	sigset_t oset;
278 	int cancel_state;
279 
280 	lmutex_lock(&__aio_initlock);
281 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
282 	while (__aio_initbusy)
283 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
284 	(void) pthread_setcancelstate(cancel_state, NULL);
285 	if (_kaio_ok) {		/* already initialized */
286 		lmutex_unlock(&__aio_initlock);
287 		return;
288 	}
289 	__aio_initbusy = 1;
290 	lmutex_unlock(&__aio_initlock);
291 
292 	if (_kaio_supported_init() != 0)
293 		error = ENOMEM;
294 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
295 		error = ENOMEM;
296 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
297 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
298 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
299 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
300 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
301 	}
302 	if (error && _kaiowp != NULL) {
303 		_aio_worker_free(_kaiowp);
304 		_kaiowp = NULL;
305 	}
306 
307 	lmutex_lock(&__aio_initlock);
308 	if (error)
309 		_kaio_ok = -1;
310 	else
311 		_kaio_ok = 1;
312 	__aio_initbusy = 0;
313 	(void) cond_broadcast(&__aio_initcv);
314 	lmutex_unlock(&__aio_initlock);
315 }
316 
317 int
aioread(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)318 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
319     aio_result_t *resultp)
320 {
321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
322 }
323 
324 int
aiowrite(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)325 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
326     aio_result_t *resultp)
327 {
328 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
329 }
330 
331 #if !defined(_LP64)
332 int
aioread64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)333 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
334     aio_result_t *resultp)
335 {
336 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
337 }
338 
339 int
aiowrite64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)340 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
341     aio_result_t *resultp)
342 {
343 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
344 }
345 #endif	/* !defined(_LP64) */
346 
347 int
_aiorw(int fd,caddr_t buf,int bufsz,offset_t offset,int whence,aio_result_t * resultp,int mode)348 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
349     aio_result_t *resultp, int mode)
350 {
351 	aio_req_t *reqp;
352 	aio_args_t *ap;
353 	offset_t loffset;
354 	struct stat64 stat64;
355 	int error = 0;
356 	int kerr;
357 	int umode;
358 
359 	switch (whence) {
360 
361 	case SEEK_SET:
362 		loffset = offset;
363 		break;
364 	case SEEK_CUR:
365 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
366 			error = -1;
367 		else
368 			loffset += offset;
369 		break;
370 	case SEEK_END:
371 		if (fstat64(fd, &stat64) == -1)
372 			error = -1;
373 		else
374 			loffset = offset + stat64.st_size;
375 		break;
376 	default:
377 		errno = EINVAL;
378 		error = -1;
379 	}
380 
381 	if (error)
382 		return (error);
383 
384 	/* initialize kaio */
385 	if (!_kaio_ok)
386 		_kaio_init();
387 
388 	/*
389 	 * _aio_do_request() needs the original request code (mode) to be able
390 	 * to choose the appropiate 32/64 bit function.  All other functions
391 	 * only require the difference between READ and WRITE (umode).
392 	 */
393 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
394 		umode = mode - AIOAREAD64;
395 	else
396 		umode = mode;
397 
398 	/*
399 	 * Try kernel aio first.
400 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
401 	 */
402 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
403 		resultp->aio_errno = 0;
404 		sig_mutex_lock(&__aio_mutex);
405 		_kaio_outstand_cnt++;
406 		sig_mutex_unlock(&__aio_mutex);
407 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
408 		    (umode | AIO_POLL_BIT) : umode),
409 		    fd, buf, bufsz, loffset, resultp);
410 		if (kerr == 0) {
411 			return (0);
412 		}
413 		sig_mutex_lock(&__aio_mutex);
414 		_kaio_outstand_cnt--;
415 		sig_mutex_unlock(&__aio_mutex);
416 		if (errno != ENOTSUP && errno != EBADFD)
417 			return (-1);
418 		if (errno == EBADFD)
419 			SET_KAIO_NOT_SUPPORTED(fd);
420 	}
421 
422 	if (!__uaio_ok && __uaio_init() == -1)
423 		return (-1);
424 
425 	if ((reqp = _aio_req_alloc()) == NULL) {
426 		errno = EAGAIN;
427 		return (-1);
428 	}
429 
430 	/*
431 	 * _aio_do_request() checks reqp->req_op to differentiate
432 	 * between 32 and 64 bit access.
433 	 */
434 	reqp->req_op = mode;
435 	reqp->req_resultp = resultp;
436 	ap = &reqp->req_args;
437 	ap->fd = fd;
438 	ap->buf = buf;
439 	ap->bufsz = bufsz;
440 	ap->offset = loffset;
441 
442 	if (_aio_hash_insert(resultp, reqp) != 0) {
443 		_aio_req_free(reqp);
444 		errno = EINVAL;
445 		return (-1);
446 	}
447 	/*
448 	 * _aio_req_add() only needs the difference between READ and
449 	 * WRITE to choose the right worker queue.
450 	 */
451 	_aio_req_add(reqp, &__nextworker_rw, umode);
452 	return (0);
453 }
454 
455 int
aiocancel(aio_result_t * resultp)456 aiocancel(aio_result_t *resultp)
457 {
458 	aio_req_t *reqp;
459 	aio_worker_t *aiowp;
460 	int ret;
461 	int done = 0;
462 	int canceled = 0;
463 
464 	if (!__uaio_ok) {
465 		errno = EINVAL;
466 		return (-1);
467 	}
468 
469 	sig_mutex_lock(&__aio_mutex);
470 	reqp = _aio_hash_find(resultp);
471 	if (reqp == NULL) {
472 		if (_aio_outstand_cnt == _aio_req_done_cnt)
473 			errno = EINVAL;
474 		else
475 			errno = EACCES;
476 		ret = -1;
477 	} else {
478 		aiowp = reqp->req_worker;
479 		sig_mutex_lock(&aiowp->work_qlock1);
480 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
481 		sig_mutex_unlock(&aiowp->work_qlock1);
482 
483 		if (canceled) {
484 			ret = 0;
485 		} else {
486 			if (_aio_outstand_cnt == 0 ||
487 			    _aio_outstand_cnt == _aio_req_done_cnt)
488 				errno = EINVAL;
489 			else
490 				errno = EACCES;
491 			ret = -1;
492 		}
493 	}
494 	sig_mutex_unlock(&__aio_mutex);
495 	return (ret);
496 }
497 
498 /* ARGSUSED */
499 static void
_aiowait_cleanup(void * arg)500 _aiowait_cleanup(void *arg)
501 {
502 	sig_mutex_lock(&__aio_mutex);
503 	_aiowait_flag--;
504 	sig_mutex_unlock(&__aio_mutex);
505 }
506 
507 /*
508  * This must be asynch safe and cancel safe
509  */
510 aio_result_t *
aiowait(struct timeval * uwait)511 aiowait(struct timeval *uwait)
512 {
513 	aio_result_t *uresultp;
514 	aio_result_t *kresultp;
515 	aio_result_t *resultp;
516 	int dontblock;
517 	int timedwait = 0;
518 	int kaio_errno = 0;
519 	struct timeval twait;
520 	struct timeval *wait = NULL;
521 	hrtime_t hrtend;
522 	hrtime_t hres;
523 
524 	if (uwait) {
525 		/*
526 		 * Check for a valid specified wait time.
527 		 * If it is invalid, fail the call right away.
528 		 */
529 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
530 		    uwait->tv_usec >= MICROSEC) {
531 			errno = EINVAL;
532 			return ((aio_result_t *)-1);
533 		}
534 
535 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
536 			hrtend = gethrtime() +
537 			    (hrtime_t)uwait->tv_sec * NANOSEC +
538 			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
539 			twait = *uwait;
540 			wait = &twait;
541 			timedwait++;
542 		} else {
543 			/* polling */
544 			sig_mutex_lock(&__aio_mutex);
545 			if (_kaio_outstand_cnt == 0) {
546 				kresultp = (aio_result_t *)-1;
547 			} else {
548 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
549 				    (struct timeval *)-1, 1);
550 				if (kresultp != (aio_result_t *)-1 &&
551 				    kresultp != NULL &&
552 				    kresultp != (aio_result_t *)1) {
553 					_kaio_outstand_cnt--;
554 					sig_mutex_unlock(&__aio_mutex);
555 					return (kresultp);
556 				}
557 			}
558 			uresultp = _aio_req_done();
559 			sig_mutex_unlock(&__aio_mutex);
560 			if (uresultp != NULL &&
561 			    uresultp != (aio_result_t *)-1) {
562 				return (uresultp);
563 			}
564 			if (uresultp == (aio_result_t *)-1 &&
565 			    kresultp == (aio_result_t *)-1) {
566 				errno = EINVAL;
567 				return ((aio_result_t *)-1);
568 			} else {
569 				return (NULL);
570 			}
571 		}
572 	}
573 
574 	for (;;) {
575 		sig_mutex_lock(&__aio_mutex);
576 		uresultp = _aio_req_done();
577 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
578 			sig_mutex_unlock(&__aio_mutex);
579 			resultp = uresultp;
580 			break;
581 		}
582 		_aiowait_flag++;
583 		dontblock = (uresultp == (aio_result_t *)-1);
584 		if (dontblock && _kaio_outstand_cnt == 0) {
585 			kresultp = (aio_result_t *)-1;
586 			kaio_errno = EINVAL;
587 		} else {
588 			sig_mutex_unlock(&__aio_mutex);
589 			pthread_cleanup_push(_aiowait_cleanup, NULL);
590 			_cancel_prologue();
591 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
592 			    wait, dontblock);
593 			_cancel_epilogue();
594 			pthread_cleanup_pop(0);
595 			sig_mutex_lock(&__aio_mutex);
596 			kaio_errno = errno;
597 		}
598 		_aiowait_flag--;
599 		sig_mutex_unlock(&__aio_mutex);
600 		if (kresultp == (aio_result_t *)1) {
601 			/* aiowait() awakened by an aionotify() */
602 			continue;
603 		} else if (kresultp != NULL &&
604 		    kresultp != (aio_result_t *)-1) {
605 			resultp = kresultp;
606 			sig_mutex_lock(&__aio_mutex);
607 			_kaio_outstand_cnt--;
608 			sig_mutex_unlock(&__aio_mutex);
609 			break;
610 		} else if (kresultp == (aio_result_t *)-1 &&
611 		    kaio_errno == EINVAL &&
612 		    uresultp == (aio_result_t *)-1) {
613 			errno = kaio_errno;
614 			resultp = (aio_result_t *)-1;
615 			break;
616 		} else if (kresultp == (aio_result_t *)-1 &&
617 		    kaio_errno == EINTR) {
618 			errno = kaio_errno;
619 			resultp = (aio_result_t *)-1;
620 			break;
621 		} else if (timedwait) {
622 			hres = hrtend - gethrtime();
623 			if (hres <= 0) {
624 				/* time is up; return */
625 				resultp = NULL;
626 				break;
627 			} else {
628 				/*
629 				 * Some time left.  Round up the remaining time
630 				 * in nanoseconds to microsec.  Retry the call.
631 				 */
632 				hres += (NANOSEC / MICROSEC) - 1;
633 				wait->tv_sec = hres / NANOSEC;
634 				wait->tv_usec =
635 				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
636 			}
637 		} else {
638 			ASSERT(kresultp == NULL && uresultp == NULL);
639 			resultp = NULL;
640 			continue;
641 		}
642 	}
643 	return (resultp);
644 }
645 
646 /*
647  * _aio_get_timedelta calculates the remaining time and stores the result
648  * into timespec_t *wait.
649  */
650 
651 int
_aio_get_timedelta(timespec_t * end,timespec_t * wait)652 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
653 {
654 	int	ret = 0;
655 	struct	timeval cur;
656 	timespec_t curtime;
657 
658 	(void) gettimeofday(&cur, NULL);
659 	curtime.tv_sec = cur.tv_sec;
660 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
661 
662 	if (end->tv_sec >= curtime.tv_sec) {
663 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
664 		if (end->tv_nsec >= curtime.tv_nsec) {
665 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
666 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
667 				ret = -1;	/* timer expired */
668 		} else {
669 			if (end->tv_sec > curtime.tv_sec) {
670 				wait->tv_sec -= 1;
671 				wait->tv_nsec = NANOSEC -
672 				    (curtime.tv_nsec - end->tv_nsec);
673 			} else {
674 				ret = -1;	/* timer expired */
675 			}
676 		}
677 	} else {
678 		ret = -1;
679 	}
680 	return (ret);
681 }
682 
683 /*
684  * If closing by file descriptor: we will simply cancel all the outstanding
685  * aio`s and return.  Those aio's in question will have either noticed the
686  * cancellation notice before, during, or after initiating io.
687  */
688 int
aiocancel_all(int fd)689 aiocancel_all(int fd)
690 {
691 	aio_req_t *reqp;
692 	aio_req_t **reqpp, *last;
693 	aio_worker_t *first;
694 	aio_worker_t *next;
695 	int canceled = 0;
696 	int done = 0;
697 	int cancelall = 0;
698 
699 	sig_mutex_lock(&__aio_mutex);
700 
701 	if (_aio_outstand_cnt == 0) {
702 		sig_mutex_unlock(&__aio_mutex);
703 		return (AIO_ALLDONE);
704 	}
705 
706 	/*
707 	 * Cancel requests from the read/write workers' queues.
708 	 */
709 	first = __nextworker_rw;
710 	next = first;
711 	do {
712 		_aio_cancel_work(next, fd, &canceled, &done);
713 	} while ((next = next->work_forw) != first);
714 
715 	/*
716 	 * finally, check if there are requests on the done queue that
717 	 * should be canceled.
718 	 */
719 	if (fd < 0)
720 		cancelall = 1;
721 	reqpp = &_aio_done_tail;
722 	last = _aio_done_tail;
723 	while ((reqp = *reqpp) != NULL) {
724 		if (cancelall || reqp->req_args.fd == fd) {
725 			*reqpp = reqp->req_next;
726 			if (last == reqp) {
727 				last = reqp->req_next;
728 			}
729 			if (_aio_done_head == reqp) {
730 				/* this should be the last req in list */
731 				_aio_done_head = last;
732 			}
733 			_aio_donecnt--;
734 			_aio_set_result(reqp, -1, ECANCELED);
735 			(void) _aio_hash_del(reqp->req_resultp);
736 			_aio_req_free(reqp);
737 		} else {
738 			reqpp = &reqp->req_next;
739 			last = reqp;
740 		}
741 	}
742 
743 	if (cancelall) {
744 		ASSERT(_aio_donecnt == 0);
745 		_aio_done_head = NULL;
746 	}
747 	sig_mutex_unlock(&__aio_mutex);
748 
749 	if (canceled && done == 0)
750 		return (AIO_CANCELED);
751 	else if (done && canceled == 0)
752 		return (AIO_ALLDONE);
753 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
754 		return ((int)_kaio(AIOCANCEL, fd, NULL));
755 	return (AIO_NOTCANCELED);
756 }
757 
758 /*
759  * Cancel requests from a given work queue.  If the file descriptor
760  * parameter, fd, is non-negative, then only cancel those requests
761  * in this queue that are to this file descriptor.  If the fd
762  * parameter is -1, then cancel all requests.
763  */
764 static void
_aio_cancel_work(aio_worker_t * aiowp,int fd,int * canceled,int * done)765 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
766 {
767 	aio_req_t *reqp;
768 
769 	sig_mutex_lock(&aiowp->work_qlock1);
770 	/*
771 	 * cancel queued requests first.
772 	 */
773 	reqp = aiowp->work_tail1;
774 	while (reqp != NULL) {
775 		if (fd < 0 || reqp->req_args.fd == fd) {
776 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
777 				/*
778 				 * Callers locks were dropped.
779 				 * reqp is invalid; start traversing
780 				 * the list from the beginning again.
781 				 */
782 				reqp = aiowp->work_tail1;
783 				continue;
784 			}
785 		}
786 		reqp = reqp->req_next;
787 	}
788 	/*
789 	 * Since the queued requests have been canceled, there can
790 	 * only be one inprogress request that should be canceled.
791 	 */
792 	if ((reqp = aiowp->work_req) != NULL &&
793 	    (fd < 0 || reqp->req_args.fd == fd))
794 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
795 	sig_mutex_unlock(&aiowp->work_qlock1);
796 }
797 
798 /*
799  * Cancel a request.  Return 1 if the callers locks were temporarily
800  * dropped, otherwise return 0.
801  */
802 int
_aio_cancel_req(aio_worker_t * aiowp,aio_req_t * reqp,int * canceled,int * done)803 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
804 {
805 	int ostate = reqp->req_state;
806 
807 	ASSERT(MUTEX_HELD(&__aio_mutex));
808 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
809 	if (ostate == AIO_REQ_CANCELED)
810 		return (0);
811 	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
812 	    aiowp->work_prev1 == reqp) {
813 		ASSERT(aiowp->work_done1 != 0);
814 		/*
815 		 * If not on the done queue yet, just mark it CANCELED,
816 		 * _aio_work_done() will do the necessary clean up.
817 		 * This is required to ensure that aiocancel_all() cancels
818 		 * all the outstanding requests, including this one which
819 		 * is not yet on done queue but has been marked done.
820 		 */
821 		_aio_set_result(reqp, -1, ECANCELED);
822 		(void) _aio_hash_del(reqp->req_resultp);
823 		reqp->req_state = AIO_REQ_CANCELED;
824 		(*canceled)++;
825 		return (0);
826 	}
827 
828 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
829 		(*done)++;
830 		return (0);
831 	}
832 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
833 		ASSERT(POSIX_AIO(reqp));
834 		/* Cancel the queued aio_fsync() request */
835 		if (!reqp->req_head->lio_canned) {
836 			reqp->req_head->lio_canned = 1;
837 			_aio_outstand_cnt--;
838 			(*canceled)++;
839 		}
840 		return (0);
841 	}
842 	reqp->req_state = AIO_REQ_CANCELED;
843 	_aio_req_del(aiowp, reqp, ostate);
844 	(void) _aio_hash_del(reqp->req_resultp);
845 	(*canceled)++;
846 	if (reqp == aiowp->work_req) {
847 		ASSERT(ostate == AIO_REQ_INPROGRESS);
848 		/*
849 		 * Set the result values now, before _aiodone() is called.
850 		 * We do this because the application can expect aio_return
851 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
852 		 * immediately after a successful return from aiocancel()
853 		 * or aio_cancel().
854 		 */
855 		_aio_set_result(reqp, -1, ECANCELED);
856 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
857 		return (0);
858 	}
859 	if (!POSIX_AIO(reqp)) {
860 		_aio_outstand_cnt--;
861 		_aio_set_result(reqp, -1, ECANCELED);
862 		_aio_req_free(reqp);
863 		return (0);
864 	}
865 	sig_mutex_unlock(&aiowp->work_qlock1);
866 	sig_mutex_unlock(&__aio_mutex);
867 	_aiodone(reqp, -1, ECANCELED);
868 	sig_mutex_lock(&__aio_mutex);
869 	sig_mutex_lock(&aiowp->work_qlock1);
870 	return (1);
871 }
872 
873 int
_aio_create_worker(aio_req_t * reqp,int mode)874 _aio_create_worker(aio_req_t *reqp, int mode)
875 {
876 	aio_worker_t *aiowp, **workers, **nextworker;
877 	int *aio_workerscnt;
878 	void *(*func)(void *);
879 	sigset_t oset;
880 	int error;
881 
882 	/*
883 	 * Put the new worker thread in the right queue.
884 	 */
885 	switch (mode) {
886 	case AIOREAD:
887 	case AIOWRITE:
888 	case AIOAREAD:
889 	case AIOAWRITE:
890 #if !defined(_LP64)
891 	case AIOAREAD64:
892 	case AIOAWRITE64:
893 #endif
894 		workers = &__workers_rw;
895 		nextworker = &__nextworker_rw;
896 		aio_workerscnt = &__rw_workerscnt;
897 		func = _aio_do_request;
898 		break;
899 	case AIONOTIFY:
900 		workers = &__workers_no;
901 		nextworker = &__nextworker_no;
902 		func = _aio_do_notify;
903 		aio_workerscnt = &__no_workerscnt;
904 		break;
905 	default:
906 		aio_panic("_aio_create_worker: invalid mode");
907 		break;
908 	}
909 
910 	if ((aiowp = _aio_worker_alloc()) == NULL)
911 		return (-1);
912 
913 	if (reqp) {
914 		reqp->req_state = AIO_REQ_QUEUED;
915 		reqp->req_worker = aiowp;
916 		aiowp->work_head1 = reqp;
917 		aiowp->work_tail1 = reqp;
918 		aiowp->work_next1 = reqp;
919 		aiowp->work_count1 = 1;
920 		aiowp->work_minload1 = 1;
921 	}
922 
923 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
924 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
925 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
926 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
927 	if (error) {
928 		if (reqp) {
929 			reqp->req_state = 0;
930 			reqp->req_worker = NULL;
931 		}
932 		_aio_worker_free(aiowp);
933 		return (-1);
934 	}
935 
936 	lmutex_lock(&__aio_mutex);
937 	(*aio_workerscnt)++;
938 	if (*workers == NULL) {
939 		aiowp->work_forw = aiowp;
940 		aiowp->work_backw = aiowp;
941 		*nextworker = aiowp;
942 		*workers = aiowp;
943 	} else {
944 		aiowp->work_backw = (*workers)->work_backw;
945 		aiowp->work_forw = (*workers);
946 		(*workers)->work_backw->work_forw = aiowp;
947 		(*workers)->work_backw = aiowp;
948 	}
949 	_aio_worker_cnt++;
950 	lmutex_unlock(&__aio_mutex);
951 
952 	(void) thr_continue(aiowp->work_tid);
953 
954 	return (0);
955 }
956 
957 /*
958  * This is the worker's main routine.
959  * The task of this function is to execute all queued requests;
960  * once the last pending request is executed this function will block
961  * in _aio_idle().  A new incoming request must wakeup this thread to
962  * restart the work.
963  * Every worker has an own work queue.  The queue lock is required
964  * to synchronize the addition of new requests for this worker or
965  * cancellation of pending/running requests.
966  *
967  * Cancellation scenarios:
968  * The cancellation of a request is being done asynchronously using
969  * _aio_cancel_req() from another thread context.
970  * A queued request can be cancelled in different manners :
971  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
972  *	- lock the queue -> remove the request -> unlock the queue
973  *	- this function/thread does not detect this cancellation process
974  * b) request is in progress (AIO_REQ_INPROGRESS) :
975  *	- this function first allow the cancellation of the running
976  *	  request with the flag "work_cancel_flg=1"
977  * 		see _aio_req_get() -> _aio_cancel_on()
978  *	  During this phase, it is allowed to interrupt the worker
979  *	  thread running the request (this thread) using the SIGAIOCANCEL
980  *	  signal.
981  *	  Once this thread returns from the kernel (because the request
982  *	  is just done), then it must disable a possible cancellation
983  *	  and proceed to finish the request.  To disable the cancellation
984  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
985  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
986  *	  same procedure as in a)
987  *
988  * To b)
989  *	This thread uses sigsetjmp() to define the position in the code, where
990  *	it wish to continue working in the case that a SIGAIOCANCEL signal
991  *	is detected.
992  *	Normally this thread should get the cancellation signal during the
993  *	kernel phase (reading or writing).  In that case the signal handler
994  *	aiosigcancelhndlr() is activated using the worker thread context,
995  *	which again will use the siglongjmp() function to break the standard
996  *	code flow and jump to the "sigsetjmp" position, provided that
997  *	"work_cancel_flg" is set to "1".
998  *	Because the "work_cancel_flg" is only manipulated by this worker
999  *	thread and it can only run on one CPU at a given time, it is not
1000  *	necessary to protect that flag with the queue lock.
1001  *	Returning from the kernel (read or write system call) we must
1002  *	first disable the use of the SIGAIOCANCEL signal and accordingly
1003  *	the use of the siglongjmp() function to prevent a possible deadlock:
1004  *	- It can happens that this worker thread returns from the kernel and
1005  *	  blocks in "work_qlock1",
1006  *	- then a second thread cancels the apparently "in progress" request
1007  *	  and sends the SIGAIOCANCEL signal to the worker thread,
1008  *	- the worker thread gets assigned the "work_qlock1" and will returns
1009  *	  from the kernel,
1010  *	- the kernel detects the pending signal and activates the signal
1011  *	  handler instead,
1012  *	- if the "work_cancel_flg" is still set then the signal handler
1013  *	  should use siglongjmp() to cancel the "in progress" request and
1014  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
1015  *	  for a second time => deadlock.
1016  *	To avoid that situation we disable the cancellation of the request
1017  *	in progress BEFORE we try to acquire the work_qlock1.
1018  *	In that case the signal handler will not call siglongjmp() and the
1019  *	worker thread will continue running the standard code flow.
1020  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
1021  *	an eventually required siglongjmp() freeing the work_qlock1 and
1022  *	avoiding a deadlock.
1023  */
1024 void *
_aio_do_request(void * arglist)1025 _aio_do_request(void *arglist)
1026 {
1027 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
1028 	ulwp_t *self = curthread;
1029 	struct aio_args *arg;
1030 	aio_req_t *reqp;		/* current AIO request */
1031 	ssize_t retval;
1032 	int append;
1033 	int error;
1034 
1035 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1036 		aio_panic("_aio_do_request, pthread_setspecific()");
1037 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1038 	ASSERT(aiowp->work_req == NULL);
1039 
1040 	/*
1041 	 * We resume here when an operation is cancelled.
1042 	 * On first entry, aiowp->work_req == NULL, so all
1043 	 * we do is block SIGAIOCANCEL.
1044 	 */
1045 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
1046 	ASSERT(self->ul_sigdefer == 0);
1047 
1048 	sigoff(self);	/* block SIGAIOCANCEL */
1049 	if (aiowp->work_req != NULL)
1050 		_aio_finish_request(aiowp, -1, ECANCELED);
1051 
1052 	for (;;) {
1053 		/*
1054 		 * Put completed requests on aio_done_list.  This has
1055 		 * to be done as part of the main loop to ensure that
1056 		 * we don't artificially starve any aiowait'ers.
1057 		 */
1058 		if (aiowp->work_done1)
1059 			_aio_work_done(aiowp);
1060 
1061 top:
1062 		/* consume any deferred SIGAIOCANCEL signal here */
1063 		sigon(self);
1064 		sigoff(self);
1065 
1066 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1067 			if (_aio_idle(aiowp) != 0)
1068 				goto top;
1069 		}
1070 		arg = &reqp->req_args;
1071 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1072 		    reqp->req_state == AIO_REQ_CANCELED);
1073 		error = 0;
1074 
1075 		switch (reqp->req_op) {
1076 		case AIOREAD:
1077 		case AIOAREAD:
1078 			sigon(self);	/* unblock SIGAIOCANCEL */
1079 			retval = pread(arg->fd, arg->buf,
1080 			    arg->bufsz, arg->offset);
1081 			if (retval == -1) {
1082 				if (errno == ESPIPE) {
1083 					retval = read(arg->fd,
1084 					    arg->buf, arg->bufsz);
1085 					if (retval == -1)
1086 						error = errno;
1087 				} else {
1088 					error = errno;
1089 				}
1090 			}
1091 			sigoff(self);	/* block SIGAIOCANCEL */
1092 			break;
1093 		case AIOWRITE:
1094 		case AIOAWRITE:
1095 			/*
1096 			 * The SUSv3 POSIX spec for aio_write() states:
1097 			 *	If O_APPEND is set for the file descriptor,
1098 			 *	write operations append to the file in the
1099 			 *	same order as the calls were made.
1100 			 * but, somewhat inconsistently, it requires pwrite()
1101 			 * to ignore the O_APPEND setting.  So we have to use
1102 			 * fcntl() to get the open modes and call write() for
1103 			 * the O_APPEND case.
1104 			 */
1105 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1106 			sigon(self);	/* unblock SIGAIOCANCEL */
1107 			retval = append?
1108 			    write(arg->fd, arg->buf, arg->bufsz) :
1109 			    pwrite(arg->fd, arg->buf, arg->bufsz,
1110 			    arg->offset);
1111 			if (retval == -1) {
1112 				if (errno == ESPIPE) {
1113 					retval = write(arg->fd,
1114 					    arg->buf, arg->bufsz);
1115 					if (retval == -1)
1116 						error = errno;
1117 				} else {
1118 					error = errno;
1119 				}
1120 			}
1121 			sigoff(self);	/* block SIGAIOCANCEL */
1122 			break;
1123 #if !defined(_LP64)
1124 		case AIOAREAD64:
1125 			sigon(self);	/* unblock SIGAIOCANCEL */
1126 			retval = pread64(arg->fd, arg->buf,
1127 			    arg->bufsz, arg->offset);
1128 			if (retval == -1) {
1129 				if (errno == ESPIPE) {
1130 					retval = read(arg->fd,
1131 					    arg->buf, arg->bufsz);
1132 					if (retval == -1)
1133 						error = errno;
1134 				} else {
1135 					error = errno;
1136 				}
1137 			}
1138 			sigoff(self);	/* block SIGAIOCANCEL */
1139 			break;
1140 		case AIOAWRITE64:
1141 			/*
1142 			 * The SUSv3 POSIX spec for aio_write() states:
1143 			 *	If O_APPEND is set for the file descriptor,
1144 			 *	write operations append to the file in the
1145 			 *	same order as the calls were made.
1146 			 * but, somewhat inconsistently, it requires pwrite()
1147 			 * to ignore the O_APPEND setting.  So we have to use
1148 			 * fcntl() to get the open modes and call write() for
1149 			 * the O_APPEND case.
1150 			 */
1151 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1152 			sigon(self);	/* unblock SIGAIOCANCEL */
1153 			retval = append?
1154 			    write(arg->fd, arg->buf, arg->bufsz) :
1155 			    pwrite64(arg->fd, arg->buf, arg->bufsz,
1156 			    arg->offset);
1157 			if (retval == -1) {
1158 				if (errno == ESPIPE) {
1159 					retval = write(arg->fd,
1160 					    arg->buf, arg->bufsz);
1161 					if (retval == -1)
1162 						error = errno;
1163 				} else {
1164 					error = errno;
1165 				}
1166 			}
1167 			sigoff(self);	/* block SIGAIOCANCEL */
1168 			break;
1169 #endif	/* !defined(_LP64) */
1170 		case AIOFSYNC:
1171 			if (_aio_fsync_del(aiowp, reqp))
1172 				goto top;
1173 			ASSERT(reqp->req_head == NULL);
1174 			/*
1175 			 * All writes for this fsync request are now
1176 			 * acknowledged.  Now make these writes visible
1177 			 * and put the final request into the hash table.
1178 			 */
1179 			if (reqp->req_state == AIO_REQ_CANCELED) {
1180 				/* EMPTY */;
1181 			} else if (arg->offset == O_SYNC) {
1182 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1183 					error = errno;
1184 			} else {
1185 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1186 					error = errno;
1187 			}
1188 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1189 				aio_panic("_aio_do_request(): AIOFSYNC: "
1190 				    "request already in hash table");
1191 			break;
1192 		default:
1193 			aio_panic("_aio_do_request, bad op");
1194 		}
1195 
1196 		_aio_finish_request(aiowp, retval, error);
1197 	}
1198 	/* NOTREACHED */
1199 	return (NULL);
1200 }
1201 
1202 /*
1203  * Perform the tail processing for _aio_do_request().
1204  * The in-progress request may or may not have been cancelled.
1205  */
1206 static void
_aio_finish_request(aio_worker_t * aiowp,ssize_t retval,int error)1207 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1208 {
1209 	aio_req_t *reqp;
1210 
1211 	sig_mutex_lock(&aiowp->work_qlock1);
1212 	if ((reqp = aiowp->work_req) == NULL)
1213 		sig_mutex_unlock(&aiowp->work_qlock1);
1214 	else {
1215 		aiowp->work_req = NULL;
1216 		if (reqp->req_state == AIO_REQ_CANCELED) {
1217 			retval = -1;
1218 			error = ECANCELED;
1219 		}
1220 		if (!POSIX_AIO(reqp)) {
1221 			int notify;
1222 			if (reqp->req_state == AIO_REQ_INPROGRESS) {
1223 				reqp->req_state = AIO_REQ_DONE;
1224 				_aio_set_result(reqp, retval, error);
1225 			}
1226 			sig_mutex_unlock(&aiowp->work_qlock1);
1227 			sig_mutex_lock(&__aio_mutex);
1228 			/*
1229 			 * If it was canceled, this request will not be
1230 			 * added to done list. Just free it.
1231 			 */
1232 			if (error == ECANCELED) {
1233 				_aio_outstand_cnt--;
1234 				_aio_req_free(reqp);
1235 			} else {
1236 				_aio_req_done_cnt++;
1237 			}
1238 			/*
1239 			 * Notify any thread that may have blocked
1240 			 * because it saw an outstanding request.
1241 			 */
1242 			notify = 0;
1243 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1244 				notify = 1;
1245 			}
1246 			sig_mutex_unlock(&__aio_mutex);
1247 			if (notify) {
1248 				(void) _kaio(AIONOTIFY);
1249 			}
1250 		} else {
1251 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1252 				reqp->req_state = AIO_REQ_DONE;
1253 			sig_mutex_unlock(&aiowp->work_qlock1);
1254 			_aiodone(reqp, retval, error);
1255 		}
1256 	}
1257 }
1258 
1259 void
_aio_req_mark_done(aio_req_t * reqp)1260 _aio_req_mark_done(aio_req_t *reqp)
1261 {
1262 #if !defined(_LP64)
1263 	if (reqp->req_largefile)
1264 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1265 	else
1266 #endif
1267 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1268 }
1269 
1270 /*
1271  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1272  * hopefully to consume one of our queued signals.
1273  */
1274 static void
_aio_delay(int ticks)1275 _aio_delay(int ticks)
1276 {
1277 	(void) usleep(ticks * (MICROSEC / hz));
1278 }
1279 
1280 /*
1281  * Actually send the notifications.
1282  * We could block indefinitely here if the application
1283  * is not listening for the signal or port notifications.
1284  */
1285 static void
send_notification(notif_param_t * npp)1286 send_notification(notif_param_t *npp)
1287 {
1288 	extern int __sigqueue(pid_t pid, int signo,
1289 	    /* const union sigval */ void *value, int si_code, int block);
1290 
1291 	if (npp->np_signo)
1292 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1293 		    SI_ASYNCIO, 1);
1294 	else if (npp->np_port >= 0)
1295 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1296 		    npp->np_event, npp->np_object, npp->np_user);
1297 
1298 	if (npp->np_lio_signo)
1299 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1300 		    SI_ASYNCIO, 1);
1301 	else if (npp->np_lio_port >= 0)
1302 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1303 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1304 }
1305 
1306 /*
1307  * Asynchronous notification worker.
1308  */
1309 void *
_aio_do_notify(void * arg)1310 _aio_do_notify(void *arg)
1311 {
1312 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1313 	aio_req_t *reqp;
1314 
1315 	/*
1316 	 * This isn't really necessary.  All signals are blocked.
1317 	 */
1318 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1319 		aio_panic("_aio_do_notify, pthread_setspecific()");
1320 
1321 	/*
1322 	 * Notifications are never cancelled.
1323 	 * All signals remain blocked, forever.
1324 	 */
1325 	for (;;) {
1326 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1327 			if (_aio_idle(aiowp) != 0)
1328 				aio_panic("_aio_do_notify: _aio_idle() failed");
1329 		}
1330 		send_notification(&reqp->req_notify);
1331 		_aio_req_free(reqp);
1332 	}
1333 
1334 	/* NOTREACHED */
1335 	return (NULL);
1336 }
1337 
1338 /*
1339  * Do the completion semantics for a request that was either canceled
1340  * by _aio_cancel_req() or was completed by _aio_do_request().
1341  */
1342 static void
_aiodone(aio_req_t * reqp,ssize_t retval,int error)1343 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1344 {
1345 	aio_result_t *resultp = reqp->req_resultp;
1346 	int notify = 0;
1347 	aio_lio_t *head;
1348 	int sigev_none;
1349 	int sigev_signal;
1350 	int sigev_thread;
1351 	int sigev_port;
1352 	notif_param_t np;
1353 
1354 	/*
1355 	 * We call _aiodone() only for Posix I/O.
1356 	 */
1357 	ASSERT(POSIX_AIO(reqp));
1358 
1359 	sigev_none = 0;
1360 	sigev_signal = 0;
1361 	sigev_thread = 0;
1362 	sigev_port = 0;
1363 	np.np_signo = 0;
1364 	np.np_port = -1;
1365 	np.np_lio_signo = 0;
1366 	np.np_lio_port = -1;
1367 
1368 	switch (reqp->req_sigevent.sigev_notify) {
1369 	case SIGEV_NONE:
1370 		sigev_none = 1;
1371 		break;
1372 	case SIGEV_SIGNAL:
1373 		sigev_signal = 1;
1374 		break;
1375 	case SIGEV_THREAD:
1376 		sigev_thread = 1;
1377 		break;
1378 	case SIGEV_PORT:
1379 		sigev_port = 1;
1380 		break;
1381 	default:
1382 		aio_panic("_aiodone: improper sigev_notify");
1383 		break;
1384 	}
1385 
1386 	/*
1387 	 * Figure out the notification parameters while holding __aio_mutex.
1388 	 * Actually perform the notifications after dropping __aio_mutex.
1389 	 * This allows us to sleep for a long time (if the notifications
1390 	 * incur delays) without impeding other async I/O operations.
1391 	 */
1392 
1393 	sig_mutex_lock(&__aio_mutex);
1394 
1395 	if (sigev_signal) {
1396 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1397 			notify = 1;
1398 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1399 	} else if (sigev_thread | sigev_port) {
1400 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1401 			notify = 1;
1402 		np.np_event = reqp->req_op;
1403 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1404 			np.np_event = AIOFSYNC64;
1405 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1406 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1407 	}
1408 
1409 	if (resultp->aio_errno == EINPROGRESS)
1410 		_aio_set_result(reqp, retval, error);
1411 
1412 	_aio_outstand_cnt--;
1413 
1414 	head = reqp->req_head;
1415 	reqp->req_head = NULL;
1416 
1417 	if (sigev_none) {
1418 		_aio_enq_doneq(reqp);
1419 		reqp = NULL;
1420 	} else {
1421 		(void) _aio_hash_del(resultp);
1422 		_aio_req_mark_done(reqp);
1423 	}
1424 
1425 	_aio_waitn_wakeup();
1426 
1427 	/*
1428 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1429 	 * __aio_suspend() increments "_aio_kernel_suspend"
1430 	 * when they are waiting in the kernel for completed I/Os.
1431 	 *
1432 	 * _kaio(AIONOTIFY) awakes the corresponding function
1433 	 * in the kernel; then the corresponding __aio_waitn() or
1434 	 * __aio_suspend() function could reap the recently
1435 	 * completed I/Os (_aiodone()).
1436 	 */
1437 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1438 		(void) _kaio(AIONOTIFY);
1439 
1440 	sig_mutex_unlock(&__aio_mutex);
1441 
1442 	if (head != NULL) {
1443 		/*
1444 		 * If all the lio requests have completed,
1445 		 * prepare to notify the waiting thread.
1446 		 */
1447 		sig_mutex_lock(&head->lio_mutex);
1448 		ASSERT(head->lio_refcnt == head->lio_nent);
1449 		if (head->lio_refcnt == 1) {
1450 			int waiting = 0;
1451 			if (head->lio_mode == LIO_WAIT) {
1452 				if ((waiting = head->lio_waiting) != 0)
1453 					(void) cond_signal(&head->lio_cond_cv);
1454 			} else if (head->lio_port < 0) { /* none or signal */
1455 				if ((np.np_lio_signo = head->lio_signo) != 0)
1456 					notify = 1;
1457 				np.np_lio_user = head->lio_sigval.sival_ptr;
1458 			} else {			/* thread or port */
1459 				notify = 1;
1460 				np.np_lio_port = head->lio_port;
1461 				np.np_lio_event = head->lio_event;
1462 				np.np_lio_object =
1463 				    (uintptr_t)head->lio_sigevent;
1464 				np.np_lio_user = head->lio_sigval.sival_ptr;
1465 			}
1466 			head->lio_nent = head->lio_refcnt = 0;
1467 			sig_mutex_unlock(&head->lio_mutex);
1468 			if (waiting == 0)
1469 				_aio_lio_free(head);
1470 		} else {
1471 			head->lio_nent--;
1472 			head->lio_refcnt--;
1473 			sig_mutex_unlock(&head->lio_mutex);
1474 		}
1475 	}
1476 
1477 	/*
1478 	 * The request is completed; now perform the notifications.
1479 	 */
1480 	if (notify) {
1481 		if (reqp != NULL) {
1482 			/*
1483 			 * We usually put the request on the notification
1484 			 * queue because we don't want to block and delay
1485 			 * other operations behind us in the work queue.
1486 			 * Also we must never block on a cancel notification
1487 			 * because we are being called from an application
1488 			 * thread in this case and that could lead to deadlock
1489 			 * if no other thread is receiving notificatins.
1490 			 */
1491 			reqp->req_notify = np;
1492 			reqp->req_op = AIONOTIFY;
1493 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1494 			reqp = NULL;
1495 		} else {
1496 			/*
1497 			 * We already put the request on the done queue,
1498 			 * so we can't queue it to the notification queue.
1499 			 * Just do the notification directly.
1500 			 */
1501 			send_notification(&np);
1502 		}
1503 	}
1504 
1505 	if (reqp != NULL)
1506 		_aio_req_free(reqp);
1507 }
1508 
1509 /*
1510  * Delete fsync requests from list head until there is
1511  * only one left.  Return 0 when there is only one,
1512  * otherwise return a non-zero value.
1513  */
1514 static int
_aio_fsync_del(aio_worker_t * aiowp,aio_req_t * reqp)1515 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1516 {
1517 	aio_lio_t *head = reqp->req_head;
1518 	int rval = 0;
1519 
1520 	ASSERT(reqp == aiowp->work_req);
1521 	sig_mutex_lock(&aiowp->work_qlock1);
1522 	sig_mutex_lock(&head->lio_mutex);
1523 	if (head->lio_refcnt > 1) {
1524 		head->lio_refcnt--;
1525 		head->lio_nent--;
1526 		aiowp->work_req = NULL;
1527 		sig_mutex_unlock(&head->lio_mutex);
1528 		sig_mutex_unlock(&aiowp->work_qlock1);
1529 		sig_mutex_lock(&__aio_mutex);
1530 		_aio_outstand_cnt--;
1531 		_aio_waitn_wakeup();
1532 		sig_mutex_unlock(&__aio_mutex);
1533 		_aio_req_free(reqp);
1534 		return (1);
1535 	}
1536 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1537 	reqp->req_head = NULL;
1538 	if (head->lio_canned)
1539 		reqp->req_state = AIO_REQ_CANCELED;
1540 	if (head->lio_mode == LIO_DESTROY) {
1541 		aiowp->work_req = NULL;
1542 		rval = 1;
1543 	}
1544 	sig_mutex_unlock(&head->lio_mutex);
1545 	sig_mutex_unlock(&aiowp->work_qlock1);
1546 	head->lio_refcnt--;
1547 	head->lio_nent--;
1548 	_aio_lio_free(head);
1549 	if (rval != 0)
1550 		_aio_req_free(reqp);
1551 	return (rval);
1552 }
1553 
1554 /*
1555  * A worker is set idle when its work queue is empty.
1556  * The worker checks again that it has no more work
1557  * and then goes to sleep waiting for more work.
1558  */
1559 int
_aio_idle(aio_worker_t * aiowp)1560 _aio_idle(aio_worker_t *aiowp)
1561 {
1562 	int error = 0;
1563 
1564 	sig_mutex_lock(&aiowp->work_qlock1);
1565 	if (aiowp->work_count1 == 0) {
1566 		ASSERT(aiowp->work_minload1 == 0);
1567 		aiowp->work_idleflg = 1;
1568 		/*
1569 		 * A cancellation handler is not needed here.
1570 		 * aio worker threads are never cancelled via pthread_cancel().
1571 		 */
1572 		error = sig_cond_wait(&aiowp->work_idle_cv,
1573 		    &aiowp->work_qlock1);
1574 		/*
1575 		 * The idle flag is normally cleared before worker is awakened
1576 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1577 		 */
1578 		if (error)
1579 			aiowp->work_idleflg = 0;
1580 	}
1581 	sig_mutex_unlock(&aiowp->work_qlock1);
1582 	return (error);
1583 }
1584 
1585 /*
1586  * A worker's completed AIO requests are placed onto a global
1587  * done queue.  The application is only sent a SIGIO signal if
1588  * the process has a handler enabled and it is not waiting via
1589  * aiowait().
1590  */
1591 static void
_aio_work_done(aio_worker_t * aiowp)1592 _aio_work_done(aio_worker_t *aiowp)
1593 {
1594 	aio_req_t *reqp;
1595 
1596 	sig_mutex_lock(&__aio_mutex);
1597 	sig_mutex_lock(&aiowp->work_qlock1);
1598 	reqp = aiowp->work_prev1;
1599 	reqp->req_next = NULL;
1600 	aiowp->work_done1 = 0;
1601 	aiowp->work_tail1 = aiowp->work_next1;
1602 	if (aiowp->work_tail1 == NULL)
1603 		aiowp->work_head1 = NULL;
1604 	aiowp->work_prev1 = NULL;
1605 	_aio_outstand_cnt--;
1606 	_aio_req_done_cnt--;
1607 	if (reqp->req_state == AIO_REQ_CANCELED) {
1608 		/*
1609 		 * Request got cancelled after it was marked done. This can
1610 		 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1611 		 * and drops all locks. Don't add the request to the done
1612 		 * queue and just discard it.
1613 		 */
1614 		sig_mutex_unlock(&aiowp->work_qlock1);
1615 		_aio_req_free(reqp);
1616 		if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1617 			sig_mutex_unlock(&__aio_mutex);
1618 			(void) _kaio(AIONOTIFY);
1619 		} else {
1620 			sig_mutex_unlock(&__aio_mutex);
1621 		}
1622 		return;
1623 	}
1624 	sig_mutex_unlock(&aiowp->work_qlock1);
1625 	_aio_donecnt++;
1626 	ASSERT(_aio_donecnt > 0 &&
1627 	    _aio_outstand_cnt >= 0 &&
1628 	    _aio_req_done_cnt >= 0);
1629 	ASSERT(reqp != NULL);
1630 
1631 	if (_aio_done_tail == NULL) {
1632 		_aio_done_head = _aio_done_tail = reqp;
1633 	} else {
1634 		_aio_done_head->req_next = reqp;
1635 		_aio_done_head = reqp;
1636 	}
1637 
1638 	if (_aiowait_flag) {
1639 		sig_mutex_unlock(&__aio_mutex);
1640 		(void) _kaio(AIONOTIFY);
1641 	} else {
1642 		sig_mutex_unlock(&__aio_mutex);
1643 		if (_sigio_enabled)
1644 			(void) kill(__pid, SIGIO);
1645 	}
1646 }
1647 
1648 /*
1649  * The done queue consists of AIO requests that are in either the
1650  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1651  * are discarded.  If the done queue is empty then NULL is returned.
1652  * Otherwise the address of a done aio_result_t is returned.
1653  */
1654 aio_result_t *
_aio_req_done(void)1655 _aio_req_done(void)
1656 {
1657 	aio_req_t *reqp;
1658 	aio_result_t *resultp;
1659 
1660 	ASSERT(MUTEX_HELD(&__aio_mutex));
1661 
1662 	if ((reqp = _aio_done_tail) != NULL) {
1663 		if ((_aio_done_tail = reqp->req_next) == NULL)
1664 			_aio_done_head = NULL;
1665 		ASSERT(_aio_donecnt > 0);
1666 		_aio_donecnt--;
1667 		(void) _aio_hash_del(reqp->req_resultp);
1668 		resultp = reqp->req_resultp;
1669 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1670 		_aio_req_free(reqp);
1671 		return (resultp);
1672 	}
1673 	/* is queue empty? */
1674 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1675 		return ((aio_result_t *)-1);
1676 	}
1677 	return (NULL);
1678 }
1679 
1680 /*
1681  * Set the return and errno values for the application's use.
1682  *
1683  * For the Posix interfaces, we must set the return value first followed
1684  * by the errno value because the Posix interfaces allow for a change
1685  * in the errno value from EINPROGRESS to something else to signal
1686  * the completion of the asynchronous request.
1687  *
1688  * The opposite is true for the Solaris interfaces.  These allow for
1689  * a change in the return value from AIO_INPROGRESS to something else
1690  * to signal the completion of the asynchronous request.
1691  */
1692 void
_aio_set_result(aio_req_t * reqp,ssize_t retval,int error)1693 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1694 {
1695 	aio_result_t *resultp = reqp->req_resultp;
1696 
1697 	if (POSIX_AIO(reqp)) {
1698 		resultp->aio_return = retval;
1699 		membar_producer();
1700 		resultp->aio_errno = error;
1701 	} else {
1702 		resultp->aio_errno = error;
1703 		membar_producer();
1704 		resultp->aio_return = retval;
1705 	}
1706 }
1707 
1708 /*
1709  * Add an AIO request onto the next work queue.
1710  * A circular list of workers is used to choose the next worker.
1711  */
1712 void
_aio_req_add(aio_req_t * reqp,aio_worker_t ** nextworker,int mode)1713 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1714 {
1715 	ulwp_t *self = curthread;
1716 	aio_worker_t *aiowp;
1717 	aio_worker_t *first;
1718 	int load_bal_flg = 1;
1719 	int found;
1720 
1721 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1722 	reqp->req_next = NULL;
1723 	/*
1724 	 * Try to acquire the next worker's work queue.  If it is locked,
1725 	 * then search the list of workers until a queue is found unlocked,
1726 	 * or until the list is completely traversed at which point another
1727 	 * worker will be created.
1728 	 */
1729 	sigoff(self);		/* defer SIGIO */
1730 	sig_mutex_lock(&__aio_mutex);
1731 	first = aiowp = *nextworker;
1732 	if (mode != AIONOTIFY)
1733 		_aio_outstand_cnt++;
1734 	sig_mutex_unlock(&__aio_mutex);
1735 
1736 	switch (mode) {
1737 	case AIOREAD:
1738 	case AIOWRITE:
1739 	case AIOAREAD:
1740 	case AIOAWRITE:
1741 #if !defined(_LP64)
1742 	case AIOAREAD64:
1743 	case AIOAWRITE64:
1744 #endif
1745 		/* try to find an idle worker */
1746 		found = 0;
1747 		do {
1748 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1749 				if (aiowp->work_idleflg) {
1750 					found = 1;
1751 					break;
1752 				}
1753 				sig_mutex_unlock(&aiowp->work_qlock1);
1754 			}
1755 		} while ((aiowp = aiowp->work_forw) != first);
1756 
1757 		if (found) {
1758 			aiowp->work_minload1++;
1759 			break;
1760 		}
1761 
1762 		/* try to acquire some worker's queue lock */
1763 		do {
1764 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1765 				found = 1;
1766 				break;
1767 			}
1768 		} while ((aiowp = aiowp->work_forw) != first);
1769 
1770 		/*
1771 		 * Create more workers when the workers appear overloaded.
1772 		 * Either all the workers are busy draining their queues
1773 		 * or no worker's queue lock could be acquired.
1774 		 */
1775 		if (!found) {
1776 			if (_aio_worker_cnt < _max_workers) {
1777 				if (_aio_create_worker(reqp, mode))
1778 					aio_panic("_aio_req_add: add worker");
1779 				sigon(self);	/* reenable SIGIO */
1780 				return;
1781 			}
1782 
1783 			/*
1784 			 * No worker available and we have created
1785 			 * _max_workers, keep going through the
1786 			 * list slowly until we get a lock
1787 			 */
1788 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1789 				/*
1790 				 * give someone else a chance
1791 				 */
1792 				_aio_delay(1);
1793 				aiowp = aiowp->work_forw;
1794 			}
1795 		}
1796 
1797 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1798 		if (_aio_worker_cnt < _max_workers &&
1799 		    aiowp->work_minload1 >= _minworkload) {
1800 			sig_mutex_unlock(&aiowp->work_qlock1);
1801 			sig_mutex_lock(&__aio_mutex);
1802 			*nextworker = aiowp->work_forw;
1803 			sig_mutex_unlock(&__aio_mutex);
1804 			if (_aio_create_worker(reqp, mode))
1805 				aio_panic("aio_req_add: add worker");
1806 			sigon(self);	/* reenable SIGIO */
1807 			return;
1808 		}
1809 		aiowp->work_minload1++;
1810 		break;
1811 	case AIOFSYNC:
1812 	case AIONOTIFY:
1813 		load_bal_flg = 0;
1814 		sig_mutex_lock(&aiowp->work_qlock1);
1815 		break;
1816 	default:
1817 		aio_panic("_aio_req_add: invalid mode");
1818 		break;
1819 	}
1820 	/*
1821 	 * Put request onto worker's work queue.
1822 	 */
1823 	if (aiowp->work_tail1 == NULL) {
1824 		ASSERT(aiowp->work_count1 == 0);
1825 		aiowp->work_tail1 = reqp;
1826 		aiowp->work_next1 = reqp;
1827 	} else {
1828 		aiowp->work_head1->req_next = reqp;
1829 		if (aiowp->work_next1 == NULL)
1830 			aiowp->work_next1 = reqp;
1831 	}
1832 	reqp->req_state = AIO_REQ_QUEUED;
1833 	reqp->req_worker = aiowp;
1834 	aiowp->work_head1 = reqp;
1835 	/*
1836 	 * Awaken worker if it is not currently active.
1837 	 */
1838 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1839 		aiowp->work_idleflg = 0;
1840 		(void) cond_signal(&aiowp->work_idle_cv);
1841 	}
1842 	sig_mutex_unlock(&aiowp->work_qlock1);
1843 
1844 	if (load_bal_flg) {
1845 		sig_mutex_lock(&__aio_mutex);
1846 		*nextworker = aiowp->work_forw;
1847 		sig_mutex_unlock(&__aio_mutex);
1848 	}
1849 	sigon(self);	/* reenable SIGIO */
1850 }
1851 
1852 /*
1853  * Get an AIO request for a specified worker.
1854  * If the work queue is empty, return NULL.
1855  */
1856 aio_req_t *
_aio_req_get(aio_worker_t * aiowp)1857 _aio_req_get(aio_worker_t *aiowp)
1858 {
1859 	aio_req_t *reqp;
1860 
1861 	sig_mutex_lock(&aiowp->work_qlock1);
1862 	if ((reqp = aiowp->work_next1) != NULL) {
1863 		/*
1864 		 * Remove a POSIX request from the queue; the
1865 		 * request queue is a singularly linked list
1866 		 * with a previous pointer.  The request is
1867 		 * removed by updating the previous pointer.
1868 		 *
1869 		 * Non-posix requests are left on the queue
1870 		 * to eventually be placed on the done queue.
1871 		 */
1872 
1873 		if (POSIX_AIO(reqp)) {
1874 			if (aiowp->work_prev1 == NULL) {
1875 				aiowp->work_tail1 = reqp->req_next;
1876 				if (aiowp->work_tail1 == NULL)
1877 					aiowp->work_head1 = NULL;
1878 			} else {
1879 				aiowp->work_prev1->req_next = reqp->req_next;
1880 				if (aiowp->work_head1 == reqp)
1881 					aiowp->work_head1 = reqp->req_next;
1882 			}
1883 
1884 		} else {
1885 			aiowp->work_prev1 = reqp;
1886 			ASSERT(aiowp->work_done1 >= 0);
1887 			aiowp->work_done1++;
1888 		}
1889 		ASSERT(reqp != reqp->req_next);
1890 		aiowp->work_next1 = reqp->req_next;
1891 		ASSERT(aiowp->work_count1 >= 1);
1892 		aiowp->work_count1--;
1893 		switch (reqp->req_op) {
1894 		case AIOREAD:
1895 		case AIOWRITE:
1896 		case AIOAREAD:
1897 		case AIOAWRITE:
1898 #if !defined(_LP64)
1899 		case AIOAREAD64:
1900 		case AIOAWRITE64:
1901 #endif
1902 			ASSERT(aiowp->work_minload1 > 0);
1903 			aiowp->work_minload1--;
1904 			break;
1905 		}
1906 		reqp->req_state = AIO_REQ_INPROGRESS;
1907 	}
1908 	aiowp->work_req = reqp;
1909 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1910 	sig_mutex_unlock(&aiowp->work_qlock1);
1911 	return (reqp);
1912 }
1913 
1914 static void
_aio_req_del(aio_worker_t * aiowp,aio_req_t * reqp,int ostate)1915 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1916 {
1917 	aio_req_t **last;
1918 	aio_req_t *lastrp;
1919 	aio_req_t *next;
1920 
1921 	ASSERT(aiowp != NULL);
1922 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1923 	if (POSIX_AIO(reqp)) {
1924 		if (ostate != AIO_REQ_QUEUED)
1925 			return;
1926 	}
1927 	last = &aiowp->work_tail1;
1928 	lastrp = aiowp->work_tail1;
1929 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1930 	while ((next = *last) != NULL) {
1931 		if (next == reqp) {
1932 			*last = next->req_next;
1933 			if (aiowp->work_next1 == next)
1934 				aiowp->work_next1 = next->req_next;
1935 
1936 			/*
1937 			 * if this is the first request on the queue, move
1938 			 * the lastrp pointer forward.
1939 			 */
1940 			if (lastrp == next)
1941 				lastrp = next->req_next;
1942 
1943 			/*
1944 			 * if this request is pointed by work_head1, then
1945 			 * make work_head1 point to the last request that is
1946 			 * present on the queue.
1947 			 */
1948 			if (aiowp->work_head1 == next)
1949 				aiowp->work_head1 = lastrp;
1950 
1951 			/*
1952 			 * work_prev1 is used only in non posix case and it
1953 			 * points to the current AIO_REQ_INPROGRESS request.
1954 			 * If work_prev1 points to this request which is being
1955 			 * deleted, make work_prev1 NULL and set  work_done1
1956 			 * to 0.
1957 			 *
1958 			 * A worker thread can be processing only one request
1959 			 * at a time.
1960 			 */
1961 			if (aiowp->work_prev1 == next) {
1962 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1963 				    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1964 					aiowp->work_prev1 = NULL;
1965 					aiowp->work_done1--;
1966 			}
1967 
1968 			if (ostate == AIO_REQ_QUEUED) {
1969 				ASSERT(aiowp->work_count1 >= 1);
1970 				aiowp->work_count1--;
1971 				ASSERT(aiowp->work_minload1 >= 1);
1972 				aiowp->work_minload1--;
1973 			}
1974 			return;
1975 		}
1976 		last = &next->req_next;
1977 		lastrp = next;
1978 	}
1979 	/* NOTREACHED */
1980 }
1981 
1982 static void
_aio_enq_doneq(aio_req_t * reqp)1983 _aio_enq_doneq(aio_req_t *reqp)
1984 {
1985 	if (_aio_doneq == NULL) {
1986 		_aio_doneq = reqp;
1987 		reqp->req_next = reqp->req_prev = reqp;
1988 	} else {
1989 		reqp->req_next = _aio_doneq;
1990 		reqp->req_prev = _aio_doneq->req_prev;
1991 		_aio_doneq->req_prev->req_next = reqp;
1992 		_aio_doneq->req_prev = reqp;
1993 	}
1994 	reqp->req_state = AIO_REQ_DONEQ;
1995 	_aio_doneq_cnt++;
1996 }
1997 
1998 /*
1999  * caller owns the _aio_mutex
2000  */
2001 aio_req_t *
_aio_req_remove(aio_req_t * reqp)2002 _aio_req_remove(aio_req_t *reqp)
2003 {
2004 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2005 		return (NULL);
2006 
2007 	if (reqp) {
2008 		/* request in done queue */
2009 		if (_aio_doneq == reqp)
2010 			_aio_doneq = reqp->req_next;
2011 		if (_aio_doneq == reqp) {
2012 			/* only one request on queue */
2013 			_aio_doneq = NULL;
2014 		} else {
2015 			aio_req_t *tmp = reqp->req_next;
2016 			reqp->req_prev->req_next = tmp;
2017 			tmp->req_prev = reqp->req_prev;
2018 		}
2019 	} else if ((reqp = _aio_doneq) != NULL) {
2020 		if (reqp == reqp->req_next) {
2021 			/* only one request on queue */
2022 			_aio_doneq = NULL;
2023 		} else {
2024 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2025 			_aio_doneq->req_prev = reqp->req_prev;
2026 		}
2027 	}
2028 	if (reqp) {
2029 		_aio_doneq_cnt--;
2030 		reqp->req_next = reqp->req_prev = reqp;
2031 		reqp->req_state = AIO_REQ_DONE;
2032 	}
2033 	return (reqp);
2034 }
2035 
2036 /*
2037  * An AIO request is identified by an aio_result_t pointer.  The library
2038  * maps this aio_result_t pointer to its internal representation using a
2039  * hash table.  This function adds an aio_result_t pointer to the hash table.
2040  */
2041 static int
_aio_hash_insert(aio_result_t * resultp,aio_req_t * reqp)2042 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2043 {
2044 	aio_hash_t *hashp;
2045 	aio_req_t **prev;
2046 	aio_req_t *next;
2047 
2048 	hashp = _aio_hash + AIOHASH(resultp);
2049 	lmutex_lock(&hashp->hash_lock);
2050 	prev = &hashp->hash_ptr;
2051 	while ((next = *prev) != NULL) {
2052 		if (resultp == next->req_resultp) {
2053 			lmutex_unlock(&hashp->hash_lock);
2054 			return (-1);
2055 		}
2056 		prev = &next->req_link;
2057 	}
2058 	*prev = reqp;
2059 	ASSERT(reqp->req_link == NULL);
2060 	lmutex_unlock(&hashp->hash_lock);
2061 	return (0);
2062 }
2063 
2064 /*
2065  * Remove an entry from the hash table.
2066  */
2067 aio_req_t *
_aio_hash_del(aio_result_t * resultp)2068 _aio_hash_del(aio_result_t *resultp)
2069 {
2070 	aio_hash_t *hashp;
2071 	aio_req_t **prev;
2072 	aio_req_t *next = NULL;
2073 
2074 	if (_aio_hash != NULL) {
2075 		hashp = _aio_hash + AIOHASH(resultp);
2076 		lmutex_lock(&hashp->hash_lock);
2077 		prev = &hashp->hash_ptr;
2078 		while ((next = *prev) != NULL) {
2079 			if (resultp == next->req_resultp) {
2080 				*prev = next->req_link;
2081 				next->req_link = NULL;
2082 				break;
2083 			}
2084 			prev = &next->req_link;
2085 		}
2086 		lmutex_unlock(&hashp->hash_lock);
2087 	}
2088 	return (next);
2089 }
2090 
2091 /*
2092  *  find an entry in the hash table
2093  */
2094 aio_req_t *
_aio_hash_find(aio_result_t * resultp)2095 _aio_hash_find(aio_result_t *resultp)
2096 {
2097 	aio_hash_t *hashp;
2098 	aio_req_t **prev;
2099 	aio_req_t *next = NULL;
2100 
2101 	if (_aio_hash != NULL) {
2102 		hashp = _aio_hash + AIOHASH(resultp);
2103 		lmutex_lock(&hashp->hash_lock);
2104 		prev = &hashp->hash_ptr;
2105 		while ((next = *prev) != NULL) {
2106 			if (resultp == next->req_resultp)
2107 				break;
2108 			prev = &next->req_link;
2109 		}
2110 		lmutex_unlock(&hashp->hash_lock);
2111 	}
2112 	return (next);
2113 }
2114 
2115 /*
2116  * AIO interface for POSIX
2117  */
2118 int
_aio_rw(aiocb_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)2119 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2120     int mode, int flg)
2121 {
2122 	aio_req_t *reqp;
2123 	aio_args_t *ap;
2124 	int kerr;
2125 
2126 	if (aiocbp == NULL) {
2127 		errno = EINVAL;
2128 		return (-1);
2129 	}
2130 
2131 	/* initialize kaio */
2132 	if (!_kaio_ok)
2133 		_kaio_init();
2134 
2135 	aiocbp->aio_state = NOCHECK;
2136 
2137 	/*
2138 	 * If we have been called because a list I/O
2139 	 * kaio() failed, we dont want to repeat the
2140 	 * system call
2141 	 */
2142 
2143 	if (flg & AIO_KAIO) {
2144 		/*
2145 		 * Try kernel aio first.
2146 		 * If errno is ENOTSUP/EBADFD,
2147 		 * fall back to the thread implementation.
2148 		 */
2149 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2150 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2151 			aiocbp->aio_state = CHECK;
2152 			kerr = (int)_kaio(mode, aiocbp);
2153 			if (kerr == 0)
2154 				return (0);
2155 			if (errno != ENOTSUP && errno != EBADFD) {
2156 				aiocbp->aio_resultp.aio_errno = errno;
2157 				aiocbp->aio_resultp.aio_return = -1;
2158 				aiocbp->aio_state = NOCHECK;
2159 				return (-1);
2160 			}
2161 			if (errno == EBADFD)
2162 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2163 		}
2164 	}
2165 
2166 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2167 	aiocbp->aio_state = USERAIO;
2168 
2169 	if (!__uaio_ok && __uaio_init() == -1)
2170 		return (-1);
2171 
2172 	if ((reqp = _aio_req_alloc()) == NULL) {
2173 		errno = EAGAIN;
2174 		return (-1);
2175 	}
2176 
2177 	/*
2178 	 * If an LIO request, add the list head to the aio request
2179 	 */
2180 	reqp->req_head = lio_head;
2181 	reqp->req_type = AIO_POSIX_REQ;
2182 	reqp->req_op = mode;
2183 	reqp->req_largefile = 0;
2184 
2185 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2186 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2187 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2188 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2189 		reqp->req_sigevent.sigev_signo =
2190 		    aiocbp->aio_sigevent.sigev_signo;
2191 		reqp->req_sigevent.sigev_value.sival_ptr =
2192 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2193 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2194 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2195 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2196 		/*
2197 		 * Reuse the sigevent structure to contain the port number
2198 		 * and the user value.  Same for SIGEV_THREAD, below.
2199 		 */
2200 		reqp->req_sigevent.sigev_signo =
2201 		    pn->portnfy_port;
2202 		reqp->req_sigevent.sigev_value.sival_ptr =
2203 		    pn->portnfy_user;
2204 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2205 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2206 		/*
2207 		 * The sigevent structure contains the port number
2208 		 * and the user value.  Same for SIGEV_PORT, above.
2209 		 */
2210 		reqp->req_sigevent.sigev_signo =
2211 		    aiocbp->aio_sigevent.sigev_signo;
2212 		reqp->req_sigevent.sigev_value.sival_ptr =
2213 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2214 	}
2215 
2216 	reqp->req_resultp = &aiocbp->aio_resultp;
2217 	reqp->req_aiocbp = aiocbp;
2218 	ap = &reqp->req_args;
2219 	ap->fd = aiocbp->aio_fildes;
2220 	ap->buf = (caddr_t)aiocbp->aio_buf;
2221 	ap->bufsz = aiocbp->aio_nbytes;
2222 	ap->offset = aiocbp->aio_offset;
2223 
2224 	if ((flg & AIO_NO_DUPS) &&
2225 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2226 		aio_panic("_aio_rw(): request already in hash table");
2227 		_aio_req_free(reqp);
2228 		errno = EINVAL;
2229 		return (-1);
2230 	}
2231 	_aio_req_add(reqp, nextworker, mode);
2232 	return (0);
2233 }
2234 
2235 #if !defined(_LP64)
2236 /*
2237  * 64-bit AIO interface for POSIX
2238  */
2239 int
_aio_rw64(aiocb64_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)2240 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2241     int mode, int flg)
2242 {
2243 	aio_req_t *reqp;
2244 	aio_args_t *ap;
2245 	int kerr;
2246 
2247 	if (aiocbp == NULL) {
2248 		errno = EINVAL;
2249 		return (-1);
2250 	}
2251 
2252 	/* initialize kaio */
2253 	if (!_kaio_ok)
2254 		_kaio_init();
2255 
2256 	aiocbp->aio_state = NOCHECK;
2257 
2258 	/*
2259 	 * If we have been called because a list I/O
2260 	 * kaio() failed, we dont want to repeat the
2261 	 * system call
2262 	 */
2263 
2264 	if (flg & AIO_KAIO) {
2265 		/*
2266 		 * Try kernel aio first.
2267 		 * If errno is ENOTSUP/EBADFD,
2268 		 * fall back to the thread implementation.
2269 		 */
2270 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2271 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2272 			aiocbp->aio_state = CHECK;
2273 			kerr = (int)_kaio(mode, aiocbp);
2274 			if (kerr == 0)
2275 				return (0);
2276 			if (errno != ENOTSUP && errno != EBADFD) {
2277 				aiocbp->aio_resultp.aio_errno = errno;
2278 				aiocbp->aio_resultp.aio_return = -1;
2279 				aiocbp->aio_state = NOCHECK;
2280 				return (-1);
2281 			}
2282 			if (errno == EBADFD)
2283 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2284 		}
2285 	}
2286 
2287 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2288 	aiocbp->aio_state = USERAIO;
2289 
2290 	if (!__uaio_ok && __uaio_init() == -1)
2291 		return (-1);
2292 
2293 	if ((reqp = _aio_req_alloc()) == NULL) {
2294 		errno = EAGAIN;
2295 		return (-1);
2296 	}
2297 
2298 	/*
2299 	 * If an LIO request, add the list head to the aio request
2300 	 */
2301 	reqp->req_head = lio_head;
2302 	reqp->req_type = AIO_POSIX_REQ;
2303 	reqp->req_op = mode;
2304 	reqp->req_largefile = 1;
2305 
2306 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2307 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2308 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2309 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2310 		reqp->req_sigevent.sigev_signo =
2311 		    aiocbp->aio_sigevent.sigev_signo;
2312 		reqp->req_sigevent.sigev_value.sival_ptr =
2313 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2314 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2315 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2316 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2317 		reqp->req_sigevent.sigev_signo =
2318 		    pn->portnfy_port;
2319 		reqp->req_sigevent.sigev_value.sival_ptr =
2320 		    pn->portnfy_user;
2321 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2322 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2323 		reqp->req_sigevent.sigev_signo =
2324 		    aiocbp->aio_sigevent.sigev_signo;
2325 		reqp->req_sigevent.sigev_value.sival_ptr =
2326 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2327 	}
2328 
2329 	reqp->req_resultp = &aiocbp->aio_resultp;
2330 	reqp->req_aiocbp = aiocbp;
2331 	ap = &reqp->req_args;
2332 	ap->fd = aiocbp->aio_fildes;
2333 	ap->buf = (caddr_t)aiocbp->aio_buf;
2334 	ap->bufsz = aiocbp->aio_nbytes;
2335 	ap->offset = aiocbp->aio_offset;
2336 
2337 	if ((flg & AIO_NO_DUPS) &&
2338 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2339 		aio_panic("_aio_rw64(): request already in hash table");
2340 		_aio_req_free(reqp);
2341 		errno = EINVAL;
2342 		return (-1);
2343 	}
2344 	_aio_req_add(reqp, nextworker, mode);
2345 	return (0);
2346 }
2347 #endif	/* !defined(_LP64) */
2348