/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "libaio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
aio_req_t *_aio_req_remove(aio_req_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 8;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */
sigset_t _full_set;		/* all signals (sigfillset()) */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see defines in libaio.h */

aio_worker_t *_kaiowp;		/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

/*
 * Allocate the array that records which file descriptors support kaio.
 */
static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
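/*
 * Note (illustrative, not normative): _kaio_supported acts as a
 * per-file-descriptor flag array; the KAIO_SUPPORTED() and
 * SET_KAIO_NOT_SUPPORTED() macros from libaio.h consult and update it,
 * so an fd that once failed with EBADFD is never handed to the kernel
 * again.
 */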
/*
 * libaio is initialized when the first AIO request is made.  Important
 * constants are initialized, like the max number of workers that
 * libaio can create and the minimum number of workers permitted before
 * imposing some restrictions.  Also, some workers are created.
 */
int
__uaio_init(void)
{
	int i;
	int ret;

	sig_mutex_lock(&__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		sig_mutex_unlock(&__aio_initlock);
		return (0);
	}

	ret = -1;

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	init_signals();

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 */
	/* LINTED pointer cast */
	_aio_hash = (aio_hash_t *)mmap(NULL,
	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if ((void *)_aio_hash == MAP_FAILED) {
		_aio_hash = NULL;
		goto out;
	}
	for (i = 0; i < HASHSZ; i++)
		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);

	/*
	 * Initialize the worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_full_set);
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create the minimum number of workers.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);

	/*
	 * Create one worker to send asynchronous notifications.
	 */
	(void) _aio_create_worker(NULL, AIONOTIFY);

	__uaio_ok = 1;
	ret = 0;

out:
	sig_mutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * The special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		_aiopanic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * Initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t set;
	sigset_t oset;

	sig_mutex_lock(&__aio_initlock);
	if (_kaio_supported_init() != 0)
		_kaio_ok = -1;
	if (_kaio_ok == 0) {
		if ((_kaiowp = _aio_worker_alloc()) == NULL) {
			error = ENOMEM;
		} else {
			if ((error = (int)_kaio(AIOINIT)) == 0) {
				(void) sigfillset(&set);
				(void) pthread_sigmask(SIG_SETMASK,
				    &set, &oset);
				error = thr_create(NULL, AIOSTKSIZE,
				    _kaio_cleanup_thread, _kaiowp,
				    THR_DAEMON, &_kaiowp->work_tid);
				(void) pthread_sigmask(SIG_SETMASK,
				    &oset, NULL);
			}
			if (error) {
				_aio_worker_free(_kaiowp);
				_kaiowp = NULL;
			}
		}
		if (error)
			_kaio_ok = -1;
		else
			_kaio_ok = 1;
	}
	sig_mutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
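/*
 * A minimal usage sketch of the Solaris-style interfaces above
 * (illustrative only; fd, buf and bufsz are assumed to be set up by
 * the caller):
 *
 *	aio_result_t res;
 *	aio_result_t *donep;
 *
 *	if (aioread(fd, buf, bufsz, 0L, SEEK_SET, &res) == 0)
 *		donep = aiowait(NULL);
 *
 * aiowait() returns &res, with res.aio_return and res.aio_errno
 * filled in, once the read has completed.
 */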
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat stat;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat(fd, &stat) == -1)
			error = -1;
		else
			loffset = offset + stat.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64-bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			sig_mutex_unlock(&__aio_mutex);
			return (0);
		}
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32- and 64-bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/*
 * This must be async-safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining
				 * time in nanoseconds to microseconds.
				 * Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
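/*
 * Note on aiowait()'s return protocol, as implemented above: it returns
 * a pointer to a completed aio_result_t, NULL when requests are still
 * outstanding but none has completed (polling or timeout), or
 * (aio_result_t *)-1 with errno set when there is nothing to wait for
 * or an error occurred.
 */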
/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * in timespec_t *wait.
 */
int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}

/*
 * If closing by file descriptor: we will simply cancel all the
 * outstanding aios and return.  The aios in question will have either
 * noticed the cancellation notice before, during, or after initiating
 * I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be canceled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread uses _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b):
 * This thread uses sigsetjmp() to define the position in the code where
 * it wishes to continue working in the case that a SIGAIOCANCEL signal
 * is detected.
 * Normally this thread should get the cancellation signal during the
 * kernel phase (reading or writing).  In that case the signal handler
 * aiosigcancelhndlr() is activated using the worker thread context,
 * which again will use the siglongjmp() function to break the standard
 * code flow and jump to the "sigsetjmp" position, provided that
 * "work_cancel_flg" is set to "1".
 * Because the "work_cancel_flg" is only manipulated by this worker
 * thread and it can only run on one CPU at a given time, it is not
 * necessary to protect that flag with the queue lock.
 * Returning from the kernel (read or write system call) we must
 * first disable the use of the SIGAIOCANCEL signal and accordingly
 * the use of the siglongjmp() function to prevent a possible deadlock:
 * - It can happen that this worker thread returns from the kernel and
 *   blocks in "work_qlock1",
 * - then a second thread cancels the apparently "in progress" request
 *   and sends the SIGAIOCANCEL signal to the worker thread,
 * - the worker thread gets assigned the "work_qlock1" and returns
 *   from the kernel,
 * - the kernel detects the pending signal and activates the signal
 *   handler instead,
 * - if the "work_cancel_flg" is still set then the signal handler
 *   should use siglongjmp() to cancel the "in progress" request and
 *   it would try to acquire the same work_qlock1 in _aio_req_get()
 *   for a second time => deadlock.
 * To avoid that situation we disable the cancellation of the request
 * in progress BEFORE we try to acquire the work_qlock1.
 * In that case the signal handler will not call siglongjmp() and the
 * worker thread will continue running the standard code flow.
 * Then this thread must check the AIO_REQ_CANCELED flag to emulate
 * an eventually required siglongjmp() freeing the work_qlock1 and
 * avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	struct aio_args *arg;
	aio_req_t *reqp;	/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		_aiopanic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is canceled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);

	_sigoff();	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		_sigon();
		_sigoff();

		while ((reqp = _aio_req_get(aiowp)) == NULL)
			_aio_idle(aiowp);
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			_sigon();	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					/* non-seekable fd; plain read() */
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			_sigoff();	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			_sigon();	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					/* non-seekable fd; plain write() */
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			_sigoff();	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			_sigon();	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			_sigoff();	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			_sigon();	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			_sigoff();	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				_aiopanic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			_aiopanic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been canceled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			_aio_req_done_cnt++;
			_aio_set_result(reqp, retval, error);
			if (error == ECANCELED)
				_aio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	int backoff;

	if (npp->np_signo) {
		backoff = 0;
		while (__sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO) == -1) {
			ASSERT(errno == EAGAIN);
			if (++backoff > 10)
				backoff = 10;
			_aio_delay(backoff);
		}
	} else if (npp->np_port >= 0) {
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);
	}
	if (npp->np_lio_signo) {
		backoff = 0;
		while (__sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO) == -1) {
			ASSERT(errno == EAGAIN);
			if (++backoff > 10)
				backoff = 10;
			_aio_delay(backoff);
		}
	} else if (npp->np_lio_port >= 0) {
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
	}
}

/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		_aiopanic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never canceled.
	 * All signals remain blocked, forever.
	 */

	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL)
			_aio_idle(aiowp);
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		_aiopanic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}
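/*
 * Note (illustrative): lio_nent and lio_refcnt both count the requests
 * still attached to a list-I/O head; _aio_fsync_del() above drops
 * duplicate aio_fsync() requests until only the final one remains to
 * perform the actual sync in _aio_do_request().
 */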
/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work and then
 * goes to sleep waiting for more work.
 */
void
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never canceled via
		 * pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were canceled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is the queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}
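/*
 * A sketch of why _aio_set_result() (below) orders its stores with
 * membar_producer(): a Posix consumer typically polls
 *
 *	while (aiocbp->aio_resultp.aio_errno == EINPROGRESS)
 *		;
 *	ret = aiocbp->aio_resultp.aio_return;
 *
 * so the return value must be globally visible before aio_errno leaves
 * EINPROGRESS.  The Solaris interfaces poll aio_return instead, hence
 * the mirrored store order.
 */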
/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	_sigoff();	/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					_aiopanic("_aio_req_add: add worker");
				_sigon();	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers; keep going through the
			 * list slowly until we get a lock.
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * Give someone else a chance.
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				_aiopanic("_aio_req_add: add worker");
			_sigon();	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		_aiopanic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put the request onto the worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken the worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	_sigon();	/* reenable SIGIO */
}

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-POSIX requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}
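/*
 * Delete a request from a worker's work queue.
 * The caller must hold the worker's queue lock (work_qlock1).
 */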
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

/*
 * Append a request to the circular, doubly-linked done queue
 * (used for SIGEV_NONE requests).
 */
static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * Remove a request from the done queue; if reqp is NULL, remove the
 * oldest request instead.  The caller owns __aio_mutex.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	sig_mutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			sig_mutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	sig_mutex_unlock(&hashp->hash_lock);
	return (0);
}
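/*
 * Note (illustrative): the hash table is keyed on the aio_result_t
 * pointer itself; AIOHASH(resultp) selects a bucket and each bucket
 * chains its requests through req_link under a per-bucket hash_lock.
 */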
/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		sig_mutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		sig_mutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		sig_mutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		sig_mutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
1941 */ 1942 reqp->req_sigevent.sigev_signo = 1943 pn->portnfy_port; 1944 reqp->req_sigevent.sigev_value.sival_ptr = 1945 pn->portnfy_user; 1946 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 1947 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 1948 /* 1949 * The sigevent structure contains the port number 1950 * and the user value. Same for SIGEV_PORT, above. 1951 */ 1952 reqp->req_sigevent.sigev_signo = 1953 aiocbp->aio_sigevent.sigev_signo; 1954 reqp->req_sigevent.sigev_value.sival_ptr = 1955 aiocbp->aio_sigevent.sigev_value.sival_ptr; 1956 } 1957 1958 reqp->req_resultp = &aiocbp->aio_resultp; 1959 reqp->req_aiocbp = aiocbp; 1960 ap = &reqp->req_args; 1961 ap->fd = aiocbp->aio_fildes; 1962 ap->buf = (caddr_t)aiocbp->aio_buf; 1963 ap->bufsz = aiocbp->aio_nbytes; 1964 ap->offset = aiocbp->aio_offset; 1965 1966 if ((flg & AIO_NO_DUPS) && 1967 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 1968 _aiopanic("_aio_rw(): request already in hash table"); 1969 _aio_req_free(reqp); 1970 errno = EINVAL; 1971 return (-1); 1972 } 1973 _aio_req_add(reqp, nextworker, mode); 1974 return (0); 1975 } 1976 1977 #if !defined(_LP64) 1978 /* 1979 * 64-bit AIO interface for POSIX 1980 */ 1981 int 1982 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 1983 int mode, int flg) 1984 { 1985 aio_req_t *reqp; 1986 aio_args_t *ap; 1987 int kerr; 1988 1989 if (aiocbp == NULL) { 1990 errno = EINVAL; 1991 return (-1); 1992 } 1993 1994 /* initialize kaio */ 1995 if (!_kaio_ok) 1996 _kaio_init(); 1997 1998 aiocbp->aio_state = NOCHECK; 1999 2000 /* 2001 * If we have been called because a list I/O 2002 * kaio() failed, we dont want to repeat the 2003 * system call 2004 */ 2005 2006 if (flg & AIO_KAIO) { 2007 /* 2008 * Try kernel aio first. 2009 * If errno is ENOTSUP/EBADFD, 2010 * fall back to the thread implementation. 
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		_aiopanic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */