/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "lint.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int __fcntl(int, int, ...);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of requests in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is in progress */
int _aio_flags = 0;			/* see asyncio.h for flag definitions */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}

/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants such as the maximum number of workers the subsystem can
 * create, and the minimum number of workers permitted before imposing
 * some restrictions, are initialized here.  Also, some workers are
 * created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers, then check
	 * that at least one worker was actually created; the underlying
	 * lwp_create() calls can fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * The special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
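
/*
 * Illustrative usage sketch (not compiled into the library) of the
 * aioread()/aiowait() interface implemented above.  It assumes an open,
 * readable file descriptor "fd" and a hypothetical process() routine;
 * error handling is abbreviated.
 *
 *	#include <sys/asynch.h>
 *
 *	char buf[8192];
 *	aio_result_t res;
 *	aio_result_t *donep;
 *
 *	if (aioread(fd, buf, sizeof (buf), (off_t)0, SEEK_SET, &res) == -1)
 *		perror("aioread");
 *	donep = aiowait(NULL);		wait for any request to complete
 *	if (donep == &res && res.aio_return != -1)
 *		process(buf, res.aio_return);
 */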

int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64 bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/* ARGSUSED */
static void
_aiowait_cleanup(void *arg)
{
	sig_mutex_lock(&__aio_mutex);
	_aiowait_flag--;
	sig_mutex_unlock(&__aio_mutex);
}

/*
 * This must be async safe and cancel safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
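
/*
 * Hedged usage sketch for the timeout handling in aiowait() above (not
 * part of the library): a NULL timeval blocks, a zeroed timeval polls
 * once, and a non-zero timeval bounds the wait.
 *
 *	struct timeval tv;
 *	aio_result_t *donep;
 *
 *	tv.tv_sec = 2;
 *	tv.tv_usec = 0;
 *	donep = aiowait(&tv);
 *	if (donep == (aio_result_t *)-1)
 *		no requests were outstanding (errno == EINVAL), or the
 *		call was interrupted (errno == EINTR)
 *	else if (donep == NULL)
 *		the two-second timeout expired with nothing completed
 *	else
 *		donep points at the aio_result_t of a completed request
 */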

/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */
int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}

/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * AIOs and return.  The AIOs in question will have either noticed the
 * cancellation notice before, during, or after initiating I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}
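
/*
 * Hedged sketch of how a caller (such as _aio_close() above) might
 * interpret aiocancel_all(); "fd" is assumed to be an open descriptor.
 *
 *	switch (aiocancel_all(fd)) {
 *	case AIO_CANCELED:
 *		all outstanding requests for fd were canceled
 *		break;
 *	case AIO_ALLDONE:
 *		nothing was pending; every request had already completed
 *		break;
 *	case AIO_NOTCANCELED:
 *	default:
 *		at least one in-progress request could not be canceled
 *		and will still be reported, e.g. by aiowait()
 *		break;
 *	}
 */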

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}
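
/*
 * Minimal, self-contained sketch (hypothetical ring_t type, not part of
 * this library) of the circular doubly-linked insertion performed above
 * with work_forw/work_backw: a new node is linked in just before *headp,
 * i.e. at the "tail" of the ring, and the head pointer never changes
 * once the ring is non-empty.
 *
 *	typedef struct ring {
 *		struct ring *forw;
 *		struct ring *backw;
 *	} ring_t;
 *
 *	void
 *	ring_insert(ring_t **headp, ring_t *newp)
 *	{
 *		if (*headp == NULL) {
 *			newp->forw = newp->backw = newp;
 *			*headp = newp;
 *		} else {
 *			newp->backw = (*headp)->backw;
 *			newp->forw = *headp;
 *			(*headp)->backw->forw = newp;
 *			(*headp)->backw = newp;
 *		}
 *	}
 */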

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * Regarding b):
 * This thread uses sigsetjmp() to define the position in the code where
 * it wishes to continue working in the case that a SIGAIOCANCEL signal
 * is detected.
 * Normally this thread should get the cancellation signal during the
 * kernel phase (reading or writing).  In that case the signal handler
 * aiosigcancelhndlr() is activated using the worker thread context,
 * which again will use the siglongjmp() function to break the standard
 * code flow and jump to the "sigsetjmp" position, provided that
 * "work_cancel_flg" is set to "1".
 * Because the "work_cancel_flg" is only manipulated by this worker
 * thread and it can only run on one CPU at a given time, it is not
 * necessary to protect that flag with the queue lock.
 * Returning from the kernel (read or write system call) we must
 * first disable the use of the SIGAIOCANCEL signal and accordingly
 * the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires "work_qlock1" and returns from
 *	  the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if the "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 * To avoid that situation we disable the cancellation of the request
 * in progress BEFORE we try to acquire the work_qlock1.
 * In that case the signal handler will not call siglongjmp() and the
 * worker thread will continue running the standard code flow.
 * Then this thread must check the AIO_REQ_CANCELED flag to emulate
 * a siglongjmp() that would otherwise have been required, freeing the
 * work_qlock1 and avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int append;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			/*
			 * The SUSv3 POSIX spec for aio_write() states:
			 *	If O_APPEND is set for the file descriptor,
			 *	write operations append to the file in the
			 *	same order as the calls were made.
			 * but, somewhat inconsistently, it requires pwrite()
			 * to ignore the O_APPEND setting.  So we have to use
			 * fcntl() to get the open modes and call write() for
			 * the O_APPEND case.
			 */
			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = append?
			    write(arg->fd, arg->buf, arg->bufsz) :
			    pwrite(arg->fd, arg->buf, arg->bufsz,
			    arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			/*
			 * The SUSv3 POSIX spec for aio_write() states:
			 *	If O_APPEND is set for the file descriptor,
			 *	write operations append to the file in the
			 *	same order as the calls were made.
			 * but, somewhat inconsistently, it requires pwrite()
			 * to ignore the O_APPEND setting.  So we have to use
			 * fcntl() to get the open modes and call write() for
			 * the O_APPEND case.
			 */
			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = append?
			    write(arg->fd, arg->buf, arg->bufsz) :
			    pwrite64(arg->fd, arg->buf, arg->bufsz,
			    arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			int notify;
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			/*
			 * If it was canceled, this request will not be
			 * added to the done list.  Just free it.
			 */
			if (error == ECANCELED) {
				_aio_outstand_cnt--;
				_aio_req_free(reqp);
			} else {
				_aio_set_result(reqp, retval, error);
				_aio_req_done_cnt++;
			}
			/*
			 * Notify any thread that may have blocked
			 * because it saw an outstanding request.
			 */
			notify = 0;
			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
				notify = 1;
			}
			sig_mutex_unlock(&__aio_mutex);
			if (notify) {
				(void) _kaio(AIONOTIFY);
			}
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}

/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) wakes up the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from the list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}

/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}
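
/*
 * Hedged sketch (hypothetical application code, not part of this library)
 * of the polling pattern that the store ordering in _aio_set_result()
 * protects.  A Solaris-style poller spins on aio_return, so aio_errno
 * must already be valid by the time aio_return leaves AIO_INPROGRESS;
 * a POSIX-style poller spins on aio_errno instead, which is why the
 * store order is reversed there.
 *
 *	volatile aio_result_t *rp = &res;
 *
 *	while (rp->aio_return == AIO_INPROGRESS)
 *		yield();
 *	if (rp->aio_return == -1)
 *		report_error(rp->aio_errno);
 */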

/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers; keep going through the
			 * list slowly until we get a lock.
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);		/* reenable SIGIO */
}

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-posix requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * caller owns the _aio_mutex
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}
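
/*
 * Hedged sketch of the hash-table lifecycle used by the library
 * (illustrative only; resultp and reqp come from the callers above).
 * A request is inserted keyed on its aio_result_t pointer when it is
 * queued, looked up by the cancellation paths, and deleted once the
 * result has been reaped.
 *
 *	if (_aio_hash_insert(resultp, reqp) != 0)
 *		a request with this resultp is already outstanding
 *	...
 *	reqp = _aio_hash_find(resultp);		lookup, entry stays put
 *	...
 *	reqp = _aio_hash_del(resultp);		lookup and remove
 */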

/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
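
/*
 * Illustrative sketch (editor's addition, excluded from the build):
 * _aio_rw() above is the common backend for the POSIX aio_read()/
 * aio_write() entry points in this library.  The kaio-versus-worker
 * decision is invisible to the application, which simply submits an
 * aiocb and later collects the status, for example by polling
 * aio_error()/aio_return() as below.  The file name, buffer size and
 * sleep interval are arbitrary choices for the example.
 */
#if 0
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	static char buf[8192];
	struct aiocb cb;
	int fd, err;
	ssize_t nread;

	if ((fd = open("/etc/passwd", O_RDONLY)) == -1) {
		perror("open");
		return (1);
	}

	(void) memset(&cb, 0, sizeof (cb));
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof (buf);
	cb.aio_offset = 0;
	cb.aio_sigevent.sigev_notify = SIGEV_NONE;	/* poll for completion */

	if (aio_read(&cb) == -1) {		/* serviced via _aio_rw() */
		perror("aio_read");
		return (1);
	}

	/* Poll until the request leaves the EINPROGRESS state. */
	while ((err = aio_error(&cb)) == EINPROGRESS)
		(void) usleep(1000);

	if (err != 0) {
		(void) fprintf(stderr, "aio_read: %s\n", strerror(err));
		return (1);
	}
	nread = aio_return(&cb);		/* reap the final result */
	(void) printf("read %ld bytes asynchronously\n", (long)nread);
	(void) close(fd);
	return (0);
}
#endif	/* illustrative sketch */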

#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
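
/*
 * Illustrative sketch (editor's addition, excluded from the build):
 * both _aio_rw() and _aio_rw64() copy the port number and user cookie
 * out of a port_notify_t when sigev_notify is SIGEV_PORT.  The fragment
 * below shows how a caller typically arranges event-port completion
 * notification on Solaris; the file name and the choice of &cb as the
 * user cookie are arbitrary, and error handling is kept minimal.
 */
#if 0
#include <aio.h>
#include <fcntl.h>
#include <port.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	static char buf[4096];
	struct aiocb cb;
	port_notify_t pn;
	port_event_t pe;
	int port, fd;

	if ((port = port_create()) == -1 ||
	    (fd = open("/etc/passwd", O_RDONLY)) == -1) {
		perror("setup");
		return (1);
	}

	(void) memset(&cb, 0, sizeof (cb));
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof (buf);
	cb.aio_offset = 0;

	/*
	 * Request completion notification on the event port; _aio_rw()
	 * stashes portnfy_port in sigev_signo and portnfy_user in
	 * sigev_value for delivery when the request completes.
	 */
	pn.portnfy_port = port;
	pn.portnfy_user = &cb;			/* arbitrary user cookie */
	cb.aio_sigevent.sigev_notify = SIGEV_PORT;
	cb.aio_sigevent.sigev_value.sival_ptr = &pn;

	if (aio_read(&cb) == -1) {
		perror("aio_read");
		return (1);
	}

	/* Block until the completion event arrives, then reap the result. */
	if (port_get(port, &pe, NULL) == 0 && pe.portev_user == &cb &&
	    aio_error(&cb) == 0)
		(void) printf("read %ld bytes\n", (long)aio_return(&cb));

	(void) close(fd);
	(void) close(port);
	return (0);
}
#endif	/* illustrative sketch */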