1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "lint.h" 28 #include "thr_uberdata.h" 29 #include "libc.h" 30 #include "asyncio.h" 31 #include <atomic.h> 32 #include <sys/param.h> 33 #include <sys/file.h> 34 #include <sys/port.h> 35 36 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 37 static aio_req_t *_aio_req_get(aio_worker_t *); 38 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 39 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 40 static void _aio_work_done(aio_worker_t *); 41 static void _aio_enq_doneq(aio_req_t *); 42 43 extern void _aio_lio_free(aio_lio_t *); 44 45 extern int __fcntl(int, int, ...); 46 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 47 48 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 49 static void _aiodone(aio_req_t *, ssize_t, int); 50 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 51 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 52 53 /* 54 * switch for kernel async I/O 55 */ 56 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 
= error */ 57 58 /* 59 * Key for thread-specific data 60 */ 61 pthread_key_t _aio_key; 62 63 /* 64 * Array for determining whether or not a file supports kaio. 65 * Initialized in _kaio_init(). 66 */ 67 uint32_t *_kaio_supported = NULL; 68 69 /* 70 * workers for read/write requests 71 * (__aio_mutex lock protects circular linked list of workers) 72 */ 73 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 74 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 75 int __rw_workerscnt; /* number of read/write workers */ 76 77 /* 78 * worker for notification requests. 79 */ 80 aio_worker_t *__workers_no; /* circular list of AIO workers */ 81 aio_worker_t *__nextworker_no; /* next worker in list of workers */ 82 int __no_workerscnt; /* number of write workers */ 83 84 aio_req_t *_aio_done_tail; /* list of done requests */ 85 aio_req_t *_aio_done_head; 86 87 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 88 cond_t __aio_initcv = DEFAULTCV; 89 int __aio_initbusy = 0; 90 91 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 92 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 93 94 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 95 int _sigio_enabled = 0; /* when set, send SIGIO signal */ 96 97 aio_hash_t *_aio_hash; 98 99 aio_req_t *_aio_doneq; /* double linked done queue list */ 100 101 int _aio_donecnt = 0; 102 int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 103 int _aio_doneq_cnt = 0; 104 int _aio_outstand_cnt = 0; /* # of outstanding requests */ 105 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 106 int _aio_req_done_cnt = 0; /* req. 
done but not in "done queue" */ 107 int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 108 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 109 110 int _max_workers = 256; /* max number of workers permitted */ 111 int _min_workers = 4; /* min number of workers */ 112 int _minworkload = 2; /* min number of request in q */ 113 int _aio_worker_cnt = 0; /* number of workers to do requests */ 114 int __uaio_ok = 0; /* AIO has been enabled */ 115 sigset_t _worker_set; /* worker's signal mask */ 116 117 int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 118 int _aio_flags = 0; /* see asyncio.h defines for */ 119 120 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 121 122 int hz; /* clock ticks per second */ 123 124 static int 125 _kaio_supported_init(void) 126 { 127 void *ptr; 128 size_t size; 129 130 if (_kaio_supported != NULL) /* already initialized */ 131 return (0); 132 133 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 134 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 135 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 136 if (ptr == MAP_FAILED) 137 return (-1); 138 _kaio_supported = ptr; 139 return (0); 140 } 141 142 /* 143 * The aio subsystem is initialized when an AIO request is made. 144 * Constants are initialized like the max number of workers that 145 * the subsystem can create, and the minimum number of workers 146 * permitted before imposing some restrictions. Also, some 147 * workers are created. 
148 */ 149 int 150 __uaio_init(void) 151 { 152 int ret = -1; 153 int i; 154 int cancel_state; 155 156 lmutex_lock(&__aio_initlock); 157 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 158 while (__aio_initbusy) 159 (void) cond_wait(&__aio_initcv, &__aio_initlock); 160 (void) pthread_setcancelstate(cancel_state, NULL); 161 if (__uaio_ok) { /* already initialized */ 162 lmutex_unlock(&__aio_initlock); 163 return (0); 164 } 165 __aio_initbusy = 1; 166 lmutex_unlock(&__aio_initlock); 167 168 hz = (int)sysconf(_SC_CLK_TCK); 169 __pid = getpid(); 170 171 setup_cancelsig(SIGAIOCANCEL); 172 173 if (_kaio_supported_init() != 0) 174 goto out; 175 176 /* 177 * Allocate and initialize the hash table. 178 * Do this only once, even if __uaio_init() is called twice. 179 */ 180 if (_aio_hash == NULL) { 181 /* LINTED pointer cast */ 182 _aio_hash = (aio_hash_t *)mmap(NULL, 183 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 184 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 185 if ((void *)_aio_hash == MAP_FAILED) { 186 _aio_hash = NULL; 187 goto out; 188 } 189 for (i = 0; i < HASHSZ; i++) 190 (void) mutex_init(&_aio_hash[i].hash_lock, 191 USYNC_THREAD, NULL); 192 } 193 194 /* 195 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 196 */ 197 (void) sigfillset(&_worker_set); 198 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 199 200 /* 201 * Create one worker to send asynchronous notifications. 202 * Do this only once, even if __uaio_init() is called twice. 203 */ 204 if (__no_workerscnt == 0 && 205 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 206 errno = EAGAIN; 207 goto out; 208 } 209 210 /* 211 * Create the minimum number of read/write workers. 212 * And later check whether atleast one worker is created; 213 * lwp_create() calls could fail because of segkp exhaustion. 
214 */ 215 for (i = 0; i < _min_workers; i++) 216 (void) _aio_create_worker(NULL, AIOREAD); 217 if (__rw_workerscnt == 0) { 218 errno = EAGAIN; 219 goto out; 220 } 221 222 ret = 0; 223 out: 224 lmutex_lock(&__aio_initlock); 225 if (ret == 0) 226 __uaio_ok = 1; 227 __aio_initbusy = 0; 228 (void) cond_broadcast(&__aio_initcv); 229 lmutex_unlock(&__aio_initlock); 230 return (ret); 231 } 232 233 /* 234 * Called from close() before actually performing the real _close(). 235 */ 236 void 237 _aio_close(int fd) 238 { 239 if (fd < 0) /* avoid cancelling everything */ 240 return; 241 /* 242 * Cancel all outstanding aio requests for this file descriptor. 243 */ 244 if (__uaio_ok) 245 (void) aiocancel_all(fd); 246 /* 247 * If we have allocated the bit array, clear the bit for this file. 248 * The next open may re-use this file descriptor and the new file 249 * may have different kaio() behaviour. 250 */ 251 if (_kaio_supported != NULL) 252 CLEAR_KAIO_SUPPORTED(fd); 253 } 254 255 /* 256 * special kaio cleanup thread sits in a loop in the 257 * kernel waiting for pending kaio requests to complete. 258 */ 259 void * 260 _kaio_cleanup_thread(void *arg) 261 { 262 if (pthread_setspecific(_aio_key, arg) != 0) 263 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 264 (void) _kaio(AIOSTART); 265 return (arg); 266 } 267 268 /* 269 * initialize kaio. 
270 */ 271 void 272 _kaio_init() 273 { 274 int error; 275 sigset_t oset; 276 int cancel_state; 277 278 lmutex_lock(&__aio_initlock); 279 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 280 while (__aio_initbusy) 281 (void) cond_wait(&__aio_initcv, &__aio_initlock); 282 (void) pthread_setcancelstate(cancel_state, NULL); 283 if (_kaio_ok) { /* already initialized */ 284 lmutex_unlock(&__aio_initlock); 285 return; 286 } 287 __aio_initbusy = 1; 288 lmutex_unlock(&__aio_initlock); 289 290 if (_kaio_supported_init() != 0) 291 error = ENOMEM; 292 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 293 error = ENOMEM; 294 else if ((error = (int)_kaio(AIOINIT)) == 0) { 295 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 296 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 297 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 298 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 299 } 300 if (error && _kaiowp != NULL) { 301 _aio_worker_free(_kaiowp); 302 _kaiowp = NULL; 303 } 304 305 lmutex_lock(&__aio_initlock); 306 if (error) 307 _kaio_ok = -1; 308 else 309 _kaio_ok = 1; 310 __aio_initbusy = 0; 311 (void) cond_broadcast(&__aio_initcv); 312 lmutex_unlock(&__aio_initlock); 313 } 314 315 int 316 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 317 aio_result_t *resultp) 318 { 319 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 320 } 321 322 int 323 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 324 aio_result_t *resultp) 325 { 326 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 327 } 328 329 #if !defined(_LP64) 330 int 331 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 332 aio_result_t *resultp) 333 { 334 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 335 } 336 337 int 338 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 339 aio_result_t *resultp) 340 { 341 return (_aiorw(fd, buf, bufsz, offset, whence, 
resultp, AIOAWRITE64)); 342 } 343 #endif /* !defined(_LP64) */ 344 345 int 346 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 347 aio_result_t *resultp, int mode) 348 { 349 aio_req_t *reqp; 350 aio_args_t *ap; 351 offset_t loffset; 352 struct stat64 stat64; 353 int error = 0; 354 int kerr; 355 int umode; 356 357 switch (whence) { 358 359 case SEEK_SET: 360 loffset = offset; 361 break; 362 case SEEK_CUR: 363 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 364 error = -1; 365 else 366 loffset += offset; 367 break; 368 case SEEK_END: 369 if (fstat64(fd, &stat64) == -1) 370 error = -1; 371 else 372 loffset = offset + stat64.st_size; 373 break; 374 default: 375 errno = EINVAL; 376 error = -1; 377 } 378 379 if (error) 380 return (error); 381 382 /* initialize kaio */ 383 if (!_kaio_ok) 384 _kaio_init(); 385 386 /* 387 * _aio_do_request() needs the original request code (mode) to be able 388 * to choose the appropiate 32/64 bit function. All other functions 389 * only require the difference between READ and WRITE (umode). 390 */ 391 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 392 umode = mode - AIOAREAD64; 393 else 394 umode = mode; 395 396 /* 397 * Try kernel aio first. 398 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 399 */ 400 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 401 resultp->aio_errno = 0; 402 sig_mutex_lock(&__aio_mutex); 403 _kaio_outstand_cnt++; 404 sig_mutex_unlock(&__aio_mutex); 405 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 
406 (umode | AIO_POLL_BIT) : umode), 407 fd, buf, bufsz, loffset, resultp); 408 if (kerr == 0) { 409 return (0); 410 } 411 sig_mutex_lock(&__aio_mutex); 412 _kaio_outstand_cnt--; 413 sig_mutex_unlock(&__aio_mutex); 414 if (errno != ENOTSUP && errno != EBADFD) 415 return (-1); 416 if (errno == EBADFD) 417 SET_KAIO_NOT_SUPPORTED(fd); 418 } 419 420 if (!__uaio_ok && __uaio_init() == -1) 421 return (-1); 422 423 if ((reqp = _aio_req_alloc()) == NULL) { 424 errno = EAGAIN; 425 return (-1); 426 } 427 428 /* 429 * _aio_do_request() checks reqp->req_op to differentiate 430 * between 32 and 64 bit access. 431 */ 432 reqp->req_op = mode; 433 reqp->req_resultp = resultp; 434 ap = &reqp->req_args; 435 ap->fd = fd; 436 ap->buf = buf; 437 ap->bufsz = bufsz; 438 ap->offset = loffset; 439 440 if (_aio_hash_insert(resultp, reqp) != 0) { 441 _aio_req_free(reqp); 442 errno = EINVAL; 443 return (-1); 444 } 445 /* 446 * _aio_req_add() only needs the difference between READ and 447 * WRITE to choose the right worker queue. 
448 */ 449 _aio_req_add(reqp, &__nextworker_rw, umode); 450 return (0); 451 } 452 453 int 454 aiocancel(aio_result_t *resultp) 455 { 456 aio_req_t *reqp; 457 aio_worker_t *aiowp; 458 int ret; 459 int done = 0; 460 int canceled = 0; 461 462 if (!__uaio_ok) { 463 errno = EINVAL; 464 return (-1); 465 } 466 467 sig_mutex_lock(&__aio_mutex); 468 reqp = _aio_hash_find(resultp); 469 if (reqp == NULL) { 470 if (_aio_outstand_cnt == _aio_req_done_cnt) 471 errno = EINVAL; 472 else 473 errno = EACCES; 474 ret = -1; 475 } else { 476 aiowp = reqp->req_worker; 477 sig_mutex_lock(&aiowp->work_qlock1); 478 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 479 sig_mutex_unlock(&aiowp->work_qlock1); 480 481 if (canceled) { 482 ret = 0; 483 } else { 484 if (_aio_outstand_cnt == 0 || 485 _aio_outstand_cnt == _aio_req_done_cnt) 486 errno = EINVAL; 487 else 488 errno = EACCES; 489 ret = -1; 490 } 491 } 492 sig_mutex_unlock(&__aio_mutex); 493 return (ret); 494 } 495 496 static void 497 _aiowait_cleanup(void *arg __unused) 498 { 499 sig_mutex_lock(&__aio_mutex); 500 _aiowait_flag--; 501 sig_mutex_unlock(&__aio_mutex); 502 } 503 504 /* 505 * This must be asynch safe and cancel safe 506 */ 507 aio_result_t * 508 aiowait(struct timeval *uwait) 509 { 510 aio_result_t *uresultp; 511 aio_result_t *kresultp; 512 aio_result_t *resultp; 513 int dontblock; 514 int timedwait = 0; 515 int kaio_errno = 0; 516 struct timeval twait; 517 struct timeval *wait = NULL; 518 hrtime_t hrtend; 519 hrtime_t hres; 520 521 if (uwait) { 522 /* 523 * Check for a valid specified wait time. 524 * If it is invalid, fail the call right away. 
525 */ 526 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 527 uwait->tv_usec >= MICROSEC) { 528 errno = EINVAL; 529 return ((aio_result_t *)-1); 530 } 531 532 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 533 hrtend = gethrtime() + 534 (hrtime_t)uwait->tv_sec * NANOSEC + 535 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 536 twait = *uwait; 537 wait = &twait; 538 timedwait++; 539 } else { 540 /* polling */ 541 sig_mutex_lock(&__aio_mutex); 542 if (_kaio_outstand_cnt == 0) { 543 kresultp = (aio_result_t *)-1; 544 } else { 545 kresultp = (aio_result_t *)_kaio(AIOWAIT, 546 (struct timeval *)-1, 1); 547 if (kresultp != (aio_result_t *)-1 && 548 kresultp != NULL && 549 kresultp != (aio_result_t *)1) { 550 _kaio_outstand_cnt--; 551 sig_mutex_unlock(&__aio_mutex); 552 return (kresultp); 553 } 554 } 555 uresultp = _aio_req_done(); 556 sig_mutex_unlock(&__aio_mutex); 557 if (uresultp != NULL && 558 uresultp != (aio_result_t *)-1) { 559 return (uresultp); 560 } 561 if (uresultp == (aio_result_t *)-1 && 562 kresultp == (aio_result_t *)-1) { 563 errno = EINVAL; 564 return ((aio_result_t *)-1); 565 } else { 566 return (NULL); 567 } 568 } 569 } 570 571 for (;;) { 572 sig_mutex_lock(&__aio_mutex); 573 uresultp = _aio_req_done(); 574 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 575 sig_mutex_unlock(&__aio_mutex); 576 resultp = uresultp; 577 break; 578 } 579 _aiowait_flag++; 580 dontblock = (uresultp == (aio_result_t *)-1); 581 if (dontblock && _kaio_outstand_cnt == 0) { 582 kresultp = (aio_result_t *)-1; 583 kaio_errno = EINVAL; 584 } else { 585 sig_mutex_unlock(&__aio_mutex); 586 pthread_cleanup_push(_aiowait_cleanup, NULL); 587 _cancel_prologue(); 588 kresultp = (aio_result_t *)_kaio(AIOWAIT, 589 wait, dontblock); 590 _cancel_epilogue(); 591 pthread_cleanup_pop(0); 592 sig_mutex_lock(&__aio_mutex); 593 kaio_errno = errno; 594 } 595 _aiowait_flag--; 596 sig_mutex_unlock(&__aio_mutex); 597 if (kresultp == (aio_result_t *)1) { 598 /* aiowait() awakened by an 
aionotify() */ 599 continue; 600 } else if (kresultp != NULL && 601 kresultp != (aio_result_t *)-1) { 602 resultp = kresultp; 603 sig_mutex_lock(&__aio_mutex); 604 _kaio_outstand_cnt--; 605 sig_mutex_unlock(&__aio_mutex); 606 break; 607 } else if (kresultp == (aio_result_t *)-1 && 608 kaio_errno == EINVAL && 609 uresultp == (aio_result_t *)-1) { 610 errno = kaio_errno; 611 resultp = (aio_result_t *)-1; 612 break; 613 } else if (kresultp == (aio_result_t *)-1 && 614 kaio_errno == EINTR) { 615 errno = kaio_errno; 616 resultp = (aio_result_t *)-1; 617 break; 618 } else if (timedwait) { 619 hres = hrtend - gethrtime(); 620 if (hres <= 0) { 621 /* time is up; return */ 622 resultp = NULL; 623 break; 624 } else { 625 /* 626 * Some time left. Round up the remaining time 627 * in nanoseconds to microsec. Retry the call. 628 */ 629 hres += (NANOSEC / MICROSEC) - 1; 630 wait->tv_sec = hres / NANOSEC; 631 wait->tv_usec = 632 (hres % NANOSEC) / (NANOSEC / MICROSEC); 633 } 634 } else { 635 ASSERT(kresultp == NULL && uresultp == NULL); 636 resultp = NULL; 637 continue; 638 } 639 } 640 return (resultp); 641 } 642 643 /* 644 * _aio_get_timedelta calculates the remaining time and stores the result 645 * into timespec_t *wait. 
646 */ 647 648 int 649 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 650 { 651 int ret = 0; 652 struct timeval cur; 653 timespec_t curtime; 654 655 (void) gettimeofday(&cur, NULL); 656 curtime.tv_sec = cur.tv_sec; 657 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 658 659 if (end->tv_sec >= curtime.tv_sec) { 660 wait->tv_sec = end->tv_sec - curtime.tv_sec; 661 if (end->tv_nsec >= curtime.tv_nsec) { 662 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 663 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 664 ret = -1; /* timer expired */ 665 } else { 666 if (end->tv_sec > curtime.tv_sec) { 667 wait->tv_sec -= 1; 668 wait->tv_nsec = NANOSEC - 669 (curtime.tv_nsec - end->tv_nsec); 670 } else { 671 ret = -1; /* timer expired */ 672 } 673 } 674 } else { 675 ret = -1; 676 } 677 return (ret); 678 } 679 680 /* 681 * If closing by file descriptor: we will simply cancel all the outstanding 682 * aio`s and return. Those aio's in question will have either noticed the 683 * cancellation notice before, during, or after initiating io. 684 */ 685 int 686 aiocancel_all(int fd) 687 { 688 aio_req_t *reqp; 689 aio_req_t **reqpp, *last; 690 aio_worker_t *first; 691 aio_worker_t *next; 692 int canceled = 0; 693 int done = 0; 694 int cancelall = 0; 695 696 sig_mutex_lock(&__aio_mutex); 697 698 if (_aio_outstand_cnt == 0) { 699 sig_mutex_unlock(&__aio_mutex); 700 return (AIO_ALLDONE); 701 } 702 703 /* 704 * Cancel requests from the read/write workers' queues. 705 */ 706 first = __nextworker_rw; 707 next = first; 708 do { 709 _aio_cancel_work(next, fd, &canceled, &done); 710 } while ((next = next->work_forw) != first); 711 712 /* 713 * finally, check if there are requests on the done queue that 714 * should be canceled. 
715 */ 716 if (fd < 0) 717 cancelall = 1; 718 reqpp = &_aio_done_tail; 719 last = _aio_done_tail; 720 while ((reqp = *reqpp) != NULL) { 721 if (cancelall || reqp->req_args.fd == fd) { 722 *reqpp = reqp->req_next; 723 if (last == reqp) { 724 last = reqp->req_next; 725 } 726 if (_aio_done_head == reqp) { 727 /* this should be the last req in list */ 728 _aio_done_head = last; 729 } 730 _aio_donecnt--; 731 _aio_set_result(reqp, -1, ECANCELED); 732 (void) _aio_hash_del(reqp->req_resultp); 733 _aio_req_free(reqp); 734 } else { 735 reqpp = &reqp->req_next; 736 last = reqp; 737 } 738 } 739 740 if (cancelall) { 741 ASSERT(_aio_donecnt == 0); 742 _aio_done_head = NULL; 743 } 744 sig_mutex_unlock(&__aio_mutex); 745 746 if (canceled && done == 0) 747 return (AIO_CANCELED); 748 else if (done && canceled == 0) 749 return (AIO_ALLDONE); 750 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 751 return ((int)_kaio(AIOCANCEL, fd, NULL)); 752 return (AIO_NOTCANCELED); 753 } 754 755 /* 756 * Cancel requests from a given work queue. If the file descriptor 757 * parameter, fd, is non-negative, then only cancel those requests 758 * in this queue that are to this file descriptor. If the fd 759 * parameter is -1, then cancel all requests. 760 */ 761 static void 762 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 763 { 764 aio_req_t *reqp; 765 766 sig_mutex_lock(&aiowp->work_qlock1); 767 /* 768 * cancel queued requests first. 769 */ 770 reqp = aiowp->work_tail1; 771 while (reqp != NULL) { 772 if (fd < 0 || reqp->req_args.fd == fd) { 773 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 774 /* 775 * Callers locks were dropped. 776 * reqp is invalid; start traversing 777 * the list from the beginning again. 778 */ 779 reqp = aiowp->work_tail1; 780 continue; 781 } 782 } 783 reqp = reqp->req_next; 784 } 785 /* 786 * Since the queued requests have been canceled, there can 787 * only be one inprogress request that should be canceled. 
788 */ 789 if ((reqp = aiowp->work_req) != NULL && 790 (fd < 0 || reqp->req_args.fd == fd)) 791 (void) _aio_cancel_req(aiowp, reqp, canceled, done); 792 sig_mutex_unlock(&aiowp->work_qlock1); 793 } 794 795 /* 796 * Cancel a request. Return 1 if the callers locks were temporarily 797 * dropped, otherwise return 0. 798 */ 799 int 800 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 801 { 802 int ostate = reqp->req_state; 803 804 ASSERT(MUTEX_HELD(&__aio_mutex)); 805 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 806 if (ostate == AIO_REQ_CANCELED) 807 return (0); 808 if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) && 809 aiowp->work_prev1 == reqp) { 810 ASSERT(aiowp->work_done1 != 0); 811 /* 812 * If not on the done queue yet, just mark it CANCELED, 813 * _aio_work_done() will do the necessary clean up. 814 * This is required to ensure that aiocancel_all() cancels 815 * all the outstanding requests, including this one which 816 * is not yet on done queue but has been marked done. 817 */ 818 _aio_set_result(reqp, -1, ECANCELED); 819 (void) _aio_hash_del(reqp->req_resultp); 820 reqp->req_state = AIO_REQ_CANCELED; 821 (*canceled)++; 822 return (0); 823 } 824 825 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 826 (*done)++; 827 return (0); 828 } 829 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 830 ASSERT(POSIX_AIO(reqp)); 831 /* Cancel the queued aio_fsync() request */ 832 if (!reqp->req_head->lio_canned) { 833 reqp->req_head->lio_canned = 1; 834 _aio_outstand_cnt--; 835 (*canceled)++; 836 } 837 return (0); 838 } 839 reqp->req_state = AIO_REQ_CANCELED; 840 _aio_req_del(aiowp, reqp, ostate); 841 (void) _aio_hash_del(reqp->req_resultp); 842 (*canceled)++; 843 if (reqp == aiowp->work_req) { 844 ASSERT(ostate == AIO_REQ_INPROGRESS); 845 /* 846 * Set the result values now, before _aiodone() is called. 
847 * We do this because the application can expect aio_return 848 * and aio_errno to be set to -1 and ECANCELED, respectively, 849 * immediately after a successful return from aiocancel() 850 * or aio_cancel(). 851 */ 852 _aio_set_result(reqp, -1, ECANCELED); 853 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 854 return (0); 855 } 856 if (!POSIX_AIO(reqp)) { 857 _aio_outstand_cnt--; 858 _aio_set_result(reqp, -1, ECANCELED); 859 _aio_req_free(reqp); 860 return (0); 861 } 862 sig_mutex_unlock(&aiowp->work_qlock1); 863 sig_mutex_unlock(&__aio_mutex); 864 _aiodone(reqp, -1, ECANCELED); 865 sig_mutex_lock(&__aio_mutex); 866 sig_mutex_lock(&aiowp->work_qlock1); 867 return (1); 868 } 869 870 int 871 _aio_create_worker(aio_req_t *reqp, int mode) 872 { 873 aio_worker_t *aiowp, **workers, **nextworker; 874 int *aio_workerscnt; 875 void *(*func)(void *); 876 sigset_t oset; 877 int error; 878 879 /* 880 * Put the new worker thread in the right queue. 881 */ 882 switch (mode) { 883 case AIOREAD: 884 case AIOWRITE: 885 case AIOAREAD: 886 case AIOAWRITE: 887 #if !defined(_LP64) 888 case AIOAREAD64: 889 case AIOAWRITE64: 890 #endif 891 workers = &__workers_rw; 892 nextworker = &__nextworker_rw; 893 aio_workerscnt = &__rw_workerscnt; 894 func = _aio_do_request; 895 break; 896 case AIONOTIFY: 897 workers = &__workers_no; 898 nextworker = &__nextworker_no; 899 func = _aio_do_notify; 900 aio_workerscnt = &__no_workerscnt; 901 break; 902 default: 903 aio_panic("_aio_create_worker: invalid mode"); 904 break; 905 } 906 907 if ((aiowp = _aio_worker_alloc()) == NULL) 908 return (-1); 909 910 if (reqp) { 911 reqp->req_state = AIO_REQ_QUEUED; 912 reqp->req_worker = aiowp; 913 aiowp->work_head1 = reqp; 914 aiowp->work_tail1 = reqp; 915 aiowp->work_next1 = reqp; 916 aiowp->work_count1 = 1; 917 aiowp->work_minload1 = 1; 918 } 919 920 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 921 error = thr_create(NULL, AIOSTKSIZE, func, aiowp, 922 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); 
923 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 924 if (error) { 925 if (reqp) { 926 reqp->req_state = 0; 927 reqp->req_worker = NULL; 928 } 929 _aio_worker_free(aiowp); 930 return (-1); 931 } 932 933 lmutex_lock(&__aio_mutex); 934 (*aio_workerscnt)++; 935 if (*workers == NULL) { 936 aiowp->work_forw = aiowp; 937 aiowp->work_backw = aiowp; 938 *nextworker = aiowp; 939 *workers = aiowp; 940 } else { 941 aiowp->work_backw = (*workers)->work_backw; 942 aiowp->work_forw = (*workers); 943 (*workers)->work_backw->work_forw = aiowp; 944 (*workers)->work_backw = aiowp; 945 } 946 _aio_worker_cnt++; 947 lmutex_unlock(&__aio_mutex); 948 949 (void) thr_continue(aiowp->work_tid); 950 951 return (0); 952 } 953 954 /* 955 * This is the worker's main routine. 956 * The task of this function is to execute all queued requests; 957 * once the last pending request is executed this function will block 958 * in _aio_idle(). A new incoming request must wakeup this thread to 959 * restart the work. 960 * Every worker has an own work queue. The queue lock is required 961 * to synchronize the addition of new requests for this worker or 962 * cancellation of pending/running requests. 963 * 964 * Cancellation scenarios: 965 * The cancellation of a request is being done asynchronously using 966 * _aio_cancel_req() from another thread context. 967 * A queued request can be cancelled in different manners : 968 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): 969 * - lock the queue -> remove the request -> unlock the queue 970 * - this function/thread does not detect this cancellation process 971 * b) request is in progress (AIO_REQ_INPROGRESS) : 972 * - this function first allow the cancellation of the running 973 * request with the flag "work_cancel_flg=1" 974 * see _aio_req_get() -> _aio_cancel_on() 975 * During this phase, it is allowed to interrupt the worker 976 * thread running the request (this thread) using the SIGAIOCANCEL 977 * signal. 
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code, where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because the "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
998 * Returning from the kernel (read or write system call) we must 999 * first disable the use of the SIGAIOCANCEL signal and accordingly 1000 * the use of the siglongjmp() function to prevent a possible deadlock: 1001 * - It can happens that this worker thread returns from the kernel and 1002 * blocks in "work_qlock1", 1003 * - then a second thread cancels the apparently "in progress" request 1004 * and sends the SIGAIOCANCEL signal to the worker thread, 1005 * - the worker thread gets assigned the "work_qlock1" and will returns 1006 * from the kernel, 1007 * - the kernel detects the pending signal and activates the signal 1008 * handler instead, 1009 * - if the "work_cancel_flg" is still set then the signal handler 1010 * should use siglongjmp() to cancel the "in progress" request and 1011 * it would try to acquire the same work_qlock1 in _aio_req_get() 1012 * for a second time => deadlock. 1013 * To avoid that situation we disable the cancellation of the request 1014 * in progress BEFORE we try to acquire the work_qlock1. 1015 * In that case the signal handler will not call siglongjmp() and the 1016 * worker thread will continue running the standard code flow. 1017 * Then this thread must check the AIO_REQ_CANCELED flag to emulate 1018 * an eventually required siglongjmp() freeing the work_qlock1 and 1019 * avoiding a deadlock. 1020 */ 1021 void * 1022 _aio_do_request(void *arglist) 1023 { 1024 aio_worker_t *aiowp = (aio_worker_t *)arglist; 1025 ulwp_t *self = curthread; 1026 struct aio_args *arg; 1027 aio_req_t *reqp; /* current AIO request */ 1028 ssize_t retval; 1029 int append; 1030 int error; 1031 1032 if (pthread_setspecific(_aio_key, aiowp) != 0) 1033 aio_panic("_aio_do_request, pthread_setspecific()"); 1034 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); 1035 ASSERT(aiowp->work_req == NULL); 1036 1037 /* 1038 * We resume here when an operation is cancelled. 
1039 * On first entry, aiowp->work_req == NULL, so all 1040 * we do is block SIGAIOCANCEL. 1041 */ 1042 (void) sigsetjmp(aiowp->work_jmp_buf, 0); 1043 ASSERT(self->ul_sigdefer == 0); 1044 1045 sigoff(self); /* block SIGAIOCANCEL */ 1046 if (aiowp->work_req != NULL) 1047 _aio_finish_request(aiowp, -1, ECANCELED); 1048 1049 for (;;) { 1050 /* 1051 * Put completed requests on aio_done_list. This has 1052 * to be done as part of the main loop to ensure that 1053 * we don't artificially starve any aiowait'ers. 1054 */ 1055 if (aiowp->work_done1) 1056 _aio_work_done(aiowp); 1057 1058 top: 1059 /* consume any deferred SIGAIOCANCEL signal here */ 1060 sigon(self); 1061 sigoff(self); 1062 1063 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1064 if (_aio_idle(aiowp) != 0) 1065 goto top; 1066 } 1067 arg = &reqp->req_args; 1068 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1069 reqp->req_state == AIO_REQ_CANCELED); 1070 error = 0; 1071 1072 switch (reqp->req_op) { 1073 case AIOREAD: 1074 case AIOAREAD: 1075 sigon(self); /* unblock SIGAIOCANCEL */ 1076 retval = pread(arg->fd, arg->buf, 1077 arg->bufsz, arg->offset); 1078 if (retval == -1) { 1079 if (errno == ESPIPE) { 1080 retval = read(arg->fd, 1081 arg->buf, arg->bufsz); 1082 if (retval == -1) 1083 error = errno; 1084 } else { 1085 error = errno; 1086 } 1087 } 1088 sigoff(self); /* block SIGAIOCANCEL */ 1089 break; 1090 case AIOWRITE: 1091 case AIOAWRITE: 1092 /* 1093 * The SUSv3 POSIX spec for aio_write() states: 1094 * If O_APPEND is set for the file descriptor, 1095 * write operations append to the file in the 1096 * same order as the calls were made. 1097 * but, somewhat inconsistently, it requires pwrite() 1098 * to ignore the O_APPEND setting. So we have to use 1099 * fcntl() to get the open modes and call write() for 1100 * the O_APPEND case. 1101 */ 1102 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1103 sigon(self); /* unblock SIGAIOCANCEL */ 1104 retval = append? 
1105 write(arg->fd, arg->buf, arg->bufsz) : 1106 pwrite(arg->fd, arg->buf, arg->bufsz, 1107 arg->offset); 1108 if (retval == -1) { 1109 if (errno == ESPIPE) { 1110 retval = write(arg->fd, 1111 arg->buf, arg->bufsz); 1112 if (retval == -1) 1113 error = errno; 1114 } else { 1115 error = errno; 1116 } 1117 } 1118 sigoff(self); /* block SIGAIOCANCEL */ 1119 break; 1120 #if !defined(_LP64) 1121 case AIOAREAD64: 1122 sigon(self); /* unblock SIGAIOCANCEL */ 1123 retval = pread64(arg->fd, arg->buf, 1124 arg->bufsz, arg->offset); 1125 if (retval == -1) { 1126 if (errno == ESPIPE) { 1127 retval = read(arg->fd, 1128 arg->buf, arg->bufsz); 1129 if (retval == -1) 1130 error = errno; 1131 } else { 1132 error = errno; 1133 } 1134 } 1135 sigoff(self); /* block SIGAIOCANCEL */ 1136 break; 1137 case AIOAWRITE64: 1138 /* 1139 * The SUSv3 POSIX spec for aio_write() states: 1140 * If O_APPEND is set for the file descriptor, 1141 * write operations append to the file in the 1142 * same order as the calls were made. 1143 * but, somewhat inconsistently, it requires pwrite() 1144 * to ignore the O_APPEND setting. So we have to use 1145 * fcntl() to get the open modes and call write() for 1146 * the O_APPEND case. 1147 */ 1148 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1149 sigon(self); /* unblock SIGAIOCANCEL */ 1150 retval = append? 1151 write(arg->fd, arg->buf, arg->bufsz) : 1152 pwrite64(arg->fd, arg->buf, arg->bufsz, 1153 arg->offset); 1154 if (retval == -1) { 1155 if (errno == ESPIPE) { 1156 retval = write(arg->fd, 1157 arg->buf, arg->bufsz); 1158 if (retval == -1) 1159 error = errno; 1160 } else { 1161 error = errno; 1162 } 1163 } 1164 sigoff(self); /* block SIGAIOCANCEL */ 1165 break; 1166 #endif /* !defined(_LP64) */ 1167 case AIOFSYNC: 1168 if (_aio_fsync_del(aiowp, reqp)) 1169 goto top; 1170 ASSERT(reqp->req_head == NULL); 1171 /* 1172 * All writes for this fsync request are now 1173 * acknowledged. 
Now make these writes visible 1174 * and put the final request into the hash table. 1175 */ 1176 if (reqp->req_state == AIO_REQ_CANCELED) { 1177 /* EMPTY */; 1178 } else if (arg->offset == O_SYNC) { 1179 if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) == 1180 -1) { 1181 error = errno; 1182 } 1183 } else { 1184 if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) == 1185 -1) { 1186 error = errno; 1187 } 1188 } 1189 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1190 aio_panic("_aio_do_request(): AIOFSYNC: " 1191 "request already in hash table"); 1192 break; 1193 default: 1194 aio_panic("_aio_do_request, bad op"); 1195 } 1196 1197 _aio_finish_request(aiowp, retval, error); 1198 } 1199 /* NOTREACHED */ 1200 return (NULL); 1201 } 1202 1203 /* 1204 * Perform the tail processing for _aio_do_request(). 1205 * The in-progress request may or may not have been cancelled. 1206 */ 1207 static void 1208 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1209 { 1210 aio_req_t *reqp; 1211 1212 sig_mutex_lock(&aiowp->work_qlock1); 1213 if ((reqp = aiowp->work_req) == NULL) 1214 sig_mutex_unlock(&aiowp->work_qlock1); 1215 else { 1216 aiowp->work_req = NULL; 1217 if (reqp->req_state == AIO_REQ_CANCELED) { 1218 retval = -1; 1219 error = ECANCELED; 1220 } 1221 if (!POSIX_AIO(reqp)) { 1222 int notify; 1223 if (reqp->req_state == AIO_REQ_INPROGRESS) { 1224 reqp->req_state = AIO_REQ_DONE; 1225 _aio_set_result(reqp, retval, error); 1226 } 1227 sig_mutex_unlock(&aiowp->work_qlock1); 1228 sig_mutex_lock(&__aio_mutex); 1229 /* 1230 * If it was canceled, this request will not be 1231 * added to done list. Just free it. 1232 */ 1233 if (error == ECANCELED) { 1234 _aio_outstand_cnt--; 1235 _aio_req_free(reqp); 1236 } else { 1237 _aio_req_done_cnt++; 1238 } 1239 /* 1240 * Notify any thread that may have blocked 1241 * because it saw an outstanding request. 
1242 */ 1243 notify = 0; 1244 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1245 notify = 1; 1246 } 1247 sig_mutex_unlock(&__aio_mutex); 1248 if (notify) { 1249 (void) _kaio(AIONOTIFY); 1250 } 1251 } else { 1252 if (reqp->req_state == AIO_REQ_INPROGRESS) 1253 reqp->req_state = AIO_REQ_DONE; 1254 sig_mutex_unlock(&aiowp->work_qlock1); 1255 _aiodone(reqp, retval, error); 1256 } 1257 } 1258 } 1259 1260 void 1261 _aio_req_mark_done(aio_req_t *reqp) 1262 { 1263 #if !defined(_LP64) 1264 if (reqp->req_largefile) 1265 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1266 else 1267 #endif 1268 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1269 } 1270 1271 /* 1272 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1273 * hopefully to consume one of our queued signals. 1274 */ 1275 static void 1276 _aio_delay(int ticks) 1277 { 1278 (void) usleep(ticks * (MICROSEC / hz)); 1279 } 1280 1281 /* 1282 * Actually send the notifications. 1283 * We could block indefinitely here if the application 1284 * is not listening for the signal or port notifications. 1285 */ 1286 static void 1287 send_notification(notif_param_t *npp) 1288 { 1289 extern int __sigqueue(pid_t pid, int signo, 1290 /* const union sigval */ void *value, int si_code, int block); 1291 1292 if (npp->np_signo) 1293 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1294 SI_ASYNCIO, 1); 1295 else if (npp->np_port >= 0) 1296 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1297 npp->np_event, npp->np_object, npp->np_user); 1298 1299 if (npp->np_lio_signo) 1300 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1301 SI_ASYNCIO, 1); 1302 else if (npp->np_lio_port >= 0) 1303 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1304 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1305 } 1306 1307 /* 1308 * Asynchronous notification worker. 
 *
 * Loops forever: takes a request off this worker's queue with
 * _aio_req_get(), delivers the notification stored in its req_notify
 * member via send_notification(), then frees the request.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 *
 * reqp   - the POSIX request being completed.
 * retval - return value to publish if the result is still EINPROGRESS.
 * error  - errno value to publish in the same case.
 *
 * On return the request has been placed on the done queue
 * (SIGEV_NONE), handed to the notification workers, or freed.
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	/* "nothing to send" defaults, as tested by send_notification() */
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		/* for thread/port events sigev_signo carries the port number */
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	/* do not overwrite a result that has already been published */
	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	/* detach the list I/O head; it is processed after the unlock */
	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		/* the done queue now owns the request */
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {			/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			/* the head is not freed here when a waiter was signalled */
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	/* neither queued for notification nor on the done queue */
	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.
Return 0 when there is only one, 1513 * otherwise return a non-zero value. 1514 */ 1515 static int 1516 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1517 { 1518 aio_lio_t *head = reqp->req_head; 1519 int rval = 0; 1520 1521 ASSERT(reqp == aiowp->work_req); 1522 sig_mutex_lock(&aiowp->work_qlock1); 1523 sig_mutex_lock(&head->lio_mutex); 1524 if (head->lio_refcnt > 1) { 1525 head->lio_refcnt--; 1526 head->lio_nent--; 1527 aiowp->work_req = NULL; 1528 sig_mutex_unlock(&head->lio_mutex); 1529 sig_mutex_unlock(&aiowp->work_qlock1); 1530 sig_mutex_lock(&__aio_mutex); 1531 _aio_outstand_cnt--; 1532 _aio_waitn_wakeup(); 1533 sig_mutex_unlock(&__aio_mutex); 1534 _aio_req_free(reqp); 1535 return (1); 1536 } 1537 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1538 reqp->req_head = NULL; 1539 if (head->lio_canned) 1540 reqp->req_state = AIO_REQ_CANCELED; 1541 if (head->lio_mode == LIO_DESTROY) { 1542 aiowp->work_req = NULL; 1543 rval = 1; 1544 } 1545 sig_mutex_unlock(&head->lio_mutex); 1546 sig_mutex_unlock(&aiowp->work_qlock1); 1547 head->lio_refcnt--; 1548 head->lio_nent--; 1549 _aio_lio_free(head); 1550 if (rval != 0) 1551 _aio_req_free(reqp); 1552 return (rval); 1553 } 1554 1555 /* 1556 * A worker is set idle when its work queue is empty. 1557 * The worker checks again that it has no more work 1558 * and then goes to sleep waiting for more work. 1559 */ 1560 int 1561 _aio_idle(aio_worker_t *aiowp) 1562 { 1563 int error = 0; 1564 1565 sig_mutex_lock(&aiowp->work_qlock1); 1566 if (aiowp->work_count1 == 0) { 1567 ASSERT(aiowp->work_minload1 == 0); 1568 aiowp->work_idleflg = 1; 1569 /* 1570 * A cancellation handler is not needed here. 1571 * aio worker threads are never cancelled via pthread_cancel(). 1572 */ 1573 error = sig_cond_wait(&aiowp->work_idle_cv, 1574 &aiowp->work_qlock1); 1575 /* 1576 * The idle flag is normally cleared before worker is awakened 1577 * by aio_req_add(). On error (EINTR), we clear it ourself. 
1578 */ 1579 if (error) 1580 aiowp->work_idleflg = 0; 1581 } 1582 sig_mutex_unlock(&aiowp->work_qlock1); 1583 return (error); 1584 } 1585 1586 /* 1587 * A worker's completed AIO requests are placed onto a global 1588 * done queue. The application is only sent a SIGIO signal if 1589 * the process has a handler enabled and it is not waiting via 1590 * aiowait(). 1591 */ 1592 static void 1593 _aio_work_done(aio_worker_t *aiowp) 1594 { 1595 aio_req_t *reqp; 1596 1597 sig_mutex_lock(&__aio_mutex); 1598 sig_mutex_lock(&aiowp->work_qlock1); 1599 reqp = aiowp->work_prev1; 1600 reqp->req_next = NULL; 1601 aiowp->work_done1 = 0; 1602 aiowp->work_tail1 = aiowp->work_next1; 1603 if (aiowp->work_tail1 == NULL) 1604 aiowp->work_head1 = NULL; 1605 aiowp->work_prev1 = NULL; 1606 _aio_outstand_cnt--; 1607 _aio_req_done_cnt--; 1608 if (reqp->req_state == AIO_REQ_CANCELED) { 1609 /* 1610 * Request got cancelled after it was marked done. This can 1611 * happen because _aio_finish_request() marks it AIO_REQ_DONE 1612 * and drops all locks. Don't add the request to the done 1613 * queue and just discard it. 
1614 */ 1615 sig_mutex_unlock(&aiowp->work_qlock1); 1616 _aio_req_free(reqp); 1617 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1618 sig_mutex_unlock(&__aio_mutex); 1619 (void) _kaio(AIONOTIFY); 1620 } else { 1621 sig_mutex_unlock(&__aio_mutex); 1622 } 1623 return; 1624 } 1625 sig_mutex_unlock(&aiowp->work_qlock1); 1626 _aio_donecnt++; 1627 ASSERT(_aio_donecnt > 0 && 1628 _aio_outstand_cnt >= 0 && 1629 _aio_req_done_cnt >= 0); 1630 ASSERT(reqp != NULL); 1631 1632 if (_aio_done_tail == NULL) { 1633 _aio_done_head = _aio_done_tail = reqp; 1634 } else { 1635 _aio_done_head->req_next = reqp; 1636 _aio_done_head = reqp; 1637 } 1638 1639 if (_aiowait_flag) { 1640 sig_mutex_unlock(&__aio_mutex); 1641 (void) _kaio(AIONOTIFY); 1642 } else { 1643 sig_mutex_unlock(&__aio_mutex); 1644 if (_sigio_enabled) 1645 (void) kill(__pid, SIGIO); 1646 } 1647 } 1648 1649 /* 1650 * The done queue consists of AIO requests that are in either the 1651 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1652 * are discarded. If the done queue is empty then NULL is returned. 1653 * Otherwise the address of a done aio_result_t is returned. 1654 */ 1655 aio_result_t * 1656 _aio_req_done(void) 1657 { 1658 aio_req_t *reqp; 1659 aio_result_t *resultp; 1660 1661 ASSERT(MUTEX_HELD(&__aio_mutex)); 1662 1663 if ((reqp = _aio_done_tail) != NULL) { 1664 if ((_aio_done_tail = reqp->req_next) == NULL) 1665 _aio_done_head = NULL; 1666 ASSERT(_aio_donecnt > 0); 1667 _aio_donecnt--; 1668 (void) _aio_hash_del(reqp->req_resultp); 1669 resultp = reqp->req_resultp; 1670 ASSERT(reqp->req_state == AIO_REQ_DONE); 1671 _aio_req_free(reqp); 1672 return (resultp); 1673 } 1674 /* is queue empty? */ 1675 if (reqp == NULL && _aio_outstand_cnt == 0) { 1676 return ((aio_result_t *)-1); 1677 } 1678 return (NULL); 1679 } 1680 1681 /* 1682 * Set the return and errno values for the application's use. 
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	/* membar_producer() is placed between the two stores */
	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * reqp       - the request to queue.
 * nextworker - cursor into the circular worker list; advanced here
 *              for read/write requests.
 * mode       - the operation; read/write modes are load balanced,
 *              AIOFSYNC and AIONOTIFY go straight to *nextworker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	/* notification requests are not counted as outstanding */
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			/* idle worker found; its work_qlock1 is held */
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			/*
			 * This worker has reached _minworkload and the
			 * worker limit has not: hand the request to a
			 * newly created worker instead.
			 */
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* no load balancing: block for the given worker's lock */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		/* advance the cursor so the next request starts elsewhere */
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 *
 * The returned request (or NULL) is also recorded in aiowp->work_req,
 * and a returned request is moved to the AIO_REQ_INPROGRESS state.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-posix requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		/* only read/write requests are counted in work_minload1 */
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

/*
 * Unlink reqp from aiowp's work queue and fix up the queue pointers
 * and counters that refer to it.
 *
 * ostate is the state the request was in (AIO_REQ_QUEUED or
 * AIO_REQ_INPROGRESS).  For a POSIX request nothing is done unless
 * ostate is AIO_REQ_QUEUED.
 * The caller must hold aiowp->work_qlock1.
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	/*
	 * 'last' is the address of the link that points at the element
	 * being examined; 'lastrp' trails one element behind it.
	 */
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			/*
			 * if this is the first request on the queue, move
			 * the lastrp pointer forward.
			 */
			if (lastrp == next)
				lastrp = next->req_next;

			/*
			 * if this request is pointed by work_head1, then
			 * make work_head1 point to the last request that is
			 * present on the queue.
			 */
			if (aiowp->work_head1 == next)
				aiowp->work_head1 = lastrp;

			/*
			 * work_prev1 is used only in non posix case and it
			 * points to the current AIO_REQ_INPROGRESS request.
			 * If work_prev1 points to this request which is being
			 * deleted, make work_prev1 NULL and set work_done1
			 * to 0.
			 *
			 * A worker thread can be processing only one request
			 * at a time.
			 */
			if (aiowp->work_prev1 == next) {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
				aiowp->work_prev1 = NULL;
				aiowp->work_done1--;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

/*
 * Append reqp to the circular, doubly linked _aio_doneq list (it is
 * linked in just before the element _aio_doneq points to), mark it
 * AIO_REQ_DONEQ and bump _aio_doneq_cnt.
 */
static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * caller owns the _aio_mutex
 *
 * Remove a request from the _aio_doneq list.  With a non-NULL reqp,
 * that request is removed, or NULL is returned if it is not in the
 * AIO_REQ_DONEQ state.  With a NULL reqp, the request at the front of
 * the list is removed, or NULL is returned if the list is empty.
 * A removed request is self-linked and set to AIO_REQ_DONE.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.
 * The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 *
 * Returns 0 on success, or -1 if an entry with the same resultp is
 * already present (in which case nothing is inserted).
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	/* walk to the end of the chain, rejecting a duplicate on the way */
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

/*
 * Remove an entry from the hash table.
 *
 * Returns the removed request, or NULL if no entry matches resultp or
 * the hash table does not exist.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				/* unlink the entry from its chain */
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * find an entry in the hash table
 *
 * Returns the matching request, or NULL if no entry matches resultp
 * or the hash table does not exist.  The table is not modified.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * AIO interface for POSIX
 *
 * aiocbp     - the application's control block; NULL yields EINVAL.
 * lio_head   - list I/O head to attach to the request, or NULL.
 * nextworker - worker-list cursor passed on to _aio_req_add().
 * mode       - the operation, stored in req_op and passed to _kaio().
 * flg        - AIO_KAIO: try kernel aio first;
 *              AIO_NO_DUPS: enter the request in the hash table.
 *
 * Returns 0 if the request was accepted by the kernel or queued to a
 * worker, otherwise -1 with errno set.
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we dont want to repeat the
	 * system call
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			/* remember that kaio does not work for this fd */
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	/*
	 * NOTE(review): the two error returns below leave aio_errno at
	 * EINPROGRESS and aio_state at USERAIO, unlike the kaio failure
	 * path above -- confirm that callers reset them.
	 */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	/* copy the I/O parameters out of the control block */
	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}

#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 *
 * Same steps as _aio_rw(), but for an aiocb64_t control block; the
 * request is marked with req_largefile = 1.
 *
 * Returns 0 if the request was accepted by the kernel or queued to a
 * worker, otherwise -1 with errno set.
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we dont want to repeat the
	 * system call
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			/* remember that kaio does not work for this fd */
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	/*
	 * NOTE(review): the two error returns below leave aio_errno at
	 * EINPROGRESS and aio_state at USERAIO, unlike the kaio failure
	 * path above -- confirm that callers reset them.
	 */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	/*
	 * For SIGEV_PORT and SIGEV_THREAD the sigev_signo member is
	 * reused to carry the port number.
	 */
	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	/* copy the I/O parameters out of the control block */
	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */