1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "synonyms.h" 30 #include "thr_uberdata.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fdsync(int, int); 47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 48 49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 50 static void _aiodone(aio_req_t *, ssize_t, int); 51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 52 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 53 54 /* 55 * switch for kernel async I/O 56 */ 57 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 58 59 /* 60 * Key for thread-specific data 61 */ 62 pthread_key_t _aio_key; 63 64 /* 65 * Array for determining whether or not a file supports kaio. 66 * Initialized in _kaio_init(). 67 */ 68 uint32_t *_kaio_supported = NULL; 69 70 /* 71 * workers for read/write requests 72 * (__aio_mutex lock protects circular linked list of workers) 73 */ 74 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 76 int __rw_workerscnt; /* number of read/write workers */ 77 78 /* 79 * worker for notification requests. 
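 * The single AIONOTIFY worker created in __uaio_init() drains queued
 * notification requests, so the read/write workers never block in signal
 * or event-port delivery (see _aio_do_notify() and send_notification()).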
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* double linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see the defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}

/*
 * The aio subsystem is initialized the first time an AIO request is made.
 * Tunable constants, such as the maximum number of workers the subsystem
 * may create and the minimum number of workers permitted before imposing
 * restrictions, are set up here, and the initial workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
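	 * Requests are keyed by their aio_result_t pointer via AIOHASH();
	 * each of the HASHSZ buckets has its own hash_lock (see
	 * _aio_hash_insert() and _aio_hash_find() below).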
177 */ 178 if (_aio_hash == NULL) { 179 /* LINTED pointer cast */ 180 _aio_hash = (aio_hash_t *)mmap(NULL, 181 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 182 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 183 if ((void *)_aio_hash == MAP_FAILED) { 184 _aio_hash = NULL; 185 goto out; 186 } 187 for (i = 0; i < HASHSZ; i++) 188 (void) mutex_init(&_aio_hash[i].hash_lock, 189 USYNC_THREAD, NULL); 190 } 191 192 /* 193 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 194 */ 195 (void) sigfillset(&_worker_set); 196 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 197 198 /* 199 * Create one worker to send asynchronous notifications. 200 * Do this only once, even if __uaio_init() is called twice. 201 */ 202 if (__no_workerscnt == 0 && 203 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 204 errno = EAGAIN; 205 goto out; 206 } 207 208 /* 209 * Create the minimum number of read/write workers. 210 * And later check whether atleast one worker is created; 211 * lwp_create() calls could fail because of segkp exhaustion. 212 */ 213 for (i = 0; i < _min_workers; i++) 214 (void) _aio_create_worker(NULL, AIOREAD); 215 if (__rw_workerscnt == 0) { 216 errno = EAGAIN; 217 goto out; 218 } 219 220 ret = 0; 221 out: 222 lmutex_lock(&__aio_initlock); 223 if (ret == 0) 224 __uaio_ok = 1; 225 __aio_initbusy = 0; 226 (void) cond_broadcast(&__aio_initcv); 227 lmutex_unlock(&__aio_initlock); 228 return (ret); 229 } 230 231 /* 232 * Called from close() before actually performing the real _close(). 233 */ 234 void 235 _aio_close(int fd) 236 { 237 if (fd < 0) /* avoid cancelling everything */ 238 return; 239 /* 240 * Cancel all outstanding aio requests for this file descriptor. 241 */ 242 if (__uaio_ok) 243 (void) aiocancel_all(fd); 244 /* 245 * If we have allocated the bit array, clear the bit for this file. 246 * The next open may re-use this file descriptor and the new file 247 * may have different kaio() behaviour. 248 */ 249 if (_kaio_supported != NULL) 250 CLEAR_KAIO_SUPPORTED(fd); 251 } 252 253 /* 254 * special kaio cleanup thread sits in a loop in the 255 * kernel waiting for pending kaio requests to complete. 256 */ 257 void * 258 _kaio_cleanup_thread(void *arg) 259 { 260 if (pthread_setspecific(_aio_key, arg) != 0) 261 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 262 (void) _kaio(AIOSTART); 263 return (arg); 264 } 265 266 /* 267 * initialize kaio. 
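 * The kernel is probed only once per process: on success _kaio_ok is set
 * to 1 and a cleanup thread is created; on failure _kaio_ok is set to -1
 * and all requests are serviced by the user-level workers instead.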
268 */ 269 void 270 _kaio_init() 271 { 272 int error; 273 sigset_t oset; 274 275 lmutex_lock(&__aio_initlock); 276 while (__aio_initbusy) 277 (void) _cond_wait(&__aio_initcv, &__aio_initlock); 278 if (_kaio_ok) { /* already initialized */ 279 lmutex_unlock(&__aio_initlock); 280 return; 281 } 282 __aio_initbusy = 1; 283 lmutex_unlock(&__aio_initlock); 284 285 if (_kaio_supported_init() != 0) 286 error = ENOMEM; 287 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 288 error = ENOMEM; 289 else if ((error = (int)_kaio(AIOINIT)) == 0) { 290 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 291 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 292 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 293 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 294 } 295 if (error && _kaiowp != NULL) { 296 _aio_worker_free(_kaiowp); 297 _kaiowp = NULL; 298 } 299 300 lmutex_lock(&__aio_initlock); 301 if (error) 302 _kaio_ok = -1; 303 else 304 _kaio_ok = 1; 305 __aio_initbusy = 0; 306 (void) cond_broadcast(&__aio_initcv); 307 lmutex_unlock(&__aio_initlock); 308 } 309 310 int 311 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 312 aio_result_t *resultp) 313 { 314 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 315 } 316 317 int 318 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 319 aio_result_t *resultp) 320 { 321 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 322 } 323 324 #if !defined(_LP64) 325 int 326 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 327 aio_result_t *resultp) 328 { 329 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 330 } 331 332 int 333 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 334 aio_result_t *resultp) 335 { 336 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 337 } 338 #endif /* !defined(_LP64) */ 339 340 int 341 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 342 aio_result_t *resultp, int mode) 343 { 344 aio_req_t *reqp; 345 aio_args_t *ap; 346 offset_t loffset; 347 struct stat stat; 348 int error = 0; 349 int kerr; 350 int umode; 351 352 switch (whence) { 353 354 case SEEK_SET: 355 loffset = offset; 356 break; 357 case SEEK_CUR: 358 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 359 error = -1; 360 else 361 loffset += offset; 362 break; 363 case SEEK_END: 364 if (fstat(fd, &stat) == -1) 365 error = -1; 366 else 367 loffset = offset + stat.st_size; 368 break; 369 default: 370 errno = EINVAL; 371 error = -1; 372 } 373 374 if (error) 375 return (error); 376 377 /* initialize kaio */ 378 if (!_kaio_ok) 379 _kaio_init(); 380 381 /* 382 * _aio_do_request() needs the original request code (mode) to be able 383 * to choose the appropiate 32/64 bit function. All other functions 384 * only require the difference between READ and WRITE (umode). 385 */ 386 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 387 umode = mode - AIOAREAD64; 388 else 389 umode = mode; 390 391 /* 392 * Try kernel aio first. 393 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 394 */ 395 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 396 resultp->aio_errno = 0; 397 sig_mutex_lock(&__aio_mutex); 398 _kaio_outstand_cnt++; 399 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 
400 (umode | AIO_POLL_BIT) : umode), 401 fd, buf, bufsz, loffset, resultp); 402 if (kerr == 0) { 403 sig_mutex_unlock(&__aio_mutex); 404 return (0); 405 } 406 _kaio_outstand_cnt--; 407 sig_mutex_unlock(&__aio_mutex); 408 if (errno != ENOTSUP && errno != EBADFD) 409 return (-1); 410 if (errno == EBADFD) 411 SET_KAIO_NOT_SUPPORTED(fd); 412 } 413 414 if (!__uaio_ok && __uaio_init() == -1) 415 return (-1); 416 417 if ((reqp = _aio_req_alloc()) == NULL) { 418 errno = EAGAIN; 419 return (-1); 420 } 421 422 /* 423 * _aio_do_request() checks reqp->req_op to differentiate 424 * between 32 and 64 bit access. 425 */ 426 reqp->req_op = mode; 427 reqp->req_resultp = resultp; 428 ap = &reqp->req_args; 429 ap->fd = fd; 430 ap->buf = buf; 431 ap->bufsz = bufsz; 432 ap->offset = loffset; 433 434 if (_aio_hash_insert(resultp, reqp) != 0) { 435 _aio_req_free(reqp); 436 errno = EINVAL; 437 return (-1); 438 } 439 /* 440 * _aio_req_add() only needs the difference between READ and 441 * WRITE to choose the right worker queue. 442 */ 443 _aio_req_add(reqp, &__nextworker_rw, umode); 444 return (0); 445 } 446 447 int 448 aiocancel(aio_result_t *resultp) 449 { 450 aio_req_t *reqp; 451 aio_worker_t *aiowp; 452 int ret; 453 int done = 0; 454 int canceled = 0; 455 456 if (!__uaio_ok) { 457 errno = EINVAL; 458 return (-1); 459 } 460 461 sig_mutex_lock(&__aio_mutex); 462 reqp = _aio_hash_find(resultp); 463 if (reqp == NULL) { 464 if (_aio_outstand_cnt == _aio_req_done_cnt) 465 errno = EINVAL; 466 else 467 errno = EACCES; 468 ret = -1; 469 } else { 470 aiowp = reqp->req_worker; 471 sig_mutex_lock(&aiowp->work_qlock1); 472 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 473 sig_mutex_unlock(&aiowp->work_qlock1); 474 475 if (canceled) { 476 ret = 0; 477 } else { 478 if (_aio_outstand_cnt == 0 || 479 _aio_outstand_cnt == _aio_req_done_cnt) 480 errno = EINVAL; 481 else 482 errno = EACCES; 483 ret = -1; 484 } 485 } 486 sig_mutex_unlock(&__aio_mutex); 487 return (ret); 488 } 489 490 /* 491 * This must be asynch safe 492 */ 493 aio_result_t * 494 aiowait(struct timeval *uwait) 495 { 496 aio_result_t *uresultp; 497 aio_result_t *kresultp; 498 aio_result_t *resultp; 499 int dontblock; 500 int timedwait = 0; 501 int kaio_errno = 0; 502 struct timeval twait; 503 struct timeval *wait = NULL; 504 hrtime_t hrtend; 505 hrtime_t hres; 506 507 if (uwait) { 508 /* 509 * Check for a valid specified wait time. 510 * If it is invalid, fail the call right away. 
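		 * A valid timeout has tv_sec >= 0 and 0 <= tv_usec < MICROSEC;
		 * a zero timeout means poll once without blocking.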
511 */ 512 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 513 uwait->tv_usec >= MICROSEC) { 514 errno = EINVAL; 515 return ((aio_result_t *)-1); 516 } 517 518 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 519 hrtend = gethrtime() + 520 (hrtime_t)uwait->tv_sec * NANOSEC + 521 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 522 twait = *uwait; 523 wait = &twait; 524 timedwait++; 525 } else { 526 /* polling */ 527 sig_mutex_lock(&__aio_mutex); 528 if (_kaio_outstand_cnt == 0) { 529 kresultp = (aio_result_t *)-1; 530 } else { 531 kresultp = (aio_result_t *)_kaio(AIOWAIT, 532 (struct timeval *)-1, 1); 533 if (kresultp != (aio_result_t *)-1 && 534 kresultp != NULL && 535 kresultp != (aio_result_t *)1) { 536 _kaio_outstand_cnt--; 537 sig_mutex_unlock(&__aio_mutex); 538 return (kresultp); 539 } 540 } 541 uresultp = _aio_req_done(); 542 sig_mutex_unlock(&__aio_mutex); 543 if (uresultp != NULL && 544 uresultp != (aio_result_t *)-1) { 545 return (uresultp); 546 } 547 if (uresultp == (aio_result_t *)-1 && 548 kresultp == (aio_result_t *)-1) { 549 errno = EINVAL; 550 return ((aio_result_t *)-1); 551 } else { 552 return (NULL); 553 } 554 } 555 } 556 557 for (;;) { 558 sig_mutex_lock(&__aio_mutex); 559 uresultp = _aio_req_done(); 560 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 561 sig_mutex_unlock(&__aio_mutex); 562 resultp = uresultp; 563 break; 564 } 565 _aiowait_flag++; 566 dontblock = (uresultp == (aio_result_t *)-1); 567 if (dontblock && _kaio_outstand_cnt == 0) { 568 kresultp = (aio_result_t *)-1; 569 kaio_errno = EINVAL; 570 } else { 571 sig_mutex_unlock(&__aio_mutex); 572 kresultp = (aio_result_t *)_kaio(AIOWAIT, 573 wait, dontblock); 574 sig_mutex_lock(&__aio_mutex); 575 kaio_errno = errno; 576 } 577 _aiowait_flag--; 578 sig_mutex_unlock(&__aio_mutex); 579 if (kresultp == (aio_result_t *)1) { 580 /* aiowait() awakened by an aionotify() */ 581 continue; 582 } else if (kresultp != NULL && 583 kresultp != (aio_result_t *)-1) { 584 resultp = kresultp; 585 sig_mutex_lock(&__aio_mutex); 586 _kaio_outstand_cnt--; 587 sig_mutex_unlock(&__aio_mutex); 588 break; 589 } else if (kresultp == (aio_result_t *)-1 && 590 kaio_errno == EINVAL && 591 uresultp == (aio_result_t *)-1) { 592 errno = kaio_errno; 593 resultp = (aio_result_t *)-1; 594 break; 595 } else if (kresultp == (aio_result_t *)-1 && 596 kaio_errno == EINTR) { 597 errno = kaio_errno; 598 resultp = (aio_result_t *)-1; 599 break; 600 } else if (timedwait) { 601 hres = hrtend - gethrtime(); 602 if (hres <= 0) { 603 /* time is up; return */ 604 resultp = NULL; 605 break; 606 } else { 607 /* 608 * Some time left. Round up the remaining time 609 * in nanoseconds to microsec. Retry the call. 610 */ 611 hres += (NANOSEC / MICROSEC) - 1; 612 wait->tv_sec = hres / NANOSEC; 613 wait->tv_usec = 614 (hres % NANOSEC) / (NANOSEC / MICROSEC); 615 } 616 } else { 617 ASSERT(kresultp == NULL && uresultp == NULL); 618 resultp = NULL; 619 continue; 620 } 621 } 622 return (resultp); 623 } 624 625 /* 626 * _aio_get_timedelta calculates the remaining time and stores the result 627 * into timespec_t *wait. 
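 * Returns 0 when time remains and -1 when the end time has been reached.
 * For example, with *end == {5, 100000000} and a current time of
 * {3, 800000000}, *wait becomes {1, 300000000}.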
628 */ 629 630 int 631 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 632 { 633 int ret = 0; 634 struct timeval cur; 635 timespec_t curtime; 636 637 (void) gettimeofday(&cur, NULL); 638 curtime.tv_sec = cur.tv_sec; 639 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 640 641 if (end->tv_sec >= curtime.tv_sec) { 642 wait->tv_sec = end->tv_sec - curtime.tv_sec; 643 if (end->tv_nsec >= curtime.tv_nsec) { 644 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 645 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 646 ret = -1; /* timer expired */ 647 } else { 648 if (end->tv_sec > curtime.tv_sec) { 649 wait->tv_sec -= 1; 650 wait->tv_nsec = NANOSEC - 651 (curtime.tv_nsec - end->tv_nsec); 652 } else { 653 ret = -1; /* timer expired */ 654 } 655 } 656 } else { 657 ret = -1; 658 } 659 return (ret); 660 } 661 662 /* 663 * If closing by file descriptor: we will simply cancel all the outstanding 664 * aio`s and return. Those aio's in question will have either noticed the 665 * cancellation notice before, during, or after initiating io. 666 */ 667 int 668 aiocancel_all(int fd) 669 { 670 aio_req_t *reqp; 671 aio_req_t **reqpp; 672 aio_worker_t *first; 673 aio_worker_t *next; 674 int canceled = 0; 675 int done = 0; 676 int cancelall = 0; 677 678 sig_mutex_lock(&__aio_mutex); 679 680 if (_aio_outstand_cnt == 0) { 681 sig_mutex_unlock(&__aio_mutex); 682 return (AIO_ALLDONE); 683 } 684 685 /* 686 * Cancel requests from the read/write workers' queues. 687 */ 688 first = __nextworker_rw; 689 next = first; 690 do { 691 _aio_cancel_work(next, fd, &canceled, &done); 692 } while ((next = next->work_forw) != first); 693 694 /* 695 * finally, check if there are requests on the done queue that 696 * should be canceled. 697 */ 698 if (fd < 0) 699 cancelall = 1; 700 reqpp = &_aio_done_tail; 701 while ((reqp = *reqpp) != NULL) { 702 if (cancelall || reqp->req_args.fd == fd) { 703 *reqpp = reqp->req_next; 704 _aio_donecnt--; 705 (void) _aio_hash_del(reqp->req_resultp); 706 _aio_req_free(reqp); 707 } else 708 reqpp = &reqp->req_next; 709 } 710 if (cancelall) { 711 ASSERT(_aio_donecnt == 0); 712 _aio_done_head = NULL; 713 } 714 sig_mutex_unlock(&__aio_mutex); 715 716 if (canceled && done == 0) 717 return (AIO_CANCELED); 718 else if (done && canceled == 0) 719 return (AIO_ALLDONE); 720 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 721 return ((int)_kaio(AIOCANCEL, fd, NULL)); 722 return (AIO_NOTCANCELED); 723 } 724 725 /* 726 * Cancel requests from a given work queue. If the file descriptor 727 * parameter, fd, is non-negative, then only cancel those requests 728 * in this queue that are to this file descriptor. If the fd 729 * parameter is -1, then cancel all requests. 730 */ 731 static void 732 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 733 { 734 aio_req_t *reqp; 735 736 sig_mutex_lock(&aiowp->work_qlock1); 737 /* 738 * cancel queued requests first. 739 */ 740 reqp = aiowp->work_tail1; 741 while (reqp != NULL) { 742 if (fd < 0 || reqp->req_args.fd == fd) { 743 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 744 /* 745 * Callers locks were dropped. 746 * reqp is invalid; start traversing 747 * the list from the beginning again. 748 */ 749 reqp = aiowp->work_tail1; 750 continue; 751 } 752 } 753 reqp = reqp->req_next; 754 } 755 /* 756 * Since the queued requests have been canceled, there can 757 * only be one inprogress request that should be canceled. 
758 */ 759 if ((reqp = aiowp->work_req) != NULL && 760 (fd < 0 || reqp->req_args.fd == fd)) 761 (void) _aio_cancel_req(aiowp, reqp, canceled, done); 762 sig_mutex_unlock(&aiowp->work_qlock1); 763 } 764 765 /* 766 * Cancel a request. Return 1 if the callers locks were temporarily 767 * dropped, otherwise return 0. 768 */ 769 int 770 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 771 { 772 int ostate = reqp->req_state; 773 774 ASSERT(MUTEX_HELD(&__aio_mutex)); 775 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 776 if (ostate == AIO_REQ_CANCELED) 777 return (0); 778 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 779 (*done)++; 780 return (0); 781 } 782 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 783 ASSERT(POSIX_AIO(reqp)); 784 /* Cancel the queued aio_fsync() request */ 785 if (!reqp->req_head->lio_canned) { 786 reqp->req_head->lio_canned = 1; 787 _aio_outstand_cnt--; 788 (*canceled)++; 789 } 790 return (0); 791 } 792 reqp->req_state = AIO_REQ_CANCELED; 793 _aio_req_del(aiowp, reqp, ostate); 794 (void) _aio_hash_del(reqp->req_resultp); 795 (*canceled)++; 796 if (reqp == aiowp->work_req) { 797 ASSERT(ostate == AIO_REQ_INPROGRESS); 798 /* 799 * Set the result values now, before _aiodone() is called. 800 * We do this because the application can expect aio_return 801 * and aio_errno to be set to -1 and ECANCELED, respectively, 802 * immediately after a successful return from aiocancel() 803 * or aio_cancel(). 804 */ 805 _aio_set_result(reqp, -1, ECANCELED); 806 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 807 return (0); 808 } 809 if (!POSIX_AIO(reqp)) { 810 _aio_outstand_cnt--; 811 _aio_set_result(reqp, -1, ECANCELED); 812 return (0); 813 } 814 sig_mutex_unlock(&aiowp->work_qlock1); 815 sig_mutex_unlock(&__aio_mutex); 816 _aiodone(reqp, -1, ECANCELED); 817 sig_mutex_lock(&__aio_mutex); 818 sig_mutex_lock(&aiowp->work_qlock1); 819 return (1); 820 } 821 822 int 823 _aio_create_worker(aio_req_t *reqp, int mode) 824 { 825 aio_worker_t *aiowp, **workers, **nextworker; 826 int *aio_workerscnt; 827 void *(*func)(void *); 828 sigset_t oset; 829 int error; 830 831 /* 832 * Put the new worker thread in the right queue. 
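	 * Read/write requests go onto the __workers_rw ring and are serviced
	 * by _aio_do_request(); AIONOTIFY requests go onto the __workers_no
	 * ring and are serviced by _aio_do_notify().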
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different ways:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *	  see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b)
 * This thread uses sigsetjmp() to mark the position in the code where
 * it wishes to continue working in case a SIGAIOCANCEL signal
 * is detected.
 * Normally this thread should get the cancellation signal during the
 * kernel phase (reading or writing).  In that case the signal handler
 * aiosigcancelhndlr() is activated in the worker thread context,
 * which in turn uses siglongjmp() to break the standard
 * code flow and jump to the "sigsetjmp" position, provided that
 * "work_cancel_flg" is set to "1".
 * Because "work_cancel_flg" is only manipulated by this worker
 * thread and the thread can only run on one CPU at a given time, it is
 * not necessary to protect that flag with the queue lock.
 * Returning from the kernel (read or write system call) we must
 * first disable the use of the SIGAIOCANCEL signal and accordingly
 * the use of siglongjmp() to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks on "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  a second time => deadlock.
 * To avoid that situation we disable the cancellation of the request
 * in progress BEFORE we try to acquire the work_qlock1.
 * In that case the signal handler will not call siglongjmp() and the
 * worker thread will continue running the standard code flow.
 * This thread must then check for the AIO_REQ_CANCELED state to emulate
 * the siglongjmp() that would otherwise have been required, releasing
 * the work_qlock1 and avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
1005 */ 1006 if (aiowp->work_done1) 1007 _aio_work_done(aiowp); 1008 1009 top: 1010 /* consume any deferred SIGAIOCANCEL signal here */ 1011 sigon(self); 1012 sigoff(self); 1013 1014 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1015 if (_aio_idle(aiowp) != 0) 1016 goto top; 1017 } 1018 arg = &reqp->req_args; 1019 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1020 reqp->req_state == AIO_REQ_CANCELED); 1021 error = 0; 1022 1023 switch (reqp->req_op) { 1024 case AIOREAD: 1025 case AIOAREAD: 1026 sigon(self); /* unblock SIGAIOCANCEL */ 1027 retval = pread(arg->fd, arg->buf, 1028 arg->bufsz, arg->offset); 1029 if (retval == -1) { 1030 if (errno == ESPIPE) { 1031 retval = read(arg->fd, 1032 arg->buf, arg->bufsz); 1033 if (retval == -1) 1034 error = errno; 1035 } else { 1036 error = errno; 1037 } 1038 } 1039 sigoff(self); /* block SIGAIOCANCEL */ 1040 break; 1041 case AIOWRITE: 1042 case AIOAWRITE: 1043 sigon(self); /* unblock SIGAIOCANCEL */ 1044 retval = pwrite(arg->fd, arg->buf, 1045 arg->bufsz, arg->offset); 1046 if (retval == -1) { 1047 if (errno == ESPIPE) { 1048 retval = write(arg->fd, 1049 arg->buf, arg->bufsz); 1050 if (retval == -1) 1051 error = errno; 1052 } else { 1053 error = errno; 1054 } 1055 } 1056 sigoff(self); /* block SIGAIOCANCEL */ 1057 break; 1058 #if !defined(_LP64) 1059 case AIOAREAD64: 1060 sigon(self); /* unblock SIGAIOCANCEL */ 1061 retval = pread64(arg->fd, arg->buf, 1062 arg->bufsz, arg->offset); 1063 if (retval == -1) { 1064 if (errno == ESPIPE) { 1065 retval = read(arg->fd, 1066 arg->buf, arg->bufsz); 1067 if (retval == -1) 1068 error = errno; 1069 } else { 1070 error = errno; 1071 } 1072 } 1073 sigoff(self); /* block SIGAIOCANCEL */ 1074 break; 1075 case AIOAWRITE64: 1076 sigon(self); /* unblock SIGAIOCANCEL */ 1077 retval = pwrite64(arg->fd, arg->buf, 1078 arg->bufsz, arg->offset); 1079 if (retval == -1) { 1080 if (errno == ESPIPE) { 1081 retval = write(arg->fd, 1082 arg->buf, arg->bufsz); 1083 if (retval == -1) 1084 error = errno; 1085 } else { 1086 error = errno; 1087 } 1088 } 1089 sigoff(self); /* block SIGAIOCANCEL */ 1090 break; 1091 #endif /* !defined(_LP64) */ 1092 case AIOFSYNC: 1093 if (_aio_fsync_del(aiowp, reqp)) 1094 goto top; 1095 ASSERT(reqp->req_head == NULL); 1096 /* 1097 * All writes for this fsync request are now 1098 * acknowledged. Now make these writes visible 1099 * and put the final request into the hash table. 1100 */ 1101 if (reqp->req_state == AIO_REQ_CANCELED) { 1102 /* EMPTY */; 1103 } else if (arg->offset == O_SYNC) { 1104 if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 1105 error = errno; 1106 } else { 1107 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 1108 error = errno; 1109 } 1110 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1111 aio_panic("_aio_do_request(): AIOFSYNC: " 1112 "request already in hash table"); 1113 break; 1114 default: 1115 aio_panic("_aio_do_request, bad op"); 1116 } 1117 1118 _aio_finish_request(aiowp, retval, error); 1119 } 1120 /* NOTREACHED */ 1121 return (NULL); 1122 } 1123 1124 /* 1125 * Perform the tail processing for _aio_do_request(). 1126 * The in-progress request may or may not have been cancelled. 
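 * POSIX requests are completed here via _aiodone(); Solaris-style requests
 * have their results recorded and remain on the worker's queue until
 * _aio_work_done() moves them to the global done list.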
1127 */ 1128 static void 1129 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1130 { 1131 aio_req_t *reqp; 1132 1133 sig_mutex_lock(&aiowp->work_qlock1); 1134 if ((reqp = aiowp->work_req) == NULL) 1135 sig_mutex_unlock(&aiowp->work_qlock1); 1136 else { 1137 aiowp->work_req = NULL; 1138 if (reqp->req_state == AIO_REQ_CANCELED) { 1139 retval = -1; 1140 error = ECANCELED; 1141 } 1142 if (!POSIX_AIO(reqp)) { 1143 int notify; 1144 sig_mutex_unlock(&aiowp->work_qlock1); 1145 sig_mutex_lock(&__aio_mutex); 1146 if (reqp->req_state == AIO_REQ_INPROGRESS) 1147 reqp->req_state = AIO_REQ_DONE; 1148 /* 1149 * If it was canceled, this request will not be 1150 * added to done list. Just free it. 1151 */ 1152 if (error == ECANCELED) { 1153 _aio_outstand_cnt--; 1154 _aio_req_free(reqp); 1155 } else { 1156 _aio_set_result(reqp, retval, error); 1157 _aio_req_done_cnt++; 1158 } 1159 /* 1160 * Notify any thread that may have blocked 1161 * because it saw an outstanding request. 1162 */ 1163 notify = 0; 1164 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1165 notify = 1; 1166 } 1167 sig_mutex_unlock(&__aio_mutex); 1168 if (notify) { 1169 (void) _kaio(AIONOTIFY); 1170 } 1171 } else { 1172 if (reqp->req_state == AIO_REQ_INPROGRESS) 1173 reqp->req_state = AIO_REQ_DONE; 1174 sig_mutex_unlock(&aiowp->work_qlock1); 1175 _aiodone(reqp, retval, error); 1176 } 1177 } 1178 } 1179 1180 void 1181 _aio_req_mark_done(aio_req_t *reqp) 1182 { 1183 #if !defined(_LP64) 1184 if (reqp->req_largefile) 1185 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1186 else 1187 #endif 1188 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1189 } 1190 1191 /* 1192 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1193 * hopefully to consume one of our queued signals. 1194 */ 1195 static void 1196 _aio_delay(int ticks) 1197 { 1198 (void) usleep(ticks * (MICROSEC / hz)); 1199 } 1200 1201 /* 1202 * Actually send the notifications. 1203 * We could block indefinitely here if the application 1204 * is not listening for the signal or port notifications. 1205 */ 1206 static void 1207 send_notification(notif_param_t *npp) 1208 { 1209 extern int __sigqueue(pid_t pid, int signo, 1210 /* const union sigval */ void *value, int si_code, int block); 1211 1212 if (npp->np_signo) 1213 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1214 SI_ASYNCIO, 1); 1215 else if (npp->np_port >= 0) 1216 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1217 npp->np_event, npp->np_object, npp->np_user); 1218 1219 if (npp->np_lio_signo) 1220 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1221 SI_ASYNCIO, 1); 1222 else if (npp->np_lio_port >= 0) 1223 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1224 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1225 } 1226 1227 /* 1228 * Asynchronous notification worker. 1229 */ 1230 void * 1231 _aio_do_notify(void *arg) 1232 { 1233 aio_worker_t *aiowp = (aio_worker_t *)arg; 1234 aio_req_t *reqp; 1235 1236 /* 1237 * This isn't really necessary. All signals are blocked. 1238 */ 1239 if (pthread_setspecific(_aio_key, aiowp) != 0) 1240 aio_panic("_aio_do_notify, pthread_setspecific()"); 1241 1242 /* 1243 * Notifications are never cancelled. 1244 * All signals remain blocked, forever. 
1245 */ 1246 for (;;) { 1247 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1248 if (_aio_idle(aiowp) != 0) 1249 aio_panic("_aio_do_notify: _aio_idle() failed"); 1250 } 1251 send_notification(&reqp->req_notify); 1252 _aio_req_free(reqp); 1253 } 1254 1255 /* NOTREACHED */ 1256 return (NULL); 1257 } 1258 1259 /* 1260 * Do the completion semantics for a request that was either canceled 1261 * by _aio_cancel_req() or was completed by _aio_do_request(). 1262 */ 1263 static void 1264 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1265 { 1266 aio_result_t *resultp = reqp->req_resultp; 1267 int notify = 0; 1268 aio_lio_t *head; 1269 int sigev_none; 1270 int sigev_signal; 1271 int sigev_thread; 1272 int sigev_port; 1273 notif_param_t np; 1274 1275 /* 1276 * We call _aiodone() only for Posix I/O. 1277 */ 1278 ASSERT(POSIX_AIO(reqp)); 1279 1280 sigev_none = 0; 1281 sigev_signal = 0; 1282 sigev_thread = 0; 1283 sigev_port = 0; 1284 np.np_signo = 0; 1285 np.np_port = -1; 1286 np.np_lio_signo = 0; 1287 np.np_lio_port = -1; 1288 1289 switch (reqp->req_sigevent.sigev_notify) { 1290 case SIGEV_NONE: 1291 sigev_none = 1; 1292 break; 1293 case SIGEV_SIGNAL: 1294 sigev_signal = 1; 1295 break; 1296 case SIGEV_THREAD: 1297 sigev_thread = 1; 1298 break; 1299 case SIGEV_PORT: 1300 sigev_port = 1; 1301 break; 1302 default: 1303 aio_panic("_aiodone: improper sigev_notify"); 1304 break; 1305 } 1306 1307 /* 1308 * Figure out the notification parameters while holding __aio_mutex. 1309 * Actually perform the notifications after dropping __aio_mutex. 1310 * This allows us to sleep for a long time (if the notifications 1311 * incur delays) without impeding other async I/O operations. 1312 */ 1313 1314 sig_mutex_lock(&__aio_mutex); 1315 1316 if (sigev_signal) { 1317 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1318 notify = 1; 1319 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1320 } else if (sigev_thread | sigev_port) { 1321 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1322 notify = 1; 1323 np.np_event = reqp->req_op; 1324 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1325 np.np_event = AIOFSYNC64; 1326 np.np_object = (uintptr_t)reqp->req_aiocbp; 1327 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1328 } 1329 1330 if (resultp->aio_errno == EINPROGRESS) 1331 _aio_set_result(reqp, retval, error); 1332 1333 _aio_outstand_cnt--; 1334 1335 head = reqp->req_head; 1336 reqp->req_head = NULL; 1337 1338 if (sigev_none) { 1339 _aio_enq_doneq(reqp); 1340 reqp = NULL; 1341 } else { 1342 (void) _aio_hash_del(resultp); 1343 _aio_req_mark_done(reqp); 1344 } 1345 1346 _aio_waitn_wakeup(); 1347 1348 /* 1349 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1350 * __aio_suspend() increments "_aio_kernel_suspend" 1351 * when they are waiting in the kernel for completed I/Os. 1352 * 1353 * _kaio(AIONOTIFY) awakes the corresponding function 1354 * in the kernel; then the corresponding __aio_waitn() or 1355 * __aio_suspend() function could reap the recently 1356 * completed I/Os (_aiodone()). 1357 */ 1358 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1359 (void) _kaio(AIONOTIFY); 1360 1361 sig_mutex_unlock(&__aio_mutex); 1362 1363 if (head != NULL) { 1364 /* 1365 * If all the lio requests have completed, 1366 * prepare to notify the waiting thread. 
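		 * lio_refcnt counts the list entries that have not yet passed
		 * through _aiodone(); this request is the last one when the
		 * count is still 1.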
1367 */ 1368 sig_mutex_lock(&head->lio_mutex); 1369 ASSERT(head->lio_refcnt == head->lio_nent); 1370 if (head->lio_refcnt == 1) { 1371 int waiting = 0; 1372 if (head->lio_mode == LIO_WAIT) { 1373 if ((waiting = head->lio_waiting) != 0) 1374 (void) cond_signal(&head->lio_cond_cv); 1375 } else if (head->lio_port < 0) { /* none or signal */ 1376 if ((np.np_lio_signo = head->lio_signo) != 0) 1377 notify = 1; 1378 np.np_lio_user = head->lio_sigval.sival_ptr; 1379 } else { /* thread or port */ 1380 notify = 1; 1381 np.np_lio_port = head->lio_port; 1382 np.np_lio_event = head->lio_event; 1383 np.np_lio_object = 1384 (uintptr_t)head->lio_sigevent; 1385 np.np_lio_user = head->lio_sigval.sival_ptr; 1386 } 1387 head->lio_nent = head->lio_refcnt = 0; 1388 sig_mutex_unlock(&head->lio_mutex); 1389 if (waiting == 0) 1390 _aio_lio_free(head); 1391 } else { 1392 head->lio_nent--; 1393 head->lio_refcnt--; 1394 sig_mutex_unlock(&head->lio_mutex); 1395 } 1396 } 1397 1398 /* 1399 * The request is completed; now perform the notifications. 1400 */ 1401 if (notify) { 1402 if (reqp != NULL) { 1403 /* 1404 * We usually put the request on the notification 1405 * queue because we don't want to block and delay 1406 * other operations behind us in the work queue. 1407 * Also we must never block on a cancel notification 1408 * because we are being called from an application 1409 * thread in this case and that could lead to deadlock 1410 * if no other thread is receiving notificatins. 1411 */ 1412 reqp->req_notify = np; 1413 reqp->req_op = AIONOTIFY; 1414 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1415 reqp = NULL; 1416 } else { 1417 /* 1418 * We already put the request on the done queue, 1419 * so we can't queue it to the notification queue. 1420 * Just do the notification directly. 1421 */ 1422 send_notification(&np); 1423 } 1424 } 1425 1426 if (reqp != NULL) 1427 _aio_req_free(reqp); 1428 } 1429 1430 /* 1431 * Delete fsync requests from list head until there is 1432 * only one left. Return 0 when there is only one, 1433 * otherwise return a non-zero value. 1434 */ 1435 static int 1436 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1437 { 1438 aio_lio_t *head = reqp->req_head; 1439 int rval = 0; 1440 1441 ASSERT(reqp == aiowp->work_req); 1442 sig_mutex_lock(&aiowp->work_qlock1); 1443 sig_mutex_lock(&head->lio_mutex); 1444 if (head->lio_refcnt > 1) { 1445 head->lio_refcnt--; 1446 head->lio_nent--; 1447 aiowp->work_req = NULL; 1448 sig_mutex_unlock(&head->lio_mutex); 1449 sig_mutex_unlock(&aiowp->work_qlock1); 1450 sig_mutex_lock(&__aio_mutex); 1451 _aio_outstand_cnt--; 1452 _aio_waitn_wakeup(); 1453 sig_mutex_unlock(&__aio_mutex); 1454 _aio_req_free(reqp); 1455 return (1); 1456 } 1457 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1458 reqp->req_head = NULL; 1459 if (head->lio_canned) 1460 reqp->req_state = AIO_REQ_CANCELED; 1461 if (head->lio_mode == LIO_DESTROY) { 1462 aiowp->work_req = NULL; 1463 rval = 1; 1464 } 1465 sig_mutex_unlock(&head->lio_mutex); 1466 sig_mutex_unlock(&aiowp->work_qlock1); 1467 head->lio_refcnt--; 1468 head->lio_nent--; 1469 _aio_lio_free(head); 1470 if (rval != 0) 1471 _aio_req_free(reqp); 1472 return (rval); 1473 } 1474 1475 /* 1476 * A worker is set idle when its work queue is empty. 1477 * The worker checks again that it has no more work 1478 * and then goes to sleep waiting for more work. 
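 * The sleep is on work_idle_cv, protected by work_qlock1; _aio_req_add()
 * clears work_idleflg and signals the cv when it queues new work for an
 * idle worker.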
1479 */ 1480 int 1481 _aio_idle(aio_worker_t *aiowp) 1482 { 1483 int error = 0; 1484 1485 sig_mutex_lock(&aiowp->work_qlock1); 1486 if (aiowp->work_count1 == 0) { 1487 ASSERT(aiowp->work_minload1 == 0); 1488 aiowp->work_idleflg = 1; 1489 /* 1490 * A cancellation handler is not needed here. 1491 * aio worker threads are never cancelled via pthread_cancel(). 1492 */ 1493 error = sig_cond_wait(&aiowp->work_idle_cv, 1494 &aiowp->work_qlock1); 1495 /* 1496 * The idle flag is normally cleared before worker is awakened 1497 * by aio_req_add(). On error (EINTR), we clear it ourself. 1498 */ 1499 if (error) 1500 aiowp->work_idleflg = 0; 1501 } 1502 sig_mutex_unlock(&aiowp->work_qlock1); 1503 return (error); 1504 } 1505 1506 /* 1507 * A worker's completed AIO requests are placed onto a global 1508 * done queue. The application is only sent a SIGIO signal if 1509 * the process has a handler enabled and it is not waiting via 1510 * aiowait(). 1511 */ 1512 static void 1513 _aio_work_done(aio_worker_t *aiowp) 1514 { 1515 aio_req_t *reqp; 1516 1517 sig_mutex_lock(&aiowp->work_qlock1); 1518 reqp = aiowp->work_prev1; 1519 reqp->req_next = NULL; 1520 aiowp->work_done1 = 0; 1521 aiowp->work_tail1 = aiowp->work_next1; 1522 if (aiowp->work_tail1 == NULL) 1523 aiowp->work_head1 = NULL; 1524 aiowp->work_prev1 = NULL; 1525 sig_mutex_unlock(&aiowp->work_qlock1); 1526 sig_mutex_lock(&__aio_mutex); 1527 _aio_donecnt++; 1528 _aio_outstand_cnt--; 1529 _aio_req_done_cnt--; 1530 ASSERT(_aio_donecnt > 0 && 1531 _aio_outstand_cnt >= 0 && 1532 _aio_req_done_cnt >= 0); 1533 ASSERT(reqp != NULL); 1534 1535 if (_aio_done_tail == NULL) { 1536 _aio_done_head = _aio_done_tail = reqp; 1537 } else { 1538 _aio_done_head->req_next = reqp; 1539 _aio_done_head = reqp; 1540 } 1541 1542 if (_aiowait_flag) { 1543 sig_mutex_unlock(&__aio_mutex); 1544 (void) _kaio(AIONOTIFY); 1545 } else { 1546 sig_mutex_unlock(&__aio_mutex); 1547 if (_sigio_enabled) 1548 (void) kill(__pid, SIGIO); 1549 } 1550 } 1551 1552 /* 1553 * The done queue consists of AIO requests that are in either the 1554 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1555 * are discarded. If the done queue is empty then NULL is returned. 1556 * Otherwise the address of a done aio_result_t is returned. 1557 */ 1558 aio_result_t * 1559 _aio_req_done(void) 1560 { 1561 aio_req_t *reqp; 1562 aio_result_t *resultp; 1563 1564 ASSERT(MUTEX_HELD(&__aio_mutex)); 1565 1566 if ((reqp = _aio_done_tail) != NULL) { 1567 if ((_aio_done_tail = reqp->req_next) == NULL) 1568 _aio_done_head = NULL; 1569 ASSERT(_aio_donecnt > 0); 1570 _aio_donecnt--; 1571 (void) _aio_hash_del(reqp->req_resultp); 1572 resultp = reqp->req_resultp; 1573 ASSERT(reqp->req_state == AIO_REQ_DONE); 1574 _aio_req_free(reqp); 1575 return (resultp); 1576 } 1577 /* is queue empty? */ 1578 if (reqp == NULL && _aio_outstand_cnt == 0) { 1579 return ((aio_result_t *)-1); 1580 } 1581 return (NULL); 1582 } 1583 1584 /* 1585 * Set the return and errno values for the application's use. 1586 * 1587 * For the Posix interfaces, we must set the return value first followed 1588 * by the errno value because the Posix interfaces allow for a change 1589 * in the errno value from EINPROGRESS to something else to signal 1590 * the completion of the asynchronous request. 1591 * 1592 * The opposite is true for the Solaris interfaces. These allow for 1593 * a change in the return value from AIO_INPROGRESS to something else 1594 * to signal the completion of the asynchronous request. 
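 * In other words, a Solaris-style caller polls aio_return until it changes
 * from AIO_INPROGRESS and only then reads aio_errno, while a POSIX caller
 * polls aio_error() until it stops returning EINPROGRESS and only then
 * calls aio_return().  The membar_producer() calls below order the two
 * stores accordingly.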
1595 */ 1596 void 1597 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1598 { 1599 aio_result_t *resultp = reqp->req_resultp; 1600 1601 if (POSIX_AIO(reqp)) { 1602 resultp->aio_return = retval; 1603 membar_producer(); 1604 resultp->aio_errno = error; 1605 } else { 1606 resultp->aio_errno = error; 1607 membar_producer(); 1608 resultp->aio_return = retval; 1609 } 1610 } 1611 1612 /* 1613 * Add an AIO request onto the next work queue. 1614 * A circular list of workers is used to choose the next worker. 1615 */ 1616 void 1617 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1618 { 1619 ulwp_t *self = curthread; 1620 aio_worker_t *aiowp; 1621 aio_worker_t *first; 1622 int load_bal_flg = 1; 1623 int found; 1624 1625 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1626 reqp->req_next = NULL; 1627 /* 1628 * Try to acquire the next worker's work queue. If it is locked, 1629 * then search the list of workers until a queue is found unlocked, 1630 * or until the list is completely traversed at which point another 1631 * worker will be created. 1632 */ 1633 sigoff(self); /* defer SIGIO */ 1634 sig_mutex_lock(&__aio_mutex); 1635 first = aiowp = *nextworker; 1636 if (mode != AIONOTIFY) 1637 _aio_outstand_cnt++; 1638 sig_mutex_unlock(&__aio_mutex); 1639 1640 switch (mode) { 1641 case AIOREAD: 1642 case AIOWRITE: 1643 case AIOAREAD: 1644 case AIOAWRITE: 1645 #if !defined(_LP64) 1646 case AIOAREAD64: 1647 case AIOAWRITE64: 1648 #endif 1649 /* try to find an idle worker */ 1650 found = 0; 1651 do { 1652 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1653 if (aiowp->work_idleflg) { 1654 found = 1; 1655 break; 1656 } 1657 sig_mutex_unlock(&aiowp->work_qlock1); 1658 } 1659 } while ((aiowp = aiowp->work_forw) != first); 1660 1661 if (found) { 1662 aiowp->work_minload1++; 1663 break; 1664 } 1665 1666 /* try to acquire some worker's queue lock */ 1667 do { 1668 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1669 found = 1; 1670 break; 1671 } 1672 } while ((aiowp = aiowp->work_forw) != first); 1673 1674 /* 1675 * Create more workers when the workers appear overloaded. 1676 * Either all the workers are busy draining their queues 1677 * or no worker's queue lock could be acquired. 1678 */ 1679 if (!found) { 1680 if (_aio_worker_cnt < _max_workers) { 1681 if (_aio_create_worker(reqp, mode)) 1682 aio_panic("_aio_req_add: add worker"); 1683 sigon(self); /* reenable SIGIO */ 1684 return; 1685 } 1686 1687 /* 1688 * No worker available and we have created 1689 * _max_workers, keep going through the 1690 * list slowly until we get a lock 1691 */ 1692 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1693 /* 1694 * give someone else a chance 1695 */ 1696 _aio_delay(1); 1697 aiowp = aiowp->work_forw; 1698 } 1699 } 1700 1701 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1702 if (_aio_worker_cnt < _max_workers && 1703 aiowp->work_minload1 >= _minworkload) { 1704 sig_mutex_unlock(&aiowp->work_qlock1); 1705 sig_mutex_lock(&__aio_mutex); 1706 *nextworker = aiowp->work_forw; 1707 sig_mutex_unlock(&__aio_mutex); 1708 if (_aio_create_worker(reqp, mode)) 1709 aio_panic("aio_req_add: add worker"); 1710 sigon(self); /* reenable SIGIO */ 1711 return; 1712 } 1713 aiowp->work_minload1++; 1714 break; 1715 case AIOFSYNC: 1716 case AIONOTIFY: 1717 load_bal_flg = 0; 1718 sig_mutex_lock(&aiowp->work_qlock1); 1719 break; 1720 default: 1721 aio_panic("_aio_req_add: invalid mode"); 1722 break; 1723 } 1724 /* 1725 * Put request onto worker's work queue. 
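	 * The queue is singly linked through req_next from work_tail1 (the
	 * oldest request) to work_head1 (the newest); work_next1 points to
	 * the next request the worker will pick up in _aio_req_get().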
1726 */ 1727 if (aiowp->work_tail1 == NULL) { 1728 ASSERT(aiowp->work_count1 == 0); 1729 aiowp->work_tail1 = reqp; 1730 aiowp->work_next1 = reqp; 1731 } else { 1732 aiowp->work_head1->req_next = reqp; 1733 if (aiowp->work_next1 == NULL) 1734 aiowp->work_next1 = reqp; 1735 } 1736 reqp->req_state = AIO_REQ_QUEUED; 1737 reqp->req_worker = aiowp; 1738 aiowp->work_head1 = reqp; 1739 /* 1740 * Awaken worker if it is not currently active. 1741 */ 1742 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1743 aiowp->work_idleflg = 0; 1744 (void) cond_signal(&aiowp->work_idle_cv); 1745 } 1746 sig_mutex_unlock(&aiowp->work_qlock1); 1747 1748 if (load_bal_flg) { 1749 sig_mutex_lock(&__aio_mutex); 1750 *nextworker = aiowp->work_forw; 1751 sig_mutex_unlock(&__aio_mutex); 1752 } 1753 sigon(self); /* reenable SIGIO */ 1754 } 1755 1756 /* 1757 * Get an AIO request for a specified worker. 1758 * If the work queue is empty, return NULL. 1759 */ 1760 aio_req_t * 1761 _aio_req_get(aio_worker_t *aiowp) 1762 { 1763 aio_req_t *reqp; 1764 1765 sig_mutex_lock(&aiowp->work_qlock1); 1766 if ((reqp = aiowp->work_next1) != NULL) { 1767 /* 1768 * Remove a POSIX request from the queue; the 1769 * request queue is a singularly linked list 1770 * with a previous pointer. The request is 1771 * removed by updating the previous pointer. 1772 * 1773 * Non-posix requests are left on the queue 1774 * to eventually be placed on the done queue. 1775 */ 1776 1777 if (POSIX_AIO(reqp)) { 1778 if (aiowp->work_prev1 == NULL) { 1779 aiowp->work_tail1 = reqp->req_next; 1780 if (aiowp->work_tail1 == NULL) 1781 aiowp->work_head1 = NULL; 1782 } else { 1783 aiowp->work_prev1->req_next = reqp->req_next; 1784 if (aiowp->work_head1 == reqp) 1785 aiowp->work_head1 = reqp->req_next; 1786 } 1787 1788 } else { 1789 aiowp->work_prev1 = reqp; 1790 ASSERT(aiowp->work_done1 >= 0); 1791 aiowp->work_done1++; 1792 } 1793 ASSERT(reqp != reqp->req_next); 1794 aiowp->work_next1 = reqp->req_next; 1795 ASSERT(aiowp->work_count1 >= 1); 1796 aiowp->work_count1--; 1797 switch (reqp->req_op) { 1798 case AIOREAD: 1799 case AIOWRITE: 1800 case AIOAREAD: 1801 case AIOAWRITE: 1802 #if !defined(_LP64) 1803 case AIOAREAD64: 1804 case AIOAWRITE64: 1805 #endif 1806 ASSERT(aiowp->work_minload1 > 0); 1807 aiowp->work_minload1--; 1808 break; 1809 } 1810 reqp->req_state = AIO_REQ_INPROGRESS; 1811 } 1812 aiowp->work_req = reqp; 1813 ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1814 sig_mutex_unlock(&aiowp->work_qlock1); 1815 return (reqp); 1816 } 1817 1818 static void 1819 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1820 { 1821 aio_req_t **last; 1822 aio_req_t *lastrp; 1823 aio_req_t *next; 1824 1825 ASSERT(aiowp != NULL); 1826 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1827 if (POSIX_AIO(reqp)) { 1828 if (ostate != AIO_REQ_QUEUED) 1829 return; 1830 } 1831 last = &aiowp->work_tail1; 1832 lastrp = aiowp->work_tail1; 1833 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1834 while ((next = *last) != NULL) { 1835 if (next == reqp) { 1836 *last = next->req_next; 1837 if (aiowp->work_next1 == next) 1838 aiowp->work_next1 = next->req_next; 1839 1840 if ((next->req_next != NULL) || 1841 (aiowp->work_done1 == 0)) { 1842 if (aiowp->work_head1 == next) 1843 aiowp->work_head1 = next->req_next; 1844 if (aiowp->work_prev1 == next) 1845 aiowp->work_prev1 = next->req_next; 1846 } else { 1847 if (aiowp->work_head1 == next) 1848 aiowp->work_head1 = lastrp; 1849 if (aiowp->work_prev1 == next) 1850 aiowp->work_prev1 = lastrp; 1851 } 1852 1853 if (ostate 
== AIO_REQ_QUEUED) { 1854 ASSERT(aiowp->work_count1 >= 1); 1855 aiowp->work_count1--; 1856 ASSERT(aiowp->work_minload1 >= 1); 1857 aiowp->work_minload1--; 1858 } else { 1859 ASSERT(ostate == AIO_REQ_INPROGRESS && 1860 !POSIX_AIO(reqp)); 1861 aiowp->work_done1--; 1862 } 1863 return; 1864 } 1865 last = &next->req_next; 1866 lastrp = next; 1867 } 1868 /* NOTREACHED */ 1869 } 1870 1871 static void 1872 _aio_enq_doneq(aio_req_t *reqp) 1873 { 1874 if (_aio_doneq == NULL) { 1875 _aio_doneq = reqp; 1876 reqp->req_next = reqp->req_prev = reqp; 1877 } else { 1878 reqp->req_next = _aio_doneq; 1879 reqp->req_prev = _aio_doneq->req_prev; 1880 _aio_doneq->req_prev->req_next = reqp; 1881 _aio_doneq->req_prev = reqp; 1882 } 1883 reqp->req_state = AIO_REQ_DONEQ; 1884 _aio_doneq_cnt++; 1885 } 1886 1887 /* 1888 * caller owns the _aio_mutex 1889 */ 1890 aio_req_t * 1891 _aio_req_remove(aio_req_t *reqp) 1892 { 1893 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 1894 return (NULL); 1895 1896 if (reqp) { 1897 /* request in done queue */ 1898 if (_aio_doneq == reqp) 1899 _aio_doneq = reqp->req_next; 1900 if (_aio_doneq == reqp) { 1901 /* only one request on queue */ 1902 _aio_doneq = NULL; 1903 } else { 1904 aio_req_t *tmp = reqp->req_next; 1905 reqp->req_prev->req_next = tmp; 1906 tmp->req_prev = reqp->req_prev; 1907 } 1908 } else if ((reqp = _aio_doneq) != NULL) { 1909 if (reqp == reqp->req_next) { 1910 /* only one request on queue */ 1911 _aio_doneq = NULL; 1912 } else { 1913 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 1914 _aio_doneq->req_prev = reqp->req_prev; 1915 } 1916 } 1917 if (reqp) { 1918 _aio_doneq_cnt--; 1919 reqp->req_next = reqp->req_prev = reqp; 1920 reqp->req_state = AIO_REQ_DONE; 1921 } 1922 return (reqp); 1923 } 1924 1925 /* 1926 * An AIO request is identified by an aio_result_t pointer. The library 1927 * maps this aio_result_t pointer to its internal representation using a 1928 * hash table. This function adds an aio_result_t pointer to the hash table. 1929 */ 1930 static int 1931 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 1932 { 1933 aio_hash_t *hashp; 1934 aio_req_t **prev; 1935 aio_req_t *next; 1936 1937 hashp = _aio_hash + AIOHASH(resultp); 1938 lmutex_lock(&hashp->hash_lock); 1939 prev = &hashp->hash_ptr; 1940 while ((next = *prev) != NULL) { 1941 if (resultp == next->req_resultp) { 1942 lmutex_unlock(&hashp->hash_lock); 1943 return (-1); 1944 } 1945 prev = &next->req_link; 1946 } 1947 *prev = reqp; 1948 ASSERT(reqp->req_link == NULL); 1949 lmutex_unlock(&hashp->hash_lock); 1950 return (0); 1951 } 1952 1953 /* 1954 * Remove an entry from the hash table. 
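 * Returns the request that was mapped to resultp, or NULL if resultp is
 * not (or is no longer) in the table.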
1955 */ 1956 aio_req_t * 1957 _aio_hash_del(aio_result_t *resultp) 1958 { 1959 aio_hash_t *hashp; 1960 aio_req_t **prev; 1961 aio_req_t *next = NULL; 1962 1963 if (_aio_hash != NULL) { 1964 hashp = _aio_hash + AIOHASH(resultp); 1965 lmutex_lock(&hashp->hash_lock); 1966 prev = &hashp->hash_ptr; 1967 while ((next = *prev) != NULL) { 1968 if (resultp == next->req_resultp) { 1969 *prev = next->req_link; 1970 next->req_link = NULL; 1971 break; 1972 } 1973 prev = &next->req_link; 1974 } 1975 lmutex_unlock(&hashp->hash_lock); 1976 } 1977 return (next); 1978 } 1979 1980 /* 1981 * find an entry in the hash table 1982 */ 1983 aio_req_t * 1984 _aio_hash_find(aio_result_t *resultp) 1985 { 1986 aio_hash_t *hashp; 1987 aio_req_t **prev; 1988 aio_req_t *next = NULL; 1989 1990 if (_aio_hash != NULL) { 1991 hashp = _aio_hash + AIOHASH(resultp); 1992 lmutex_lock(&hashp->hash_lock); 1993 prev = &hashp->hash_ptr; 1994 while ((next = *prev) != NULL) { 1995 if (resultp == next->req_resultp) 1996 break; 1997 prev = &next->req_link; 1998 } 1999 lmutex_unlock(&hashp->hash_lock); 2000 } 2001 return (next); 2002 } 2003 2004 /* 2005 * AIO interface for POSIX 2006 */ 2007 int 2008 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2009 int mode, int flg) 2010 { 2011 aio_req_t *reqp; 2012 aio_args_t *ap; 2013 int kerr; 2014 2015 if (aiocbp == NULL) { 2016 errno = EINVAL; 2017 return (-1); 2018 } 2019 2020 /* initialize kaio */ 2021 if (!_kaio_ok) 2022 _kaio_init(); 2023 2024 aiocbp->aio_state = NOCHECK; 2025 2026 /* 2027 * If we have been called because a list I/O 2028 * kaio() failed, we dont want to repeat the 2029 * system call 2030 */ 2031 2032 if (flg & AIO_KAIO) { 2033 /* 2034 * Try kernel aio first. 2035 * If errno is ENOTSUP/EBADFD, 2036 * fall back to the thread implementation. 2037 */ 2038 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2039 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2040 aiocbp->aio_state = CHECK; 2041 kerr = (int)_kaio(mode, aiocbp); 2042 if (kerr == 0) 2043 return (0); 2044 if (errno != ENOTSUP && errno != EBADFD) { 2045 aiocbp->aio_resultp.aio_errno = errno; 2046 aiocbp->aio_resultp.aio_return = -1; 2047 aiocbp->aio_state = NOCHECK; 2048 return (-1); 2049 } 2050 if (errno == EBADFD) 2051 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2052 } 2053 } 2054 2055 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2056 aiocbp->aio_state = USERAIO; 2057 2058 if (!__uaio_ok && __uaio_init() == -1) 2059 return (-1); 2060 2061 if ((reqp = _aio_req_alloc()) == NULL) { 2062 errno = EAGAIN; 2063 return (-1); 2064 } 2065 2066 /* 2067 * If an LIO request, add the list head to the aio request 2068 */ 2069 reqp->req_head = lio_head; 2070 reqp->req_type = AIO_POSIX_REQ; 2071 reqp->req_op = mode; 2072 reqp->req_largefile = 0; 2073 2074 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2075 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2076 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2077 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2078 reqp->req_sigevent.sigev_signo = 2079 aiocbp->aio_sigevent.sigev_signo; 2080 reqp->req_sigevent.sigev_value.sival_ptr = 2081 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2082 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2083 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2084 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2085 /* 2086 * Reuse the sigevent structure to contain the port number 2087 * and the user value. Same for SIGEV_THREAD, below. 
2088 */ 2089 reqp->req_sigevent.sigev_signo = 2090 pn->portnfy_port; 2091 reqp->req_sigevent.sigev_value.sival_ptr = 2092 pn->portnfy_user; 2093 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2094 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2095 /* 2096 * The sigevent structure contains the port number 2097 * and the user value. Same for SIGEV_PORT, above. 2098 */ 2099 reqp->req_sigevent.sigev_signo = 2100 aiocbp->aio_sigevent.sigev_signo; 2101 reqp->req_sigevent.sigev_value.sival_ptr = 2102 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2103 } 2104 2105 reqp->req_resultp = &aiocbp->aio_resultp; 2106 reqp->req_aiocbp = aiocbp; 2107 ap = &reqp->req_args; 2108 ap->fd = aiocbp->aio_fildes; 2109 ap->buf = (caddr_t)aiocbp->aio_buf; 2110 ap->bufsz = aiocbp->aio_nbytes; 2111 ap->offset = aiocbp->aio_offset; 2112 2113 if ((flg & AIO_NO_DUPS) && 2114 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2115 aio_panic("_aio_rw(): request already in hash table"); 2116 _aio_req_free(reqp); 2117 errno = EINVAL; 2118 return (-1); 2119 } 2120 _aio_req_add(reqp, nextworker, mode); 2121 return (0); 2122 } 2123 2124 #if !defined(_LP64) 2125 /* 2126 * 64-bit AIO interface for POSIX 2127 */ 2128 int 2129 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2130 int mode, int flg) 2131 { 2132 aio_req_t *reqp; 2133 aio_args_t *ap; 2134 int kerr; 2135 2136 if (aiocbp == NULL) { 2137 errno = EINVAL; 2138 return (-1); 2139 } 2140 2141 /* initialize kaio */ 2142 if (!_kaio_ok) 2143 _kaio_init(); 2144 2145 aiocbp->aio_state = NOCHECK; 2146 2147 /* 2148 * If we have been called because a list I/O 2149 * kaio() failed, we dont want to repeat the 2150 * system call 2151 */ 2152 2153 if (flg & AIO_KAIO) { 2154 /* 2155 * Try kernel aio first. 2156 * If errno is ENOTSUP/EBADFD, 2157 * fall back to the thread implementation. 
2158 */ 2159 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2160 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2161 aiocbp->aio_state = CHECK; 2162 kerr = (int)_kaio(mode, aiocbp); 2163 if (kerr == 0) 2164 return (0); 2165 if (errno != ENOTSUP && errno != EBADFD) { 2166 aiocbp->aio_resultp.aio_errno = errno; 2167 aiocbp->aio_resultp.aio_return = -1; 2168 aiocbp->aio_state = NOCHECK; 2169 return (-1); 2170 } 2171 if (errno == EBADFD) 2172 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2173 } 2174 } 2175 2176 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2177 aiocbp->aio_state = USERAIO; 2178 2179 if (!__uaio_ok && __uaio_init() == -1) 2180 return (-1); 2181 2182 if ((reqp = _aio_req_alloc()) == NULL) { 2183 errno = EAGAIN; 2184 return (-1); 2185 } 2186 2187 /* 2188 * If an LIO request, add the list head to the aio request 2189 */ 2190 reqp->req_head = lio_head; 2191 reqp->req_type = AIO_POSIX_REQ; 2192 reqp->req_op = mode; 2193 reqp->req_largefile = 1; 2194 2195 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2196 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2197 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2198 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2199 reqp->req_sigevent.sigev_signo = 2200 aiocbp->aio_sigevent.sigev_signo; 2201 reqp->req_sigevent.sigev_value.sival_ptr = 2202 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2203 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2204 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2205 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2206 reqp->req_sigevent.sigev_signo = 2207 pn->portnfy_port; 2208 reqp->req_sigevent.sigev_value.sival_ptr = 2209 pn->portnfy_user; 2210 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2211 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2212 reqp->req_sigevent.sigev_signo = 2213 aiocbp->aio_sigevent.sigev_signo; 2214 reqp->req_sigevent.sigev_value.sival_ptr = 2215 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2216 } 2217 2218 reqp->req_resultp = &aiocbp->aio_resultp; 2219 reqp->req_aiocbp = aiocbp; 2220 ap = &reqp->req_args; 2221 ap->fd = aiocbp->aio_fildes; 2222 ap->buf = (caddr_t)aiocbp->aio_buf; 2223 ap->bufsz = aiocbp->aio_nbytes; 2224 ap->offset = aiocbp->aio_offset; 2225 2226 if ((flg & AIO_NO_DUPS) && 2227 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2228 aio_panic("_aio_rw64(): request already in hash table"); 2229 _aio_req_free(reqp); 2230 errno = EINVAL; 2231 return (-1); 2232 } 2233 _aio_req_add(reqp, nextworker, mode); 2234 return (0); 2235 } 2236 #endif /* !defined(_LP64) */ 2237
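
/*
 * Illustrative sketch (not part of the library): minimal use of the
 * Solaris-style interfaces implemented above, with error handling omitted
 * and process_data() standing in for application code.
 *
 *	aio_result_t result;
 *	aio_result_t *donep;
 *
 *	if (aioread(fd, buf, bufsz, 0, SEEK_SET, &result) == 0) {
 *		donep = aiowait(NULL);
 *		if (donep == &result && donep->aio_return >= 0)
 *			process_data(buf, donep->aio_return);
 *	}
 */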