/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see asyncio.h defines for */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}

/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized here, such as the maximum number of
 * workers that the subsystem can create and the minimum number of
 * workers permitted before imposing some restrictions.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * And later check whether at least one worker is created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * The special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

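/*
 * Initialization serialization, as used by __uaio_init() above and
 * _kaio_init() below (a summary of the protocol implemented there, not
 * an additional mechanism):  the first caller sets __aio_initbusy while
 * holding __aio_initlock and then drops the lock while it does the
 * (possibly slow) setup work; concurrent callers block on __aio_initcv
 * with cancellation disabled until the initializer clears __aio_initbusy
 * and broadcasts.  This keeps initialization single-threaded without
 * holding __aio_initlock across thread creation or system calls.
 */
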
/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */

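/*
 * Illustrative sketch of how an application typically drives this
 * (Solaris-style) interface; simplified, error handling omitted, and
 * not part of the library itself:
 *
 *	char buf[8192];
 *	aio_result_t res;
 *
 *	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &res) == -1)
 *		(request was not queued; errno is set)
 *	(do other work)
 *	aio_result_t *donep = aiowait(NULL);	(blocks for any completion)
 *	if (donep == &res)
 *		(donep->aio_return and donep->aio_errno hold the result)
 *
 * The aio_result_t must remain valid until the request completes or is
 * cancelled; it is also the key used to identify the request in the
 * hash table code later in this file.
 */
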
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64 bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/* ARGSUSED */
static void
_aiowait_cleanup(void *arg)
{
	sig_mutex_lock(&__aio_mutex);
	_aiowait_flag--;
	sig_mutex_unlock(&__aio_mutex);
}

/*
 * This must be asynch safe and cancel safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}

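/*
 * Result-pointer conventions as used by aiowait() above (a summary of
 * this code, not an external contract):  _aio_req_done() returns a
 * completed aio_result_t, NULL when nothing has completed yet, or
 * (aio_result_t *)-1 when no userland requests are outstanding at all.
 * The value returned by _kaio(AIOWAIT) is treated the same way, with two
 * additions: (aio_result_t *)1 means the wait was interrupted by an
 * aionotify() rather than by an I/O completion, and -1 comes with errno
 * set (EINVAL when no kernel requests are outstanding, or EINTR).
 * aiowait() itself fails with EINVAL only when both sides report -1.
 */
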
/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */

int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}

/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * AIOs and return.  Those AIOs in question will have either noticed the
 * cancellation notice before, during, or after initiating I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}

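/*
 * Shape of the worker list built above (a descriptive note derived from
 * the code):  each pool (__workers_rw, __workers_no) is a circular,
 * doubly linked list threaded through work_forw/work_backw, with new
 * workers inserted just before the list head, i.e. at the logical tail:
 *
 *	*workers -> w1 <-> w2 <-> ... <-> wN -> (back to w1)
 *
 * __nextworker_rw/__nextworker_no act as a rotor into this ring;
 * _aio_req_add() starts its search there and advances it afterwards,
 * which spreads requests across the workers.
 */
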
/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is being done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because the "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires the "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if the "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	This thread must then check the AIO_REQ_CANCELED state to emulate
 *	the siglongjmp() that would otherwise have been required, freeing
 *	the work_qlock1 and avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

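/*
 * A note on the sigon()/sigoff() pairs above (summarizing the cancellation
 * comment before _aio_do_request()):  SIGAIOCANCEL is unblocked only
 * around the blocking pread()/pwrite() calls, so an asynchronous
 * cancellation can siglongjmp() out of the worker only while it is
 * actually sitting in the kernel doing I/O.  Everywhere else the signal
 * stays blocked and a cancellation is detected synchronously, by finding
 * the request in the AIO_REQ_CANCELED state in _aio_finish_request().
 */
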
/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			int notify;
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			/*
			 * If it was canceled, this request will not be
			 * added to the done list.  Just free it.
			 */
			if (error == ECANCELED) {
				_aio_outstand_cnt--;
				_aio_req_free(reqp);
			} else {
				_aio_set_result(reqp, retval, error);
				_aio_req_done_cnt++;
			}
			/*
			 * Notify any thread that may have blocked
			 * because it saw an outstanding request.
			 */
			notify = 0;
			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
				notify = 1;
			}
			sig_mutex_unlock(&__aio_mutex);
			if (notify) {
				(void) _kaio(AIONOTIFY);
			}
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}

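/*
 * For SIGEV_PORT (and SIGEV_THREAD, which is layered on event ports here),
 * _port_dispatch() above posts an event of source PORT_SOURCE_AIO to the
 * port the application supplied in its port_notify_t.  An illustrative,
 * application-side sketch of consuming it (not part of this library):
 *
 *	port_event_t pe;
 *	(void) port_get(port, &pe, NULL);
 *	if (pe.portev_source == PORT_SOURCE_AIO) {
 *		aiocb_t *cb = (aiocb_t *)pe.portev_object;
 *		void *user = pe.portev_user;
 *		(aio_error(cb) / aio_return(cb) retrieve the result)
 *	}
 */
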
/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

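/*
 * Summary of the aio_lio_t reference counting used by _aiodone() and
 * _aio_fsync_del() above (derived from this file; the head itself is set
 * up by the lio_listio()/aio_fsync() code elsewhere in the library):
 * lio_nent/lio_refcnt start out equal to the number of requests in the
 * batch, and each completing or cancelled request drops one reference
 * under lio_mutex.  The thread that drops the last reference either
 * signals a waiter blocked in LIO_WAIT mode (via lio_cond_cv) or captures
 * the list-wide notification parameters, and it frees the head unless a
 * waiter is still using it.
 */
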
/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}

/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

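/*
 * The matching consumer-side discipline (an illustrative sketch, not code
 * taken from this file):  a poller keys off whichever field is written
 * last above and issues the matching barrier before reading the other
 * field, e.g. for a Posix request:
 *
 *	if (resultp->aio_errno != EINPROGRESS) {
 *		membar_consumer();
 *		retval = resultp->aio_return;	(safe: it was written first)
 *	}
 *
 * For a Solaris request the roles of aio_return and aio_errno are
 * swapped, with AIO_INPROGRESS as the sentinel value.
 */
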
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put the request onto the worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken the worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);		/* reenable SIGIO */
}

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-posix requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * caller owns the _aio_mutex
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

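/*
 * Note on the keying scheme (a summary of the code above and below):
 * the aio_result_t pointer supplied by the application (for POSIX
 * requests, the aio_resultp embedded in the aiocb) is the unique key;
 * AIOHASH() selects the bucket and each bucket is a singly linked list
 * protected by its own hash_lock.  Inserting a pointer that is already
 * present fails with -1, which the callers report to the application
 * as EINVAL (a duplicate outstanding request).
 */
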
/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * find an entry in the hash table
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If this is an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
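
/*
 * Illustrative sketch (not part of the implementation): a POSIX entry
 * point such as aio_read() is expected to funnel into _aio_rw() roughly
 * as follows, shown here with event-port completion notification (the
 * SIGEV_PORT path handled above).  The mode constant (AIOAREAD) and the
 * exact flag combination used by the real wrappers are assumptions for
 * illustration only:
 *
 *	port_notify_t pn;
 *	aiocb_t cb;
 *	int error;
 *
 *	(void) memset(&cb, 0, sizeof (cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = bufsz;
 *	cb.aio_offset = off;
 *	pn.portnfy_port = port;
 *	pn.portnfy_user = user_cookie;
 *	cb.aio_sigevent.sigev_notify = SIGEV_PORT;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &pn;
 *
 *	error = _aio_rw(&cb, NULL, &__nextworker_rw, AIOAREAD,
 *	    AIO_KAIO | AIO_NO_DUPS);
 *
 * Completion is then reported on the event port identified by
 * pn.portnfy_port, carrying pn.portnfy_user back to the application.
 */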
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If this is an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
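
/*
 * Note: the _aio_rw64() variant above exists only for 32-bit (ILP32)
 * builds, where aiocb64_t carries a 64-bit aio_offset for largefile
 * callers.  In an _LP64 process, off_t is already 64 bits wide and the
 * transitional aiocb64 interfaces collapse onto the regular ones, which
 * is why the function is compiled only under #if !defined(_LP64).
 */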