1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "lint.h" 30 #include "thr_uberdata.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fdsync(int, int); 47 extern int __fcntl(int, int, ...); 48 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 49 50 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 51 static void _aiodone(aio_req_t *, ssize_t, int); 52 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 53 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 54 55 /* 56 * switch for kernel async I/O 57 */ 58 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 59 60 /* 61 * Key for thread-specific data 62 */ 63 pthread_key_t _aio_key; 64 65 /* 66 * Array for determining whether or not a file supports kaio. 67 * Initialized in _kaio_init(). 68 */ 69 uint32_t *_kaio_supported = NULL; 70 71 /* 72 * workers for read/write requests 73 * (__aio_mutex lock protects circular linked list of workers) 74 */ 75 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 76 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 77 int __rw_workerscnt; /* number of read/write workers */ 78 79 /* 80 * worker for notification requests. 81 */ 82 aio_worker_t *__workers_no; /* circular list of AIO workers */ 83 aio_worker_t *__nextworker_no; /* next worker in list of workers */ 84 int __no_workerscnt; /* number of write workers */ 85 86 aio_req_t *_aio_done_tail; /* list of done requests */ 87 aio_req_t *_aio_done_head; 88 89 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 90 cond_t __aio_initcv = DEFAULTCV; 91 int __aio_initbusy = 0; 92 93 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 94 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 95 96 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 97 int _sigio_enabled = 0; /* when set, send SIGIO signal */ 98 99 aio_hash_t *_aio_hash; 100 101 aio_req_t *_aio_doneq; /* double linked done queue list */ 102 103 int _aio_donecnt = 0; 104 int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 105 int _aio_doneq_cnt = 0; 106 int _aio_outstand_cnt = 0; /* # of outstanding requests */ 107 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 108 int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ 109 int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 110 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 111 112 int _max_workers = 256; /* max number of workers permitted */ 113 int _min_workers = 4; /* min number of workers */ 114 int _minworkload = 2; /* min number of request in q */ 115 int _aio_worker_cnt = 0; /* number of workers to do requests */ 116 int __uaio_ok = 0; /* AIO has been enabled */ 117 sigset_t _worker_set; /* worker's signal mask */ 118 119 int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 120 int _aio_flags = 0; /* see asyncio.h defines for */ 121 122 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 123 124 int hz; /* clock ticks per second */ 125 126 static int 127 _kaio_supported_init(void) 128 { 129 void *ptr; 130 size_t size; 131 132 if (_kaio_supported != NULL) /* already initialized */ 133 return (0); 134 135 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 136 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 137 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 138 if (ptr == MAP_FAILED) 139 return (-1); 140 _kaio_supported = ptr; 141 return (0); 142 } 143 144 /* 145 * The aio subsystem is initialized when an AIO request is made. 146 * Constants are initialized like the max number of workers that 147 * the subsystem can create, and the minimum number of workers 148 * permitted before imposing some restrictions. Also, some 149 * workers are created. 150 */ 151 int 152 __uaio_init(void) 153 { 154 int ret = -1; 155 int i; 156 int cancel_state; 157 158 lmutex_lock(&__aio_initlock); 159 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 160 while (__aio_initbusy) 161 (void) cond_wait(&__aio_initcv, &__aio_initlock); 162 (void) pthread_setcancelstate(cancel_state, NULL); 163 if (__uaio_ok) { /* already initialized */ 164 lmutex_unlock(&__aio_initlock); 165 return (0); 166 } 167 __aio_initbusy = 1; 168 lmutex_unlock(&__aio_initlock); 169 170 hz = (int)sysconf(_SC_CLK_TCK); 171 __pid = getpid(); 172 173 setup_cancelsig(SIGAIOCANCEL); 174 175 if (_kaio_supported_init() != 0) 176 goto out; 177 178 /* 179 * Allocate and initialize the hash table. 180 * Do this only once, even if __uaio_init() is called twice. 181 */ 182 if (_aio_hash == NULL) { 183 /* LINTED pointer cast */ 184 _aio_hash = (aio_hash_t *)mmap(NULL, 185 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 186 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 187 if ((void *)_aio_hash == MAP_FAILED) { 188 _aio_hash = NULL; 189 goto out; 190 } 191 for (i = 0; i < HASHSZ; i++) 192 (void) mutex_init(&_aio_hash[i].hash_lock, 193 USYNC_THREAD, NULL); 194 } 195 196 /* 197 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 198 */ 199 (void) sigfillset(&_worker_set); 200 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 201 202 /* 203 * Create one worker to send asynchronous notifications. 204 * Do this only once, even if __uaio_init() is called twice. 205 */ 206 if (__no_workerscnt == 0 && 207 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 208 errno = EAGAIN; 209 goto out; 210 } 211 212 /* 213 * Create the minimum number of read/write workers. 214 * And later check whether atleast one worker is created; 215 * lwp_create() calls could fail because of segkp exhaustion. 216 */ 217 for (i = 0; i < _min_workers; i++) 218 (void) _aio_create_worker(NULL, AIOREAD); 219 if (__rw_workerscnt == 0) { 220 errno = EAGAIN; 221 goto out; 222 } 223 224 ret = 0; 225 out: 226 lmutex_lock(&__aio_initlock); 227 if (ret == 0) 228 __uaio_ok = 1; 229 __aio_initbusy = 0; 230 (void) cond_broadcast(&__aio_initcv); 231 lmutex_unlock(&__aio_initlock); 232 return (ret); 233 } 234 235 /* 236 * Called from close() before actually performing the real _close(). 237 */ 238 void 239 _aio_close(int fd) 240 { 241 if (fd < 0) /* avoid cancelling everything */ 242 return; 243 /* 244 * Cancel all outstanding aio requests for this file descriptor. 245 */ 246 if (__uaio_ok) 247 (void) aiocancel_all(fd); 248 /* 249 * If we have allocated the bit array, clear the bit for this file. 250 * The next open may re-use this file descriptor and the new file 251 * may have different kaio() behaviour. 252 */ 253 if (_kaio_supported != NULL) 254 CLEAR_KAIO_SUPPORTED(fd); 255 } 256 257 /* 258 * special kaio cleanup thread sits in a loop in the 259 * kernel waiting for pending kaio requests to complete. 260 */ 261 void * 262 _kaio_cleanup_thread(void *arg) 263 { 264 if (pthread_setspecific(_aio_key, arg) != 0) 265 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 266 (void) _kaio(AIOSTART); 267 return (arg); 268 } 269 270 /* 271 * initialize kaio. 272 */ 273 void 274 _kaio_init() 275 { 276 int error; 277 sigset_t oset; 278 int cancel_state; 279 280 lmutex_lock(&__aio_initlock); 281 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 282 while (__aio_initbusy) 283 (void) cond_wait(&__aio_initcv, &__aio_initlock); 284 (void) pthread_setcancelstate(cancel_state, NULL); 285 if (_kaio_ok) { /* already initialized */ 286 lmutex_unlock(&__aio_initlock); 287 return; 288 } 289 __aio_initbusy = 1; 290 lmutex_unlock(&__aio_initlock); 291 292 if (_kaio_supported_init() != 0) 293 error = ENOMEM; 294 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 295 error = ENOMEM; 296 else if ((error = (int)_kaio(AIOINIT)) == 0) { 297 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 298 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 299 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 300 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 301 } 302 if (error && _kaiowp != NULL) { 303 _aio_worker_free(_kaiowp); 304 _kaiowp = NULL; 305 } 306 307 lmutex_lock(&__aio_initlock); 308 if (error) 309 _kaio_ok = -1; 310 else 311 _kaio_ok = 1; 312 __aio_initbusy = 0; 313 (void) cond_broadcast(&__aio_initcv); 314 lmutex_unlock(&__aio_initlock); 315 } 316 317 int 318 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 319 aio_result_t *resultp) 320 { 321 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 322 } 323 324 int 325 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 326 aio_result_t *resultp) 327 { 328 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 329 } 330 331 #if !defined(_LP64) 332 int 333 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 334 aio_result_t *resultp) 335 { 336 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 337 } 338 339 int 340 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 341 aio_result_t *resultp) 342 { 343 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 344 } 345 #endif /* !defined(_LP64) */ 346 347 int 348 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 349 aio_result_t *resultp, int mode) 350 { 351 aio_req_t *reqp; 352 aio_args_t *ap; 353 offset_t loffset; 354 struct stat64 stat64; 355 int error = 0; 356 int kerr; 357 int umode; 358 359 switch (whence) { 360 361 case SEEK_SET: 362 loffset = offset; 363 break; 364 case SEEK_CUR: 365 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 366 error = -1; 367 else 368 loffset += offset; 369 break; 370 case SEEK_END: 371 if (fstat64(fd, &stat64) == -1) 372 error = -1; 373 else 374 loffset = offset + stat64.st_size; 375 break; 376 default: 377 errno = EINVAL; 378 error = -1; 379 } 380 381 if (error) 382 return (error); 383 384 /* initialize kaio */ 385 if (!_kaio_ok) 386 _kaio_init(); 387 388 /* 389 * _aio_do_request() needs the original request code (mode) to be able 390 * to choose the appropiate 32/64 bit function. All other functions 391 * only require the difference between READ and WRITE (umode). 392 */ 393 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 394 umode = mode - AIOAREAD64; 395 else 396 umode = mode; 397 398 /* 399 * Try kernel aio first. 400 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 401 */ 402 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 403 resultp->aio_errno = 0; 404 sig_mutex_lock(&__aio_mutex); 405 _kaio_outstand_cnt++; 406 sig_mutex_unlock(&__aio_mutex); 407 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 408 (umode | AIO_POLL_BIT) : umode), 409 fd, buf, bufsz, loffset, resultp); 410 if (kerr == 0) { 411 return (0); 412 } 413 sig_mutex_lock(&__aio_mutex); 414 _kaio_outstand_cnt--; 415 sig_mutex_unlock(&__aio_mutex); 416 if (errno != ENOTSUP && errno != EBADFD) 417 return (-1); 418 if (errno == EBADFD) 419 SET_KAIO_NOT_SUPPORTED(fd); 420 } 421 422 if (!__uaio_ok && __uaio_init() == -1) 423 return (-1); 424 425 if ((reqp = _aio_req_alloc()) == NULL) { 426 errno = EAGAIN; 427 return (-1); 428 } 429 430 /* 431 * _aio_do_request() checks reqp->req_op to differentiate 432 * between 32 and 64 bit access. 433 */ 434 reqp->req_op = mode; 435 reqp->req_resultp = resultp; 436 ap = &reqp->req_args; 437 ap->fd = fd; 438 ap->buf = buf; 439 ap->bufsz = bufsz; 440 ap->offset = loffset; 441 442 if (_aio_hash_insert(resultp, reqp) != 0) { 443 _aio_req_free(reqp); 444 errno = EINVAL; 445 return (-1); 446 } 447 /* 448 * _aio_req_add() only needs the difference between READ and 449 * WRITE to choose the right worker queue. 450 */ 451 _aio_req_add(reqp, &__nextworker_rw, umode); 452 return (0); 453 } 454 455 int 456 aiocancel(aio_result_t *resultp) 457 { 458 aio_req_t *reqp; 459 aio_worker_t *aiowp; 460 int ret; 461 int done = 0; 462 int canceled = 0; 463 464 if (!__uaio_ok) { 465 errno = EINVAL; 466 return (-1); 467 } 468 469 sig_mutex_lock(&__aio_mutex); 470 reqp = _aio_hash_find(resultp); 471 if (reqp == NULL) { 472 if (_aio_outstand_cnt == _aio_req_done_cnt) 473 errno = EINVAL; 474 else 475 errno = EACCES; 476 ret = -1; 477 } else { 478 aiowp = reqp->req_worker; 479 sig_mutex_lock(&aiowp->work_qlock1); 480 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 481 sig_mutex_unlock(&aiowp->work_qlock1); 482 483 if (canceled) { 484 ret = 0; 485 } else { 486 if (_aio_outstand_cnt == 0 || 487 _aio_outstand_cnt == _aio_req_done_cnt) 488 errno = EINVAL; 489 else 490 errno = EACCES; 491 ret = -1; 492 } 493 } 494 sig_mutex_unlock(&__aio_mutex); 495 return (ret); 496 } 497 498 /* ARGSUSED */ 499 static void 500 _aiowait_cleanup(void *arg) 501 { 502 sig_mutex_lock(&__aio_mutex); 503 _aiowait_flag--; 504 sig_mutex_unlock(&__aio_mutex); 505 } 506 507 /* 508 * This must be asynch safe and cancel safe 509 */ 510 aio_result_t * 511 aiowait(struct timeval *uwait) 512 { 513 aio_result_t *uresultp; 514 aio_result_t *kresultp; 515 aio_result_t *resultp; 516 int dontblock; 517 int timedwait = 0; 518 int kaio_errno = 0; 519 struct timeval twait; 520 struct timeval *wait = NULL; 521 hrtime_t hrtend; 522 hrtime_t hres; 523 524 if (uwait) { 525 /* 526 * Check for a valid specified wait time. 527 * If it is invalid, fail the call right away. 528 */ 529 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 530 uwait->tv_usec >= MICROSEC) { 531 errno = EINVAL; 532 return ((aio_result_t *)-1); 533 } 534 535 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 536 hrtend = gethrtime() + 537 (hrtime_t)uwait->tv_sec * NANOSEC + 538 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 539 twait = *uwait; 540 wait = &twait; 541 timedwait++; 542 } else { 543 /* polling */ 544 sig_mutex_lock(&__aio_mutex); 545 if (_kaio_outstand_cnt == 0) { 546 kresultp = (aio_result_t *)-1; 547 } else { 548 kresultp = (aio_result_t *)_kaio(AIOWAIT, 549 (struct timeval *)-1, 1); 550 if (kresultp != (aio_result_t *)-1 && 551 kresultp != NULL && 552 kresultp != (aio_result_t *)1) { 553 _kaio_outstand_cnt--; 554 sig_mutex_unlock(&__aio_mutex); 555 return (kresultp); 556 } 557 } 558 uresultp = _aio_req_done(); 559 sig_mutex_unlock(&__aio_mutex); 560 if (uresultp != NULL && 561 uresultp != (aio_result_t *)-1) { 562 return (uresultp); 563 } 564 if (uresultp == (aio_result_t *)-1 && 565 kresultp == (aio_result_t *)-1) { 566 errno = EINVAL; 567 return ((aio_result_t *)-1); 568 } else { 569 return (NULL); 570 } 571 } 572 } 573 574 for (;;) { 575 sig_mutex_lock(&__aio_mutex); 576 uresultp = _aio_req_done(); 577 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 578 sig_mutex_unlock(&__aio_mutex); 579 resultp = uresultp; 580 break; 581 } 582 _aiowait_flag++; 583 dontblock = (uresultp == (aio_result_t *)-1); 584 if (dontblock && _kaio_outstand_cnt == 0) { 585 kresultp = (aio_result_t *)-1; 586 kaio_errno = EINVAL; 587 } else { 588 sig_mutex_unlock(&__aio_mutex); 589 pthread_cleanup_push(_aiowait_cleanup, NULL); 590 _cancel_prologue(); 591 kresultp = (aio_result_t *)_kaio(AIOWAIT, 592 wait, dontblock); 593 _cancel_epilogue(); 594 pthread_cleanup_pop(0); 595 sig_mutex_lock(&__aio_mutex); 596 kaio_errno = errno; 597 } 598 _aiowait_flag--; 599 sig_mutex_unlock(&__aio_mutex); 600 if (kresultp == (aio_result_t *)1) { 601 /* aiowait() awakened by an aionotify() */ 602 continue; 603 } else if (kresultp != NULL && 604 kresultp != (aio_result_t *)-1) { 605 resultp = kresultp; 606 sig_mutex_lock(&__aio_mutex); 607 _kaio_outstand_cnt--; 608 sig_mutex_unlock(&__aio_mutex); 609 break; 610 } else if (kresultp == (aio_result_t *)-1 && 611 kaio_errno == EINVAL && 612 uresultp == (aio_result_t *)-1) { 613 errno = kaio_errno; 614 resultp = (aio_result_t *)-1; 615 break; 616 } else if (kresultp == (aio_result_t *)-1 && 617 kaio_errno == EINTR) { 618 errno = kaio_errno; 619 resultp = (aio_result_t *)-1; 620 break; 621 } else if (timedwait) { 622 hres = hrtend - gethrtime(); 623 if (hres <= 0) { 624 /* time is up; return */ 625 resultp = NULL; 626 break; 627 } else { 628 /* 629 * Some time left. Round up the remaining time 630 * in nanoseconds to microsec. Retry the call. 631 */ 632 hres += (NANOSEC / MICROSEC) - 1; 633 wait->tv_sec = hres / NANOSEC; 634 wait->tv_usec = 635 (hres % NANOSEC) / (NANOSEC / MICROSEC); 636 } 637 } else { 638 ASSERT(kresultp == NULL && uresultp == NULL); 639 resultp = NULL; 640 continue; 641 } 642 } 643 return (resultp); 644 } 645 646 /* 647 * _aio_get_timedelta calculates the remaining time and stores the result 648 * into timespec_t *wait. 649 */ 650 651 int 652 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 653 { 654 int ret = 0; 655 struct timeval cur; 656 timespec_t curtime; 657 658 (void) gettimeofday(&cur, NULL); 659 curtime.tv_sec = cur.tv_sec; 660 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 661 662 if (end->tv_sec >= curtime.tv_sec) { 663 wait->tv_sec = end->tv_sec - curtime.tv_sec; 664 if (end->tv_nsec >= curtime.tv_nsec) { 665 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 666 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 667 ret = -1; /* timer expired */ 668 } else { 669 if (end->tv_sec > curtime.tv_sec) { 670 wait->tv_sec -= 1; 671 wait->tv_nsec = NANOSEC - 672 (curtime.tv_nsec - end->tv_nsec); 673 } else { 674 ret = -1; /* timer expired */ 675 } 676 } 677 } else { 678 ret = -1; 679 } 680 return (ret); 681 } 682 683 /* 684 * If closing by file descriptor: we will simply cancel all the outstanding 685 * aio`s and return. Those aio's in question will have either noticed the 686 * cancellation notice before, during, or after initiating io. 687 */ 688 int 689 aiocancel_all(int fd) 690 { 691 aio_req_t *reqp; 692 aio_req_t **reqpp, *last; 693 aio_worker_t *first; 694 aio_worker_t *next; 695 int canceled = 0; 696 int done = 0; 697 int cancelall = 0; 698 699 sig_mutex_lock(&__aio_mutex); 700 701 if (_aio_outstand_cnt == 0) { 702 sig_mutex_unlock(&__aio_mutex); 703 return (AIO_ALLDONE); 704 } 705 706 /* 707 * Cancel requests from the read/write workers' queues. 708 */ 709 first = __nextworker_rw; 710 next = first; 711 do { 712 _aio_cancel_work(next, fd, &canceled, &done); 713 } while ((next = next->work_forw) != first); 714 715 /* 716 * finally, check if there are requests on the done queue that 717 * should be canceled. 718 */ 719 if (fd < 0) 720 cancelall = 1; 721 reqpp = &_aio_done_tail; 722 last = _aio_done_tail; 723 while ((reqp = *reqpp) != NULL) { 724 if (cancelall || reqp->req_args.fd == fd) { 725 *reqpp = reqp->req_next; 726 if (last == reqp) { 727 last = reqp->req_next; 728 } 729 if (_aio_done_head == reqp) { 730 /* this should be the last req in list */ 731 _aio_done_head = last; 732 } 733 _aio_donecnt--; 734 _aio_set_result(reqp, -1, ECANCELED); 735 (void) _aio_hash_del(reqp->req_resultp); 736 _aio_req_free(reqp); 737 } else { 738 reqpp = &reqp->req_next; 739 last = reqp; 740 } 741 } 742 743 if (cancelall) { 744 ASSERT(_aio_donecnt == 0); 745 _aio_done_head = NULL; 746 } 747 sig_mutex_unlock(&__aio_mutex); 748 749 if (canceled && done == 0) 750 return (AIO_CANCELED); 751 else if (done && canceled == 0) 752 return (AIO_ALLDONE); 753 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 754 return ((int)_kaio(AIOCANCEL, fd, NULL)); 755 return (AIO_NOTCANCELED); 756 } 757 758 /* 759 * Cancel requests from a given work queue. If the file descriptor 760 * parameter, fd, is non-negative, then only cancel those requests 761 * in this queue that are to this file descriptor. If the fd 762 * parameter is -1, then cancel all requests. 763 */ 764 static void 765 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 766 { 767 aio_req_t *reqp; 768 769 sig_mutex_lock(&aiowp->work_qlock1); 770 /* 771 * cancel queued requests first. 772 */ 773 reqp = aiowp->work_tail1; 774 while (reqp != NULL) { 775 if (fd < 0 || reqp->req_args.fd == fd) { 776 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 777 /* 778 * Callers locks were dropped. 779 * reqp is invalid; start traversing 780 * the list from the beginning again. 781 */ 782 reqp = aiowp->work_tail1; 783 continue; 784 } 785 } 786 reqp = reqp->req_next; 787 } 788 /* 789 * Since the queued requests have been canceled, there can 790 * only be one inprogress request that should be canceled. 791 */ 792 if ((reqp = aiowp->work_req) != NULL && 793 (fd < 0 || reqp->req_args.fd == fd)) 794 (void) _aio_cancel_req(aiowp, reqp, canceled, done); 795 sig_mutex_unlock(&aiowp->work_qlock1); 796 } 797 798 /* 799 * Cancel a request. Return 1 if the callers locks were temporarily 800 * dropped, otherwise return 0. 801 */ 802 int 803 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 804 { 805 int ostate = reqp->req_state; 806 807 ASSERT(MUTEX_HELD(&__aio_mutex)); 808 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 809 if (ostate == AIO_REQ_CANCELED) 810 return (0); 811 if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) && 812 aiowp->work_prev1 == reqp) { 813 ASSERT(aiowp->work_done1 != 0); 814 /* 815 * If not on the done queue yet, just mark it CANCELED, 816 * _aio_work_done() will do the necessary clean up. 817 * This is required to ensure that aiocancel_all() cancels 818 * all the outstanding requests, including this one which 819 * is not yet on done queue but has been marked done. 820 */ 821 _aio_set_result(reqp, -1, ECANCELED); 822 (void) _aio_hash_del(reqp->req_resultp); 823 reqp->req_state = AIO_REQ_CANCELED; 824 (*canceled)++; 825 return (0); 826 } 827 828 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 829 (*done)++; 830 return (0); 831 } 832 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 833 ASSERT(POSIX_AIO(reqp)); 834 /* Cancel the queued aio_fsync() request */ 835 if (!reqp->req_head->lio_canned) { 836 reqp->req_head->lio_canned = 1; 837 _aio_outstand_cnt--; 838 (*canceled)++; 839 } 840 return (0); 841 } 842 reqp->req_state = AIO_REQ_CANCELED; 843 _aio_req_del(aiowp, reqp, ostate); 844 (void) _aio_hash_del(reqp->req_resultp); 845 (*canceled)++; 846 if (reqp == aiowp->work_req) { 847 ASSERT(ostate == AIO_REQ_INPROGRESS); 848 /* 849 * Set the result values now, before _aiodone() is called. 850 * We do this because the application can expect aio_return 851 * and aio_errno to be set to -1 and ECANCELED, respectively, 852 * immediately after a successful return from aiocancel() 853 * or aio_cancel(). 854 */ 855 _aio_set_result(reqp, -1, ECANCELED); 856 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 857 return (0); 858 } 859 if (!POSIX_AIO(reqp)) { 860 _aio_outstand_cnt--; 861 _aio_set_result(reqp, -1, ECANCELED); 862 _aio_req_free(reqp); 863 return (0); 864 } 865 sig_mutex_unlock(&aiowp->work_qlock1); 866 sig_mutex_unlock(&__aio_mutex); 867 _aiodone(reqp, -1, ECANCELED); 868 sig_mutex_lock(&__aio_mutex); 869 sig_mutex_lock(&aiowp->work_qlock1); 870 return (1); 871 } 872 873 int 874 _aio_create_worker(aio_req_t *reqp, int mode) 875 { 876 aio_worker_t *aiowp, **workers, **nextworker; 877 int *aio_workerscnt; 878 void *(*func)(void *); 879 sigset_t oset; 880 int error; 881 882 /* 883 * Put the new worker thread in the right queue. 884 */ 885 switch (mode) { 886 case AIOREAD: 887 case AIOWRITE: 888 case AIOAREAD: 889 case AIOAWRITE: 890 #if !defined(_LP64) 891 case AIOAREAD64: 892 case AIOAWRITE64: 893 #endif 894 workers = &__workers_rw; 895 nextworker = &__nextworker_rw; 896 aio_workerscnt = &__rw_workerscnt; 897 func = _aio_do_request; 898 break; 899 case AIONOTIFY: 900 workers = &__workers_no; 901 nextworker = &__nextworker_no; 902 func = _aio_do_notify; 903 aio_workerscnt = &__no_workerscnt; 904 break; 905 default: 906 aio_panic("_aio_create_worker: invalid mode"); 907 break; 908 } 909 910 if ((aiowp = _aio_worker_alloc()) == NULL) 911 return (-1); 912 913 if (reqp) { 914 reqp->req_state = AIO_REQ_QUEUED; 915 reqp->req_worker = aiowp; 916 aiowp->work_head1 = reqp; 917 aiowp->work_tail1 = reqp; 918 aiowp->work_next1 = reqp; 919 aiowp->work_count1 = 1; 920 aiowp->work_minload1 = 1; 921 } 922 923 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 924 error = thr_create(NULL, AIOSTKSIZE, func, aiowp, 925 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); 926 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 927 if (error) { 928 if (reqp) { 929 reqp->req_state = 0; 930 reqp->req_worker = NULL; 931 } 932 _aio_worker_free(aiowp); 933 return (-1); 934 } 935 936 lmutex_lock(&__aio_mutex); 937 (*aio_workerscnt)++; 938 if (*workers == NULL) { 939 aiowp->work_forw = aiowp; 940 aiowp->work_backw = aiowp; 941 *nextworker = aiowp; 942 *workers = aiowp; 943 } else { 944 aiowp->work_backw = (*workers)->work_backw; 945 aiowp->work_forw = (*workers); 946 (*workers)->work_backw->work_forw = aiowp; 947 (*workers)->work_backw = aiowp; 948 } 949 _aio_worker_cnt++; 950 lmutex_unlock(&__aio_mutex); 951 952 (void) thr_continue(aiowp->work_tid); 953 954 return (0); 955 } 956 957 /* 958 * This is the worker's main routine. 959 * The task of this function is to execute all queued requests; 960 * once the last pending request is executed this function will block 961 * in _aio_idle(). A new incoming request must wakeup this thread to 962 * restart the work. 963 * Every worker has an own work queue. The queue lock is required 964 * to synchronize the addition of new requests for this worker or 965 * cancellation of pending/running requests. 966 * 967 * Cancellation scenarios: 968 * The cancellation of a request is being done asynchronously using 969 * _aio_cancel_req() from another thread context. 970 * A queued request can be cancelled in different manners : 971 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): 972 * - lock the queue -> remove the request -> unlock the queue 973 * - this function/thread does not detect this cancellation process 974 * b) request is in progress (AIO_REQ_INPROGRESS) : 975 * - this function first allow the cancellation of the running 976 * request with the flag "work_cancel_flg=1" 977 * see _aio_req_get() -> _aio_cancel_on() 978 * During this phase, it is allowed to interrupt the worker 979 * thread running the request (this thread) using the SIGAIOCANCEL 980 * signal. 981 * Once this thread returns from the kernel (because the request 982 * is just done), then it must disable a possible cancellation 983 * and proceed to finish the request. To disable the cancellation 984 * this thread must use _aio_cancel_off() to set "work_cancel_flg=0". 985 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ): 986 * same procedure as in a) 987 * 988 * To b) 989 * This thread uses sigsetjmp() to define the position in the code, where 990 * it wish to continue working in the case that a SIGAIOCANCEL signal 991 * is detected. 992 * Normally this thread should get the cancellation signal during the 993 * kernel phase (reading or writing). In that case the signal handler 994 * aiosigcancelhndlr() is activated using the worker thread context, 995 * which again will use the siglongjmp() function to break the standard 996 * code flow and jump to the "sigsetjmp" position, provided that 997 * "work_cancel_flg" is set to "1". 998 * Because the "work_cancel_flg" is only manipulated by this worker 999 * thread and it can only run on one CPU at a given time, it is not 1000 * necessary to protect that flag with the queue lock. 1001 * Returning from the kernel (read or write system call) we must 1002 * first disable the use of the SIGAIOCANCEL signal and accordingly 1003 * the use of the siglongjmp() function to prevent a possible deadlock: 1004 * - It can happens that this worker thread returns from the kernel and 1005 * blocks in "work_qlock1", 1006 * - then a second thread cancels the apparently "in progress" request 1007 * and sends the SIGAIOCANCEL signal to the worker thread, 1008 * - the worker thread gets assigned the "work_qlock1" and will returns 1009 * from the kernel, 1010 * - the kernel detects the pending signal and activates the signal 1011 * handler instead, 1012 * - if the "work_cancel_flg" is still set then the signal handler 1013 * should use siglongjmp() to cancel the "in progress" request and 1014 * it would try to acquire the same work_qlock1 in _aio_req_get() 1015 * for a second time => deadlock. 1016 * To avoid that situation we disable the cancellation of the request 1017 * in progress BEFORE we try to acquire the work_qlock1. 1018 * In that case the signal handler will not call siglongjmp() and the 1019 * worker thread will continue running the standard code flow. 1020 * Then this thread must check the AIO_REQ_CANCELED flag to emulate 1021 * an eventually required siglongjmp() freeing the work_qlock1 and 1022 * avoiding a deadlock. 1023 */ 1024 void * 1025 _aio_do_request(void *arglist) 1026 { 1027 aio_worker_t *aiowp = (aio_worker_t *)arglist; 1028 ulwp_t *self = curthread; 1029 struct aio_args *arg; 1030 aio_req_t *reqp; /* current AIO request */ 1031 ssize_t retval; 1032 int append; 1033 int error; 1034 1035 if (pthread_setspecific(_aio_key, aiowp) != 0) 1036 aio_panic("_aio_do_request, pthread_setspecific()"); 1037 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); 1038 ASSERT(aiowp->work_req == NULL); 1039 1040 /* 1041 * We resume here when an operation is cancelled. 1042 * On first entry, aiowp->work_req == NULL, so all 1043 * we do is block SIGAIOCANCEL. 1044 */ 1045 (void) sigsetjmp(aiowp->work_jmp_buf, 0); 1046 ASSERT(self->ul_sigdefer == 0); 1047 1048 sigoff(self); /* block SIGAIOCANCEL */ 1049 if (aiowp->work_req != NULL) 1050 _aio_finish_request(aiowp, -1, ECANCELED); 1051 1052 for (;;) { 1053 /* 1054 * Put completed requests on aio_done_list. This has 1055 * to be done as part of the main loop to ensure that 1056 * we don't artificially starve any aiowait'ers. 1057 */ 1058 if (aiowp->work_done1) 1059 _aio_work_done(aiowp); 1060 1061 top: 1062 /* consume any deferred SIGAIOCANCEL signal here */ 1063 sigon(self); 1064 sigoff(self); 1065 1066 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1067 if (_aio_idle(aiowp) != 0) 1068 goto top; 1069 } 1070 arg = &reqp->req_args; 1071 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1072 reqp->req_state == AIO_REQ_CANCELED); 1073 error = 0; 1074 1075 switch (reqp->req_op) { 1076 case AIOREAD: 1077 case AIOAREAD: 1078 sigon(self); /* unblock SIGAIOCANCEL */ 1079 retval = pread(arg->fd, arg->buf, 1080 arg->bufsz, arg->offset); 1081 if (retval == -1) { 1082 if (errno == ESPIPE) { 1083 retval = read(arg->fd, 1084 arg->buf, arg->bufsz); 1085 if (retval == -1) 1086 error = errno; 1087 } else { 1088 error = errno; 1089 } 1090 } 1091 sigoff(self); /* block SIGAIOCANCEL */ 1092 break; 1093 case AIOWRITE: 1094 case AIOAWRITE: 1095 /* 1096 * The SUSv3 POSIX spec for aio_write() states: 1097 * If O_APPEND is set for the file descriptor, 1098 * write operations append to the file in the 1099 * same order as the calls were made. 1100 * but, somewhat inconsistently, it requires pwrite() 1101 * to ignore the O_APPEND setting. So we have to use 1102 * fcntl() to get the open modes and call write() for 1103 * the O_APPEND case. 1104 */ 1105 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1106 sigon(self); /* unblock SIGAIOCANCEL */ 1107 retval = append? 1108 write(arg->fd, arg->buf, arg->bufsz) : 1109 pwrite(arg->fd, arg->buf, arg->bufsz, 1110 arg->offset); 1111 if (retval == -1) { 1112 if (errno == ESPIPE) { 1113 retval = write(arg->fd, 1114 arg->buf, arg->bufsz); 1115 if (retval == -1) 1116 error = errno; 1117 } else { 1118 error = errno; 1119 } 1120 } 1121 sigoff(self); /* block SIGAIOCANCEL */ 1122 break; 1123 #if !defined(_LP64) 1124 case AIOAREAD64: 1125 sigon(self); /* unblock SIGAIOCANCEL */ 1126 retval = pread64(arg->fd, arg->buf, 1127 arg->bufsz, arg->offset); 1128 if (retval == -1) { 1129 if (errno == ESPIPE) { 1130 retval = read(arg->fd, 1131 arg->buf, arg->bufsz); 1132 if (retval == -1) 1133 error = errno; 1134 } else { 1135 error = errno; 1136 } 1137 } 1138 sigoff(self); /* block SIGAIOCANCEL */ 1139 break; 1140 case AIOAWRITE64: 1141 /* 1142 * The SUSv3 POSIX spec for aio_write() states: 1143 * If O_APPEND is set for the file descriptor, 1144 * write operations append to the file in the 1145 * same order as the calls were made. 1146 * but, somewhat inconsistently, it requires pwrite() 1147 * to ignore the O_APPEND setting. So we have to use 1148 * fcntl() to get the open modes and call write() for 1149 * the O_APPEND case. 1150 */ 1151 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1152 sigon(self); /* unblock SIGAIOCANCEL */ 1153 retval = append? 1154 write(arg->fd, arg->buf, arg->bufsz) : 1155 pwrite64(arg->fd, arg->buf, arg->bufsz, 1156 arg->offset); 1157 if (retval == -1) { 1158 if (errno == ESPIPE) { 1159 retval = write(arg->fd, 1160 arg->buf, arg->bufsz); 1161 if (retval == -1) 1162 error = errno; 1163 } else { 1164 error = errno; 1165 } 1166 } 1167 sigoff(self); /* block SIGAIOCANCEL */ 1168 break; 1169 #endif /* !defined(_LP64) */ 1170 case AIOFSYNC: 1171 if (_aio_fsync_del(aiowp, reqp)) 1172 goto top; 1173 ASSERT(reqp->req_head == NULL); 1174 /* 1175 * All writes for this fsync request are now 1176 * acknowledged. Now make these writes visible 1177 * and put the final request into the hash table. 1178 */ 1179 if (reqp->req_state == AIO_REQ_CANCELED) { 1180 /* EMPTY */; 1181 } else if (arg->offset == O_SYNC) { 1182 if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 1183 error = errno; 1184 } else { 1185 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 1186 error = errno; 1187 } 1188 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1189 aio_panic("_aio_do_request(): AIOFSYNC: " 1190 "request already in hash table"); 1191 break; 1192 default: 1193 aio_panic("_aio_do_request, bad op"); 1194 } 1195 1196 _aio_finish_request(aiowp, retval, error); 1197 } 1198 /* NOTREACHED */ 1199 return (NULL); 1200 } 1201 1202 /* 1203 * Perform the tail processing for _aio_do_request(). 1204 * The in-progress request may or may not have been cancelled. 1205 */ 1206 static void 1207 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1208 { 1209 aio_req_t *reqp; 1210 1211 sig_mutex_lock(&aiowp->work_qlock1); 1212 if ((reqp = aiowp->work_req) == NULL) 1213 sig_mutex_unlock(&aiowp->work_qlock1); 1214 else { 1215 aiowp->work_req = NULL; 1216 if (reqp->req_state == AIO_REQ_CANCELED) { 1217 retval = -1; 1218 error = ECANCELED; 1219 } 1220 if (!POSIX_AIO(reqp)) { 1221 int notify; 1222 if (reqp->req_state == AIO_REQ_INPROGRESS) { 1223 reqp->req_state = AIO_REQ_DONE; 1224 _aio_set_result(reqp, retval, error); 1225 } 1226 sig_mutex_unlock(&aiowp->work_qlock1); 1227 sig_mutex_lock(&__aio_mutex); 1228 /* 1229 * If it was canceled, this request will not be 1230 * added to done list. Just free it. 1231 */ 1232 if (error == ECANCELED) { 1233 _aio_outstand_cnt--; 1234 _aio_req_free(reqp); 1235 } else { 1236 _aio_req_done_cnt++; 1237 } 1238 /* 1239 * Notify any thread that may have blocked 1240 * because it saw an outstanding request. 1241 */ 1242 notify = 0; 1243 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1244 notify = 1; 1245 } 1246 sig_mutex_unlock(&__aio_mutex); 1247 if (notify) { 1248 (void) _kaio(AIONOTIFY); 1249 } 1250 } else { 1251 if (reqp->req_state == AIO_REQ_INPROGRESS) 1252 reqp->req_state = AIO_REQ_DONE; 1253 sig_mutex_unlock(&aiowp->work_qlock1); 1254 _aiodone(reqp, retval, error); 1255 } 1256 } 1257 } 1258 1259 void 1260 _aio_req_mark_done(aio_req_t *reqp) 1261 { 1262 #if !defined(_LP64) 1263 if (reqp->req_largefile) 1264 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1265 else 1266 #endif 1267 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1268 } 1269 1270 /* 1271 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1272 * hopefully to consume one of our queued signals. 1273 */ 1274 static void 1275 _aio_delay(int ticks) 1276 { 1277 (void) usleep(ticks * (MICROSEC / hz)); 1278 } 1279 1280 /* 1281 * Actually send the notifications. 1282 * We could block indefinitely here if the application 1283 * is not listening for the signal or port notifications. 1284 */ 1285 static void 1286 send_notification(notif_param_t *npp) 1287 { 1288 extern int __sigqueue(pid_t pid, int signo, 1289 /* const union sigval */ void *value, int si_code, int block); 1290 1291 if (npp->np_signo) 1292 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1293 SI_ASYNCIO, 1); 1294 else if (npp->np_port >= 0) 1295 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1296 npp->np_event, npp->np_object, npp->np_user); 1297 1298 if (npp->np_lio_signo) 1299 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1300 SI_ASYNCIO, 1); 1301 else if (npp->np_lio_port >= 0) 1302 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1303 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1304 } 1305 1306 /* 1307 * Asynchronous notification worker. 1308 */ 1309 void * 1310 _aio_do_notify(void *arg) 1311 { 1312 aio_worker_t *aiowp = (aio_worker_t *)arg; 1313 aio_req_t *reqp; 1314 1315 /* 1316 * This isn't really necessary. All signals are blocked. 1317 */ 1318 if (pthread_setspecific(_aio_key, aiowp) != 0) 1319 aio_panic("_aio_do_notify, pthread_setspecific()"); 1320 1321 /* 1322 * Notifications are never cancelled. 1323 * All signals remain blocked, forever. 1324 */ 1325 for (;;) { 1326 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1327 if (_aio_idle(aiowp) != 0) 1328 aio_panic("_aio_do_notify: _aio_idle() failed"); 1329 } 1330 send_notification(&reqp->req_notify); 1331 _aio_req_free(reqp); 1332 } 1333 1334 /* NOTREACHED */ 1335 return (NULL); 1336 } 1337 1338 /* 1339 * Do the completion semantics for a request that was either canceled 1340 * by _aio_cancel_req() or was completed by _aio_do_request(). 1341 */ 1342 static void 1343 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1344 { 1345 aio_result_t *resultp = reqp->req_resultp; 1346 int notify = 0; 1347 aio_lio_t *head; 1348 int sigev_none; 1349 int sigev_signal; 1350 int sigev_thread; 1351 int sigev_port; 1352 notif_param_t np; 1353 1354 /* 1355 * We call _aiodone() only for Posix I/O. 1356 */ 1357 ASSERT(POSIX_AIO(reqp)); 1358 1359 sigev_none = 0; 1360 sigev_signal = 0; 1361 sigev_thread = 0; 1362 sigev_port = 0; 1363 np.np_signo = 0; 1364 np.np_port = -1; 1365 np.np_lio_signo = 0; 1366 np.np_lio_port = -1; 1367 1368 switch (reqp->req_sigevent.sigev_notify) { 1369 case SIGEV_NONE: 1370 sigev_none = 1; 1371 break; 1372 case SIGEV_SIGNAL: 1373 sigev_signal = 1; 1374 break; 1375 case SIGEV_THREAD: 1376 sigev_thread = 1; 1377 break; 1378 case SIGEV_PORT: 1379 sigev_port = 1; 1380 break; 1381 default: 1382 aio_panic("_aiodone: improper sigev_notify"); 1383 break; 1384 } 1385 1386 /* 1387 * Figure out the notification parameters while holding __aio_mutex. 1388 * Actually perform the notifications after dropping __aio_mutex. 1389 * This allows us to sleep for a long time (if the notifications 1390 * incur delays) without impeding other async I/O operations. 1391 */ 1392 1393 sig_mutex_lock(&__aio_mutex); 1394 1395 if (sigev_signal) { 1396 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1397 notify = 1; 1398 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1399 } else if (sigev_thread | sigev_port) { 1400 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1401 notify = 1; 1402 np.np_event = reqp->req_op; 1403 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1404 np.np_event = AIOFSYNC64; 1405 np.np_object = (uintptr_t)reqp->req_aiocbp; 1406 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1407 } 1408 1409 if (resultp->aio_errno == EINPROGRESS) 1410 _aio_set_result(reqp, retval, error); 1411 1412 _aio_outstand_cnt--; 1413 1414 head = reqp->req_head; 1415 reqp->req_head = NULL; 1416 1417 if (sigev_none) { 1418 _aio_enq_doneq(reqp); 1419 reqp = NULL; 1420 } else { 1421 (void) _aio_hash_del(resultp); 1422 _aio_req_mark_done(reqp); 1423 } 1424 1425 _aio_waitn_wakeup(); 1426 1427 /* 1428 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1429 * __aio_suspend() increments "_aio_kernel_suspend" 1430 * when they are waiting in the kernel for completed I/Os. 1431 * 1432 * _kaio(AIONOTIFY) awakes the corresponding function 1433 * in the kernel; then the corresponding __aio_waitn() or 1434 * __aio_suspend() function could reap the recently 1435 * completed I/Os (_aiodone()). 1436 */ 1437 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1438 (void) _kaio(AIONOTIFY); 1439 1440 sig_mutex_unlock(&__aio_mutex); 1441 1442 if (head != NULL) { 1443 /* 1444 * If all the lio requests have completed, 1445 * prepare to notify the waiting thread. 1446 */ 1447 sig_mutex_lock(&head->lio_mutex); 1448 ASSERT(head->lio_refcnt == head->lio_nent); 1449 if (head->lio_refcnt == 1) { 1450 int waiting = 0; 1451 if (head->lio_mode == LIO_WAIT) { 1452 if ((waiting = head->lio_waiting) != 0) 1453 (void) cond_signal(&head->lio_cond_cv); 1454 } else if (head->lio_port < 0) { /* none or signal */ 1455 if ((np.np_lio_signo = head->lio_signo) != 0) 1456 notify = 1; 1457 np.np_lio_user = head->lio_sigval.sival_ptr; 1458 } else { /* thread or port */ 1459 notify = 1; 1460 np.np_lio_port = head->lio_port; 1461 np.np_lio_event = head->lio_event; 1462 np.np_lio_object = 1463 (uintptr_t)head->lio_sigevent; 1464 np.np_lio_user = head->lio_sigval.sival_ptr; 1465 } 1466 head->lio_nent = head->lio_refcnt = 0; 1467 sig_mutex_unlock(&head->lio_mutex); 1468 if (waiting == 0) 1469 _aio_lio_free(head); 1470 } else { 1471 head->lio_nent--; 1472 head->lio_refcnt--; 1473 sig_mutex_unlock(&head->lio_mutex); 1474 } 1475 } 1476 1477 /* 1478 * The request is completed; now perform the notifications. 1479 */ 1480 if (notify) { 1481 if (reqp != NULL) { 1482 /* 1483 * We usually put the request on the notification 1484 * queue because we don't want to block and delay 1485 * other operations behind us in the work queue. 1486 * Also we must never block on a cancel notification 1487 * because we are being called from an application 1488 * thread in this case and that could lead to deadlock 1489 * if no other thread is receiving notificatins. 1490 */ 1491 reqp->req_notify = np; 1492 reqp->req_op = AIONOTIFY; 1493 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1494 reqp = NULL; 1495 } else { 1496 /* 1497 * We already put the request on the done queue, 1498 * so we can't queue it to the notification queue. 1499 * Just do the notification directly. 1500 */ 1501 send_notification(&np); 1502 } 1503 } 1504 1505 if (reqp != NULL) 1506 _aio_req_free(reqp); 1507 } 1508 1509 /* 1510 * Delete fsync requests from list head until there is 1511 * only one left. Return 0 when there is only one, 1512 * otherwise return a non-zero value. 1513 */ 1514 static int 1515 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1516 { 1517 aio_lio_t *head = reqp->req_head; 1518 int rval = 0; 1519 1520 ASSERT(reqp == aiowp->work_req); 1521 sig_mutex_lock(&aiowp->work_qlock1); 1522 sig_mutex_lock(&head->lio_mutex); 1523 if (head->lio_refcnt > 1) { 1524 head->lio_refcnt--; 1525 head->lio_nent--; 1526 aiowp->work_req = NULL; 1527 sig_mutex_unlock(&head->lio_mutex); 1528 sig_mutex_unlock(&aiowp->work_qlock1); 1529 sig_mutex_lock(&__aio_mutex); 1530 _aio_outstand_cnt--; 1531 _aio_waitn_wakeup(); 1532 sig_mutex_unlock(&__aio_mutex); 1533 _aio_req_free(reqp); 1534 return (1); 1535 } 1536 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1537 reqp->req_head = NULL; 1538 if (head->lio_canned) 1539 reqp->req_state = AIO_REQ_CANCELED; 1540 if (head->lio_mode == LIO_DESTROY) { 1541 aiowp->work_req = NULL; 1542 rval = 1; 1543 } 1544 sig_mutex_unlock(&head->lio_mutex); 1545 sig_mutex_unlock(&aiowp->work_qlock1); 1546 head->lio_refcnt--; 1547 head->lio_nent--; 1548 _aio_lio_free(head); 1549 if (rval != 0) 1550 _aio_req_free(reqp); 1551 return (rval); 1552 } 1553 1554 /* 1555 * A worker is set idle when its work queue is empty. 1556 * The worker checks again that it has no more work 1557 * and then goes to sleep waiting for more work. 1558 */ 1559 int 1560 _aio_idle(aio_worker_t *aiowp) 1561 { 1562 int error = 0; 1563 1564 sig_mutex_lock(&aiowp->work_qlock1); 1565 if (aiowp->work_count1 == 0) { 1566 ASSERT(aiowp->work_minload1 == 0); 1567 aiowp->work_idleflg = 1; 1568 /* 1569 * A cancellation handler is not needed here. 1570 * aio worker threads are never cancelled via pthread_cancel(). 1571 */ 1572 error = sig_cond_wait(&aiowp->work_idle_cv, 1573 &aiowp->work_qlock1); 1574 /* 1575 * The idle flag is normally cleared before worker is awakened 1576 * by aio_req_add(). On error (EINTR), we clear it ourself. 1577 */ 1578 if (error) 1579 aiowp->work_idleflg = 0; 1580 } 1581 sig_mutex_unlock(&aiowp->work_qlock1); 1582 return (error); 1583 } 1584 1585 /* 1586 * A worker's completed AIO requests are placed onto a global 1587 * done queue. The application is only sent a SIGIO signal if 1588 * the process has a handler enabled and it is not waiting via 1589 * aiowait(). 1590 */ 1591 static void 1592 _aio_work_done(aio_worker_t *aiowp) 1593 { 1594 aio_req_t *reqp; 1595 1596 sig_mutex_lock(&__aio_mutex); 1597 sig_mutex_lock(&aiowp->work_qlock1); 1598 reqp = aiowp->work_prev1; 1599 reqp->req_next = NULL; 1600 aiowp->work_done1 = 0; 1601 aiowp->work_tail1 = aiowp->work_next1; 1602 if (aiowp->work_tail1 == NULL) 1603 aiowp->work_head1 = NULL; 1604 aiowp->work_prev1 = NULL; 1605 _aio_outstand_cnt--; 1606 _aio_req_done_cnt--; 1607 if (reqp->req_state == AIO_REQ_CANCELED) { 1608 /* 1609 * Request got cancelled after it was marked done. This can 1610 * happen because _aio_finish_request() marks it AIO_REQ_DONE 1611 * and drops all locks. Don't add the request to the done 1612 * queue and just discard it. 1613 */ 1614 sig_mutex_unlock(&aiowp->work_qlock1); 1615 _aio_req_free(reqp); 1616 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1617 sig_mutex_unlock(&__aio_mutex); 1618 (void) _kaio(AIONOTIFY); 1619 } else { 1620 sig_mutex_unlock(&__aio_mutex); 1621 } 1622 return; 1623 } 1624 sig_mutex_unlock(&aiowp->work_qlock1); 1625 _aio_donecnt++; 1626 ASSERT(_aio_donecnt > 0 && 1627 _aio_outstand_cnt >= 0 && 1628 _aio_req_done_cnt >= 0); 1629 ASSERT(reqp != NULL); 1630 1631 if (_aio_done_tail == NULL) { 1632 _aio_done_head = _aio_done_tail = reqp; 1633 } else { 1634 _aio_done_head->req_next = reqp; 1635 _aio_done_head = reqp; 1636 } 1637 1638 if (_aiowait_flag) { 1639 sig_mutex_unlock(&__aio_mutex); 1640 (void) _kaio(AIONOTIFY); 1641 } else { 1642 sig_mutex_unlock(&__aio_mutex); 1643 if (_sigio_enabled) 1644 (void) kill(__pid, SIGIO); 1645 } 1646 } 1647 1648 /* 1649 * The done queue consists of AIO requests that are in either the 1650 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1651 * are discarded. If the done queue is empty then NULL is returned. 1652 * Otherwise the address of a done aio_result_t is returned. 1653 */ 1654 aio_result_t * 1655 _aio_req_done(void) 1656 { 1657 aio_req_t *reqp; 1658 aio_result_t *resultp; 1659 1660 ASSERT(MUTEX_HELD(&__aio_mutex)); 1661 1662 if ((reqp = _aio_done_tail) != NULL) { 1663 if ((_aio_done_tail = reqp->req_next) == NULL) 1664 _aio_done_head = NULL; 1665 ASSERT(_aio_donecnt > 0); 1666 _aio_donecnt--; 1667 (void) _aio_hash_del(reqp->req_resultp); 1668 resultp = reqp->req_resultp; 1669 ASSERT(reqp->req_state == AIO_REQ_DONE); 1670 _aio_req_free(reqp); 1671 return (resultp); 1672 } 1673 /* is queue empty? */ 1674 if (reqp == NULL && _aio_outstand_cnt == 0) { 1675 return ((aio_result_t *)-1); 1676 } 1677 return (NULL); 1678 } 1679 1680 /* 1681 * Set the return and errno values for the application's use. 1682 * 1683 * For the Posix interfaces, we must set the return value first followed 1684 * by the errno value because the Posix interfaces allow for a change 1685 * in the errno value from EINPROGRESS to something else to signal 1686 * the completion of the asynchronous request. 1687 * 1688 * The opposite is true for the Solaris interfaces. These allow for 1689 * a change in the return value from AIO_INPROGRESS to something else 1690 * to signal the completion of the asynchronous request. 1691 */ 1692 void 1693 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1694 { 1695 aio_result_t *resultp = reqp->req_resultp; 1696 1697 if (POSIX_AIO(reqp)) { 1698 resultp->aio_return = retval; 1699 membar_producer(); 1700 resultp->aio_errno = error; 1701 } else { 1702 resultp->aio_errno = error; 1703 membar_producer(); 1704 resultp->aio_return = retval; 1705 } 1706 } 1707 1708 /* 1709 * Add an AIO request onto the next work queue. 1710 * A circular list of workers is used to choose the next worker. 1711 */ 1712 void 1713 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1714 { 1715 ulwp_t *self = curthread; 1716 aio_worker_t *aiowp; 1717 aio_worker_t *first; 1718 int load_bal_flg = 1; 1719 int found; 1720 1721 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1722 reqp->req_next = NULL; 1723 /* 1724 * Try to acquire the next worker's work queue. If it is locked, 1725 * then search the list of workers until a queue is found unlocked, 1726 * or until the list is completely traversed at which point another 1727 * worker will be created. 1728 */ 1729 sigoff(self); /* defer SIGIO */ 1730 sig_mutex_lock(&__aio_mutex); 1731 first = aiowp = *nextworker; 1732 if (mode != AIONOTIFY) 1733 _aio_outstand_cnt++; 1734 sig_mutex_unlock(&__aio_mutex); 1735 1736 switch (mode) { 1737 case AIOREAD: 1738 case AIOWRITE: 1739 case AIOAREAD: 1740 case AIOAWRITE: 1741 #if !defined(_LP64) 1742 case AIOAREAD64: 1743 case AIOAWRITE64: 1744 #endif 1745 /* try to find an idle worker */ 1746 found = 0; 1747 do { 1748 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1749 if (aiowp->work_idleflg) { 1750 found = 1; 1751 break; 1752 } 1753 sig_mutex_unlock(&aiowp->work_qlock1); 1754 } 1755 } while ((aiowp = aiowp->work_forw) != first); 1756 1757 if (found) { 1758 aiowp->work_minload1++; 1759 break; 1760 } 1761 1762 /* try to acquire some worker's queue lock */ 1763 do { 1764 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1765 found = 1; 1766 break; 1767 } 1768 } while ((aiowp = aiowp->work_forw) != first); 1769 1770 /* 1771 * Create more workers when the workers appear overloaded. 1772 * Either all the workers are busy draining their queues 1773 * or no worker's queue lock could be acquired. 1774 */ 1775 if (!found) { 1776 if (_aio_worker_cnt < _max_workers) { 1777 if (_aio_create_worker(reqp, mode)) 1778 aio_panic("_aio_req_add: add worker"); 1779 sigon(self); /* reenable SIGIO */ 1780 return; 1781 } 1782 1783 /* 1784 * No worker available and we have created 1785 * _max_workers, keep going through the 1786 * list slowly until we get a lock 1787 */ 1788 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1789 /* 1790 * give someone else a chance 1791 */ 1792 _aio_delay(1); 1793 aiowp = aiowp->work_forw; 1794 } 1795 } 1796 1797 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1798 if (_aio_worker_cnt < _max_workers && 1799 aiowp->work_minload1 >= _minworkload) { 1800 sig_mutex_unlock(&aiowp->work_qlock1); 1801 sig_mutex_lock(&__aio_mutex); 1802 *nextworker = aiowp->work_forw; 1803 sig_mutex_unlock(&__aio_mutex); 1804 if (_aio_create_worker(reqp, mode)) 1805 aio_panic("aio_req_add: add worker"); 1806 sigon(self); /* reenable SIGIO */ 1807 return; 1808 } 1809 aiowp->work_minload1++; 1810 break; 1811 case AIOFSYNC: 1812 case AIONOTIFY: 1813 load_bal_flg = 0; 1814 sig_mutex_lock(&aiowp->work_qlock1); 1815 break; 1816 default: 1817 aio_panic("_aio_req_add: invalid mode"); 1818 break; 1819 } 1820 /* 1821 * Put request onto worker's work queue. 1822 */ 1823 if (aiowp->work_tail1 == NULL) { 1824 ASSERT(aiowp->work_count1 == 0); 1825 aiowp->work_tail1 = reqp; 1826 aiowp->work_next1 = reqp; 1827 } else { 1828 aiowp->work_head1->req_next = reqp; 1829 if (aiowp->work_next1 == NULL) 1830 aiowp->work_next1 = reqp; 1831 } 1832 reqp->req_state = AIO_REQ_QUEUED; 1833 reqp->req_worker = aiowp; 1834 aiowp->work_head1 = reqp; 1835 /* 1836 * Awaken worker if it is not currently active. 1837 */ 1838 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1839 aiowp->work_idleflg = 0; 1840 (void) cond_signal(&aiowp->work_idle_cv); 1841 } 1842 sig_mutex_unlock(&aiowp->work_qlock1); 1843 1844 if (load_bal_flg) { 1845 sig_mutex_lock(&__aio_mutex); 1846 *nextworker = aiowp->work_forw; 1847 sig_mutex_unlock(&__aio_mutex); 1848 } 1849 sigon(self); /* reenable SIGIO */ 1850 } 1851 1852 /* 1853 * Get an AIO request for a specified worker. 1854 * If the work queue is empty, return NULL. 1855 */ 1856 aio_req_t * 1857 _aio_req_get(aio_worker_t *aiowp) 1858 { 1859 aio_req_t *reqp; 1860 1861 sig_mutex_lock(&aiowp->work_qlock1); 1862 if ((reqp = aiowp->work_next1) != NULL) { 1863 /* 1864 * Remove a POSIX request from the queue; the 1865 * request queue is a singularly linked list 1866 * with a previous pointer. The request is 1867 * removed by updating the previous pointer. 1868 * 1869 * Non-posix requests are left on the queue 1870 * to eventually be placed on the done queue. 1871 */ 1872 1873 if (POSIX_AIO(reqp)) { 1874 if (aiowp->work_prev1 == NULL) { 1875 aiowp->work_tail1 = reqp->req_next; 1876 if (aiowp->work_tail1 == NULL) 1877 aiowp->work_head1 = NULL; 1878 } else { 1879 aiowp->work_prev1->req_next = reqp->req_next; 1880 if (aiowp->work_head1 == reqp) 1881 aiowp->work_head1 = reqp->req_next; 1882 } 1883 1884 } else { 1885 aiowp->work_prev1 = reqp; 1886 ASSERT(aiowp->work_done1 >= 0); 1887 aiowp->work_done1++; 1888 } 1889 ASSERT(reqp != reqp->req_next); 1890 aiowp->work_next1 = reqp->req_next; 1891 ASSERT(aiowp->work_count1 >= 1); 1892 aiowp->work_count1--; 1893 switch (reqp->req_op) { 1894 case AIOREAD: 1895 case AIOWRITE: 1896 case AIOAREAD: 1897 case AIOAWRITE: 1898 #if !defined(_LP64) 1899 case AIOAREAD64: 1900 case AIOAWRITE64: 1901 #endif 1902 ASSERT(aiowp->work_minload1 > 0); 1903 aiowp->work_minload1--; 1904 break; 1905 } 1906 reqp->req_state = AIO_REQ_INPROGRESS; 1907 } 1908 aiowp->work_req = reqp; 1909 ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1910 sig_mutex_unlock(&aiowp->work_qlock1); 1911 return (reqp); 1912 } 1913 1914 static void 1915 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1916 { 1917 aio_req_t **last; 1918 aio_req_t *lastrp; 1919 aio_req_t *next; 1920 1921 ASSERT(aiowp != NULL); 1922 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1923 if (POSIX_AIO(reqp)) { 1924 if (ostate != AIO_REQ_QUEUED) 1925 return; 1926 } 1927 last = &aiowp->work_tail1; 1928 lastrp = aiowp->work_tail1; 1929 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1930 while ((next = *last) != NULL) { 1931 if (next == reqp) { 1932 *last = next->req_next; 1933 if (aiowp->work_next1 == next) 1934 aiowp->work_next1 = next->req_next; 1935 1936 /* 1937 * if this is the first request on the queue, move 1938 * the lastrp pointer forward. 1939 */ 1940 if (lastrp == next) 1941 lastrp = next->req_next; 1942 1943 /* 1944 * if this request is pointed by work_head1, then 1945 * make work_head1 point to the last request that is 1946 * present on the queue. 1947 */ 1948 if (aiowp->work_head1 == next) 1949 aiowp->work_head1 = lastrp; 1950 1951 /* 1952 * work_prev1 is used only in non posix case and it 1953 * points to the current AIO_REQ_INPROGRESS request. 1954 * If work_prev1 points to this request which is being 1955 * deleted, make work_prev1 NULL and set work_done1 1956 * to 0. 1957 * 1958 * A worker thread can be processing only one request 1959 * at a time. 1960 */ 1961 if (aiowp->work_prev1 == next) { 1962 ASSERT(ostate == AIO_REQ_INPROGRESS && 1963 !POSIX_AIO(reqp) && aiowp->work_done1 > 0); 1964 aiowp->work_prev1 = NULL; 1965 aiowp->work_done1--; 1966 } 1967 1968 if (ostate == AIO_REQ_QUEUED) { 1969 ASSERT(aiowp->work_count1 >= 1); 1970 aiowp->work_count1--; 1971 ASSERT(aiowp->work_minload1 >= 1); 1972 aiowp->work_minload1--; 1973 } 1974 return; 1975 } 1976 last = &next->req_next; 1977 lastrp = next; 1978 } 1979 /* NOTREACHED */ 1980 } 1981 1982 static void 1983 _aio_enq_doneq(aio_req_t *reqp) 1984 { 1985 if (_aio_doneq == NULL) { 1986 _aio_doneq = reqp; 1987 reqp->req_next = reqp->req_prev = reqp; 1988 } else { 1989 reqp->req_next = _aio_doneq; 1990 reqp->req_prev = _aio_doneq->req_prev; 1991 _aio_doneq->req_prev->req_next = reqp; 1992 _aio_doneq->req_prev = reqp; 1993 } 1994 reqp->req_state = AIO_REQ_DONEQ; 1995 _aio_doneq_cnt++; 1996 } 1997 1998 /* 1999 * caller owns the _aio_mutex 2000 */ 2001 aio_req_t * 2002 _aio_req_remove(aio_req_t *reqp) 2003 { 2004 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 2005 return (NULL); 2006 2007 if (reqp) { 2008 /* request in done queue */ 2009 if (_aio_doneq == reqp) 2010 _aio_doneq = reqp->req_next; 2011 if (_aio_doneq == reqp) { 2012 /* only one request on queue */ 2013 _aio_doneq = NULL; 2014 } else { 2015 aio_req_t *tmp = reqp->req_next; 2016 reqp->req_prev->req_next = tmp; 2017 tmp->req_prev = reqp->req_prev; 2018 } 2019 } else if ((reqp = _aio_doneq) != NULL) { 2020 if (reqp == reqp->req_next) { 2021 /* only one request on queue */ 2022 _aio_doneq = NULL; 2023 } else { 2024 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 2025 _aio_doneq->req_prev = reqp->req_prev; 2026 } 2027 } 2028 if (reqp) { 2029 _aio_doneq_cnt--; 2030 reqp->req_next = reqp->req_prev = reqp; 2031 reqp->req_state = AIO_REQ_DONE; 2032 } 2033 return (reqp); 2034 } 2035 2036 /* 2037 * An AIO request is identified by an aio_result_t pointer. The library 2038 * maps this aio_result_t pointer to its internal representation using a 2039 * hash table. This function adds an aio_result_t pointer to the hash table. 2040 */ 2041 static int 2042 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 2043 { 2044 aio_hash_t *hashp; 2045 aio_req_t **prev; 2046 aio_req_t *next; 2047 2048 hashp = _aio_hash + AIOHASH(resultp); 2049 lmutex_lock(&hashp->hash_lock); 2050 prev = &hashp->hash_ptr; 2051 while ((next = *prev) != NULL) { 2052 if (resultp == next->req_resultp) { 2053 lmutex_unlock(&hashp->hash_lock); 2054 return (-1); 2055 } 2056 prev = &next->req_link; 2057 } 2058 *prev = reqp; 2059 ASSERT(reqp->req_link == NULL); 2060 lmutex_unlock(&hashp->hash_lock); 2061 return (0); 2062 } 2063 2064 /* 2065 * Remove an entry from the hash table. 2066 */ 2067 aio_req_t * 2068 _aio_hash_del(aio_result_t *resultp) 2069 { 2070 aio_hash_t *hashp; 2071 aio_req_t **prev; 2072 aio_req_t *next = NULL; 2073 2074 if (_aio_hash != NULL) { 2075 hashp = _aio_hash + AIOHASH(resultp); 2076 lmutex_lock(&hashp->hash_lock); 2077 prev = &hashp->hash_ptr; 2078 while ((next = *prev) != NULL) { 2079 if (resultp == next->req_resultp) { 2080 *prev = next->req_link; 2081 next->req_link = NULL; 2082 break; 2083 } 2084 prev = &next->req_link; 2085 } 2086 lmutex_unlock(&hashp->hash_lock); 2087 } 2088 return (next); 2089 } 2090 2091 /* 2092 * find an entry in the hash table 2093 */ 2094 aio_req_t * 2095 _aio_hash_find(aio_result_t *resultp) 2096 { 2097 aio_hash_t *hashp; 2098 aio_req_t **prev; 2099 aio_req_t *next = NULL; 2100 2101 if (_aio_hash != NULL) { 2102 hashp = _aio_hash + AIOHASH(resultp); 2103 lmutex_lock(&hashp->hash_lock); 2104 prev = &hashp->hash_ptr; 2105 while ((next = *prev) != NULL) { 2106 if (resultp == next->req_resultp) 2107 break; 2108 prev = &next->req_link; 2109 } 2110 lmutex_unlock(&hashp->hash_lock); 2111 } 2112 return (next); 2113 } 2114 2115 /* 2116 * AIO interface for POSIX 2117 */ 2118 int 2119 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2120 int mode, int flg) 2121 { 2122 aio_req_t *reqp; 2123 aio_args_t *ap; 2124 int kerr; 2125 2126 if (aiocbp == NULL) { 2127 errno = EINVAL; 2128 return (-1); 2129 } 2130 2131 /* initialize kaio */ 2132 if (!_kaio_ok) 2133 _kaio_init(); 2134 2135 aiocbp->aio_state = NOCHECK; 2136 2137 /* 2138 * If we have been called because a list I/O 2139 * kaio() failed, we dont want to repeat the 2140 * system call 2141 */ 2142 2143 if (flg & AIO_KAIO) { 2144 /* 2145 * Try kernel aio first. 2146 * If errno is ENOTSUP/EBADFD, 2147 * fall back to the thread implementation. 2148 */ 2149 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2150 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2151 aiocbp->aio_state = CHECK; 2152 kerr = (int)_kaio(mode, aiocbp); 2153 if (kerr == 0) 2154 return (0); 2155 if (errno != ENOTSUP && errno != EBADFD) { 2156 aiocbp->aio_resultp.aio_errno = errno; 2157 aiocbp->aio_resultp.aio_return = -1; 2158 aiocbp->aio_state = NOCHECK; 2159 return (-1); 2160 } 2161 if (errno == EBADFD) 2162 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2163 } 2164 } 2165 2166 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2167 aiocbp->aio_state = USERAIO; 2168 2169 if (!__uaio_ok && __uaio_init() == -1) 2170 return (-1); 2171 2172 if ((reqp = _aio_req_alloc()) == NULL) { 2173 errno = EAGAIN; 2174 return (-1); 2175 } 2176 2177 /* 2178 * If an LIO request, add the list head to the aio request 2179 */ 2180 reqp->req_head = lio_head; 2181 reqp->req_type = AIO_POSIX_REQ; 2182 reqp->req_op = mode; 2183 reqp->req_largefile = 0; 2184 2185 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2186 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2187 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2188 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2189 reqp->req_sigevent.sigev_signo = 2190 aiocbp->aio_sigevent.sigev_signo; 2191 reqp->req_sigevent.sigev_value.sival_ptr = 2192 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2193 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2194 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2195 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2196 /* 2197 * Reuse the sigevent structure to contain the port number 2198 * and the user value. Same for SIGEV_THREAD, below. 2199 */ 2200 reqp->req_sigevent.sigev_signo = 2201 pn->portnfy_port; 2202 reqp->req_sigevent.sigev_value.sival_ptr = 2203 pn->portnfy_user; 2204 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2205 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2206 /* 2207 * The sigevent structure contains the port number 2208 * and the user value. Same for SIGEV_PORT, above. 2209 */ 2210 reqp->req_sigevent.sigev_signo = 2211 aiocbp->aio_sigevent.sigev_signo; 2212 reqp->req_sigevent.sigev_value.sival_ptr = 2213 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2214 } 2215 2216 reqp->req_resultp = &aiocbp->aio_resultp; 2217 reqp->req_aiocbp = aiocbp; 2218 ap = &reqp->req_args; 2219 ap->fd = aiocbp->aio_fildes; 2220 ap->buf = (caddr_t)aiocbp->aio_buf; 2221 ap->bufsz = aiocbp->aio_nbytes; 2222 ap->offset = aiocbp->aio_offset; 2223 2224 if ((flg & AIO_NO_DUPS) && 2225 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2226 aio_panic("_aio_rw(): request already in hash table"); 2227 _aio_req_free(reqp); 2228 errno = EINVAL; 2229 return (-1); 2230 } 2231 _aio_req_add(reqp, nextworker, mode); 2232 return (0); 2233 } 2234 2235 #if !defined(_LP64) 2236 /* 2237 * 64-bit AIO interface for POSIX 2238 */ 2239 int 2240 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2241 int mode, int flg) 2242 { 2243 aio_req_t *reqp; 2244 aio_args_t *ap; 2245 int kerr; 2246 2247 if (aiocbp == NULL) { 2248 errno = EINVAL; 2249 return (-1); 2250 } 2251 2252 /* initialize kaio */ 2253 if (!_kaio_ok) 2254 _kaio_init(); 2255 2256 aiocbp->aio_state = NOCHECK; 2257 2258 /* 2259 * If we have been called because a list I/O 2260 * kaio() failed, we dont want to repeat the 2261 * system call 2262 */ 2263 2264 if (flg & AIO_KAIO) { 2265 /* 2266 * Try kernel aio first. 2267 * If errno is ENOTSUP/EBADFD, 2268 * fall back to the thread implementation. 2269 */ 2270 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2271 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2272 aiocbp->aio_state = CHECK; 2273 kerr = (int)_kaio(mode, aiocbp); 2274 if (kerr == 0) 2275 return (0); 2276 if (errno != ENOTSUP && errno != EBADFD) { 2277 aiocbp->aio_resultp.aio_errno = errno; 2278 aiocbp->aio_resultp.aio_return = -1; 2279 aiocbp->aio_state = NOCHECK; 2280 return (-1); 2281 } 2282 if (errno == EBADFD) 2283 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2284 } 2285 } 2286 2287 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2288 aiocbp->aio_state = USERAIO; 2289 2290 if (!__uaio_ok && __uaio_init() == -1) 2291 return (-1); 2292 2293 if ((reqp = _aio_req_alloc()) == NULL) { 2294 errno = EAGAIN; 2295 return (-1); 2296 } 2297 2298 /* 2299 * If an LIO request, add the list head to the aio request 2300 */ 2301 reqp->req_head = lio_head; 2302 reqp->req_type = AIO_POSIX_REQ; 2303 reqp->req_op = mode; 2304 reqp->req_largefile = 1; 2305 2306 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2307 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2308 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2309 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2310 reqp->req_sigevent.sigev_signo = 2311 aiocbp->aio_sigevent.sigev_signo; 2312 reqp->req_sigevent.sigev_value.sival_ptr = 2313 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2314 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2315 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2316 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2317 reqp->req_sigevent.sigev_signo = 2318 pn->portnfy_port; 2319 reqp->req_sigevent.sigev_value.sival_ptr = 2320 pn->portnfy_user; 2321 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2322 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2323 reqp->req_sigevent.sigev_signo = 2324 aiocbp->aio_sigevent.sigev_signo; 2325 reqp->req_sigevent.sigev_value.sival_ptr = 2326 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2327 } 2328 2329 reqp->req_resultp = &aiocbp->aio_resultp; 2330 reqp->req_aiocbp = aiocbp; 2331 ap = &reqp->req_args; 2332 ap->fd = aiocbp->aio_fildes; 2333 ap->buf = (caddr_t)aiocbp->aio_buf; 2334 ap->bufsz = aiocbp->aio_nbytes; 2335 ap->offset = aiocbp->aio_offset; 2336 2337 if ((flg & AIO_NO_DUPS) && 2338 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2339 aio_panic("_aio_rw64(): request already in hash table"); 2340 _aio_req_free(reqp); 2341 errno = EINVAL; 2342 return (-1); 2343 } 2344 _aio_req_add(reqp, nextworker, mode); 2345 return (0); 2346 } 2347 #endif /* !defined(_LP64) */ 2348